Skip to content

Instantly share code, notes, and snippets.

@tomasbedrich
Created March 25, 2016 15:07
Show Gist options
  • Save tomasbedrich/6532487b09ae3343314e to your computer and use it in GitHub Desktop.
Save tomasbedrich/6532487b09ae3343314e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from matplotlib import pyplot as plt
from matplotlib import animation
import threading
import collections
import queue
import logging
def _plotter(end, data, plot_params, *, plot_setup=None, plot_setup_args=None, plot_setup_kwargs=None,
buffer_size=1000, window_size=10, fps=1, reset_scale_every=float('inf'), blit=False):
"""Create, setup and run realtime plot using data queue."""
num_lines = len(plot_params)
if not plot_setup_args:
plot_setup_args = []
if not plot_setup_kwargs:
plot_setup_kwargs = {}
# create buffers
x_bufs = [collections.deque(maxlen=buffer_size) for i in range(num_lines)]
y_bufs = [collections.deque(maxlen=buffer_size) for i in range(num_lines)]
# y-axis min and max
y_min, y_max, x_max = 0, 0, 0
# prepare plot
fig, ax = plt.subplots()
lines = [ax.plot([], [], params)[0] for params in plot_params]
if plot_setup:
plot_setup(plt, fig, ax, lines, *plot_setup_args, **plot_setup_kwargs)
def redraw(num):
global y_min, y_max, x_max
# reset scale if desired
if not num % reset_scale_every:
y_min, y_max, x_max = 0, 0, 0
# copy data from queue to buffers
while True:
try:
i, x, y = data.get_nowait()
y_min, y_max, x_max = min(y_min, y), max(y_max, y), max(x_max, x)
x_bufs[i].append(x)
y_bufs[i].append(y)
except queue.Empty:
break
for i in range(num_lines):
lines[i].set_data(x_bufs[i], y_bufs[i])
# rescale view
y_padding = max(0.01, (y_max - y_min) / 20) # padding = 5% of data range
x_padding = max(0.01, window_size / 20) # padding = 5% of window size
ax.set_xlim(x_max - window_size - x_padding, x_max + x_padding)
ax.set_ylim(y_min - y_padding, y_max + y_padding)
# MUST create variable or the animation is immediately destroyed (on Windows)
ani = animation.FuncAnimation(fig, redraw, interval=1e3//fps, blit=blit)
plt.show()
end.set()
logging.debug("ending plotter")
def _receiver(gen, i, data, end):
"""Receive generator samples (tuples of x and y data). Used as a thread."""
try:
for x, y in gen:
data.put((i, x, y))
if end.is_set():
break
except BrokenPipeError:
pass
logging.debug("ending {!r} receiver".format(gen))
def plot(lines, *args, **kwargs):
"""Plot lines (list of tuples of generator and its plot params). For more args see _plotter."""
end = threading.Event()
generators, plot_params = zip(*lines)
data = queue.Queue()
# start generator receivers
logging.debug("starting receiver threads")
receivers = [threading.Thread(name="{!r} receiver".format(gen), target=_receiver,
args=(gen, i, data, end)) for i, gen in enumerate(generators)]
# set as daemon, because we do not want to wait for generator to check end flag
for r in receivers:
r.daemon = True
r.start()
_plotter(end, data, plot_params, **kwargs)
# run that baby
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
def rand_gen(sleep, max):
import random
import time
t0 = time.time()
while True:
yield time.time() - t0, random.randint(0, max)
time.sleep(sleep)
def plot_setup(plt, fig, ax, lines):
plt.grid()
generators = [
(rand_gen(0.5, 10), "ro-"),
(rand_gen(0.05, 5), "g-"),
]
plot(generators, fps=10, buffer_size=100, window_size=5, plot_setup=plot_setup)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment