Created
April 22, 2017 15:51
-
-
Save pbfy0/e18e04f825bd91fb4c846b2db5e01fc8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ctypes import BigEndianStructure as Structure | |
from ctypes import Union as union | |
import ctypes as c | |
from enum import Enum | |
import io | |
import struct | |
import math | |
class RobotModes(Enum): | |
#DISABLED = 0 | |
TELEOP = 0 | |
TEST = 1 | |
AUTO = 2 | |
ESTOPPED = 0x80 | |
M_ENABLED = 4 | |
class TeamStation(Enum): | |
RED_1 = 0 | |
RED_2 = 1 | |
RED_3 = 2 | |
BLUE_1 = 3 | |
BLUE_2 = 4 | |
BLUE_3 = 5 | |
class RobotMode: | |
def __init__(self, val=0): | |
self.mode = 0 | |
self.update(val) | |
def update(self, val): | |
if self.mode != RobotModes.ESTOPPED: self.mode = RobotModes(val & ~4) | |
self.enabled = bool(val & 4) | |
def __int__(self): | |
return self.mode.value | (4 if self.enabled else 0) | |
def __repr__(self): | |
return 'RobotMode(enabled={}, mode={})'.format(self.enabled, self.mode) | |
#class RobotModeStruct(Structure): | |
# _fields_ = [('test', c.c_uint, 1), | |
# ('auto', c.c_uint, 1), | |
# ('teleop', c.c_uint, 1), | |
# ('enabled', c.c_uint, 1), | |
# ('reserved', c.c_uint, 4), | |
# ('estopped', c.c_uint, 1)] | |
class TaggedStruct(Structure): | |
_fields_ = [('tag', c.c_uint8)] | |
_tag = 0 | |
def __init__(self, **kwargs): | |
if 'tag' not in kwargs: kwargs['tag'] = type(self)._tag | |
super().__init__(**kwargs) | |
class StatusPacket(TaggedStruct): | |
_tag = 1 | |
class BatteryVoltage(Structure): | |
_fields_ = [('integer', c.c_uint8), | |
('decimal', c.c_uint8)] | |
def f_set(self, val): | |
self.integer = int(val) | |
self.decimal = int(val * 100) % 100 | |
def __float__(self): | |
return self.integer + self.decimal / 100. | |
def __str__(self): | |
return '{}.{:02}'.format(self.integer, self.decimal) | |
_fields_ = [('mode', c.c_uint8), | |
('status', c.c_uint8), | |
('bat_voltage', BatteryVoltage), | |
('request', c.c_uint8)] | |
tags = {} | |
def r_tag(tag): | |
def _wrap(cls): | |
tags[tag] = cls | |
return cls | |
return _wrap | |
class RecievePacket(Structure): | |
@classmethod | |
def i_read(cls, state, bytes): | |
val = cls.from_buffer_copy(bytes) | |
val.read(state) | |
@r_tag(1) | |
class ControlPacket(RecievePacket): | |
name = 'control' | |
_fields_ = [('mode', c.c_uint8), # control | |
('request', c.c_uint8), | |
('alliance', c.c_uint8), | |
] | |
@r_tag(0x0c) | |
class JoystickPacket(RecievePacket): | |
@staticmethod | |
def read(rs, joysticks): | |
rs.joysticks = rs.joysticks[:len(joysticks)] | |
for i in range(len(joysticks) - len(rs.joysticks)): rs.joysticks.append(JoystickState()) | |
for n, o in zip(joysticks, rs.joysticks): | |
o.update(n) | |
pass | |
class ButtonState: | |
def __init__(self, val, n): | |
self.n = n | |
self.val = val | |
def __getitem__(self, n): | |
return bool(self.val & 1<<n) | |
def __len__(self): | |
return self.n | |
def __repr__(self): | |
buttons = ('{:0' + str(self.n) + 'b}').format(self.val) | |
return 'ButtonState(0b{}, {})'.format(buttons, self.n) | |
def __iter__(self): | |
for i in range(len(self)): | |
yield self[i] | |
raise StopIteration | |
class JoystickState: | |
def __init__(self, axes=[], buttons=None): | |
self.axes = axes | |
self.buttons = buttons | |
def update(self, b): | |
rio = io.BytesIO(b) | |
n_axes = rio.read(1)[0]#struct.unpack('>B', rio.read(1)) | |
#if n_axes != len(self.axes): self.axes = [None] * n_axes | |
d = rio.read(n_axes) | |
self.axes = [((x ^ 0x80) - 0x80) / 128 for x in d] | |
n_buttons = rio.read(1)[0]#struct.unpack('>B', rio.read(1)) | |
x = 0 | |
#for i in range(0, n_buttons, 8): | |
# x <<= 8 | |
# x |= rio.read(1)[0] | |
for c in rio.read(math.ceil(n_buttons / 8)): | |
x <<= 8 | |
x |= c | |
self.buttons = ButtonState(x, n_buttons) | |
# for i in range(8): | |
# self.buttons.append(bool(x & 1)) | |
# x >>= 1 | |
# bl -= 1 | |
# if bl == 0: break | |
#print(repr(rio.read())) | |
def __repr__(self): | |
return 'JoystickState(axes=[{}], buttons={})'.format(', '.join(str(x) for x in self.axes), self.buttons) | |
def tag_read(type, f): | |
return type.from_buffer_copy(f.read(c.sizeof(type))) | |
class RobotState: | |
def __init__(self): | |
#self.status = None | |
self.voltage = 6.39 | |
self.mode = RobotMode() | |
self.alliance = None | |
self.status = 0x20 | |
self.joysticks = [] | |
def __repr__(self): | |
return 'RobotState(mode={}, alliance={}, joysticks={})'.format(self.mode, self.alliance , self.joysticks) | |
def recv(self, pkt): | |
f = io.BytesIO(pkt) | |
pn, = struct.unpack_from('>H', f.read(2)) | |
tag = f.read(1)[0]#, = struct.unpack_from('>B', f.read(1)) | |
#print(tag) | |
assert(tag == 1) # right? | |
self.read_control(tag_read(ControlPacket, f)) | |
joysticks = [] | |
while True: | |
x = f.read(2) | |
if x == b'': break | |
len, tag = struct.unpack('>BB', x) # might be uint16 | |
#print(len, tag, repr(f.read(len))) | |
# why is the protocol so weird | |
t = tags[tag] | |
if t == JoystickPacket: | |
joysticks.append(f.read(len)) | |
else: | |
t.i_read(self, f.read(len)) | |
if joysticks: JoystickPacket.read(self, joysticks) | |
#o.read(self) | |
# deal with it somehow | |
return pn | |
def read_control(self, packet): | |
self.mode.update(packet.mode) | |
#print('R', packet.mode) | |
if packet.request: self.request(packet.request) | |
self.alliance = TeamStation(packet.alliance) | |
def write_status(self): | |
p = StatusPacket() | |
p.mode = int(self.mode) | |
#print('S', p.mode) | |
p.request = 0 # whatever | |
p.bat_voltage.f_set(self.voltage) | |
p.status = self.status | |
p.tag = 1 | |
return p | |
def send(self, pn): | |
f = io.BytesIO() | |
f.write(struct.pack('>H', pn)) | |
f.write(self.write_status()) | |
#print(pn) | |
f.seek(0) | |
return f.read() | |
def respond(self, pkt): | |
return self.send(self.recv(pkt)) | |
def request(self, req): | |
#print('{:02x}'.format(req)) | |
pass | |
#if r | |
#print(req) | |
import socketserver | |
state = RobotState() | |
class MyUDPHandler(socketserver.BaseRequestHandler): | |
""" | |
This class works similar to the TCP handler class, except that | |
self.request consists of a pair of data and client socket, and since | |
there is no connection the client address must be given explicitly | |
when sending data back via sendto(). | |
""" | |
def handle(self): | |
data = self.request[0]#.strip() | |
socket = self.request[1] | |
#print("{} wrote:".format(self.client_address[0])) | |
resp = state.respond(data) | |
print(state) | |
#print(resp) | |
#print(repr(data)) | |
#a = bytes(state.send()) | |
socket.sendto(resp, (self.client_address[0], 1150)) | |
if __name__ == "__main__": | |
HOST, PORT = "localhost", 1110 | |
server = socketserver.UDPServer((HOST, PORT), MyUDPHandler) | |
server.serve_forever() | |
z = b'2\xc5\x01\x00\x00\x00\x00\x00' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment