### PINN model
import numpy as np
import torch
import torch.nn as nn
from collections import OrderedDict
import scipy.io as sio
import scipy.stats.qmc as qmc
import matplotlib.pyplot as plt
import os
import pandas as pd

'''
## Keep these variables consistent between files!!!!!!!!
u0 = 1
rho0 = 1
nu0 = 0.01
r = 0.5
test_ver = "remake_files"

### Setup working directory - makes it so it's git_main/PINN for ease
curr_cwd_path = os.getcwd()
working_path = os.path.join(curr_cwd_path, "PINN")
os.chdir(working_path)
# print(os.getcwd())
## I did this because my default working directory was set as the masters-project folder rather than the PINN folder
# Might change this because it could cause conflict???
'''

# Gradient Functions
## Compute gradients
def grad(outputs, inputs):
    return torch.autograd.grad(
        outputs, inputs,
        grad_outputs=torch.ones_like(outputs),
        create_graph=True,
        retain_graph=True
    )[0]


def compute_gradients(model, xy):
    """
    Args:
        model : PyTorch neural network with three sub-models inside it (u, v and p) for input xy
        xy    : tensor of shape (N, 2)  # when setting up the model set requires_grad=True

    Returns:
        p_grads : (p, p_x, p_y) as a tuple
        u_grads : (u, u_x, u_y, u_xx, u_yy) as a tuple
        v_grads : (v, v_x, v_y, v_xx, v_yy) as a tuple
    """
    # Create a clone so we don't change the original tensor, and detach the xy data
    # from previous gradients so it can be evaluated in isolation
    xy = xy.clone().detach().requires_grad_(True)
    x = xy[:, 0:1]
    y = xy[:, 1:2]

    # Make a forward pass through the combined mini models, collecting all quantities needed for derivatives
    u_v_p = model(torch.cat([x, y], dim=1))
    u = u_v_p[:, 0:1]
    v = u_v_p[:, 1:2]
    p = u_v_p[:, 2:3]

    # First-order pressure derivatives
    p_x = grad(p, x)
    p_y = grad(p, y)

    # First-order velocity derivatives
    u_x = grad(u, x)
    u_y = grad(u, y)
    v_x = grad(v, x)
    v_y = grad(v, y)

    # Second-order velocity derivatives
    u_xx = grad(u_x, x)
    u_yy = grad(u_y, y)
    v_xx = grad(v_x, x)
    v_yy = grad(v_y, y)

    p_grads = (p, p_x, p_y)
    u_grads = (u, u_x, u_y, u_xx, u_yy)
    v_grads = (v, v_x, v_y, v_xx, v_yy)

    return p_grads, u_grads, v_grads


##### Define Sub-model
## CREATING THE MINI MODEL FOR u, v AND p AND THEN COMBINING THEM INTO ONE MODEL
# sub model for u, v and p
class submodel(nn.Module):
    def __init__(
        self,
        N_input,
        N_hidden_arr,
        N_output,
        activation=nn.Tanh
    ):
        super(submodel, self).__init__()

        # Create network
        # Create input layer w/ activation function
        layers = [('Input', nn.Linear(N_input, N_hidden_arr[0]))]
        layers.append(('Input activation', activation()))

        # Create hidden layers
        for i in range(len(N_hidden_arr) - 1):
            layers.append(
                ("Hidden %d" % (i + 1), nn.Linear(N_hidden_arr[i], N_hidden_arr[i + 1]))
            )
            layers.append(('Hidden activation %d' % (i + 1), activation()))

        layers.append(('Output', nn.Linear(N_hidden_arr[-1], N_output)))

        layerdict = OrderedDict(layers)
        self.layers = nn.Sequential(layerdict)

    def forward(self, x):
        y = self.layers(x)
        return y


### Define class for one model, containing several sub-models
class Net(nn.Module):
    def __init__(self, N_input, N_hidden_arr, N_output, activation=nn.Tanh):
        super(Net, self).__init__()

        # Create network
        # creates three models using submodel as the blueprint
        self.model_u = submodel(N_input, N_hidden_arr, N_output, activation)
        self.model_v = submodel(N_input, N_hidden_arr, N_output, activation)
        self.model_p = submodel(N_input, N_hidden_arr, N_output, activation)

    # combine the outputs of all the models into a single output
    def forward(self, xy):
        out_u = self.model_u(xy)
        out_v = self.model_v(xy)
        out_p = self.model_p(xy)
        combined = torch.cat((out_u, out_v, out_p), dim=1)
        return combined
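
# Illustrative sketch (not part of the original script): a quick shape sanity check for the
# composite Net and compute_gradients. The hidden-layer sizes mirror the ones used later in
# the PINN class; the random points and the variable names here are assumptions for illustration.
'''
net = Net(N_input=2, N_hidden_arr=[32, 16, 16, 32], N_output=1)
xy = torch.rand(10, 2)                      # 10 illustrative (x, y) points
out = net(xy)                               # shape (10, 3): columns are u, v, p
p_grads, u_grads, v_grads = compute_gradients(net, xy)
p, p_x, p_y = p_grads                       # each of shape (10, 1)
u, u_x, u_y, u_xx, u_yy = u_grads           # each of shape (10, 1)
print(out.shape, u_xx.shape)
'''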
## LOSS EQUATIONS
def navier_stokes_loss(model, X, input_variables):
    """
    Calculates steady Navier-Stokes residuals at collocation points

    Args:
        model : whatever model calls this function
        X     : input collocation [x, y] coords as defined in the model's data creation

    Returns:
        tensor of all collocation point residuals
    """
    p_grads, u_grads, v_grads = compute_gradients(model, X)
    _, p_x, p_y = p_grads
    u, u_x, u_y, u_xx, u_yy = u_grads
    v, v_x, v_y, v_xx, v_yy = v_grads

    # compute PDE residuals
    u_eqn = u*u_x + v*u_y + p_x/input_variables["rho0"] - input_variables["nu0"]*(u_xx + u_yy)
    v_eqn = u*v_x + v*v_y + p_y/input_variables["rho0"] - input_variables["nu0"]*(v_xx + v_yy)

    # combine into one tensor
    # [u_residual, v_residual, continuity]
    return torch.cat([u_eqn, v_eqn, (u_x + v_y)], dim=1)


def BC_loss(model, BC_X):
    """
    Calculates u and v at the boundary condition points

    Args:
        model : whatever model calls this function
        BC_X  : input boundary condition [x, y] coords as defined in the model's data creation

    Returns:
        tensor of u, v at all boundary condition coords
    """
    _, u_grads, v_grads = compute_gradients(model, BC_X)
    u, u_x, u_y, u_xx, u_yy = u_grads
    v, v_x, v_y, v_xx, v_yy = v_grads
    return torch.cat([u, v], dim=1)


def UV_Data_loss(model, X_D):
    p_grads, u_grads, v_grads = compute_gradients(model, X_D)
    p, p_x, p_y = p_grads
    u, u_x, u_y, u_xx, u_yy = u_grads
    v, v_x, v_y, v_xx, v_yy = v_grads
    return torch.cat([u, v], dim=1)


def P_Data_loss(model, PXY_D):
    p_grads, u_grads, v_grads = compute_gradients(model, PXY_D)
    p, p_x, p_y = p_grads
    u, u_x, u_y, u_xx, u_yy = u_grads
    v, v_x, v_y, v_xx, v_yy = v_grads
    # solution: need to use torch.cat rather than torch.tensor to create the p data loss,
    # as it retains the gradient computation graph whilst torch.tensor performs a .detach()
    pval = torch.cat([p], dim=1)
    return pval
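
# Illustrative sketch (not part of the original script): the residual functions above expect
# input_variables to carry the fluid constants used in the momentum equations, e.g.
# {"rho0": 1.0, "nu0": 0.01} to match the header block. The random collocation and boundary
# points below are placeholder assumptions, purely to show the returned shapes.
'''
input_variables = {"rho0": 1.0, "nu0": 0.01}
net = Net(N_input=2, N_hidden_arr=[32, 16, 16, 32], N_output=1)
X_col = torch.rand(100, 2)                                     # illustrative collocation points
residuals = navier_stokes_loss(net, X_col, input_variables)    # shape (100, 3): x-momentum, y-momentum, continuity
bc_uv = BC_loss(net, torch.rand(20, 2))                        # shape (20, 2): u, v at boundary points
print(residuals.shape, bc_uv.shape)
'''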
## create neural network
class PINN:
    def __init__(self, Tensors, input_variables):
        ### Need to change this to check if gpu available as well
        device = torch.device("cpu")
        print("Using CPU")

        self.model = Net(
            N_input=2,
            N_hidden_arr=[32, 16, 16, 32],
            N_output=1
        ).to(device)

        # Tensors = torch.load(f"PINN_results/{test_ver}.pt")
        self.input_variables = input_variables

        # DATA CREATION
        self.X, self.BC_X, self.BC_Y = Tensors["X_col"], Tensors["BC_X"], Tensors["BC_Y"]
        self.X_D, self.U_D, self.PXY_D, self.P_D = Tensors["X_D"], Tensors["U_D"], Tensors["PXY_D"], Tensors["P_D"]

        # copy and separate tensors into the format needed for loss calculations
        self.X = self.X.clone().detach().requires_grad_(True)
        self.BC_X = self.BC_X.clone().detach().requires_grad_(True)
        self.X_D = self.X_D.clone().detach().requires_grad_(True)
        self.PXY_D = self.PXY_D.clone().detach().requires_grad_(True)

        # OPTIMISERS
        self.optimiser = torch.optim.LBFGS(
            params=self.model.parameters(),
            lr=1.0,
            max_iter=25*10**3,
            max_eval=25*10**3,
            history_size=50,
            tolerance_change=1e-7,
            tolerance_grad=1e-7,
            line_search_fn="strong_wolfe"
        )
        self.adam = torch.optim.Adam(self.model.parameters())

        self.loss_fn = nn.MSELoss()

        # Counter for printing loss
        self.iter = 1

    # Loss
    def compute_loss(self, epoch_split=50):
        # Data loss
        uv_data_loss = UV_Data_loss(self.model, self.X_D)
        uv_data_loss = self.loss_fn(uv_data_loss, self.U_D)

        p_data_loss = P_Data_loss(self.model, self.PXY_D)
        p_data_loss = self.loss_fn(p_data_loss, self.P_D)

        data_loss = p_data_loss + uv_data_loss

        # Evaluate the PDE and BC losses only on the iterations where they are needed,
        # to save computational load
        pde_loss = bc_loss = 0
        if self.iter >= epoch_split:
            # Compute PDE residuals at collocation points
            residuals = navier_stokes_loss(self.model, self.X, self.input_variables)
            ru, rv, conservation = residuals[:, 0:1], residuals[:, 1:2], residuals[:, 2:3]

            # PDE loss (residuals against a tensor of zeros)
            pde_loss = self.loss_fn(ru, torch.zeros_like(ru)) + self.loss_fn(rv, torch.zeros_like(rv)) + self.loss_fn(conservation, torch.zeros_like(conservation))

            # BC loss
            bc_loss = BC_loss(self.model, self.BC_X)
            bc_loss = self.loss_fn(bc_loss, self.BC_Y)

        total_loss = data_loss + pde_loss + bc_loss

        if self.iter >= epoch_split:
            alt_hist = [data_loss.item(), uv_data_loss.item(), p_data_loss.item(), pde_loss.item(), bc_loss.item()]
            if self.iter % 100 == 0:
                print(f"Iteration {self.iter:5}, Total Loss {total_loss:.9f}, Data loss {data_loss:.9f} = uv data loss: {uv_data_loss:.9f} + p data loss: {p_data_loss:.9f}, pde loss {pde_loss:.9f}, BC loss {bc_loss:.9f}")
        else:
            alt_hist = [data_loss.item(), uv_data_loss.item(), p_data_loss.item()]
            if self.iter % 100 == 0:
                print(f"Iteration: {self.iter:5}, Total Loss {total_loss:.9f}, Data loss {data_loss:.9f} = uv data loss: {uv_data_loss:.9f} + p data loss: {p_data_loss:.9f}")

        self.iter += 1
        return total_loss, alt_hist

    def train(self, adam_epochs=100, lbfgs_epochs=10, epoch_split=50):
        self.model.train()

        loss_history = []
        data_loss_history = []
        uv_loss_history = []
        p_loss_history = []
        pde_loss_history = []
        bc_loss_history = []

        def record_history(total_loss, loss_components):
            loss_history.append(total_loss.item())
            data_loss_history.append(loss_components[0])
            uv_loss_history.append(loss_components[1])
            p_loss_history.append(loss_components[2])
            if self.iter > epoch_split:
                pde_loss_history.append(loss_components[3])
                bc_loss_history.append(loss_components[4])

        for epoch in range(adam_epochs):
            self.adam.zero_grad()
            # pass epoch_split through so compute_loss and record_history use the same threshold
            loss, components = self.compute_loss(epoch_split)
            record_history(loss, components)
            loss.backward()
            self.adam.step()

            # extra printing of loss for when the Adam optimiser is in use
            """
            if epoch % 50 == 0:
                with torch.inference_mode():
                    print(f"[Adam] Step {epoch:4}, Loss = {loss.item():.6e}")
            """

        """
        for epoch in range(lbfgs_epochs):
            def closure():
                self.optimiser.zero_grad()
                loss, components = self.compute_loss(epoch_split)
                record_history(loss, components)
                loss.backward()
                return loss
            self.optimiser.step(closure)
        """

        # return [loss_history, data_loss_history, pde_loss_history, bc_loss_history], self.iter
        return loss_history, data_loss_history, uv_loss_history, p_loss_history, pde_loss_history, bc_loss_history

    def eval(self):
        self.model.eval()


# Make the model, then train it
'''
# assumes the tensors and constants from the data-creation step are available, e.g.
# Tensors = torch.load(f"PINN_results/{test_ver}.pt") and input_variables = {"rho0": rho0, "nu0": nu0}
test = PINN(Tensors, input_variables)
L_hist, L_data_hist, L_UV_data_hist, L_P_data_hist, L_PDE_hist, L_BC_hist = test.train()

## Save model and loss report history
torch.save(test.model.state_dict(), f"PINN_results/Model ver - {test_ver}.pt")

# Save loss history
# Sort out history arrays
hist_data_dict = {"Total Loss": L_hist, "Data Loss": L_data_hist, "UV Data Loss": L_UV_data_hist, "P Data Loss": L_P_data_hist}
hist_extra_dict = {"Physics Loss": L_PDE_hist, "Boundary Condition Loss": L_BC_hist}

hist_df = pd.DataFrame(data=hist_data_dict)
hist_df.to_csv(f"PINN_results/{test_ver} pure data.csv")

hist_ex_df = pd.DataFrame(data=hist_extra_dict)
hist_ex_df.to_csv(f"PINN_results/{test_ver} physics and BCs.csv")
'''
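
# Illustrative sketch (assumption, not from the original data-creation file): the PINN constructor
# expects a dict with the keys used in __init__ above. The point counts and placeholder values
# below are hypothetical; the shapes of BC_Y, U_D and P_D follow from how the loss functions
# compare (u, v) and p against them.
'''
N_col, N_bc, N_d = 1000, 200, 500
Tensors = {
    "X_col": torch.rand(N_col, 2),   # collocation (x, y) points for the PDE residual
    "BC_X":  torch.rand(N_bc, 2),    # boundary (x, y) points
    "BC_Y":  torch.zeros(N_bc, 2),   # target (u, v) on the boundary
    "X_D":   torch.rand(N_d, 2),     # (x, y) points with velocity data
    "U_D":   torch.zeros(N_d, 2),    # target (u, v) data
    "PXY_D": torch.rand(N_d, 2),     # (x, y) points with pressure data
    "P_D":   torch.zeros(N_d, 1),    # target p data
}
input_variables = {"rho0": 1.0, "nu0": 0.01}
pinn = PINN(Tensors, input_variables)
histories = pinn.train(adam_epochs=200)
'''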