Skip to content

Instantly share code, notes, and snippets.

@devilholk
Created August 31, 2016 12:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devilholk/68d4677b55f5070e97893eb5f46ddb20 to your computer and use it in GitHub Desktop.
Save devilholk/68d4677b55f5070e97893eb5f46ddb20 to your computer and use it in GitHub Desktop.
import struct
def Align(data, size=4, padding=b'\x00'):
return data + ((size-(len(data) % size)) % size) * padding
def Aligned_split(data, char, size=4):
i = data.index(char)
i += ((size-(i % size)) % size)
return data[:i], data[i:]
class OSC_Controller:
def __init__(self, address):
self.struct_fmt = '>'
for f in self.Format:
if f == 'f':
self.struct_fmt += 'f'
else:
raise NotImplementedError
self.address = Align(bytes(address, encoding='utf-8') + b'\x00') #Seems we need to pad this, apparently /1/push1 which is 8 bytes becomes /1/push1____ which is 12 bytes (_ = \x00)
self.format = Align(b','+ bytes(self.Format, encoding='utf-8'))
if hasattr(self, 'post_init'):
self.post_init()
def decode(self, data):
fmt, pkg = Aligned_split(data, b'\x00')
assert fmt == self.format, 'Format mismatch'
self.set_data( *struct.unpack(self.struct_fmt, pkg) )
def set_data(self, *data):
self.value = data
def encode(self, *values):
data = Align(struct.pack(self.struct_fmt, *values))
return self.address + self.format + data
class XY(OSC_Controller):
Format = 'ff'
class Encoder(OSC_Controller):
Format = 'f'
class Pushbutton(OSC_Controller):
Format = 'f'
class LED(OSC_Controller):
Format = 'f'
import socket, threading, time, functools
import osc_protocol
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Bind the socket to the port
server_address = ('0.0.0.0', 8000)
print ('starting up on %s port %s' % server_address)
sock.bind(server_address)
r_adr = None
r_port = None
controllers = dict()
def process_incoming():
global r_adr, r_port, controllers
while True:
data, address = sock.recvfrom(4096)
r_adr, r_port = address
rawadr, packet = osc_protocol.Aligned_split(data, b',')
if rawadr in controllers:
controllers[rawadr].decode(packet)
else:
print(data, address)
class Joystick(osc_protocol.XY):
def post_init(self):
self.current_x = 0.5
self.current_y = 0.5
def think(self):
self.current_x = self.current_x * 0.9 + 0.5*0.1
self.current_y = self.current_y * 0.9 + 0.5*0.1
def get_data(self):
return self.encode(self.current_x, self.current_y)
def set_data(self, x, y):
self.current_x = self.current_x * 0.75 + x * 0.25
self.current_y = self.current_y * 0.75 + y * 0.25
class Encoder(osc_protocol.Encoder):
def set_data(self, step):
print(self.address, step)
class Pushbutton(osc_protocol.Pushbutton):
def post_init(self):
self.state = False
self.last_state = False
self.on_push = None
def set_data(self, value):
self.state = value == 1.0
if self.state and not self.last_state:
if self.on_push:
self.on_push()
self.last_state = self.state
class LED(osc_protocol.LED):
def post_init(self):
self.blinking = False
self.state = False
def think(self):
if self.blinking:
self.state = (time.time() % 0.5) < 0.25
else:
self.state = False
def get_data(self):
return self.encode(1.0 if self.state else 0.0)
def add_control(control):
global controllers
assert control.address not in controllers, 'Address already taken: %r' % (control.address,)
controllers[control.address] = control
return control
js1 = add_control(Joystick('/1/xy1'))
js2 = add_control(Joystick('/1/xy2'))
js3 = add_control(Joystick('/1/xy3'))
js4 = add_control(Joystick('/1/xy4'))
js5 = add_control(Joystick('/1/xy5'))
js6 = add_control(Joystick('/1/xy6'))
enc1 = add_control(Encoder('/1/encoder1'))
enc2 = add_control(Encoder('/1/encoder2'))
enc3 = add_control(Encoder('/1/encoder3'))
enc4 = add_control(Encoder('/1/encoder4'))
push1 = add_control(Pushbutton('/1/push1'))
push2 = add_control(Pushbutton('/1/push2'))
push3 = add_control(Pushbutton('/1/push7'))
push4 = add_control(Pushbutton('/1/push8'))
push5 = add_control(Pushbutton('/1/push9'))
push6 = add_control(Pushbutton('/1/push10'))
led1 = add_control(LED('/1/led1'))
led2 = add_control(LED('/1/led2'))
led3 = add_control(LED('/1/led7'))
led4 = add_control(LED('/1/led8'))
led5 = add_control(LED('/1/led9'))
led6 = add_control(LED('/1/led10'))
def toggle(led):
led.blinking = not led.blinking
push1.on_push = functools.partial(toggle, led1)
push2.on_push = functools.partial(toggle, led2)
push3.on_push = functools.partial(toggle, led3)
push4.on_push = functools.partial(toggle, led4)
push5.on_push = functools.partial(toggle, led5)
push6.on_push = functools.partial(toggle, led6)
in_thread = threading.Thread(target=process_incoming)
in_thread.start()
while 1:
time.sleep(0.01)
for controller in controllers.values():
if hasattr(controller, 'think'):
controller.think()
if hasattr(controller, 'get_data'):
if r_adr:
#Now we send always but ofcourse it would be better to only send if changed (TODO)
sock.sendto(controller.get_data(), (r_adr, 9000))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment