Skip to content

Instantly share code, notes, and snippets.

@sebjai
Last active April 28, 2024 15:10
Show Gist options
  • Save sebjai/5d119118295f3619b8e2f1a1bb4e01b6 to your computer and use it in GitHub Desktop.
Save sebjai/5d119118295f3619b8e2f1a1bb4e01b6 to your computer and use it in GitHub Desktop.
Implementation of optimal execution using limit and market orders from Chap 8.5 of Algorithmic and High-Frequency Trading by Cartea, Jaimungal, & Penalva (2015) Cambridge University Press
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib\n",
"\n",
"import matplotlib.pyplot as plt\n",
"\n",
"import matplotlib.pylab as pylab\n",
"params = {'legend.fontsize': 10,\n",
" 'figure.figsize': (8, 4),\n",
" 'axes.labelsize': 20,\n",
" 'axes.titlesize': 20,\n",
" 'xtick.labelsize': 15,\n",
" 'ytick.labelsize': 15}\n",
"pylab.rcParams.update(params)\n",
"font = {'family': 'serif',\n",
" 'style': 'italic',\n",
" 'weight': 1,\n",
" 'size': 16,\n",
" }\n",
"\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def nan_to_num(x, nan):\n",
" \"\"\" Change the NaNs in a numpy array to the desired values.\n",
" :param x: a numpy array\n",
" :param nan: desired value\n",
" :return: a deep copy of x with changed values\n",
" \"\"\"\n",
" y = np.copy(x)\n",
" y[np.isnan(y)] = nan\n",
" return y"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def AC_solver(phiAC, aAC, tau, T, Nq):\n",
" gamma = np.sqrt(phiAC / aAC)\n",
" qAC = Nq * np.divide((np.exp(gamma * tau) - np.exp(-gamma * tau)), np.exp(gamma * T) - np.exp(-gamma * T))\n",
" return qAC"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def plot_curve(x, y, xlab=None, ylab=None , title=None):\n",
" plt.plot(x, y)\n",
" if xlab is not None:\n",
" plt.xlabel(xlab)\n",
" if ylab is not None:\n",
" plt.ylabel(ylab)\n",
" if title is not None:\n",
" plt.title(title)\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"def hjb_solver(t, dt, kappa, xi, phi, q, qAC, lamb):\n",
" Ndt = t.shape[0]\n",
" Nq = q.shape[0]\n",
"\n",
" omega = np.full((Nq, Ndt), np.NaN)\n",
" # Terminal conditions for all q\n",
" omega[:, omega.shape[1]-1] = np.exp(-kappa * xi * q)\n",
"\n",
" # Boundary conditions along q = 0\n",
" omega[0, :] = np.exp(-kappa * phi * (np.sum(np.power(qAC, 2) * dt) - np.cumsum(np.power(qAC, 2) * dt)))\n",
" exe = np.zeros((Nq, Ndt))\n",
" exe[:, exe.shape[1]-1] = 1\n",
" for k in range(Ndt - 2, -1, -1):\n",
" \n",
" # Solve the HJB in the continuation region from t+dt to t\n",
" omega[1:omega.shape[0], k] = omega[1:omega.shape[0], k+1] + \\\n",
" dt * (-kappa * phi * np.multiply(np.power((q[1:q.shape[0]] - qAC[k+1]), 2), omega[1:omega.shape[0], k+1]) +\n",
" np.exp(-1) * lamb * omega[0:(omega.shape[0]-1), k+1])\n",
"\n",
" idx_exe = np.insert(np.exp(-kappa * xi) * omega[0:(omega.shape[0]-1), k] > omega[1:omega.shape[0], k], 0, 0)\n",
" exe[:, k] = idx_exe\n",
" omega[1:omega.shape[0], k] = np.maximum(np.exp(-kappa * xi) * omega[0:(omega.shape[0]-1), k], omega[1:omega.shape[0], k])\n",
" return omega, exe\n",
"\n",
"\n",
"def find_opt_t(exe, t):\n",
" Nq = exe.shape[0]\n",
" t_opt = np.full(Nq, np.NaN)\n",
" for k in range(0, Nq, 1):\n",
" idx = np.where(exe[k, :] == 1)[0]\n",
" if idx.shape[0] > 0:\n",
" t_opt[k] = t[idx[0]]\n",
" return t_opt"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def plot_topt(t_opt, q, qAC, t):\n",
" # Plot q vs t_opt\n",
" line1, = plt.plot(t_opt, q, 'bo')\n",
" # Plot qAC vs t\n",
" line2, = plt.plot(t, qAC, 'k-', linewidth=2)\n",
" plt.legend([line1, line2], [\"Execute MO\", \"Target Schedule\"])\n",
" plt.ylabel(r'Inventory ($q_t$)')\n",
" plt.xlabel(r'Time ($sec$)')\n",
" plt.title(r'Target and Behind Schedule Times')\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def find_delta(kappa, omega, Nq, Ndt):\n",
" delta = np.full((Nq + 1, Ndt + 1), np.NaN)\n",
" delta[1:delta.shape[0], :] = 1 / kappa + 1 / kappa * np.log(np.divide(omega[1:omega.shape[0], :], omega[0:(omega.shape[0]-1), :]))\n",
" return delta"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def plot_multi_lines(t, y, xlab=None, ylab=None, title=None):\n",
" color_idx = np.linspace(0, 1, y.shape[0])\n",
" for i, line in zip(color_idx, range(0, y.shape[0], 1)):\n",
" plt.plot(t, y[line, :], color=plt.cm.rainbow(i))\n",
" if xlab is not None:\n",
" plt.xlabel(xlab)\n",
" if ylab is not None:\n",
" plt.ylabel(ylab)\n",
" if title is not None:\n",
" plt.title(title)\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"def generate_simulations(Nsims, s0, Ndt, Nq, dt, delta, lamb, kappa, sigma, xi, t_opt, t):\n",
" \n",
" Qpath = np.full((Nsims, Ndt + 1), np.NaN)\n",
" # Starting inventory\n",
" Qpath[:, 0] = Nq\n",
" Xpath = np.full((Nsims, Ndt + 1), np.NaN)\n",
" # Starting cash\n",
" Xpath[:, 0] = 0\n",
" Spath = np.full((Nsims, Ndt + 1), np.NaN)\n",
" # Starting Mid-price\n",
" Spath[:, 0] = s0\n",
" deltaPath = np.full((Nsims, Ndt + 1), np.NaN)\n",
" isFilled = np.full((Nsims, Ndt + 1), np.NaN)\n",
" isMO = np.zeros((Nsims, Ndt + 1))\n",
" pricePerShare = np.full((Nsims, Ndt + 1), np.NaN)\n",
"\n",
" mu = 0\n",
" for k in range(0, Ndt, 1):\n",
" idx = Qpath[:, k] > 0\n",
"\n",
" deltaPath[idx, k] = delta[Qpath[idx, k].astype(int) - 1, k]\n",
" isFilled[idx, k] = np.random.rand(np.sum(idx)) < nan_to_num(lamb * np.exp(-kappa * deltaPath[idx, k]) * dt, np.Inf)\n",
"\n",
" deltaPath[(1 - idx).astype(bool), k] = np.NaN\n",
" isFilled[(1 - idx).astype(bool), k] = 0\n",
"\n",
" idx = np.logical_and((k + 1) * dt > t_opt[Qpath[:, k].astype(int)], Qpath[:, k] > 0)\n",
" isMO[idx, k] = 1\n",
" isFilled[:, k] = np.logical_and(1 - isMO[:, k], isFilled[:, k])\n",
"\n",
" idx = Qpath[:, k] > 0\n",
" Xpath[idx, k + 1] = Xpath[idx, k] + np.multiply(isFilled[idx, k], Spath[idx, k] + deltaPath[idx, k]) + \\\n",
" np.multiply(isMO[idx, k], Spath[idx, k] - xi)\n",
" Xpath[(1 - idx).astype(bool), k + 1] = Xpath[(1 - idx).astype(bool), k]\n",
"\n",
" Qpath[:, k + 1] = Qpath[:, k] - isMO[:, k] - isFilled[:, k]\n",
" Spath[:, k + 1] = Spath[:, k] + mu * dt + sigma * np.sqrt(dt) * np.random.randn(Nsims)\n",
"\n",
" idx = np.logical_and(Qpath[:, k + 1] < Nq, Qpath[:, k + 1] > 0)\n",
"\n",
" pricePerShare[idx, k + 1] = np.divide(Xpath[idx, k + 1], Nq - Qpath[idx, k + 1])\n",
"\n",
" idx = Qpath[:, k + 1] == 0\n",
" pricePerShare[idx, k + 1] = pricePerShare[idx, k]\n",
"\n",
" Xpath[:, Xpath.shape[1] - 1] += np.multiply(Qpath[:, Qpath.shape[1] - 1], Spath[:, Spath.shape[1] - 1] - xi)\n",
" Qpath[:, Qpath.shape[1] - 1] = 0\n",
"\n",
" idx = (Nq - Qpath[:, Qpath.shape[1] - 1]) > 0\n",
" pricePerShare[idx, pricePerShare.shape[1] - 1] = np.divide(Xpath[idx, Xpath.shape[1] - 1],\n",
" Nq - Qpath[idx, Qpath.shape[1] - 1])\n",
"\n",
" twap = np.divide(np.cumsum(Spath[:, 0:(Spath.shape[1] - 1)] * dt, axis=1), t[1:t.shape[0]]) - xi\n",
" twap = np.concatenate((Spath[:, 0][:, np.newaxis], twap), axis=1)\n",
"\n",
" return deltaPath, Qpath, isMO, Xpath, Spath, pricePerShare, twap"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"def plot_inventory(is_MO, Qpath, t):\n",
" color_idx = np.linspace(0, 1, Qpath.shape[0])\n",
" for i, line in zip(color_idx, range(0, Qpath.shape[0], 1)):\n",
" plt.step(t, Qpath[line, :], color=plt.cm.rainbow(i))\n",
" specific_qpath = Qpath[line, :]\n",
" thisisMO = is_MO[line, :].astype(bool)\n",
" plt.plot(t[thisisMO], specific_qpath[thisisMO], 'bo')\n",
" plt.xlabel(r'Time ($sec$)')\n",
" plt.ylabel(r'Inventory ($Q^*_t$)')\n",
" plt.title(r'Inventory Sample Paths')\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"def plot_multi_steps(t, y, xlab=None, ylab=None, title=None):\n",
" \n",
" color_idx = np.linspace(0, 1, y.shape[0])\n",
" for i, line in zip(color_idx, range(0, y.shape[0], 1)):\n",
" plt.step(t, y[line, :], color=plt.cm.rainbow(i))\n",
" if xlab is not None:\n",
" plt.xlabel(xlab)\n",
" if ylab is not None:\n",
" plt.ylabel(ylab)\n",
" if title is not None: \n",
" plt.title(title)\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"def plot_price_per_share(t, pricePerShare, twap):\n",
" color_idx = np.linspace(0, 1, pricePerShare.shape[0])\n",
" for i, line in zip(color_idx, range(0, pricePerShare.shape[0], 1)):\n",
" plt.plot(t, pricePerShare[line, :], color=plt.cm.rainbow(i), linestyle='-')\n",
" plt.plot(t, twap[line, :], color=plt.cm.rainbow(i), linestyle='--')\n",
" plt.ylabel(r'Price / Share ($\\frac{X_t}{Q_t}$)')\n",
" plt.xlabel(r'Time ($sec$)')\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"def plot_histogram(x, xlab=None, prob=None, bins= None): \n",
" \n",
" if bins is None:\n",
" counts = plt.hist(x[~np.isnan(x)])\n",
" else:\n",
" counts = plt.hist(x[~np.isnan(x)], bins=bins)\n",
" \n",
" if prob is not None: \n",
" color_idx = np.linspace(0, 1, prob.shape[0]) \n",
" q = np.quantile(x[~np.isnan(x)], prob) \n",
" for i, vline in zip(color_idx, range(0, prob.shape[0], 1)): \n",
" plt.axvline(x=q[vline], ymin=0, ymax=np.max(counts[0]), linestyle='--', color=plt.cm.rainbow(i), label='quantile ' + str(prob[vline]))\n",
"\n",
" if xlab is not None:\n",
" plt.xlabel(xlab)\n",
" plt.ylabel(r'Frequency')\n",
" plt.legend() \n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"class FormatScalarFormatter(matplotlib.ticker.ScalarFormatter):\n",
" def __init__(self, fformat=\"%1.1f\", offset=True, mathText=True):\n",
" self.fformat = fformat\n",
" matplotlib.ticker.ScalarFormatter.__init__(self,useOffset=offset,\n",
" useMathText=mathText)\n",
" def _set_format(self, vmin=None, vmax=None):\n",
" self.format = self.fformat\n",
" if self._useMathText:\n",
" self.format = '$%s$' % matplotlib.ticker._mathdefault(self.format)\n",
"\n",
"\n",
"def plot_heat_map(t, q, myn_per_sim, meanq, medq, qAC, xlab=None, ylab=None, title=None):\n",
" \n",
" plt.tick_params(direction='in', bottom=True, top=True, left=True, right=True)\n",
"\n",
" x_cord, y_cord = np.meshgrid(t, q)\n",
" cmap = plt.get_cmap('YlOrRd')\n",
" plot = plt.contourf(x_cord, y_cord, myn_per_sim, 100, cmap=cmap, levels=np.linspace(myn_per_sim.min(), myn_per_sim.max(), 1000))\n",
" fmt = FormatScalarFormatter(\"%.2f\")\n",
" plt.colorbar(plot, format=fmt)\n",
"\n",
" plt.plot(t, meanq, linestyle='-', color='black', label=r'mean $Q_t^*$')\n",
" plt.plot(t, medq, linestyle='--', color='black', label=r'median $Q_t^*$')\n",
" plt.plot(t, qAC, linestyle='-', color='blue', label=r'target $q_t$')\n",
" if xlab is not None:\n",
" plt.xlabel(xlab)\n",
" if ylab is not None:\n",
" plt.ylabel(ylab)\n",
" if title is not None:\n",
" plt.title(title)\n",
" plt.legend()\n",
" plt.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment