Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Streaming Arrow
import random
import time
from collections import deque
from threading import Thread
import zmq
import numpy as np
import pandas as pd
import pyarrow as pa
def send_arrow(socket, df, flags=0, copy=True, track=False):
    """Serialize *df* with Arrow and send it on *socket* as one message.

    ``flags``, ``copy`` and ``track`` are forwarded unchanged to
    ``socket.send``; whatever ``socket.send`` returns is returned.
    """
    payload = pa.serialize_pandas(df)
    return socket.send(payload, flags, copy=copy, track=track)
def recv_arrow(socket, flags=0, copy=True, track=False):
    """Receive one message from *socket* and decode it into a DataFrame.

    ``flags``, ``copy`` and ``track`` are forwarded unchanged to
    ``socket.recv``; the received message is handed to
    ``pa.deserialize_pandas`` and the result is returned.
    """
    return pa.deserialize_pandas(
        socket.recv(flags=flags, copy=copy, track=track)
    )
def produce(url, ident, generate_data):
    """Push DataFrames from *generate_data* to *url* until an error occurs.

    Creates a ZeroMQ context and a PUSH socket, connects the socket to
    *url*, then loops forever: call ``generate_data()``, send the result
    with ``send_arrow``, and sleep for ``random.random()`` seconds.

    Parameters
    ----------
    url : ZeroMQ endpoint to connect to.
    ident : accepted for interface compatibility; not used by this function.
    generate_data : zero-argument callable returning the DataFrame to send.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    try:
        s.connect(url)
        while True:
            send_arrow(s, generate_data())
            time.sleep(random.random())
    finally:
        # The loop has no normal exit, so the only way out is an exception;
        # closing here guarantees the socket is released in that case.
        s.close()
def consume(url, source):
    """Pull DataFrames from *url* and emit each one into *source*.

    Creates a ZeroMQ context and a PULL socket, connects the socket to
    *url*, then loops forever: receive a DataFrame with ``recv_arrow`` and
    pass it to ``source.emit``.

    Parameters
    ----------
    url : ZeroMQ endpoint to connect to.
    source : object exposing ``emit(df)``; called once per received frame.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    try:
        s.connect(url)
        while True:
            source.emit(recv_arrow(s))
    finally:
        # The loop has no normal exit, so the only way out is an exception;
        # closing here guarantees the socket is released in that case.
        s.close()
def proxy(in_url, out_url):
    """Simulate a device running in the background.

    Binds a PULL socket to *in_url* and a PUSH socket to *out_url*, then
    hands both to ``zmq.proxy``. A ``zmq.ContextTerminated`` raised by
    ``zmq.proxy`` is treated as a normal stop and swallowed; any other
    exception propagates. Both sockets are closed on every exit path.
    """
    ctx = zmq.Context()
    in_s = ctx.socket(zmq.PULL)
    out_s = ctx.socket(zmq.PUSH)
    try:
        in_s.bind(in_url)
        out_s.bind(out_url)
        try:
            zmq.proxy(in_s, out_s)
        except zmq.ContextTerminated:
            # Context was shut down underneath the proxy: stop quietly.
            pass
    finally:
        # Previously the sockets were closed only on ContextTerminated, so a
        # failed bind or any other error leaked them.
        in_s.close()
        out_s.close()
def main(source, generate_data, producers=2):
    """Start the consumer, proxy and producer threads, then wait.

    One thread runs ``consume`` against the proxy's output endpoint, one
    runs ``proxy`` between the two IPC endpoints, and ``producers`` threads
    run ``produce`` against the proxy's input endpoint. The call blocks
    until the consumer thread finishes.

    Parameters
    ----------
    source : passed through to ``consume``.
    generate_data : passed through to each ``produce`` thread.
    producers : number of producer threads to start.
    """
    import os

    ipc_dir = '/tmp/data'
    in_url = 'ipc:///tmp/data/0'
    out_url = 'ipc:///tmp/data/1'

    # The ipc endpoints live in this directory; binding fails if it is
    # missing, so make sure it exists before any thread starts.
    os.makedirs(ipc_dir, exist_ok=True)

    # Workers loop forever; daemon threads let the process exit once this
    # function returns or is interrupted instead of hanging on them.
    consumer = Thread(target=consume, args=(out_url, source), daemon=True)
    proxy_thread = Thread(target=proxy, args=(in_url, out_url), daemon=True)
    producer_threads = [
        Thread(target=produce, args=(in_url, i, generate_data), daemon=True)
        for i in range(producers)
    ]

    consumer.start()
    proxy_thread.start()
    for worker in producer_threads:
        worker.start()

    consumer.join()
def generate_data(n=int(3e6)):
    """Return a random DataFrame with *n* rows and columns ``key``/``value``.

    ``key`` is drawn with ``np.random.choice`` from the letters ``'a'``,
    ``'b'`` and ``'c'``; ``value`` is drawn with ``np.random.randn``.

    Parameters
    ----------
    n : number of rows to generate (defaults to three million, the size
        that was previously hard-coded).
    """
    return pd.DataFrame({
        'key': np.random.choice(list('abc'), size=n),
        'value': np.random.randn(n),
    })
if __name__ == '__main__':
    from streamz import Stream

    def sum_by_key(df):
        """Sum ``value`` per ``key``; result has columns key and value_sum."""
        return (df.groupby('key')
                .value.sum()
                .rename('value_sum').reset_index())

    def all_sums_positive(df):
        """Return True when every per-key sum in *df* is greater than zero."""
        return df.value_sum.gt(0).all()

    source = Stream()
    buffered = source.map(sum_by_key).filter(all_sums_positive).buffer(5)

    # First sink: print each aggregated frame that passes the filter.
    buffered.sink(print)

    # Second sink: keep the five most recent per-frame maxima and print them.
    results = deque(maxlen=5)

    def report_recent_maxima(df):
        """Record the largest value_sum of *df* and print the recent maxima."""
        results.append(df.value_sum.max())
        print(np.round(np.array(list(results)), 2))

    buffered.sink(report_recent_maxima)

    main(source, generate_data)
@cpcloud

This comment has been minimized.

Copy link
Owner Author

@cpcloud cpcloud commented Feb 27, 2018

@mrakitin

This comment has been minimized.

Copy link

@mrakitin mrakitin commented Feb 27, 2018

Thanks @cpcloud!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.