Skip to content

Instantly share code, notes, and snippets.

@Sasszem
Created July 17, 2018 23:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sasszem/090a3860b4c6a792128a641be7c10508 to your computer and use it in GitHub Desktop.
Save Sasszem/090a3860b4c6a792128a641be7c10508 to your computer and use it in GitHub Desktop.
A socket wrapper to help write custom protocols
import json
import struct
import array
def _toBytes(list):
return array.array('B', list).tostring()
def _toId(int):
return _toBytes((int %256, int//256))
class SocketConnector(object):
IDs={"byte": _toBytes([1, 0]),
"short":_toBytes([2, 0]),
"int":_toBytes([3, 0]),
"float":_toBytes([4, 0])} # Name => Id
_IDs_reversed={v: k for k, v in IDs.items()}
_lastid=len(IDs)
@classmethod
def _createType(self, type):
if len(type)==4: #It's a function based one
self.senders[type[0]]=type[-1]
self.readers[type[1]]=type[2]
self.IDs[type[0]]=type[1]
self._IDs_reversed[type[1]]=type[0]
elif len(type)==3: #It's a compbound one
self.IDs[type[0]]=type[1]
self._IDs_reversed[type[1]]=type[0]
fields=type[-1]
c=len(fields)
def send(self, data):
return b"".join(self.senders[fields[i]](self, data[i]) for i in range(c))
def read(self):
return [self.readers[self.IDs[fields[i]]](self) for i in range(c)]
self.readers[type[1]]=read
self.senders[type[0]]=send
@classmethod
def load(self, files):
if isinstance(files, list):
for f in files: self._load(f)
elif isinstance(files, str):
self._load(files)
else:
raise Exception("Sc.load takes str or list of multiple str, not %s"%type(files))
self.IDs.update({v: k for k, v in self.IDs.items()})
@classmethod
def _load(self, file):
f=open(file, "r")
data=json.loads(f.read())
f.close()
if "module" in data:
m= __import__(data['module'])
for tp in data["func_types"]:
self._lastid+=1
name=tp
reader=getattr(m, "read_"+tp)
sender=getattr(m, "send_"+tp)
self._createType((name, _toId(self._lastid), reader, sender))
print("Registrated type: %s with id %s"%(name, self._lastid))
if "comp_types" in data:
for tp in data["comp_types"]:
self._lastid+=1
name=tp[0]
fields=tp[1]
self._createType((name, _toId(self._lastid), fields))
print("Registrated type: %s with id %s"%(name, self._lastid))
def __init__(self,sock,return_types=False, return_types_as_str=1):
self.sock=sock
self.intpacker=struct.Struct("i")
self.shortpacker=struct.Struct("h")
self.floatpacker=struct.Struct("f")
self._typeConverter = (lambda x: self._IDs_reversed[x]) if return_types_as_str else (lambda x: x)
if return_types:
self.read=self.read_types
def send(self, id, payload):
resoult=self.IDs[id]
p=self.senders[id](self, payload)
resoult+=p
self.send_raw(resoult)
def read(self):
id=self.read_raw(2)
return self.readers[id](self)
def read_types(self):
id=self.read_raw(2)
return self._typeConverter(id),self.readers[id](self)
def send_byte(self, b):
return b
def send_short(self, s):
return self.shortpacker.pack(s)
def send_int(self, i):
return self.intpacker.pack(i)
def send_float(self, f):
return self.floatpacker.pack(f)
senders={"byte":send_byte,
"short":send_short,
"int":send_int,
"float":send_float} #Name => function
def read_byte(self):
return self.sock.read(1)
def read_short(self):
return self.shortpacker.unpack(self.sock.recv(2))[0]
def read_int(self):
return self.intpacker.unpack(self.sock.recv(4))[0]
def read_float(self):
return self.floatpacker.unpack(self.sock.recv(4))[0]
readers={IDs["byte"]:read_byte,
IDs["short"]:read_short,
IDs["int"]:read_int,
IDs["float"]:read_float} #ID => function
def close(self):
self.sock.close()
def send_raw(self, data):
self.sock.send(data)
def read_raw(self, l):
return self.sock.recv(l)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
pass
Sc=SocketConnector
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment