Last active
July 6, 2017 01:38
-
-
Save dariussullivan/8916e975e22054ff470d to your computer and use it in GitHub Desktop.
Small demonstration of a SECS equipment server. Uses secsgem and Twisted.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import reactor, protocol, endpoints | |
from twisted.protocols import basic | |
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue | |
from twisted.internet import task | |
from twisted.python import log | |
# log errors inside reactor to stdout | |
from twisted.python.log import startLogging | |
from sys import stdout | |
startLogging(stdout) | |
from struct import pack | |
from itertools import count | |
import logging | |
import secsgem | |
from secsgem.hsmsPackets import hsmsPacket, hsmsStreamFunctionHeader | |
from secsgem.hsmsPackets import hsmsSelectRspHeader, hsmsLinktestReqHeader, hsmsLinktestRspHeader | |
from secsgem import secsFunctions | |
def wait(delay):
    """Build a Deferred that fires with ``None`` once ``delay`` has elapsed.

    Intended for pausing inside an ``inlineCallbacks`` generator, e.g.
    ``yield wait(delay)``.

    :param delay: the delay handed straight to ``task.deferLater``
    :returns: the Deferred created by ``task.deferLater``
    """
    def _nothing():
        return None

    return task.deferLater(reactor, delay, _nothing)
class ResponseTimeoutError(Exception):
    """Signals that an awaited reply packet did not arrive before its timeout."""
from secsgem.secsHandler import secsHandler | |
class secsFunctionDirectory(secsHandler):
    """Stream/function lookup helper layered on secsgem's ``secsHandler``.

    Only ``isHost`` and ``logger`` are set up here; ``secsHandler.__init__``
    is not invoked by this constructor.
    """

    def __init__(self, isHost=False):
        """Record the host/equipment role and create a per-class logger.

        :param isHost: stored as ``self.isHost``
        """
        logger_name = "%s.%s" % (self.__module__, self.__class__.__name__)
        self.logger = logging.getLogger(logger_name)
        self.isHost = isHost

    def respondingStreamFunction(self, sfn):
        """Look up the function that follows ``sfn`` in the same stream.

        :param sfn: object exposing ``_stream`` and ``_function``
        :returns: whatever ``self.streamFunction`` yields for
            ``(sfn._stream, sfn._function + 1)``
        """
        reply_function = sfn._function + 1
        return self.streamFunction(sfn._stream, reply_function)
# handlers = dict() | |
# def handle_secs(stream, function) : | |
# def sfdef(f) : | |
# cls = f.im_class | |
# handlers[(stream, function)] = f.__name__ | |
# print "registered handler for (%d,%d) : %s" % (stream, function, f.__name__) | |
# return f | |
# return sfdef | |
class hsmsProtocol(basic.Int32StringReceiver):
    """Passive-side HSMS connection built on ``Int32StringReceiver``.

    Responsibilities visible in this class:

    * answer Select.req and Linktest.req control messages,
    * match incoming packets to outstanding requests through the ``system``
      header field (see :meth:`query`),
    * hand every other packet received in the SELECTED state to
      :meth:`handle_data_message`, which subclasses override.

    ``hsmsConnectionState`` moves through "NOT_CONNECTED", "NOT_SELECTED"
    and "SELECTED".
    """

    hsmsConnectionState = "NOT_CONNECTED"

    # The 'system' field in HSMS packet header must be unique among open and
    # recent transactions.  This counter serves as a generator of system
    # values.  It is a class attribute, so it is shared by all connections.
    unique_sys_ids = count()

    def __init__(self):
        # Maps a request's system id to the Deferred awaiting its reply.
        self.pending = dict()

    def stringReceived(self, data):
        """Decode one length-prefixed HSMS message and dispatch it.

        :param data: message bytes with the 4-byte length prefix already
            stripped by ``Int32StringReceiver``
        """
        # bytearray() yields integers on both Python 2 and 3.
        hexdump = " ".join("%2x" % octet for octet in bytearray(data))
        print("received : %s  (%d bytes)" % (hexdump, len(data)))

        # It's unfortunate that Int32StringReceiver goes to a lot of effort to
        # avoid copying, and then we do this: hsmsPacket.decode is given the
        # length prefix again, so it has to be re-attached.
        intdata = pack(self.structFormat, len(data)) + data
        packet = hsmsPacket.decode(intdata)
        print(packet)
        print(repr(packet))

        header = packet.header

        # Select.req
        if header.stream == 0x00 and header.sType == 0x01:
            self.handle_select_req(packet)
            return

        # Linktest.req
        if (header.sessionID == 0xFFFF
                and header.stream == 0x00
                and header.sType == 0x05):
            self.handle_linktest_req(packet)
            return

        if self.hsmsConnectionState != "SELECTED":
            # Data messages are only handled in the selected state.
            # If this really is a data message, send Reject.req ?
            return

        try:
            respondee = self.pending.pop(header.system)
        except KeyError:
            # Nobody is waiting for this system id: treat it as a new
            # message from the peer.
            print("Got unsolicited packet")
            self.handle_data_message(packet)
        else:
            # Guard against a Deferred that has already been fired (for
            # example by its timeout) so a late reply cannot raise
            # AlreadyCalledError.
            if not respondee.called:
                respondee.callback(packet)

    def handle_data_message(self, packet):
        """Hook for packets that are not replies; the base class ignores them."""
        pass

    def handle_select_req(self, packet):
        """Answer a Select.req and enter the SELECTED state.

        :param packet: the decoded Select.req
        """
        system = packet.header.system
        sessionID = packet.header.sessionID
        # Send a Select Response to the remote host
        response = hsmsPacket(hsmsSelectRspHeader(system, sessionID=sessionID))
        self.transport.write(response.encode())
        # E37 states that standard allows Select.req even when already
        # selected, so only the first one triggers the state change.
        if self.hsmsConnectionState != "SELECTED":
            print("SELECTED")
            self.hsmsConnectionState = "SELECTED"
            # E37 8.2.6.1 sessionID of select message used for all
            # subsequent data messages
            self.sessionID = sessionID
            self.selected.callback(response)
            print("selected.callback")

    def handle_linktest_req(self, packet):
        """Answer a Linktest.req with a Linktest.rsp carrying the same system id.

        :param packet: the decoded Linktest.req
        """
        print("received link test")
        system = packet.header.system
        # Send a Linktest Response to the remote host
        response = hsmsPacket(hsmsLinktestRspHeader(system))
        self.transport.write(response.encode())

    def read_packet(self, timeout=None):
        """Create the Deferred on which a reply packet will be delivered.

        :param timeout: if truthy, delay after which the Deferred is failed
            with ``ResponseTimeoutError(timeout)``
        :returns: a fresh, unfired ``Deferred``
        """
        pending = Deferred()
        if timeout:
            # reactor.callLater gives a plain IDelayedCall; cancelling it does
            # not leave an unhandled CancelledError behind the way cancelling
            # a deferLater Deferred would.
            timer = reactor.callLater(
                timeout, pending.errback, ResponseTimeoutError(timeout))

            def stop_timer(result):
                # Runs on success and on failure, so the timer can never
                # fire against an already-fired Deferred.
                if timer.active():
                    timer.cancel()
                return result

            pending.addBoth(stop_timer)
        return pending

    def query(self, packet, *args, **kwargs):
        """Send ``packet`` and return a Deferred for the matching reply.

        Extra arguments are forwarded to :meth:`read_packet` (e.g. ``timeout``).

        :param packet: ``hsmsPacket`` to transmit; its ``header.system`` is
            the key used to recognise the reply
        :returns: Deferred firing with the reply packet
        """
        system = packet.header.system
        response = self.read_packet(*args, **kwargs)
        self.pending[system] = response

        def forget(result):
            # Drop the bookkeeping entry once the request is finished
            # (typically after a timeout), but never remove a newer request
            # that reused the same system id.
            if self.pending.get(system) is response:
                del self.pending[system]
            return result

        response.addBoth(forget)
        self.transport.write(packet.encode())
        return response

    def connectionMade(self):
        """Enter NOT_SELECTED and start waiting for the peer's Select.req."""
        print("NOT_SELECTED")
        self.hsmsConnectionState = "NOT_SELECTED"
        self.selected = Deferred()
        d = self.passive_handshake()
        d.addErrback(log.err)

    @inlineCallbacks
    def passive_handshake(self):
        """Wait until the remote side has selected this connection.

        A follow-up Linktest.req and an S1F1 "are you there?" exchange were
        sketched for this point in the past; E37 does not require the link
        test, so neither is performed.

        :returns: Deferred firing with this protocol instance
        """
        print("awaiting selection")
        yield self.selected
        returnValue(self)

    def query_host_present(self):
        """Send S1F1 ("are you there?") and wait up to 5 s for the reply.

        Relies on ``self.secs_funcs``, which this base class does not
        define; a subclass has to provide it.

        :returns: Deferred from :meth:`query`
        """
        msg = self.secs_funcs.streamFunction(1, 1)()
        system = next(self.unique_sys_ids)
        # requireResponse is True because a reply is awaited below.
        header = hsmsStreamFunctionHeader(
            system, msg._stream, msg._function, True, self.sessionID)
        packet = hsmsPacket(header, msg.encode())
        return self.query(packet, timeout=5.0)

    def active_handshake(self):
        """Placeholder for the active (connecting) side; not implemented."""
        ## based on hsmsConnection.startReceiver
        # self.sendSelectReq()
        # self.waitforSelectRsp()
        # self.sendLinktestReq()
        # self.waitforLinktestRsp()
        pass

    def connectionLost(self, *args):
        """Note the disconnect and fall back to the NOT_CONNECTED state."""
        print("lost connected")
        self.hsmsConnectionState = "NOT_CONNECTED"
class hsmsProtocolFactory(protocol.Factory):
    """Twisted factory whose connections are plain ``hsmsProtocol`` instances."""

    protocol = hsmsProtocol
class SecsProtocol(hsmsProtocol):
    """HSMS protocol that decodes SECS messages and feeds a state object.

    Every data message is decoded through ``secs_funcs`` and passed to the
    current ``secs_connection_state``.
    """

    # One directory shared by all connections (class attribute).
    secs_funcs = secsFunctionDirectory()

    def connectionMade(self):
        """Run the HSMS setup, then start in ENABLED / NOT COMMUNICATING."""
        hsmsProtocol.connectionMade(self)
        self.secs_connection_state = SecsCommsEnabledNotCommunicating(self)

    def handle_data_message(self, packet):
        """Decode ``packet`` and let the current state object handle it.

        :param packet: decoded ``hsmsPacket`` carrying a SECS message
        """
        secsfn = self.secs_funcs.secsDecode(packet)
        self.secs_connection_state.handle_message(secsfn, packet)

    def sendResponse(self, packet, system, sessionID=None):
        """Send response packet for system

        :param packet: packet to be sent
        :type packet: :class:`secsgem.hsmsPackets.hsmsPacket`
        :param system: system to reply to
        :type system: integer
        :param sessionID: session id for the reply header; ``None`` means
            "use the session id recorded when the connection was selected"
        """
        # Test against None explicitly: 0 is falsy but must still be usable
        # as a session id.
        if sessionID is None:
            sessionID = self.sessionID
        header = hsmsStreamFunctionHeader(
            system, packet._stream, packet._function, False, sessionID)
        outPacket = hsmsPacket(header, packet.encode())
        self.transport.write(outPacket.encode())
        print("sent response s%d f%d" % (packet._stream, packet._function))
class SecsConnectionState(object):
    """Base class for one state of the SECS communication state model.

    Subclasses set ``handle_secs`` to a dispatch table offering
    ``get_handler(obj, stream, function)`` and register their handler
    methods in it.
    """

    # Dispatch table; None means "this state has no handlers at all".
    handle_secs = None

    def __init__(self, protocol):
        """Remember the protocol this state belongs to.

        :param protocol: owning protocol; its ``secs_funcs`` attribute is
            used by :meth:`handle_message_default`
        """
        self.protocol = protocol

    def handle_message(self, secsfn, packet):
        """Dispatch a decoded message to its handler, or to the default.

        :param secsfn: decoded function exposing ``_stream`` and ``_function``
        :param packet: the packet it was decoded from; passed to the handler
        """
        table = self.handle_secs
        if table is None:
            # No table configured (the base-class default): nothing can be
            # registered, so go straight to the fallback instead of failing
            # with AttributeError on None.
            self.handle_message_default(secsfn, packet)
            return
        try:
            handle_msg = table.get_handler(self, secsfn._stream, secsfn._function)
        except KeyError:
            self.handle_message_default(secsfn, packet)
        else:
            handle_msg(packet)

    def handle_message_default(self, secsfn, packet):
        """Fallback for unhandled messages: print what arrived and its docs.

        :param secsfn: the decoded function nobody handles
        :param packet: the originating packet (unused here)
        """
        print("no handler registered for %s" % secsfn.__class__.__name__)
        print(secsfn)
        print(secsfn.__doc__)
        print(self.protocol.secs_funcs.respondingStreamFunction(secsfn).__doc__)
class StreamFuncDispatchTable(object):
    """Registry mapping ``(stream, function)`` pairs to handler method names.

    Calling an instance yields a decorator; decorating a method records the
    method's name under that pair.  Names rather than function objects are
    stored, and :meth:`get_handler` resolves them with ``getattr`` on the
    object passed in.
    """

    def __init__(self):
        # (stream, function) -> name of the handler method
        self.handlers = {}

    def __call__(self, stream, function):
        """Return a decorator registering its target for ``(stream, function)``.

        :param stream: SECS stream number
        :param function: SECS function number
        :returns: decorator that records the name and returns the function
            unchanged
        """
        key = (stream, function)

        def sfdef(f):
            name = f.__name__
            self.handlers[key] = name
            print("registered handler for (%d,%d) : %s" % (stream, function, name))
            return f

        return sfdef

    def get_handler(self, obj, stream, function):
        """Fetch the attribute of ``obj`` registered for ``(stream, function)``.

        :param obj: object on which the registered name is looked up
        :raises KeyError: if nothing was registered for the pair
        """
        method_name = self.handlers[stream, function]
        return getattr(obj, method_name)
class InvalidMessage(Exception):
    """Fails a pending S1F13 attempt when an unexpected message arrives."""


class SecsCommsEnabledNotCommunicating(SecsConnectionState):
    """State in which communication with the host is not yet established.

    Two routes lead out of this state:

    * equipment initiated - once the HSMS connection is selected, S1F13 is
      sent repeatedly until the host acknowledges it
      (:meth:`equipment_initiated_connect`);
    * host initiated - the host sends S1F13 and is answered with S1F14
      (:meth:`establish_communications_request`).

    Whichever succeeds first calls :meth:`connect`.
    """

    label = "ENABLED / NOT COMMUNICATING"
    handle_secs = StreamFuncDispatchTable()

    def __init__(self, protocol):
        """Arrange for the S1F13 attempts to start once the link is selected.

        :param protocol: owning protocol; must provide ``selected``,
            ``equipment``, ``query``, ``pending``, ``unique_sys_ids``,
            ``secs_funcs`` and ``sendResponse``
        """
        SecsConnectionState.__init__(self, protocol)
        self.cr_response = None          # Deferred of the S1F13 in flight
        self.equipment_connected = None  # Deferred of the retry coroutine
        self._finished = False           # set once this state has been left
        # protocol.sessionID only exists after selection, so the first
        # S1F13 must not be built before then.
        self.protocol.selected.addCallback(self._start_equipment_connect)

    def _start_equipment_connect(self, result):
        """Callback on ``protocol.selected``: launch the retry coroutine."""
        self.equipment_connected = self.equipment_initiated_connect()
        self.equipment_connected.addErrback(log.err)
        return result

    @inlineCallbacks
    def equipment_initiated_connect(self):
        """Send S1F13 until the host replies with COMMACK == 0.

        Stops early when :meth:`connect` has already been called, e.g.
        because the host established communications itself.
        """
        proto = self.protocol
        equipment = proto.equipment
        msg = secsgem.secsS01F13E(
            {"MDLN": equipment.MDLN, "SOFTREV": equipment.SOFTREV})
        # Use the same system id for each retry.
        # This avoids filling our pending dictionary with incomplete
        # transactions.  Strictly we should have a small pool because we
        # shouldn't use the systemid from the previous transaction.
        system = next(proto.unique_sys_ids)
        while not self._finished:
            # requireResponse is True: S1F13 is a request we await a reply to.
            header = hsmsStreamFunctionHeader(
                system, msg._stream, msg._function, True, proto.sessionID)
            packet = hsmsPacket(header, msg.encode())
            timeout = equipment.EstablishCommunicationsTimeout
            self.cr_response = proto.query(packet, timeout=timeout)
            try:
                reply = yield self.cr_response
            except (ResponseTimeoutError, InvalidMessage):
                # Forget the failed attempt so that a reply arriving late is
                # not matched against it.
                proto.pending.pop(system, None)
                continue
            if self._finished:
                # The host-initiated route won while we were waiting.
                break
            # The reply is a raw hsmsPacket and has to be decoded first.
            # NOTE(review): attribute access to COMMACK on the decoded S1F14
            # is assumed to be supported by secsgem - confirm.
            secsfn = proto.secs_funcs.secsDecode(reply)
            if secsfn.COMMACK == 0:
                # transition to communicating
                self.connect()
                break
            # Request denied: pause before asking again instead of
            # re-sending immediately.
            yield wait(timeout)

    @handle_secs(1, 13)
    def establish_communications_request(self, packet):
        """Accept a host-initiated S1F13 with S1F14 (COMMACK 0) and connect.

        :param packet: the received S1F13 packet
        """
        equipment = self.protocol.equipment
        response = self.protocol.secs_funcs.streamFunction(1, 14)(
            {"COMMACK": 0,
             "DATA": {"MDLN": equipment.MDLN, "SOFTREV": equipment.SOFTREV}})
        header = packet.header
        self.protocol.sendResponse(response, header.system, header.sessionID)
        self.connect()

    def connect(self):
        """Leave this state: install ``SecsCommsCommunicating`` on the protocol.

        Idempotent; also flags the retry coroutine to stop at its next
        wake-up.
        """
        if self._finished:
            return
        self._finished = True
        self.protocol.secs_connection_state = SecsCommsCommunicating(self.protocol)

    def handle_message_default(self, secsfn, packet):
        """Treat any other message as invalid and fail the current attempt."""
        attempt = self.cr_response
        # Only an unfired Deferred may be failed; firing twice would raise
        # AlreadyCalledError.
        if attempt is not None and not attempt.called:
            attempt.errback(InvalidMessage("Invalid message received"))
class SecsCommsCommunicating(SecsConnectionState):
    """State in which requests from the peer are answered.

    Registers handlers for S1F11 and S2F29; each answers with a fixed
    demonstration list.
    """

    label = "ENABLED / COMMUNICATING"
    handle_secs = StreamFuncDispatchTable()

    def _send_reply(self, request, reply):
        """Send ``reply`` using the system and session id of ``request``."""
        hdr = request.header
        self.protocol.sendResponse(reply, hdr.system, hdr.sessionID)

    @handle_secs(1, 11)
    def request_status_variable_namelist(self, packet):
        """Answer S1F11 with an S1F12 listing two status variables."""
        reply_cls = self.protocol.secs_funcs.streamFunction(1, 12)
        status_variables = [
            {"SVID": 1, "SVNAME": "SV1", "UNITS": "mm"},
            {"SVID": 1337, "SVNAME": "SV2", "UNITS": ""},
        ]
        self._send_reply(packet, reply_cls(status_variables))

    @handle_secs(2, 29)
    def request_equipment_constant_namelist(self, packet):
        """Answer S2F29 with an S2F30 listing two equipment constants."""
        reply_cls = self.protocol.secs_funcs.streamFunction(2, 30)
        first_constant = {
            "ECID": 1,
            "ECNAME": "EC1",
            "ECMIN": secsgem.secsVarU1(value=0),
            "ECMAX": secsgem.secsVarU1(value=100),
            "ECDEF": secsgem.secsVarU1(value=50),
            "UNITS": "mm",
        }
        second_constant = {
            "ECID": 1337,
            "ECNAME": "EC2",
            "ECMIN": "",
            "ECMAX": "",
            "ECDEF": "",
            "UNITS": "",
        }
        self._send_reply(packet, reply_cls([first_constant, second_constant]))
class EquipmentModel(object):
    """Static description of the equipment, read by the protocol states."""

    # Equipment model type (20 bytes max)
    MDLN = "secsgem"
    # Software revision (20 bytes max)
    SOFTREV = "unknown"
    # (seconds) E30 3.2.2
    EstablishCommunicationsTimeout = 5.0
class SecsProtocolFactory(protocol.Factory):
    """Factory building ``SecsProtocol`` instances that share one equipment model."""

    protocol = SecsProtocol
    # A single EquipmentModel instance, attached to every protocol built.
    equipment = EquipmentModel()

    def buildProtocol(self, addr):
        """Build the protocol as usual, then give it the ``equipment`` attribute.

        :param addr: peer address, forwarded to ``Factory.buildProtocol``
        :returns: the protocol instance
        """
        built = protocol.Factory.buildProtocol(self, addr)
        built.equipment = self.equipment
        return built
# Serve the SECS demo on localhost:13002 and run the reactor.
endpoint = endpoints.TCP4ServerEndpoint(reactor, 13002, interface="127.0.0.1")
d = endpoint.listen(SecsProtocolFactory())


def listening(connection):
    """Called once the port is open; ``connection`` is the listen result."""
    print("listening")


def listen_failed(failure):
    """Report a failed listen (e.g. port already in use) and shut down.

    Without this the failure would be dropped and the reactor would keep
    running with nothing to serve.
    """
    log.err(failure, "could not start listening")
    # The failure may be delivered before reactor.run() is entered;
    # callWhenRunning defers the stop until the reactor is actually running.
    reactor.callWhenRunning(reactor.stop)


d.addCallbacks(listening, listen_failed)
reactor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment