@norohind
Created December 11, 2022 10:10
import sys
import ujson as json
from multiprocessing import Process, Queue
from random import randint
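# Pipeline overview: the main process batches raw stdin lines and hands them to
# a pool of worker processes over a Queue; each worker parses its lines as JSON,
# extracts the 'name' field, and batches the results to a single printer
# process, which writes them to stdout.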
# To measure performance
# pv galaxy.json.gz | pigz -d -p 2 | python3 systems_names.py | pv 2>/dev/pts/2 > /dev/null
# Without measuring performance
# cat galaxy.json.gz | pigz -d -p 2 | python3 systems_names.py
def eprint(*args, **kwargs):
    print(*args, **kwargs, file=sys.stderr)

def worker(in_q: Queue, out_q: Queue):
    batch = str()  # Accumulating lines in an immutable str isn't a great choice,
    # but worker performance hasn't been a problem yet
    limit = randint(15000, 20000)  # Random batch size so workers don't all hit the Queue lock at the same moment; performance impact not properly tested
    while lines := in_q.get():
        if randint(0, 1000) == 5:  # For debug
            eprint('worker', in_q.qsize())
        for line in lines:
            if line.lstrip().startswith(']'):  # Skip the closing ] of the JSON array
                continue
            batch += json.loads(line.rstrip(',\n'))['name'] + '\n'
            if len(batch) > limit:
                out_q.put(batch)
                batch = str()
    if batch:  # Flush the final partial batch before stopping
        out_q.put(batch)
    eprint('Worker stopped', lines)  # For debug
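
# Note: both the stdin reader and the workers batch before calling put(): every
# object sent through a multiprocessing.Queue is pickled and pushed through a
# lock-protected pipe, so per-line puts would make IPC overhead dominate.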

def printer(in_q: Queue):
    while lines := in_q.get():
        if randint(0, 1000) == 5:  # For debug
            eprint('printer', in_q.qsize())
        sys.stdout.write(lines)  # TODO: measure performance compared to print()
    eprint('Printer stopped')  # For debug
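
# Design note: the printer is the sole writer to stdout, so name batches from
# different workers never interleave mid-write and no output lock is needed.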

def main(to_workers, from_workers, workers_count):
    input()  # To skip the opening [
    workers = []
    for _ in range(workers_count):
        proc = Process(target=worker, args=(to_workers, from_workers))
        proc.start()
        workers.append(proc)
    printer_proc = Process(target=printer, args=(from_workers,))
    printer_proc.start()
    batch = []
    batch_limit = randint(14, 127)  # Random batch size so processes don't all hit the Queue lock at the same moment; performance impact not properly tested
    for line in sys.stdin:  # Performance TODO: use /dev/stdin and read it as a binary stream
        batch.append(line)
        if len(batch) > batch_limit:
            to_workers.put(batch)
            # batch.clear()
            batch = []  # [] seems to be ~5x faster than list()
            # We can't just clear the old list: it may not have been transferred
            # to a worker process yet, and the worker would then receive an empty list
            batch_limit = randint(14, 127)
    if batch:  # Flush the final partial batch
        to_workers.put(batch)
    for _ in range(workers_count):  # A None per worker stops its get() loop
        to_workers.put(None)
    for proc in workers:
        proc.join()
    from_workers.put(None)  # Stop the printer once all workers are done
    printer_proc.join()

# def main():  # Just to convert it to jsonl
#     input()
#     for line in sys.stdin:
#         print(json.loads(line.rstrip(',\n'))['name'])
#         # print(line.rstrip(',\n'))

if __name__ == '__main__':
    # import signal
    _to_workers = Queue()
    _from_workers = Queue()
    count = 5
    # def shutdown(_, __):  # TODO: proper SIGINT, SIGTERM handling
    #     for _ in range(count):
    #         _to_workers.put(None)
    #     _from_workers.put(None)
    # signal.signal(signal.SIGINT, shutdown)
    # signal.signal(signal.SIGTERM, shutdown)
    main(_to_workers, _from_workers, count)
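
# Expected input shape, as assumed by the parsing above (fields other than
# 'name' are illustrative):
# [
# {"name": "Sol", ...},
# {"name": "Alpha Centauri", ...}
# ]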