Skip to content

Instantly share code, notes, and snippets.

@jimmo
Created September 9, 2020 02:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jimmo/03add465433686c416df43611bdbc7b7 to your computer and use it in GitHub Desktop.
Save jimmo/03add465433686c416df43611bdbc7b7 to your computer and use it in GitHub Desktop.
import sys
# NOTE(review): adds '' to the module search path -- presumably so that a
# uasyncio package in the current directory can be imported; confirm.
sys.path.append('')
from micropython import const
import micropython
import uasyncio as asyncio
import io
# ioctl request code that PollingEvent.ioctl() treats as a poll query.
_MP_STREAM_POLL = const(3)
class PollingEvent(io.IOBase):
    """Self-clearing flag that a uasyncio task can wait on via stream polling.

    The instance presents itself to the event loop as a pollable stream:
    ``wait()`` queues it on ``asyncio.core._io_queue`` for reading and
    ``ioctl()`` answers the resulting poll request from the flag value.
    ``set()`` only stores an integer; elsewhere in this file it is invoked
    from a ``micropython.schedule`` callback.
    """

    def __init__(self):
        # 0 = clear, 1 = set.
        self._flag = 0

    def ioctl(self, req, flags):
        """Answer a stream ioctl request.

        For a poll request the result is ``self._flag * flags``: 0 while the
        flag is clear, and the full requested event mask once it is set.
        Any other request is reported as unsupported with ``-1``.
        """
        if req == _MP_STREAM_POLL:
            return self._flag * flags
        # MicroPython's io.IOBase ioctl bridge converts the return value to an
        # integer, so None is not a valid reply; a negative value signals an
        # error / unsupported request instead of raising TypeError.
        return -1

    def set(self):
        """Mark the flag as set so the next poll reports the stream ready."""
        self._flag = 1

    async def wait(self):
        """Suspend the calling task until the flag is set, then clear it."""
        # A bare ``yield`` inside ``async def`` is the uasyncio way of parking
        # the task on the IO queue until the poller reports this object ready.
        yield asyncio.core._io_queue.queue_read(self)
        # Reset so that the next wait() blocks again.
        self._flag = 0
async def f(ev):
    """Print progress markers, sleep 2000 ms, then set *ev* via the scheduler."""

    def _raise_flag(_unused):
        # Callback handed to micropython.schedule(); its argument is ignored.
        ev.set()

    print('f start')
    await asyncio.sleep_ms(2000)
    print('f set')
    micropython.schedule(_raise_flag, None)
    print('f done')
async def g(ev):
    """Wait on ``ev.wait()``, printing a marker before and after."""
    print('g wait')
    pending = ev.wait()
    await pending
    print('g ready')
async def main():
    """Run ``f`` and ``g`` as concurrent tasks sharing one PollingEvent."""
    ev = PollingEvent()
    # Tasks are created in the order f, g and then awaited together.
    tasks = [asyncio.create_task(worker(ev)) for worker in (f, g)]
    await asyncio.gather(*tasks)
# Script entry point: run main() on the event loop.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment