Skip to content

Instantly share code, notes, and snippets.

@chris-ritsen
Last active June 21, 2023 03:38
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chris-ritsen/ceb9466c6385968357a506d707c07677 to your computer and use it in GitHub Desktop.
Save chris-ritsen/ceb9466c6385968357a506d707c07677 to your computer and use it in GitHub Desktop.
Example implementing sequence ID while getting Dante device info
#!/usr/bin/env python3
# Requires: pip install netifaces twisted
# Usage: ./dante_device_info.py <ipv4> [<ipv4> ...]
import asyncio
import sys
import time
import netifaces
from twisted.internet import asyncioreactor, task, error
from twisted.internet.defer import inlineCallbacks, ensureDeferred, Deferred
from twisted.internet.protocol import DatagramProtocol
# Install the asyncio-backed reactor so asyncio futures and Twisted Deferreds
# are driven by the same event loop. This has to run before anything imports
# twisted.internet.reactor. The reactor picks up the event loop on its own, so
# no explicit asyncio.get_event_loop() call (or debug print of it) is needed.
asyncioreactor.install()
# UDP ports on the device that control requests are sent to.
ARC_PORT = 4440
DEVICE_INFO_CONTROL_PORT = 8700

# Multicast group and port listened on for device-info replies.
DEVICE_INFO_GROUP = '224.0.0.231'
DEVICE_INFO_PORT = 8702

# Every device port that gets its own control protocol instance.
DEVICE_CONTROL_PORTS = [ARC_PORT, DEVICE_INFO_CONTROL_PORT]

# Device-info command codes: the request that is sent and the code carried by
# the matching reply.
REQUEST_DANTE_MODEL = 97
RESPONSE_DANTE_MODEL = 96
REQUEST_MAKE_MODEL = 193
RESPONSE_MAKE_MODEL = 192

# Command codes used on the ARC port.
TYPE_CHANNEL_COUNTS = 4096
TYPE_IDENTIFY_DEVICE = 4302
class DanteDevice():
    """Details discovered for one Dante device, plus the means to query them.

    The instance stores the results (make/model, Dante model and channel
    counts) and the per-port control protocols used to request them.
    """

    def __init__(self, ipv4):
        """Create an empty record for the device at *ipv4*.

        Args:
            ipv4: Address of the device. Control protocols connect to it and
                multicast replies are matched against it.
        """
        self.channel_count_rx = None
        self.channel_count_tx = None
        # Maps a device port number to the control protocol bound to it.
        self.controls = {}
        self.dante_model = None
        self.dante_model_id = None
        self.ipv4 = ipv4
        self.manufacturer = None
        self.model = None
        # Future for the in-flight channel-count request; the control
        # protocol creates it when the request starts.
        self.channel_counts = None

    def __str__(self):
        """Return the known details joined by single spaces.

        Values that have not been discovered yet (``None``) and empty strings
        are left out. A channel count of ``0`` is a real value and is kept.
        """
        device_info_list = [
            self.manufacturer,
            self.model,
            self.dante_model,
            self.dante_model_id,
            self.channel_count_rx,
            self.channel_count_tx,
        ]
        # Filter before converting to text: str(None) is the truthy string
        # 'None', which would otherwise be printed for unknown counts.
        return ' '.join(
            str(value)
            for value in device_info_list
            if value is not None and value != ''
        )

    async def get_info(self):
        """Query the device and store every result on this instance.

        The three requests are issued one after another; each helper returns
        a completed future whose result is a 2-tuple.
        """
        dante_model = await self.get_dante_model()
        make_model = await self.get_make_model()
        channel_counts = await self.get_channel_counts()
        self.manufacturer, self.model = make_model.result()
        # The multicast client resolves this future with (model, model_id),
        # so unpack in that order.
        self.dante_model, self.dante_model_id = dante_model.result()
        self.channel_count_rx, self.channel_count_tx = channel_counts.result()

    async def get_dante_model(self):
        """Request the Dante model through the device-info control port.

        Returns:
            The future that the control protocol's ``get_dante_model`` returns.
        """
        # Run the asyncio coroutine as a task and bridge it to a Deferred so
        # it can be awaited from Twisted-driven code.
        return await Deferred.fromFuture(asyncio.ensure_future(self.controls[DEVICE_INFO_CONTROL_PORT].get_dante_model()))

    async def get_channel_counts(self):
        """Request the channel counts through the ARC control port.

        Returns:
            The future that the control protocol's ``get_channel_counts``
            returns.
        """
        return await Deferred.fromFuture(asyncio.ensure_future(self.controls[ARC_PORT].get_channel_counts()))

    async def get_make_model(self):
        """Request manufacturer and model through the device-info control port.

        Returns:
            The future that the control protocol's ``get_make_model`` returns.
        """
        return await Deferred.fromFuture(asyncio.ensure_future(self.controls[DEVICE_INFO_CONTROL_PORT].get_make_model()))
class DanteControl(DatagramProtocol):
    """Connected UDP protocol that sends requests to one port of a device.

    Replies to ARC requests arrive on this protocol. Replies to device-info
    requests arrive on the multicast client stored in ``client``.
    """

    def __init__(self, device, port):
        """Prepare a control channel to *port* on *device*.

        Args:
            device: The DanteDevice being queried; its ``ipv4`` is the
                connection target and its ``channel_counts`` future receives
                channel-count replies.
            port: Device port this protocol connects to.
        """
        # Multicast client holding the make/model futures and the local MAC;
        # assigned by the caller for the device-info port only.
        self.client = None
        self.device = device
        self.port = port
        self.sequence_id = 0
        self.client_id = 0

    def startProtocol(self):
        """Connect the UDP transport to the device once it is listening."""
        self.transport.connect(self.device.ipv4, self.port)

    async def send_message(self, data):
        """Advance the sequence ID and write *data* to the device.

        Args:
            data: The complete request datagram as bytes.
        """
        # Requests encode the sequence ID as four hex digits, so wrap at
        # 16 bits; otherwise the long-running retry loops would eventually
        # produce malformed packets.
        self.sequence_id = (self.sequence_id + 1) & 0xFFFF
        self.transport.write(data)

    def datagramReceived(self, datagram, addr):
        """Resolve the pending channel-count future from a matching reply.

        Replies of any other type are ignored, as are replies that arrive
        when no request is pending or after the request was already answered.
        Requests are resent every few milliseconds, so duplicates are
        expected.
        """
        response_type = int.from_bytes(datagram[6:8], 'big')
        if response_type != TYPE_CHANNEL_COUNTS:
            return

        pending = self.device.channel_counts
        # Setting a result twice raises InvalidStateError, and there is
        # nothing to resolve before the first request was made.
        if pending is None or pending.done():
            return

        channel_count_rx = int.from_bytes(datagram[15:16], 'big')
        channel_count_tx = int.from_bytes(datagram[13:14], 'big')
        pending.set_result((channel_count_rx, channel_count_tx))

    def get_command_info(self, command):
        """Build a device-info request datagram for *command*.

        The packet carries the current sequence ID and the MAC address held
        by ``client``, so ``client`` must have been assigned.

        Returns:
            The 32-byte request as bytes.
        """
        data_len = 32
        return bytes.fromhex(f'ffff{data_len:04x}{self.sequence_id:04x}0000{self.client.mac}0000417564696e6174650731{command:04x}00000000')

    def get_command_arc(self, command, command_args='0000'):
        """Build an ARC request datagram for *command*.

        Args:
            command: Numeric command code, written as two bytes.
            command_args: Hex string appended after the command code.

        Returns:
            The request as bytes.
        """
        # The length field is the 8-byte header plus the argument bytes; for
        # the default two-byte arguments this is the original fixed value 10.
        data_length = 8 + len(command_args) // 2
        command_hex = f'27{self.client_id:02x}{data_length:04x}{self.sequence_id:04x}{command:04x}{command_args}'
        return bytes.fromhex(command_hex)

    async def get_channel_counts(self):
        """Request the channel counts until a reply arrives.

        The request is resent every 8 ms with no upper bound on retries.

        Returns:
            The completed future holding ``(rx_count, tx_count)``.
        """
        self.device.channel_counts = asyncio.Future()
        while not self.device.channel_counts.done():
            await self.send_message(self.get_command_arc(command=TYPE_CHANNEL_COUNTS))
            await asyncio.sleep(0.008)
        return self.device.channel_counts

    async def get_make_model(self):
        """Request manufacturer and model until the client sees a reply.

        Returns:
            The client's completed ``make_model`` future.
        """
        while not self.client.make_model.done():
            await self.send_message(self.get_command_info(command=REQUEST_MAKE_MODEL))
            await asyncio.sleep(0.008)
        return self.client.make_model

    async def get_dante_model(self):
        """Request the Dante model until the client sees a reply.

        Returns:
            The client's completed ``dante_model`` future.
        """
        while not self.client.dante_model.done():
            await self.send_message(self.get_command_info(command=REQUEST_DANTE_MODEL))
            await asyncio.sleep(0.008)
        return self.client.dante_model
class MulticastClient(DatagramProtocol):
    """Multicast listener that captures one device's info replies."""

    def __init__(self, device, group, mac, port):
        """Store the connection details and create the result futures.

        Args:
            device: Device whose replies are of interest; datagrams from any
                other sender are dropped.
            group: Multicast group joined when the protocol starts.
            mac: Local MAC address as a hex string, read by the control
                protocol when it builds requests.
            port: Port number this client is associated with.
        """
        self.device = device
        self.group = group
        self.mac = mac
        self.port = port
        # Resolved with (model, model_id) and (manufacturer, model).
        self.dante_model = asyncio.Future()
        self.make_model = asyncio.Future()

    def startProtocol(self):
        """Join the multicast group once the transport is available."""
        self.transport.joinGroup(self.group)

    @staticmethod
    def _text_at(datagram, offset):
        """Decode the NUL-terminated UTF-8 string starting at *offset*."""
        raw, _, _ = datagram[offset:].partition(b'\x00')
        return raw.decode('utf-8')

    def datagramReceived(self, datagram, address):
        """Resolve the matching future from a reply sent by the device.

        Each future is resolved at most once; later replies of the same type
        are ignored.
        """
        sender = address[0]
        if sender != self.device.ipv4:
            return

        command_type = int.from_bytes(datagram[26:28], 'big')

        wants_dante_model = not self.dante_model.done()
        if command_type == RESPONSE_DANTE_MODEL and wants_dante_model:
            model = self._text_at(datagram, 88)
            model_id = self._text_at(datagram, 43).replace('\u0003', '')
            self.dante_model.set_result((model, model_id))

        wants_make_model = not self.make_model.done()
        if command_type == RESPONSE_MAKE_MODEL and wants_make_model:
            manufacturer = self._text_at(datagram, 76)
            model = self._text_at(datagram, 204)
            self.make_model.set_result((manufacturer, model))
async def get_device_info(reactor):
    """Query every host named on the command line and print what was found.

    For each host a control protocol is opened per device port, and a
    multicast client is registered for the device-info replies. All devices
    are queried concurrently; the results and the elapsed time are printed at
    the end.

    Args:
        reactor: The running Twisted reactor used to open the UDP listeners.

    Raises:
        SystemExit: If the MAC address of the default-route interface cannot
            be determined (it is embedded in device-info requests).
    """
    start = time.time()
    hosts = sys.argv[1:]

    try:
        # Entry of the first default gateway; index 1 is the interface name.
        default_gateway = list(netifaces.gateways()['default'].values())[0]
        interface = netifaces.ifaddresses(default_gateway[1])
        mac = interface[netifaces.AF_LINK][0]['addr'].replace(':', '')
    except (IndexError, KeyError) as exc:
        # No default route, or the interface has no link-layer address.
        raise SystemExit('Unable to determine the MAC address of the default network interface') from exc

    pending = []
    devices = []

    for ipv4 in hosts:
        device = DanteDevice(ipv4=ipv4)

        try:
            for port in DEVICE_CONTROL_PORTS:
                control = DanteControl(device, port)

                if port == DEVICE_INFO_CONTROL_PORT:
                    client = MulticastClient(device=device, group=DEVICE_INFO_GROUP, mac=mac, port=DEVICE_INFO_PORT)
                    control.client = client
                    reactor.listenMulticast(DEVICE_INFO_PORT, client, listenMultiple=True)

                device.controls[port] = control
                # Connecting the transport rejects anything that is not a
                # literal IP address.
                reactor.listenUDP(0, control)
        except error.InvalidAddressError as e:
            print(f'{e.address} is {e.message}')
            continue

        # ensureDeferred starts the coroutine right away, so the devices are
        # queried in parallel instead of one after another.
        pending.append(ensureDeferred(device.get_info()))
        devices.append(device)

    for query in pending:
        await query

    for device in devices:
        print(device)

    print(f'Time taken: {time.time() - start}')
def main(reactor):
    """Entry point for ``task.react``: run the device query as a Deferred.

    Args:
        reactor: The reactor supplied by ``task.react``.

    Returns:
        A Deferred wrapping the ``get_device_info`` coroutine.
    """
    query = get_device_info(reactor)
    return ensureDeferred(query)
if __name__ == '__main__':
    # Device addresses come from the command line; with only the script name
    # present there is nothing to query.
    if len(sys.argv) != 1:
        task.react(main)
    else:
        print('Specify a Dante device IP')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment