Skip to content

Instantly share code, notes, and snippets.

@peterblazejewicz
Created April 19, 2020 14:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peterblazejewicz/2d48075f79392f27088885b89e5d536c to your computer and use it in GitHub Desktop.
Save peterblazejewicz/2d48075f79392f27088885b89e5d536c to your computer and use it in GitHub Desktop.
asyncio.Queue to process SenseHat keyboard events in an asynchronous event loop
#!/usr/bin/python
import asyncio
from sense_hat import SenseHat
# Shared module state. All three names start as None and are assigned at
# startup: `sense` by the script body at the bottom of the file, `loop` and
# `queue` by main(). The annotations are quoted so they are not evaluated at
# runtime and can state truthfully that None is a possible value.
sense: "SenseHat | None" = None
loop: "asyncio.AbstractEventLoop | None" = None
queue: "asyncio.Queue | None" = None
def process_direction(event):
    """Forward a joystick press to the asyncio queue.

    Plain synchronous callback assigned to ``sense.stick.direction_any``.
    The direction is handed over with ``asyncio.run_coroutine_threadsafe``,
    so the callback may be invoked from a thread other than the one running
    the event loop.
    NOTE(review): the sense_hat library appears to dispatch joystick
    callbacks from its own background thread, not the main thread -- confirm.

    Args:
        event: Joystick event exposing ``action`` and ``direction``
            attributes. Only events whose action is ``'pressed'`` are queued.
    """
    if event.action != 'pressed':
        return
    # The callback is registered before main() has created the loop and the
    # queue, so an early press can arrive while both are still None. Drop it
    # instead of raising inside the callback.
    if loop is None or queue is None:
        return
    asyncio.run_coroutine_threadsafe(queue.put(event.direction), loop=loop)
async def show(direction):
    """Display the initial letter of a joystick direction on the Sense HAT.

    Coroutine awaited from main(). A direction that matches none of the five
    entries below leaves the display untouched.
    """
    letters = (
        ("up", "U"),
        ("down", "D"),
        ("left", "L"),
        ("right", "R"),
        ("middle", "M"),
    )
    for name, letter in letters:
        if direction == name:
            sense.show_letter(letter)
            break
async def main():
    """Consume queued joystick directions and display them, forever.

    Publishes the running event loop and a fresh queue through the module
    globals ``loop`` and ``queue`` so that process_direction() can feed the
    queue. Each dequeued direction is passed to show(); once the queue has
    drained, the display is cleared after a half-second pause. The loop has
    no exit condition, so this coroutine never returns normally.
    """
    global loop
    global queue
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    while True:
        direction = await queue.get()
        await show(direction)
        queue.task_done()
        if queue.empty():
            # NOTE(review): the original indentation was lost; the pause and
            # the clear are assumed to both belong to this branch -- confirm.
            await asyncio.sleep(.5)
            sense.clear()
# Script body: set up the Sense HAT, register the joystick callback, then run
# the asyncio consumer until the process is interrupted.
try:
    sense = SenseHat()
    sense.clear()
    sense.stick.direction_any = process_direction
    asyncio.run(main())
except KeyboardInterrupt:
    # main() loops forever, so Ctrl+C is the normal way to stop. Exit without
    # a traceback and do not leave the last letter lit on the display.
    if sense is not None:
        sense.clear()
except Exception as ex:
    print(ex)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment