Created
November 30, 2014 13:46
-
-
Save bbc2/9509c171bc2b6e0fa01d to your computer and use it in GitHub Desktop.
CoAP server bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiocoap | |
@asyncio.coroutine
def observe(protocol, addr):
    """Issue an observing GET for ``/time`` at *addr* and print the outcome.

    Args:
        protocol: Object whose ``request()`` method accepts an
            ``aiocoap.Message`` and returns a requester exposing
            ``observation`` and ``response``.
        addr: ``(host, port)`` pair of the server to query.

    Prints the first response (or the failure), and prints every later
    observation notification or observation error via registered callbacks.
    """
    target_host, target_port = addr

    message = aiocoap.Message(code=aiocoap.GET)
    message.set_request_uri('coap://{}:{}/time'.format(target_host, target_port))
    # Observe option set to 0 -- presumably "register as observer"; confirm
    # against the aiocoap / CoAP Observe documentation.
    message.opt.observe = 0

    def report_error(e):
        print('Error: {!r}'.format(e))

    def report_notification(o):
        print('Observe: {}'.format(o.payload))

    pending = protocol.request(message)
    # Errback first, then callback: same registration order as before.
    pending.observation.register_errback(report_error)
    pending.observation.register_callback(report_notification)

    try:
        reply = yield from pending.response
    except Exception as exc:
        print('Failed to fetch resource:\n{!r}'.format(exc))
        return

    print('Result: {}\n{!r}'.format(reply.code, reply.payload))
@asyncio.coroutine
def create_endpoint(loop, bind_addr):
    """Create a datagram endpoint backed by an ``aiocoap.Context``.

    Args:
        loop: Event loop on which the endpoint is created; it is also passed
            to the ``aiocoap.Context`` constructor.
        bind_addr: Local address handed to ``create_datagram_endpoint`` as
            ``local_addr``.

    Returns:
        The protocol object produced by the endpoint, after its ``ready``
        awaitable has completed.
    """
    def make_context():
        return aiocoap.Context(loop=loop)

    # create_datagram_endpoint yields a (transport, protocol) pair; only the
    # protocol half is needed here.
    _transport, context = yield from loop.create_datagram_endpoint(
        make_context, local_addr=bind_addr)
    yield from context.ready
    return context
if __name__ == '__main__':
    # Client entry point: bind locally, start observing the server's /time
    # resource, then keep the loop alive so notifications keep printing.
    local_bind = ('0.0.0.0', 2000)
    server_addr = ('127.0.0.1', 5683)

    loop = asyncio.get_event_loop()
    protocol = loop.run_until_complete(create_endpoint(loop, local_bind))
    loop.run_until_complete(observe(protocol, server_addr))
    loop.run_forever()
    loop.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiocoap | |
import aiocoap.resource as resource | |
from datetime import datetime | |
class TimeResource(resource.ObservableResource):
    """Observable resource whose GET representation is the current time.

    On construction it starts a self-rescheduling ``notify`` cycle that calls
    ``updated_state()`` every 2 seconds.
    """

    def __init__(self):
        super().__init__()
        # Kick off the periodic notification cycle immediately.
        self.notify()

    def notify(self):
        """Signal a state update and re-arm this method to run in 2 seconds."""
        self.updated_state()
        event_loop = asyncio.get_event_loop()
        event_loop.call_later(2, self.notify)

    @asyncio.coroutine
    def render_GET(self, request):
        """Return a CONTENT message carrying the local time as ASCII bytes.

        The payload is formatted as ``YYYY-MM-DD HH:MM`` (minute resolution).
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
        return aiocoap.Message(code=aiocoap.CONTENT,
                               payload=timestamp.encode('ascii'))
def create_site():
    """Build the resource tree: a ``Site`` with a ``TimeResource`` at ``time``.

    Returns:
        The populated ``resource.Site`` instance.
    """
    site = resource.Site()
    time_path = ('time',)
    site.add_resource(time_path, TimeResource())
    return site
@asyncio.coroutine
def create_server(loop, site):
    """Create a server context serving *site* and wait until it is ready.

    Args:
        loop: Not used by this function; kept so existing callers that pass
            an event loop keep working.
        site: Root resource handed to
            ``aiocoap.Context.create_server_context``.

    Returns:
        The server context, after its ``ready`` awaitable has completed.
    """
    # The context is built by create_server_context itself, so no local
    # protocol factory is needed here.
    protocol = yield from aiocoap.Context.create_server_context(site)
    yield from protocol.ready
    return protocol
if __name__ == '__main__':
    # Server entry point: build the resource tree, bring up the server
    # context, then serve until the loop is stopped.
    loop = asyncio.get_event_loop()
    site = create_site()
    startup = create_server(loop, site)
    protocol = loop.run_until_complete(startup)
    loop.run_forever()
    loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment