/dante_device_info.py Secret
Last active
June 21, 2023 03:38
Star
You must be signed in to star a gist
Example implementing sequence ID while getting Dante device info
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# pip install netifaces twisted | |
# ./dante_device_info.py [ipv4] | |
import asyncio | |
import sys | |
import time | |
import netifaces | |
from twisted.internet import asyncioreactor, task, error | |
from twisted.internet.defer import inlineCallbacks, ensureDeferred, Deferred | |
from twisted.internet.protocol import DatagramProtocol | |
print(asyncio.get_event_loop()) | |
asyncioreactor.install() | |
ARC_PORT = 4440 | |
DEVICE_INFO_CONTROL_PORT = 8700 | |
DEVICE_INFO_GROUP = '224.0.0.231' | |
DEVICE_INFO_PORT = 8702 | |
DEVICE_CONTROL_PORTS = [ | |
ARC_PORT, | |
DEVICE_INFO_CONTROL_PORT | |
] | |
REQUEST_DANTE_MODEL = 97 | |
REQUEST_MAKE_MODEL = 193 | |
RESPONSE_DANTE_MODEL = 96 | |
RESPONSE_MAKE_MODEL = 192 | |
TYPE_CHANNEL_COUNTS = 4096 | |
TYPE_IDENTIFY_DEVICE = 4302 | |
class DanteDevice(): | |
def __init__(self, ipv4): | |
self.channel_count_rx = None | |
self.channel_count_tx = None | |
self.controls = {} | |
self.dante_model = None | |
self.dante_model_id = None | |
self.ipv4 = ipv4 | |
self.manufacturer = None | |
self.model = None | |
self.channel_counts = None | |
def __str__(self): | |
device_info_list = [self.manufacturer, self.model, self.dante_model, self.dante_model_id, str(self.channel_count_rx), str(self.channel_count_tx)] | |
device_info = [s for s in device_info_list if s] | |
return ' '.join(device_info) | |
async def get_info(self): | |
dante_model = await self.get_dante_model() | |
make_model = await self.get_make_model() | |
channel_counts = await self.get_channel_counts() | |
self.manufacturer, self.model = make_model.result() | |
self.dante_model_id, self.dante_model = dante_model.result() | |
self.channel_count_rx, self.channel_count_tx = channel_counts.result() | |
async def get_dante_model(self): | |
return await Deferred.fromFuture(asyncio.ensure_future(self.controls[DEVICE_INFO_CONTROL_PORT].get_dante_model())) | |
async def get_channel_counts(self): | |
return await Deferred.fromFuture(asyncio.ensure_future(self.controls[ARC_PORT].get_channel_counts())) | |
async def get_make_model(self): | |
return await Deferred.fromFuture(asyncio.ensure_future(self.controls[DEVICE_INFO_CONTROL_PORT].get_make_model())) | |
class DanteControl(DatagramProtocol): | |
def __init__(self, device, port): | |
self.client = None | |
self.device = device | |
self.port = port | |
self.sequence_id = 0 | |
self.client_id = 0 | |
def startProtocol(self): | |
self.transport.connect(self.device.ipv4, self.port) | |
async def send_message(self, data): | |
command_type = int.from_bytes(data[26:28], 'big') | |
self.sequence_id = self.sequence_id + 1 | |
self.transport.write(data) | |
def datagramReceived(self, datagram, addr): | |
response_type = int.from_bytes(datagram[6:8], 'big') | |
if response_type == TYPE_CHANNEL_COUNTS: | |
channel_count_rx = int.from_bytes(datagram[15:16], 'big') | |
channel_count_tx = int.from_bytes(datagram[13:14], 'big') | |
self.device.channel_counts.set_result((channel_count_rx, channel_count_tx)) | |
def get_command_info(self, command): | |
data_len = 32 | |
return bytes.fromhex(f'ffff{data_len:04x}{self.sequence_id:04x}0000{self.client.mac}0000417564696e6174650731{command:04x}00000000') | |
def get_command_arc(self, command, command_args='0000'): | |
data_length = 10 | |
command_hex = f'27{self.client_id:02x}{data_length:04x}{self.sequence_id:04x}{command:04x}{command_args}' | |
return bytes.fromhex(command_hex) | |
async def get_channel_counts(self): | |
self.device.channel_counts = asyncio.Future() | |
while not self.device.channel_counts.done(): | |
await self.send_message(self.get_command_arc(command=TYPE_CHANNEL_COUNTS)) | |
await asyncio.sleep(0.008) | |
return self.device.channel_counts | |
async def get_make_model(self): | |
while not self.client.make_model.done(): | |
await self.send_message(self.get_command_info(command=REQUEST_MAKE_MODEL)) | |
await asyncio.sleep(0.008) | |
return self.client.make_model | |
async def get_dante_model(self): | |
while not self.client.dante_model.done(): | |
await self.send_message(self.get_command_info(command=REQUEST_DANTE_MODEL)) | |
await asyncio.sleep(0.008) | |
return self.client.dante_model | |
class MulticastClient(DatagramProtocol): | |
def __init__(self, device, group, mac, port): | |
self.group = group | |
self.device = device | |
self.mac = mac | |
self.port = port | |
self.dante_model = asyncio.Future() | |
self.make_model = asyncio.Future() | |
def startProtocol(self): | |
self.transport.joinGroup(self.group) | |
def datagramReceived(self, datagram, address): | |
if address[0] != self.device.ipv4: | |
return | |
command_type = int.from_bytes(datagram[26:28], 'big') | |
if command_type == RESPONSE_DANTE_MODEL and not self.dante_model.done(): | |
model = datagram[88:].partition(b'\x00')[0].decode('utf-8') | |
model_id = datagram[43:].partition(b'\x00')[0].decode('utf-8').replace('\u0003', '') | |
self.dante_model.set_result((model, model_id)) | |
if command_type == RESPONSE_MAKE_MODEL and not self.make_model.done(): | |
manufacturer = datagram[76:].partition(b'\x00')[0].decode('utf-8') | |
model = datagram[204:].partition(b'\x00')[0].decode('utf-8') | |
self.make_model.set_result((manufacturer, model)) | |
async def get_device_info(reactor): | |
start = time.time() | |
hosts = sys.argv[1:] | |
try: | |
interface = netifaces.ifaddresses(list(netifaces.gateways()['default'].values())[0][1]) | |
ipv4 = interface[netifaces.AF_INET][0]['addr'] | |
mac = interface[netifaces.AF_LINK][0]['addr'].replace(':', '') | |
except error.InvalidAddressError as e: | |
print(f'{e.address} is {e.message}') | |
tasks = [] | |
devices = [] | |
for ipv4 in hosts: | |
device = DanteDevice(ipv4=ipv4) | |
for port in DEVICE_CONTROL_PORTS: | |
control = DanteControl(device, port) | |
if port == DEVICE_INFO_CONTROL_PORT: | |
client = MulticastClient(device=device, group=DEVICE_INFO_GROUP, mac=mac, port=DEVICE_INFO_PORT) | |
control.client = client | |
reactor.listenMulticast(DEVICE_INFO_PORT, client, listenMultiple=True) | |
device.controls[port] = control | |
reactor.listenUDP(0, control) | |
tasks.append(device.get_info()) | |
devices.append(device) | |
for task in tasks: | |
await task | |
for device in devices: | |
print(device) | |
print(f'Time taken: {time.time() - start}') | |
def main(reactor): | |
return ensureDeferred(get_device_info(reactor)) | |
if __name__ == '__main__': | |
if len(sys.argv) == 1: | |
print('Specify a Dante device IP') | |
else: | |
task.react(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment