# Source: GitHub gist ryanmcgrath/958c602cff133bd7fa0b by @ryanmcgrath
# (last active February 23, 2017; 3 stars and 1 fork when this copy was taken).
import os
import struct
import requests
class MP4Parser(object):
    """Walk the atoms of an MP4 and rebuild a trimmed copy of the file.

    The parser reads the source sequentially (a local file or an HTTP
    response stream), records every atom it meets in ``self.atoms`` and keeps
    a byte-for-byte copy of all non-``mdat`` atoms in ``self.raw_atom_data``.
    ``extract_from_range`` then combines those copied headers with a slice of
    the source into a new file.

    Attributes set by the constructor:
        url: The path or URL given by the caller.
        media_offset: Offset of the ``mdat`` atom header, filled in by
            ``parse``.
        raw_atom_data: Copied bytes of every atom except ``mdat``.
        mdat_wide_box: The first (up to) 8 payload bytes of ``mdat``.
        atoms: Mapping of atom name to ``{'size': ..., 'data': ...}``.
        begin, end: Multiplied by the movie time scale to produce the
            duration written into the copied ``mvhd`` atom. Both default to
            0; set them before calling ``parse``.
        containers: Names of atoms whose payload is a list of child atoms.
        filesize: Total size of the source in bytes.
    """

    def __init__(self, url):
        """Open ``url`` for sequential reading.

        Args:
            url: An ``http://`` / ``https://`` URL, or a local file path.

        Raises:
            IOError/OSError: If a local path cannot be opened.
            KeyError: If a remote response carries no Content-Length header.
        """
        self.url = url
        self.media_offset = 0
        self.raw_atom_data = b''
        self.mdat_wide_box = b''
        self.__currReadPosition = 0
        self.atoms = {}
        self.begin = 0
        self.end = 0

        # These are atom names commonly used as container formats - i.e, atoms
        # with these names will have child atoms with information.
        #
        # Note: you might be _inclined_ to put meta in here, but it should be
        # parsed on its own. You can parse out all the atoms, and then handle
        # parsing metadata yourself - e.g, parser.atoms['meta']
        #
        # http://xhelmboyx.tripod.com/formats/mp4-layout.txt is helpful. :)
        #
        # 'd263' and 'bitr' (3GPP playback) are deliberately left out: 'bitr'
        # sits a few bits into 'd263' and requires custom handling.
        self.containers = [
            'moov', 'mdia', 'minf', 'trak', 'udta', 'ilst', 'mdra',
            'cmov', 'rmra', 'rmda', 'clip', 'matt', 'edts',
            'dinf', 'stbl', 'sinf',
        ]

        if self._is_remote_url(url):
            self.request = requests.get(self.url, stream=True)
            self.filesize = int(self.request.headers['Content-Length'])
            self.socket = self.request.raw
        else:
            self.request = open(self.url, 'rb')
            self.filesize = os.path.getsize(self.url)
            self.socket = self.request

    @staticmethod
    def _is_remote_url(url):
        """Return True when ``url`` must be fetched over HTTP(S)."""
        # A bare 'htt' prefix test would also match local names such as
        # 'httfoo.mp4', so require the full scheme.
        return url.startswith(('http://', 'https://'))

    def parse_atoms(self, end_bytes_point):
        """Yield ``(atomtype, atomsize, atomdata)`` for each atom in a range.

        Reading starts at the current stream position and continues until the
        tracked position reaches ``end_bytes_point`` or the stream runs dry.
        Container atoms recurse, so their ``atomdata`` is a dict of children
        keyed by name. ``mvhd`` and ``stts`` yield ``None`` as data, ``mdat``
        yields its payload minus the leading wide-box bytes, and every other
        atom yields its raw payload.

        Args:
            end_bytes_point: Absolute byte offset at which to stop.

        Raises:
            ValueError: If an atom declares a size smaller than its header,
                or an ``mvhd`` atom is too small to contain a duration.
            struct.error: If the stream ends in the middle of an ``mvhd``.
        """
        while self.__currReadPosition < end_bytes_point:
            atom_start = self.__currReadPosition
            header = self.socket.read(8)
            if len(header) < 8:
                # The stream ended before another complete header.
                return

            # Atom sizes are unsigned 32-bit big-endian values.
            atomsize = struct.unpack("!I", header[:4])[0]
            atomtype = header[4:]
            if not isinstance(atomtype, str):
                # Python 3 reads bytes; expose native strings so that names
                # compare against literals and work as dict keys.
                atomtype = atomtype.decode('latin-1')

            header_len = 8
            if atomsize == 1:
                # A size of 1 means the real size follows as 64 bits.
                largesize = self.socket.read(8)
                if len(largesize) < 8:
                    return
                header += largesize
                atomsize = struct.unpack("!Q", largesize)[0]
                header_len = 16
            elif atomsize == 0:
                # A size of 0 means the atom runs to the end of the range.
                atomsize = end_bytes_point - atom_start

            if atomsize < header_len:
                raise ValueError(
                    'invalid size %i for atom %r' % (atomsize, atomtype))

            if atomtype != 'mdat':
                self.raw_atom_data += header
            self.__currReadPosition += header_len
            payload_size = atomsize - header_len

            atomdata = None
            if atomtype in self.containers:
                # The ability to seek() on a "socket" would be so welcome here.
                atomdata = {}
                end_bytes = self.__currReadPosition + payload_size
                for child in self.parse_atoms(end_bytes):
                    child_type, child_size, child_data = child
                    atomdata[child_type] = {
                        'size': child_size,
                        'data': child_data,
                    }
            elif atomtype == 'mvhd':
                self._copy_mvhd(atomtype, atomsize, payload_size)
            elif atomtype == 'stts':
                self._copy_stts(atomtype, atomsize, payload_size)
            elif atomtype == 'mdat':
                atomdata = self._read_mdat(payload_size)
            else:
                atomdata = self.socket.read(payload_size)
                self.raw_atom_data += atomdata
                self.__currReadPosition += payload_size

            yield atomtype, atomsize, atomdata

    def _copy_mvhd(self, atomtype, atomsize, payload_size):
        """Copy the movie header, replacing its duration field."""
        if payload_size < 20:
            raise ValueError(
                'mvhd atom is too small (%i bytes) to hold a duration'
                % atomsize)
        print(atomtype)
        print(atomsize)

        # NOTE(review): the offsets below assume the version-0 layout
        # (1 byte version, 11 bytes flags/timestamps, 4 bytes time scale,
        # 4 bytes duration). Version-1 headers appear to use wider time
        # fields and would need different offsets - confirm before relying
        # on this for such files.
        head = self.socket.read(20)
        print('Version: %i' % struct.unpack("!B", head[:1])[0])
        self.time_scale = int(struct.unpack("!I", head[12:16])[0])
        print('Time Scale: %i' % self.time_scale)

        # head[16:20] is the original duration; it is dropped and replaced
        # by the length of the requested range expressed in time-scale units.
        new_duration = int((self.end - self.begin) * self.time_scale)
        self.raw_atom_data += head[:16]
        self.raw_atom_data += struct.pack("!I", new_duration)
        self.raw_atom_data += self.socket.read(payload_size - 20)

        # Exactly payload_size bytes were consumed after the header.
        self.__currReadPosition += payload_size

    def _copy_stts(self, atomtype, atomsize, payload_size):
        """Copy the time-to-sample atom verbatim and print its first fields."""
        print(atomtype)
        print(atomsize)

        payload = self.socket.read(payload_size)
        self.raw_atom_data += payload
        self.__currReadPosition += payload_size

        # Only report the fields that are really present: an stts atom
        # shorter than 16 payload bytes must not read into the next atom.
        labels = ('Version & Flags', 'No of Times', 'Frame Count', 'Duration')
        for index, label in enumerate(labels):
            field = payload[index * 4:index * 4 + 4]
            if len(field) < 4:
                break
            print('%s: %i' % (label, struct.unpack("!I", field)[0]))

    def _read_mdat(self, payload_size):
        """Read the mdat payload, stashing its first 8 bytes separately."""
        # NOTE: this pulls the whole media payload into memory because the
        # stream cannot seek past it.
        wide_box = self.socket.read(min(8, payload_size))
        self.mdat_wide_box += wide_box
        data = self.socket.read(payload_size - len(wide_box))
        self.__currReadPosition += payload_size
        return data

    def parse(self):
        """Read atoms from the start of the source up to and including mdat.

        Fills ``self.atoms`` with one ``{'size': ..., 'data': ...}`` entry per
        top-level atom; the data may be a nested dict for containers, so
        pretty-printing the result is probably a good idea. Stops after
        ``mdat`` and records the offset of its header in
        ``self.media_offset``.
        """
        for atomtype, atomsize, atomdata in self.parse_atoms(self.filesize):
            self.atoms[atomtype] = {'size': atomsize, 'data': atomdata}
            if atomtype == 'mdat':
                # The read position is now just past mdat, so stepping back
                # by its size lands on the start of the atom.
                self.media_offset = self.__currReadPosition - atomsize
                break

    def extract_from_range(self, to_filename, begin, end):
        """Write the copied headers plus a slice of the source to a new file.

        The output is ``self.raw_atom_data`` followed by a fresh ``mdat``
        atom holding the stored wide-box bytes and the requested slice. The
        source handle is closed afterwards.

        Args:
            to_filename: Path of the file to create.
            begin: Byte offset in the source where the slice starts.
            end: Byte offset where the slice stops.

        NOTE(review): for remote sources ``end`` goes straight into an HTTP
        Range header, which is inclusive, while local files read
        ``end - begin`` bytes, so the remote slice is one byte longer.
        Confirm which of the two is intended.
        """
        if self._is_remote_url(self.url):
            content = requests.get(self.url, headers={
                'Range': 'bytes=%i-%i' % (begin, end)
            }).content
        else:
            self.request.seek(begin)
            content = self.request.read(end - begin)
        self.request.close()

        # An atom's size field counts its own 8-byte header as well as the
        # payload, which here is the wide box followed by the media bytes.
        mdat_size = 8 + len(self.mdat_wide_box) + len(content)

        with open(to_filename, 'wb') as f:
            f.write(self.raw_atom_data)
            f.write(struct.pack("!I", mdat_size))
            f.write(b'mdat')
            f.write(self.mdat_wide_box)
            f.write(content)
# (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)