# benchmarking python multiprocessing queue
#
# Originally published as GitHub gist ryansmccoy/a810edad2a8577cfe714a9e0cfe0eb20
# (created July 22, 2019 16:44).
#
# NOTE: GitHub flagged this file as possibly containing bidirectional Unicode
# text that may be interpreted or compiled differently than it appears; review
# it in an editor that reveals hidden Unicode characters.
import datetime
import sys
import time
from multiprocessing import Process, Queue

import numpy as np
import pandas as pd

# Display options only change how pandas prints frames, not the data itself.
pd.set_option("display.float_format", lambda x: f"{x:.5f}")
pd.set_option("display.max_columns", 100)
pd.set_option("display.max_rows", 100)
pd.set_option("display.width", 600)
def worker(q, dtype=np.float64):
    """Consume raw-byte messages from *q* until a ``None`` sentinel arrives.

    Used as the child-process target started by ``producer``, but it is an
    ordinary function and may also be called in-process.

    Args:
        q: Queue-like object with a blocking ``get()``; each item is either a
            ``bytes`` payload or ``None`` (the shutdown sentinel).
        dtype: NumPy dtype used to reinterpret each payload with
            ``np.frombuffer``. Defaults to float64, the dtype
            ``np.frombuffer`` assumes when none is given.

    Returns:
        The number of payload messages decoded (the sentinel is not counted).
    """
    print(f"\nStarting Worker\t{datetime.datetime.now()}")
    received = 0
    while True:
        message = q.get()
        if message is None:
            print(f'\nShutdown Received\t{datetime.datetime.now()}')
            break
        # Decoding is the only per-message work; the array is discarded.
        _ = np.frombuffer(message, dtype=dtype)
        received += 1
    print(f'\nWorker Finished\t{datetime.datetime.now()}')
    # No sys.exit(): returning from a Process target already ends the child
    # cleanly, and sys.exit() would terminate the caller if run in-process.
    return received


def producer(df, *, join=True):
    """Stream every row of *df*, one message per row, to a new ``worker`` process.

    Args:
        df: DataFrame whose rows are sent as raw bytes.
        join: When true (the default), wait for the worker to drain the queue
            and exit before returning, so timing this call covers the full
            producer -> queue -> consumer path rather than just enqueueing.
    """
    print(f"\nStarting Publisher\t{datetime.datetime.now()}")
    rows = df.to_numpy()
    if rows.dtype == object:
        # tobytes() on an object array copies PyObject *pointers*, not the
        # values; those addresses are meaningless in another process. Convert
        # once to a fixed-width unicode dtype so each payload carries the data.
        # NOTE: this materializes the whole frame; memory scales with the
        # longest cell.
        rows = rows.astype(str)
    send_q = Queue()
    # The worker must decode with the same dtype the rows were encoded with.
    consumer = Process(target=worker, args=(send_q, rows.dtype))
    consumer.start()
    for row in rows:
        send_q.put(row.tobytes())
    print(f"\nSending Shutdown\t{datetime.datetime.now()}")
    send_q.put(None)
    if join:
        consumer.join()
if __name__ == "__main__":
    # Tick data is read once, then each row is repeated to build a large
    # message stream for the queue benchmark.
    filepath = "https://storage.googleapis.com/ryansmccoy/tick_data.csv"
    repeats = 300_000
    df_temp = pd.read_csv(filepath).astype(str)
    df = pd.DataFrame(
        np.repeat(df_temp.values, repeats, axis=0),
        columns=df_temp.columns,
    )
    # Report the real message count; it is len(df_temp) * repeats, not a constant.
    total_messages = len(df)

    # perf_counter is monotonic and high-resolution, unlike time.time().
    start_time = time.perf_counter()
    print(f"\nStarting Timer\t{datetime.datetime.now()}")
    # ---- timed region --------
    producer(df)
    # --------------------------
    duration = time.perf_counter() - start_time
    msg_per_sec = total_messages / duration
    # Pause before the summary, presumably so the worker process's output
    # appears first — NOTE(review): confirm this is still wanted.
    time.sleep(5)
    print(f"\nTotal Messages: {total_messages:,}")
    print(f"\nDuration: {duration}")
    print(f"\nMessages Per Second: {msg_per_sec:,.2f}")

# Results recorded by the author for two payload encodings (a no-op string
# literal kept for reference).
"""
# python multiprocessing_queue.py
# values only (no header), no encoding
Total Messages: 300,000
Duration: 54.57179665565491
Messages Per Second: 5,497.34
------------
# using np.bytes and values only (no header)
Total Messages: 300,000
Duration: 22.01723074913025
Messages Per Second: 13,625.69
"""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment