Skip to content

Instantly share code, notes, and snippets.

@zgoda
Created April 24, 2017 12:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zgoda/98c9b8e27b42ae0f84a682ed8dfab5d7 to your computer and use it in GitHub Desktop.
Save zgoda/98c9b8e27b42ae0f84a682ed8dfab5d7 to your computer and use it in GitHub Desktop.
Listen to serial communication in Twisted and route it to a TCP connection
import sys
from twisted.internet import reactor
from twisted.internet.serialport import SerialPort
from twisted.internet.protocol import Protocol, Factory
from twisted.python import log
class SerialProtocol(Protocol):
    """Protocol for the serial side: hands every received chunk to a factory."""

    def __init__(self, factory):
        """Keep a reference to the object whose ``notify`` receives the data."""
        self.factory = factory

    def dataReceived(self, data):
        """Log the chunk with surrounding whitespace removed, then forward it.

        The data passed on to ``factory.notify`` is the original, unstripped
        chunk; only the logged copy is stripped.
        """
        printable = data.strip()
        log.msg(printable)
        self.factory.notify(data)
class TCPProtocol(Protocol):
    """Protocol for the TCP side: registers itself as the factory's client.

    The factory holds a single ``client`` slot, so the most recently
    connected peer is the one that receives forwarded data.
    """

    def connectionMade(self):
        """Register this connection as the factory's current client."""
        self.factory.client = self

    def connectionLost(self, reason):
        """Deregister this connection, but only if it is still the current one.

        Without the identity check, an older connection closing after a newer
        one had connected would reset ``factory.client`` to ``None`` and cut
        off the newer, still-live client.
        """
        if self.factory.client is self:
            self.factory.client = None
class SerialCommFactory(Factory):
    """Factory for TCP connections that also relays data to the current client.

    ``client`` is ``None`` until a ``TCPProtocol`` instance assigns itself
    to it.
    """

    protocol = TCPProtocol

    def __init__(self):
        """Start with no client registered."""
        self.client = None

    def notify(self, data):
        """Write ``data`` to the registered client's transport.

        When no client is registered the data is not sent anywhere; a log
        message is emitted instead.
        """
        current = self.client
        if current is not None:
            current.transport.write(data)
            return
        log.msg('client is None')
if __name__ == '__main__':
    # Send Twisted log output to stdout.
    log.startLogging(sys.stdout)

    # One factory is shared by both sides: TCP connections register on it,
    # and the serial protocol pushes received data through it.
    factory = SerialCommFactory()
    reactor.listenTCP(8000, factory)

    # The baud rate is given as an integer (it was the string '115200'),
    # which is the type the serial port APIs document for this argument.
    SerialPort(SerialProtocol(factory), '/dev/ttyUSB0', reactor, baudrate=115200)

    reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment