Created
September 4, 2019 18:36
-
-
Save charnley/0a6c6500b7483dc543852129dd287d6a to your computer and use it in GitHub Desktop.
Cute multiprocessing of tasks coming from a stdin pipe
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Print a passphrase built from four randomly ordered dictionary words,
# concatenated with no separator, after deleting every "'s" sequence.
# A single trailing newline terminates the output.
words=$(sort -R /usr/share/dict/words | head -n 4 | sed "s/'s//g" | tr -d '\n')
printf '%s\n' "$words"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
PY=python3

# `all` names no file on disk; always run its recipe.
.PHONY: all

# Hash every generated file. Depending on `data` makes a plain `make` on a
# fresh checkout create the input files first instead of piping a failing
# `find` into the script.
all: data
	find data -name "*.txt" | ${PY} test.py

# Create data/1.txt .. data/150.txt, each holding one generated password,
# echoing the counter as progress. `data` has no prerequisites, so this only
# runs while the directory does not exist yet.
data:
	mkdir -p data
	for x in `seq 1 150`; do echo $$x; ./generate_password.sh > data/$$x.txt; done
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import select | |
import multiprocessing as mp | |
def stdin(stream=None):
    """
    Reads lines from a stdin pipe and yields them one-by-one.

    Every line is stripped of surrounding whitespace before it is yielded
    (blank lines come through as ``""``). Reading continues until the
    producer closes the pipe, so slow producers are not cut off. When the
    stream is an interactive terminal nothing is read, so running the
    script without a pipe does not hang waiting for keyboard input.

    Args:
        stream: text stream to read from; defaults to ``sys.stdin``.

    Yields:
        line: The next stripped line from the stream.

    Example:
        will yield a line for each txt file found in this folder
            find . -name "*.txt" | python file.py
        you can also read a file line by line
            cat filename.txt | python file.py
    """
    if stream is None:
        stream = sys.stdin
    # No stdin at all, or a bare terminal: there is no pipe to drain.
    if stream is None or stream.isatty():
        return
    # Iterate the stream itself instead of polling the file descriptor with
    # select(): a zero-timeout poll gives up before a slow producer writes,
    # and it cannot see lines already pulled into Python's read buffer.
    for line in stream:
        yield line.strip()
def parallel(lines, func, args, kwargs, procs=1):
    """
    Takes a list or a generator `lines` and spawns `procs` processes, calling
    `func` with predefined arguments `args` and `kwargs`.

    Each worker runs `process` as the pool initializer. It pulls items from
    a bounded job queue and calls

        func(line, *args, iolock=<shared lock>, **kwargs)

    so `func` must accept an `iolock` keyword argument (a
    `multiprocessing.Lock` shared by all workers, e.g. for serialised prints).

    Args:
        lines: generator or list to parallelise the computation. Must not
            contain `None`, which is used as the stop sentinel.
        func: function to call for every line.
        args: Variable length argument list for `func`.
        kwargs: Arbitrary keyword arguments for `func`.
        procs: how many processes to start.

    Returns:
        results: List of all results from the parallel call (random order).
            A line whose `func` call raised produces no result.
    """
    import queue
    import threading

    # Unbounded result queue. The job queue is bounded to `procs` so a
    # generator input is consumed lazily instead of being buffered in full.
    q_res = mp.Queue()
    q_job = mp.Queue(maxsize=procs)
    # Shared print lock, forwarded to `func` by `process`.
    iolock = mp.Lock()

    # `process` is the initializer, so each worker stays inside it,
    # consuming jobs until it receives a `None` sentinel.
    pool = mp.Pool(procs,
                   initializer=process,
                   initargs=(q_job, q_res, iolock, func, args, kwargs))

    try:
        # Stream the data; put() blocks while the job queue is full.
        for line in lines:
            q_job.put(line)
        # One stop sentinel per worker.
        for _ in range(procs):
            q_job.put(None)
    except BaseException:
        # Without sentinels the workers would wait for jobs forever.
        pool.terminate()
        raise
    pool.close()

    # A process that has put items on a multiprocessing.Queue does not exit
    # until its feeder thread has flushed them into the underlying pipe.
    # Joining the workers before reading q_res therefore deadlocks once the
    # results exceed the pipe's capacity. Join in a helper thread and drain
    # the result queue concurrently instead.
    joiner = threading.Thread(target=pool.join, daemon=True)
    joiner.start()
    results = []
    while joiner.is_alive():
        try:
            results.append(q_res.get(timeout=0.05))
        except queue.Empty:
            pass
    joiner.join()

    # Every worker has exited, so anything left is already in the pipe.
    while True:
        try:
            results.append(q_res.get(block=False))
        except queue.Empty:
            break
    return results
def process(q, results, iolock, func, args, kwargs):
    """
    Worker loop for `parallel`.

    Calls ``func(x, *args, iolock=iolock, **kwargs)`` for every item ``x``
    taken from ``q`` and puts each return value on ``results``. Stops at the
    first ``None`` received from ``q``.

    Args:
        q: job queue; ``None`` is the stop sentinel.
        results: queue that receives each return value of ``func``.
        iolock: print lock, passed to ``func`` as the ``iolock`` keyword
            (overriding any ``iolock`` entry in ``kwargs``).
        func: function to be called with each item from ``q``.
        args: extra positional arguments passed after the item.
        kwargs: Arbitrary keyword arguments for ``func``; left unmodified.

    Returns:
        None. Results are delivered through ``results``.
    """
    # Build a fresh mapping instead of writing "iolock" into the caller's
    # dict, so a shared `kwargs` object is never mutated.
    call_kwargs = dict(kwargs, iolock=iolock)
    while True:
        line = q.get()
        if line is None:
            break
        results.put(func(line, *args, **call_kwargs))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import stdlib as sl | |
import hashlib | |
def read_file(filename, *args, iolock=None, **kwargs):
    """Return the hex MD5 digest of a text file's stripped contents.

    ``main`` passes this function as the ``func`` of ``sl.parallel``. Extra
    positional and keyword arguments, including the ``iolock`` print lock,
    are therefore accepted and ignored.

    Args:
        filename: path of the file to hash.

    Returns:
        The 32-character hexadecimal MD5 digest of the UTF-8 encoding of the
        file contents with surrounding whitespace removed.
    """
    # Decode explicitly as UTF-8, the same codec used for hashing below, so
    # the digest does not depend on the platform's locale encoding.
    with open(filename, "r", encoding="utf-8") as handle:
        text = handle.read().strip()
    # NOTE: MD5 works as a content fingerprint but must not be relied on
    # for security.
    return hashlib.md5(text.encode("utf-8")).hexdigest()
def main():
    """Hash every file named on stdin and print one MD5 digest per line.

    File names are streamed lazily from ``sl.stdin()`` and hashed by
    ``read_file`` in two worker processes via ``sl.parallel``. The digests
    are printed in the order ``sl.parallel`` returns them, which is not
    necessarily the input order.
    """
    # A sequential [read_file(name) for name in names] yields the same
    # digests; the worker pool only pays off when there are many files.
    digests = sl.parallel(sl.stdin(), read_file, (), {}, procs=2)
    for digest in digests:
        print(digest)
if __name__ == "__main__":
    # Run only when executed as a script, not when imported.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment