Skip to content

Instantly share code, notes, and snippets.

@ryansmccoy
Created July 22, 2019 16:44
Show Gist options
  • Save ryansmccoy/a810edad2a8577cfe714a9e0cfe0eb20 to your computer and use it in GitHub Desktop.
Save ryansmccoy/a810edad2a8577cfe714a9e0cfe0eb20 to your computer and use it in GitHub Desktop.
benchmarking python multiprocessing queue
import sys
import time, datetime
import numpy as np
import pandas as pd
# Console display settings for pandas output: floats are formatted with five
# decimal places; column/row limits are 100 and the display width is 600.
pd.options.display.float_format = lambda value: f"{value:.5f}"
pd.options.display.max_columns = 100
pd.options.display.max_rows = 100
pd.options.display.width = 600
from multiprocessing import Process, Queue
def worker(q):
    """Consume messages from ``q`` until a ``None`` sentinel is received.

    Every non-``None`` message is passed to ``np.frombuffer`` and the result
    is discarded. Start, shutdown and finish timestamps are printed, and the
    function ends by calling ``sys.exit()``.

    Args:
        q: queue-like object whose ``get()`` returns the next message.
    """
    print(f"\nStarting Worker\t{datetime.datetime.now()}")

    running = True
    while running:
        payload = q.get()
        if payload is not None:
            # Decode the raw buffer; the array itself is not used.
            np.frombuffer(payload)
            continue
        # ``None`` is the shutdown sentinel.
        print(f'\nShutdown Received\t{datetime.datetime.now()}')
        running = False

    print(f'\nWorker Finished\t{datetime.datetime.now()}')
    sys.exit()
def producer(df):
    """Send every row of ``df`` to a worker process over a multiprocessing queue.

    A child process running ``worker`` is started with a fresh ``Queue``.
    Each row's values are sent as raw bytes (``ndarray.tobytes()``), followed
    by a ``None`` sentinel that tells the worker to shut down. The function
    returns only after the worker process has exited, so a caller timing this
    call measures delivery to the worker and not just the ``put`` calls.

    Args:
        df: pandas DataFrame; one message is published per row.
    """
    print(f"\nStarting Publisher\t{datetime.datetime.now()}")
    send_q = Queue()
    # Keep the process handle so the child can be joined below.
    consumer = Process(target=worker, args=(send_q,))
    consumer.start()

    for _, row in df.iterrows():
        send_q.put(row.values.tobytes())
        # Alternative payload (comma-joined string of the row's values):
        # send_q.put(",".join(list(row.astype(str).to_dict().values())))

    print(f"\nSending Shutdown\t{datetime.datetime.now()}")
    send_q.put(None)
    # Wait for the worker to consume everything and exit instead of leaving
    # an unjoined child process behind.
    consumer.join()
if __name__ == "__main__":
    # Build the workload: read the sample CSV, convert every column to str,
    # and repeat each row 300000 times along axis 0.
    filepath = r'https://storage.googleapis.com/ryansmccoy/tick_data.csv'
    df_temp = pd.read_csv(filepath)
    df_temp = df_temp.astype(str)
    df = pd.DataFrame(np.repeat(df_temp.values, 300000, axis=0))
    df.columns = df_temp.columns

    start_time = time.time()
    print(f"\nStarting Timer\t{datetime.datetime.now()}")
    #--------------------------
    producer(df)
    #--------------------------
    end_time = time.time()

    duration = end_time - start_time
    msg_per_sec = len(df) / duration

    # NOTE(review): presumably this pause lets the worker's final output
    # appear before the summary is printed -- confirm.
    time.sleep(5)

    # Report the actual number of messages sent (one per DataFrame row) so the
    # total always agrees with the throughput figure computed from len(df).
    print(f"\nTotal Messages: {len(df):,}")
    print(f"\nDuration: {duration}")
    print(f"\nMessages Per Second: {msg_per_sec}")
"""
# python multiprocessing_queue.py
# values only (no header), no encoding
Total Messages: 300,000
Duration: 54.57179665565491
Messages Per Second: 5,497.34
------------
# using numpy ndarray.tobytes() and values only (no header)
Total Messages: 300,000
Duration: 22.01723074913025
Messages Per Second: 13,625.69
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment