Skip to content

Instantly share code, notes, and snippets.

@hetii
Created October 31, 2016 21:54
Show Gist options
  • Save hetii/613e23daa7a402de7ccd7479a29e9660 to your computer and use it in GitHub Desktop.
Save hetii/613e23daa7a402de7ccd7479a29e9660 to your computer and use it in GitHub Desktop.
`# This is simple bridge between PTY and TCP endpoint
# You can run it by executing following step in 3 terminals:
# 1. nc -l 8080
# 2. python simpleclient.py
# 3. minicom -p /dev/pts/3
from __future__ import print_function
from twisted.internet import reactor, protocol, inotify, fdesc
from twisted.python.util import println
from twisted.web.client import getPage
from twisted.python import filepath
import os, pty, array, fcntl
TCGETS2 = 0x802C542A
class PtyEndpoint:
old_buf = array.array('i', [0] * 64)
def __init__(self):
self.pty_master, self.pty_slave = pty.openpty()
self.pty_master_name = os.ttyname(self.pty_master)
self.pty_slave_name = os.ttyname(self.pty_slave)
print("Master name: %s" % self.pty_master_name)
print("Slave : %s" % self.pty_slave_name)
def fileno(self):
""" We want to select on FD 0 """
return self.pty_master
def changeBaudrate(self):
buf = array.array('i', [0] * 64)
fcntl.ioctl(self.pty_slave, TCGETS2, buf)
if buf != self.old_buf:
self.old_buf = buf
rate = buf[9]
print ( "Slave name is: %s, baud-rate: %s" % (self.pty_slave_name, rate))
print(self.transport.realAddress)
page="http://%s/console/baud?rate=%s/" % (self.transport.realAddress[0], rate)
getPage(page).addCallbacks(callback=lambda value:(println(value)),
errback=lambda error:(println("an error occurred", error), reactor.stop()))
def doRead(self):
"""Read data from PTY and write it to TCP endpoint."""
data = os.read(self.pty_master, 1024)
print("doRead called with data: %s" % data)
self.changeBaudrate();
self.transport.write(data)
def doWrite(self, data):
os.write(self.pty_master, data)
print("doWrite called, ", data)
def logPrefix(self):
return 'PtyStdIO'
def connectionMade(self):
print("PtyStdIO connectionMade")
def connectionLost(self, reason):
print("PtyStdIO connectionLost", reason)
class TcpEndpoint(protocol.Protocol):
def __init__(self, pty_instance):
self.pty_instance = pty_instance
def connectionMade(self):
self.pty_instance.transport = self.transport
print("TcpEndpoint connectionMade")
def dataReceived(self, data):
self.pty_instance.doWrite(data)
print("FROM TCP: %r" % data)
def connectionLost(self, reason):
print("TcpEndpoint connection lost")
class TCPFactory(protocol.ClientFactory):
protocol = TcpEndpoint
def buildProtocol(self, addr=None):
pty_instance = PtyEndpoint()
pty_protocol = self.protocol(pty_instance)
reactor.addReader(pty_instance)
return pty_protocol
def clientConnectionLost(self, conn, reason):
print("TCPFactory clientConnectionLost")
pass
def main():
f = TCPFactory()
reactor.connectTCP("192.168.4.1", 23, f)
reactor.run()
# this only runs if the module was *not* imported
if __name__ == '__main__':
main()
`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment