Skip to content

Instantly share code, notes, and snippets.

@skchronicles
Created October 14, 2021 20:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save skchronicles/afbdfb8b41317f1a78d1eaac618a779a to your computer and use it in GitHub Desktop.
Save skchronicles/afbdfb8b41317f1a78d1eaac618a779a to your computer and use it in GitHub Desktop.
Real world parallel processing example using Ray
#!/usr/bin/env python3
"""md5.py: calculates md5s of multiple files in parallel.
The md5 calculation is memory safe. It reads in a file
in blocks of 64 KiB.
USAGE:
python3 md5.py file1.txt file2.csv 8
The last positional argument is number of concurrent
MD5 processes to spawn using ray.
"""
# Python standard library
from __future__ import print_function
import os, sys, hashlib
# 3rd party imports
import ray
@ray.remote
def md5sum(filename, first_block_only = False, blocksize = 65536):
    """Gets the MD5 checksum of a file in a memory-safe manner.

    The file is read in blocks/chunks defined by the blocksize parameter. This
    is a safer option than reading the entire file into memory if the file is
    very large.

    @param filename <str>:
        Input file on local filesystem to find md5 checksum
    @param first_block_only <bool>:
        Calculate md5 checksum of the first block/chunk only
    @param blocksize <int>:
        Number of bytes to read per chunk; must be a positive integer
    @return (filename, hasher.hexdigest()) <tuple>:
        filename and MD5 checksum of the file's contents
    @raises ValueError:
        If blocksize is less than 1
    """
    if blocksize < 1:
        # read(0) always returns b'', so every file would silently hash as
        # empty input; a negative size reads the whole file in one call,
        # which defeats the purpose of reading in chunks.
        raise ValueError(
            'blocksize must be a positive integer, got {!r}'.format(blocksize)
        )

    hasher = hashlib.md5()
    with open(filename, 'rb') as fh:
        if first_block_only:
            # Hash only the first block/chunk of the file. This is a
            # useful heuristic when fingerprinting thousands or millions
            # of files, where hashing every byte would be too slow.
            hasher.update(fh.read(blocksize))
        else:
            # Hash the entire file one chunk at a time; read() returns
            # b'' at end-of-file, which stops the iteration.
            for chunk in iter(lambda: fh.read(blocksize), b''):
                hasher.update(chunk)

    return (filename, hasher.hexdigest())
def parse_cli_args(argv, default_threads = 4):
    """Splits command-line arguments into input files and a worker count.

    The last argument is treated as the worker count only when it parses as
    an integer; otherwise every argument is kept as an input file and the
    default worker count is used, so no file is silently dropped.

    @param argv <list[str]>:
        Full argument vector, including the program name at index 0
    @param default_threads <int>:
        Worker count to use when the last argument is not an integer
    @return (files, threads) <tuple>:
        List of input files and the number of concurrent workers (>= 1)
    """
    args = list(argv[1:])
    try:
        requested = int(args[-1])
    except (IndexError, ValueError):
        # No arguments at all, or the last argument is a file name.
        return (args, default_threads)
    # Each Ray task reserves one CPU by default, so a pool with fewer
    # than one CPU could never schedule any of the md5sum tasks.
    return (args[:-1], max(1, requested))


if __name__ == '__main__':
    # Files to checksum and the number of
    # concurrent tasks or remote workers.
    files, threads = parse_cli_args(sys.argv)
    # Initialize a ray cluster
    # with X remote workers
    ray.init(num_cpus = threads)
    # Run md5sum function in parallel with the .remote()
    # method. This method yields a future or ObjectRef
    # which can be fetched later with ray.get().
    workers = [md5sum.remote(f) for f in files]
    # ray.get() blocks until the given task has completed.
    # Fetching the futures in submission order keeps the
    # output in the same order as the input files. Results
    # are printed to standard output, errors to standard error.
    for worker in workers:
        try:
            path, checksum = ray.get(worker)
        except Exception as e:
            # Report the failed file and carry on with the rest.
            print('An error occured:\n{}'.format(e), file=sys.stderr)
            continue
        print("{}\t{}".format(path, checksum))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment