Skip to content

Instantly share code, notes, and snippets.

@mdellavo
Created June 12, 2012 17:21
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mdellavo/2918809 to your computer and use it in GitHub Desktop.
Save mdellavo/2918809 to your computer and use it in GitHub Desktop.
python-zmq-worker-queue
import zmq
import logging
import uuid
# Address the master binds its PULL socket to; task messages arrive here.
MASTER_ENDPOINT = 'tcp://127.0.0.1:5000'
# Address the master binds its PUSH socket to for handing tasks to workers.
WORKER_ENDPOINT = 'tcp://127.0.0.1:5001'
# Module-level logger used by the master and worker loops.
log = logging.getLogger(__name__)
# Registry of task functions keyed by function name; populated by task().
TASKS = {}
# FIXME add administrative endpoint for req/resp queries
# FIXME master communication should be REQ/REP
# FIXME use pickle rather than json?
# FIXME better connection handling/pooling in invoke task?
def invoke_task(f, *args, **kwargs):
    """Queue a call to task ``f`` by pushing a JSON message to the master.

    A short-lived PUSH socket is connected to ``MASTER_ENDPOINT`` and a
    single message describing the call is sent. Only the function's
    ``__name__`` travels over the wire, so the receiving side must have a
    task registered under the same name.

    :param f: task function; only ``f.__name__`` is transmitted.
    :param args: positional arguments for the task (must be JSON-serializable).
    :param kwargs: keyword arguments for the task (must be JSON-serializable).
    :returns: a freshly generated hex id for this invocation. The same id
        is included in the message under the ``"id"`` key.
    """
    task_id = uuid.uuid4().hex
    context = zmq.Context()
    try:
        socket = context.socket(zmq.PUSH)
        try:
            # The socket must be connected before anything can be sent.
            socket.connect(MASTER_ENDPOINT)
            socket.send_json({
                'id': task_id,
                'task': f.__name__,
                'args': args,
                'kwargs': kwargs,
            })
        finally:
            socket.close()
    finally:
        # NOTE(review): term() may block while the message is still queued,
        # depending on the socket's linger setting -- confirm.
        context.term()
    return task_id
def task_stub(f):
    """Return a callable that queues ``f`` via ``invoke_task``.

    Calling the returned stub forwards its positional and keyword
    arguments to ``invoke_task`` and returns whatever that returns.
    """
    def stub(*args, **kwargs):
        return invoke_task(f, *args, **kwargs)

    return stub
def task(f):
    """Decorator that registers ``f`` as a task and returns its stub.

    The original function is stored in ``TASKS`` under ``f.__name__``; the
    value handed back is the stub produced by ``task_stub``.
    """
    name = f.__name__
    TASKS[name] = f
    stub = task_stub(f)
    return stub
def master_main():
    """Run the master loop: relay task messages from clients to workers.

    Binds a PULL socket on ``MASTER_ENDPOINT`` and a PUSH socket on
    ``WORKER_ENDPOINT``, then forwards every JSON message received on the
    former to the latter. Errors while relaying one message are logged and
    the loop continues. The loop has no exit condition; it ends only when
    a non-``Exception`` error such as ``KeyboardInterrupt`` propagates, at
    which point both sockets and the context are released.
    """
    context = zmq.Context()
    master_socket = context.socket(zmq.PULL)
    worker_socket = context.socket(zmq.PUSH)
    try:
        master_socket.bind(MASTER_ENDPOINT)
        worker_socket.bind(WORKER_ENDPOINT)
        while True:
            try:
                worker_socket.send_json(master_socket.recv_json())
            except Exception:
                # Keep the relay alive when a single message fails.
                log.exception('error while relaying task message')
    finally:
        # Runs on any exit from the loop so the sockets are not leaked.
        worker_socket.close()
        master_socket.close()
        context.term()
def worker_main():
    """Run the worker loop: receive task messages and execute them.

    Connects a PULL socket to ``WORKER_ENDPOINT`` and, for each JSON
    message, looks up ``data['task']`` in ``TASKS`` and calls it with
    ``data['args']`` and ``data['kwargs']``. A message naming an unknown
    task, or a task that raises, is logged and the loop continues. The
    loop has no exit condition; it ends only when a non-``Exception``
    error such as ``KeyboardInterrupt`` propagates, at which point the
    socket and context are released.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    try:
        # master_main binds WORKER_ENDPOINT, so the worker side connects;
        # a second bind on the same TCP address would fail.
        socket.connect(WORKER_ENDPOINT)
        while True:
            try:
                data = socket.recv_json()
                task_name = data['task']
                f = TASKS.get(task_name)
                if f is None:
                    log.error('unknown task: %s', task_name)
                    continue
                f(*data['args'], **data['kwargs'])
            except Exception:
                # One failing task must not take the worker down.
                log.exception('error while running task')
    finally:
        # Runs on any exit from the loop so the socket is not leaked.
        socket.close()
        context.term()
# Running this module as a script starts the master relay.
if __name__ == "__main__":
    master_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment