# Source: GitHub gist ambv/e422fc092d3ac3e79e8b53f8efcb3108.
# Notice from the gist page: this file may contain bidirectional Unicode text
# that can be interpreted or compiled differently than it appears. To review,
# open the file in an editor that reveals hidden Unicode characters.
from __future__ import annotations | |
import asyncio | |
from dataclasses import dataclass, field | |
import gc | |
import random | |
import time | |
import monome | |
import numpy as np | |
import uvloop | |
@dataclass
class MyArcApp(monome.ArcApp):
    """Arc demo that renders each ring as a bar growing from LED 0.

    ``rings`` holds one floating-point level per ring. ``advance()`` grows
    every level (ring ``n`` grows by ``0.1 * (n + 1)`` per call) and wraps it
    back to 0 once it exceeds the LED count; ``on_arc_delta()`` nudges a level
    by the reported delta; ``draw()`` lights every LED whose index is below
    the ring's level.
    """

    # Deliberately unannotated so ``@dataclass`` does not turn them into fields.
    RING_COUNT = 4
    LEDS_PER_RING = 64
    LED_ON_LEVEL = 15  # value written to the buffer for a lit LED

    connected: bool = False
    # ``np.ndarray`` is the array type; ``np.array`` is only a factory function.
    rings: np.ndarray = field(
        default_factory=lambda: np.zeros(MyArcApp.RING_COUNT, np.float64)
    )

    def __post_init__(self) -> None:
        """Initialise the ``monome.ArcApp`` base and allocate draw buffers."""
        super().__init__()
        # Reused for every ring on every frame to avoid per-frame allocation.
        self._buffer = np.zeros(self.LEDS_PER_RING, dtype=">i")
        # LED indices 0..63, compared against a ring level in ``draw()``.
        self._led_index = np.arange(self.LEDS_PER_RING)

    def on_arc_ready(self) -> None:
        """Mark the arc as connected so ``draw()`` starts sending frames.

        NOTE(review): presumably invoked by ``monome.ArcApp`` once the device
        connection is established -- confirm against the pymonome API.
        """
        print("Arc connected")
        self.connected = True

    def on_arc_disconnect(self) -> None:
        """Mark the arc as disconnected so ``draw()`` becomes a no-op."""
        print("Arc disconnected")
        self.connected = False

    def on_arc_delta(self, ring: int, delta: int) -> None:
        """Shift the level of ``ring`` by a tenth of ``delta``."""
        self.rings[ring] += 0.1 * delta

    def advance(self) -> None:
        """Step the animation: grow every ring level, wrapping past the end."""
        for ring in range(self.RING_COUNT):
            cur = self.rings[ring]
            if cur > self.LEDS_PER_RING:
                self.rings[ring] = 0
            else:
                # Higher-numbered rings grow proportionally faster.
                self.rings[ring] = cur + 0.1 * (ring + 1)

    def draw(self) -> None:
        """Send the current level of every ring to the device.

        Does nothing while the arc is not connected.
        """
        if not self.connected:
            return

        buf = self._buffer
        for ring in range(self.RING_COUNT):
            # LED i is lit exactly when the ring's level exceeds i; one
            # vectorized comparison replaces the per-LED Python loop.
            buf[:] = np.where(
                self._led_index < self.rings[ring], self.LED_ON_LEVEL, 0
            )
            self.arc.ring_map_raw(ring, buf)
@dataclass
class MyGridApp(monome.GridApp):
    """Grid demo in which LEDs light up and then fade out over time.

    ``leds`` stores one integer per key, indexed ``leds[x][y]``. A value of
    100 marks a key that is currently held; anything below 100 decays by one
    on every ``advance()`` call. ``draw()`` maps each value to a 0-15 LED
    level via ``value // 6``.
    """

    width: int = 0
    height: int = 0
    connected: bool = False
    # Placeholder 1x1 matrix until ``on_grid_ready()`` sizes it for the device.
    leds: list[list[int]] = field(default_factory=lambda: [[0]])

    def __post_init__(self) -> None:
        """Initialise the ``monome.GridApp`` base and per-instance state."""
        super().__init__()
        # 8x8 block of levels, reused for every quad that gets drawn.
        self._buffer = np.zeros(8, dtype=(">i", 8))
        # Number of ``advance()`` calls so far; drives the random sparkle.
        self._counter = 0

    def on_grid_ready(self) -> None:
        """Record the grid's size, reset the LED matrix and mark it connected."""
        grid = self.grid
        self.width = grid.width
        self.height = grid.height
        self.connected = True
        self.leds = [[0] * self.height for _ in range(self.width)]
        label = "Varibright grid" if grid.varibright else "Grid"
        print(f"{label} {self.width}x{self.height} connected")

    def on_grid_disconnect(self) -> None:
        """Mark the grid as disconnected so ``draw()`` becomes a no-op."""
        print("Grid disconnected")
        self.connected = False

    def on_grid_key(self, x: int, y: int, s: int) -> None:
        """Hold the LED at 100 while a key is down; start its fade on release."""
        self.leds[x][y] = 100 if s > 0 else 99

    def advance(self) -> None:
        """Fade every non-held LED by one and periodically light a random one."""
        for x in range(self.width):
            for y in range(self.height):
                level = self.leds[x][y]
                if level >= 100:
                    # Held keys stay fully lit until they are released.
                    continue
                self.leds[x][y] = max(0, level - 1)

        self._counter += 1
        if self._counter % 11 != 0:
            return

        # Every 11th step, restart the fade on one randomly chosen LED.
        rand_x = random.randrange(len(self.leds))
        rand_y = random.randrange(len(self.leds[0]))
        self.leds[rand_x][rand_y] = 99

    def draw(self) -> None:
        """Send the LED matrix to the device, one 8x8 block at a time.

        Does nothing while the grid is not connected.
        """
        if not self.connected:
            return

        block = self._buffer
        for x_offset in range(0, self.width, 8):
            for y_offset in range(0, self.height, 8):
                block[:] = 0
                for x in range(8):
                    for y in range(8):
                        value = self.leds[x_offset + x][y_offset + y]
                        # The buffer is indexed row (y) first; clamp to 0..15.
                        block[y, x] = min(15, max(0, value // 6))
                self.grid.led_level_map_raw(x_offset, y_offset, block)
async def main() -> None:
    """Run the arc and grid demos and print FPS statistics once per second.

    Creates both apps, connects to serialosc, and connects each app to its
    device as serialosc reports it. Rendering runs in a background task
    (sleeping 0.03 s between frames) while this coroutine steps the animation
    state, aiming for one step every 0.01 s. The loop never terminates on its
    own; the coroutine ends only through cancellation or an exception.
    """
    arc_app = MyArcApp()
    grid_app = MyGridApp()
    render_fps = 0.0

    async with asyncio.TaskGroup() as tg:

        # Parameter names (including the builtin-shadowing ``id`` and ``type``)
        # are kept as-is in case the event dispatcher passes them by keyword.
        def serialosc_device_added(id, type, port):
            """Connect the matching app when serialosc reports a new device."""
            if type == "monome arc":
                tg.create_task(arc_app.arc.connect("127.0.0.1", port))
            elif type == "monome 128":
                tg.create_task(grid_app.grid.connect("127.0.0.1", port))
            else:
                print(
                    f"warning: unknown Monome device connected - type {type!r}, id {id}"
                )

        async def draw_all():
            """Render both apps in a loop and publish the measured render FPS."""
            nonlocal render_fps
            frames = 0
            last_report = time.monotonic()
            while True:
                frames += 1
                grid_app.draw()
                arc_app.draw()
                now = time.monotonic()
                elapsed = now - last_report
                if elapsed >= 1:
                    render_fps = frames / elapsed
                    frames = 0
                    last_report = now
                await asyncio.sleep(0.03)

        serialosc = monome.SerialOsc()
        serialosc.device_added_event.add_handler(serialosc_device_added)
        await serialosc.connect()
        tg.create_task(draw_all())

        frames = 0
        last_report = prev_frame = time.monotonic()
        slowest_frame = 0.0
        while True:
            frames += 1
            start = time.monotonic()
            grid_app.advance()
            arc_app.advance()
            now = time.monotonic()

            # Measure this frame before reporting. Previously the report could
            # run while the slowest-frame value was still 0.0 (when the very
            # first iteration took a second or more) and divide by zero, and
            # the reporting frame was attributed to the following window.
            slowest_frame = max(slowest_frame, now - prev_frame)
            prev_frame = now

            elapsed = now - last_report
            if elapsed >= 1:
                comp_fps = frames / elapsed
                # Worst-case instantaneous FPS within the reporting window.
                comp_min = 1 / slowest_frame if slowest_frame > 0 else float("inf")
                print(
                    f"FPS: computation={comp_fps:.2f} (min={comp_min:.2f}) rendering={render_fps:.2f}"
                )
                frames = 0
                slowest_frame = 0.0
                last_report = now

            # Sleep for whatever is left of the 10 ms step budget.
            await asyncio.sleep(max(0, 0.01 - (now - start)))
if __name__ == "__main__":
    # Switch off automatic garbage collection and make asyncio use uvloop
    # before the event loop is started.
    # NOTE(review): presumably both are here to keep frame timing steady --
    # confirm with the author.
    gc.disable()
    uvloop.install()
    asyncio.run(main())
# (GitHub page footer: sign up for free or sign in to join the conversation
# and comment on this gist.)