@d1manson
Last active August 29, 2015 13:58
wrap matplotlib for piping to main process from subprocess (incomplete) http://stackoverflow.com/q/22819254/2399799
# -*- coding: utf-8 -*-
"""
The purpose of this module is to do monoprocessing for a single package when using multiprocessing.
It is designed with matplotlib in mind, but most of it is pretty general.
The idea is that you may have a block of code that you want to run in parallel
processes, but because of some "small" thing (i.e. something that isn't computationally expensive)
you can't do it easily without some major, ugly alterations to your existing code. Monobrow.py
injects all the ugly stuff for you at the point you launch the processes, so you don't need to
alter your base code at all.
WARNING: It currently only works in fairly simple circumstances.
Many, many, many things here were learnt from Stack Overflow at one time or another.
TODOs:
Implement more stuff.
We have to be careful in the way we listen for stuff and hold references on
the main process. We could in principle, I believe, find out the reference
count on the other process (for our fake instances) and do garbage collection
on the main process...but in the short term it will be sufficient to just wait
for the given process to complete and allow all references on the main thread
to go out of scope at that point.
I'm not sure how much of this could be implemented using multiprocessing managers.
Need to take care of FAKE_OBJ_SUB instances appearing in args/kwargs lists, i.e. need to
convert them back to a basic dummy class with nothing but an _id, and then
recover the real object back in the main process.
Need to find and implement a nice way of transferring large amounts of data that
won't fit in the pipe...various options, not sure what is easiest/best...ideally
want a way to make the data available as read-only.
Need to wrap up the stuff for the main process more nicely.
Would be good if we could return a placeholder object from the piped calls, and
then only provide values/more fake objs on request; thus we could avoid blocking on every call.
In the spawned process, statements like ``import bigThing.subThing``
fail, saying there is no "subThing", but you can do::
    import bigThing
    bigThing.subThing # this is fine
Also, if you have the statement ``import bigThing.subThing`` before the ``runWithPiping``
function does its work, then ``__import__`` accepts that ``subThing`` exists and will
happily call ``getattr`` on ``bigThing``, which is what we want...need to fix this.
"""
"""
###############################################################################
Stuff required by main and spawned processes (roughly speaking)
###############################################################################
"""
def strTrunc(x, at=4):
    s = str(x)
    return s[:at] + (s[at:] and "...")
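# For illustration, strTrunc just shortens long reprs for the log messages:
#   strTrunc("abcdefgh")  -> "abcd..."
#   strTrunc("abc")       -> "abc"   (short strings pass through unchanged)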
"""
###############################################################################
stuff required in spawned process (roughly speaking)
###############################################################################
"""
import sys
def unwrapThing(thing, socket):
    """This takes "things" which have been received from the main process
    and prepares them for usage in this process."""
    if isinstance(thing, FAKE_OBJ_MAIN):
        return FAKE_OBJ_SUB(thing._id, socket)
    else:
        return thing
def piped_call(method_name):
    def wrapped(self, *args, **kwargs):
        self_id = object.__getattribute__(self, '_id')
        self_socket = object.__getattribute__(self, '_socket')
        print "requesting", method_name, 'args=', strTrunc(args), 'kwargs=', strTrunc(kwargs)
        sys.stdout.flush()
        self_socket.send((method_name, self_id, args, kwargs))
        _, _, ret = self_socket.recv()
        return unwrapThing(ret, self_socket)
    return wrapped
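# The wire protocol used by piped_call here (and recvSome in the main process)
# is just two pickled tuples per round trip:
#   request:  (method_name, target_id, args, kwargs)   -- sub -> main
#   reply:    (method_name, target_id, ret)            -- main -> sub
# where ret is either an immutable value or a FAKE_OBJ_MAIN placeholder that
# unwrapThing converts into a FAKE_OBJ_SUB on arrival.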
PIPEABLE_METHODS = [
    '__getattribute__',
    '__getitem__',
    '__setitem__',
    '__delitem__',
    '__iter__',
    '__reversed__',
    '__contains__',
    '__missing__',
    '__call__',
    '__setattr__',
    '__len__',
    '__enter__',
    '__exit__',
    '__dir__',
]  # TODO: grow this list
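# To see how these methods combine, note that a single innocuous-looking call
# in user code decomposes into several round trips. For example (a rough,
# hypothetical trace, assuming ``plt`` is a FAKE_OBJ_SUB standing in for
# matplotlib.pylab):
#   plt.plot([1], [2])
# becomes approximately:
#   __getattribute__('plot')   -> main replies with a FAKE_OBJ_MAIN for plt.plot
#   __call__(([1], [2]), {})   -> main calls the real plot and wraps its result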
class FAKE_OBJ_SUB(list, object):
    """ This class is a generic wrapper for all objects. It pipes all access
    requests to the main process and gets results back, thus it is able to act
    as though it were an object of the expected type,
    e.g. a matplotlib axes instance.
    TODO: probably want to subclass FAKE_OBJ_SUB with different fake base classes
    rather than use list here. Could then get info on the base class at the point of
    creating the FAKE_OBJ, and use the relevant subclass.
    """
    def __init__(self, _id, socket):
        object.__setattr__(self, '_id', _id)
        object.__setattr__(self, '_socket', socket)
    def __str__(self):
        return "FAKE_OBJ_SUB for real id=%s" % object.__getattribute__(self, '_id')
"""Add all the required methods to FAKE_OBJ_SUB"""
for method_name in PIPEABLE_METHODS:
setattr(FAKE_OBJ_SUB,method_name,piped_call(method_name))
def runWithPiping(socket, targetPackage, func, args, kwargs):
    if targetPackage in sys.modules:
        del sys.modules[targetPackage]
    sys.modules[targetPackage] = FAKE_OBJ_SUB(_id=targetPackage, socket=socket)
    return func(*args, **kwargs)
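# The sys.modules trick above is what makes the monkey-patching transparent:
# once the real entry is replaced, any subsequent ``import matplotlib`` in the
# target function gets handed the FAKE_OBJ_SUB instead of the real package,
# because __import__ returns the existing sys.modules entry rather than
# re-importing. A minimal standalone demonstration of the same idea:
#   import sys
#   sys.modules['fakemod'] = "surprise"   # any object will do
#   import fakemod                        # no error...
#   print fakemod                         # ...prints "surprise"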
"""
###############################################################################
Stuff required in main process (roughly speaking)
###############################################################################
"""
from importlib import import_module
import sys
from multiprocessing import Process, Pipe
class FAKE_OBJ_MAIN(object):
    """This is a dummy class which makes it easy to recognise fake object ids
    at the sub process end...i.e. it exists purely to be pickled and unpickled."""
    def __init__(self, _id):
        self._id = _id
    def __str__(self):
        return "FAKE_OBJ_MAIN for real id=%s" % self._id
class MonobrowProcess(object):
    """ Monobrow is exposed through this class.
    See bottom of this file for an example.
    TODO: It might be better to subclass Process and implement pickling as
    required. """
    def __init__(self, group, target, args=(), kwargs={}, targetPackage='matplotlib'):
        """See multiprocessing.Process for the meaning of the inputs.
        ``targetPackage`` is the name of the package to apply monobrow to."""
        self._nearSocket, farSocket = Pipe()
        self._process = Process(target=runWithPiping,
                                args=(farSocket, targetPackage, target, args, kwargs))
        self._inst_cache = {}
    def start(self):
        self._process.start()
    def _getCachedItem(self, _id):
        if _id not in self._inst_cache:
            return self._getModule(_id)
        else:
            return self._inst_cache[_id]
    def _getModule(self, module_name):
        if module_name not in sys.modules:
            import_module(module_name)
        return sys.modules[module_name]
    def _wrapThing(self, thing, new_id):
        """This prepares things to be sent to the spawned process."""
        IMMUTABLE_TYPES = (float, int, long, str, unicode)
        if isinstance(thing, IMMUTABLE_TYPES):
            return thing
        else:
            new_id = id(thing) if new_id is None else new_id
            self._inst_cache[new_id] = thing
            return FAKE_OBJ_MAIN(new_id)
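    # For example, if the sub process requests plt.figure(), the Figure that
    # comes back from the real call cannot usefully cross the pipe, so
    # _wrapThing stores it in _inst_cache under id(fig) and sends a
    # FAKE_OBJ_MAIN(id(fig)) placeholder instead; a plain int or str return
    # value would be sent through as-is.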
    def recvSome(self):
        while self._nearSocket.poll():
            packet = self._nearSocket.recv()
            magic_name, _id, args, kwargs = packet
            print "doing", magic_name, 'args=', strTrunc(args, 30), 'kwargs=', strTrunc(kwargs)
            sys.stdout.flush()
            obj = self._getCachedItem(_id)
            func = getattr(obj, magic_name)
            ret, new_id = None, None
            try:
                ret = func(*args, **kwargs)
            except AttributeError:  # if obj is a module we may need to import submodule...
                new_id = obj.__name__ + "." + args[0]
                ret = self._getModule(new_id)
            finally:
                ret = self._wrapThing(ret, new_id)
                self._nearSocket.send((magic_name, _id, ret))
    def is_alive(self):
        return self._process.is_alive()
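"""
A worked example of one full round trip, to make the flow above concrete
(the details are approximate): the sub process executes ``plt.plot([1], [2])``,
so FAKE_OBJ_SUB.__getattribute__ sends ('__getattribute__', 'matplotlib.pylab',
('plot',), {}) down the pipe; recvSome looks up the real pylab module via
_getCachedItem, calls getattr on it, caches the real ``plot`` function in
_inst_cache under its id, and replies with a FAKE_OBJ_MAIN; the sub process
wraps that as a FAKE_OBJ_SUB and immediately pipes __call__ on it, at which
point the real plot() runs on the main process.
"""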
"""
###############################################################################
nonsense example usage
###############################################################################
"""
import matplotlib.pylab as plt  # this looks stupid, but seems necessary (see note at top of file)
""" this is the thing we want to run in parallel"""
def testPlot(v,c):
import numpy as np
from time import sleep
import matplotlib.pylab as plt
sleep(v) # give this process a random handicap'
plt.hold(True)
for ii in np.arange(0,2,0.1):
y = np.sin(ii+v) # pretend this is a complex calculation
plt.plot([ii],[y],'.'+c)
def makeSomePretendArgs(N):
    from numpy.random import rand
    args = [None] * N
    colors = 'rgbkmyc'
    for ii in range(N):
        args[ii] = (rand(1) * 5, colors[ii])
    return args
""" this is how we get it to run """
if __name__ == "__main__":
from multiprocessing import freeze_support
freeze_support()
import matplotlib.pylab as plt
N = 4
jobs = [None]*N
for ii,args_ii in zip(range(N),makeSomePretendArgs(N)):
jobs[ii] = MonobrowProcess(None,testPlot,args=args_ii)
jobs[ii].start()
while any((jj.is_alive() for jj in jobs)):
for jj in jobs:
jj.recvSome()
plt.pause(0.1)
for jj in jobs: # we have to do it one last time
jj.recvSome()
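    # Note on the loop above: plt.pause(0.1) does double duty -- it throttles
    # the polling, and it runs the GUI event loop so the figure stays
    # responsive while the main process services the pipes. The final pass
    # over recvSome drains any requests that arrived between the last poll
    # and the workers exiting.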
d1manson commented Apr 2, 2014

This is actually working!! The plot commands from the multiple processes get applied to a single figure (a single axes, in fact).
Of course there are many ways to break it (as mentioned in the TODOs, plus other unknowns).
