{ "cells": [ { "cell_type": "code", "execution_count": 30, "id": "83f86a7c", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import torch\n", "import torch.nn as nn\n", "import matplotlib.pyplot as plt\n", "from collections import OrderedDict" ] }, { "cell_type": "code", "execution_count": 31, "id": "38e97e1e", "metadata": {}, "outputs": [], "source": [ "## Create PINN, following from tensorflow example\n", "\n", "class PINN(nn.Module):\n", " def __init__(\n", " self,\n", " N_input,\n", " N_hidden_arr,\n", " N_output,\n", " activation = nn.Tanh\n", " ):\n", " super(PINN, self).__init__() # Create PINN object\n", "\n", " # Create input layer w/ activation function\n", " layers = [('Input', nn.Linear(N_input, N_hidden_arr[0]))]\n", " layers.append(('Input activation', activation()))\n", "\n", " # Create hidden layers\n", " for i in range(len(N_hidden_arr)-1):\n", " layers.append(\n", " (\"Hidden %d\" % (i+1), nn.Linear(N_hidden_arr[i], N_hidden_arr[i+1]))\n", " )\n", " layers.append(('Hidden activation %d' % (i+1), activation()))\n", " layers.append(('Output', nn.Linear(N_hidden_arr[-1], N_output)))\n", " layerdict = OrderedDict(layers)\n", " self.layers = nn.Sequential(layerdict)\n", "\n", " \n", " def forward(self, x):\n", " y = self.layers(x)\n", " return y" ] }, { "cell_type": "code", "execution_count": 32, "id": "f0d875b6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Sequential(\n", " (Input): Linear(in_features=2, out_features=32, bias=True)\n", " (Input activation): Tanh()\n", " (Hidden 1): Linear(in_features=32, out_features=16, bias=True)\n", " (Hidden activation 1): Tanh()\n", " (Hidden 2): Linear(in_features=16, out_features=16, bias=True)\n", " (Hidden activation 2): Tanh()\n", " (Hidden 3): Linear(in_features=16, out_features=32, bias=True)\n", " (Hidden activation 3): Tanh()\n", " (Output): Linear(in_features=32, out_features=2, bias=True)\n", ")\n" ] } ], "source": [ "test = PINN(2, [32,16,16,32], 2)\n", "print(test.layers)" ] }, { "cell_type": "code", "execution_count": 33, "id": "b7d0f531", "metadata": {}, "outputs": [], "source": [ "## constant values\n", "u0 = 1 # defining velocity, density, and viscocity for fluid\n", "rho0 = 1\n", "nu0 = 0.01" ] }, { "cell_type": "code", "execution_count": 34, "id": "cb620c0e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "torch.Size([84, 2])\n", "tensor([[1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.],\n", " [1., 0.]])\n", "torch.Size([84, 2])\n" ] }, { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "## Generate data, plot for sake of testing\n", "\n", "# Create domain for collocation points\n", "d = 0.05\n", "x = torch.arange(0, 1+d, d)\n", "y = torch.arange(0, 1+d, d)\n", "\n", "# Collocation tensor\n", "X = torch.stack(torch.meshgrid(x, y)).reshape(2,-1).T\n", "\n", "# Get boundary conditions\n", "b_left = torch.stack(torch.meshgrid(x[0], y)).reshape(2,-1).T # x = 0, y = var, left wall\n", "b_right = torch.stack(torch.meshgrid(x[-1],y)).reshape(2,-1).T\n", "b_bottom = torch.stack(torch.meshgrid(x, y[0])).reshape(2,-1).T\n", "b_top = torch.stack(torch.meshgrid(x, y[-1])).reshape(2,-1).T\n", "\n", "X_train = torch.cat([b_left, b_bottom, b_right, b_top])\n", "print(X_train.size())\n", "\n", "## Get solutions for each boundary condition\n", "walls_Y = torch.zeros(b_left.size())\n", "\n", "# Get BC for lid driven component\n", "wall_lid = torch.zeros(b_top.size()[0], b_top.size()[1])\n", "wall_lid[:,0] = u0\n", "print(wall_lid)\n", "\n", "train_Y = torch.cat([walls_Y, walls_Y, walls_Y, wall_lid])\n", "print(train_Y.size())\n", "\n", "# Try and plot for test\n", "plt.figure()\n", "plt.scatter(X[:,0].numpy(), X[:,1].numpy(), marker='.', c='r')\n", "plt.scatter(b_left[:,0].numpy(), b_left[:,1].numpy(), marker='x', c='k')\n", "plt.scatter(b_right[:,0].numpy(), b_right[:,1].numpy(), marker='x', c='k')\n", "plt.scatter(b_top[:,0].numpy(), b_top[:,1].numpy(), marker='x', c='k')\n", "plt.scatter(b_bottom[:,0].numpy(), b_bottom[:,1].numpy(), marker='x', c='k')\n", "plt.grid()\n", "plt.show()\n" ] }, { "cell_type": "code", "execution_count": 35, "id": "d1368356", "metadata": {}, "outputs": [], "source": [ "# Testing input/output for model\n", "test = PINN(\n", " N_input=2,\n", " N_hidden_arr=[32,16,16,32],\n", " N_output=2\n", ")\n", " # output and input are the same size/dimensions, means things are mappable???\n", "\n", "\n", "# Plan" ] }, { "cell_type": "code", "execution_count": null, "id": "e52e8c17", "metadata": {}, "outputs": [], "source": [ "## create neural network\n", "\n", "class Net:\n", " def __init__(self):\n", " # Create PINN, send off to device if CUDA is available\n", " device = torch.device(\"cpu\")\n", " print(\"Using CPU\")\n", " self.model = PINN(\n", " N_input=2,\n", " N_hidden_arr=[32,16,16,32],\n", " N_output = 2\n", " ).to(device)\n", "\n", " # Create data\n", " self.d = 0.05\n", " x = torch.arange(0, 1+d, d)\n", " y = torch.arange(0, 1+d, d)\n", "\n", " self.X = torch.stack(torch.meshgrid(x,y)).reshape(2,-1).T\n", "\n", " b_left = torch.stack(torch.meshgrid(x[0], y)).reshape(2,-1).T # x = 0, y = var, left wall\n", " b_right = torch.stack(torch.meshgrid(x[-1],y)).reshape(2,-1).T\n", " b_bottom = torch.stack(torch.meshgrid(x, y[0])).reshape(2,-1).T\n", " b_top = torch.stack(torch.meshgrid(x, y[-1])).reshape(2,-1).T\n", "\n", " self.X_train = torch.cat([b_left, b_bottom, b_right, b_top])\n", "\n", " # Get wall velocities in form of [u,v]\n", " walls_Y = torch.zeros(b_left.size())\n", "\n", " # Get BC for lid driven component\n", " wall_lid = torch.zeros(b_top.size()[0], b_top.size()[1])\n", " wall_lid[:,0] = u0\n", "\n", " self.data_train = torch.cat([walls_Y, walls_Y, walls_Y, wall_lid])\n", "\n", " # Send data off to devices\n", " self.X = self.X.to(device)\n", " self.X_train = self.X_train.to(device)\n", " self.data_train = self.data_train.to(device)\n", " self.X.requires_grad = True\n", " self.X_train.requires_grad = True\n", "\n", "\n", " self.criterion = nn.MSELoss()\n", " self.iter = 1\n", "\n", " # Set up optimiser for LBFGS\n", " self.optimiser = torch.optim.LBFGS(\n", " params=self.model.parameters(),\n", " lr=1.0,\n", " max_iter = 75*10**3,\n", " max_eval = 75*10**3,\n", " history_size=50,\n", " tolerance_change=1e-15,\n", " tolerance_grad=1e-15,\n", " line_search_fn=\"strong_wolfe\"\n", " )\n", "\n", " self.adam = torch.optim.Adam(self.model.parameters())\n", " \n", " def loss_function(self):\n", " self.optimiser.zero_grad()\n", " self.adam.zero_grad()\n", "\n", " def get_gradient(output, input):\n", " return torch.autograd.grad(outputs=output, \n", " inputs=input,\n", " grad_outputs=torch.ones_like(output),\n", " create_graph=True,\n", " retain_graph=True)[0]\n", "\n", " # Get data loss\n", " output = self.model(self.X_train)\n", " psi_pred, p_pred = output[:,0], output[:,1]\n", " psipred_dX = torch.autograd.grad( # get velocities from psi \n", " outputs=psi_pred,\n", " inputs = self.X_train,\n", " grad_outputs = torch.ones_like(psi_pred),\n", " retain_graph=True, \n", " create_graph=True,\n", " )[0]\n", "\n", " u_pred = psipred_dX[:,1] # get each component from grad\n", " v_pred = -psipred_dX[:,0]\n", " vel_pred = torch.column_stack([u_pred, v_pred])\n", "\n", " data_loss = self.criterion(vel_pred, self.data_train) # compute loss from known values\n", "\n", " # Get physics loss\n", " output_phys = self.model(self.X)\n", " psi, p = output_phys[:,0], output_phys[:,1]\n", " psi_dX, p_dX = map(lambda x: get_gradient(x, self.X), [psi, p])\n", " self.u, self.v = psi_dX[:,1], -1*psi_dX[:,0]\n", " \n", " p_x, p_y = p_dX[:,0], p_dX[:,1]\n", "\n", " u_dX, v_dX = map(lambda x: get_gradient(x, self.X), [self.u,self.v])\n", " u_dXX, v_dXX = map(lambda x: get_gradient(x, self.X), [u_dX, v_dX])\n", "\n", " u_x, u_y = u_dX[:,0], u_dX[:,1]\n", " v_x, v_y = v_dX[:,0], v_dX[:,1]\n", "\n", " u_xx, u_yy = u_dXX[:,0], u_dXX[:,1]\n", " v_xx, v_yy = v_dXX[:,0], v_dXX[:,1]\n", "\n", " continuity_loss = self.criterion(u_x+v_y, torch.zeros(len(u_x)))\n", " x_mom_loss = self.criterion(self.u*u_x+self.v*u_y+p_x/rho0, nu0*(u_xx+u_yy))\n", " y_mom_loss = self.criterion(self.u*v_x+self.v*v_y+p_y/rho0, nu0*(v_xx+v_yy))\n", "\n", " phys_loss = continuity_loss+x_mom_loss+y_mom_loss\n", "\n", " # Put more weight on data loss?\n", " # Change scheme as data goes on\n", " if self.iter >= 1000:\n", " loss = data_loss\n", " elif self.iter > 1000 and self.iter < 2500:\n", " loss = 0.7*data_loss+0.3*phys_loss\n", " else:\n", " loss = data_loss+phys_loss\n", " loss.backward()\n", "\n", " if self.iter % 100 == 0:\n", " print(f\"Iteration {self.iter:5}, Total Loss {loss:.9f}\")\n", "\n", " print((vel_pred).size())\n", " self.iter+= 1\n", "\n", " return loss\n", " \n", " def train(self):\n", " self.model.train()\n", " for i in range(1000):\n", " self.adam.step(self.loss_function)\n", " self.optimiser.step(self.loss_function)\n", "\n", " def eval(self):\n", " self.model.eval()\n", "\n", " \n", "\n" ] }, { "cell_type": "code", "execution_count": 98, "id": "38c6e9e1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Using CPU\n", "Iteration 100, Total Loss 0.026230508\n", "torch.Size([84, 2])\n", "Iteration 200, Total Loss 0.014797823\n", "torch.Size([84, 2])\n", "Iteration 300, Total Loss 0.013963315\n", "torch.Size([84, 2])\n", "Iteration 400, Total Loss 0.013744197\n", "torch.Size([84, 2])\n", "Iteration 500, Total Loss 0.013631235\n", "torch.Size([84, 2])\n", "Iteration 600, Total Loss 0.013560196\n", "torch.Size([84, 2])\n", "Iteration 700, Total Loss 0.013503134\n", "torch.Size([84, 2])\n", "Iteration 800, Total Loss 0.013437574\n", "torch.Size([84, 2])\n", "Iteration 900, Total Loss 0.013379758\n", "torch.Size([84, 2])\n", "Iteration 1000, Total Loss 0.012725575\n", "torch.Size([84, 2])\n", "Iteration 1100, Total Loss 0.008411493\n", "torch.Size([84, 2])\n" ] } ], "source": [ "test = Net()\n", "test.train()" ] }, { "cell_type": "code", "execution_count": null, "id": "78e07c9a", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "pytorch", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.18" } }, "nbformat": 4, "nbformat_minor": 5 }