Created
March 25, 2016 15:07
-
-
Save tomasbedrich/6532487b09ae3343314e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from matplotlib import pyplot as plt | |
from matplotlib import animation | |
import threading | |
import collections | |
import queue | |
import logging | |
def _plotter(end, data, plot_params, *, plot_setup=None, plot_setup_args=None, plot_setup_kwargs=None,
             buffer_size=1000, window_size=10, fps=1, reset_scale_every=float('inf'), blit=False):
    """Create, set up and run a realtime plot fed from a data queue.

    Blocks inside ``plt.show()``; once that returns, ``end`` is set.

    Args:
        end: event-like object; ``end.set()`` is called after ``plt.show()`` returns.
        data: queue polled with ``get_nowait()``; each item is a tuple
            ``(line_index, x, y)``.
        plot_params: one entry per line, each passed as the third argument of
            ``ax.plot([], [], params)``.
        plot_setup: optional callable invoked once as
            ``plot_setup(plt, fig, ax, lines, *plot_setup_args, **plot_setup_kwargs)``.
        plot_setup_args: extra positional arguments for ``plot_setup``.
        plot_setup_kwargs: extra keyword arguments for ``plot_setup``.
        buffer_size: ``maxlen`` of each per-line x/y deque.
        window_size: width of the visible x range, ending at the largest x seen.
        fps: redraw rate; the animation interval is ``1000 // fps`` milliseconds.
        reset_scale_every: the tracked y-min / y-max / x-max are reset to 0 on
            every frame whose number is a multiple of this value.
        blit: forwarded to ``animation.FuncAnimation``.
    """
    num_lines = len(plot_params)
    if not plot_setup_args:
        plot_setup_args = []
    if not plot_setup_kwargs:
        plot_setup_kwargs = {}

    # create buffers (bounded, so old samples fall off automatically)
    x_bufs = [collections.deque(maxlen=buffer_size) for _ in range(num_lines)]
    y_bufs = [collections.deque(maxlen=buffer_size) for _ in range(num_lines)]

    # running extremes of the data seen so far; updated by redraw()
    y_min, y_max, x_max = 0, 0, 0

    # prepare plot
    fig, ax = plt.subplots()
    lines = [ax.plot([], [], params)[0] for params in plot_params]
    if plot_setup:
        plot_setup(plt, fig, ax, lines, *plot_setup_args, **plot_setup_kwargs)

    def redraw(num):
        # These are locals of the enclosing _plotter, so they must be rebound
        # with `nonlocal`; `global` would create unrelated module-level names.
        nonlocal y_min, y_max, x_max

        # reset scale if desired
        if not num % reset_scale_every:
            y_min, y_max, x_max = 0, 0, 0

        # copy data from queue to buffers until the queue is drained
        while True:
            try:
                i, x, y = data.get_nowait()
            except queue.Empty:
                break
            y_min, y_max, x_max = min(y_min, y), max(y_max, y), max(x_max, x)
            x_bufs[i].append(x)
            y_bufs[i].append(y)

        for line, xs, ys in zip(lines, x_bufs, y_bufs):
            line.set_data(xs, ys)

        # rescale view
        y_padding = max(0.01, (y_max - y_min) / 20)  # padding = 5% of data range
        x_padding = max(0.01, window_size / 20)  # padding = 5% of window size
        ax.set_xlim(x_max - window_size - x_padding, x_max + x_padding)
        ax.set_ylim(y_min - y_padding, y_max + y_padding)

        # FuncAnimation needs the modified artists back when blit=True;
        # the return value is ignored otherwise.
        return lines

    # MUST create variable or the animation is immediately destroyed (on Windows)
    # interval is a number of milliseconds; pass it as an int rather than a float
    ani = animation.FuncAnimation(fig, redraw, interval=int(1e3 // fps), blit=blit)
    plt.show()
    end.set()
    logging.debug("ending plotter")
def _receiver(gen, i, data, end):
    """Pump (x, y) samples from a generator into the shared queue. Used as a thread.

    Every sample is stored as ``(i, x, y)`` so the consumer knows which line it
    belongs to. The loop stops when the generator is exhausted, when ``end`` is
    found set after a sample has been queued, or when iterating raises
    ``BrokenPipeError`` (which is swallowed).
    """
    try:
        for sample in gen:
            x, y = sample
            data.put((i, x, y))
            # the stop flag is only examined after the sample has been queued
            if end.is_set():
                break
    except BrokenPipeError:
        # deliberately ignored; fall through to the final log line
        pass
    message = "ending {!r} receiver".format(gen)
    logging.debug(message)
def plot(lines, *args, **kwargs):
    """Plot lines, given as a list of (generator, plot params) tuples.

    One daemon receiver thread is started per generator; all of them feed a
    single queue that ``_plotter`` consumes. Keyword arguments are forwarded to
    ``_plotter`` (see there for the available options); extra positional
    arguments are accepted but not forwarded.
    """
    end = threading.Event()
    data = queue.Queue()
    generators, plot_params = zip(*lines)

    # start generator receivers
    logging.debug("starting receiver threads")
    # daemon threads, because we do not want to wait for a generator to check the end flag
    receivers = []
    for index, source in enumerate(generators):
        worker = threading.Thread(
            name="{!r} receiver".format(source),
            target=_receiver,
            args=(source, index, data, end),
            daemon=True,
        )
        receivers.append(worker)
    for worker in receivers:
        worker.start()

    _plotter(end, data, plot_params, **kwargs)
# run that baby | |
if __name__ == "__main__":
    # Demo: two random-value generators with different sampling periods.
    logging.basicConfig(level=logging.DEBUG)

    def rand_gen(period, upper):
        """Yield (elapsed seconds, random int in [0, upper]) forever, pausing `period` seconds after each sample."""
        import random
        import time
        started = time.time()
        while True:
            elapsed = time.time() - started
            value = random.randint(0, upper)
            yield elapsed, value
            time.sleep(period)

    def plot_setup(plt, fig, ax, lines):
        """Turn on the grid; the remaining arguments are unused."""
        plt.grid()

    slow_red = (rand_gen(0.5, 10), "ro-")
    fast_green = (rand_gen(0.05, 5), "g-")
    generators = [slow_red, fast_green]

    plot(generators, plot_setup=plot_setup, fps=10, buffer_size=100, window_size=5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment