Skip to content

Instantly share code, notes, and snippets.

@walkingpendulum
Created August 5, 2018 13:49
Show Gist options
  • Save walkingpendulum/f3fc2e7238994b889c1c7629280f02f9 to your computer and use it in GitHub Desktop.
multiprocessing: shared mem vs separate mem
"""
$ dd if=/dev/urandom of=sample.txt bs=16M count=1
$ python test.py
shared:
process 44666, memory 1M
process 44667, memory 1M
process 44668, memory 1M
total: 9M
separate:
process 44669, memory 17M
process 44670, memory 17M
process 44671, memory 17M
total: 57M
"""
import multiprocessing
import os
import time
from subprocess import check_output
def memory_usage(pid):
    """Return the resident set size (RSS, in MB) of the process with `pid`.

    Sums the RSS column of `ps u -p <pid>` output, mirroring the original
    `ps | awk '{sum=sum+$6}'` pipeline, but with list-style arguments
    (no shell string interpolation, no shell=True, no awk dependency).
    """
    output = check_output(['ps', 'u', '-p', str(pid)]).decode()
    total_kb = 0.0
    for line in output.splitlines():
        fields = line.split()
        # RSS is column 6 (index 5); the header row is non-numeric and is
        # skipped, matching awk's treatment of "RSS" as 0.
        if len(fields) > 5:
            try:
                total_kb += float(fields[5])
            except ValueError:
                continue
    return total_kb / 1024
class Detector(object):
    """Worker that loads a file payload into memory, then idles until stopped."""

    def __init__(self):
        # Payload filled in by init(); event slot kept for parity with callers.
        self.blob = None
        self.report_event = None

    def init(self, path):
        """Read the whole file at `path` into self.blob."""
        with open(path) as handle:
            self.blob = handle.read()

    def start(self, init_args, stop_event):
        """Process entry point: initialize, then poll until `stop_event` is set."""
        self.init(*init_args)
        while not stop_event.is_set():
            time.sleep(0.1)
class Handler(object):
    """Spawns `workers_num` detector processes and reports their memory use."""

    def __init__(self, workers_num, stop_event, detector_factory):
        self.workers = []
        self.workers_num = workers_num
        self.processes = []
        self.stop_event = stop_event
        self.detector_factory = detector_factory

    def start(self, init_args):
        """Create the detectors and launch each one in its own process.

        `init_args` is passed through to each detector's start()/init().
        """
        self.workers = [self.detector_factory() for _ in range(self.workers_num)]
        self.processes = [
            multiprocessing.Process(target=worker.start, args=(init_args, self.stop_event))
            for worker in self.workers
        ]
        for process in self.processes:
            process.start()

    def stop(self):
        """Signal every worker to exit and wait for all of them to finish."""
        self.stop_event.set()
        for process in self.processes:
            process.join()
        self.stop_event.clear()

    def memory_report(self):
        """Print per-process RSS (MB) for each worker and the parent, plus a total."""
        pids = [p.pid for p in self.processes] + [os.getpid()]
        # A list (not map()) so the sequence survives being consumed twice
        # below (zip + sum) — map() is a one-shot iterator on Python 3.
        memory = [int(memory_usage(pid)) for pid in pids]
        lines = ['process %s, memory %sM' % (p.pid, m) for p, m in zip(self.processes, memory)]
        lines.append('total: %sM' % sum(memory))
        print('\n'.join(lines))
class SharedBlobDetector(Detector):
    """Detector variant whose payload arrives pre-loaded from the parent.

    Because the blob exists before fork(), child processes share its pages
    instead of each reading the file into their own memory.
    """

    def init(self, blob):
        # Keep a reference to the parent's blob rather than reading from disk.
        self.blob = blob
def spawn_separate(workers_num, path):
    """Start workers that each read `path` themselves (one copy per process)."""
    event = multiprocessing.Event()
    handler = Handler(workers_num, event, Detector)
    handler.start((path,))
    return handler
def spawn_shared(workers_num, path):
    """Read `path` once in the parent and hand the same blob to every worker."""
    event = multiprocessing.Event()
    with open(path) as source:
        blob = source.read()
    handler = Handler(workers_num, event, SharedBlobDetector)
    handler.start((blob,))
    return handler
if __name__ == '__main__':
    # Compare per-process RSS when the blob is inherited via fork (shared
    # pages) versus read independently by each worker. Single-argument
    # print(...) is valid on both Python 2 and Python 3, unlike the bare
    # print statement.
    h_shared = spawn_shared(3, 'sample.txt')
    h_separate = spawn_separate(3, 'sample.txt')
    print('shared:')
    h_shared.memory_report()
    print('\nseparate:')
    h_separate.memory_report()
    for handler in (h_shared, h_separate):
        handler.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment