Skip to content

Instantly share code, notes, and snippets.

@aschokking
Last active March 22, 2021 16:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aschokking/da378442463bc0ac598187d50e726294 to your computer and use it in GitHub Desktop.
Save aschokking/da378442463bc0ac598187d50e726294 to your computer and use it in GitHub Desktop.
Jenkins light control
# Limbo slack chat bot framework plugin
import re
import logging
from datetime import datetime
import requests
# Logger used by this plugin for non-fatal problems talking to the light.
_log = logging.getLogger('status_blinkm.py')
# Repos currently considered failing: repo name (parsed from the chat message)
# mapped to the datetime at which the failure was first recorded.
failed_repos = {}
def on_message(msg, server):
    """Handle one chat message and update the Jenkins status light.

    Args:
        msg: Message dict. The text is taken from ``msg["text"]`` or, when
            that is empty, from the ``fallback`` of the first attachment.
        server: Unused here; part of the plugin hook signature.

    Returns:
        A reply string for the ``!jenkins_failures`` and ``!jenkins_reset``
        commands, otherwise ``None``.
    """
    # `or ""` also covers an explicit None so re.match never sees a non-string.
    text = msg.get("text") or ""
    if not text:
        attachments = msg.get("attachments")
        if attachments:
            text = attachments[0].get("fallback") or ""

    if re.match(r"!jenkins_failures", text):
        return "Current failures: %s" % failed_repos

    if re.match(r"!jenkins_reset", text):
        failed_repos.clear()
        all_passed()
        return "Jenkins state reset"

    if re.match(r"Deployer finished sending notification email.*(SUCCESS|FAILURE).*", text):
        # Was `blinkm.send_arg_to_photon(...)`; `blinkm` is not defined in
        # this module, so call the local helper directly.
        send_arg_to_photon('rainbow')

    if 'non_gating' in text:
        return

    fail_match = re.match(r"(.*) - .* Failure after", text)
    normal_match = re.match(r"(.*) - .* Back to normal after", text)
    if fail_match:
        # Keep the timestamp of the first failure; repeats do not overwrite it.
        failed_repos.setdefault(fail_match.group(1), datetime.now())
    elif normal_match:
        failed_repos.pop(normal_match.group(1), None)

    if failed_repos:
        # lights red
        failure()
    else:
        # lights green
        all_passed()
def failure():
    """Put the light into its 'red' state."""
    send_arg_to_photon('red')
def all_passed():
    """Put the light into its 'green' state."""
    send_arg_to_photon('green')
def send_arg_to_photon(arg):
    """POST *arg* to the Particle cloud function that drives the light.

    Best-effort: network errors and non-200 responses are logged, never
    raised, so a light outage cannot break message handling.

    Args:
        arg: Command string sent as the ``args`` form field.
    """
    try:
        result = requests.post(
            'https://api.particle.io/v1/devices/#####################################',  # obfuscated
            data={'args': arg},
            # Without a timeout a stalled connection would block the bot forever.
            timeout=10,
        )
    except requests.RequestException:
        _log.warning("Failed to make post request", exc_info=True)
        return
    if result.status_code != 200:
        _log.warning("Post didn't 200")
#include "application.h"
#include "neopixel/neopixel.h"
// Particle system mode for this firmware.
SYSTEM_MODE(AUTOMATIC);
// IMPORTANT: Set pixel COUNT, PIN and TYPE
#define PIXEL_PIN D2
#define PIXEL_COUNT 60
#define PIXEL_TYPE WS2812B
// The single strip object that every function in this file draws on.
Adafruit_NeoPixel strip = Adafruit_NeoPixel(PIXEL_COUNT, PIXEL_PIN, PIXEL_TYPE);
void setup() {
    // Register ledToggle as the cloud function named "led".
    Spark.function("led", ledToggle);

    // Bring the strip up with every pixel off.
    strip.begin();
    strip.show();

    // Boot indicator: show "0,0,10" for 1.5 s, then switch to the "green" state.
    ledToggle("0,0,10");
    delay(1500);
    ledToggle("green");
}
void loop() {
    // Intentionally empty: all work happens in setup() and in ledToggle().
}
// Return the index-th field (0-based) of `data` split on `separator`,
// with surrounding whitespace trimmed.
//
// Returns an empty String when `index` is negative or there are fewer than
// index + 1 fields. The last field does not need a trailing separator.
// (Previously the function fell off its end without a return value in both
// of those cases, and the separator was included in the returned text.)
String getValue(String data, char separator, int index)
{
    int length = data.length();
    int field = 0;
    String chunk = "";

    for (int i = 0; i < length; i++)
    {
        if (data[i] == separator)
        {
            if (field == index)
            {
                chunk.trim();
                return chunk;
            }
            // Not the field we want: start collecting the next one.
            field++;
            chunk = "";
        }
        else
        {
            chunk.concat(data[i]);
        }
    }

    // End of input: whatever was collected is the final field.
    if (field == index)
    {
        chunk.trim();
        return chunk;
    }
    return String("");
}
// Paint every pixel of the strip with `color` and push the result out.
void setColor(uint32_t color) {
    const int pixelCount = strip.numPixels();
    for (int pixel = 0; pixel < pixelCount; ++pixel) {
        strip.setPixelColor(pixel, color);
    }
    strip.show();
}
int ledToggle(String command) {
    /* Cloud function handler: takes a command string, returns an int status.

       Recognised commands:
         "red"            dim red
         "green"          dim green
         "back_to_green"  bright green for 5 s, then "green"
         "rainbow"        one rainbow cycle, then "green"
         "rainbowforever" 100 rainbow cycles, then "green"
         "flash"          five blue on/off flashes, then "green"
         "off"            all pixels off
         "r,g,b"          explicit colour, each channel clamped to 0-255

       Returns 1 when the LEDs were set to a colour, 0 when they were turned
       off, and -1 for an unrecognised command (the LEDs are left untouched).

       Note: the animated commands block inside delay() until they finish.
    */
    if (command == "red") {
        setColor(strip.Color(150, 0, 0));
        return 1;
    }
    if (command == "back_to_green") {
        setColor(strip.Color(0, 255, 0));
        delay(5000);
        return ledToggle("green");
    }
    if (command == "green") {
        setColor(strip.Color(0, 15, 0));
        return 1;
    }
    if (command == "rainbow") {
        rainbow(20);
        return ledToggle("green");
    }
    if (command == "rainbowforever") {
        for (int cycle = 0; cycle < 100; cycle++) {
            rainbow(20);
        }
        return ledToggle("green");
    }
    if (command == "flash") {
        for (int flash = 0; flash < 5; flash++) {
            setColor(strip.Color(0, 0, 255));
            delay(500);
            setColor(strip.Color(0, 0, 0));
            delay(500);
        }
        return ledToggle("green");
    }
    if (command == "off") {
        setColor(strip.Color(0, 0, 0));
        return 0;
    }

    // Anything else must look like "r,g,b": require at least two commas
    // before touching the LEDs, so a bogus command is a true no-op.
    int firstComma = command.indexOf(',');
    int secondComma = (firstComma < 0) ? -1 : command.indexOf(',', firstComma + 1);
    if (secondComma < 0) {
        return -1;
    }

    long r = getValue(command, ',', 0).toInt();
    long g = getValue(command, ',', 1).toInt();
    long b = getValue(command, ',', 2).toInt();

    // strip.Color() is called with 8-bit channels elsewhere in this file;
    // clamp so out-of-range input saturates instead of wrapping.
    r = (r < 0) ? 0 : ((r > 255) ? 255 : r);
    g = (g < 0) ? 0 : ((g > 255) ? 255 : g);
    b = (b < 0) ? 0 : ((b > 255) ? 255 : b);

    setColor(strip.Color(r, g, b));
    return 1;
}
// Play one full rainbow cycle: 256 frames, pausing `wait` ms after each.
void rainbow(uint8_t wait) {
    for (uint16_t offset = 0; offset < 256; offset++) {
        // Each pixel gets the wheel colour for its index shifted by the frame offset.
        for (uint16_t pixel = 0; pixel < strip.numPixels(); pixel++) {
            strip.setPixelColor(pixel, Wheel((pixel + offset) & 255));
        }
        strip.show();
        delay(wait);
    }
}
// Input a value 0 to 255 to get a color value.
// The range is split into three segments of 85 steps; within each segment
// one channel ramps up, one ramps down and the third stays at zero.
uint32_t Wheel(byte WheelPos) {
    if (WheelPos >= 170) {
        WheelPos -= 170;
        return strip.Color(0, WheelPos * 3, 255 - WheelPos * 3);
    }
    if (WheelPos >= 85) {
        WheelPos -= 85;
        return strip.Color(255 - WheelPos * 3, 0, WheelPos * 3);
    }
    return strip.Color(WheelPos * 3, 255 - WheelPos * 3, 0);
}
@iwanna-bot
Copy link

import itertools
import colorsys
import datetime
import irc.bot
import random
import socket
import struct
import sys
import re
from tinycss import color3

def log(*args):
    """Write one timestamped line to stdout: the string *args* joined by spaces."""
    stamp = datetime.datetime.now()
    message = ' '.join(args)
    sys.stdout.write('%s: %s\n' % (stamp, message))
    sys.stdout.flush()

def rainbowify(txt):
    """Return *txt* with a bold prefix and a cycling colour code before each character."""
    palette = itertools.cycle((5, 8, 3, 10, 2, 6))
    pieces = ['\x03%d%c' % (code, char) for char, code in zip(txt, palette)]
    return '\x02' + ''.join(pieces)

# Pre-rendered "Y" + 88 x "A" + "Y" banner, colourised once at import time.
rainbow_yay = rainbowify('Y' + ('A' * 88) + 'Y')

class LEDs(object):
    """UDP sender that pushes packed RGB frames to 127.0.0.1:2772."""

    def __init__(self):
        # Was spelled `init`, so it never ran and `_sock` did not exist.
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _conv(self, x):
        """Map a channel value in [0, 1] to a 5-bit level (0-31), clamping outside it."""
        if x < 0:
            return 0
        if x > 1:
            return 31
        return int(round(x * 31))

    def send(self, colors):
        """Pack *colors* and send them as a single datagram.

        Args:
            colors: Iterable of ``(r, g, b)`` triples with channels in [0, 1].

        Each triple becomes one little-endian 16-bit word with blue in bits
        10-14, red in bits 5-9 and green in bits 0-4.
        """
        words = []
        for cx in colors:
            r, g, b = map(self._conv, cx)
            words.append(struct.pack('<H', (b << 10) | (r << 5) | g))

        # bytes join works on Python 2 and 3; `'' += struct.pack(...)` fails on 3.
        self._sock.sendto(b''.join(words), ('127.0.0.1', 2772))

# Any command-line argument selects the test nick and channel.
my_nick, my_channel = (
    ('LEDzplntest', '#ptopology-test')
    if len(sys.argv) > 1
    else ('LEDzpln', '#ptopology')
)

# (host, port) of the IRC server the bot connects to.
server = ('irc.freenode.net', 6667)

class Bot(irc.bot.SingleServerIRCBot):
    """IRC bot that turns messages addressed to it into LED colour commands."""

    # Number of pixels written in every frame sent to the LEDs.
    NUM_LEDS = 198

    def __init__(self):
        # Was spelled `init` (here and on the base-class call), so neither the
        # IRC connection settings nor `self.leds` were ever set up.
        irc.bot.SingleServerIRCBot.__init__(self, [server], my_nick, my_nick)
        self.leds = LEDs()

    def on_welcome(self, conn, event):
        """Join the configured channel once the server has accepted us."""
        log('connected')
        conn.join(my_channel)

    def on_pubmsg(self, conn, event):
        """Run channel messages of the form ``<nick>: cmd`` or ``<nick>, cmd``."""
        # Escape the nick: IRC nicknames may contain regex metacharacters.
        pattern = re.escape(conn.get_nickname()) + '[,:] (.*)$'
        match = re.match(pattern, event.arguments[0])
        if match:
            self.do_command(conn, event.source.nick, event.target, match.group(1))

    def on_privmsg(self, conn, event):
        """Treat the whole text of a private message as a command."""
        nick = event.source.nick
        self.do_command(conn, nick, nick, event.arguments[0])

    def do_command(self, conn, sender, respond_to, cmd):
        """Execute one command: ``off``, ``rainbow`` or a colour string.

        Args:
            conn: Connection used to send replies.
            sender: Nick that issued the command.
            respond_to: Target (channel or nick) for any reply.
            cmd: Raw command text.
        """
        cmd = cmd.strip()
        log('commanded by', sender, 'in', respond_to, ':', cmd)

        if cmd == 'off':
            cmd = 'black'

        if cmd == 'rainbow':
            return self.do_rainbow(conn, respond_to)

        rgb = color3.parse_color_string(cmd)
        if not rgb:
            conn.privmsg(respond_to, sender + ": Sorry, I don't understand that color.")
            return
        self.leds.send([(rgb.red, rgb.green, rgb.blue)] * self.NUM_LEDS)

    def do_rainbow(self, conn, target):
        """Show three hue cycles along the strip at a random phase and reply with the banner."""
        phase = random.random()
        count = self.NUM_LEDS
        # `range` instead of `xrange` so this also runs on Python 3.
        self.leds.send([
            colorsys.hsv_to_rgb((phase + 3 * i / float(count)) % 1.0, 1, 1)
            for i in range(count)
        ])
        conn.privmsg(target, rainbow_yay)

if __name__ == '__main__':
    # Only connect when run as a script, so importing this module has no
    # network side effects.
    bot = Bot()
    bot.start()
ledd.c
#define _GNU_SOURCE
#include <time.h>
#include <sched.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>

/* Physical address of the peripheral window and of the GPIO register block
 * inside it; GPIO_BASE is the offset handed to mmap() in setup_gpio(). */
#define BCM2708_PERI_BASE 0x20000000
#define GPIO_BASE (BCM2708_PERI_BASE + 0x200000)

/* Length in bytes of the /dev/mem mapping. */
#define PAGE 4096

/* GPIO numbers of the data and clock lines: the _A pair is used by
 * write_bits() when strip == 0, the _B pair otherwise. */
#define DAT_A 24
#define CLK_A 23
#define DAT_B 8
#define CLK_B 25

/* Base of the mapped GPIO registers; assigned in setup_gpio(). */
volatile unsigned *gpio;

/* Configure GPIO number g as an output.
 *
 * Each function-select word holds ten pins at three bits per pin: the pin's
 * field is cleared first, then set to 1.
 *
 * `static` is required: a plain C99 `inline` provides no external definition,
 * so an un-inlined call (e.g. at -O0) would fail to link. */
static inline void setup_output(uint32_t g) {
    volatile unsigned *fsel = gpio + (g / 10);
    unsigned shift = (g % 10) * 3;

    *fsel &= ~(7u << shift);
    *fsel |= (1u << shift);
}

/* Write mask to GPIO register word 7; write_bits() uses this to raise the
 * pins whose bits are set in mask.
 *
 * `static` is required: a plain C99 `inline` provides no external definition,
 * so an un-inlined call (e.g. at -O0) would fail to link. */
static inline void set(uint32_t mask) {
    gpio[7] = mask;
}

/* Write mask to GPIO register word 10; write_bits() uses this to lower the
 * pins whose bits are set in mask.
 *
 * `static` is required: a plain C99 `inline` provides no external definition,
 * so an un-inlined call (e.g. at -O0) would fail to link. */
static inline void clr(uint32_t mask) {
    gpio[10] = mask;
}

/* Map the GPIO register page from /dev/mem into `gpio` and configure the
 * four data/clock pins as outputs. Exits the process on any failure. */
void setup_gpio() {
    int fd = open("/dev/mem", O_RDWR|O_SYNC);
    if (fd < 0) {
        perror("open");
        exit(1);
    }

    void *base = mmap(NULL, PAGE, PROT_READ|PROT_WRITE, MAP_SHARED, fd, GPIO_BASE);
    if (base == MAP_FAILED) {
        perror("mmap");
        exit(1);
    }

    gpio = (volatile unsigned *) base;
    /* The descriptor is closed here; the mapping is what gets used afterwards. */
    close(fd);

    const uint32_t pins[] = { DAT_A, CLK_A, DAT_B, CLK_B };
    for (size_t p = 0; p < sizeof(pins) / sizeof(pins[0]); p++) {
        setup_output(pins[p]);
    }
}

/* Busy-wait for 200 no-op iterations between clock edges.
 *
 * Author's notes: one loop iteration ~= 4.35 ns; max clock is 25 MHz,
 * i.e. a 40 ns delay. A nanosleep() with tv_nsec = 40 was the alternative
 * left disabled in favour of this spin loop. */
void delay() {
    for (size_t spin = 0; spin < 200; spin++) {
        asm volatile("nop");
    }
}

/* Clock out the low n bits of `bits`, most significant first.
 *
 * strip: 0 selects the DAT_A/CLK_A pins, anything else DAT_B/CLK_B.
 * n:     number of bits to send, 1..32; other values send nothing.
 * bits:  value whose low n bits are sent.
 *
 * For each bit the data line is set or cleared, then the clock line is
 * pulsed high and low with a delay() before each edge. */
void write_bits(size_t strip, size_t n, uint32_t bits) {
    /* Shifting by (n - 1) is undefined for n == 0 or n > 32. */
    if (n == 0 || n > 32)
        return;

    const uint32_t dat = UINT32_C(1) << (strip ? DAT_B : DAT_A);
    const uint32_t clk = UINT32_C(1) << (strip ? CLK_B : CLK_A);

    /* Unsigned constant: `1 << 31` on a signed int is undefined behaviour. */
    for (uint32_t mask = UINT32_C(1) << (n - 1); mask; mask >>= 1) {
        if (bits & mask)
            set(dat);
        else
            clr(dat);

        delay();
        set(clk);
        delay();
        clr(clk);
    }
}

/* Number of 16-bit words in one frame, and the frame size in bytes. */
#define CHANNELS 198
#define BUF_SIZE (CHANNELS * 2)

/* Last complete frame received; main() writes it out on every loop pass. */
uint16_t display_buf[CHANNELS];
/* Landing buffer for one incoming datagram. */
uint8_t read_buf[BUF_SIZE];

int main() {
struct sched_param sched_param
= { .sched_priority = 99 };

setup_gpio();
if (sched_setscheduler(0, SCHED_FIFO, &sched_param) < 0) {
    perror("sched_setscheduler");
    exit(1);
}

/*
setresuid(65534, 65534, 65534);
setresgid(65534, 65534, 65534);
*/

int sock_fd;
if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
    perror("socket");
    exit(1);
}

struct sockaddr_in server_addr = {
    .sin_family = AF_INET,
    .sin_addr = { .s_addr = htonl(INADDR_ANY) },
    .sin_port = htons(2772)
};
if (bind(sock_fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) {
    perror("bind");
    exit(1);
}

for (;;) {
    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(sock_fd, &read_fds);

    struct timeval timeout = { .tv_sec = 0, .tv_usec = 14000 };
    int ready = select(sock_fd+1, &read_fds, NULL, NULL, &timeout);
    if (ready < 0) {
        perror("select");
        exit(1);
    }

    if (ready > 0) {
        ssize_t nread = recv(sock_fd, read_buf, BUF_SIZE, 0);
        if (nread == BUF_SIZE) {
            memcpy(display_buf, read_buf, BUF_SIZE);
        } else {
            fprintf(stderr, "short packet: %d bytes\n", nread);
        }
    }

    write_bits(0, 32, 0);
    for (int i=102; i>=0; i--) {
        write_bits(0, 16, display_buf[i] | 0x8000);
    }

    write_bits(1, 32, 0);
    for (int i=103; i<198; i++) {
        write_bits(1, 16, display_buf[i] | 0x8000);
    }
}

return 0;

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment