Skip to content

Instantly share code, notes, and snippets.

@bbc2
Created November 30, 2014 13:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bbc2/9509c171bc2b6e0fa01d to your computer and use it in GitHub Desktop.
Save bbc2/9509c171bc2b6e0fa01d to your computer and use it in GitHub Desktop.
CoAP server bug
import asyncio
import aiocoap
@asyncio.coroutine
def observe(protocol, addr):
    """Send an observing GET for ``/time`` to *addr* and print what comes back.

    Args:
        protocol: Object whose ``request()`` method is used to send the message.
        addr: ``(host, port)`` pair identifying the server to query.

    Callbacks that print each notification payload and each error are attached
    to the request's observation before the initial response is awaited.  The
    initial response's code and payload are printed on success; on failure the
    exception is printed instead.
    """
    server_host, server_port = addr

    def report_error(error):
        print('Error: {!r}'.format(error))

    def report_update(update):
        print('Observe: {}'.format(update.payload))

    message = aiocoap.Message(code=aiocoap.GET)
    message.set_request_uri('coap://{}:{}/time'.format(server_host, server_port))
    # Setting the observe option to 0 on the GET asks for notifications.
    message.opt.observe = 0

    requester = protocol.request(message)
    requester.observation.register_errback(report_error)
    requester.observation.register_callback(report_update)

    try:
        response = yield from requester.response
    except Exception as exc:
        print('Failed to fetch resource:\n{!r}'.format(exc))
        return

    print('Result: {}\n{!r}'.format(response.code, response.payload))
@asyncio.coroutine
def create_endpoint(loop, bind_addr):
    """Open a datagram endpoint on *bind_addr* backed by an ``aiocoap.Context``.

    Args:
        loop: Event loop used both to create the endpoint and by the context.
        bind_addr: Local address handed to ``create_datagram_endpoint``.

    Returns:
        The context instance, after its ``ready`` awaitable has completed.
    """
    def make_context():
        return aiocoap.Context(loop=loop)

    endpoint = yield from loop.create_datagram_endpoint(make_context, local_addr=bind_addr)
    # Only the protocol half of the (transport, protocol) pair is needed.
    _transport, context = endpoint
    yield from context.ready
    return context
if __name__ == '__main__':
    # Client entry point: bind a local endpoint on port 2000, start observing
    # the /time resource of the server at 127.0.0.1:5683, then keep the loop
    # running so the registered observation callbacks can still be invoked.
    loop = asyncio.get_event_loop()
    try:
        protocol = loop.run_until_complete(create_endpoint(loop, ('0.0.0.0', 2000)))
        loop.run_until_complete(observe(protocol, ('127.0.0.1', 5683)))
        loop.run_forever()
    finally:
        # Close the loop even when a step above raises (e.g. Ctrl-C interrupts
        # run_forever); previously close() was skipped in that case.
        loop.close()
import asyncio
import aiocoap
import aiocoap.resource as resource
from datetime import datetime
class TimeResource(resource.ObservableResource):
    """Observable resource whose GET representation is the current local time.

    Creating an instance immediately starts a cycle in which ``notify`` runs
    every 2 seconds on the current event loop.
    """

    def __init__(self):
        super().__init__()
        # Start the self-rescheduling notification cycle right away.
        self.notify()

    def notify(self):
        """Call ``updated_state()`` and schedule this method again in 2 seconds."""
        # updated_state() is inherited; presumably it pushes a notification to
        # registered observers -- confirm against the aiocoap documentation.
        self.updated_state()
        event_loop = asyncio.get_event_loop()
        event_loop.call_later(2, self.notify)

    @asyncio.coroutine
    def render_GET(self, request):
        """Answer a GET with the current time formatted as ``YYYY-MM-DD HH:MM``."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
        body = timestamp.encode('ascii')
        return aiocoap.Message(code=aiocoap.CONTENT, payload=body)
def create_site():
    """Build the resource tree: a ``Site`` with a ``TimeResource`` at ``('time',)``.

    Returns:
        The populated ``resource.Site`` instance.
    """
    site = resource.Site()
    time_resource = TimeResource()
    site.add_resource(('time',), time_resource)
    return site
@asyncio.coroutine
def create_server(loop, site):
    """Create a CoAP server context serving *site* and wait until it is ready.

    Args:
        loop: Accepted so existing callers keep working; this function does
            not use it.
        site: Root resource passed to ``aiocoap.Context.create_server_context``.

    Returns:
        The context produced by ``create_server_context``, after its ``ready``
        awaitable has completed.
    """
    # The former local ``protofact`` lambda was never used and has been removed.
    protocol = yield from aiocoap.Context.create_server_context(site)
    yield from protocol.ready
    return protocol
if __name__ == '__main__':
    # Server entry point: build the resource tree, start the server context,
    # and run the event loop until it is stopped or interrupted.
    loop = asyncio.get_event_loop()
    try:
        site = create_site()
        protocol = loop.run_until_complete(create_server(loop, site))
        loop.run_forever()
    finally:
        # Close the loop even when a step above raises (e.g. Ctrl-C interrupts
        # run_forever); previously close() was skipped in that case.
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment