Skip to content

Instantly share code, notes, and snippets.

@zusitools
Created August 23, 2016 08:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zusitools/f2b924ee783cf73481526578f627653c to your computer and use it in GitHub Desktop.
Save zusitools/f2b924ee783cf73481526578f627653c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import socket
import struct
class Node:
def __init__(self, id, attribs=[], children=[]):
self.id = id
self.attribs = attribs
self.children = children
def send(self):
# print(self, "sending node id")
result = struct.pack("<Ih", 0x00000000, self.id)
for attrib in self.attribs:
# print(self, "sending attrib", attrib)
data = attrib.send()
# print(data)
result += data
for child in self.children:
# print(self, "sending child", child)
data = child.send()
# print(data)
result += data
# print(self, "sending node end")
result += struct.pack("<I", 0xffffffff)
return result
class Attribute:
def __init__(self, id, data):
self.id = id
self.data = data
self.datalen = 0
class WordAttribute(Attribute):
def send(self):
return struct.pack("<Ihh", 2 + 2, self.id, self.data)
class StringAttribute(Attribute):
def send(self):
return struct.pack("<Ih", 2 + len(self.data), self.id) + bytearray(self.data, 'ASCII')
def send(s, root):
data = root.send()
print("sending data", data, 'len', len(data))
s.send(data)
def parse(f, indent = 0):
laenge = struct.unpack("<I", f.read(4))[0]
if laenge == 0xFFFFFFFF:
return False
id = struct.unpack("<h", f.read(2))[0]
if laenge == 0:
print((" " * indent) + "Knoten, ID = %d" % id)
while parse(f, indent + 1):
pass
else:
print((" " * indent) + "Attribut, ID = %d, Laenge %d" % (id, laenge))
daten = f.read(laenge - 2)
if len(daten) == 4:
print((" " * indent) + "Daten: %f" % struct.unpack("<f", daten)[0])
else:
print((" " * indent) + "Daten: %s" % daten)
return True
if __name__ == "__main__":
HelloNode = Node(0x0001, [], [
Node(0x0001, [
WordAttribute(0x0001, 0x0002),
WordAttribute(0x0002, 0x0002),
StringAttribute(0x0003, "Test"),
StringAttribute(0x0004, "2.0"),
], [])
])
NeededDataNode = Node(0x0002, [], [
Node(0x0003, [], [
Node(0x000A, [
# WordAttribute(0x0001, 0x0001), # Geschwindigkeit
# WordAttribute(0x0001, 0x001b), # Schleudern
# WordAttribute(0x0001, 0x008C), # Batteriehauptschalter
WordAttribute(0x0001, 0x004F), # Beschl. x
WordAttribute(0x0001, 0x0050), # Beschl. y
WordAttribute(0x0001, 0x0051), # Beschl. z
WordAttribute(0x0001, 0x0052), # Beschl. phiX
WordAttribute(0x0001, 0x0053), # Beschl. phiY
WordAttribute(0x0001, 0x0054), # Beschl. phiZ
], [])
])
])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", 1436))
f = s.makefile()
send(s, HelloNode)
print(parse(f))
send(s, NeededDataNode)
print(parse(f))
try:
while True:
print(parse(f))
finally:
s.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment