Last active
February 26, 2024 13:33
-
-
Save sutaburosu/43d46240dc2b05683d1e98872885f8db to your computer and use it in GitHub Desktop.
A tool to stream from Wokwi's Arduino sim to real LEDs. Optimised for AVR Arduinos with CH340 USB serial
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdarg.h> | |
#include <FastLED.h> | |
#define LED_TYPE WS2812B | |
#define DATA_PIN 2 | |
#define COLOR_ORDER GRB | |
#define XY_WIDTH 16 | |
#define XY_HEIGHT 16 | |
#define XY_LAYOUT (SERPENTINE | ROWMAJOR) | |
#define BRIGHTNESS 16 | |
#define LED_CORRECTION UncorrectedColor | |
#define LED_TEMPERATURE UncorrectedTemperature | |
#define BAUD 2000000 | |
#define FRAME_TIMEOUT_MS 250 | |
#define SCREENSAVER_MS (1000 + FRAME_TIMEOUT_MS) | |
#define INITIAL_TIMEOUT_MS 5000 | |
#define RECV_TIMEOUT_MS 5 | |
#define NO_INPUT_AWAIT_MS 1 | |
// The HardwareSerial library is too slow for 2Mbaud on AVR. ꜙ\(°_°)/ꜙ that. | |
// Only tested at 2Mbaud. 1Mbaud should work too. Others not so much. | |
#if defined(__AVR_ATmega328P__) || defined(__AVR_ATmega328PB__) || \ | |
defined(__AVR_ATmega328__) || defined(__AVR_ATmega168__) || \ | |
defined(__AVR_ATmega168P__) || defined(__AVR_ATmega8__) | |
#define FAST_ATMEGA | |
#endif | |
#define NUM_LEDS ((XY_WIDTH) * (XY_HEIGHT)) | |
// 1 extra for XY() to use when out-of-bounds | |
CRGB leds[NUM_LEDS + 1]; | |
enum XY_layout { | |
// I think TRANSPOSE is only useful to rotate square panels | |
SERPENTINE = 16, ROWMAJOR = 8, TRANSPOSE = 4, FLIPMAJOR = 2, FLIPMINOR = 1 | |
}; | |
uint16_t XY(uint8_t x, uint8_t y) { | |
uint8_t major, minor, sz_major, sz_minor; | |
if (x >= XY_WIDTH || y >= XY_HEIGHT) | |
return NUM_LEDS; | |
if (XY_LAYOUT & ROWMAJOR) | |
major = x, minor = y, sz_major = XY_WIDTH, sz_minor = XY_HEIGHT; | |
else | |
major = y, minor = x, sz_major = XY_HEIGHT, sz_minor = XY_WIDTH; | |
if (((XY_LAYOUT & FLIPMAJOR) != 0) ^ ((minor & 1) && (XY_LAYOUT & SERPENTINE))) | |
major = sz_major - 1 - major; | |
if (XY_LAYOUT & FLIPMINOR) | |
minor = sz_minor - 1 - minor; | |
if (XY_LAYOUT & TRANSPOSE) | |
return major * (uint16_t) sz_major + minor; | |
else | |
return minor * (uint16_t) sz_major + major; | |
} | |
uint8_t brightness = BRIGHTNESS; | |
uint32_t total_frames = 0; | |
void setup() { | |
FastLED.addLeds<LED_TYPE, DATA_PIN, COLOR_ORDER>(leds, NUM_LEDS); | |
FastLED.setCorrection(LED_CORRECTION); | |
FastLED.setTemperature(LED_TEMPERATURE); | |
FastLED.setDither(DISABLE_DITHER); | |
FastLED.setBrightness(brightness); | |
FastLED.clear(); | |
FastLED.show(); | |
myserial_begin(BAUD, RECV_TIMEOUT_MS); | |
#if !defined(FAST_ATMEGA) | |
pinMode(LED_BUILTIN, OUTPUT); | |
#else | |
DDRB |= 1 << PB5; // set pin 13 to output | |
#endif | |
} | |
void loop() { | |
static uint32_t last_frame_ms = millis(); | |
static uint16_t await_ms = INITIAL_TIMEOUT_MS; | |
static uint32_t good_frames = 0; | |
static uint32_t bad_frames = 0; | |
// wait for some serial data to arrive | |
int16_t in = 0; | |
uint32_t ms = millis(); | |
while (in <= 0 && millis() - ms < await_ms) { | |
#if !defined(FAST_ATMEGA) | |
in = Serial.available(); | |
#else | |
in = UCSR0A & (1 << RXC0); | |
#endif | |
} | |
// nothing received for a while: fade to black then show screensaver | |
if (in <= 0) { | |
await_ms = NO_INPUT_AWAIT_MS; | |
if (millis() - last_frame_ms < SCREENSAVER_MS) | |
fadeToBlackBy(leds, NUM_LEDS, 8); | |
else | |
screensaver(); | |
showLEDs(); | |
return; | |
} | |
// receive up to NUM_LEDS * 3 bytes, with a timeout | |
uint16_t recvd = 0; | |
uint8_t *dst = (uint8_t *) leds; | |
#if !defined(FAST_ATMEGA) | |
recvd = Serial.readBytes(dst, NUM_LEDS * 3); | |
#else | |
ms = millis(); | |
while (recvd < NUM_LEDS * 3 && millis() - ms < RECV_TIMEOUT_MS) { | |
while (!(UCSR0A & (1 << RXC0)) && millis() - ms < RECV_TIMEOUT_MS); | |
if ((UCSR0A & (1 << RXC0))) { | |
*dst++ = UDR0; | |
if (++recvd == NUM_LEDS * 3) | |
break; | |
ms = millis(); | |
} | |
} | |
#endif | |
// discard any surplus bytes to help with sync | |
#if !defined(FAST_ATMEGA) | |
while (Serial.available() > 0) { | |
uint8_t tmp; | |
Serial.readBytes(&tmp, 1); | |
recvd++; | |
} | |
#else | |
while (UCSR0A & (1 << RXC0)) { | |
uint8_t tmp = UDR0; | |
(void) tmp; | |
recvd++; | |
} | |
#endif | |
// full frame received: show it | |
if (recvd == NUM_LEDS * 3) { | |
last_frame_ms = millis(); | |
good_frames++; | |
await_ms = FRAME_TIMEOUT_MS; | |
showLEDs(); | |
return; | |
} | |
// 1-byte commands | |
if (recvd == 1) { | |
byte * data = (byte *) leds; | |
switch (data[0]) { | |
case 'B': | |
brightness = qadd8(brightness, 1); | |
myserial_printint(brightness); | |
break; | |
case 'b': | |
brightness = qsub8(brightness, 1); | |
myserial_printint(brightness); | |
break; | |
case '?': | |
print_config(); | |
break; | |
case '/': | |
myserial_snprintf(64, "FPS %d b%" PRIu32 " g%" PRIu32 "\t", | |
FastLED.getFPS(), bad_frames, good_frames); | |
break; | |
case 0: | |
// quit screensaver command | |
await_ms = FRAME_TIMEOUT_MS; | |
return; | |
break; | |
default: | |
break; | |
} | |
FastLED.setBrightness(brightness); | |
return; | |
} | |
// If we get this far, we're out-of-sync with the sender. | |
// Pause the screensaver for a moment to help to regain sync. | |
await_ms = FRAME_TIMEOUT_MS; | |
bad_frames++; | |
} | |
void showLEDs() { | |
#if !defined(FAST_ATMEGA) | |
digitalWrite(LED_BUILTIN, digitalRead(LED_BUILTIN) ^ 1); | |
FastLED.show(); | |
digitalWrite(LED_BUILTIN, digitalRead(LED_BUILTIN) ^ 1); | |
#else | |
PORTB ^= 1 << PORTB5; // toggle pin 13 | |
FastLED.show(); | |
PORTB ^= 1 << PORTB5; // toggle pin 13 | |
#endif | |
total_frames++; | |
} | |
void screensaver() { | |
} | |
void myserial_begin(long baud, uint16_t timeout) { | |
#if !defined(FAST_ATMEGA) | |
Serial.begin(baud); | |
Serial.setTimeout(timeout); | |
#else | |
(void) timeout; // suppress unused variable warning | |
UBRR0 = F_CPU / 8 / baud - 1; // baud rate pre-scale for U2X == 1 | |
UCSR0A = 1 << U2X0; // double speed asynchronous | |
UCSR0B |= (1 << TXEN0); // enable transmit | |
UCSR0B |= (1 << RXEN0); // enable receive | |
UCSR0C = 3 << UCSZ00; // 8-None-1 asynchronous | |
#endif | |
} | |
void myserial_printint(int16_t val) { | |
char buffer[9]; | |
int len = sprintf(buffer, "%d\r\n", val); | |
myserial_write(buffer, len); | |
} | |
void myserial_println_uint16(uint16_t val) { | |
char buffer[8]; | |
int len = sprintf(buffer, "%u\r\n", val); | |
myserial_write(buffer, len); | |
} | |
void myserial_write(const char * out, size_t len) { | |
#if !defined(FAST_ATMEGA) | |
Serial.write(out, len); | |
#else | |
while (len-- > 0) { | |
while (!(UCSR0A & (1 << UDRE0))); // wait for USART to be idle | |
UDR0 = *out++; | |
} | |
#endif | |
} | |
void myserial_snprintf(const uint8_t buf_size, const char *fmt, ...) { | |
if (buf_size == 0) | |
return; | |
char buffer[buf_size]; | |
va_list args; | |
va_start(args, fmt); | |
int len = vsnprintf(buffer, buf_size, fmt, args); | |
if (len > buf_size - 1) | |
len = buf_size - 1; | |
myserial_write(buffer, len); | |
va_end(args); | |
} | |
void print_config() { | |
#define STRINGIFY(a) _STRINGIFY(a) | |
#define _STRINGIFY(a) #a | |
myserial_snprintf(100, | |
"xy:%dx%d layout:%d type:" STRINGIFY(LED_TYPE) "\r\n", | |
XY_WIDTH, XY_HEIGHT, XY_LAYOUT); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# pip3 install websockets serial | |
# This script forwards websocket payloads to a com port, another websocket, or /dev/null | |
# '.' is printed for each frame forwarded over serial | |
# 'e' is printed for each frame that couldn't be forwarded because the receiver would still be busy receiving or show()ing | |
# digits are printed to show how many duplicate frames were received and not forwarded. | |
import argparse | |
import asyncio | |
import serial | |
import time | |
import websockets | |
async def recv(websocket, path): | |
clientsocket = None | |
if wsclient: | |
clientsocket = await(wsclient) | |
prev = None | |
dups = 0 | |
next_ms = 0 | |
async for message in websocket: | |
if type(message) is bytes and path == "/": | |
if message == prev: | |
dups += 1 | |
print(dups, end='', flush=True) | |
else: | |
prev = message | |
dups = 0 | |
this_ms = time.time() * 1000 | |
if next_ms - this_ms > 0.01: | |
# too early | |
print('e', end='', flush=True) | |
else: | |
next_ms = this_ms | |
# delay for serial: 10 bits per byte for 8-N-1 | |
if tty: | |
next_ms += (len(message) * 10) / (args.baud / 1000) | |
# delay for FastLED.show() assuming WS2812B protocol | |
next_ms += (len(message) / 3 * 30 + 90) / 1000 | |
print('.', end='', flush=True) | |
await send_leds(message, clientsocket) | |
async def send_leds(payload, clientsocket): | |
if clientsocket: | |
await clientsocket.send(payload) | |
elif tty: | |
tty.write(payload) | |
elif args.port == '/dev/stdout': | |
print(payload) | |
parser = argparse.ArgumentParser(description='A WebSocket server to stream the received payloads to a serial port') | |
parser.add_argument('-l', '--listen', type=int, default=2020, | |
help='the TCP port to listen on') | |
parser.add_argument('-b', '--baud', type=int, default=2000000) | |
parser.add_argument('port', type=str, default='/dev/ttyUSB0', nargs='?', | |
help="the serial port or websocket which will receive the payload") | |
args = parser.parse_args() | |
tty = None | |
wsclient = None | |
if args.port == '/dev/null' or args.port == '/dev/stdout': | |
pass | |
elif 'ws://' in args.port: | |
wsclient = websockets.connect(args.port) | |
asyncio.get_event_loop().run_until_complete(wsclient) | |
else: | |
tty = serial.Serial(args.port, args.baud) | |
start_server = websockets.serve(recv, "localhost", args.listen, compression=None) | |
asyncio.get_event_loop().run_until_complete(start_server) | |
asyncio.get_event_loop().run_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment