Skip to content

Instantly share code, notes, and snippets.

@ambv
Last active February 27, 2023 21:42
Show Gist options
  • Save ambv/e422fc092d3ac3e79e8b53f8efcb3108 to your computer and use it in GitHub Desktop.
Save ambv/e422fc092d3ac3e79e8b53f8efcb3108 to your computer and use it in GitHub Desktop.
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
import gc
import random
import time
import monome
import numpy as np
import uvloop
@dataclass
class MyArcApp(monome.ArcApp):
connected: bool = False
rings: np.array[np.float64] = field(default_factory=lambda: np.zeros(4, np.float64))
def __post_init__(self) -> None:
super().__init__()
self._buffer = np.zeros(64, dtype=">i")
def on_arc_ready(self) -> None:
print("Arc connected")
self.connected = True
def on_arc_disconnect(self) -> None:
print("Arc disconnected")
self.connected = False
def on_arc_delta(self, ring: int, delta: int) -> None:
self.rings[ring] += 0.1 * delta
def advance(self) -> None:
for ring in range(4):
cur = self.rings[ring]
if cur > 64:
self.rings[ring] = 0
else:
self.rings[ring] = cur + 0.1 * (ring + 1)
def draw(self) -> None:
if not self.connected:
return
b = self._buffer
for ring in range(4):
b[:] = 0
for led in range(64):
if self.rings[ring] > led:
b[led] = 15
self.arc.ring_map_raw(ring, b)
@dataclass
class MyGridApp(monome.GridApp):
width: int = 0
height: int = 0
connected: bool = False
leds: list[list[int]] = field(default_factory=lambda: [[0]])
def __post_init__(self) -> None:
super().__init__()
self._buffer = np.zeros(8, dtype=(">i", 8))
self._counter = 0
def on_grid_ready(self) -> None:
self.width = self.grid.width
self.height = self.grid.height
self.connected = True
self.leds = [[0 for _ in range(self.height)] for _ in range(self.width)]
g = "Grid"
if self.grid.varibright:
g = "Varibright grid"
print(f"{g} {self.width}x{self.height} connected")
def on_grid_disconnect(self) -> None:
print("Grid disconnected")
self.connected = False
def on_grid_key(self, x: int, y: int, s: int) -> None:
if s > 0:
self.leds[x][y] = 100
else:
self.leds[x][y] = 99
def advance(self) -> None:
for x in range(self.width):
for y in range(self.height):
cur = self.leds[x][y]
if cur < 100:
self.leds[x][y] = max(0, cur - 1)
self._counter += 1
if self._counter % 11 == 0:
x = random.randint(0, len(self.leds)-1)
y = random.randint(0, len(self.leds[0])-1)
self.leds[x][y] = 99
def draw(self) -> None:
if not self.connected:
return
b = self._buffer
for x_offset in range(0, self.width, 8):
for y_offset in range(0, self.height, 8):
b[:] = 0
for x in range(8):
for y in range(8):
v = self.leds[x_offset + x][y_offset + y]
b[y][x] = max(0, min(15, v // 6))
self.grid.led_level_map_raw(x_offset, y_offset, b)
async def main():
arc_app = MyArcApp()
grid_app = MyGridApp()
render_fps = 0.0
async with asyncio.TaskGroup() as tg:
def serialosc_device_added(id, type, port):
if type == "monome arc":
tg.create_task(arc_app.arc.connect("127.0.0.1", port))
elif type == "monome 128":
tg.create_task(grid_app.grid.connect("127.0.0.1", port))
else:
print(
f"warning: unknown Monome device connected - type {type!r}, id {id}"
)
async def draw_all():
nonlocal render_fps
fps = 0
last_sec = time.monotonic()
while True:
fps += 1
grid_app.draw()
arc_app.draw()
now = time.monotonic()
if now - last_sec >= 1:
render_fps = fps / (now - last_sec)
fps = 0
last_sec = now
await asyncio.sleep(0.03)
serialosc = monome.SerialOsc()
serialosc.device_added_event.add_handler(serialosc_device_added)
await serialosc.connect()
tg.create_task(draw_all())
fps = 0
last_sec = time.monotonic()
delta = time.monotonic()
delta_max = 0.0
while True:
fps += 1
start = time.monotonic()
grid_app.advance()
arc_app.advance()
now = time.monotonic()
if now - last_sec >= 1:
comp_fps = fps / (now - last_sec)
comp_min = 1 / delta_max
print(
f"FPS: computation={comp_fps:.2f} (min={comp_min:.2f}) rendering={render_fps:.2f}"
)
fps = 0
delta_max = 0
last_sec = now
if (now - delta) > delta_max:
delta_max = now - delta
delta = now
await asyncio.sleep(max(0, 0.01 - (now - start)))
if __name__ == "__main__":
uvloop.install()
gc.disable()
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment