Skip to content

Instantly share code, notes, and snippets.

@jkent
Last active April 27, 2020 05:10
Show Gist options
  • Save jkent/e5667fbf35403345e6f3df511f3ff40f to your computer and use it in GitHub Desktop.
Save jkent/e5667fbf35403345e6f3df511f3ff40f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from binascii import hexlify
from time import sleep
from zag import MHR, Radio
def main():
def packet_handler(data, rssi, link_quality):
mhr, offset = MHR.decode(data)
print(hexlify(mhr.encode() + data[offset:]).decode('utf8'))
radio = Radio('/dev/ttyACM1', packet_handler)
_, short = radio.get_value(Radio.Param.ShortAddr)
_, ext = radio.get_object(Radio.Param.LongAddr, 8)
print('I\'m 0x%04X, aka %s' % (short, hexlify(ext).decode('utf8').upper()))
radio.set_value(Radio.Param.Channel, 11)
radio.set_value(Radio.Param.RxMode, 0)
radio.set_value(Radio.Param.TxMode, Radio.TxMode.SendOnCca)
try:
while True:
sleep(1)
except KeyboardInterrupt:
radio.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
from binascii import hexlify
from time import sleep
from zag import MHR, Radio
def main():
def packet_handler(data, rssi, link_quality):
mhr, offset = MHR.decode(data)
print(hexlify(mhr.encode() + data[offset:]).decode('utf8'))
radio = Radio('/dev/ttyACM0', packet_handler)
_, short = radio.get_value(Radio.Param.ShortAddr)
_, ext = radio.get_object(Radio.Param.LongAddr, 8)
print('I\'m 0x%04X, aka %s' % (short, hexlify(ext).decode('utf8').upper()))
radio.set_value(Radio.Param.Channel, 11)
radio.set_value(Radio.Param.RxMode, 0)
radio.set_value(Radio.Param.TxMode, Radio.TxMode.SendOnCca)
mhr = MHR()
mhr.frame_type = MHR.FrameType.Beacon
mhr.src_addr = short
packet = mhr.encode() + b'beacon'
print(hexlify(packet).decode('utf8'))
try:
while True:
radio.send(packet)
sleep(1)
except KeyboardInterrupt:
radio.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
from enum import IntEnum, IntFlag, unique
from queue import Queue
import random
from serial import Serial
import struct
from threading import Thread
class Radio(object):
packet_magic = b'ZPB'
header_struct = struct.Struct('!BHH')
@unique
class Request(IntEnum):
Prepare = 0
Transmit = 1
Send = 2
ChannelClear = 3
On = 4
Off = 5
GetValue = 6
SetValue = 7
GetObject = 8
SetObject = 9
InitPendingTable = 10
SetPending = 11
def __str__(self):
return str(self.name)
@unique
class Response(IntEnum):
OK = 128
Err = 129
def __str__(self):
return str(self.name)
@unique
class Event(IntEnum):
OnPacket = 192
def __str__(self):
return str(self.name)
class ResponseErr(Exception):
pass
@unique
class Result(IntEnum):
OK = 0
NotSupported = 1
InvalidValue = 2
Error = 3
def __str__(self):
return str(self.name)
@unique
class TransmitResult(IntEnum):
OK = 0
Err = 1
Collision = 2
NoAck = 3
def __str__(self):
return str(self.name)
@unique
class Param(IntEnum):
PowerMode = 0
Channel = 1
PanID = 2
ShortAddr = 3
RxMode = 4
TxMode = 5
TxPower = 6
CcaThreshold = 7
Rssi = 8
LastRssi = 9
LastLinkQuality = 10
LongAddr = 11
LastPacketTimestamp = 12
# Constants
ChannelMin = 13
ChannelMax = 14
TxpowerMin = 15
TxpowerMax = 16
def __str__(self):
return str(self.name)
@unique
class RxMode(IntFlag):
AddressFilter = (1 << 0)
AutoAck = (1 << 1)
PollMode = (1 << 2)
@unique
class TxMode(IntFlag):
SendOnCca = (1 << 0)
def __init__(self, port, packet_handler=None):
self.packet_handler = packet_handler
self.next_request_id = random.randint(0, 65534)
self.waiting_request_id = None
self.serial = Serial(port, timeout=1)
self.reader_queue = Queue()
self.thread = Thread(target=self.reader)
self.thread.start()
def shutdown(self):
self.done = True
def reader(self):
self.done = False
while not self.done:
data = self.serial.read_until(Radio.packet_magic)
if not data.endswith(Radio.packet_magic):
continue
data = self.serial.read(Radio.header_struct.size)
if len(data) != Radio.header_struct.size:
continue
response, request_id, data_len = Radio.header_struct.unpack(data)
data = self.serial.read(data_len)
if len(data) != data_len:
continue
if response & 0xC0 == 0xC0:
event = Radio.Event(response)
if event == Radio.Event.OnPacket:
rssi, link_quality = struct.unpack('!bB', data[-2:])
if self.packet_handler:
self.packet_handler(data[:-2], rssi, link_quality)
continue
if self.waiting_request_id != request_id:
continue
response = Radio.Response(response)
self.reader_queue.put((response, data))
def write(self, command, data=b''):
request_id = self.next_request_id
self.next_request_id = 0 if request_id >= 65534 else request_id + 1
self.waiting_request_id = request_id
packet = Radio.packet_magic + Radio.header_struct.pack(command.value, request_id, len(data)) + data
self.serial.write(packet)
response, data = self.reader_queue.get()
if response == Radio.Response.Err:
raise Radio.ResponseErr
return data
def prepare(self, data):
data = self.write(Radio.Request.Prepare, data)
result, = struct.unpack('!H', data)
result = Radio.Result(result)
return result,
def transmit(self, transmit_len):
data = struct.pack('!H', transmit_len)
data = self.write(Radio.Request.Transmit, data)
result, = struct.unpack('!H', data)
result = Radio.TransmitResult(result)
return result,
def send(self, data):
data = self.write(Radio.Request.Send, data)
result, = struct.unpack('!H', data)
result = Radio.TransmitResult(result)
return result,
def channel_clear(self):
data = self.write(Radio.Request.ChannelClear)
result, = struct.unpack('!H', data)
result = Radio.Result(result)
return result,
def on(self):
self.write(Radio.Request.On)
return Radio.Result.OK,
def off(self):
self.write(Radio.Request.Off)
return Radio.Result.OK,
def get_value(self, param):
data = struct.pack('!H', int(param))
data = self.write(Radio.Request.GetValue, data)
result, value = struct.unpack('!HH', data)
result = Radio.Result(result)
return result, value
def set_value(self, param, value):
data = struct.pack('!HH', int(param), value)
data = self.write(Radio.Request.SetValue, data)
result, = struct.unpack('!H', data)
result = Radio.Result(result)
return result,
def get_object(self, param, expected_len):
data = struct.pack('!HH', int(param), expected_len)
data = self.write(Radio.Request.GetObject, data)
result, = struct.unpack('!H', data[:2])
result = Radio.Result(result)
return result, data[2:]
def set_object(self, param, data):
data = struct.pack('!HH', int(param), len(data)) + data
data = self.write(Radio.Request.GetObject, data)
result, = struct.unpack('!H', data)
result = Radio.Result(result)
return result,
def init_pending_table(self):
data = self.write(Radio.Request.InitPendingTable)
result, = struct.unpack('!H', data)
result = Radio.Result(result)
return result,
def set_pending_short(self, index, address=None):
if address == None:
address = b''
elif isinstance(address, int):
address = struct.pack('!H', address)
data = struct.pack('!B', index) + address
data = self.write(Radio.Request.SetPending, data)
result, = struct.unpack('!H', data)
result = Radio.Result(result)
return result,
def set_pending_ext(self, index, address=None):
if address == None:
address = b''
data = struct.pack('!B', index | 0x80) + address
data = self.write(Radio.Request.SetPending, data)
result, = struct.unpack('!H', data)
result = Radio.Result(result)
return result,
class MHR(object):
@unique
class FrameType(IntEnum):
Beacon = 0
Data = 1
Ack = 2
MacCommand = 3
def __str__(self):
return str(self.name)
@unique
class FrameControl(IntFlag):
SecurityEnabled = (1 << 3)
FramePending = (1 << 4)
AckRequest = (1 << 5)
PanidCompression = (1 << 6)
def __str__(self):
return str(self.name)
@unique
class AddrMode(IntEnum):
NotPresent = 0
ShortAddr = 2
ExtendedAddr = 3
def __str__(self):
return str(self.name)
@unique
class FrameVersion(IntEnum):
IEEE802_15_4_2003 = 0
IEEE802_15_4_2006 = 1
def __str__(self):
return str(self.name)
@classmethod
def decode(cls, packet):
mhr = cls()
offset = 0
mhr._frame_control, mhr._seq_num = struct.unpack_from('!HB', packet, offset)
offset += 3
if mhr.frame_version > MHR.FrameVersion.IEEE802_15_4_2006:
return None
if mhr.dst_addrmode != MHR.AddrMode.NotPresent:
mhr._dst_panid, = struct.unpack_from('!H', packet, offset)
offset += 2
if mhr.dst_addrmode == MHR.AddrMode.ShortAddr:
mhr._dst_addr, = struct.unpack_from('!H', packet, offset)
offset += 2
elif mhr.dst_addrmode == MHR.AddrMode.ExtendedAddr:
mhr._dst_addr = packet[offset:offset+8]
offset += 8
if mhr.src_addrmode != MHR.AddrMode.NotPresent and not mhr.panid_compression:
mhr._src_panid, = struct.unpack_from('!H', packet, offset)
offset += 2
if mhr.src_addrmode == MHR.AddrMode.ShortAddr:
mhr._src_addr, = struct.unpack_from('!H', packet, offset)
offset += 2
elif mhr.src_addrmode == MHR.AddrMode.ExtendedAddr:
mhr._src_addr = packet[offset:offset+8]
offset += 8
return mhr, offset
def __init__(self):
self._frame_control = 0
self._seq_num = 0
self._dst_panid = None
self._dst_addr = None
self._src_panid = None
self._src_addr = None
def encode(self):
data = struct.pack('!HB', self._frame_control, self._seq_num)
if self.dst_addrmode != MHR.AddrMode.NotPresent:
data += struct.pack('!H', self._dst_panid)
if self.dst_addrmode == MHR.AddrMode.ShortAddr:
data += struct.pack('!H', self._dst_addr)
elif self.dst_addrmode == MHR.AddrMode.ExtendedAddr:
data += self._dst_addr
if self.src_addrmode != MHR.AddrMode.NotPresent and not self.panid_compression:
data += struct.pack('!H', self._src_panid)
if self.src_addrmode == MHR.AddrMode.ShortAddr:
data += struct.pack('!H', self._src_addr)
elif self.src_addrmode == MHR.AddrMode.ExtendedAddr:
data += self._src_addr
return data
@property
def frame_control(self):
return self._frame_control
@frame_control.setter
def frame_control(self, value):
print('fc!')
self._frame_control = value
if self.dst_addrmode == MHR.AddrMode.NotPresent:
self._dst_addr = None
else:
if self.dst_addrmode == MHR.AddrMode.ShortAddr and not isinstance(self._dst_addr, int):
self._dst_addr = 0
elif self.dst_addrmode == MHR.AddrMode.ExtendedAddr and not isinstance(self._dst_addr, bytes):
self._dst_addr = b'\x00' * 8
if self.src_addrmode == MHR.AddrMode.NotPresent:
self._src_addr = None
else:
if self.src_addrmode == MHR.AddrMode.ShortAddr and not isinstance(self._src_addr, int):
self._src_addr = 0
elif self.src_addrmode == MHR.AddrMode.ExtendedAddr and not isinstance(self._src_addr, bytes):
self._src_addr = b'\x00' * 8
@property
def frame_type(self):
return MRH.FrameType(self._frame_control & 0x3)
@frame_type.setter
def frame_type(self, value):
self._frame_control &= ~0x3
self._frame_control |= int(value) & 0x3
@property
def security_enabled(self):
return bool(self._frame_control & 0x4)
@security_enabled.setter
def security_enabled(self, value):
self._frame_control &= ~0x4
if value:
self._frame_control |= 0x4
@property
def frame_pending(self):
return bool(self._frame_control & 0x8)
@frame_pending.setter
def frame_pending(self, value):
self._frame_control &= ~0x8
if value:
self._frame_control |= 0x08
@property
def ack_request(self):
return bool(self._frame_control & MHR.FrameControl.AckRequest)
@ack_request.setter
def ack_request(self, value):
self._frame_control &= ~0x10
if value:
self._frame_control |= 0x10
@property
def panid_compression(self):
return bool(self._frame_control & MHR.FrameControl.PanidCompression)
@panid_compression.setter
def panid_compression(self, value):
self._frame_control &= ~MHR.FrameControl.PanidCompression
if value:
self._frame_control |= MHR.FrameControl.PanidCompression
if self.src_addrmode == MHR.AddrMode.NotPresent:
self._src_panid = None
self._src_addr = None
elif self.panid_compression:
self._src_panid = None
@property
def dst_addrmode(self):
return MHR.AddrMode((self._frame_control >> 10) & 0x3)
@dst_addrmode.setter
def dst_addrmode(self, value):
self._frame_control &= ~0xC00
self._frame_control |= (int(value) & 0x3) << 10
if self.dst_addrmode == MHR.AddrMode.NotPresent:
self._dst_addr = None
self._dst_panid = None
else:
if self.dst_addrmode != MHR.AddrMode.NotPresent and self._dst_panid == None:
self._dst_panid = 0
if self.dst_addrmode == MHR.AddrMode.ShortAddr and not isinstance(self._dst_addr, int):
self._dst_addr = 0
elif self.dst_addrmode == MHR.AddrMode.ExtendedAddr and not isinstance(self._dst_addr, bytes):
self._dst_addr = b'\x00' * 8
@property
def frame_version(self):
return MHR.FrameVersion((self._frame_control >> 12) & 0x3)
@frame_version.setter
def frame_version(self, value):
self._frame_control &= ~0x3000
self._frame_control |= (int(value) & 0x3) << 12
@property
def src_addrmode(self):
return MHR.AddrMode((self._frame_control >> 14) & 0x3)
@src_addrmode.setter
def src_addrmode(self, value):
self._frame_control &= ~0xC000
self._frame_control |= (int(value) & 0x3) << 14
if self.src_addrmode == MHR.AddrMode.NotPresent:
self._src_addr = None
self._src_panid = None
else:
if self.panid_compression:
self._src_panid = None
if self.src_addrmode != MHR.AddrMode.NotPresent and self._src_panid == None:
self._src_panid = 0
if self.src_addrmode == MHR.AddrMode.ShortAddr and not isinstance(self._src_addr, int):
self._src_addr = 0
elif self.src_addrmode == MHR.AddrMode.ExtendedAddr and not isinstance(self._src_addr, bytes):
self._src_addr = b'\x00' * 8
@property
def seq_num(self):
return self._seq_num
@seq_num.setter
def seq_num(self, value):
self._seq_num = value & 0xFF
@property
def dst_panid(self):
return self._dst_panid
@dst_panid.setter
def dst_panid(self, value):
self._dst_panid = value & 0xFFFF
@property
def dst_addr(self):
return self._dst_addr
@dst_addr.setter
def dst_addr(self, value):
if isinstance(value, int):
self._dst_addr = value & 0xFFFF
if self.dst_addrmode != MHR.AddrMode.ShortAddr:
self.dst_addrmode = MHR.AddrMode.ShortAddr
elif isinstance(value, bytes) and len(value) == 8:
self._dst_addr = value
if self.dst_addrmode != MHR.AddrMode.ExtendedAddr:
self.dst_addrmode = MHR.AddrMode.ExtendedAddr
else:
self.dst_addrmode = MHR.AddrMode.NotPresent
@property
def src_panid(self):
if self.panid_compression:
return self._dst_panid
return self._src_panid
@src_panid.setter
def src_panid(self, value):
if value == None:
self.src_addrmode = MHR.AddrMode.NotPresent
elif value == self.dst_panid:
self.panid_compression = True
else:
self.panid_compression = False
self._src_panid = value & 0xFFFF
if self.src_addrmode == MHR.AddrMode.NotPresent:
self.src_addr = 0
@property
def src_addr(self):
return self._dst_addr
@src_addr.setter
def src_addr(self, value):
if isinstance(value, int):
self._src_addr = value & 0xFFFF
if self.src_addrmode != MHR.AddrMode.ShortAddr:
self.src_addrmode = MHR.AddrMode.ShortAddr
elif isinstance(value, bytes) and len(value) == 8:
self._src_addr = value
if self.src_addrmode != MHR.AddrMode.ExtendedAddr:
self.src_addrmode = MHR.AddrMode.ExtendedAddr
else:
self.src_addrmode = MHR.AddrMode.NotPresent
if __name__ == '__main__':
import sys
from time import sleep
def packet_handler(data, rssi, link_quality):
print(f'Data: {data} RSSI: {rssi} Link Quality: {link_quality}')
port = sys.argv[1]
radio = Radio(port, packet_handler)
try:
radio.set_value(Radio.Param.Channel, 11)
radio.set_value(Radio.Param.RxMode, 0)
radio.set_value(Radio.Param.TxMode, Radio.TxMode.SendOnCca)
packet = b'Hello from ' + port.encode('utf8') + b'!'
while True:
radio.send(packet)
sleep(0.1)
except KeyboardInterrupt:
radio.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment