Skip to content

Instantly share code, notes, and snippets.

@shofetim
Created April 24, 2014 19:31
Show Gist options
  • Save shofetim/11266655 to your computer and use it in GitHub Desktop.
Save shofetim/11266655 to your computer and use it in GitHub Desktop.
"""Functions for integration testing the automation system. Use this
to drive the system in testing, create reproducible state, etc.
"""
import json
from gevent_zeromq import zmq
from automation.utils import make_message, parse_message
#-----------------------------------------------------------------------------#
# Setup
#-----------------------------------------------------------------------------#
# use local /etc/hosts or DNS to control whether you connect to a dev
# or production endpoint
# NOTE(review): our PUB socket connects to the *_SUB_URI and our SUB socket
# to the *_PUB_URI, so the names presumably describe the automation
# service's side of each link -- confirm against the service config.
AUTOMATION_SUB_URI = "tcp://automation.azurestandard.com:5556"
AUTOMATION_PUB_URI = "tcp://automation.azurestandard.com:5557"
# One shared ZeroMQ context for both sockets; everything below runs at
# import time.
context = zmq.Context()
# Outgoing side: used by send() to publish messages.
pub = context.socket(zmq.PUB)
pub.connect(AUTOMATION_SUB_URI)
# Incoming side: used by recv() to read messages.
sub = context.socket(zmq.SUB)
# An empty subscription prefix matches every message, so no topic is
# filtered out at the socket; recv() does the topic matching itself.
sub.setsockopt(zmq.SUBSCRIBE, "")
sub.connect(AUTOMATION_PUB_URI)
#-----------------------------------------------------------------------------#
# Helpers
#-----------------------------------------------------------------------------#
def send(topic, body):
    """Publish *body* under *topic* on the module-level PUB socket.

    The wire message is built by make_message(topic, body) and sent with
    send_unicode.
    """
    message = make_message(topic, body)
    pub.send_unicode(message)
def recv(watch_for):
    """Read from the SUB socket until a message with topic *watch_for* arrives.

    Every topic seen is printed. Messages on other topics are discarded.
    Returns the parsed message body of the first match. There is no
    timeout: the loop only ends when a matching topic is received.
    """
    while True:
        raw = sub.recv_unicode()
        topic, msg = parse_message(raw)
        # Single parenthesised argument: prints the same text under the
        # Python 2 print statement and also parses on Python 3.
        print(topic)
        if topic != watch_for:
            continue
        return msg
#-----------------------------------------------------------------------------#
# Message generation & handlers
#-----------------------------------------------------------------------------#
def sample_orders(path="automation/test/sample_orders.json"):
    """Load the sample orders fixture and return it as parsed JSON.

    path -- location of the JSON fixture. A relative path is resolved
            against the current working directory. The default is the
            path this function has always read, so calls without
            arguments behave as before.

    Raises IOError/OSError if the file cannot be opened and ValueError if
    it does not contain valid JSON.
    """
    with open(path) as f:
        return json.load(f)
def send_order_to_automation(order):
    """Publish *order* on the '/order/new' topic."""
    send(topic='/order/new', body=order)
def fast_movers_state():
    """Request the fast-movers state and return the reply body.

    Publishes an empty request on '/fastmovers/state/get', then waits via
    recv() for the next message on '/fastmovers/state'.
    """
    send('/fastmovers/state/get', {})
    state = recv('/fastmovers/state')
    return state
def fast_movers_claim(piece):
    """Mark *piece* as claimed by the 'TEST' picker and publish the claim.

    *piece* is modified in place: its 'picker' key is set before the
    piece is sent on '/order/piece/claim'.
    """
    claim_topic = '/order/piece/claim'
    piece['picker'] = 'TEST'
    send(claim_topic, piece)
def fast_movers_done(piece):
    """Mark *piece* as picked and publish it as done.

    *piece* is modified in place: its 'picked' key is set to True before
    the piece is sent on '/order/piece/done'.
    """
    done_topic = '/order/piece/done'
    piece['picked'] = True
    send(done_topic, piece)
def fast_movers_claim_and_done(piece):
    """Claim *piece* and then immediately report it as done.

    Convenience wrapper: both steps mutate and publish the same *piece*.
    """
    # Order matters: the claim message goes out before the done message.
    fast_movers_claim(piece)
    fast_movers_done(piece)
def scanner_saw(piece, reject=False):
    """Publish a '/scanner/saw' message for the first barcode of *piece*.

    piece  -- mapping describing a piece; only its 'barcodes' entry is
              read, and only the first barcode is reported.
    reject -- value for the message's 'reject' field. It is forced to
              True when the piece has no usable barcode.

    A missing, None, or empty 'barcodes' entry, or a falsy first barcode,
    counts as "no barcode": the message then carries barcode False (or
    the falsy value found) and reject True.
    """
    # `or [False]` also covers a 'barcodes' key that is present but empty
    # or None, which would otherwise raise on the [0] lookup.
    barcodes = piece.get('barcodes') or [False]
    barcode = barcodes[0]
    if not barcode:
        reject = True
    send('/scanner/saw',
         {  # The PLC is the only thing that cares about most of these
            # properties, so it is ok to leave them as None for non-PLC
            # testing
             'position': None,
             'length': None,
             'speed': None,
             'width': None,
             'lateral_position': None,
             'barcode': barcode,
             'reject' : reject
         })
def dump_state():
    """Ask for a state dump and return the reply body.

    Publishes an empty-string body on '/state/dump', then waits via
    recv() for the next message on '/state'.
    """
    send('/state/dump', '')
    state = recv('/state')
    return state
# Driver: runs at import time. Loads the sample orders, publishes each one
# as a new order, then fetches the resulting fast-movers state. `orders`
# and `pieces` stay bound as module globals for later use.
orders = sample_orders()
for order in orders:
    send_order_to_automation(order)
pieces = fast_movers_state()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment