Skip to content

Instantly share code, notes, and snippets.

@jmbarr
Created November 1, 2020 15:57
Show Gist options
  • Save jmbarr/897f85b078cc296f6a7c4cd8df9d1a28 to your computer and use it in GitHub Desktop.
Save jmbarr/897f85b078cc296f6a7c4cd8df9d1a28 to your computer and use it in GitHub Desktop.
Sensor management example 1
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from datetime import datetime, timedelta\n",
"start_time = datetime.now()\n",
"\n",
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n",
" ConstantVelocity\n",
"from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate ground truth"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# np.random.seed(1991)\n",
"\n",
"\n",
"transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),\n",
" ConstantVelocity(0.005)])\n",
"\n",
"truth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\n",
"for k in range(1, 21):\n",
" truth.append(GroundTruthState(\n",
" transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n",
" timestamp=start_time+timedelta(seconds=k)))\n",
"truths = truth\n",
"\n",
"truth = GroundTruthPath([GroundTruthState([0, 1, 20, 1], timestamp=start_time)])\n",
"for k in range(1, 21):\n",
" truth.append(GroundTruthState(\n",
" transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n",
" timestamp=start_time+timedelta(seconds=k)))\n",
"truths = [truths, truth]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Plot ground truth"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 720x432 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"from matplotlib import pyplot as plt\n",
"\n",
"fig = plt.figure(figsize=(10, 6))\n",
"ax = fig.add_subplot(1, 1, 1)\n",
"ax.set_xlabel(\"$x$\")\n",
"ax.set_ylabel(\"$y$\")\n",
"ax.set_ylim(0, 45)\n",
"ax.set_xlim(0, 25)\n",
"\n",
"for truth in truths:\n",
" ax.plot([state.state_vector[0] for state in truth],\n",
" [state.state_vector[2] for state in truth],\n",
" linestyle=\"--\",)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create a sensor manager class"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from stonesoup.base import Property, Base\n",
"\n",
"class aSensorManager(Base):\n",
" \n",
" action_list = Property(list, doc=\"List of possible actions\")\n",
" \n",
" def choose_actions(self):\n",
" return self.action_list[int(np.floor(len(self.action_list)*np.random.uniform()))]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create an instance of a sensor manager"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"actions = [0,1]\n",
"sensormanager = aSensorManager(actions)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create a measurement model"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from stonesoup.models.measurement.linear import LinearGaussian\n",
"\n",
"measurement_model = LinearGaussian(\n",
" ndim_state=4,\n",
" mapping=(0, 2),\n",
" noise_covar=np.array([[0.75, 0],\n",
" [0, 0.75]])\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create the Kalman predictor and updater"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import KalmanPredictor\n",
"predictor = KalmanPredictor(transition_model)\n",
"\n",
"from stonesoup.updater.kalman import KalmanUpdater\n",
"updater = KalmanUpdater(measurement_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the Kalman filters\n",
"\n",
"#### We create 2 priors which estimate the targets’ initial states."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"from stonesoup.types.state import GaussianState\n",
"prior = [GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time), \n",
" GaussianState([[0], [1], [20], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Run the loop"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"from stonesoup.types.hypothesis import SingleHypothesis\n",
"from stonesoup.types.track import Track\n",
"from stonesoup.types.detection import Detection\n",
"\n",
"tracks = [Track(),Track()]\n",
"tracks[0].append(prior[0])\n",
"tracks[1].append(prior[1])\n",
"\n",
"measurements = []\n",
"predictions = []\n",
"\n",
"for k in range(1, 21):\n",
" # At each timestep activate the sensor manager to decide which object to observe\n",
" chosen_target = sensormanager.choose_actions()\n",
" \n",
" # The ground truth will therefore be:\n",
" selected_truth = truths[chosen_target][k]\n",
" \n",
" # observe this\n",
" measurement = measurement_model.function(selected_truth, noise=True)\n",
" measurements.append(Detection(measurement, timestamp=selected_truth.timestamp))\n",
" \n",
" # Generate clutter at this time-step\n",
" # Skip for now\n",
"\n",
" # Do the prediction (for both targets) and the update for those\n",
" for ind in range(0,len(tracks)):\n",
" prediction = predictor.predict(tracks[ind][-1], timestamp=measurements[-1].timestamp)\n",
" \n",
" if ind == chosen_target: # update the prediction \n",
" # Association - just a single hypothesis at present\n",
" hypothesis = SingleHypothesis(prediction, measurements[-1]) # Group a prediction and measurement\n",
"\n",
" # update and add to track\n",
" post = updater.update(hypothesis) \n",
" else:\n",
" post = prediction\n",
" \n",
" tracks[ind].append(post)\n",
" prior[ind] = tracks[ind][-1]\n",
" "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Do plotting"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 720x432 with 1 Axes>"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ax.plot([state.state_vector[0] for state in tracks[0]],\n",
" [state.state_vector[2] for state in tracks[0]],\n",
" marker=\".\", color='r')\n",
"\n",
"\n",
"ax.plot([state.state_vector[0] for state in tracks[1]],\n",
" [state.state_vector[2] for state in tracks[1]],\n",
" marker=\".\", color='b')\n",
"\n",
"from matplotlib.patches import Ellipse\n",
"for state in tracks[0]:\n",
" w, v = np.linalg.eig(measurement_model.matrix()@state.covar@measurement_model.matrix().T)\n",
" max_ind = np.argmax(w)\n",
" min_ind = np.argmin(w)\n",
" orient = np.arctan2(v[1,max_ind], v[0,max_ind])\n",
" ellipse = Ellipse(xy=(state.state_vector[0], state.state_vector[2]),\n",
" width=2*np.sqrt(w[max_ind]), height=2*np.sqrt(w[min_ind]),\n",
" angle=np.rad2deg(orient),\n",
" alpha=0.2,\n",
" color='r')\n",
" ax.add_artist(ellipse)\n",
"\n",
"for state in tracks[1]:\n",
" w, v = np.linalg.eig(measurement_model.matrix()@state.covar@measurement_model.matrix().T)\n",
" max_ind = np.argmax(w)\n",
" min_ind = np.argmin(w)\n",
" orient = np.arctan2(v[1,max_ind], v[0,max_ind])\n",
" ellipse = Ellipse(xy=(state.state_vector[0], state.state_vector[2]),\n",
" width=2*np.sqrt(w[max_ind]), height=2*np.sqrt(w[min_ind]),\n",
" angle=np.rad2deg(orient),\n",
" alpha=0.2,\n",
" color='b')\n",
" ax.add_artist(ellipse)\n",
"\n",
"fig"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment