Skip to content

Instantly share code, notes, and snippets.

@cibomahto
Last active September 21, 2021 15:04
Show Gist options
  • Save cibomahto/3d3564b6b75ff94bf247e06868384caa to your computer and use it in GitHub Desktop.
Save cibomahto/3d3564b6b75ff94bf247e06868384caa to your computer and use it in GitHub Desktop.
Pattern file format
//! @brief Pattern file recorder/playback
//!
//! The purpose is to allow capure and playback of streamed pattern data. The
//! recorder is intended to capture raw data and sync packets directly from the
//! listener (ie, before mapping or color/brightness manipulation is applied).
//! During playback, the raw packets are sent to the mapper for processing.
//! This allows the mapping and output settings to be adjusted after recording.
//!
//! Packets are recorded with a time resolution of 1 ms.
//!
//! The pattern file consists of the following sections:
//! | Name | Size (bytes) | Description |
//! | --- | --- | --- |
//! | prelude | 19 | File type information |
//! | header | 20 | Pattern information |
//! | data | n | data and sync packets |
//! | timing index | time_s * 4 | Table of pointers into the data section |
//!
//! The prelude is used to identify the file type, and was inspired by the
//! PHP file header:
//! https://fadden.com/tech/file-formats.html
//!
//! The header describes the position and length of the data and time index
//! sections. For convenience, it also includes the file duration, which is
//! calculated from the timestamp of the last packet in the file.
//!
//! The data section contains a stream of pattern_packet_t data packets,
//! followed by 0-n bytes of payload. Note that this makes it easy to read
//! the packets out in sequential order, but random seeking is difficult
//! because of the variable payload lengths. This format was chosen to limit
//! the amount of SD card bandwidth required during record and playback.
//!
//! The timing index section provides an interface for random seeking into the
//! data section. The timing index is a list of offsets into the data section.
//! The first timing index entry points to the first packet in the data section
//! with a timestamp after 0s, the second timing index entry points to the
//! first packet in the data section with a timestamp after 1s, and so on.
//! The number of entries in the timing index table is 1 + the pattern duration
//! in seconds.
//!
//! To reduce the amount of SD card bandwidth required during recording, the
//! intended way to record a file is to only fill in the data section during
//! recording, then after recording is finished, to calculate the timing index
//! on the captured stream by calling pattern_finalize(). It is recommended to
//! initially use a temporary extension for the filename (*.tmp), and only
//! rename the file to *.led once the finalize procedure has finished.
#!/usr/bin/python3
import struct
import os
import statistics
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('filename', action="store")
args = parser.parse_args()
filename = args.filename
#filename = '16-vs-8-countdown.led'
print(filename)
with open(filename, 'rb') as f:
# Prelude
PRELUDE_FORMAT = '<8sIIII'
PRELUDE_MAGIC = b'\x5FLED\r\n\x1A\n'
f.seek(0)
prelude = f.read(struct.calcsize(PRELUDE_FORMAT))
[magic, checksum, version, header_offset, size] = struct.unpack(PRELUDE_FORMAT, prelude)
print('Prelude checksum:{:04x} version:{} header_offset:{:08x} size:{}'.format(checksum, version, header_offset, size))
if magic != PRELUDE_MAGIC:
print('bad magic expected:{} got:{}'.format(PRELUDE_MAGIC, magic))
exit(1)
expected_size = os.path.getsize(filename)
if expected_size != size:
print('bad size expected:{} got:{}'.format(expected_size, size))
exit(1)
# todo: checksum
# todo: version
# Header
HEADER_FORMAT = '<IIIII'
f.seek(header_offset)
header = f.read(struct.calcsize(HEADER_FORMAT))
[duration_ms, data_offset, data_size, time_index_offset, time_index_size] = struct.unpack(HEADER_FORMAT, header)
print('Header duration_ms:{} data_offset{:08x} data_size:{} time_index_offset{:08x} time_index_size:{}'.format(
duration_ms, data_offset, data_size, time_index_offset, time_index_size))
# TODO: check that time index is correct length
# TODO: check that data section fits in file and doesn't overlap prelude, header, or time index
# TODO: check that time index section fits in file and doesn't overlap prelude, header, or data
# Analyze data
packet_stats = {'data':0, 'sync':0}
universe_stats = {}
packet_offsets = []
f.seek(data_offset)
while f.tell() < (data_offset + data_size):
PATTERN_PACKET_FORMAT = '<IHII?'
PATTERN_PACKET_TYPE_DATA = 0
PATTERN_PACKET_TYPE_SYNC = 1
packet_offsets.append(f.tell())
packet = f.read(struct.calcsize(PATTERN_PACKET_FORMAT))
[timestamp_ms, size, packet_type, A, B] = struct.unpack(PATTERN_PACKET_FORMAT, packet)
print('Packet timestamp:{} size:{} type:{}'.format(timestamp_ms, size, packet_type))
# TODO: Check that the timestamp is >= previous record
if packet_type == PATTERN_PACKET_TYPE_DATA:
[universe, sync] = [A, B]
# print(' universe:{} sync:{}'.format(universe, sync))
packet_stats['data'] += 1
universe_count = universe_stats.setdefault(universe, [])
universe_stats[universe].append(timestamp_ms)
# TODO: Check that size is <= 512?
elif packet_type == PATTERN_PACKET_TYPE_SYNC:
# print(' output_flags:{:08x}'.format(A))
packet_stats['sync'] += 1
else:
print('bad type:{}'.format(packet_type))
exit(1)
# discard the data portion
f.read(size)
print('data_packets:{} sync_packets{}: universes: {}'.format(packet_stats['data'], packet_stats['sync'], len(universe_stats)))
for universe in sorted(universe_stats):
universe_stat = universe_stats[universe]
time_deltas = []
last_time = universe_stat[0]
for time in universe_stat[1:]:
time_deltas.append(time - last_time)
last_time = time
mean = statistics.mean(time_deltas)
fps = 1000/mean
standard_dev = statistics.stdev(time_deltas)
print('universe:{} count:{} fps:{:0.2f} mean:{:0.2f} standard_deviation:{:0.2f}'.format(
universe, len(universe_stat), fps, mean, standard_dev))
# Analyze time index
f.seek(time_index_offset)
time_indexes = []
while f.tell() < (time_index_offset + time_index_size):
PATTERN_TIME_INDEX_ENTRY_FORMAT = '<I'
entry = f.read(struct.calcsize(PATTERN_TIME_INDEX_ENTRY_FORMAT))
[offset] = struct.unpack(PATTERN_TIME_INDEX_ENTRY_FORMAT, entry)
time_indexes.append(offset)
# Add the end of the data section as a final offset, so we can calculate size of all index sections
time_indexes.append(data_offset+data_size+1)
time_index = time_indexes[0]
for next_time_index in time_indexes[1:]:
err = ''
size = next_time_index - time_index
# Check that the offsets are increasing
if time_index > next_time_index:
err += '(out of order)'
# Check that the offsets point to a valid record
if time_index not in packet_offsets:
err += '(does not point to packet)'
packets = 0
for packet_offset in packet_offsets:
if packet_offset >= time_index and packet_offset < next_time_index:
packets += 1
print('offset:{:08x} size:{} packets:{} '.format(time_index, size, packets) + err)
time_index = next_time_index
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment