Skip to content

Instantly share code, notes, and snippets.

@adavidzh
Created February 10, 2018 04:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adavidzh/9ba740baac541c225cfd587b8905f224 to your computer and use it in GitHub Desktop.
Save adavidzh/9ba740baac541c225cfd587b8905f224 to your computer and use it in GitHub Desktop.
My little zmq DAQ
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""A client-server for simple DAQ operations.
The goal is to offload the event data from the acquisition machine as soon as possible so that it can focus on talking to the hardware.
This code was developed with the concrete case of:
- a Raspberry Pi performing the detector readout via GPIO (the server) and
- the event data being processed in a powerful computer (the client)
Flow control is performed over a synchronous REQquest-REPly connection and has three verbs:
- CONFIGUREBLING string - to configure the hardware.
- GIMMEEVENTS N_events - to request the acquisition of N_events.
- MICDROP - to signal the server that we're done with the session.
The client operates from a flight plan that is a list of commands to be executed.
The event data is sent from the server to the client using a PUSH-PULL topology. This means that more PULL clients can be added in which case they will receive events in a round-robin. This can be used to parallelise processing of the event data with more PULL clients.
"""
__version__ = '0.1'
__author__ = 'André David <andre.david@cern.ch>'
import zmq
import time
import sys
import string
import random as rd
from functools import partial
from collections import defaultdict
import logging
# Every log line carries the time since start-up, the level, the logger name and
# the process name, so interleaved server and client output can be told apart.
logging.basicConfig(
    format='%(relativeCreated)6d %(levelname)10s %(name)s %(processName)12s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def random_string(length, chars=string.ascii_letters):
    """Return a string of ``length`` characters picked at random from ``chars``.

    Just a way to get random data (used for the emulated event payloads and
    hardware configuration strings).
    """
    # range instead of the Python 2-only xrange, so this runs on Python 2 and 3
    return ''.join(rd.choice(chars) for _ in range(length))
def decode_request(request):
    """Decode a request of the form 'command [anything_else]'.

    The request is split at the first space only, so the payload may itself
    contain spaces.

    Returns a ``(command, payload)`` tuple; ``payload`` is ``None`` when the
    request contains no space at all.
    """
    # A raw socket recv() hands back bytes on Python 3; turn them into text first.
    # (On Python 2 bytes *is* str, so the second test keeps plain strings untouched.)
    if isinstance(request, bytes) and not isinstance(request, str):
        request = request.decode('utf-8')
    command, separator, remainder = request.partition(' ')
    payload = remainder if separator else None
    return (command, payload)
def server(port=5556):
    """Skeleton of the server side, running close to the hardware.

    Binds a REP socket on ``port`` and answers one request at a time until a
    MICDROP request has been replied to. Every reply is the command name
    followed by a space and the result of the command; unknown commands
    (MICDROP included) have an empty result. Events requested with
    GIMMEEVENTS are PUSHed from a socket bound on ``port + 1``.
    """
    context = zmq.Context()
    # Create the command socket where requests are received from and replies are sent to
    rep = context.socket(zmq.REP)

    def acquire_event():
        """emulates the acquisition of an event from hardware"""
        logger.debug("(Quickly) acquiring one event from hardware")
        time.sleep(0.5)
        return random_string(10)

    def send_events(payload, port):
        """takes care of PUSHing int(payload) events out; returns 'SENT N'"""
        nevents = int(payload)
        # Reuse the server's context rather than creating a new, never
        # terminated, context for every GIMMEEVENTS request.
        push = context.socket(zmq.PUSH)
        try:
            push.bind("tcp://*:%d" % port)
            logger.info("PUSH'ing @%d", port)
            for event in range(nevents):
                event_data = acquire_event()
                # send_string encodes the text, so this works on Python 2 and 3
                push.send_string('%d %s' % (event, event_data))
                logger.info("PUSH'ed event %d [%s]", event, event_data)
        finally:
            # Release the port even if acquisition or sending failed
            push.close()
        logger.info("PUSH'ing @%d stopped", port)
        return 'SENT %d' % nevents

    def configure_hw(configuration):
        """emulates the configuration of hardware"""
        logger.info("(Slowly) configuring with [%s]", configuration)
        time.sleep(1)
        return 'AWESOME SAUCE'

    # Dispatch table for the server functions
    # the default returns a function (the second lambda) that returns something not obviously bad (empty string)
    commands = defaultdict(
        lambda: lambda x: '',
        {
            'CONFIGUREBLING': configure_hw,
            'GIMMEEVENTS': partial(send_events, port=port + 1),
        },
    )

    try:
        rep.bind("tcp://*:%d" % port)
        logger.info("REP'ing @%d", port)
        while True:
            # Wait for a command from the client
            request = rep.recv_string()
            logger.info("rx REQ [%s]", request)
            command, payload = decode_request(request)
            logger.debug("Dispatching [%s][%s]", command, payload)
            result = commands[command](payload)
            reply = ' '.join((command, result))
            rep.send_string(reply)
            logger.debug("tx REP [%s]", reply)
            # Least fancy way to terminate the loop
            if command == 'MICDROP':
                break
    finally:
        # Always give the socket and the context back, also when a command fails
        rep.close()
        context.term()
    logger.info("Mic dropped")
def client(server_host='localhost', port=5556, flight_plan=('MICDROP',)):
    """Skeleton of the client side, running anywhere in the network (in a fast machine).

    Connects a REQ socket to ``server_host:port`` and executes the requests in
    ``flight_plan`` one after the other. For each request it sends the
    request, runs the matching local handler (if any) and only then waits for
    the server's reply. For GIMMEEVENTS the local handler PULLs the requested
    number of events from ``server_host:port + 1`` and processes them.
    """
    context = zmq.Context()
    cmd_hostport = '%s:%d' % (server_host, port)
    evt_hostport = '%s:%d' % (server_host, port + 1)
    # The control socket of the server is where requests are sent to
    req = context.socket(zmq.REQ)

    def process_event(data):
        """emulates the (slow) processing of one event's data"""
        logger.debug("(Slowly) processing one event [%s]", data)
        time.sleep(1.5)

    def receive_events(payload, hostport):
        """takes care of PULLing int(payload) events in and processing them"""
        nevents = int(payload)
        logger.info("PULL'ing @%s started", hostport)
        # Reuse the client's context rather than creating a new, never
        # terminated, context for every GIMMEEVENTS request.
        pull = context.socket(zmq.PULL)
        try:
            pull.connect("tcp://%s" % hostport)
            for event in range(nevents):
                # recv_string decodes the message, so this works on Python 2 and 3
                message = pull.recv_string()
                logger.info("PULL'ed event %d [%s]", event, message)
                process_event(message)
        finally:
            pull.close()
        logger.info("PULL'ing @%s stopped", hostport)

    # Dispatch table for commands in the client
    # (commands without a local action fall back to a do-nothing handler)
    commands = defaultdict(
        lambda: lambda x: '',
        {
            'GIMMEEVENTS': partial(receive_events, hostport=evt_hostport),
        },
    )

    try:
        logger.debug("REQ'ing @%s", cmd_hostport)
        req.connect("tcp://%s" % cmd_hostport)
        for request in flight_plan:
            req.send_string(request)
            logger.debug("tx REQ [%s]", request)
            command, payload = decode_request(request)
            logger.debug("Dispatching [%s][%s]", command, payload)
            # The local action runs between sending the request and reading the reply
            commands[command](payload)
            reply = req.recv_string()
            logger.info("rx REP [%s]", reply)
    finally:
        # Always give the socket and the context back, also when a command fails
        req.close()
        context.term()
    logger.info("All done with the flight plan")
if __name__ == "__main__":
    # This example code uses separate processes to emulate the two machines
    logger.setLevel(logging.DEBUG)
    from pprint import pformat
    import multiprocessing as mp

    server_host = 'localhost'
    # Events flow over server_port + 1, so the command port must stop one short
    # of the highest valid TCP port (65535).
    server_port = rd.randint(49152, 65534)
    flight_plan = (
        'CONFIGUREBLING %s' % random_string(20),
        'GIMMEEVENTS %d' % rd.randint(4, 12),
        'MICDROP',
    )

    # The process objects get their own names: rebinding `server`/`client` would
    # hide the functions, which multiprocessing must be able to find by name
    # when it pickles the targets (the 'spawn' start method does this).
    server_process = mp.Process(
        target=server, args=(server_port,), name='Server',
    )
    client_process = mp.Process(
        target=client, args=(server_host, server_port, flight_plan), name='Client',
    )

    logger.info('The flight plan reads as follows:')
    logger.info(pformat(flight_plan))

    logger.info("Starting server process")
    server_process.start()
    # Give the server time to bind its command socket before the client starts
    time.sleep(2)

    logger.info("Starting client process")
    client_process.start()

    logger.info("Waiting for processes to end")
    server_process.join()
    client_process.join()
    logger.info("All done")
@adavidzh
Copy link
Author

Sample output, showing the server being done with taking events at 6546 ms and the client only being done processing events at 14045 ms:

$ python mylittledaq.py
    16       INFO __main__  MainProcess: The flight plan reads as follows:
    16       INFO __main__  MainProcess: ('CONFIGUREBLING YFuiZalTwWAUMOFnGupc', 'GIMMEEVENTS 7', 'MICDROP')
    16       INFO __main__  MainProcess: Starting server process
    23       INFO __main__       Server: REP'ing @58444
  2024       INFO __main__  MainProcess: Starting client process
  2026       INFO __main__  MainProcess: Waiting for processes to end
  2028      DEBUG __main__       Client: REQ'ing @localhost:58444
  2029      DEBUG __main__       Client: tx REQ [CONFIGUREBLING YFuiZalTwWAUMOFnGupc]
  2029      DEBUG __main__       Client: Dispatching [CONFIGUREBLING][YFuiZalTwWAUMOFnGupc]
  2031       INFO __main__       Server: rx REQ [CONFIGUREBLING YFuiZalTwWAUMOFnGupc]
  2031      DEBUG __main__       Server: Dispatching [CONFIGUREBLING][YFuiZalTwWAUMOFnGupc]
  2031       INFO __main__       Server: (Slowly) configuring with [YFuiZalTwWAUMOFnGupc]
  3032      DEBUG __main__       Server: tx REP [CONFIGUREBLING AWESOME SAUCE]
  3032       INFO __main__       Client: rx REP [CONFIGUREBLING AWESOME SAUCE]
  3033      DEBUG __main__       Client: tx REQ [GIMMEEVENTS 7]
  3033       INFO __main__       Server: rx REQ [GIMMEEVENTS 7]
  3033      DEBUG __main__       Client: Dispatching [GIMMEEVENTS][7]
  3033       INFO __main__       Client: PULL'ing @localhost:58445 started
  3034      DEBUG __main__       Server: Dispatching [GIMMEEVENTS][7]
  3034       INFO __main__       Server: PUSH'ing @58445
  3034      DEBUG __main__       Server: (Quickly) acquiring one event from hardware
  3536       INFO __main__       Server: PUSH'ed event 0 [EFFSftkRir]
  3536      DEBUG __main__       Server: (Quickly) acquiring one event from hardware
  3536       INFO __main__       Client: PULL'ed event 0 [0 EFFSftkRir]
  3536      DEBUG __main__       Client: (Slowly) processing one event [0 EFFSftkRir]
  4038       INFO __main__       Server: PUSH'ed event 1 [BGRKhfXSAX]
  4038      DEBUG __main__       Server: (Quickly) acquiring one event from hardware
  4539       INFO __main__       Server: PUSH'ed event 2 [sOsGozSlDg]
  4540      DEBUG __main__       Server: (Quickly) acquiring one event from hardware
  5037       INFO __main__       Client: PULL'ed event 1 [1 BGRKhfXSAX]
  5037      DEBUG __main__       Client: (Slowly) processing one event [1 BGRKhfXSAX]
  5041       INFO __main__       Server: PUSH'ed event 3 [xyKOGPLNEX]
  5041      DEBUG __main__       Server: (Quickly) acquiring one event from hardware
  5542       INFO __main__       Server: PUSH'ed event 4 [bQIPPmmQcG]
  5542      DEBUG __main__       Server: (Quickly) acquiring one event from hardware
  6043       INFO __main__       Server: PUSH'ed event 5 [yWyAlLooJb]
  6043      DEBUG __main__       Server: (Quickly) acquiring one event from hardware
  6537       INFO __main__       Client: PULL'ed event 2 [2 sOsGozSlDg]
  6537      DEBUG __main__       Client: (Slowly) processing one event [2 sOsGozSlDg]
  6544       INFO __main__       Server: PUSH'ed event 6 [YzMSxtirkp]
  6545       INFO __main__       Server: PUSH'ing @58445 stopped
  6546      DEBUG __main__       Server: tx REP [GIMMEEVENTS SENT 7]
  8039       INFO __main__       Client: PULL'ed event 3 [3 xyKOGPLNEX]
  8039      DEBUG __main__       Client: (Slowly) processing one event [3 xyKOGPLNEX]
  9540       INFO __main__       Client: PULL'ed event 4 [4 bQIPPmmQcG]
  9541      DEBUG __main__       Client: (Slowly) processing one event [4 bQIPPmmQcG]
 11041       INFO __main__       Client: PULL'ed event 5 [5 yWyAlLooJb]
 11041      DEBUG __main__       Client: (Slowly) processing one event [5 yWyAlLooJb]
 12542       INFO __main__       Client: PULL'ed event 6 [6 YzMSxtirkp]
 12542      DEBUG __main__       Client: (Slowly) processing one event [6 YzMSxtirkp]
 14044       INFO __main__       Client: PULL'ing @localhost:58445 stopped
 14045       INFO __main__       Client: rx REP [GIMMEEVENTS SENT 7]
 14046      DEBUG __main__       Client: tx REQ [MICDROP]
 14046      DEBUG __main__       Client: Dispatching [MICDROP][None]
 14046       INFO __main__       Server: rx REQ [MICDROP]
 14046      DEBUG __main__       Server: Dispatching [MICDROP][None]
 14046      DEBUG __main__       Server: tx REP [MICDROP ]
 14046       INFO __main__       Client: rx REP [MICDROP ]
 14047       INFO __main__       Server: Mic dropped
 14047       INFO __main__       Client: All done with the flight plan
 14049       INFO __main__  MainProcess: All done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment