Skip to content

Instantly share code, notes, and snippets.

@wkettler
Last active November 22, 2022 03:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Save wkettler/57b94bb146cd1d05bb5a3e77751cfb9c to your computer and use it in GitHub Desktop.
A multiprocess stats engine example.
#!/usr/bin/env python3
from multiprocessing import Process, Pool, Manager
import random
import time
def percentile(values, percentile):
    """Return the value at the given percentile of *values*.

    Uses the nearest-rank method: the element at rank
    ``round(p/100 * n + 0.5)`` (1-based).  The input does NOT need to be
    pre-sorted — a sorted copy is made internally.  (The original code
    required the caller to sort first, but the only sort call in this
    file was commented out, so percentiles were computed on unsorted
    latencies and were wrong.)

    Args:
        values: non-empty sequence of numbers.
        percentile: percentile to compute, in the range 0-100.

    Returns:
        The element of *values* at the requested percentile.

    Raises:
        ValueError: if *values* is empty.
    """
    if not values:
        raise ValueError('percentile() requires a non-empty sequence')
    ordered = sorted(values)
    # Nearest-rank index; round() uses banker's rounding, matching the
    # original implementation's arithmetic.
    n = int(round((percentile / 100) * len(ordered) + 0.5))
    # Clamp so percentile=0 maps to the minimum (the original indexed
    # values[-1], i.e. the maximum) and rounding can never overrun.
    n = min(max(n, 1), len(ordered))
    return ordered[n - 1]
def print_interval_stats(latencies, bytes, interval):
    """Print one reporting interval's throughput and latency summary.

    Args:
        latencies: per-op latencies in seconds recorded this interval.
        bytes: total bytes transferred this interval.
        interval: interval length in seconds (the rate divisor).
    """
    ops = len(latencies)
    if ops == 0:
        # Idle interval: no samples, so there are no latency stats to
        # report and min()/percentile() would raise on an empty list.
        print('Ops: 0, Iops: 0.00, MiB/s: 0.00')
        return
    # percentile() uses nearest-rank on ordered data; sort a copy so the
    # caller's accumulating list is left untouched.  (The original had
    # the sort commented out, producing bogus 50th/99th values.)
    ordered = sorted(latencies)
    iops = ops / interval
    fifty_lat = percentile(ordered, 50) * 1000
    ninenine_lat = percentile(ordered, 99) * 1000
    min_lat = ordered[0] * 1000
    max_lat = ordered[-1] * 1000
    avg_lat = sum(ordered) / ops * 1000
    bw = bytes / 2**20 / interval  # bytes -> MiB/s
    print(f'Ops: {ops}, Iops: {iops:.2f}, MiB/s: {bw:.2f}, '
          f'Lat(ms): [ min: {min_lat:.2f}, max: {max_lat:.2f}, '
          f'avg: {avg_lat:.2f}, 50th: {fifty_lat:.2f}, '
          f'99th: {ninenine_lat:.2f} ]')
def print_total_stats(times, latencies, bytes):
    """Print the cumulative, run-wide throughput and latency summary.

    Args:
        times: list of (start, end) timestamp pairs, one per op; the run
            duration is last end minus first start.
        latencies: per-op latencies in seconds for the whole run.
        bytes: total bytes transferred over the whole run.
    """
    ops = len(latencies)
    if ops == 0:
        # Nothing was recorded; min()/percentile() would raise.
        print('[TOTAL] Ops: 0')
        return
    # Sort a copy: percentile() needs ordered input (the original relied
    # on a sort that was commented out elsewhere in the file).
    ordered = sorted(latencies)
    fifty_lat = percentile(ordered, 50) * 1000
    ninenine_lat = percentile(ordered, 99) * 1000
    min_lat = ordered[0] * 1000
    max_lat = ordered[-1] * 1000
    avg_lat = sum(ordered) / ops * 1000
    # Wall-clock span from the earliest op start to the latest op end.
    duration = max(x[1] for x in times) - min(x[0] for x in times)
    if duration > 0:
        iops = ops / duration
        bw = bytes / 2**20 / duration
    else:
        # Degenerate sub-resolution run: report 0 rather than ZeroDivisionError.
        iops = 0.0
        bw = 0.0
    print(f'[TOTAL] Ops: {ops}, Iops: {iops:.2f}, MiB/s: {bw:.2f}, '
          f'Lat(ms): [ min: {min_lat:.2f}, max: {max_lat:.2f}, '
          f'avg: {avg_lat:.2f}, 50th: {fifty_lat:.2f}, '
          f'99th: {ninenine_lat:.2f} ]')
def init(queue):
    """Pool-worker initializer: stash the shared stats queue.

    Stores *queue* in the module-level global ``q`` so that
    ``generator()`` running inside each pool worker can reach it.
    """
    global q
    q = queue
def generator(end_time):
    """Simulate I/O operations until the wall clock passes *end_time*.

    Each fake op "transfers" 4 MiB and lasts a random 50-90 ms sleep;
    a (size, start, end) sample is pushed onto the shared queue ``q``
    (installed by init()) for the statistics consumer.
    """
    op_size = 4194304  # 4 MiB per simulated op
    while time.time() <= end_time:
        t0 = time.time()
        time.sleep(random.uniform(.05, .09))
        t1 = time.time()
        q.put((op_size, t0, t1))
def statistics(q):
    """Consume (size, start, end) samples from *q* and print statistics.

    Prints a per-interval summary roughly every ``interval`` seconds of
    sample time, and a cumulative [TOTAL] summary once a ``None``
    sentinel is received.

    Args:
        q: queue yielding (size, start, end) tuples; ``None`` terminates.
    """
    interval = 5  # seconds per reporting window
    total_bytes = 0          # renamed from `bytes`, which shadowed the builtin
    times = []               # (start, end) per op, for total wall-clock span
    latencies = []           # per-op latency in seconds, whole run
    next_report = time.time() + interval
    interval_latencies = []  # latencies accumulated in the current window
    interval_bytes = 0       # bytes accumulated in the current window
    while True:
        stat = q.get()
        if stat is None:
            # Sentinel: all producers are done.
            break
        sz, start, end = stat
        lat = end - start
        if end >= next_report:
            # NOTE(review): if no samples arrive for longer than one
            # interval, the window only advances by a single interval
            # here, so longer gaps get folded into one report.
            next_report += interval
            print_interval_stats(interval_latencies, interval_bytes, interval)
            # Reset window accumulators; the sample that triggered the
            # report is credited to the new window below.
            interval_bytes = 0
            interval_latencies = []
        interval_bytes += sz
        interval_latencies.append(lat)
        # Update run totals.
        total_bytes += sz
        times.append((start, end))
        latencies.append(lat)
    print_total_stats(times, latencies, total_bytes)
if __name__ == '__main__':
    # Fan out 10 generator workers for 60 seconds; a single consumer
    # process aggregates their samples and prints the stats.
    worker_count = 10
    run_seconds = 60
    stats_queue = Manager().Queue()
    consumer = Process(target=statistics, args=[stats_queue])
    consumer.start()
    with Pool(worker_count, init, [stats_queue]) as pool:
        deadline = time.time() + run_seconds
        pool.map(generator, [deadline] * worker_count)
    # Sentinel tells the consumer to print totals and exit.
    stats_queue.put(None)
    consumer.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment