Skip to content

Instantly share code, notes, and snippets.

@pveierland
Created July 10, 2021 12:56
Show Gist options
  • Save pveierland/cf0c07e63e5b99ae77817048088d0508 to your computer and use it in GitHub Desktop.
Save pveierland/cf0c07e63e5b99ae77817048088d0508 to your computer and use it in GitHub Desktop.
uavcan-tools
import csv
import datetime

import uavcan
# UTC timestamp captured once at start-up; build_log_filename() embeds it in
# every log file name produced by this run.
run_start_time = datetime.datetime.strftime(
    datetime.datetime.utcnow(), "%Y%m%dT%H%M%SZ")

# Node bound to a fixed serial device.
# NOTE(review): bitrate/baudrate are presumably the CAN bus bit rate and the
# serial adapter baud rate -- confirm against the pyuavcan driver docs.
node = uavcan.make_node(
    '/dev/ttyS12',
    bitrate=250000,
    baudrate=3000000,
)

# Open log files, keyed by data type signature (see setup_message_log_file).
log_file_registry = {}
def build_log_filename(uavcan_type):
    """Return the CSV log file name for ``uavcan_type``.

    The name is ``<full_name>_<data type signature>_<run start time>.csv``,
    where the run start time is the module-level ``run_start_time`` string.
    """
    name_parts = (
        uavcan_type.full_name,
        uavcan_type.get_data_type_signature(),
        run_start_time,
    )
    return '{}_{}_{}.csv'.format(*name_parts)
def message_field_iterator(entry, parent_ids=None):
    """Yield ``(dotted_name, value)`` pairs for the fields of ``entry``.

    ``entry`` is either a ``uavcan.transport.CompoundValue`` or a
    ``(field_name, field_value)`` pair.  For a compound value, every field
    name is prefixed with ``parent_ids`` followed by the full name of the
    compound's data type, joined with dots.  Fields whose value is a
    ``uavcan.transport.VoidValue`` are skipped.

    NOTE(review): the recursive call is handed a ``(name, value)`` tuple, so
    it always takes the leaf branch; a field whose value is itself a
    compound value is yielded as-is rather than expanded.  Confirm whether
    that is intended.
    """
    prefix = parent_ids or []

    if not isinstance(entry, uavcan.transport.CompoundValue):
        # Leaf case: a single (name, value) pair.
        field_name, field_value = entry
        if isinstance(field_value, uavcan.transport.VoidValue):
            return
        yield ('.'.join(prefix + [field_name]), field_value)
        return

    # Compound case: qualify each field with the compound's type name.
    type_name = uavcan.transport.get_uavcan_data_type(entry).full_name
    child_prefix = prefix + [type_name]
    for field_pair in uavcan.transport.get_fields(entry).items():
        for named_value in message_field_iterator(field_pair, child_prefix):
            yield named_value
def message_handler(event):
    """Append one CSV row describing a received message to its log file.

    The row holds, in order: a wall-clock UTC ``timestamp`` taken when the
    handler runs, the transfer metadata read from ``event.transfer``, and
    the message fields produced by ``message_field_iterator``.  A header row
    with the column names is written first when ``setup_message_log_file``
    reports that the file was not yet registered in this run.

    Rows are written through ``csv.writer`` so that values whose text
    contains commas, quotes or newlines are quoted instead of silently
    breaking the column layout.  Values with none of those characters are
    written exactly as a plain ``','.join`` would write them.
    """
    (log_file, log_file_exists) = setup_message_log_file(event.message)

    transfer = event.transfer
    fields = [
        ('timestamp',
         datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%S.%fZ")),
        ('transfer.ts_monotonic', transfer.ts_monotonic),
        ('transfer.ts_real', transfer.ts_real),
        ('transfer.id', transfer.transfer_id),
        ('transfer.source_node_id', transfer.source_node_id),
        ('transfer.transfer_priority', transfer.transfer_priority),
    ]
    fields.extend(message_field_iterator(event.message))

    # lineterminator='\n' keeps the line endings the log has always used
    # (csv.writer would otherwise emit '\r\n').
    writer = csv.writer(log_file, lineterminator='\n')
    if not log_file_exists:
        writer.writerow([name for name, _ in fields])
    # Stringify explicitly so every value is rendered with str(), including
    # None, which csv.writer would otherwise write as an empty field.
    writer.writerow([str(value) for _, value in fields])

    # Flush after every message so the log on disk stays current.
    log_file.flush()
def setup_message_log_file(message):
    """Return ``(log_file, entry_exists)`` for the data type of ``message``.

    Log files are kept open in the module-level ``log_file_registry``,
    keyed by data type signature.  On the first message of a given
    signature the file named by ``build_log_filename`` is opened in ``'a+'``
    mode and registered, and ``entry_exists`` is ``False``; afterwards the
    registered file is returned with ``entry_exists`` set to ``True``.
    """
    data_type = uavcan.transport.get_uavcan_data_type(message)
    signature = data_type.get_data_type_signature()

    if signature in log_file_registry:
        return (log_file_registry[signature], True)

    handle = open(build_log_filename(data_type), 'a+')
    log_file_registry[signature] = handle
    return (handle, False)
# Subscribe the CSV logger to every entry of uavcan.DATATYPES whose kind is
# KIND_MESSAGE; entries of any other kind are skipped.
for type_key, data_type in uavcan.DATATYPES.items():
    _, kind = type_key
    if kind != uavcan.dsdl.CompoundType.KIND_MESSAGE:
        continue
    node.add_handler(data_type, message_handler)

# Hand control to the node; received messages are delivered to
# message_handler from here on.
node.spin()
import csv
import numpy as np
import sys
def collect_time_deltas(messages):
    """Group the time between consecutive transfers by source node.

    Args:
        messages: iterable of mappings (for example ``csv.DictReader`` rows)
            that carry ``'transfer.source_node_id'`` and
            ``'transfer.ts_real'`` entries.

    Returns:
        A dict mapping each source node id that appears at least twice to
        the list of differences between its consecutive ``ts_real`` values,
        in input order.  Nodes seen only once have no entry.

    Raises:
        ValueError: if a ``'transfer.ts_real'`` value cannot be parsed as a
            float.
    """
    previous_timestamps = {}
    time_deltas = {}
    for message in messages:
        source_node_id = message['transfer.source_node_id']
        # Parse each timestamp once instead of re-parsing the previous row.
        timestamp = float(message['transfer.ts_real'])
        previous = previous_timestamps.get(source_node_id)
        if previous is not None:
            time_deltas.setdefault(source_node_id, []).append(
                timestamp - previous)
        previous_timestamps[source_node_id] = timestamp
    return time_deltas


def format_time_delta_summary(source_node_id, time_delta_entries):
    """Return the one-line statistics summary for one source node.

    The ``min`` and ``max`` figures are offsets from the mean (smallest and
    largest delta minus the mean delta), not absolute values.  ``samples``
    is the number of deltas plus one, i.e. the number of messages that
    produced them.
    """
    mean = np.mean(time_delta_entries)
    return ('source_node_id = {} mean = {} std-dev = {} min = {} max = {} '
            'samples = {}').format(
        source_node_id,
        mean,
        np.std(time_delta_entries),
        np.min(time_delta_entries) - mean,
        np.max(time_delta_entries) - mean,
        len(time_delta_entries) + 1)


def main(argv=None):
    """Print per-node inter-arrival statistics for a CSV log file.

    Args:
        argv: argument vector; defaults to ``sys.argv``.  ``argv[1]`` is the
            path of the CSV log to analyse.

    Returns:
        ``0`` on success, ``2`` when the log file argument is missing.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        # Report a usage error instead of failing with a bare IndexError.
        program = argv[0] if argv else 'script'
        sys.stderr.write('usage: {} LOG_FILE.csv\n'.format(program))
        return 2

    with open(argv[1]) as csv_file:
        time_deltas = collect_time_deltas(csv.DictReader(csv_file))

    for source_node_id, time_delta_entries in time_deltas.items():
        print(format_time_delta_summary(source_node_id, time_delta_entries))
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment