Skip to content

Instantly share code, notes, and snippets.

@pbfy0
Created April 22, 2017 15:51
Show Gist options
  • Save pbfy0/e18e04f825bd91fb4c846b2db5e01fc8 to your computer and use it in GitHub Desktop.
Save pbfy0/e18e04f825bd91fb4c846b2db5e01fc8 to your computer and use it in GitHub Desktop.
from ctypes import BigEndianStructure as Structure
from ctypes import Union as union
import ctypes as c
from enum import Enum
import io
import struct
import math
class RobotModes(Enum):
#DISABLED = 0
TELEOP = 0
TEST = 1
AUTO = 2
ESTOPPED = 0x80
M_ENABLED = 4
class TeamStation(Enum):
RED_1 = 0
RED_2 = 1
RED_3 = 2
BLUE_1 = 3
BLUE_2 = 4
BLUE_3 = 5
class RobotMode:
def __init__(self, val=0):
self.mode = 0
self.update(val)
def update(self, val):
if self.mode != RobotModes.ESTOPPED: self.mode = RobotModes(val & ~4)
self.enabled = bool(val & 4)
def __int__(self):
return self.mode.value | (4 if self.enabled else 0)
def __repr__(self):
return 'RobotMode(enabled={}, mode={})'.format(self.enabled, self.mode)
#class RobotModeStruct(Structure):
# _fields_ = [('test', c.c_uint, 1),
# ('auto', c.c_uint, 1),
# ('teleop', c.c_uint, 1),
# ('enabled', c.c_uint, 1),
# ('reserved', c.c_uint, 4),
# ('estopped', c.c_uint, 1)]
class TaggedStruct(Structure):
_fields_ = [('tag', c.c_uint8)]
_tag = 0
def __init__(self, **kwargs):
if 'tag' not in kwargs: kwargs['tag'] = type(self)._tag
super().__init__(**kwargs)
class StatusPacket(TaggedStruct):
_tag = 1
class BatteryVoltage(Structure):
_fields_ = [('integer', c.c_uint8),
('decimal', c.c_uint8)]
def f_set(self, val):
self.integer = int(val)
self.decimal = int(val * 100) % 100
def __float__(self):
return self.integer + self.decimal / 100.
def __str__(self):
return '{}.{:02}'.format(self.integer, self.decimal)
_fields_ = [('mode', c.c_uint8),
('status', c.c_uint8),
('bat_voltage', BatteryVoltage),
('request', c.c_uint8)]
tags = {}
def r_tag(tag):
def _wrap(cls):
tags[tag] = cls
return cls
return _wrap
class RecievePacket(Structure):
@classmethod
def i_read(cls, state, bytes):
val = cls.from_buffer_copy(bytes)
val.read(state)
@r_tag(1)
class ControlPacket(RecievePacket):
name = 'control'
_fields_ = [('mode', c.c_uint8), # control
('request', c.c_uint8),
('alliance', c.c_uint8),
]
@r_tag(0x0c)
class JoystickPacket(RecievePacket):
@staticmethod
def read(rs, joysticks):
rs.joysticks = rs.joysticks[:len(joysticks)]
for i in range(len(joysticks) - len(rs.joysticks)): rs.joysticks.append(JoystickState())
for n, o in zip(joysticks, rs.joysticks):
o.update(n)
pass
class ButtonState:
def __init__(self, val, n):
self.n = n
self.val = val
def __getitem__(self, n):
return bool(self.val & 1<<n)
def __len__(self):
return self.n
def __repr__(self):
buttons = ('{:0' + str(self.n) + 'b}').format(self.val)
return 'ButtonState(0b{}, {})'.format(buttons, self.n)
def __iter__(self):
for i in range(len(self)):
yield self[i]
raise StopIteration
class JoystickState:
def __init__(self, axes=[], buttons=None):
self.axes = axes
self.buttons = buttons
def update(self, b):
rio = io.BytesIO(b)
n_axes = rio.read(1)[0]#struct.unpack('>B', rio.read(1))
#if n_axes != len(self.axes): self.axes = [None] * n_axes
d = rio.read(n_axes)
self.axes = [((x ^ 0x80) - 0x80) / 128 for x in d]
n_buttons = rio.read(1)[0]#struct.unpack('>B', rio.read(1))
x = 0
#for i in range(0, n_buttons, 8):
# x <<= 8
# x |= rio.read(1)[0]
for c in rio.read(math.ceil(n_buttons / 8)):
x <<= 8
x |= c
self.buttons = ButtonState(x, n_buttons)
# for i in range(8):
# self.buttons.append(bool(x & 1))
# x >>= 1
# bl -= 1
# if bl == 0: break
#print(repr(rio.read()))
def __repr__(self):
return 'JoystickState(axes=[{}], buttons={})'.format(', '.join(str(x) for x in self.axes), self.buttons)
def tag_read(type, f):
return type.from_buffer_copy(f.read(c.sizeof(type)))
class RobotState:
def __init__(self):
#self.status = None
self.voltage = 6.39
self.mode = RobotMode()
self.alliance = None
self.status = 0x20
self.joysticks = []
def __repr__(self):
return 'RobotState(mode={}, alliance={}, joysticks={})'.format(self.mode, self.alliance , self.joysticks)
def recv(self, pkt):
f = io.BytesIO(pkt)
pn, = struct.unpack_from('>H', f.read(2))
tag = f.read(1)[0]#, = struct.unpack_from('>B', f.read(1))
#print(tag)
assert(tag == 1) # right?
self.read_control(tag_read(ControlPacket, f))
joysticks = []
while True:
x = f.read(2)
if x == b'': break
len, tag = struct.unpack('>BB', x) # might be uint16
#print(len, tag, repr(f.read(len)))
# why is the protocol so weird
t = tags[tag]
if t == JoystickPacket:
joysticks.append(f.read(len))
else:
t.i_read(self, f.read(len))
if joysticks: JoystickPacket.read(self, joysticks)
#o.read(self)
# deal with it somehow
return pn
def read_control(self, packet):
self.mode.update(packet.mode)
#print('R', packet.mode)
if packet.request: self.request(packet.request)
self.alliance = TeamStation(packet.alliance)
def write_status(self):
p = StatusPacket()
p.mode = int(self.mode)
#print('S', p.mode)
p.request = 0 # whatever
p.bat_voltage.f_set(self.voltage)
p.status = self.status
p.tag = 1
return p
def send(self, pn):
f = io.BytesIO()
f.write(struct.pack('>H', pn))
f.write(self.write_status())
#print(pn)
f.seek(0)
return f.read()
def respond(self, pkt):
return self.send(self.recv(pkt))
def request(self, req):
#print('{:02x}'.format(req))
pass
#if r
#print(req)
import socketserver
state = RobotState()
class MyUDPHandler(socketserver.BaseRequestHandler):
"""
This class works similar to the TCP handler class, except that
self.request consists of a pair of data and client socket, and since
there is no connection the client address must be given explicitly
when sending data back via sendto().
"""
def handle(self):
data = self.request[0]#.strip()
socket = self.request[1]
#print("{} wrote:".format(self.client_address[0]))
resp = state.respond(data)
print(state)
#print(resp)
#print(repr(data))
#a = bytes(state.send())
socket.sendto(resp, (self.client_address[0], 1150))
if __name__ == "__main__":
HOST, PORT = "localhost", 1110
server = socketserver.UDPServer((HOST, PORT), MyUDPHandler)
server.serve_forever()
z = b'2\xc5\x01\x00\x00\x00\x00\x00'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment