Skip to content

Instantly share code, notes, and snippets.

@zarya
Created February 8, 2020 01:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zarya/3d5447556296d6923bee232e4cc7fca1 to your computer and use it in GitHub Desktop.
Save zarya/3d5447556296d6923bee232e4cc7fca1 to your computer and use it in GitHub Desktop.
import socket
import struct
from PIL import Image
class PixelClient():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
buffer = []
def __init__(self, host: str, port=5004):
self.host = host
self.port = port
self.verModeString = self.SetVersionBit(1) + self.SetRGBAMode(0)
def _send(self, data: bytes):
""" Send data to the server"""
self.sock.sendto(data, (self.host, self.port))
def flush(self):
""" Send everything that is inside the buffer to the server"""
self._send(self.verModeString + b"".join(self.buffer))
self.buffer = []
def RGBPixel(self, x: int, y: int, r: int, g: int, b: int, a=None):
"""Generates the packed data for a pixel"""
if a:
self.buffer.append(bytes(struct.pack("<2H4B", x, y, r, g, b, a)))
self.buffer.append(bytes(struct.pack("<2H3B", x, y, r, g, b)))
@staticmethod
def SetRGBAMode(mode: int) -> bytes:
"""
Generate the rgb/rgba bit
0 for RGB and 1 for RGBA
"""
return struct.pack("<?", mode)
@staticmethod
def SetVersionBit(protocol=1) -> bytes:
"""Generate the Version bit"""
return struct.pack("<B", protocol)
import time
import pyaudio
import numpy as np
import audioop
from pixelflut import PixelClient
from colour import Color
CHUNK = 256
RATE = 4000
maxValue = 2**16
bars=128
barHeight=8
scaleL=500
scaleR=500
maxDB = [0,0] #last peak value
maxDBT = [0,0] #last peak time
avrScale = [1]
DBALength = 100
startY = 17
endY = 32
pixelflut = PixelClient('10.208.42.159', 5004)
colors = list(Color("green").range_to(Color("red"), 129))
r = []
g = []
b = []
for color in colors:
r.append(int(color.red * 255))
g.append(int(color.green * 255))
b.append(int(color.blue * 255))
p=pyaudio.PyAudio() # start the PyAudio class
stream=p.open(
format=pyaudio.paInt16,
channels=2, rate=RATE,
input=True,
frames_per_buffer=CHUNK) #uses default input device
def db_level(data, samplewidth=2, rms_mode=False):
maxvalue = 2**(8*samplewidth-1)
left_frames = audioop.tomono(data, samplewidth, 1, 0)
right_frames = audioop.tomono(data, samplewidth, 0, 1)
if rms_mode:
peak_left = (audioop.rms(left_frames, samplewidth)+1)/maxvalue
peak_right = (audioop.rms(right_frames, samplewidth)+1)/maxvalue
else:
peak_left = (audioop.max(left_frames, samplewidth)+1)/maxvalue
peak_right = (audioop.max(right_frames, samplewidth)+1)/maxvalue
return peak_left * 1000, peak_right * 1000
def valmap(value, istart, istop, ostart, ostop):
return int(ostart + (ostop - ostart) * ((value - istart) / (istop - istart)))
def vuPixel(channel,value):
startY = 16 + (barHeight * channel)
if value > 127:
value = 127
if maxDB[channel] < value:
maxDB[channel] = value
maxDBT[channel] = time.time()
avrScale.insert(0, value)
if len(avrScale) > DBALength:
avrScale.pop()
for y in range(startY, startY+barHeight):
for x in range(0,value):
pixelflut.RGBPixel(x,y,r[x],g[x],b[x])
for x in range(value,128):
pixelflut.RGBPixel(x,y,0,0,0)
pixelflut.RGBPixel(maxDB[channel],y,255,0,0)
while True:
dbm = db_level(stream.read(CHUNK))
scale = int(sum(avrScale) / len(avrScale))
if scale < 10:
scale = 500
else:
scale += 50
vuPixel(0,valmap(int(dbm[0]),0,scale,0,128))
vuPixel(1,valmap(int(dbm[1]),0,scale,0,128))
pixelflut.flush()
now = time.time()
if (now - maxDBT[0]) > 1.5:
maxDB[0] = 1
if (now - maxDBT[1]) > 1.5:
maxDB[1] = 1
stream.stop_stream()
stream.close()
p.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment