Skip to content

Instantly share code, notes, and snippets.

@monyone
Created September 20, 2023 01:59
Show Gist options
  • Save monyone/83f3d5d48d717a3531c23757c81f11a0 to your computer and use it in GitHub Desktop.
Save monyone/83f3d5d48d717a3531c23757c81f11a0 to your computer and use it in GitHub Desktop.
AV1 in MPEG-TS
#!/bin/sh
# Encode the lavfi "testsrc" pattern (30 fps, 10 seconds, yuv420p) to AV1 with
# librav1e and write it to av1.mp4 using the frag_every_frame+empty_moov
# movflags.
ffmpeg -f lavfi -i testsrc=r=30:d=10 -pix_fmt yuv420p -c:v librav1e -r 30 -movflags frag_every_frame+empty_moov av1.mp4
# Disabled alternative: the same command with the libaom-av1 encoder.
# ffmpeg -f lavfi -i testsrc=r=30:d=10 -pix_fmt yuv420p -c:v libaom-av1 -r 30 -movflags frag_every_frame+empty_moov av1.mp4
#!/usr/bin/env python3
import argparse
import sys
from dataclasses import dataclass
import re
def findBox(data: memoryview | bytearray | bytes, box: str | list[str]) -> memoryview | bytearray | bytes | None:
begin = 0
while begin < len(data):
size = int.from_bytes(data[begin:begin+4], byteorder='big') - 8
name = data[begin+4:begin+8].decode('ascii')
content = data[begin+8:begin+8+size]
if type(box) == str and name == box:
return content
elif type(box) == list and name == box[0]:
if len(box) == 1: return content
else: return findBox(content, box[1:])
begin += 8 + size
def parseTimescaleByMdhd(data: memoryview | bytearray | bytes) -> int | None:
version = data[0]
if version == 0:
return int.from_bytes(data[12:16], byteorder='big')
elif version == 1:
return int.from_bytes(data[20:24], byteorder='big')
else:
return None
def parseBaseMediaDecodeTimeByTfdt(data: memoryview | bytearray | bytes):
version = data[0]
if version == 0:
return int.from_bytes(data[4:8], byteorder='big')
elif version == 1:
return int.from_bytes(data[4:12], byteorder='big')
else:
return None
# Disabled code: the bare string below holds a draft parser for the
# per-sample composition time offsets of a `trun` box.  As a string expression
# it is never executed.
# NOTE(review): the draft still prints `flags`, and for every sample it
# appends int.from_bytes(data[4:8]) (the bytes it also reads as sample_count)
# rather than the bytes at `begin` - this looks unintended; fix before
# re-enabling.
"""
def parseCompositionTimeOffsetByTrun(data: memoryview | bytearray | bytes) -> list[int]:
version = data[0]
flags = int.from_bytes(data[1:4], byteorder='big')
print(flags)
sample_count = int.from_bytes(data[4:8], byteorder='big')
begin = 8
if (flags & 0x000001) != 0: begin += 4
if (flags & 0x000004) != 0: begin += 4
result: list[int] = []
for _ in range(sample_count):
if (flags & 0x000100) != 0: begin += 4
if (flags & 0x000200) != 0: begin += 4
if (flags & 0x000400) != 0: begin += 4
if (flags & 0x000800) != 0:
result.append(int.from_bytes(data[4:8], byteorder='big'))
begin += 4
return result
"""
@dataclass
class AV1CodecConfigurationBox:
    """Decoded fields of an ``av1C`` box payload.

    Fields are listed in the order they are read from the payload; the
    comment on each one gives its width in bits.  ``configOBUs`` holds
    whatever bytes follow the four fixed header bytes.
    """
    marker: int # 1 bit
    version: int # 7 bit
    seq_profile: int # 3 bit
    seq_level_idx_0: int # 5 bit
    seq_level_tier_0: int # 1 bit
    high_bitdepth: int # 1 bit
    twelve_bit: int # 1 bit
    monochrome: int # 1 bit
    chroma_subsampling_x: int # 1 bit
    chroma_subsampling_y: int # 1 bit
    chroma_sample_position: int # 2 bit
    reserved: int # 3 bit (0)
    initial_presentation_delay_present: int # 1 bit
    initial_presentation_delay_minus_one: int # 4 bit
    configOBUs: memoryview | bytearray | bytes # remaining payload bytes
def parseAV1CodecConfigurationBox(data: memoryview | bytearray | bytes):
    """Decode the payload of an ``av1C`` box.

    The first four bytes are a packed bit field, unpacked most-significant
    bit first into the fields of ``AV1CodecConfigurationBox``.  Everything
    from byte 4 onwards is passed through unchanged as ``configOBUs``.

    Returns an ``AV1CodecConfigurationBox``.  Raises ``IndexError`` when
    ``data`` is shorter than four bytes.
    """
    marker_version, profile_level, format_flags, delay = data[0], data[1], data[2], data[3]
    return AV1CodecConfigurationBox(
        marker=(marker_version >> 7) & 0b1,
        version=marker_version & 0b01111111,
        seq_profile=(profile_level >> 5) & 0b111,
        seq_level_idx_0=profile_level & 0b00011111,
        seq_level_tier_0=(format_flags >> 7) & 0b1,
        high_bitdepth=(format_flags >> 6) & 0b1,
        twelve_bit=(format_flags >> 5) & 0b1,
        monochrome=(format_flags >> 4) & 0b1,
        chroma_subsampling_x=(format_flags >> 3) & 0b1,
        chroma_subsampling_y=(format_flags >> 2) & 0b1,
        chroma_sample_position=format_flags & 0b11,
        reserved=(delay >> 5) & 0b111,
        initial_presentation_delay_present=(delay >> 4) & 0b1,
        # 4-bit field: take the whole low nibble, not just bit 0.
        initial_presentation_delay_minus_one=delay & 0b00001111,
        configOBUs=data[4:],
    )
ESCAPE = re.compile('\0\0(\0|\1|\2|\3)'.encode('ascii'))
def escapeObus(data: memoryview | bytearray | bytes) -> bytes:
begin = 0
av1_in_ts = bytearray()
while begin < len(data):
offset = 0
forbidden_bit = (data[begin + offset] & 0b10000000) >> 7
type = (data[begin + offset] & 0b01111000) >> 3
extension_flag = (data[begin + offset] & 0b00000100) >> 2
has_size_field = (data[begin + offset] & 0b00000010) >> 1
reserved_1bit = (data[begin + offset] & 0b00000001) >> 0
offset += 1
if extension_flag: offset += 1
size = len(data) - (begin + offset)
if has_size_field:
size, cnt = 0, 0
while True:
val = data[begin + offset]
offset += 1
size |= (val & 0b01111111) << (cnt * 7)
if (val & 0b10000000) == 0: break
av1_in_ts += b'\0\0\1' + re.sub(ESCAPE, b'\0\0\3\\1', data[begin + offset: begin + offset + size])
begin += offset + size
return bytes(av1_in_ts)
PACKET_SIZE = 188
HEADER_SIZE = 4
def packetizeSection(section: memoryview | bytearray | bytes, pid: int, continuity_counter: int) -> list[bytes]:
result: list[bytes] = []
begin = 0
while (begin < len(section)):
next = min(len(section), begin + (PACKET_SIZE - HEADER_SIZE) - (1 if begin == 0 else 0))
result.append(bytes(
([
0x47,
(0 << 7) | ((1 if begin == 0 else 0) << 6) | (0 << 5) | ((pid & 0x1F00) >> 8),
(pid & 0x00FF),
(0 << 6) | (1 << 4) | (continuity_counter & 0x0F),
]) +
([0] if begin == 0 else []) +
list(section[begin:next]) +
([0xFF] * ((PACKET_SIZE - HEADER_SIZE) - ((next - begin) + (1 if begin == 0 else 0))))
))
continuity_counter = (continuity_counter + 1) & 0x0F
begin = next
return result
def packetizePES(pes: memoryview | bytearray | bytes, pid: int, continuity_counter: int) -> list[bytes]:
    """Split a PES packet into 188-byte transport stream packets.

    The first packet has payload_unit_start_indicator set.  A chunk that
    does not fill the packet is pushed to the end of it by an adaptation
    field placed after the header: a length byte, then (if there is room) an
    all-zero flags byte, then 0xFF stuffing.  ``continuity_counter`` is the
    counter of the first packet; it advances modulo 16 for each following
    packet.

    Returns the packets in order; an empty ``pes`` gives an empty list.
    """
    capacity = PACKET_SIZE - HEADER_SIZE
    total = len(pes)
    packets: list[bytes] = []
    cursor = 0
    while cursor < total:
        stop = min(total, cursor + capacity)
        chunk = bytes(pes[cursor:stop])
        spare = capacity - len(chunk)

        start_indicator = 0x40 if cursor == 0 else 0x00
        # 0x30: adaptation field followed by payload; 0x10: payload only.
        field_control = 0x30 if spare > 0 else 0x10
        header = bytes([
            0x47,
            start_indicator | ((pid & 0x1F00) >> 8),
            pid & 0x00FF,
            field_control | (continuity_counter & 0x0F),
        ])

        adaptation = b''
        if spare > 0:
            # The length byte counts what follows it inside the field.
            adaptation = bytes([spare - 1])
            if spare > 1:
                adaptation += b'\x00'
            if spare > 2:
                adaptation += b'\xFF' * (spare - 2)

        packets.append(header + adaptation + chunk)
        continuity_counter = (continuity_counter + 1) & 0x0F
        cursor = stop
    return packets
def CRC32(section: memoryview | bytearray | bytes) -> int:
crc = 0xFFFFFFFF
for byte in section:
for index in range(7, -1, -1):
bit = (byte & (1 << index)) >> index
c = 1 if crc & 0x80000000 else 0
crc <<= 1
if c ^ bit: crc ^= 0x04c11db7
crc &= 0xFFFFFFFF
return crc
def genPAT(pmt_pid: int):
    """Build a Program Association Table section with a single program.

    The table has transport_stream_id 0 and version 0, and maps
    program_number 1 to ``pmt_pid``.  It is a single-section table, so
    section_number and last_section_number are both 0.  Reserved bits are
    set to 1.

    Returns the section as a ``bytearray``, from table_id through the
    trailing CRC.
    """
    PAT = bytearray([
        0x00, # table_id
        0b10110000, # section_syntax_indicator, '0', reserved, section_length (high bits set below)
        0b00000000, # section_length (low byte set below)
        0b00000000, # transport_stream_id
        0b00000000, # transport_stream_id
        0b11000001, # reserved, version_number, current_next_indicator
        0b00000000, # section_number
        0b00000000, # last_section_number
        #
        0b00000000, # program_number
        0b00000001, # program_number
        0b11100000 | ((0x1F00 & pmt_pid) >> 8), # reserved, program_map_pid
        (0x00FF & pmt_pid) >> 0, # program_map_pid
    ])
    # section_length counts every byte after the length field, CRC included.
    section_length = len(PAT) - 3 + 4
    PAT[1] = (PAT[1] & 0xF0) | ((section_length & 0x0F00) >> 8)
    PAT[2] = section_length & 0x00FF
    PAT_CRC32 = CRC32(PAT)
    PAT += int.to_bytes(PAT_CRC32, 4, byteorder='big')
    return PAT
def genPMT(pcr_pid: int, av1_pid: int, config: AV1CodecConfigurationBox):
    """Build a Program Map Table section announcing one AV1 stream.

    The section describes program_number 1 with ``pcr_pid`` as its PCR PID,
    no program-level descriptors, and one elementary stream on ``av1_pid``
    with stream_type 0x06.  That stream carries two descriptors:

    * a registration descriptor (tag 0x05) with format identifier ``AV01``;
    * a descriptor with tag 0x80 whose four bytes repack the fields of
      ``config``, with hdr_wcg_idc fixed to 3.

    It is a single-section table, so section_number and last_section_number
    are both 0.  Reserved bits are set to 1.

    Returns the section as a ``bytearray``, from table_id through the
    trailing CRC.
    """
    PMT = bytearray([
        0x02, # table_id
        0b10110000, # section_syntax_indicator, '0', reserved, section_length (high bits set below)
        0b00000000, # section_length (low byte set below)
        0b00000000, # program_number
        0b00000001, # program_number
        0b11000001, # reserved, version_number, current_next_indicator
        0b00000000, # section_number
        0b00000000, # last_section_number
        #
        0b11100000 | ((0x1F00 & pcr_pid) >> 8), # reserved, pcr_pid
        (0x00FF & pcr_pid) >> 0, # pcr_pid
        0b11110000, # reserved, program_info_length (no descriptor loop)
        0, # program_info_length (no descriptor loop)
        #
    ])
    registration_descriptor = bytes([
        0x05, # descriptor_tag
        4, # descriptor_length
    ]) + ('AV01'.encode('ascii'))
    av1_video_descriptor = bytes([
        0x80, # descriptor_tag
        4, # descriptor_length
        (config.marker << 7) | (config.version << 0),
        (config.seq_profile << 5) | (config.seq_level_idx_0 << 0),
        (config.seq_level_tier_0 << 7) | (config.high_bitdepth << 6) | (config.twelve_bit << 5) | (config.monochrome << 4) | (config.chroma_subsampling_x << 3) | (config.chroma_subsampling_y << 2) | (config.chroma_sample_position << 0),
        (3 << 6) | (0 << 5) | (config.initial_presentation_delay_present << 4) | (config.initial_presentation_delay_minus_one << 0)
        # 3 is hdr_wcg_idc (No indication made regarding HDR/WCG or SDR characteristics of the stream)
    ])
    descriptors = registration_descriptor + av1_video_descriptor
    PMT += bytes([
        0x06, # stream_type
        0b11100000 | ((0x1F00 & av1_pid) >> 8), # reserved, elementary_PID
        (0x00FF & av1_pid) >> 0, # elementary_PID
        0b11110000 | ((len(descriptors) & 0x0F00) >> 8), # reserved, ES_info_length
        len(descriptors) & 0x00FF, # ES_info_length
    ]) + descriptors
    # section_length counts every byte after the length field, CRC included.
    section_length = len(PMT) - 3 + 4
    PMT[1] = (PMT[1] & 0xF0) | ((section_length & 0x0F00) >> 8)
    PMT[2] = section_length & 0x00FF
    PMT_CRC32 = CRC32(PMT)
    PMT += int.to_bytes(PMT_CRC32, 4, byteorder='big')
    return PMT
def genPCR(pcr_pid: int, pcr: int, continuity_counter: int):
    """Build one transport stream packet that carries only a PCR.

    The packet has no payload: after the 4-byte header comes a single
    adaptation field filling the rest of the packet, with PCR_flag set.
    ``pcr`` is written as the 33-bit program_clock_reference_base (values
    wider than 33 bits are truncated); the 9-bit extension is always 0.
    ``continuity_counter`` is placed in the header as given.

    Returns the 188-byte packet as a ``bytearray``.
    """
    PCR = bytearray([
        0x47,
        # payload_unit_start_indicator stays 0: there is no payload to start.
        (0 << 7) | (0 << 6) | (0 << 5) | ((pcr_pid & 0x1F00) >> 8),
        (pcr_pid & 0x00FF),
        # adaptation_field_control '10': adaptation field only, no payload.
        (0 << 6) | 0x20 | (continuity_counter & 0x0F),
        183, # adaptation_field_length: everything after this byte
        0x10, # flags: PCR_flag
        (pcr >> 25) & 0xFF, # program_clock_reference_base bits 32..25
        (pcr >> 17) & 0xFF, # program_clock_reference_base bits 24..17
        (pcr >> 9) & 0xFF, # program_clock_reference_base bits 16..9
        (pcr >> 1) & 0xFF, # program_clock_reference_base bits 8..1
        ((pcr & 0x01) << 7) | 0x7E, # base bit 0, six reserved bits, extension bit 8
        0x00, # program_clock_reference_extension bits 7..0
    ])
    PCR += bytes([0xFF] * (PACKET_SIZE - len(PCR)))
    return PCR
def genAV1(av1_in_ts: memoryview | bytearray | bytes, pts: int, dts: int):
AV1 = bytearray([
0, 0, 1,
0xBD,
0, 0,
0b10000000,
0b11000000,
10,
# pts
0b00110001 | ((pts & 0x1C0000000) >> 29),
(pts & 0x3FC00000) >> 22,
0b00000001 | ((pts & 0x003F8000) >> 15),
(pts & 0x00007F80) >> 7,
0b00000001 | ((pts & 0x0000007F) << 1),
# dts
0b00010001 | ((dts & 0x1C0000000) >> 29),
(dts & 0x3FC00000) >> 22,
0b00000001 | ((dts & 0x003F8000) >> 15),
(dts & 0x00007F80) >> 7,
0b00000001 | ((dts & 0x0000007F) << 1)
]) + av1_in_ts
return AV1
if __name__ == "__main__":
    # Command-line driver: reads top-level MP4 boxes from the input one at a
    # time and writes MPEG-TS packets to the output.
    #   moov -> remember the media timescale and the av1C configuration
    #   moof -> remember the fragment's base media decode time
    #   mdat -> convert the OBUs and emit them as one PES packet
    parser = argparse.ArgumentParser(description=('AV1 to AnnexB'))
    parser.add_argument('-i', '--input', type=argparse.FileType('rb'), nargs='?', default=sys.stdin.buffer)
    parser.add_argument('-o', '--output', type=argparse.FileType('wb'), nargs='?', default=sys.stdout.buffer)
    args = parser.parse_args()

    timescale: int | None = None
    config: AV1CodecConfigurationBox | None = None
    dts: int | None = None
    PAT_CC, PMT_CC, AV1_CC, PCR_CC = 0, 0, 0, 0
    PMT_PID, AV1_PID, PCR_PID = 0x100, 0x101, 0x200
    EMIT_SECONDS = 0  # stream time at which PAT/PMT/PCR are next due

    # Bytes of fixed fields at the start of the 'av01' sample entry payload
    # that are skipped before looking for its child boxes.
    SAMPLE_ENTRY_FIELDS = 4 + 4 + 4 + 12 + 2 + 2 + 4 + 4 + 4 + 2 + 1 + 31 + 2 + 2

    while args.input:
        # Explicit byteorder: the argument is only optional on Python 3.11+.
        size = int.from_bytes(args.input.read(4), byteorder='big') - 8
        if size < 0: break  # end of input (an empty read decodes as 0) or an unsupported size
        name = args.input.read(4).decode('latin-1')
        box = args.input.read(size)

        if name == 'moov':
            mdhd = findBox(box, ['trak', 'mdia', 'mdhd'])
            stsd = findBox(box, ['trak', 'mdia', 'minf', 'stbl', 'stsd'])
            # stsd payload: 4 bytes version/flags and 4 bytes entry count come first.
            av01 = findBox(stsd[8:], 'av01') if stsd is not None else None
            av1C = findBox(av01[SAMPLE_ENTRY_FIELDS:], 'av1C') if av01 is not None else None
            if mdhd is None or av1C is None:
                sys.exit('moov box does not describe an AV1 (av01/av1C) track')
            timescale = parseTimescaleByMdhd(mdhd)
            config = parseAV1CodecConfigurationBox(av1C)
        elif name == 'moof':
            tfdt = findBox(box, ['traf', 'tfdt'])
            dts = parseBaseMediaDecodeTimeByTfdt(tfdt) if tfdt is not None else None
        elif name == 'mdat':
            # Media data is unusable until both a moov and a moof were seen.
            if dts is None or not timescale or config is None: continue
            av1_in_ts = escapeObus(box)

            if (dts / timescale) >= EMIT_SECONDS:
                for packet in packetizeSection(genPAT(PMT_PID), 0, PAT_CC):
                    args.output.write(packet)
                    PAT_CC = (PAT_CC + 1) & 0x0F
                for packet in packetizeSection(genPMT(PCR_PID, AV1_PID, config), PMT_PID, PMT_CC):
                    args.output.write(packet)
                    PMT_CC = (PMT_CC + 1) & 0x0F
                args.output.write(genPCR(PCR_PID, int(EMIT_SECONDS * 90000), PCR_CC))
                PCR_CC = (PCR_CC + 1) & 0x0F
                EMIT_SECONDS += 0.1

            # PES timestamps count 90 kHz ticks, not media timescale units.
            dts_90khz = dts * 90000 // timescale
            for packet in packetizePES(genAV1(av1_in_ts, dts_90khz, dts_90khz), AV1_PID, AV1_CC):
                args.output.write(packet)
                AV1_CC = (AV1_CC + 1) & 0x0F
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment