Skip to content

Instantly share code, notes, and snippets.

@dadon
Last active August 29, 2015 14:02
Show Gist options
  • Save dadon/2a8145b5c7a9ace89e73 to your computer and use it in GitHub Desktop.
Save dadon/2a8145b5c7a9ace89e73 to your computer and use it in GitHub Desktop.
# -*- encoding: utf-8 -*-
import logging
from twisted.python import log
from twisted.trial import unittest
from twisted.internet import defer, reactor, protocol
from twisted.internet.protocol import Protocol, Factory
observer = log.PythonLoggingObserver()
observer.start()
logging.basicConfig(level=logging.DEBUG)
class Echo(Protocol):
def dataReceived(self, data):
log.msg(data)
class EchoFactory(Factory):
protocol = Echo
class EchoClient(Protocol):
def sendMsg(self, msg):
self.transport.write("hello, world!")
self.transport.loseConnection()
class TestCase1(unittest.TestCase):
def setUp(self):
self.server = reactor.listenTCP(8033, EchoFactory(), interface="127.0.0.1")
def tearDown(self):
return self.server.stopListening()
@defer.inlineCallbacks
def test_add(self):
creator = protocol.ClientCreator(reactor, EchoClient)
client = yield creator.connectTCP("127.0.0.1", 8033)
yield client.sendMsg("hello")
log.msg("done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment