-
-
Save ryanmcgrath/958c602cff133bd7fa0b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import struct | |
import requests | |
class MP4Parser(object):
    """Walk the atoms of an MP4 and rebuild a smaller MP4 from a byte range.

    The parser reads the file (local path or http(s) URL) sequentially, copies
    every non-``mdat`` atom into ``raw_atom_data`` and records where the
    ``mdat`` atom starts (``media_offset``).  ``extract_from_range`` can then
    combine those copied headers with a slice of the media bytes.

    Attributes:
        url: Path or URL handed to the constructor.
        media_offset: Byte offset of the ``mdat`` atom header, set by ``parse``.
        raw_atom_data: Bytes of every atom seen before ``mdat`` (with the
            ``mvhd`` duration rewritten, see ``parse_atoms``).
        mdat_wide_box: The 8 bytes that directly follow the ``mdat`` header.
        atoms: ``{type: {'size': int, 'data': ...}}`` for top-level atoms;
            containers hold a nested dict of the same shape.
        begin, end: Clip bounds in seconds used when rewriting the ``mvhd``
            duration.  They default to 0 and must be set before ``parse``.
        time_scale: Movie time scale read from ``mvhd`` (``None`` until seen).
        containers: Atom types whose payload is parsed as child atoms.
    """

    def __init__(self, url):
        """Open ``url`` (local path or http/https URL) for sequential reading."""
        self.url = url
        self.media_offset = 0
        self.raw_atom_data = b''
        self.mdat_wide_box = b''
        self.__currReadPosition = 0
        self.atoms = {}
        self.begin = 0
        self.end = 0
        self.time_scale = None

        # These are atom names commonly used as container formats - i.e, atoms
        # with these names will have child atoms with information.
        #
        # Note: you might be _inclined_ to put meta in here, but it should be
        # parsed on its own. You can parse out all the atoms, and then handle
        # parsing metadata yourself - e.g, parser.atoms['meta']
        #
        # http://xhelmboyx.tripod.com/formats/mp4-layout.txt is helpful. :)
        self.containers = [
            'moov', 'mdia', 'minf', 'trak', 'udta', 'ilst', 'mdra',
            'cmov', 'rmra', 'rmda', 'clip', 'matt', 'edts',
            'dinf', 'stbl', 'sinf',
            # These two relate to 3GPP playback - 'bitr' is a few bits into
            # 'd263' and requires custom handling...
            # 'd263', 'bitr',
        ]

        if self._is_remote():
            self.request = requests.get(self.url, stream=True)
            self.filesize = int(self.request.headers['Content-Length'])
            self.socket = self.request.raw
        else:
            import os.path
            self.request = open(self.url, 'rb')
            self.filesize = os.path.getsize(self.url)
            # A file object offers the same read() interface as the raw socket.
            self.socket = self.request

    def _is_remote(self):
        """Return True when ``self.url`` is an http(s) URL rather than a path."""
        return self.url.startswith(('http://', 'https://'))

    @staticmethod
    def _atom_name(raw_type):
        """Return the 4-byte atom type as a native ``str`` on Python 2 and 3."""
        if isinstance(raw_type, str):
            # Python 2: bytes already is the native string type.
            return raw_type
        # latin-1 maps every byte to one character, so this never fails.
        return raw_type.decode('latin-1')

    def _clip_duration(self, time_scale):
        """Return the (end - begin) clip length expressed in ``time_scale`` units."""
        return int((self.end - self.begin) * time_scale)

    def parse_atoms(self, end_bytes_point):
        """Yield ``(type, size, data)`` for each atom up to ``end_bytes_point``.

        Reads sequentially from ``self.socket``.  Container atoms are parsed
        recursively and yield a dict of their children as ``data``; ``mvhd``
        and ``stts`` yield ``None``; every other atom yields its raw payload.
        All non-``mdat`` bytes are appended to ``self.raw_atom_data``, with the
        ``mvhd`` duration replaced by the ``begin``/``end`` clip length.

        Args:
            end_bytes_point: Absolute read position at which to stop.

        Yields:
            Tuples of ``(atom type as str, atom size as int, atom data)``.

        Raises:
            struct.error: If the stream ends in the middle of an atom header.
        """
        # NOTE(review): sizes 0 (atom runs to EOF) and 1 (64-bit extended
        # size) are not handled specially here.
        while self.__currReadPosition < end_bytes_point:
            size_bytes = self.socket.read(4)
            type_bytes = self.socket.read(4)
            # Atom sizes are unsigned 32-bit big-endian integers.
            atomsize = struct.unpack("!I", size_bytes)[0]
            atomtype = self._atom_name(type_bytes)

            if atomtype != 'mdat':
                self.raw_atom_data += size_bytes
                self.raw_atom_data += type_bytes

            self.__currReadPosition += 8
            atomdata = None

            if atomtype in self.containers:
                atomdata = {}
                end_bytes = self.__currReadPosition + atomsize - 8
                # The recursive call advances the shared read position itself.
                for child_type, child_size, child_data in self.parse_atoms(end_bytes):
                    atomdata[child_type] = {'size': child_size, 'data': child_data}
            elif atomtype == 'mvhd':
                print(atomtype)
                print(atomsize)

                version = self.socket.read(1)
                self.raw_atom_data += version
                print('Version: %s' % struct.unpack("!B", version)[0])

                # NOTE(review): 11 bytes skipped here matches flags plus two
                # 32-bit timestamps, i.e. a version-0 mvhd - confirm for v1.
                self.raw_atom_data += self.socket.read(11)

                time_scale = self.socket.read(4)
                self.raw_atom_data += time_scale
                self.time_scale = int(struct.unpack("!I", time_scale)[0])
                print('Time Scale: %i' % self.time_scale)

                # Consume the original duration and write the clip length instead.
                self.socket.read(4)
                self.raw_atom_data += struct.pack(
                    "!I", self._clip_duration(self.time_scale))

                self.raw_atom_data += self.socket.read(atomsize - 28)
                # 1 + 11 + 4 + 4 + (atomsize - 28) bytes were consumed above.
                self.__currReadPosition += atomsize - 8
            elif atomtype == 'stts':
                print(atomtype)
                print(atomsize)

                versionflags = self.socket.read(4)
                print('Version & Flags: %i' % struct.unpack("!I", versionflags)[0])
                self.raw_atom_data += versionflags

                no_of_times = self.socket.read(4)
                print('No of Times: %i' % struct.unpack("!I", no_of_times)[0])
                self.raw_atom_data += no_of_times

                frame_count = self.socket.read(4)
                print('Frame Count: %i' % struct.unpack("!I", frame_count)[0])
                self.raw_atom_data += frame_count

                duration = self.socket.read(4)
                print('Duration: %i' % struct.unpack("!I", duration)[0])
                self.raw_atom_data += duration

                self.raw_atom_data += self.socket.read(atomsize - 24)
                # 4 * 4 + (atomsize - 24) bytes were consumed above.
                self.__currReadPosition += atomsize - 8
            else:
                if atomtype != 'mdat':
                    atomdata = self.socket.read(atomsize - 8)
                    self.raw_atom_data += atomdata
                else:
                    # Keep the 8 bytes after the mdat header separately; they
                    # are written back in front of the extracted media data.
                    self.mdat_wide_box += self.socket.read(8)
                    # NOTE: this reads the whole media payload into memory.
                    atomdata = self.socket.read(atomsize - 16)
                self.__currReadPosition += atomsize - 8

            yield atomtype, atomsize, atomdata

    def parse(self):
        """Walk the top-level atoms until ``mdat`` and fill ``self.atoms``.

        Builds a dict with the appropriate data (atom type, atom size, and atom
        data).  Atom data may be nested dicts; pretty-printing the result is
        probably a good idea.  Parsing stops after ``mdat``, whose starting
        offset is stored in ``self.media_offset``.
        """
        for atomtype, atomsize, atomdata in self.parse_atoms(self.filesize):
            self.atoms[atomtype] = {'size': atomsize, 'data': atomdata}
            if atomtype == 'mdat':
                # The read position is at the end of mdat; step back to its header.
                self.media_offset = self.__currReadPosition - atomsize
                break

    def extract_from_range(self, to_filename, begin, end):
        """Write a new MP4 made of the parsed headers plus a byte range.

        Args:
            to_filename: Path of the file to create.
            begin: Byte offset in the source where the media slice starts.
            end: Byte offset in the source where the media slice ends.
        """
        if self._is_remote():
            # NOTE(review): HTTP byte ranges include the end offset, so this
            # fetches one byte more than the local branch below - confirm
            # which convention callers expect.
            content = requests.get(self.url, headers={
                'Range': 'bytes=%i-%i' % (begin, end)
            }).content
        else:
            self.request.seek(begin)
            content = self.request.read(end - begin)
            self.request.close()

        # mdat size = 8-byte atom header + the stored box + the media bytes.
        mdat_size = 8 + len(self.mdat_wide_box) + len(content)

        with open(to_filename, 'wb') as out:
            out.write(self.raw_atom_data)
            out.write(struct.pack("!I", mdat_size))
            out.write(b'mdat')
            out.write(self.mdat_wide_box)
            out.write(content)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment