Skip to content

Instantly share code, notes, and snippets.

@kmcallister
Created June 12, 2019 21:35
Show Gist options
  • Save kmcallister/72916bf1c278a15665031730c4251037 to your computer and use it in GitHub Desktop.
Save kmcallister/72916bf1c278a15665031730c4251037 to your computer and use it in GitHub Desktop.
LED strip code
#!/usr/bin/env python
import itertools
import colorsys
import datetime
import irc.bot
import random
import socket
import struct
import sys
import re
from tinycss import color3
def log(*args):
    """Write one timestamped line to stdout and flush it immediately.

    Every positional argument is rendered with ``%s`` and the pieces are
    joined with single spaces, so non-string values (numbers, ``None``,
    exceptions) can be logged without raising ``TypeError``.
    """
    # '%s' % (arg,) rather than str(arg): it formats any object and, on
    # Python 2, keeps unicode arguments as unicode instead of forcing ASCII.
    message = ' '.join('%s' % (arg,) for arg in args)
    sys.stdout.write('%s: %s\n' % (datetime.datetime.now(), message))
    sys.stdout.flush()
def rainbowify(txt):
    """Return *txt* as bold IRC text with a repeating six-color rainbow.

    The result starts with the bold control character (``\\x02``); every
    character of *txt* is preceded by a color control sequence (``\\x03``
    plus a color number), cycling through the palette below.

    The color number is always written with two digits. With an unpadded
    number, a character of *txt* that is itself a digit would be read by
    the client as part of the color code instead of being displayed.
    """
    colors = (5, 8, 3, 10, 2, 6)
    return '\x02' + ''.join(
        '\x03%02d%c' % (color, char)
        for char, color in zip(txt, itertools.cycle(colors)))
# Canned reply sent after a "rainbow" command: "Y", 88 "A"s and a final "Y",
# colorized once when the module is loaded.
rainbow_yay = rainbowify('Y' + ('A' * 88) + 'Y')
class LEDs(object):
    """Send LED color frames as UDP datagrams to port 2772 on localhost.

    A frame is a sequence of ``(r, g, b)`` tuples with channel values in
    the range 0..1. Each color becomes one little-endian 16-bit word with
    five bits per channel, laid out as ``(b << 10) | (r << 5) | g``.
    """

    def __init__(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _conv(self, x):
        """Scale a channel value from 0..1 to an integer 0..31, clamping."""
        if x < 0:
            return 0
        if x > 1:
            return 31
        return int(round(x * 31))

    def _pack(self, colors):
        """Return the wire encoding of *colors* as a byte string."""
        words = []
        for cx in colors:
            r, g, b = map(self._conv, cx)
            words.append(struct.pack('<H', (b << 10) | (r << 5) | g))
        # struct.pack yields bytes; joining on b'' works on Python 2 and 3
        # and avoids repeated string concatenation.
        return b''.join(words)

    def send(self, colors):
        """Pack *colors* and send them as a single datagram."""
        self._sock.sendto(self._pack(colors), ('127.0.0.1', 2772))
# Any command-line argument switches the bot to its test nick and channel.
if len(sys.argv) > 1:
    my_nick = 'LEDzplntest'
    my_channel = '#ptopology-test'
else:
    my_nick = 'LEDzpln'
    my_channel = '#ptopology'

# (host, port) of the IRC server to connect to.
server = ('irc.freenode.net', 6667)
class Bot(irc.bot.SingleServerIRCBot):
    """IRC bot that turns chat commands into LED strip colors.

    A command is either a private message, or a channel message addressed
    to the bot as ``<nick>: <command>`` or ``<nick>, <command>``. Supported
    commands are ``off``, ``rainbow`` and any color string accepted by
    ``tinycss.color3.parse_color_string``.
    """

    # Number of colors in every frame handed to LEDs.send().
    NUM_LEDS = 198

    def __init__(self):
        irc.bot.SingleServerIRCBot.__init__(self, [server], my_nick, my_nick)
        self.leds = LEDs()

    def on_welcome(self, conn, event):
        """Join the configured channel once the server has welcomed us."""
        log('connected')
        conn.join(my_channel)

    def on_pubmsg(self, conn, event):
        """Run channel messages that are addressed to the bot's nickname."""
        # Nicknames may contain regex metacharacters ('[', ']', '\\', '^',
        # '|', ...), so the nick must be escaped before use in a pattern.
        pattern = re.escape(conn.get_nickname()) + '[,:] (.*)$'
        match = re.match(pattern, event.arguments[0])
        if match:
            self.do_command(conn, event.source.nick, event.target, match.group(1))

    def on_privmsg(self, conn, event):
        """Treat the whole text of a private message as a command."""
        nick = event.source.nick
        self.do_command(conn, nick, nick, event.arguments[0])

    def do_command(self, conn, sender, respond_to, cmd):
        """Execute *cmd* for *sender*; replies go to *respond_to*."""
        cmd = cmd.strip()
        log('commanded by', sender, 'in', respond_to, ':', cmd)
        if cmd == 'off':
            cmd = 'black'
        if cmd == 'rainbow':
            return self.do_rainbow(conn, respond_to)
        rgb = color3.parse_color_string(cmd)
        # parse_color_string returns None for invalid input and the plain
        # string 'currentColor' for that keyword; only an RGBA result has
        # the .red/.green/.blue attributes used below.
        if not isinstance(rgb, color3.RGBA):
            conn.privmsg(respond_to, sender + ": Sorry, I don't understand that color.")
            return
        self.leds.send([(rgb.red, rgb.green, rgb.blue)] * self.NUM_LEDS)

    def do_rainbow(self, conn, target):
        """Show three hue cycles along the strip, starting at a random hue."""
        phase = random.random()
        count = self.NUM_LEDS
        # range() instead of xrange() so this also runs on Python 3.
        self.leds.send([
            colorsys.hsv_to_rgb((phase + 3 * i / float(count)) % 1.0, 1, 1)
            for i in range(count)])
        conn.privmsg(target, rainbow_yay)
# Script entry: build the bot and hand control to it.
# NOTE(review): start() presumably blocks while running the IRC event loop --
# confirm against the irc library.
bot = Bot()
bot.start()
#define _GNU_SOURCE
#include <time.h>
#include <sched.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
/* Base address of the peripheral window (named after the BCM2708 SoC). */
#define BCM2708_PERI_BASE 0x20000000
/* Address of the GPIO register block; this is the /dev/mem offset mapped by setup_gpio(). */
#define GPIO_BASE (BCM2708_PERI_BASE + 0x200000)
/* Length in bytes of the /dev/mem mapping. */
#define PAGE 4096
/* GPIO pin numbers of the data and clock lines: A is strip 0, B is strip 1 in write_bits(). */
#define DAT_A 24
#define CLK_A 23
#define DAT_B 8
#define CLK_B 25
/* Start of the mapped GPIO registers, set by setup_gpio(); indexed in units of unsigned. */
volatile unsigned *gpio;
/*
 * Configure GPIO pin g as an output.
 *
 * Each register word at the start of the GPIO block holds the function
 * fields of ten pins, three bits per pin. The pin's field is cleared first
 * and then set to 1.
 *
 * static inline: under C99 inline semantics a plain `inline` definition
 * emits no external definition, so any call the compiler decides not to
 * inline (e.g. at -O0) would fail to link.
 */
static inline void setup_output(uint32_t g) {
    volatile unsigned *fsel = gpio + (g / 10);
    unsigned shift = (g % 10) * 3;

    /* Unsigned literals keep the shifts out of signed-integer territory. */
    *fsel &= ~(7u << shift);
    *fsel |= (1u << shift);
}
/*
 * Drive high every GPIO pin whose bit is set in mask, by writing mask to
 * word 7 of the GPIO register block.
 * NOTE(review): word 7 is presumably the "set" register (GPSET0) -- confirm
 * against the SoC datasheet.
 *
 * static inline: under C99 inline semantics a plain `inline` definition
 * emits no external definition, so a non-inlined call would fail to link.
 */
static inline void set(uint32_t mask) {
    *(gpio + 7) = mask;
}
/*
 * Drive low every GPIO pin whose bit is set in mask, by writing mask to
 * word 10 of the GPIO register block.
 * NOTE(review): word 10 is presumably the "clear" register (GPCLR0) --
 * confirm against the SoC datasheet.
 *
 * static inline: under C99 inline semantics a plain `inline` definition
 * emits no external definition, so a non-inlined call would fail to link.
 */
static inline void clr(uint32_t mask) {
    *(gpio + 10) = mask;
}
/*
 * Map the GPIO register block through /dev/mem, publish it in the global
 * `gpio` pointer and switch the four strip pins to output mode.
 * Prints the failing call with perror() and exits with status 1 on error.
 */
void setup_gpio() {
    static const uint32_t out_pins[] = { DAT_A, CLK_A, DAT_B, CLK_B };

    int fd = open("/dev/mem", O_RDWR|O_SYNC);
    if (fd < 0) {
        perror("open");
        exit(1);
    }

    void *regs = mmap(NULL, PAGE, PROT_READ|PROT_WRITE, MAP_SHARED, fd, GPIO_BASE);
    if (regs == MAP_FAILED) {
        perror("mmap");
        exit(1);
    }

    gpio = (volatile unsigned *) regs;
    /* The descriptor is closed once the mapping exists. */
    close(fd);

    for (size_t k = 0; k < sizeof out_pins / sizeof out_pins[0]; k++) {
        setup_output(out_pins[k]);
    }
}
/*
 * Busy-wait for a short fixed interval.
 * write_bits() calls this once before raising and once before lowering the
 * clock line of a strip.
 */
void delay() {
// Timing estimate: one loop iteration takes about 4.35 ns.
// Maximum clock is 25 MHz, i.e. each delay must last at least 40 ns.
// 200 iterations come to roughly 870 ns by that estimate, well above
// the minimum.
for (size_t i=0; i<200; i++) {
// the volatile asm keeps the otherwise empty loop from being optimized away
asm volatile("nop");
}
/*
Disabled alternative: sleep in the kernel instead of spinning.
struct timespec timeout = { .tv_sec = 0, .tv_nsec = 40 };
nanosleep(&timeout, NULL);
*/
}
/*
 * Clock the lowest n bits of `bits` out to one strip, most significant of
 * those bits first.
 *
 *   strip  0 selects the DAT_A/CLK_A pins, anything else DAT_B/CLK_B.
 *   n      number of bits to send; 0 sends nothing.
 *   bits   value to send; bit positions above 31 are sent as zeros.
 *
 * For every bit the data line is set or cleared, then the clock line is
 * pulsed high and low with a delay() before each edge.
 */
void write_bits(size_t strip, size_t n, uint32_t bits) {
    uint32_t dat = UINT32_C(1) << (strip ? DAT_B : DAT_A);
    uint32_t clk = UINT32_C(1) << (strip ? CLK_B : CLK_A);

    /*
     * Count bit positions down instead of shifting a mask: `1 << (n-1)`
     * shifts a signed int into its sign bit for n == 32 and is undefined
     * for n == 0 or n > 32.
     */
    for (size_t pos = n; pos-- > 0; ) {
        if (pos < 32 && ((bits >> pos) & UINT32_C(1)))
            set(dat);
        else
            clr(dat);

        delay();
        set(clk);
        delay();
        clr(clk);
    }
}
/* Number of 16-bit LED values in one frame (both strips together). */
#define CHANNELS 198
/* Size in bytes of one frame datagram: two bytes per LED value. */
#define BUF_SIZE (CHANNELS * 2)
/* Frame currently shown; main() re-sends it to the strips on every loop pass. */
uint16_t display_buf[CHANNELS];
/* Receive buffer for one incoming datagram. */
uint8_t read_buf[BUF_SIZE];
int main() {
struct sched_param sched_param
= { .sched_priority = 99 };
setup_gpio();
if (sched_setscheduler(0, SCHED_FIFO, &sched_param) < 0) {
perror("sched_setscheduler");
exit(1);
}
/*
setresuid(65534, 65534, 65534);
setresgid(65534, 65534, 65534);
*/
int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket");
exit(1);
}
struct sockaddr_in server_addr = {
.sin_family = AF_INET,
.sin_addr = { .s_addr = htonl(INADDR_ANY) },
.sin_port = htons(2772)
};
if (bind(sock_fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) {
perror("bind");
exit(1);
}
for (;;) {
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(sock_fd, &read_fds);
struct timeval timeout = { .tv_sec = 0, .tv_usec = 14000 };
int ready = select(sock_fd+1, &read_fds, NULL, NULL, &timeout);
if (ready < 0) {
perror("select");
exit(1);
}
if (ready > 0) {
ssize_t nread = recv(sock_fd, read_buf, BUF_SIZE, 0);
if (nread == BUF_SIZE) {
memcpy(display_buf, read_buf, BUF_SIZE);
} else {
fprintf(stderr, "short packet: %d bytes\n", nread);
}
}
write_bits(0, 32, 0);
for (int i=102; i>=0; i--) {
write_bits(0, 16, display_buf[i] | 0x8000);
}
write_bits(1, 32, 0);
for (int i=103; i<198; i++) {
write_bits(1, 16, display_buf[i] | 0x8000);
}
}
return 0;
}
@iwanna-bot
Copy link

import itertools
import colorsys
import datetime
import irc.bot
import random
import socket
import struct
import sys
import re
from tinycss import color3

def log(*args):
    """Write one timestamped line to stdout and flush it immediately.

    Every positional argument is rendered with ``%s`` and the pieces are
    joined with single spaces, so non-string values (numbers, ``None``,
    exceptions) can be logged without raising ``TypeError``.
    """
    # '%s' % (arg,) rather than str(arg): it formats any object and, on
    # Python 2, keeps unicode arguments as unicode instead of forcing ASCII.
    message = ' '.join('%s' % (arg,) for arg in args)
    sys.stdout.write('%s: %s\n' % (datetime.datetime.now(), message))
    sys.stdout.flush()

def rainbowify(txt):
    """Return *txt* as bold IRC text with a repeating six-color rainbow.

    The result starts with the bold control character (``\\x02``); every
    character of *txt* is preceded by a color control sequence (``\\x03``
    plus a color number), cycling through the palette below.

    The color number is always written with two digits. With an unpadded
    number, a character of *txt* that is itself a digit would be read by
    the client as part of the color code instead of being displayed.
    """
    colors = (5, 8, 3, 10, 2, 6)
    return '\x02' + ''.join(
        '\x03%02d%c' % (color, char)
        for char, color in zip(txt, itertools.cycle(colors)))

# Canned reply sent after a "rainbow" command: "Y", 88 "A"s and a final "Y",
# colorized once when the module is loaded.
rainbow_yay = rainbowify('Y' + ('A' * 88) + 'Y')

class LEDs(object):
    """Send LED color frames as UDP datagrams to port 2772 on localhost.

    A frame is a sequence of ``(r, g, b)`` tuples with channel values in
    the range 0..1. Each color becomes one little-endian 16-bit word with
    five bits per channel, laid out as ``(b << 10) | (r << 5) | g``.
    """

    # The constructor must be named __init__ (not ``init``) so that the
    # socket exists by the time send() is called.
    def __init__(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _conv(self, x):
        """Scale a channel value from 0..1 to an integer 0..31, clamping."""
        if x < 0:
            return 0
        if x > 1:
            return 31
        return int(round(x * 31))

    def _pack(self, colors):
        """Return the wire encoding of *colors* as a byte string."""
        words = []
        for cx in colors:
            r, g, b = map(self._conv, cx)
            words.append(struct.pack('<H', (b << 10) | (r << 5) | g))
        # struct.pack yields bytes; joining on b'' works on Python 2 and 3
        # and avoids repeated string concatenation.
        return b''.join(words)

    def send(self, colors):
        """Pack *colors* and send them as a single datagram."""
        self._sock.sendto(self._pack(colors), ('127.0.0.1', 2772))

# Any command-line argument switches the bot to its test nick and channel.
if len(sys.argv) > 1:
    my_nick = 'LEDzplntest'
    my_channel = '#ptopology-test'
else:
    my_nick = 'LEDzpln'
    my_channel = '#ptopology'

# (host, port) of the IRC server to connect to.
server = ('irc.freenode.net', 6667)

class Bot(irc.bot.SingleServerIRCBot):
    """IRC bot that turns chat commands into LED strip colors.

    A command is either a private message, or a channel message addressed
    to the bot as ``<nick>: <command>`` or ``<nick>, <command>``. Supported
    commands are ``off``, ``rainbow`` and any color string accepted by
    ``tinycss.color3.parse_color_string``.
    """

    # Number of colors in every frame handed to LEDs.send().
    NUM_LEDS = 198

    # Both the method name and the base-class call must be __init__ (not
    # ``init``); otherwise the base class is never set up and self.leds is
    # never created.
    def __init__(self):
        irc.bot.SingleServerIRCBot.__init__(self, [server], my_nick, my_nick)
        self.leds = LEDs()

    def on_welcome(self, conn, event):
        """Join the configured channel once the server has welcomed us."""
        log('connected')
        conn.join(my_channel)

    def on_pubmsg(self, conn, event):
        """Run channel messages that are addressed to the bot's nickname."""
        # Nicknames may contain regex metacharacters ('[', ']', '\\', '^',
        # '|', ...), so the nick must be escaped before use in a pattern.
        pattern = re.escape(conn.get_nickname()) + '[,:] (.*)$'
        match = re.match(pattern, event.arguments[0])
        if match:
            self.do_command(conn, event.source.nick, event.target, match.group(1))

    def on_privmsg(self, conn, event):
        """Treat the whole text of a private message as a command."""
        nick = event.source.nick
        self.do_command(conn, nick, nick, event.arguments[0])

    def do_command(self, conn, sender, respond_to, cmd):
        """Execute *cmd* for *sender*; replies go to *respond_to*."""
        cmd = cmd.strip()
        log('commanded by', sender, 'in', respond_to, ':', cmd)

        if cmd == 'off':
            cmd = 'black'

        if cmd == 'rainbow':
            return self.do_rainbow(conn, respond_to)

        rgb = color3.parse_color_string(cmd)
        # parse_color_string returns None for invalid input and the plain
        # string 'currentColor' for that keyword; only an RGBA result has
        # the .red/.green/.blue attributes used below.
        if not isinstance(rgb, color3.RGBA):
            conn.privmsg(respond_to, sender + ": Sorry, I don't understand that color.")
            return
        self.leds.send([(rgb.red, rgb.green, rgb.blue)] * self.NUM_LEDS)

    def do_rainbow(self, conn, target):
        """Show three hue cycles along the strip, starting at a random hue."""
        phase = random.random()
        count = self.NUM_LEDS
        # range() instead of xrange() so this also runs on Python 3.
        self.leds.send([
            colorsys.hsv_to_rgb((phase + 3 * i / float(count)) % 1.0, 1, 1)
            for i in range(count)])
        conn.privmsg(target, rainbow_yay)

# Script entry: build the bot and hand control to it.
# NOTE(review): start() presumably blocks while running the IRC event loop --
# confirm against the irc library.
bot = Bot()
bot.start()
ledd.c
#define _GNU_SOURCE
#include <time.h>
#include <sched.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>

/* Base address of the peripheral window (named after the BCM2708 SoC). */
#define BCM2708_PERI_BASE 0x20000000
/* Address of the GPIO register block; this is the /dev/mem offset mapped by setup_gpio(). */
#define GPIO_BASE (BCM2708_PERI_BASE + 0x200000)

/* Length in bytes of the /dev/mem mapping. */
#define PAGE 4096

/* GPIO pin numbers of the data and clock lines: A is strip 0, B is strip 1 in write_bits(). */
#define DAT_A 24
#define CLK_A 23
#define DAT_B 8
#define CLK_B 25

/* Start of the mapped GPIO registers, set by setup_gpio(); indexed in units of unsigned. */
volatile unsigned *gpio;

/*
 * Configure GPIO pin g as an output.
 *
 * Each register word at the start of the GPIO block holds the function
 * fields of ten pins, three bits per pin. The pin's field is cleared first
 * and then set to 1.
 *
 * static inline: under C99 inline semantics a plain `inline` definition
 * emits no external definition, so any call the compiler decides not to
 * inline (e.g. at -O0) would fail to link.
 */
static inline void setup_output(uint32_t g) {
    volatile unsigned *fsel = gpio + (g / 10);
    unsigned shift = (g % 10) * 3;

    /* Unsigned literals keep the shifts out of signed-integer territory. */
    *fsel &= ~(7u << shift);
    *fsel |= (1u << shift);
}

/*
 * Drive high every GPIO pin whose bit is set in mask, by writing mask to
 * word 7 of the GPIO register block.
 * NOTE(review): word 7 is presumably the "set" register (GPSET0) -- confirm
 * against the SoC datasheet.
 *
 * static inline: under C99 inline semantics a plain `inline` definition
 * emits no external definition, so a non-inlined call would fail to link.
 */
static inline void set(uint32_t mask) {
    *(gpio + 7) = mask;
}

/*
 * Drive low every GPIO pin whose bit is set in mask, by writing mask to
 * word 10 of the GPIO register block.
 * NOTE(review): word 10 is presumably the "clear" register (GPCLR0) --
 * confirm against the SoC datasheet.
 *
 * static inline: under C99 inline semantics a plain `inline` definition
 * emits no external definition, so a non-inlined call would fail to link.
 */
static inline void clr(uint32_t mask) {
    *(gpio + 10) = mask;
}

/*
 * Map the GPIO register block through /dev/mem, publish it in the global
 * `gpio` pointer and switch the four strip pins to output mode.
 * Prints the failing call with perror() and exits with status 1 on error.
 */
void setup_gpio() {
    static const uint32_t out_pins[] = { DAT_A, CLK_A, DAT_B, CLK_B };

    int fd = open("/dev/mem", O_RDWR|O_SYNC);
    if (fd < 0) {
        perror("open");
        exit(1);
    }

    void *regs = mmap(NULL, PAGE, PROT_READ|PROT_WRITE, MAP_SHARED, fd, GPIO_BASE);
    if (regs == MAP_FAILED) {
        perror("mmap");
        exit(1);
    }

    gpio = (volatile unsigned *) regs;
    /* The descriptor is closed once the mapping exists. */
    close(fd);

    for (size_t k = 0; k < sizeof out_pins / sizeof out_pins[0]; k++) {
        setup_output(out_pins[k]);
    }
}

/*
 * Busy-wait for a short fixed interval.
 * write_bits() calls this once before raising and once before lowering the
 * clock line of a strip.
 */
void delay() {
// Timing estimate: one loop iteration takes about 4.35 ns.
// Maximum clock is 25 MHz, i.e. each delay must last at least 40 ns.
// 200 iterations come to roughly 870 ns by that estimate, well above
// the minimum.
for (size_t i=0; i<200; i++) {
// the volatile asm keeps the otherwise empty loop from being optimized away
asm volatile("nop");
}
/*
Disabled alternative: sleep in the kernel instead of spinning.
struct timespec timeout = { .tv_sec = 0, .tv_nsec = 40 };
nanosleep(&timeout, NULL);
*/
}

/*
 * Clock the lowest n bits of `bits` out to one strip, most significant of
 * those bits first.
 *
 *   strip  0 selects the DAT_A/CLK_A pins, anything else DAT_B/CLK_B.
 *   n      number of bits to send; 0 sends nothing.
 *   bits   value to send; bit positions above 31 are sent as zeros.
 *
 * For every bit the data line is set or cleared, then the clock line is
 * pulsed high and low with a delay() before each edge.
 */
void write_bits(size_t strip, size_t n, uint32_t bits) {
    uint32_t dat = UINT32_C(1) << (strip ? DAT_B : DAT_A);
    uint32_t clk = UINT32_C(1) << (strip ? CLK_B : CLK_A);

    /*
     * Count bit positions down instead of shifting a mask: `1 << (n-1)`
     * shifts a signed int into its sign bit for n == 32 and is undefined
     * for n == 0 or n > 32.
     */
    for (size_t pos = n; pos-- > 0; ) {
        if (pos < 32 && ((bits >> pos) & UINT32_C(1)))
            set(dat);
        else
            clr(dat);

        delay();
        set(clk);
        delay();
        clr(clk);
    }
}

/* Number of 16-bit LED values in one frame (both strips together). */
#define CHANNELS 198
/* Size in bytes of one frame datagram: two bytes per LED value. */
#define BUF_SIZE (CHANNELS * 2)

/* Frame currently shown; main() re-sends it to the strips on every loop pass. */
uint16_t display_buf[CHANNELS];
/* Receive buffer for one incoming datagram. */
uint8_t read_buf[BUF_SIZE];

int main() {
struct sched_param sched_param
= { .sched_priority = 99 };

setup_gpio();
if (sched_setscheduler(0, SCHED_FIFO, &sched_param) < 0) {
    perror("sched_setscheduler");
    exit(1);
}

/*
setresuid(65534, 65534, 65534);
setresgid(65534, 65534, 65534);
*/

int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
    perror("socket");
    exit(1);
}

struct sockaddr_in server_addr = {
    .sin_family = AF_INET,
    .sin_addr = { .s_addr = htonl(INADDR_ANY) },
    .sin_port = htons(2772)
};
if (bind(sock_fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) {
    perror("bind");
    exit(1);
}

for (;;) {
    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(sock_fd, &read_fds);

    struct timeval timeout = { .tv_sec = 0, .tv_usec = 14000 };
    int ready = select(sock_fd+1, &read_fds, NULL, NULL, &timeout);
    if (ready < 0) {
        perror("select");
        exit(1);
    }

    if (ready > 0) {
        ssize_t nread = recv(sock_fd, read_buf, BUF_SIZE, 0);
        if (nread == BUF_SIZE) {
            memcpy(display_buf, read_buf, BUF_SIZE);
        } else {
            fprintf(stderr, "short packet: %d bytes\n", nread);
        }
    }

    write_bits(0, 32, 0);
    for (int i=102; i>=0; i--) {
        write_bits(0, 16, display_buf[i] | 0x8000);
    }

    write_bits(1, 32, 0);
    for (int i=103; i<198; i++) {
        write_bits(1, 16, display_buf[i] | 0x8000);
    }
}

return 0;

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment