Skip to content

Instantly share code, notes, and snippets.

@bnigatu
Created February 2, 2020 03:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bnigatu/7635f639efb80a3670acfbad8a203b46 to your computer and use it in GitHub Desktop.
Save bnigatu/7635f639efb80a3670acfbad8a203b46 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"__author__ = \"Nigatu, Biz\"\n",
"__copyright__ = \"Copyright 2020, Nigatu.com\"\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import math\n",
"from numpy import array\n",
"from tqdm import tqdm_notebook as tqdm\n",
"import matplotlib.pyplot as plt\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Function"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"14"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def sphere(x):\n",
" return sum ([y**2 for y in x])\n",
"\n",
"#sanity\n",
"x = [1,2,3]\n",
"sphere(x)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Problem definition\n",
"----"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"utility_function = sphere # cost function\n",
"n_var = 5; # number of unkown/ decision variables\n",
"var_size = array([1, n_var]) # matrix size of decision variables\n",
"var_min = -10 # lower bound of decision variable\n",
"var_max = 10 # upper bound of decision variables\n",
"max_velocity = 0.2* (var_max - var_min)\n",
"min_velocity = -max_velocity\n",
"\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PSO parameters\n",
"-----"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h3><u>Constriction Coefficients</u>\n",
"\n",
"$\\chi = \\frac{2\\kappa}{\\lvert 2-\\phi - \\sqrt{\\phi^2 - 4\\phi} \\rvert} $\n",
" \n",
"$where: 0 \\leq \\kappa \\leq 1$\n",
"<br>\n",
"$\\;\\;\\;\\;\\;\\;\\;\\;\\;\\;\\phi = \\phi_1 + \\phi_2 \\geq 4 $</h3> \n",
" "
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"kappa = 1\n",
"phi1 = 2.05\n",
"phi2 = 2.05\n",
"phi = phi1 + phi1\n",
"chi = 2 * kappa / abs(2 - phi - math.sqrt(phi**2 - 4* phi))\n",
"\n",
"max_iter = 1000 # maximum iteration\n",
"n_pop = 50 # population/swarm size\n",
"w = chi # inertia coefficient\n",
"w_damp = 1 # damping ratio of inertia coefficient\n",
"c1 = chi * phi1 # personal acceleration coefficient\n",
"c2 = chi * phi2 # social/ global acceleration coefficient\n",
"show_iter_info = True # show iteration information"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Initialization\n",
"----"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"global_cost = 59.64772423349538\n",
"global_postion = [ 2.46091651 -3.01579963 5.46084206 -3.12011952 -2.22275165]\n"
]
}
],
"source": [
"global_best = {'Cost': float('inf'), # global cost for minimization problem before optimization is infinity\n",
" 'Position': np.zeros(var_size)[0]} # global positoin\n",
"best_cost = {} # vector of best costs\n",
"empty_particle = {'Position': [],\n",
" 'Velocity': [],\n",
" 'Cost': [],\n",
" 'BestPosition': [],\n",
" 'BestCost': []} # empty dict template variable\n",
"\n",
"particle = np.tile(empty_particle, (n_pop,1)) # create particle of size n_pop\n",
"\n",
"\n",
"tmp = [] # temp var\n",
"for p in particle: \n",
" x = np.random.uniform(var_min, var_max, var_size)[0] # generate a random solution\n",
" tmp.append({'Position': x, # add particle (i) position for random x\n",
" 'Velocity': np.zeros(var_size), # intialize velocity to vector of zero's size var_size\n",
" 'Cost': utility_function(x), # evaluate the cost function\n",
" 'BestPosition': x, # update personal best to current position\n",
" 'BestCost': utility_function(x) # update best cost to current cost\n",
" })\n",
" if utility_function(x) < global_best['Cost']: \n",
" global_best['Cost'] = utility_function(x) # replace global_best[Cost] if current_cost is better\n",
" global_best['Position'] = x # replace global_best[Position] if current_cost is better\n",
" \n",
"particle = tmp \n",
"\n",
"# sanity test\n",
"print('global_cost =',global_best['Cost'])\n",
"print('global_postion =',global_best['Position'])\n",
"#pd.DataFrame(particle).head(2)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PSO main loop\n",
"----"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "0304527ea42f47e1ae172c211af23591",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"HBox(children=(IntProgress(value=0, max=1000), HTML(value='')))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"itter: 0 best_cost: 29.748573\n",
"itter: 50 best_cost: 0.120425\n",
"itter: 100 best_cost: 0.050334\n",
"itter: 150 best_cost: 0.010788\n",
"itter: 200 best_cost: 0.003608\n",
"itter: 250 best_cost: 0.003002\n",
"itter: 300 best_cost: 0.002862\n",
"itter: 350 best_cost: 0.002355\n",
"itter: 400 best_cost: 0.001341\n",
"itter: 450 best_cost: 0.001341\n",
"itter: 500 best_cost: 0.000151\n",
"itter: 550 best_cost: 0.000114\n",
"itter: 600 best_cost: 0.000080\n",
"itter: 650 best_cost: 0.000009\n",
"itter: 700 best_cost: 0.000003\n",
"itter: 750 best_cost: 0.000000\n",
"itter: 800 best_cost: 0.000000\n",
"itter: 850 best_cost: 0.000000\n",
"itter: 900 best_cost: 0.000000\n",
"itter: 950 best_cost: 0.000000\n",
"itter: 999 best_cost: 0.000000\n",
"\n",
"final best_cost: 0.000000 \n",
"best_position: [-4.06284600e-06 -3.40693044e-06 -3.89274948e-07 -4.04148438e-06\n",
" 6.15179129e-07]\n"
]
}
],
"source": [
"for r in tqdm(range(max_iter)):\n",
" for i in range(n_pop):\n",
" \"\"\"update velocity\"\"\"\n",
" particle[i]['Velocity'] = np.squeeze(w * particle[i]['Velocity'] + \\\n",
" np.multiply((c1 * np.random.rand(var_size[0],var_size[1])[0]), \\\n",
" (particle[i]['BestPosition'] - particle[i]['Position'][0])) + \\\n",
" np.multiply((c2 * np.random.rand(var_size[0],var_size[1])[0]), \\\n",
" (global_best['Position'] - particle[i]['Position']))) # equation 1\n",
" \"\"\"apply velocity limits\"\"\"\n",
" particle[i]['Velocity'] = np.maximum(particle[i]['Velocity'], min_velocity)\n",
" particle[i]['Velocity'] = np.minimum(particle[i]['Velocity'], max_velocity)\n",
" \"\"\"update positon\"\"\" \n",
" particle[i]['Position'] = np.squeeze(particle[i]['Position'] + particle[i]['Velocity']) # equation 2 \n",
" \"\"\"apply lower-upper bound limits\"\"\"\n",
" particle[i]['Position'] = np.maximum(particle[i]['Position'], var_min)\n",
" particle[i]['Position'] = np.minimum(particle[i]['Position'], var_max)\n",
" \"\"\"evaluation\"\"\"\n",
" particle[i]['Cost'] = utility_function(particle[i]['Position']) # equation 3\n",
" \"\"\"update personal best\"\"\"\n",
" if particle[i]['Cost'] < particle[i]['BestCost']: \n",
" particle[i]['BestCost'] = particle[i]['Cost']\n",
" particle[i]['BestPosition'] = particle[i]['Position'] # equation 4 \n",
" \"\"\"update global best\"\"\"\n",
" if particle[i]['BestCost'] < global_best['Cost']: # equation 5\n",
" global_best['Cost'] = particle[i]['BestCost']\n",
" global_best['Position'] = particle[i]['Position']\n",
" w *= w_damp\n",
" best_cost[r] = global_best['Cost']\n",
" if r % 50 == 0 or r == max_iter-1:\n",
" print(\"itter: %d best_cost: %f\" % (r, global_best['Cost']))\n",
" \n",
"print(\"final best_cost: %f \\nbest_position: %s\" % (global_best['Cost'], str(global_best['Position']) )) \n",
" "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Result\n",
"-----"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Text(0, 0.5, 'Best Cost')"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"%matplotlib inline\n",
"plt.style.use('seaborn-whitegrid')\n",
"fig = plt.figure()\n",
"ax = plt.axes()\n",
"\n",
"items = sorted(best_cost.items())\n",
"x, y = zip(*items) \n",
"\n",
"ax.plot(x, y);\n",
"plt.title('PSO Training')\n",
"plt.xlabel('Iteration')\n",
"plt.ylabel('Best Cost')\n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Text(0, 0.5, 'Best Cost')"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"plt.semilogy(x, y )\n",
"plt.title('PSO Convergence')\n",
"plt.xlabel('Iteration')\n",
"plt.ylabel('Best Cost')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Object Oriented Implementation\n",
"---"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class Param(object):\n",
" def __init__(self, max_iter = 1000, n_pop = 50, kappa = 1,phi1 = 2.05, phi2 = 2.05, w_damp = 1, show_iter_info = True):\n",
" #super(Param, self).__init__(max_iter, n_pop, kappa, phi1, phi2, w_damp, show_iter_info)\n",
" self.kappa = kappa\n",
" self.phi1 = phi1\n",
" self.phi2 = phi2\n",
" self.phi = phi1 + phi1\n",
" self.chi = 2 * self.kappa / abs(2 - self.phi - math.sqrt(self.phi**2 - 4* self.phi))\n",
"\n",
" self.max_iter = max_iter # maximum iteration\n",
" self.n_pop = n_pop # population/swarm size\n",
" self.w = self.chi # inertia coefficient\n",
" self.w_damp = w_damp # damping ratio of inertia coefficient\n",
" self.c1 = self.chi * self.phi1 # personal acceleration coefficient\n",
" self.c2 = self.chi * self.phi2 # social/ global acceleration coefficient\n",
" self.show_iter_info = show_iter_info # show iteration information\n",
" self.global_best = {} # global best\n",
" self.best_cost = [] # vector of best costs\n",
" self.empty_particle = {} # empty dict template variable\n",
" self.particle = {}\n",
" def get_particle(self):\n",
" raise NotImplementedError\n",
" def __repr__(self):\n",
" return (\"\"\"\n",
" kappa = %r \n",
" phi1 = %r\n",
" phi2 = %r\n",
" phi = %r\n",
" chi = %r\n",
" max_iter = %r\n",
" n_pop = %r\n",
" w = %r\n",
" w_damp = %r\n",
" c1 = %r\n",
" c2 = %r\n",
" show_iter_info = %r\n",
" \"\"\" % (self.kappa,\n",
" self.phi1,\n",
" self.phi2,\n",
" self.phi,\n",
" self.chi,\n",
" self.max_iter,\n",
" self.n_pop,\n",
" self.w,\n",
" self.w_damp,\n",
" self.c1,\n",
" self.c2,\n",
" self.show_iter_info))\n",
"#sanity \n",
"# param = Param()\n",
"# print(param)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"class Init(Param):\n",
" def __init__(self, max_iter = 1000, n_pop = 50, kappa = 1,phi1 = 2.05, phi2 = 2.05, w_damp = 1, show_iter_info = True):\n",
" super(Init, self).__init__(max_iter, n_pop, kappa, phi1, phi2, w_damp, show_iter_info)\n",
" self.global_best = {'Cost': float('inf'), # global cost for minimization problem before optimization is infinity\n",
" 'Position': np.zeros(var_size)[0]} # global positoin\n",
" self.best_cost = {} # vector of best costs\n",
" self.empty_particle = {'Position': [],\n",
" 'Velocity': [],\n",
" 'Cost': [],\n",
" 'BestPosition': [],\n",
" 'BestCost': []} # empty dict template variable\n",
" self.particle = self.get_particle() # create particle of size n_pop\n",
"\n",
" def get_particle(self):\n",
" particle = np.tile(self.empty_particle, (self.n_pop,1)) # create particle of size n_pop\n",
" tmp = [] # temp var\n",
" for p in particle: \n",
" x = np.random.uniform(var_min, var_max, var_size)[0] # generate a random solution\n",
" tmp.append({'Position': x, # add particle (i) position for random x\n",
" 'Velocity': np.zeros(var_size), # intialize velocity to vector of zero's size var_size\n",
" 'Cost': utility_function(x), # evaluate the cost function\n",
" 'BestPosition': x, # update personal best to current position\n",
" 'BestCost': utility_function(x) # update best cost to current cost\n",
" })\n",
" if utility_function(x) < self.global_best['Cost']: \n",
" self.global_best['Cost'] = utility_function(x) # replace global_best[Cost] if current_cost is better\n",
" self.global_best['Position'] = x # replace global_best[Position] if current_cost is better\n",
" return tmp \n",
" \n",
" def __repr__(self):\n",
" return super().__repr__() + \\\n",
" \"\"\"\n",
" global_cost = %s\n",
" global_postion = %s\n",
" \"\"\" % (self.global_best['Cost'], self.global_best['Position'])\n",
" \n",
"# sanity\n",
"# init = Init()\n",
"# print(init)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"class PSO(Init):\n",
" def __init__(self, max_iter = 1000, n_pop = 50, kappa = 1,phi1 = 2.05, phi2 = 2.05, w_damp = 1, show_iter_info = True):\n",
" super(PSO, self).__init__(max_iter, n_pop, kappa, phi1, phi2, w_damp, show_iter_info)\n",
" pass\n",
" \n",
" def pso(self):\n",
" for r in tqdm(range(self.max_iter)):\n",
" for i in range(self.n_pop):\n",
" \"\"\"update velocity\"\"\"\n",
" self.particle[i]['Velocity'] = np.squeeze(self.w * self.particle[i]['Velocity'] + \\\n",
" np.multiply((self.c1 * np.random.rand(var_size[0],var_size[1])[0]), \\\n",
" (self.particle[i]['BestPosition'] - self.particle[i]['Position'][0])) + \\\n",
" np.multiply((self.c2 * np.random.rand(var_size[0],var_size[1])[0]), \\\n",
" (self.global_best['Position'] - self.particle[i]['Position']))) # equation 1\n",
" \"\"\"apply velocity limits\"\"\"\n",
" self.particle[i]['Velocity'] = np.maximum(self.particle[i]['Velocity'], min_velocity)\n",
" self.particle[i]['Velocity'] = np.minimum(self.particle[i]['Velocity'], max_velocity)\n",
" \"\"\"update positon\"\"\" \n",
" self.particle[i]['Position'] = np.squeeze(self.particle[i]['Position'] + self.particle[i]['Velocity']) # equation 2 \n",
" \"\"\"apply lower-upper bound limits\"\"\"\n",
" self.particle[i]['Position'] = np.maximum(self.particle[i]['Position'], var_min)\n",
" self.particle[i]['Position'] = np.minimum(self.particle[i]['Position'], var_max)\n",
" \"\"\"evaluation\"\"\"\n",
" self.particle[i]['Cost'] = utility_function(self.particle[i]['Position']) # equation 3\n",
" \"\"\"update personal best\"\"\"\n",
" if self.particle[i]['Cost'] < self.particle[i]['BestCost']: \n",
" self.particle[i]['BestCost'] = self.particle[i]['Cost']\n",
" self.particle[i]['BestPosition'] = self.particle[i]['Position'] # equation 4 \n",
" \"\"\"update global best\"\"\"\n",
" if self.particle[i]['BestCost'] < self.global_best['Cost']: # equation 5\n",
" self.global_best['Cost'] = self.particle[i]['BestCost']\n",
" self.global_best['Position'] = self.particle[i]['Position']\n",
" self.w *= self.w_damp\n",
" self.best_cost[r] = self.global_best['Cost']\n",
" if r % 50 == 0 or r == self.max_iter-1:\n",
" print(\"itter: %d best_cost: %f\" % (r, self.global_best['Cost']))\n",
" print(\"final best_cost: %f \\nbest_position: %s\" % (self.global_best['Cost'], str(self.global_best['Position']) ))\n",
" return self.global_best['Cost'], str(self.global_best['Position'])\n",
" def display(self, graph='plot'):\n",
" if(len(self.best_cost) == 0): return \"Run you_object.pso() first.\"\n",
" %matplotlib inline\n",
" plt.style.use('seaborn-whitegrid')\n",
" fig = plt.figure()\n",
" ax = plt.axes()\n",
" items = sorted(self.best_cost.items())\n",
" x, y = zip(*items) \n",
" if (graph=='plot'):\n",
" ax.plot(x, y);\n",
" plt.title('PSO Training')\n",
" plt.xlabel('Iteration')\n",
" plt.ylabel('Best Cost')\n",
" elif (graph=='semilogy'):\n",
" plt.semilogy(x, y )\n",
" plt.title('PSO Convergence')\n",
" plt.xlabel('Iteration')\n",
" plt.ylabel('Best Cost') \n",
" def __repr__(self):\n",
" return super().__repr__() \n",
" \n",
" "
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "7737beb0afd74b5aba5bdebb3ff58676",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"HBox(children=(IntProgress(value=0, max=1000), HTML(value='')))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"itter: 0 best_cost: 33.991001\n",
"itter: 50 best_cost: 1.036756\n",
"itter: 100 best_cost: 0.020032\n",
"itter: 150 best_cost: 0.002777\n",
"itter: 200 best_cost: 0.000274\n",
"itter: 250 best_cost: 0.000040\n",
"itter: 300 best_cost: 0.000004\n",
"itter: 350 best_cost: 0.000003\n",
"itter: 400 best_cost: 0.000000\n",
"itter: 450 best_cost: 0.000000\n",
"itter: 500 best_cost: 0.000000\n",
"itter: 550 best_cost: 0.000000\n",
"itter: 600 best_cost: 0.000000\n",
"itter: 650 best_cost: 0.000000\n",
"itter: 700 best_cost: 0.000000\n",
"itter: 750 best_cost: 0.000000\n",
"itter: 800 best_cost: 0.000000\n",
"itter: 850 best_cost: 0.000000\n",
"itter: 900 best_cost: 0.000000\n",
"itter: 950 best_cost: 0.000000\n",
"itter: 999 best_cost: 0.000000\n",
"\n",
"final best_cost: 0.000000 \n",
"best_position: [ 4.61533124e-08 -5.30349479e-09 -3.02241719e-08 1.21532736e-08\n",
" -2.42517884e-08]\n"
]
},
{
"data": {
"text/plain": [
"(3.807607177069591e-15,\n",
" '[ 4.61533124e-08 -5.30349479e-09 -3.02241719e-08 1.21532736e-08\\n -2.42517884e-08]')"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pso = PSO()\n",
"pso.pso()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"pso.display(graph='plot')"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"pso.display(graph='semilogy')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment