Skip to content

Instantly share code, notes, and snippets.

@ivlevdenis
Created November 18, 2016 11:28
Show Gist options
  • Save ivlevdenis/500660d7a7933506cd92d2613ba44051 to your computer and use it in GitHub Desktop.
Save ivlevdenis/500660d7a7933506cd92d2613ba44051 to your computer and use it in GitHub Desktop.
Simple autobahn websocket server with asyncio loop.
#!/usr/bin/env python3
import asyncio
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
class MyServerProtocol(WebSocketServerProtocol):
    """WebSocket server protocol that logs every connection event to stdout."""

    def processHandshake(self):
        """Run the inherited handshake, then dump the request's HTTP headers.

        Returns whatever the parent implementation returns, unchanged.
        """
        hand = super().processHandshake()
        print(self.http_headers)
        return hand

    def onConnect(self, request):
        """Log the incoming connection request and the peer it came from."""
        print(request)
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        """Log that the connection is open."""
        # The blank line visually separates the handshake dump from the
        # per-connection messages that follow.
        print()
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Log a received message, distinguishing binary from text frames.

        Binary payloads are reported by size only; text payloads are decoded
        from UTF-8 so the log shows the message itself rather than a bytes
        repr.
        """
        if isBinary:
            print("Binary message received: {0} bytes".format(len(payload)))
        else:
            # errors="replace" keeps logging from raising inside the callback
            # should an invalid byte sequence ever reach this point.
            text = payload.decode("utf8", errors="replace")
            print("Text message received: {0}".format(text))

    def onPong(self, payload):
        """Log that a pong frame arrived (the payload itself is ignored)."""
        print("Pong")

    def onClose(self, wasClean, code, reason):
        """Log the reason given for the connection closing."""
        print("WebSocket connection closed: {0}".format(reason))
# Build the factory that creates one MyServerProtocol per client connection.
factory = WebSocketServerFactory()
factory.protocol = MyServerProtocol

# Tunable protocol options, left commented out as a reference. With none
# passed, the call supplies no overrides, so the library defaults are
# presumably what stays in effect (confirm against the autobahn docs).
factory.setProtocolOptions(
    # autoPingInterval=10,
    # autoPingTimeout=20,
    # failByDrop=False,
    # openHandshakeTimeout=20,
    # closeHandshakeTimeout=20,
    # tcpNoDelay=False,
)

# Listen on every interface, port 8000, and serve until interrupted.
loop = asyncio.get_event_loop()
esp_coro = loop.create_server(factory, '0.0.0.0', 8000)
esp_server = loop.run_until_complete(esp_coro)

try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the server; exit without a traceback.
    pass
finally:
    # Release the listening socket and the loop's resources on the way out.
    esp_server.close()
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment