Skip to content

Instantly share code, notes, and snippets.

@mrakitin
Forked from cpcloud/streaming_arrow.py
Created February 27, 2018 21:35
Show Gist options
  • Save mrakitin/2608492a8d6e15f286a3aedaa252af95 to your computer and use it in GitHub Desktop.
Streaming Arrow
import random
import time
from collections import deque
from threading import Thread
import zmq
import numpy as np
import pandas as pd
import pyarrow as pa
def send_arrow(socket, df, flags=0, copy=True, track=False):
    """Serialize *df* with pyarrow and send it over *socket*.

    Returns whatever ``socket.send`` returns (a ``MessageTracker`` when
    ``track=True``, otherwise ``None``).
    """
    payload = pa.serialize_pandas(df)
    return socket.send(payload, flags, copy=copy, track=track)
def recv_arrow(socket, flags=0, copy=True, track=False):
    """Receive one message from *socket* and deserialize it into a pandas DataFrame."""
    raw = socket.recv(flags=flags, copy=copy, track=track)
    return pa.deserialize_pandas(raw)
def produce(url, ident, generate_data):
    """Producer loop: connect a PUSH socket to *url* and stream DataFrames forever.

    Parameters
    ----------
    url : str
        zmq endpoint the PUSH socket connects to.
    ident : int
        Producer identifier. Currently unused; kept to preserve the
        thread-spawn interface used by ``main``.
    generate_data : callable
        Zero-argument callable returning a pandas DataFrame.

    The loop never exits normally; the try/finally guarantees the socket
    is closed if it raises.  (The original ``s.close()`` sat after the
    ``while True`` loop and was unreachable — the socket leaked on error.)
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    s.connect(url)
    try:
        while True:
            df = generate_data()
            send_arrow(s, df)
            # Random 0-1 s pause to desynchronize multiple producers.
            time.sleep(random.random())
    finally:
        s.close()
def consume(url, source):
    """Consumer loop: pull serialized DataFrames from *url* and emit them.

    Parameters
    ----------
    url : str
        zmq endpoint the PULL socket connects to.
    source : object with an ``emit`` method (e.g. a streamz Stream)
        Each received DataFrame is pushed into it via ``source.emit``.

    The loop never exits normally; the try/finally guarantees the socket
    is closed if it raises.  (The original ``s.close()`` after the
    ``while True`` loop was unreachable — the socket leaked on error.)
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    s.connect(url)
    try:
        while True:
            df = recv_arrow(s)
            source.emit(df)
    finally:
        s.close()
def proxy(in_url, out_url):
    """Simulate a device running in the background.

    Binds a PULL socket at *in_url* and a PUSH socket at *out_url*, then
    blocks in ``zmq.proxy`` forwarding messages between them.

    ``zmq.proxy`` raises ``ContextTerminated`` when the context is torn
    down — that is the normal shutdown path and is swallowed.  The
    ``finally`` ensures both sockets are closed on every exit path; the
    original only closed them on ``ContextTerminated`` and leaked them on
    any other exception.
    """
    ctx = zmq.Context()
    in_s = ctx.socket(zmq.PULL)
    in_s.bind(in_url)
    out_s = ctx.socket(zmq.PUSH)
    out_s.bind(out_url)
    try:
        zmq.proxy(in_s, out_s)
    except zmq.ContextTerminated:
        pass  # expected on shutdown
    finally:
        in_s.close()
        out_s.close()
def main(source, generate_data, producers=2):
    """Wire up the producer → proxy → consumer pipeline and run it forever.

    Parameters
    ----------
    source : object with an ``emit`` method (e.g. a streamz Stream)
        Sink that receives every consumed DataFrame.
    generate_data : callable
        Zero-argument DataFrame factory handed to each producer thread.
    producers : int, optional
        Number of producer threads to start (default 2).

    All threads run infinite loops and are non-daemon, so
    ``consumer.join()`` blocks indefinitely.
    """
    in_url = 'ipc:///tmp/data/0'
    out_url = 'ipc:///tmp/data/1'
    consumer = Thread(target=consume, args=(out_url, source))
    proxy_thread = Thread(target=proxy, args=(in_url, out_url))
    # Renamed from the original ``producers`` list, which shadowed the
    # ``producers`` int parameter and only worked by order of use.
    producer_threads = [
        Thread(target=produce, args=(in_url, i, generate_data))
        for i in range(producers)
    ]
    consumer.start()
    proxy_thread.start()
    for t in producer_threads:
        t.start()
    consumer.join()
def generate_data(n=int(3e6)):
    """Build a random demo DataFrame.

    Parameters
    ----------
    n : int, optional
        Number of rows.  Defaults to 3,000,000, the size previously
        hard-coded inside the function (now a parameter so callers can
        generate smaller or larger frames).

    Returns
    -------
    pandas.DataFrame
        Columns: ``'key'`` — uniformly random 'a'/'b'/'c' —
        and ``'value'`` — standard-normal floats.
    """
    return pd.DataFrame({
        'key': np.random.choice(list('abc'), size=n),
        'value': np.random.randn(n),
    })
if __name__ == '__main__':
    from streamz import Stream

    source = Stream()

    # Per-frame aggregation: sum 'value' per key, flattened back into a
    # DataFrame with columns ['key', 'value_sum'].
    summed = source.map(
        lambda df: df.groupby('key')
                     .value.sum()
                     .rename('value_sum')
                     .reset_index()
    )

    # Keep only frames where every per-key sum is positive, buffering up
    # to five frames, and print each one.
    buffered = summed.filter(lambda df: df.value_sum.gt(0).all()).buffer(5)
    buffered.sink(print)

    # Rolling window of the last five maxima of the per-key sums, printed
    # (rounded to 2 decimals) on each update.
    results = deque(maxlen=5)
    buffered.map(lambda df: results.append(df.value_sum.max())).sink(
        lambda x: print(np.round(np.array(list(results)), 2)))

    main(source, generate_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment