Skip to content

Instantly share code, notes, and snippets.

@minrk
Created March 25, 2011 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minrk/887655 to your computer and use it in GitHub Desktop.
Save minrk/887655 to your computer and use it in GitHub Desktop.
def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
ident=None):
"""construct and send an apply message via a socket"""
assert not self._closed, "cannot use me anymore, I'm closed!"
# defaults:
args = args if args is not None else []
kwargs = kwargs if kwargs is not None else {}
subheader = subheader if subheader is not None else {}
# validate arguments
if not callable(f):
raise TypeError("f must be callable, not %s"%type(f))
if not isinstance(args, (tuple, list)):
raise TypeError("args must be tuple or list, not %s"%type(args))
if not isinstance(kwargs, dict):
raise TypeError("kwargs must be dict, not %s"%type(kwargs))
if not isinstance(subheader, dict):
raise TypeError("subheader must be dict, not %s"%type(subheader))
if not self._ids:
# flush notification socket if no engines yet
any_ids = self.ids
if not any_ids:
raise error.NoEnginesRegistered("Can't execute without any connected engines.")
# enforce types of f,args,kwargs
bufs = util.pack_apply_message(f,args,kwargs)
msg = self.session.send(socket, "apply_request", buffers=bufs,
subheader=subheader, track=track)
msg_id = msg['msg_id']
self.outstanding.add(msg_id)
self.history.append(msg_id)
self.metadata[msg_id]['submitted'] = datetime.now()
return msg
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment