Skip to content

Instantly share code, notes, and snippets.

@youzaka
Created May 17, 2012 03:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save youzaka/2716124 to your computer and use it in GitHub Desktop.
Save youzaka/2716124 to your computer and use it in GitHub Desktop.
declarative ts parser
#!/usr/bin/env python3
from collections import defaultdict
from io import BufferedReader, FileIO
from types import FunctionType
class SyntaxTable(dict):
def __init__(self):
self.history = []
self.keys = []
def __setitem__(self, key, value):
if issubclass(value.__class__, mnemonic):
value.history = list(self.history)
self.history.append(value.length)
self.keys.append(key)
dict.__setitem__(self, key, value)
class SyntaxClass(type):
@classmethod
def __prepare__(metacls, name, bases):
return SyntaxTable()
def __new__(cls, name, bases, classdict):
result = type.__new__(cls, name, bases, dict(classdict))
result._history = classdict.history
result._keys = classdict.keys
return result
class mnemonic(object):
def __init__(self, length):
self.length = length
self.history = []
def real_length(self, length, instance):
if isinstance(length, FunctionType):
return length(instance) * 8
if isinstance(length, str):
return getattr(instance, length)
return length
class uimsbf(mnemonic):
def __get__(self, instance, owner):
start = sum(self.real_length(length, instance) for length in self.history)
return self.uimsbf(instance._packet, start, self.length)
@staticmethod
def uimsbf(packet, current, length):
pos = current % 8
block = current // 8
start = 8 - pos
try:
if length > 8:
num = ((length - 1) // 8) * 8
return (((packet[block] & (2 ** start) - 1) << num) |
uimsbf.uimsbf(packet, current + start, length - start))
else:
end = start - length
return (packet[block] & (2 ** start - 2 ** end)) >> end
except IndexError:
return 0
class bslbf(mnemonic):
def __get__(self, instance, owner):
start = sum(self.real_length(length, instance) for length in self.history)
if self.length == 1:
return uimsbf.uimsbf(instance._packet, start, self.length) == 1
else:
return ''.join(str(uimsbf.uimsbf(instance._packet, pos, 1))
for pos in range(start, start + self.length))
rpchof = bslbf
class nsize(mnemonic):
def __init__(self, cls, length):
self.cls = cls
mnemonic.__init__(self, length)
def __get__(self, instance, owner):
length = self.real_length(self.length, instance) // 8
start = sum(getattr(instance, length) if isinstance(length, str) else length
for length in self.history) // 8
end = start + length
while start < end:
obj = self.cls(instance._packet[start:end])
yield obj
start += len(obj) // 8
class Table(metaclass=SyntaxClass):
def __init__(self, packet):
self._packet = packet
def __len__(self):
return sum(length if isinstance(length, int) else getattr(self, length)
for length in self._history)
def __str__(self):
return "\n".join("{}: {}".format(field, getattr(self, field))
for field in self._keys)
def __getitem__(self, key):
return self._packet[key]
@classmethod
def loop(cls, length=None, times=None):
if length:
return nsize(cls, length)
def loop(length):
def fixed_length_loop(cls):
return nsize(cls, length)
return fixed_length_loop
class TransportStreamFile(BufferedReader):
PACKET_SIZE = 188
def __init__(self, path):
BufferedReader.__init__(self, FileIO(path))
def __next__(self):
packet = bytearray(self.read(self.PACKET_SIZE))
if len(packet) != self.PACKET_SIZE:
raise StopIteration
return TransportPacket(packet)
def table(self, Table):
buf = defaultdict(bytearray)
for packet in self:
pid = packet.PID
if pid not in Table._pids or not packet.has_payload:
continue
if packet.payload_unit_start_indicator:
if buf[pid]:
if (not hasattr(Table, '_table_ids') or
buf[pid][0] in Table._table_ids):
yield Table(buf[pid])
buf[pid] = packet.payload
else:
buf[pid].extend(packet.payload)
class TransportPacket(Table):
sync_byte = uimsbf(8)
transport_error_indicator = bslbf(1)
payload_unit_start_indicator = bslbf(1)
transport_priority = bslbf(1)
PID = uimsbf(13)
transport_scrambling_control = bslbf(2)
adaptation_field_control = bslbf(2)
continuity_counter = uimsbf(4)
@property
def has_adaptation(self):
return self.adaptation_field_control[0]
@property
def has_payload(self):
return self.adaptation_field_control[1]
@property
def payload(self):
if not self.has_payload:
return []
start = 4
if self.has_adaptation:
adaptation_length = self._packet[start]
start += 1 + adaptation_length
if (self.payload_unit_start_indicator and
self._packet[start:start+3] == b"\x00\x00\x01"):
pointer = self._packet[start]
start += 1 + pointer
return self._packet[start:]
class ProgramAssociationTable(Table):
_pids = [0x00]
_table_ids = [0x00]
table_id = uimsbf(8)
section_syntax_indicator = bslbf(1)
reserved_future_use = bslbf(1)
reserved_1 = bslbf(2)
section_length = uimsbf(12)
transport_stream_id = uimsbf(16)
resered_2 = bslbf(2)
version_number = uimsbf(5)
current_next_indicator = bslbf(1)
section_number = uimsbf(8)
last_section_number = uimsbf(8)
@loop(lambda self: self.section_length - 9)
class pids(Table):
program_number = uimsbf(16)
reserved = bslbf(3)
program_map_PID = uimsbf(13)
CRC_32 = rpchof(32)
if __name__ == '__main__':
import sys
with TransportStreamFile(sys.argv[1]) as ts:
for pat in ts.table(ProgramAssociationTable):
for patmap in pat.pids:
print(patmap.program_number, patmap.program_map_PID)
print(pat.CRC_32)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment