Skip to content

Instantly share code, notes, and snippets.

@espeed
Created October 20, 2011 19:32
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save espeed/1302075 to your computer and use it in GitHub Desktop.
Save espeed/1302075 to your computer and use it in GitHub Desktop.
Python ZeroMQ Streamer Device
import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process, Pool
def create_socket(socket_type,uri):
context = zmq.Context()
socket = context.socket(socket_type)
socket.connect(uri)
return socket
class Device(object):
message_format = "json"
def __init__(self):
self.out_port = None
self.in_port = None
def _set_random_port(self,proto,host):
base_uri = '%s://%s' % (proto,host)
binder = create_socket(zmq.PULL,base_uri)
port = binder.bind_to_random_port(base_uri)
binder.close()
return port
def _get_port(self,proto,host,port):
if port is None:
port = self._set_random_port(proto,host)
return port
def _build_uri(self,proto,host,port):
assert port is not None
uri = '%s://%s:%d' % (proto,host,port)
return uri
def bind_out(self,proto="tcp",host="127.0.0.1",port=None):
self.out_port = self._get_port(proto,host,port)
self.out_uri = self._build_uri(proto,host,port)
ProcessDevice.bind_out(self,self.out_uri)
return self.out_port
def bind_in(self,proto="tcp",host="127.0.0.1",port=None):
self.in_port = self._get_port(proto,host,port)
self.in_uri = self._build_uri(proto,host,port)
ProcessDevice.bind_in(self,self.in_uri)
return self.in_port
def connect_out(self,proto="tcp",host="127.0.0.1",port=None):
uri, port = self._build_uri(proto,host,port)
ProcessDevice.connect_out(self,uri)
def connect_in(self,proto="tcp",host="127.0.0.1",port=None):
uri, port = self._build_uri(proto,host,port)
ProcessDevice.connect_in(self,uri)
class Streamer(Device,ProcessDevice):
def __init__(self):
ProcessDevice.__init__(self,zmq.STREAMER,zmq.PULL,zmq.PUSH)
self.worker = None
self.worker_args = None
self.client = None
self.client_args = None
def set_worker(self,worker,*args):
self.worker = worker
self.worker_args = args
def set_client(self,client,*args):
self.client = client
self.client_args = args
def start_workers(self,number_of_workers):
#pool = Pool(processes=number_of_workers)
#pool.apply_async(self.worker,*self.worker_args)
for work_num in range(number_of_workers):
Process(target=self.worker, args=self.worker_args).start()
time.sleep(6)
def start_client(self):
print "Starting_client..."
self.client(*self.client_args)
class Queue(ProcessDevice,Device):
def __init__(self):
ProcessDevice.__init__(self,zmq.QUEUE,zmq.REQ,zmq.REP)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment