Skip to content

Instantly share code, notes, and snippets.

@dariussullivan
Last active July 6, 2017 01:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dariussullivan/8916e975e22054ff470d to your computer and use it in GitHub Desktop.
Save dariussullivan/8916e975e22054ff470d to your computer and use it in GitHub Desktop.
Small demonstration of secs equipment server. Uses secsgem and twisted.
from twisted.internet import reactor, protocol, endpoints
from twisted.protocols import basic
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet import task
from twisted.python import log
# log errors inside reactor to stdout
from twisted.python.log import startLogging
from sys import stdout
startLogging(stdout)
from struct import pack
from itertools import count
import logging
import secsgem
from secsgem.hsmsPackets import hsmsPacket, hsmsStreamFunctionHeader
from secsgem.hsmsPackets import hsmsSelectRspHeader, hsmsLinktestReqHeader, hsmsLinktestRspHeader
from secsgem import secsFunctions
def wait(delay) :
"""Returns a Deferred that fires after the specified delay.
Useful for inserting delays into inlineCallbacks coroutines.
"""
return task.deferLater(reactor, delay, lambda : None)
class ResponseTimeoutError(Exception) :
pass
from secsgem.secsHandler import secsHandler
class secsFunctionDirectory(secsHandler) :
def __init__(self, isHost=False) :
self.isHost = isHost
self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
def respondingStreamFunction(self, sfn) :
return self.streamFunction(sfn._stream, sfn._function+1)
# handlers = dict()
# def handle_secs(stream, function) :
# def sfdef(f) :
# cls = f.im_class
# handlers[(stream, function)] = f.__name__
# print "registered handler for (%d,%d) : %s" % (stream, function, f.__name__)
# return f
# return sfdef
class hsmsProtocol(basic.Int32StringReceiver) :
hsmsConnectionState = "NOT_CONNECTED"
handle_select_req = None
# The 'system' field in HSMS packet header must be unique among open and recent transactions.
# This counter serves as a generator of system values.
unique_sys_ids = count()
def __init__(self) :
self.pending = dict()
def stringReceived(self, data) :
print "received : " + " ".join("%2x"%ord(c) for c in data),
print " (%d bytes)" % len(data)
# It's unfortunate that Int32StringReceiver goes to a lot of effort to avoid copying,
# and then we do this:
intdata = pack(self.structFormat, len(data)) + data
packet = hsmsPacket.decode(intdata)
print packet
print repr(packet)
# handle Select.req
if ((packet.header.stream == 0x00)
and (packet.header.sType == 0x01)) :
# received Select.req
self.handle_select_req(packet)
return
# handle Linktest.req
if ((packet.header.sessionID == 0xFFFF)
and (packet.header.stream == 0x00)
and (packet.header.sType == 0x05)) :
# received Linktest.req
self.handle_linktest_req(packet)
return
if self.hsmsConnectionState == "SELECTED" :
# should only handle data messages when in the selected state
try :
respondee = self.pending.pop(packet.header.system)
except KeyError :
print "Got unsolicited packet"
else :
respondee.callback(packet)
return
self.handle_data_message(packet)
else :
# if really data message send Reject.req ?
pass
def handle_data_message(self, packet) :
pass
def handle_select_req(self, packet) :
system = packet.header.system
sessionID = packet.header.sessionID
# Send a Select Response to the remote host
packet = hsmsPacket(hsmsSelectRspHeader(system, sessionID=sessionID))
self.transport.write(packet.encode())
# E37 states that standard allows Select.req even when already selected.
if self.hsmsConnectionState != "SELECTED" :
print "SELECTED"
self.hsmsConnectionState = "SELECTED"
# E37 8.2.6.1 sessionID of select message used for all subsequent data messages
self.sessionID = sessionID
self.selected.callback(packet)
print "selected.callback"
def handle_linktest_req(self, packet) :
print "received link test"
system = packet.header.system
# Send a Select Response to the remote host
packet = hsmsPacket(hsmsLinktestRspHeader(system))
self.transport.write(packet.encode())
def read_packet(self, timeout=None) :
pending = Deferred()
if timeout :
timeout = task.deferLater(reactor, timeout, pending.errback, ResponseTimeoutError(timeout))
# from twisted.internet import defer
# timeout = task.deferLater(reactor, timeout, defer.timeout, pending)
def cancel(result) :
timeout.cancel()
return result
pending.addCallback(cancel)
return pending
def query(self, packet, *args, **kwargs) :
data = packet.encode()
self.transport.write(data)
response = self.read_packet(*args, **kwargs)
self.pending[packet.header.system] = response
return response
def connectionMade(self) :
print "NOT_SELECTED"
self.hsmsConnectionState = "NOT_SELECTED"
self.selected = Deferred()
d = self.passive_handshake()
d.addErrback(log.err)
@inlineCallbacks
def passive_handshake(self) :
print "awaiting selection"
packet = yield self.selected
# yield wait(10)
# print "waited"
# ## The following link test is not required by E37
# ## Send a Linktest Request to the remote host
# linkTestReq = hsmsPacket(hsmsLinktestReqHeader(self.unique_sys_ids.next()))
# ## Wait for an incoming Linktest Response
# print "send link test req"
# try :
# packet = yield self.query(linkTestReq, timeout=2.0)
# except ResponseTimeoutError as e :
# print "link test timeout", e
# else :
# assert (packet.header.sessionID == 0xFFFF)
# assert (packet.header.stream == 0x00)
# assert (packet.header.sType == 0x06)
# print "sending 'are you there?'"
# try :
# response = yield self.query_host_present()
# except ResponseTimeoutError as e :
# print "'are you there?' request timeout", e
# else :
# print response
#print "Handshake complete"
returnValue(self)
def query_host_present(self) :
msg = self.secs_funcs.streamFunction(1,1)()
system = self.unique_sys_ids.next()
packet = hsmsPacket(hsmsStreamFunctionHeader(system, msg._stream, msg._function, False, self.sessionID), msg.encode())
return self.query(packet, timeout=5.0)
def active_handshake(self) :
## based on hsmsConnection.startReceiver
# self.sendSelectReq()
# self.waitforSelectRsp()
# self.sendLinktestReq()
# self.waitforLinktestRsp()
pass
def connectionLost(self, *args) :
print "lost connected"
class hsmsProtocolFactory(protocol.Factory) :
protocol = hsmsProtocol
class SecsProtocol(hsmsProtocol) :
secs_funcs = secsFunctionDirectory()
def connectionMade(self) :
hsmsProtocol.connectionMade(self)
self.secs_connection_state = SecsCommsEnabledNotCommunicating(self)
def handle_data_message(self, packet) :
secsfn = self.secs_funcs.secsDecode(packet)
self.secs_connection_state.handle_message(secsfn, packet)
def sendResponse(self, packet, system, sessionID=None):
"""Send response packet for system
:param packet: packet to be sent
:type packet: :class:`secsgem.hsmsPackets.hsmsPacket`
:param system: system to reply to
:type system: integer
"""
sessionID = sessionID or self.sessionID
outPacket = hsmsPacket(hsmsStreamFunctionHeader(system, packet._stream, packet._function, False, sessionID), packet.encode())
self.transport.write(outPacket.encode())
print "sent response s%d f%d" % (packet._stream, packet._function)
class SecsConnectionState(object) :
handle_secs = None
def __init__(self, protocol) :
self.protocol = protocol
def handle_message(self, secsfn, packet) :
try :
handle_msg = self.handle_secs.get_handler(self, secsfn._stream, secsfn._function)
except KeyError :
self.handle_message_default(secsfn, packet)
else :
handle_msg(packet)
def handle_message_default(self, secsfn, packet) :
print "no handler registered for %s" % secsfn.__class__.__name__
print secsfn
print secsfn.__doc__
print self.protocol.secs_funcs.respondingStreamFunction(secsfn).__doc__
class StreamFuncDispatchTable(object) :
def __init__(self) :
self.handlers = {}
def __call__(self, stream, function) :
def sfdef(f) :
self.handlers[stream, function] = f.__name__
print "registered handler for (%d,%d) : %s" % (stream, function, f.__name__)
return f
return sfdef
def get_handler(self, obj, stream, function) :
return getattr(obj, self.handlers[stream, function])
class SecsCommsEnabledNotCommunicating(SecsConnectionState) :
label = "ENABLED / NOT COMMUNICATING"
handle_secs = StreamFuncDispatchTable()
def __init__(self, protocol) :
SecsConnectionState.__init__(self, protocol)
# self.protocol.selected.addCallback(self.equipment_initiated_connect)
self.equipment_connected = self.equipment_initiated_connect()
@inlineCallbacks
def equipment_initiated_connect(self) :
mdln = self.protocol.equipment.MDLN
softrev = self.protocol.equipment.SOFTREV
msg = secsgem.secsS01F13E({"MDLN": mdln, "SOFTREV": softrev})
# Use the same system id for each retry.
# This avoids filling our pending dictionary with incomplete transactions.
# Strictly we should have a small pool because we shouldn't use the systemid from the previous transaction.
system = self.unique_sys_ids.next()
while True :
packet = hsmsPacket(hsmsStreamFunctionHeader(system, msg._stream, msg._function, False, self.sessionID), msg.encode())
timeout = self.protocol.equipment.EstablishCommunicationsTimeout
self.cr_response = self.query(packet, timeout=timeout)
try :
response = yield self.cr_response
except TimeOutError, InvalidMessage :
# TODO: ensure that if response comes in later with matching systemID for our (1,13) request
# then we refuse it.
pass
else :
if response.COMMACK == 0 :
self.connect()
# transition to communicating
break
@handle_secs(1,13)
def establish_communications_request(self, packet) :
mdln = self.protocol.equipment.MDLN
softrev = self.protocol.equipment.SOFTREV
response = self.protocol.secs_funcs.streamFunction(1, 14)({"COMMACK": 0, "DATA": {"MDLN": mdln, "SOFTREV": softrev}})
header = packet.header
self.protocol.sendResponse(response, header.system, header.sessionID)
self.equipment_connected.cancel()
self.connect()
def handle_message_default(self, secsfn, packet) :
self.cr_response.errback(InvalidMessage("Invalid message received")
class SecsCommsCommunicating(SecsConnectionState) :
label = "ENABLED / COMMUNICATING"
handle_secs = StreamFuncDispatchTable()
@handle_secs(1,11)
def request_status_variable_namelist(self, packet) :
response = self.protocol.secs_funcs.streamFunction(1, 12)([
{"SVID": 1, "SVNAME": "SV1", "UNITS":"mm"},
{"SVID": 1337, "SVNAME": "SV2", "UNITS": ""}
])
header = packet.header
self.protocol.sendResponse(response, header.system, header.sessionID)
@handle_secs(2,29)
def request_equipment_constant_namelist(self, packet) :
response = self.protocol.secs_funcs.streamFunction(2, 30)([
{
"ECID": 1,
"ECNAME": "EC1",
"ECMIN": secsgem.secsVarU1(value=0),
"ECMAX": secsgem.secsVarU1(value=100),
"ECDEF": secsgem.secsVarU1(value=50),
"UNITS": "mm"
},
{
"ECID": 1337,
"ECNAME": "EC2",
"ECMIN": "",
"ECMAX": "",
"ECDEF": "",
"UNITS": ""
}
])
header = packet.header
self.protocol.sendResponse(response, header.system, header.sessionID)
class EquipmentModel(object) :
MDLN = "secsgem" # Equipment model type (20 bytes max)
SOFTREV = "unknown" # Software revision (20 bytes max)
EstablishCommunicationsTimeout = 5.0 # (seconds) E30 3.2.2
class SecsProtocolFactory(protocol.Factory) :
protocol = SecsProtocol
equipment = EquipmentModel()
def buildProtocol(self, addr) :
p = protocol.Factory.buildProtocol(self, addr)
p.equipment = self.equipment
return p
endpoint = endpoints.TCP4ServerEndpoint(reactor, 13002, interface="127.0.0.1")
d = endpoint.listen(SecsProtocolFactory())
def listening(connection) :
print "listening"
d.addCallback(listening)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment