Skip to content

Instantly share code, notes, and snippets.

@tiefpunkt
Last active January 3, 2016 09:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tiefpunkt/8446666 to your computer and use it in GitHub Desktop.
Save tiefpunkt/8446666 to your computer and use it in GitHub Desktop.
WebService Proxy for MQTT, to be used with one of the contribites UIs of MQTTitude. Can connect to a TLS-only Broker as well.
#!/usr/bin/env python
# coding: utf-8
# ------------------------------------------------------
#
# MQTT WS Proxy with TLS support
#
# By Severin Schols (@tiefpunkt)
# Based on https://gist.github.com/fiorix/1878983
# ------------------------------------------------------
import sys
from twisted.internet import defer
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.internet import ssl
from twisted.python import log
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
WS_HOST = "mqtt.example.com"
WS_PORT = 18883
REMOTE_HOST = "mqtt.example.com"
REMOTE_PORT = 8883
USE_SSL = True
class ProxyClientProtocol(protocol.Protocol):
def connectionMade(self):
log.msg("Client: connected to peer")
self.cli_queue = self.factory.cli_queue
self.cli_queue.get().addCallback(self.serverDataReceived)
def serverDataReceived(self, chunk):
if chunk is False:
self.cli_queue = None
log.msg("Client: disconnecting from peer")
self.factory.continueTrying = False
self.transport.loseConnection()
elif self.cli_queue:
log.msg("Client: writing %d bytes to peer" % len(chunk))
self.transport.write(chunk)
self.cli_queue.get().addCallback(self.serverDataReceived)
else:
self.factory.cli_queue.put(chunk)
def dataReceived(self, chunk):
log.msg("Client: %d bytes received from peer" % len(chunk))
self.factory.srv_queue.put(chunk)
def connectionLost(self, why):
if self.cli_queue:
self.cli_queue = None
log.msg("Client: peer disconnected unexpectedly")
class ProxyClientFactory(protocol.ReconnectingClientFactory):
maxDelay = 10
continueTrying = True
protocol = ProxyClientProtocol
def __init__(self, srv_queue, cli_queue):
self.srv_queue = srv_queue
self.cli_queue = cli_queue
class ProxyServer(WebSocketServerProtocol):
def onOpen(self):
self.srv_queue = defer.DeferredQueue()
self.cli_queue = defer.DeferredQueue()
self.srv_queue.get().addCallback(self.clientDataReceived)
factory = ProxyClientFactory(self.srv_queue, self.cli_queue)
if USE_SSL:
reactor.connectSSL(REMOTE_HOST, REMOTE_PORT, factory, ssl.ClientContextFactory())
else:
reactor.connectTCP(REMOTE_HOST, REMOTE_PORT, factory)
def clientDataReceived(self, chunk):
log.msg("Server: writing %d bytes to original client" % len(chunk))
self.sendMessage(chunk,True)
self.srv_queue.get().addCallback(self.clientDataReceived)
def onMessage(self, payload, isBinary):
log.msg("Server: %d bytes received" % len(payload))
self.cli_queue.put(payload)
def onClose(self, wasClean, code, reason):
self.cli_queue.put(False)
if __name__ == "__main__":
log.startLogging(sys.stdout)
factory = WebSocketServerFactory("ws://" + WS_HOST + ":" + str(WS_PORT), debug = False)
factory.protocol = ProxyServer
reactor.listenTCP(WS_PORT, factory, interface="0.0.0.0")
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment