Skip to content

Instantly share code, notes, and snippets.

@cpcloud
Last active June 2, 2022 02:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save cpcloud/3b989f8ab483d9c89f77c8d83dbb0ffa to your computer and use it in GitHub Desktop.
Save cpcloud/3b989f8ab483d9c89f77c8d83dbb0ffa to your computer and use it in GitHub Desktop.
Streaming Arrow
import random
import time
from collections import deque
from threading import Thread
import zmq
import numpy as np
import pandas as pd
import pyarrow as pa
def send_arrow(socket, df, flags=0, copy=True, track=False):
    """Serialize a pandas DataFrame with Arrow and send it on ``socket``.

    Parameters
    ----------
    socket
        Object exposing a pyzmq-style ``send(data, flags, copy=, track=)``.
    df
        The DataFrame to ship; it is encoded with ``pa.serialize_pandas``.
    flags, copy, track
        Forwarded unchanged to ``socket.send``.

    Returns
    -------
    Whatever ``socket.send`` returns.
    """
    buf = pa.serialize_pandas(df)
    return socket.send(buf, flags, copy=copy, track=track)
def recv_arrow(socket, flags=0, copy=True, track=False):
    """Receive one message from ``socket`` and decode it into a DataFrame.

    This is the counterpart of ``send_arrow``: the payload is expected to be
    the output of ``pa.serialize_pandas``.

    Parameters
    ----------
    socket
        Object exposing a pyzmq-style ``recv(flags=, copy=, track=)``.
    flags, copy, track
        Forwarded unchanged to ``socket.recv``.

    Returns
    -------
    The DataFrame produced by ``pa.deserialize_pandas``.
    """
    msg = socket.recv(flags=flags, copy=copy, track=track)
    return pa.deserialize_pandas(msg)
def produce(url, ident, generate_data):
    """Push freshly generated DataFrames to ``url`` forever.

    Parameters
    ----------
    url
        ZeroMQ endpoint to connect the PUSH socket to.
    ident
        Identifier of this producer. Currently unused; kept so existing
        callers that pass it keep working.
    generate_data
        Zero-argument callable returning the DataFrame to send.

    The loop never terminates on its own. The socket is closed in a
    ``finally`` clause so it is released when the loop is interrupted by an
    exception (the original ``close()`` after ``while True`` was unreachable).
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    try:
        s.connect(url)
        while True:
            send_arrow(s, generate_data())
            # Random pause of up to one second between messages.
            time.sleep(random.random())
    finally:
        s.close()
def consume(url, source):
    """Pull DataFrames from ``url`` forever and emit each one into ``source``.

    Parameters
    ----------
    url
        ZeroMQ endpoint to connect the PULL socket to.
    source
        Object with an ``emit(df)`` method (a streamz ``Stream`` in the
        script at the bottom of this file).

    The loop never terminates on its own. The socket is closed in a
    ``finally`` clause so it is released when receiving or emitting raises
    (the original ``close()`` after ``while True`` was unreachable).
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    try:
        s.connect(url)
        while True:
            source.emit(recv_arrow(s))
    finally:
        s.close()
def proxy(in_url, out_url):
    """Simulate a device running in the background.

    Binds a PULL socket on ``in_url`` and a PUSH socket on ``out_url`` and
    hands both to ``zmq.proxy``, which forwards messages between them and
    blocks for as long as it runs.

    ``zmq.ContextTerminated`` is treated as a normal shutdown signal and is
    swallowed. Both sockets are closed on every exit path, not only on
    context termination, so a failed bind or an unexpected proxy error does
    not leak them.
    """
    ctx = zmq.Context()
    in_s = ctx.socket(zmq.PULL)
    out_s = ctx.socket(zmq.PUSH)
    try:
        in_s.bind(in_url)
        out_s.bind(out_url)
        zmq.proxy(in_s, out_s)
    except zmq.ContextTerminated:
        # Expected way for the blocking proxy call to end.
        pass
    finally:
        in_s.close()
        out_s.close()
def main(source, generate_data, producers=2):
    """Wire up producers -> proxy -> consumer over ipc sockets and run them.

    Parameters
    ----------
    source
        Object with an ``emit(df)`` method; receives every consumed frame.
    generate_data
        Zero-argument callable returning the DataFrame each producer sends.
    producers
        Number of producer threads to start.

    Blocks on the consumer thread, which loops forever.
    """
    import os

    # The ipc endpoints live under /tmp/data; binding fails if the directory
    # does not exist, so create it up front.
    os.makedirs('/tmp/data', exist_ok=True)
    in_url = 'ipc:///tmp/data/0'
    out_url = 'ipc:///tmp/data/1'

    proxy_thread = Thread(target=proxy, args=(in_url, out_url))
    consumer = Thread(target=consume, args=(out_url, source))
    # Use a distinct name so the ``producers`` count is not shadowed.
    producer_threads = [
        Thread(target=produce, args=(in_url, i, generate_data))
        for i in range(producers)
    ]

    # Start the proxy first so the endpoints are being bound before the
    # other threads try to connect to them.
    proxy_thread.start()
    consumer.start()
    for p in producer_threads:
        p.start()

    consumer.join()
def generate_data(n=int(3e6)):
    """Return a random DataFrame with ``n`` rows.

    Parameters
    ----------
    n
        Number of rows to generate. Defaults to three million, the size the
        original hard-coded.

    Returns
    -------
    pandas.DataFrame
        Column ``key`` holds values drawn from ``'a'``, ``'b'``, ``'c'`` via
        ``np.random.choice``; column ``value`` holds ``np.random.randn``
        samples.
    """
    return pd.DataFrame({
        'key': np.random.choice(list('abc'), size=n),
        'value': np.random.randn(n),
    })
if __name__ == '__main__':
    from streamz import Stream

    def sum_by_key(df):
        """Sum ``value`` per ``key`` into a frame with a ``value_sum`` column."""
        return (df.groupby('key')
                  .value.sum()
                  .rename('value_sum').reset_index())

    def all_sums_positive(df):
        """Keep only frames in which every per-key sum is greater than zero."""
        return df.value_sum.gt(0).all()

    source = Stream()
    buffered = source.map(sum_by_key).filter(all_sums_positive).buffer(5)

    # First sink: print every aggregated frame that passes the filter.
    buffered.sink(print)

    # Second sink: keep the largest per-key sum of the last five frames and
    # print that rolling window, rounded to two decimals.
    results = deque(maxlen=5)

    def record_max(df):
        """Append this frame's largest ``value_sum`` to the rolling window."""
        results.append(df.value_sum.max())

    def print_recent(_):
        """Print the current rolling window; the emitted value is ignored."""
        print(np.round(np.array(list(results)), 2))

    buffered.map(record_max).sink(print_recent)

    main(source, generate_data)
@cpcloud
Copy link
Author

cpcloud commented Feb 27, 2018

@mrakitin
Copy link

Thanks @cpcloud!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment