Skip to content

Instantly share code, notes, and snippets.

@tacaswell
Forked from cpcloud/streaming_arrow.py
Created February 27, 2018 20:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tacaswell/82fc49b682b90327813fd14dc960369d to your computer and use it in GitHub Desktop.
Streaming Arrow
import random
import time
from collections import deque
from threading import Thread
import zmq
import numpy as np
import pandas as pd
import pyarrow as pa
def send_arrow(socket, df, flags=0, copy=True, track=False):
    """Send a pandas DataFrame over a ZeroMQ socket as one Arrow-encoded message.

    The frame is encoded with ``pa.serialize_pandas`` and the resulting
    buffer is handed to ``socket.send``; ``flags``, ``copy`` and ``track``
    are forwarded unchanged. Returns whatever ``socket.send`` returns.
    """
    return socket.send(pa.serialize_pandas(df), flags, copy=copy, track=track)
def recv_arrow(socket, flags=0, copy=True, track=False):
    """Receive one message from a ZeroMQ socket and decode it as a DataFrame.

    Counterpart of ``send_arrow``: the message obtained from ``socket.recv``
    (with ``flags``, ``copy`` and ``track`` forwarded as keywords) is passed
    to ``pa.deserialize_pandas`` and the decoded DataFrame is returned.
    """
    return pa.deserialize_pandas(
        socket.recv(flags=flags, copy=copy, track=track)
    )
def produce(url, ident, generate_data, max_messages=None):
    """Repeatedly generate DataFrames and push them to ``url``.

    Parameters
    ----------
    url : str
        ZeroMQ endpoint that a PUSH socket ``connect``s to.
    ident : int
        Producer identifier supplied by the caller. It is not used by this
        body; it is kept for interface compatibility.
    generate_data : callable
        Zero-argument callable whose return value is sent with
        ``send_arrow``.
    max_messages : int or None, optional
        Stop after sending this many messages. ``None`` (the default) keeps
        producing forever, as the original did.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    try:
        s.connect(url)
        sent = 0
        while max_messages is None or sent < max_messages:
            df = generate_data()
            send_arrow(s, df)
            sent += 1
            # Random pause between messages to simulate an irregular source.
            time.sleep(random.random())
    finally:
        # Previously unreachable after ``while True``. Now it also runs when
        # generate_data/send_arrow raises or the bounded loop finishes.
        s.close()
def consume(url, source, max_messages=None):
    """Pull Arrow-encoded DataFrames from ``url`` and emit them into ``source``.

    Parameters
    ----------
    url : str
        ZeroMQ endpoint that a PULL socket ``connect``s to.
    source : object
        Object with an ``emit`` method; each decoded DataFrame is passed to
        ``source.emit``. In this file it is a ``streamz.Stream``.
    max_messages : int or None, optional
        Stop after receiving this many messages. ``None`` (the default)
        consumes forever, as the original did.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    try:
        s.connect(url)
        received = 0
        while max_messages is None or received < max_messages:
            df = recv_arrow(s)
            source.emit(df)
            received += 1
    finally:
        # Previously unreachable after ``while True``. Now the socket is
        # released if recv/emit raises or the bounded loop finishes.
        s.close()
def proxy(in_url, out_url):
    """Simulate a device running in the background.

    Binds a PULL socket on ``in_url`` and a PUSH socket on ``out_url`` and
    forwards messages from the former to the latter with ``zmq.proxy``,
    which blocks until an error occurs (or the context is terminated).

    For ``ipc://`` endpoints the parent directory of the socket path is
    created first, because libzmq cannot bind an ipc endpoint whose
    directory does not exist.
    """
    import os

    ipc_prefix = 'ipc://'
    for url in (in_url, out_url):
        if url.startswith(ipc_prefix):
            path = url[len(ipc_prefix):]
            # '@...' names the Linux abstract namespace and has no directory.
            if path.startswith('@'):
                continue
            directory = os.path.dirname(path)
            if directory:
                os.makedirs(directory, exist_ok=True)

    ctx = zmq.Context()
    in_s = ctx.socket(zmq.PULL)
    out_s = ctx.socket(zmq.PUSH)
    try:
        in_s.bind(in_url)
        out_s.bind(out_url)
        zmq.proxy(in_s, out_s)
    except zmq.ContextTerminated:
        # Context shutdown ends the proxy quietly, as before.
        pass
    finally:
        # Close both sockets on every exit path, including a failed bind.
        in_s.close()
        out_s.close()
def main(source, generate_data, producers=2,
         in_url='ipc:///tmp/data/0', out_url='ipc:///tmp/data/1'):
    """Run ``producers`` producer threads -> proxy -> one consumer thread.

    Parameters
    ----------
    source : object
        Passed to ``consume``; receives every decoded DataFrame via ``emit``.
    generate_data : callable
        Zero-argument callable each producer calls to build a DataFrame.
    producers : int, optional
        Number of producer threads to start (default 2).
    in_url, out_url : str, optional
        Endpoints the proxy binds: producers connect to ``in_url`` and the
        consumer connects to ``out_url``. The defaults match the previously
        hard-coded values.

    Blocks until the consumer thread finishes. All threads are daemons, so
    the process can exit (e.g. on Ctrl-C or if the consumer dies) instead of
    hanging on the infinite producer/proxy loops.
    """
    consumer = Thread(target=consume, args=(out_url, source), daemon=True)
    proxy_thread = Thread(target=proxy, args=(in_url, out_url), daemon=True)
    # Separate name so the ``producers`` count is not rebound to a list.
    producer_threads = [
        Thread(target=produce, args=(in_url, i, generate_data), daemon=True)
        for i in range(producers)
    ]
    consumer.start()
    proxy_thread.start()
    for p in producer_threads:
        p.start()
    consumer.join()
def generate_data(n=3000000):
    """Return a random DataFrame with ``n`` rows.

    Columns:
    - ``key``: values drawn uniformly from ``'a'``, ``'b'``, ``'c'``.
    - ``value``: standard-normal floats from ``np.random.randn``.

    ``n`` defaults to 3,000,000, the previously hard-coded ``int(3e6)``.
    """
    n = int(n)
    return pd.DataFrame({
        'key': np.random.choice(list('abc'), size=n),
        'value': np.random.randn(n),
    })
if __name__ == '__main__':
    from streamz import Stream

    # Rolling window of the last five per-batch maxima of value_sum.
    results = deque(maxlen=5)

    def summarize(frame):
        """Sum 'value' per 'key' and return it as a 'value_sum' frame."""
        totals = frame.groupby('key').value.sum()
        return totals.rename('value_sum').reset_index()

    def all_positive(summary):
        """Keep only summaries whose every per-key total is positive."""
        return summary.value_sum.gt(0).all()

    def remember_max(summary):
        """Record the largest total; returns None like the original lambda."""
        return results.append(summary.value_sum.max())

    def show_results(_ignored):
        """Print the recorded maxima rounded to two decimals."""
        print(np.round(np.array(list(results)), 2))

    source = Stream()
    summaries = source.map(summarize)
    buffered = summaries.filter(all_positive).buffer(5)
    # Registration order matters: print each item first, then record/show.
    buffered.sink(print)
    buffered.map(remember_max).sink(show_results)
    main(source, generate_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment