Skip to content

Instantly share code, notes, and snippets.

@mipsparc
Last active July 22, 2023 18:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mipsparc/9f90a7e4fd2644538133fe8e44598753 to your computer and use it in GitHub Desktop.
Save mipsparc/9f90a7e4fd2644538133fe8e44598753 to your computer and use it in GitHub Desktop.
totts.py(MPEG2-TSの中のTDTパケットを取り出して、中に入ってる時刻情報を表示するやつ)のリアルタイムストリーム入力可能にしたバージョン。
#! /usr/bin/env python3
#coding:utf-8
#require bitstring
import bitstring
from sys import argv
import math
import sys
import io
import time
#filename = argv[1]
packet_length = 188
#packet_length = 204
syncbyte = 0x47
count = 0
failcount = 0
summary = {}
first = True
readpos = 0
bio = io.BytesIO()
#quoted from https://gist.github.com/jiffyclub/1294443
def mjd_to_date(mjd):
jd = mjd + 2400000.5
jd = jd + 0.5
F, I = math.modf(jd)
I = int(I)
A = math.trunc((I - 1867216.25)/36524.25)
if I > 2299160:
B = I + 1 + A - math.trunc(A / 4.)
else:
B = I
C = B + 1524
D = math.trunc((C - 122.1) / 365.25)
E = math.trunc(365.25 * D)
G = math.trunc((C - E) / 30.6001)
day = int(C - E + F - math.trunc(30.6001 * G))
if G < 13.5:
month = G - 1
else:
month = G - 13
if month > 2.5:
year = D - 4716
else:
year = D - 4715
return year, month, day
class TS:
def __init__(self, packet):
self.packet = packet
def unpack(self):
try:
self.underflow = False
if packet_length == 188:
#188-5(header+pointer)-4(CRC)=179, 179*8=1432bits
self.sync, self.error, self.pid, self.scramble, self.counter, self.pointer, self.payload, self.crc = \
self.packet.unpack('uint:8, bool, pad:2, uint:13, bits:2, pad:2, uint:4, uint:8, bits:1432, bytes:4')
else:
#TODO long packet
pass
#find sync
if self.sync != syncbyte:
found = self.packet.find('0x47', bytealigned=True) #findには文字列で渡す…
return True
return False
except bitstring.ReadError:
#バッファ枯渇
self.underflow = True
return True
def tdt_unpack(self):
table_id, mjd16, h1, h2, m1, m2, s1, s2 = \
self.payload.unpack('uint:8, pad:16, uint:16, uint:4, uint:4, uint:4, uint:4, uint:4, uint:4')
year, month, day = mjd_to_date(mjd16)
hour = int(str(h1)+str(h2))
minute = int(str(m1)+str(m2))
sec = int(str(s1)+str(s2))
print('TIME {}-{}-{} {}:{}:{} JST'.format(year, month, day, hour, minute, sec))
while True:
#新規パケットをwriteする前に終端にseek(seek位置から書くから)
bio.seek(0, io.SEEK_END)
bio.write(sys.stdin.buffer.read(packet_length))
#読むので前回位置にseek
bio.seek(readpos)
read_data = bio.read(packet_length)
#前回seek位置を次のに合わせる
readpos += packet_length
packet = bitstring.ConstBitStream(bytes=read_data)
ts = TS(packet)
if ts.unpack():
if not ts.underflow:
#syncできなかったら戻る
if ts.packet.bytepos != 0: #このパケット内にsyncがあったら
readpos = readpos - packet_length + ts.packet.bytepos
#print(ts.packet.bytepos)
else:
print('underflow')
readpos -= packet_length
if readpos < 0:
readpos = 0
failcount += 1
else:
#カウンタチェック
#if not first:
# print('PID:{} {}'.format(ts.pid, ts.counter))
#TDT
if ts.pid == 0x14 and not ts.error:
ts.tdt_unpack()
if not ts.error:
try:
summary[ts.pid] += 1
except KeyError:
summary[ts.pid] = 1
first = False
count += 1
print('-------------------------------')
print('PID count:',len(summary))
print('Fail:', failcount)
print('Total:', count)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment