Skip to content

Instantly share code, notes, and snippets.

@poizan42
Last active January 1, 2016 07:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save poizan42/8114992 to your computer and use it in GitHub Desktop.
Save poizan42/8114992 to your computer and use it in GitHub Desktop.
Read video data from saved combined traffic dumps of a YouTube video, such as those created by `tshark -z follow,tcp,raw,...`.
#!/usr/bin/python
import os
import sys

try:
    from urllib.parse import urlparse, parse_qs  # Python 3
except ImportError:
    from urlparse import urlparse, parse_qs  # Python 2
def _toText(raw):
    """Return ``raw`` as the interpreter's native ``str`` type.

    On Python 2 a byte string already is ``str`` and is returned unchanged.
    On Python 3 ``bytes`` are decoded as Latin-1, which maps every byte value
    and therefore cannot fail on binary junk.
    """
    if isinstance(raw, str):
        return raw
    return raw.decode('latin-1')


def _readUntilMarker(f, marker, blockSize=65536):
    """Read from ``f`` up to the first occurrence of ``marker`` or to EOF.

    Returns the bytes that precede the marker. On return ``f`` is positioned
    at the first byte of the marker, or at EOF when no marker was found.
    """
    keep = len(marker) - 1
    parts = []
    tail = b''
    while True:
        block = f.read(blockSize)
        if not block:
            # EOF: whatever was held back cannot start a marker any more.
            parts.append(tail)
            break
        buf = tail + block
        idx = buf.find(marker)
        if idx >= 0:
            parts.append(buf[:idx])
            # The file position is at the end of buf; step back to the marker.
            f.seek(f.tell() - (len(buf) - idx))
            break
        # Hold back the last len(marker)-1 bytes: they may be the beginning of
        # a marker that is split across two reads.
        split = max(len(buf) - keep, 0)
        parts.append(buf[:split])
        tail = buf[split:]
    return b''.join(parts)


def readChunk(f):
    """Read one HTTP request/response exchange from a raw stream dump.

    ``f`` must be a seekable binary file object positioned at the start of a
    ``GET`` request line. After the call it is positioned at the start of the
    next request (or at EOF).

    Returns ``None`` when the request path is not ``/videoplayback``.
    Otherwise returns a dict with the keys:

    * ``itag``  -- integer ``itag`` query parameter of the request
    * ``start`` -- first byte offset from the ``range`` query parameter
    * ``end``   -- ``start`` plus the number of body bytes actually consumed,
      minus one
    * ``bad``   -- True when the expected length did not line up with the next
      request and the body end had to be found by searching
    * ``data``  -- the response body bytes

    Raises ``EOFError`` when the file is exhausted, and ``Exception`` when the
    data does not start with ``GET`` or a video response is not
    ``HTTP/1.1 200 OK``.
    """
    searchString = b'GET /'

    get = f.readline()
    if not get:
        raise EOFError()
    if get[:3] != b'GET':
        raise Exception('Parse error, GET expected, got ' + _toText(get))
    (_method, url, _version) = _toText(get).strip().split(' ')
    parsedUrl = urlparse(url)
    skip = parsedUrl.path != '/videoplayback'
    if skip:
        print("Not video playback, skipping chunk. Url is: %s" % url)
        chunkLength = 0
    else:
        params = parse_qs(parsedUrl.query)
        itag = int(params['itag'][0])
        (start, end) = [int(i) for i in params['range'][0].split('-')]
        chunkLength = end - start + 1

    # Skip the remaining request headers up to the blank separator line
    # (an empty read at EOF also ends the loop).
    while f.readline().strip():
        pass

    respStatus = _toText(f.readline()).strip()
    if not skip and respStatus != 'HTTP/1.1 200 OK':
        raise Exception('Wrong status: ' + respStatus)

    # None means "header absent"; the length derived from the request is then
    # used as-is instead of failing on an unbound variable.
    contentLength = None
    while True:
        line = _toText(f.readline()).strip()
        if line == '':
            break
        header = [s.strip() for s in line.split(':', 1)]
        # HTTP header names are case-insensitive.
        if len(header) == 2 and header[0].lower() == 'content-length':
            contentLength = int(header[1])

    srcStart = f.tell()
    if not skip:
        print('Chunk at 0x%.8X to 0x%.8X-0x%.8X with itag %d (length %d)' % (srcStart, start, end, itag, chunkLength))
    if contentLength is not None and contentLength != chunkLength:
        if not skip:
            print('WARNING: Wrong Content-Length, expected %d got %d' % (chunkLength, contentLength))
        chunkLength = contentLength

    data = f.read(chunkLength)
    # Peek at what follows the body: it must be the next request or EOF.
    savedPos = f.tell()
    testNext = f.read(len(searchString))
    f.seek(savedPos)
    bad = testNext != searchString and testNext != b''
    if bad:
        print('WARNING: Incorrect chunk length! Searching for real end.')
        # Re-read the body from its start and cut it at the next request line.
        f.seek(srcStart)
        data = _readUntilMarker(f, searchString)
        print('Real end found at 0x%.8X' % f.tell())

    if skip:
        return None
    realLength = f.tell() - srcStart
    return {'itag': itag, 'start': start, 'end': start + realLength - 1, 'bad': bad, 'data': data}
# Command line: DUMP_FILE [DUMP_FILE ...] OUTPUT_FILE
# Every dump is scanned with readChunk() and each video chunk is written at
# its byte offset into a per-itag output file named
# "<output name>-<itag><output extension>".
if len(sys.argv) < 3:
    sys.stderr.write('Usage: %s DUMP_FILE [DUMP_FILE ...] OUTPUT_FILE\n' % sys.argv[0])
    sys.exit(1)

(outTemplateName, outTemplateExt) = os.path.splitext(sys.argv[-1])
outFiles = {}  # itag -> open output file
try:
    for fileName in sys.argv[1:-1]:
        print('\nLoading video data from file ' + fileName)
        with open(fileName, 'rb') as f:
            while True:
                try:
                    chunk = readChunk(f)
                except EOFError:
                    # readChunk signals the end of the dump this way.
                    break
                if chunk is None:
                    # Not a video request; nothing to write.
                    continue
                itag = chunk['itag']
                out = outFiles.get(itag)
                if out is None:
                    out = open('%s-%d%s' % (outTemplateName, itag, outTemplateExt), 'wb')
                    outFiles[itag] = out
                # Chunks may arrive in any order; place each at its own offset.
                out.seek(chunk['start'])
                out.write(chunk['data'])
finally:
    # Close (and thereby flush) every output file, even after an error.
    for out in outFiles.values():
        out.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment