Skip to content

Instantly share code, notes, and snippets.

@sutaburosu
Last active February 26, 2024 13:33
Show Gist options
  • Save sutaburosu/43d46240dc2b05683d1e98872885f8db to your computer and use it in GitHub Desktop.
Save sutaburosu/43d46240dc2b05683d1e98872885f8db to your computer and use it in GitHub Desktop.
A tool to stream from Wokwi's Arduino sim to real LEDs. Optimised for AVR Arduinos with CH340 USB serial
#include <stdarg.h>
#include <FastLED.h>
// ---- Build-time configuration ----------------------------------------------
// LED chipset, data pin and colour order handed to FastLED.addLeds<>() in setup().
#define LED_TYPE WS2812B
#define DATA_PIN 2
#define COLOR_ORDER GRB
// Matrix dimensions and wiring layout (a bitwise OR of XY_layout flags) used by XY().
#define XY_WIDTH 16
#define XY_HEIGHT 16
#define XY_LAYOUT (SERPENTINE | ROWMAJOR)
// Initial value of the global `brightness`, applied with FastLED.setBrightness().
#define BRIGHTNESS 16
// Passed to FastLED.setCorrection() / FastLED.setTemperature() in setup().
#define LED_CORRECTION UncorrectedColor
#define LED_TEMPERATURE UncorrectedTemperature
// Serial speed passed to myserial_begin().
#define BAUD 2000000
// How long loop() waits for the first byte after a good frame, a quit command
// or a sync error before it treats the link as idle.
#define FRAME_TIMEOUT_MS 250
// Time since the last good frame after which loop() calls screensaver()
// instead of fading the current image to black.
#define SCREENSAVER_MS (1000 + FRAME_TIMEOUT_MS)
// First-byte wait used by loop() until the first timeout or frame occurs.
#define INITIAL_TIMEOUT_MS 5000
// Receive timeout: passed to myserial_begin() and used as the per-byte
// timeout in the FAST_ATMEGA receive loop.
#define RECV_TIMEOUT_MS 5
// First-byte wait used by loop() while the link is idle (fade / screensaver).
#define NO_INPUT_AWAIT_MS 1
// The HardwareSerial library is too slow for 2Mbaud on AVR. ꜙ\(°_°)/ꜙ that.
// Only tested at 2Mbaud. 1Mbaud should work too. Others not so much.
// FAST_ATMEGA selects the direct USART0 register code paths in this sketch
// in place of the Arduino Serial object.
#if defined(__AVR_ATmega328P__) || defined(__AVR_ATmega328PB__) || \
defined(__AVR_ATmega328__) || defined(__AVR_ATmega168__) || \
defined(__AVR_ATmega168P__) || defined(__AVR_ATmega8__)
#define FAST_ATMEGA
#endif
// Total number of LEDs in the matrix.
#define NUM_LEDS ((XY_WIDTH) * (XY_HEIGHT))
// 1 extra for XY() to use when out-of-bounds
// loop() also uses this array as the raw serial receive buffer.
CRGB leds[NUM_LEDS + 1];
// Bit flags describing how the LED strip is wired into a matrix; OR them
// together to form XY_LAYOUT. As used by XY():
//   SERPENTINE - the major coordinate is reversed on every odd minor line
//   ROWMAJOR   - x is the major axis and y the minor axis (otherwise swapped)
//   TRANSPOSE  - major and minor swap roles in the final index calculation
//   FLIPMAJOR  - reverse the major coordinate
//   FLIPMINOR  - reverse the minor coordinate
enum XY_layout {
// I think TRANSPOSE is only useful to rotate square panels
SERPENTINE = 16, ROWMAJOR = 8, TRANSPOSE = 4, FLIPMAJOR = 2, FLIPMINOR = 1
};
/**
 * Map matrix coordinates to an index into leds[].
 *
 * @param x column, 0 .. XY_WIDTH - 1
 * @param y row, 0 .. XY_HEIGHT - 1
 * @return the LED index selected by XY_LAYOUT, or NUM_LEDS (the spare element
 *         at the end of leds[]) when either coordinate is out of bounds.
 *
 * With TRANSPOSE the stride is the size of the minor axis, so the result
 * stays inside 0 .. NUM_LEDS - 1 for non-square panels too. For square panels
 * both strides are equal and the mapping is unchanged.
 */
uint16_t XY(uint8_t x, uint8_t y) {
  // Out-of-bounds pixels land on the spare element so callers can write blindly.
  if (x >= XY_WIDTH || y >= XY_HEIGHT)
    return NUM_LEDS;

  uint8_t major, minor, sz_major, sz_minor;
  if (XY_LAYOUT & ROWMAJOR) {
    major = x;
    minor = y;
    sz_major = XY_WIDTH;
    sz_minor = XY_HEIGHT;
  } else {
    major = y;
    minor = x;
    sz_major = XY_HEIGHT;
    sz_minor = XY_WIDTH;
  }

  // The major coordinate is mirrored when exactly one of these holds:
  // FLIPMAJOR is set, or this is an odd line of a serpentine layout.
  // The parity test uses the minor coordinate before FLIPMINOR is applied.
  const bool flip_requested = (XY_LAYOUT & FLIPMAJOR) != 0;
  const bool odd_serpentine_line = (minor & 1) && (XY_LAYOUT & SERPENTINE);
  if (flip_requested != odd_serpentine_line)
    major = sz_major - 1 - major;

  if (XY_LAYOUT & FLIPMINOR)
    minor = sz_minor - 1 - minor;

  if (XY_LAYOUT & TRANSPOSE)
    return major * static_cast<uint16_t>(sz_minor) + minor;
  return minor * static_cast<uint16_t>(sz_major) + major;
}
// Current global brightness; adjusted by the 'B' / 'b' serial commands in loop().
uint8_t brightness = BRIGHTNESS;
// Count of showLEDs() calls since start-up.
uint32_t total_frames = 0;
// Arduino entry point, run once: configure FastLED, blank the strip, open the
// serial link and make the on-board LED pin an output.
void setup() {
  // LED driver configuration.
  FastLED.addLeds<LED_TYPE, DATA_PIN, COLOR_ORDER>(leds, NUM_LEDS);
  FastLED.setCorrection(LED_CORRECTION);
  FastLED.setTemperature(LED_TEMPERATURE);
  FastLED.setDither(DISABLE_DITHER);
  FastLED.setBrightness(brightness);

  // Start from a dark panel.
  FastLED.clear();
  FastLED.show();

  myserial_begin(BAUD, RECV_TIMEOUT_MS);

#if defined(FAST_ATMEGA)
  DDRB |= (1 << PB5);  // pin 13 as output, via the port register
#else
  pinMode(LED_BUILTIN, OUTPUT);
#endif
}
// Arduino main loop: one pass handles one serial "event".
//  1. Poll for the first incoming byte for up to await_ms.
//  2. If nothing arrived, fade the current image (or run the screensaver once
//     SCREENSAVER_MS has passed since the last good frame), show it and return.
//  3. Otherwise receive up to NUM_LEDS * 3 bytes straight into leds[].
//  4. Exactly NUM_LEDS * 3 bytes is a frame and is shown; exactly 1 byte is a
//     command; any other count is counted as a bad (out-of-sync) frame.
void loop() {
// millis() of the most recent complete frame (initialised on the first call)
static uint32_t last_frame_ms = millis();
// how long to poll for the first byte on this pass
static uint16_t await_ms = INITIAL_TIMEOUT_MS;
// statistics reported by the '/' command
static uint32_t good_frames = 0;
static uint32_t bad_frames = 0;
// wait for some serial data to arrive
int16_t in = 0;
uint32_t ms = millis();
while (in <= 0 && millis() - ms < await_ms) {
#if !defined(FAST_ATMEGA)
in = Serial.available();
#else
// non-zero while the RXC0 flag in UCSR0A is set
in = UCSR0A & (1 << RXC0);
#endif
}
// nothing received for a while: fade to black then show screensaver
if (in <= 0) {
// poll only briefly from now on, so the fade / screensaver keeps animating
await_ms = NO_INPUT_AWAIT_MS;
if (millis() - last_frame_ms < SCREENSAVER_MS)
fadeToBlackBy(leds, NUM_LEDS, 8);
else
screensaver();
showLEDs();
return;
}
// receive up to NUM_LEDS * 3 bytes, with a timeout
uint16_t recvd = 0;
// incoming bytes are written directly over the pixel buffer
uint8_t *dst = (uint8_t *) leds;
#if !defined(FAST_ATMEGA)
recvd = Serial.readBytes(dst, NUM_LEDS * 3);
#else
ms = millis();
while (recvd < NUM_LEDS * 3 && millis() - ms < RECV_TIMEOUT_MS) {
// busy-wait for the next byte or for the timeout to expire
while (!(UCSR0A & (1 << RXC0)) && millis() - ms < RECV_TIMEOUT_MS);
if ((UCSR0A & (1 << RXC0))) {
*dst++ = UDR0;
if (++recvd == NUM_LEDS * 3)
break;
// the timeout restarts after every received byte
ms = millis();
}
}
#endif
// discard any surplus bytes to help with sync
// (they are still counted, so an over-long transfer fails the size checks below)
#if !defined(FAST_ATMEGA)
while (Serial.available() > 0) {
uint8_t tmp;
Serial.readBytes(&tmp, 1);
recvd++;
}
#else
while (UCSR0A & (1 << RXC0)) {
uint8_t tmp = UDR0;
(void) tmp;
recvd++;
}
#endif
// full frame received: show it
if (recvd == NUM_LEDS * 3) {
last_frame_ms = millis();
good_frames++;
await_ms = FRAME_TIMEOUT_MS;
showLEDs();
return;
}
// 1-byte commands
if (recvd == 1) {
// the command byte was received into the first byte of leds[]
byte * data = (byte *) leds;
switch (data[0]) {
case 'B':
// raise brightness by one step (qadd8 is FastLED's saturating 8-bit add) and echo it
brightness = qadd8(brightness, 1);
myserial_printint(brightness);
break;
case 'b':
// lower brightness by one step (qsub8 is FastLED's saturating 8-bit subtract) and echo it
brightness = qsub8(brightness, 1);
myserial_printint(brightness);
break;
case '?':
// report matrix size, layout and LED type
print_config();
break;
case '/':
// report FPS plus the bad / good frame counters
myserial_snprintf(64, "FPS %d b%" PRIu32 " g%" PRIu32 "\t",
FastLED.getFPS(), bad_frames, good_frames);
break;
case 0:
// quit screensaver command
// (returns immediately, skipping the setBrightness() call below)
await_ms = FRAME_TIMEOUT_MS;
return;
break;
default:
// unknown command bytes are ignored
break;
}
FastLED.setBrightness(brightness);
return;
}
// If we get this far, we're out-of-sync with the sender.
// Pause the screensaver for a moment to help to regain sync.
await_ms = FRAME_TIMEOUT_MS;
bad_frames++;
}
// Send leds[] to the strip and count the frame. The on-board LED (pin 13) is
// toggled immediately before and after FastLED.show().
void showLEDs() {
#if defined(FAST_ATMEGA)
  PORTB ^= (1 << PORTB5);  // toggle pin 13
  FastLED.show();
  PORTB ^= (1 << PORTB5);  // toggle pin 13 back
#else
  digitalWrite(LED_BUILTIN, digitalRead(LED_BUILTIN) ^ 1);
  FastLED.show();
  digitalWrite(LED_BUILTIN, digitalRead(LED_BUILTIN) ^ 1);
#endif
  ++total_frames;
}
// Hook called by loop() in place of the fade once no complete frame has
// arrived for SCREENSAVER_MS. Intentionally empty here: draw an idle
// animation into leds[] if one is wanted (loop() calls showLEDs() afterwards).
void screensaver() {
}
// Open the serial link.
//
// baud    - requested line speed in bits per second
// timeout - read timeout in ms for Serial.readBytes(); unused on FAST_ATMEGA,
//           where loop() applies RECV_TIMEOUT_MS itself
//
// On FAST_ATMEGA parts USART0 is programmed directly in double-speed (U2X)
// mode. The divisor is rounded to the nearest integer rather than truncated,
// which lowers the baud error at rates that do not divide F_CPU / 8 evenly.
// 1 Mbaud and 2 Mbaud at 16 MHz give the same register value either way.
void myserial_begin(long baud, uint16_t timeout) {
#if !defined(FAST_ATMEGA)
  Serial.begin(baud);
  Serial.setTimeout(timeout);
#else
  (void) timeout;                      // suppress unused variable warning
  UBRR0 = (F_CPU / 4 / baud - 1) / 2;  // rounded F_CPU / (8 * baud) - 1, for U2X == 1
  UCSR0A = 1 << U2X0;                  // double speed asynchronous
  UCSR0B |= (1 << TXEN0);              // enable transmit
  UCSR0B |= (1 << RXEN0);              // enable receive
  UCSR0C = 3 << UCSZ00;                // 8-None-1 asynchronous
#endif
}
// Send a signed 16-bit value as decimal text followed by CR LF.
void myserial_printint(int16_t val) {
  // Longest output is "-32768\r\n": 8 characters plus the terminating NUL.
  char text[9];
  const int text_len = sprintf(text, "%d\r\n", val);
  myserial_write(text, text_len);
}
// Send an unsigned 16-bit value as decimal text followed by CR LF.
void myserial_println_uint16(uint16_t val) {
  // Longest output is "65535\r\n": 7 characters plus the terminating NUL.
  char text[8];
  const int text_len = sprintf(text, "%u\r\n", val);
  myserial_write(text, text_len);
}
// Transmit `len` bytes starting at `out`. On FAST_ATMEGA parts each byte is
// written to UDR0 once the UDRE0 flag is set; other boards use Serial.write().
void myserial_write(const char * out, size_t len) {
#if defined(FAST_ATMEGA)
  for (const char *stop = out + len; out != stop; ++out) {
    while (!(UCSR0A & (1 << UDRE0))) {
      // wait for the USART data register to be empty
    }
    UDR0 = *out;
  }
#else
  Serial.write(out, len);
#endif
}
// printf-style output to the serial link.
//
// buf_size - size of the temporary stack buffer, including the terminating
//            NUL; output longer than buf_size - 1 characters is truncated.
//            0 sends nothing.
// fmt, ... - printf format string and its arguments.
//
// A negative vsnprintf() result (formatting failure) is dropped instead of
// being converted to a huge size_t length for myserial_write().
void myserial_snprintf(const uint8_t buf_size, const char *fmt, ...) {
  if (buf_size == 0)
    return;

  char buffer[buf_size];
  va_list args;
  va_start(args, fmt);
  int len = vsnprintf(buffer, buf_size, fmt, args);
  va_end(args);

  if (len <= 0)
    return;  // error, or nothing to send
  if (len > buf_size - 1)
    len = buf_size - 1;  // vsnprintf reports the untruncated length
  myserial_write(buffer, static_cast<size_t>(len));
}
// Report the compiled-in matrix size, layout flags and LED type over serial
// (sent in response to the '?' command).
void print_config() {
// Two-level expansion so that LED_TYPE is replaced by its value before it is
// turned into a string literal.
#define _STRINGIFY(a) #a
#define STRINGIFY(a) _STRINGIFY(a)
  const int width = XY_WIDTH;
  const int height = XY_HEIGHT;
  const int layout = XY_LAYOUT;
  myserial_snprintf(100,
                    "xy:%dx%d layout:%d type:" STRINGIFY(LED_TYPE) "\r\n",
                    width, height, layout);
}
#!/usr/bin/env python3
# pip3 install websockets pyserial
# This script forwards websocket payloads to a com port, another websocket, or /dev/null
# '.' is printed for each frame forwarded over serial
# 'e' is printed for each frame that couldn't be forwarded because the receiver would still be busy receiving or show()ing
# digits are printed to show how many duplicate frames were received and not forwarded.
import argparse
import asyncio
import serial
import time
import websockets
async def recv(websocket, path):
    """Handle one websocket connection: forward its binary frames to the output.

    Only binary messages received on path "/" are considered. A frame equal to
    the last frame actually forwarded is counted as a duplicate and skipped.
    A frame that arrives before the receiver is expected to have finished the
    previous one is dropped ('e'). Everything else is forwarded ('.').

    The duplicate check compares against the last *forwarded* frame, not the
    last frame seen. A frame dropped as too early is therefore still sent when
    it arrives again, so the LEDs do not stay stuck on stale content.
    """
    clientsocket = None
    if wsclient:
        clientsocket = await wsclient

    prev = None    # last frame that was forwarded
    dups = 0       # consecutive duplicates of `prev`
    next_ms = 0    # earliest time (ms since the epoch) the next frame may be sent

    async for message in websocket:
        if not (isinstance(message, bytes) and path == "/"):
            continue

        if message == prev:
            dups += 1
            print(dups, end='', flush=True)
            continue
        dups = 0

        this_ms = time.time() * 1000
        if next_ms - this_ms > 0.01:
            # too early: the receiver would still be busy with the last frame
            print('e', end='', flush=True)
            continue

        prev = message
        next_ms = this_ms
        # delay for serial: 10 bits per byte for 8-N-1
        if tty:
            next_ms += (len(message) * 10) / (args.baud / 1000)
        # delay for FastLED.show() assuming WS2812B protocol
        next_ms += (len(message) / 3 * 30 + 90) / 1000
        print('.', end='', flush=True)
        await send_leds(message, clientsocket)
async def send_leds(payload, clientsocket):
    """Deliver one frame to the first available sink.

    Order of preference: the downstream websocket, then the serial port, then
    stdout when the port argument is '/dev/stdout'. Otherwise the payload is
    discarded.
    """
    if clientsocket:
        await clientsocket.send(payload)
        return
    if tty:
        tty.write(payload)
        return
    if args.port == '/dev/stdout':
        print(payload)
# ---- command line -----------------------------------------------------------
parser = argparse.ArgumentParser(
    description='A WebSocket server to stream the received payloads to a serial port')
parser.add_argument('-l', '--listen', type=int, default=2020,
                    help='the TCP port to listen on')
parser.add_argument('-b', '--baud', type=int, default=2000000)
parser.add_argument('port', type=str, default='/dev/ttyUSB0', nargs='?',
                    help="the serial port or websocket which will receive the payload")
args = parser.parse_args()

# ---- choose the output sink -------------------------------------------------
# At most one of these is set; recv() and send_leds() read them as globals.
tty = None
wsclient = None
if args.port in ('/dev/null', '/dev/stdout'):
    # no device to open: frames are discarded or printed by send_leds()
    pass
elif 'ws://' in args.port:
    wsclient = websockets.connect(args.port)
    asyncio.get_event_loop().run_until_complete(wsclient)
else:
    tty = serial.Serial(args.port, args.baud)

# ---- serve forever ----------------------------------------------------------
start_server = websockets.serve(recv, "localhost", args.listen, compression=None)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment