Skip to content

Instantly share code, notes, and snippets.

@beenje
Last active April 24, 2016 07:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save beenje/6150400 to your computer and use it in GitHub Desktop.
Save beenje/6150400 to your computer and use it in GitHub Desktop.
Twisted gateway
import json
import time
from twisted.internet import defer
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.python import log
class BasicProtocol(LineReceiver):
@defer.inlineCallbacks
def lineReceived(self, line):
yield self.messageReceived(line)
@defer.inlineCallbacks
def messageReceived(self, message):
try:
yield self.factory.messageReceived(message)
except AttributeError:
pass
class BasicGatewayFactory(ServerFactory):
protocol = BasicProtocol
def __init__(self, service, channel):
self.service = service
self.channel = channel
@defer.inlineCallbacks
def messageReceived(self, message):
yield self.service.publish(self.channel, message)
class RedisPublishService(object):
def __init__(self, factory):
"""
@param factory: redis client factory
"""
self.factory = factory
@defer.inlineCallbacks
def publish(self, channel, message):
log.msg("Publish message {} on {}".format(message, channel))
yield self.factory.client.publish(channel, message)
timestamp = int(time.time() * 1000)
# Include the timestamp in the value to allow
# duplicate message
value = json.dumps({"timestamp": timestamp, "message": message})
log.msg("Store message in {} sorted set with score {}".format(
channel, timestamp))
# Set the timestamp as score to easily fetch the values within a
# time period using zrangebyscore
yield self.factory.client.zadd(channel, timestamp, value)
if __name__ == '__main__':
import sys
from twisted.internet import reactor
from txredis.client import RedisClientFactory
log.startLogging(sys.stdout)
redis_factory = RedisClientFactory()
reactor.connectTCP('localhost', 6379, redis_factory)
redis_pub_service = RedisPublishService(redis_factory)
gw_factory = BasicGatewayFactory(redis_pub_service, "test")
reactor.listenTCP(8000, gw_factory)
reactor.run()
from twisted.internet import reactor, defer, protocol
from twisted.python import log
from twisted.test import proto_helpers
from twisted.trial.unittest import TestCase
from txredis.client import RedisSubscriber, RedisClientFactory
from txredis.testing import REDIS_HOST, REDIS_PORT
from gateway.example import BasicGatewayFactory, RedisPublishService
class GatewayServiceTestCase(TestCase):
@defer.inlineCallbacks
def setUp(self):
self.redis_factory = RedisClientFactory()
reactor.connectTCP(REDIS_HOST, REDIS_PORT, self.redis_factory)
yield self.redis_factory.deferred
self.redis_pub_service = RedisPublishService(self.redis_factory)
self.factory = BasicGatewayFactory(self.redis_pub_service, "test")
self.server = self.factory.buildProtocol(None)
self.transport = proto_helpers.StringTransportWithDisconnection()
self.transport.protocol = self.server
self.server.makeConnection(self.transport)
class MySubscriber(RedisSubscriber):
def __init__(self, *args, **kwargs):
RedisSubscriber.__init__(self, *args, **kwargs)
self.msg_channel = None
self.msg_message = None
self.msg_received = defer.Deferred()
def messageReceived(self, channel, message):
log.msg("Message received!")
self.msg_channel = channel
self.msg_message = message
self.msg_received.callback(None)
self.msg_received = defer.Deferred()
clientCreator = protocol.ClientCreator(reactor, MySubscriber)
self.subscriber = yield clientCreator.connectTCP(REDIS_HOST,
REDIS_PORT)
yield self.subscriber.subscribe("test")
def tearDown(self):
self.subscriber.transport.loseConnection()
self.redis_factory.continueTrying = 0
self.redis_factory.stopTrying()
if self.redis_factory.client:
self.redis_factory.client.setTimeout(None)
self.redis_factory.client.transport.loseConnection()
self.transport.loseConnection()
@defer.inlineCallbacks
def test_messageReceived(self):
cb = self.subscriber.msg_received
yield self.server.dataReceived('HELLO1\r\n')
yield cb
self.assertEqual(self.subscriber.msg_channel, "test")
self.assertEqual(self.subscriber.msg_message, "HELLO1")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment