Skip to content

Instantly share code, notes, and snippets.

@chintal
Last active September 1, 2017 16:11
Show Gist options
  • Save chintal/6014e20a4cac6cebf51e to your computer and use it in GitHub Desktop.
Save chintal/6014e20a4cac6cebf51e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# encoding: utf-8
import os
import inspect
import time
import copy
from twisted.internet.protocol import Factory
from twisted.internet.protocol import connectionDone
from twisted.internet.protocol import ProcessProtocol
import logging
# ``level`` is the keyword basicConfig() understands; the previous
# ``logLevel`` spelling never set the root logger to DEBUG (and newer
# Python versions reject unknown keywords with ValueError).
logging.basicConfig(level=logging.DEBUG)
from crochet import run_in_reactor
from crochet import setup
from twisted.internet import reactor
# Executable handed to reactor.spawnProcess() as the program to run.
BIN_PATH = 'dmesg'
# Sole entry of the argument list given to the child (its argv[0]).
BIN_NAME = 'dmesg'
class ProtocolTest(ProcessProtocol):
    """Process protocol that reports a child process's lifecycle on stdout.

    Prints a line when the connection to the child is made, for every chunk
    the child writes to its standard output, and when the child exits/ends.
    """

    def __init__(self):
        # None of these attributes is read by this class; they are kept so
        # that any external code touching them keeps working.
        self.delimiter = '\n'
        self._buffer = ""
        self._count = 0

    def disconnect(self):
        """Ask the process transport to deliver the KILL signal to the child."""
        self.transport.signalProcess("KILL")

    def connectionMade(self):
        """Announce that the protocol has been connected to the child."""
        print("Made Connection")

    def outReceived(self, data):
        """Print a chunk of data received from the child's standard output."""
        # Two spaces after the colon on purpose: this reproduces the output
        # of the Python 2 statement ``print "Got data : ", data``.
        print("Got data :  %s" % (data,))

    def processExited(self, reason):
        """Report the exit status delivered with the processExited event."""
        self._report("processExited", reason)

    def processEnded(self, reason):
        """Report the exit status delivered with the processEnded event."""
        self._report("processEnded", reason)

    def _report(self, event, reason):
        """Print how the child finished for the callback named ``event``.

        ``reason`` is the failure object Twisted hands to the callback; its
        ``value`` is expected to expose ``exitCode`` and ``signal``
        (NOTE(review): per Twisted's ProcessDone/ProcessTerminated docs --
        confirm against the installed Twisted version).
        """
        if reason is connectionDone:
            # Guard kept from the original code: nothing is reported for the
            # generic "connection done" failure object.
            return
        outcome = reason.value
        exit_code = getattr(outcome, "exitCode", None)
        if exit_code is not None:
            print("%s, status %d" % (event, exit_code))
        else:
            # A child terminated by a signal (e.g. after disconnect()) has no
            # exit code; formatting None with %d used to raise TypeError.
            print("%s, signal %s" % (event, getattr(outcome, "signal", None)))
class TestFactory(Factory):
    """Factory whose protocols are each bound to a freshly spawned process."""

    def __init__(self):
        # Every protocol handed out by buildProtocol(), in creation order.
        self.instances = []

    def buildProtocol(self, addr=None):
        """Spawn BIN_PATH and return the ProtocolTest attached to it.

        ``addr`` is accepted (and ignored) so the override stays compatible
        with the ``Factory.buildProtocol(addr)`` signature; existing callers
        that pass no argument keep working through the default.
        """
        instance = ProtocolTest()
        print('BIN_PATH %s' % (BIN_PATH,))
        print('BIN_NAME %s' % (BIN_NAME,))
        # The child gets an empty environment and BIN_NAME as its only
        # argument.
        reactor.spawnProcess(instance, BIN_PATH, [BIN_NAME], {})
        # Track the protocol so callers can reach it later (for example to
        # call disconnect()); the list was previously never populated.
        self.instances.append(instance)
        return instance
# Module-level factory instance used by run() and main2() to spawn the child.
factory = TestFactory()
@run_in_reactor
def run():
    """Spawn the child process via the module-level factory.

    Decorated with crochet's ``run_in_reactor``. The protocol built by the
    factory is returned instead of being bound to an unused local, so it is
    available to the caller through whatever result object the decorator
    produces.
    """
    return factory.buildProtocol()
def main1():
    """Run the demo through Crochet: call setup(), then the wrapped run()."""
    print("Using Crochet")
    # crochet.setup() has to be called before any @run_in_reactor function.
    setup()
    # NOTE(review): nothing here waits for the child process to finish;
    # confirm whether the output is expected to appear before exit.
    run()
def main2():
    """Run the demo with plain Twisted: spawn the child, then run the reactor.

    Nothing in this file stops the reactor, so ``reactor.run()`` keeps going
    until it is interrupted from outside.
    """
    print("Using Twisted")
    # The returned protocol is not needed here, so it is no longer bound to
    # an unused local.
    factory.buildProtocol()
    reactor.run()
if __name__ == "__main__":
    # Script entry point: runs the Crochet variant of the demo. Call main2()
    # here instead to run the plain-Twisted variant.
    main1()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment