Created
June 12, 2012 17:21
-
-
Save mdellavo/2918809 to your computer and use it in GitHub Desktop.
python-zmq-worker-queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
import logging | |
import uuid | |
# Address the master binds its PULL socket to (incoming task requests).
MASTER_ENDPOINT = "tcp://127.0.0.1:5000"
# Address the master binds its PUSH socket to (outgoing work for workers).
WORKER_ENDPOINT = "tcp://127.0.0.1:5001"

# Module-level logger.
log = logging.getLogger(__name__)

# Registry of task callables keyed by function name; filled by ``task``.
TASKS = dict()
# FIXME add an administrative endpoint for REQ/REP queries
# FIXME master communication should be REQ/REP
# FIXME use pickle rather than json?
# FIXME better connection handling/pooling in invoke_task?
def invoke_task(f, *args, **kwargs):
    """Queue a call to task ``f`` by pushing a JSON message to the master.

    The message has the keys ``'task'`` (``f.__name__``), ``'args'`` and
    ``'kwargs'``; the arguments must therefore be JSON-serializable.

    Returns a freshly generated random hex id (``uuid.uuid4().hex``).  The id
    is not part of the message that is sent.
    """
    context = zmq.Context()
    try:
        socket = context.socket(zmq.PUSH)
        try:
            # The master binds MASTER_ENDPOINT, so the client side connects.
            socket.connect(MASTER_ENDPOINT)
            socket.send_json({'task': f.__name__, 'args': args, 'kwargs': kwargs})
        finally:
            socket.close()
    finally:
        # NOTE(review): with ZeroMQ's default linger this presumably blocks
        # until the queued message has been handed off -- confirm.
        context.term()
    return uuid.uuid4().hex
def task_stub(f):
    """Return a callable that queues ``f`` via ``invoke_task`` instead of running it.

    The returned stub accepts the same positional and keyword arguments as
    ``f`` and returns whatever ``invoke_task`` returns.  The stub keeps the
    name and docstring of ``f`` so decorated tasks remain introspectable.
    """
    import functools

    @functools.wraps(f)
    def stub(*args, **kwargs):
        return invoke_task(f, *args, **kwargs)

    return stub
def task(f):
    """Decorator: register ``f`` in ``TASKS`` and return its queuing stub.

    The function is stored under ``f.__name__``; the value handed back to the
    caller is the result of ``task_stub(f)``, not ``f`` itself.
    """
    name = f.__name__
    TASKS[name] = f
    stub = task_stub(f)
    return stub
def master_main():
    """Run the master loop: forward every received task message to the workers.

    Binds a PULL socket on ``MASTER_ENDPOINT`` and a PUSH socket on
    ``WORKER_ENDPOINT``, then relays each JSON message from the former to the
    latter.  Errors while relaying a message are logged and the loop goes on.
    The loop has no exit condition of its own; sockets and context are
    released when it is left by an uncaught exception such as
    ``KeyboardInterrupt``.
    """
    context = zmq.Context()
    master_socket = context.socket(zmq.PULL)
    worker_socket = context.socket(zmq.PUSH)
    try:
        master_socket.bind(MASTER_ENDPOINT)
        worker_socket.bind(WORKER_ENDPOINT)

        while True:
            try:
                worker_socket.send_json(master_socket.recv_json())
            except Exception as e:
                # Keep serving: one bad message must not stop the master.
                log.error(e)
    finally:
        worker_socket.close()
        master_socket.close()
        context.term()
def worker_main():
    """Run the worker loop: receive task messages and execute them.

    Connects a PULL socket to ``WORKER_ENDPOINT`` (the master binds that
    address), reads JSON messages with the keys ``'task'``, ``'args'`` and
    ``'kwargs'``, looks the task up in ``TASKS`` and calls it.  Any error,
    including an unknown task name or a failure inside the task, is logged
    and the loop continues.  The loop has no exit condition of its own;
    the socket and context are released when it is left by an uncaught
    exception such as ``KeyboardInterrupt``.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    try:
        # The master already binds WORKER_ENDPOINT; workers connect to it.
        socket.connect(WORKER_ENDPOINT)

        while True:
            try:
                data = socket.recv_json()
                task_name = data['task']
                args = data['args']
                kwargs = data['kwargs']
                f = TASKS[task_name]
                f(*args, **kwargs)
            except Exception as e:
                # Keep serving: one failing task must not stop the worker.
                log.error(e)
    finally:
        socket.close()
        context.term()
if __name__ == "__main__":
    # Running this module as a script starts the master process.
    master_main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment