@charnley
Created September 4, 2019 18:36
Cute multiprocessing of tasks coming from a stdin pipe
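The gist has four parts: generate_password.sh, which builds a throwaway password from random dictionary words; a Makefile that generates test data and drives the run; stdlib.py, with a stdin generator and the multiprocessing plumbing; and test.py, which md5-hashes every file whose name arrives on stdin.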
generate_password.sh

#!/bin/bash
# Concatenate four random dictionary words (possessives stripped) into a
# single line, xkcd-style.
sort -R /usr/share/dict/words | head -n 4 | sed -e "s/'s//g" | awk '{print}' ORS=''
echo
Makefile

PY=python3

# hash every generated file in parallel
all:
	find data -name "*.txt" | ${PY} test.py

# generate 150 password files to hash
data:
	mkdir -p data
	for x in `seq 1 150`; do echo $$x; ./generate_password.sh > data/$$x.txt; done
stdlib.py

import sys
import select
import multiprocessing as mp


def stdin():
    """
    Reads lines from the stdin pipe and yields them one by one.

    Yields:
        line: The next line from stdin.

    Example:
        Yield a line for each txt file found in this folder:
            find . -name "*.txt" | python file.py
        You can also read a file line by line:
            cat filename.txt | python file.py
    """
    # select() with a zero timeout makes the read non-blocking: the loop
    # exits as soon as no more data is waiting on the pipe.
    while sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
        line = sys.stdin.readline()
        if not line:
            break
        yield line.strip()
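# Note (not in the original gist): because select() is polled with a zero
# timeout, stdin() yields nothing when the script is run without a pipe.
# It is meant to be consumed lazily, one line at a time:
#
#     for line in stdin():
#         print(line)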
def parallel(lines, func, args, kwargs, procs=1):
    """
    Takes a list or a generator `lines` and spawns `procs` processes, calling
    `func` with predefined arguments `args` and `kwargs`.

    Uses a queue and multiprocessing to call `func` in the format

        func(line, *args, **kwargs)

    Args:
        lines: generator or list to parallelise the computation over.
        func: function to call for every line.
        args: variable-length argument list for `func`.
        kwargs: arbitrary keyword arguments for `func`.
        procs: how many processes to start.

    Returns:
        results: list of all results from the parallel calls (arbitrary order).
    """
    # One queue for jobs (bounded, so the producer blocks instead of loading
    # everything into memory) and one queue to collect results.
    q_res = mp.Queue()
    q_job = mp.Queue(maxsize=procs)

    # print lock
    iolock = mp.Lock()

    # Start the pool. The worker loop doubles as the pool initializer, so each
    # of the `procs` workers sits in `process()` consuming the job queue.
    pool = mp.Pool(procs,
                   initializer=process,
                   initargs=(q_job, q_res, iolock, func, args, kwargs))

    # Stream the data into the queue; put() halts while the queue is full.
    for line in lines:
        q_job.put(line)

    # One None sentinel per worker stops the loops, then shut down the pool.
    for _ in range(procs):
        q_job.put(None)

    pool.close()
    pool.join()

    # Collect all results
    results = []
    while not q_res.empty():
        results.append(q_res.get(block=False))

    return results
def process(q, results, iolock, func, args, kwargs):
    """
    Multiprocessing worker loop: calls

        func(line, *args, **kwargs)

    with `line` coming from the job queue `q`, until a None sentinel arrives.

    Args:
        q: multiprocessing job queue.
        results: multiprocessing result queue.
        iolock: print lock, injected into `kwargs` as `iolock`.
        func: function to be called with the queue output.
        args: variable-length argument list for `func`.
        kwargs: arbitrary keyword arguments for `func`.
    """
    kwargs["iolock"] = iolock

    while True:
        line = q.get()
        if line is None:
            break
        result = func(line, *args, **kwargs)
        results.put(result)

    return
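# A minimal self-test sketch (not part of the original gist; `_square` is a
# hypothetical worker). Run it with e.g.  seq 1 10 | python3 stdlib.py
def _square(line, *args, iolock=None, **kwargs):
    # `process` injects the print lock as the `iolock` keyword argument,
    # so worker functions must accept it.
    return int(line) ** 2


if __name__ == "__main__":
    print(parallel(stdin(), _square, (), {}, procs=2))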
test.py

import hashlib

import stdlib as sl


def read_file(filename, *args, iolock=None, **kwargs):
    """Return the md5 hex digest of the (stripped) file contents."""
    with open(filename, 'r') as f:
        line = f.read()
    line = line.strip()
    hash_object = hashlib.md5(line.encode())
    return hash_object.hexdigest()
def main():
    # stdlib.stdin() is a generator, not a list
    filenames = sl.stdin()

    # serial version; slow when there are many files:
    # hashes = [read_file(filename) for filename in filenames]

    # parallel version; fast when there are many files:
    hashes = sl.parallel(filenames, read_file, (), {}, procs=2)

    for h in hashes:
        print(h)

    return


if __name__ == '__main__':
    main()
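To try it out: `make data` generates 150 password files under data/, then `make all` pipes the filenames into test.py, which prints one md5 hash per file. The order of the hashes is not guaranteed, since results come back from the pool as they finish.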