Skip to content

Instantly share code, notes, and snippets.

@Evidlo
Created July 23, 2017 00:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Evidlo/9639cd2e7c6a3d87c45a3fca34ed32b3 to your computer and use it in GitHub Desktop.
Save Evidlo/9639cd2e7c6a3d87c45a3fca34ed32b3 to your computer and use it in GitHub Desktop.
from twisted.logger import Logger
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
import time
class RobotSession(ApplicationSession):
log = Logger()
@inlineCallbacks
def onJoin(self, details):
# periodically publish armed
while True:
self.log.info("publishing armed")
self.publish(u'com.quadcopter.armed', True)
yield time.sleep(1)
from autobahn.twisted.wamp import ApplicationRunner
if __name__ == '__main__':
runner = ApplicationRunner(url=u"ws://localhost:8080/ws", realm=u"realm1")
runner.run(RobotSession)
{
"version": 2,
"workers": [
{
"type": "router",
"realms": [
{
"name": "realm1",
"roles": [
{
"name": "anonymous",
"permissions": [
{
"uri": "*",
"allow": {
"call": true,
"register": true,
"publish": true,
"subscribe": true
},
"disclose": {
"caller": false,
"publisher": false
},
"cache": true
}
]
}
],
"storage": {
"type": "memory",
"event-history": [
{
"uri": "com.quadcopter.armed",
"limit": 100
}
]
}
}
],
"transports": [
{
"type": "web",
"endpoint": {
"type": "tcp",
"port": 8080
},
"paths": {
"/": {
"type": "static",
"directory": "../"
},
"ws": {
"type": "websocket"
}
}
}
]
},
{
"type": "container",
"options": {
"pythonpath": [
".."
]
}
}
]
}
from twisted.logger import Logger
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from autobahn.wamp.types import SubscribeOptions
class GUISession(ApplicationSession):
log = Logger()
@inlineCallbacks
def onJoin(self, details):
print("session attached {}".format(details))
def got_event(*args, **kw):
print("got_event(): args={}, kwargs={}".format(args, kw))
# subscribe and get subscription id
topic = u'com.quadcopter.armed'
print("subscribing to '{}'".format(topic))
pub = yield self.subscribe(
got_event, topic,
options=SubscribeOptions(get_retained=True),
)
print("id={}".format(pub.id))
# get event history
events = yield self.call(u"wamp.subscription.get_events", pub.id)
print("wamp.subscription.get_events {}: {}".format(pub.id, len(events)))
for event in events:
print(" {event[timestamp]} {event[topic]} args={event[args]} kwargs={event[kwargs]}".format(event=event))
if __name__ == '__main__':
runner = ApplicationRunner(url=u"ws://localhost:8080/ws", realm=u"realm1")
runner.run(GUISession)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment