Skip to content

Instantly share code, notes, and snippets.

@papaver
Last active December 28, 2015 09:19
Show Gist options
  • Save papaver/7478629 to your computer and use it in GitHub Desktop.
Save papaver/7478629 to your computer and use it in GitHub Desktop.
MPEG-1 / MPEG-2 Audio Layer III (.mp3) parser
#------------------------------------------------------------------------------
# mp3.py - MPEG-1 / MPEG-2 Audio Layer III (.mp3) parser
#------------------------------------------------------------------------------
import collections
import itertools
import logging
import numbers
import os.path
import struct
#------------------------------------------------------------------------------
# defines
#------------------------------------------------------------------------------
# mpeg audio version ids, as stored in the version field of a frame header
kMpeg1        = 0b11  # ISO/IEC 11172-3
kMpeg2        = 0b10  # ISO/IEC 13818-3
kMpeg25       = 0b00  # unofficial extension of mpeg2
kMpegReserved = 0b01

# audio layer ids, as stored in the layer field of a frame header
kLayerI        = 0b11
kLayerII       = 0b10
kLayerIII      = 0b01
kLayerReserved = 0b00

# special values of the bitrate index field
kBitrateFree     = 0b0000
kBitrateReserved = 0b1111

# channel modes
kChannelModeStereo      = 0b00
kChannelModeJointStereo = 0b01  # stereo
kChannelModeDual        = 0b10  # two mono channels
kChannelModeMono        = 0b11

# emphasis
kEmphasisNone     = 0b00
kEmphasis5015Ms   = 0b01
kEmphasisCCITJ17  = 0b11
kEmphasisReserved = 0b10

#------------------------------------------------------------------------------
# data tables
#------------------------------------------------------------------------------

# bitrate table, values in kilobits, keyed by audio version and then by layer;
# the lists only hold the fourteen regular rates, the free and reserved
# bitrate indices have no entry
sBitrateLookupTable = {
    kMpeg1: {
        kLayerI:   [32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448],
        kLayerII:  [32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384],
        kLayerIII: [32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320],
    },
    kMpeg2: {
        kLayerI:   [32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256],
        kLayerII:  [8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160],
    },
}

# sampling rate table, values in hertz, keyed by audio version
sSamplingRateLookupTable = {
    kMpeg1:  [44100, 48000, 32000],
    kMpeg2:  [22050, 24000, 16000],
    kMpeg25: [11025, 12000, 8000],
}

# samples per frame, keyed by audio version and then by layer
sSamplesPerFrameLookupTable = {
    kMpeg1:  {kLayerI: 384, kLayerII: 1152, kLayerIII: 1152},
    kMpeg2:  {kLayerI: 384, kLayerII: 1152, kLayerIII: 576},
    kMpeg25: {kLayerI: 384, kLayerII: 1152, kLayerIII: 576},
}

# side information size, values in bytes, keyed by audio version and then by
# channel mode (only the stereo and mono entries exist)
sSideInfoSizeLookupTable = {
    kMpeg1: {kChannelModeStereo: 32, kChannelModeMono: 17},
    kMpeg2: {kChannelModeStereo: 17, kChannelModeMono: 9},
}
#------------------------------------------------------------------------------
# structs
#------------------------------------------------------------------------------
# version of an id3v2 tag, split into its major and revision numbers
ID3v2Version = collections.namedtuple('ID3v2Version', 'major revision')
# single frame of an id3v2 tag: frame id, content size, flags, raw content
ID3v2Frame = collections.namedtuple('ID3v2Frame', 'tag size flags content')
#------------------------------------------------------------------------------
# helper methods
#------------------------------------------------------------------------------
def _versionToStr(version):
    """Return the display name of an mpeg audio version id.

    A ValueError is raised if the id is not one of kMpeg1, kMpeg2 or kMpeg25.
    """
    names = (
        (kMpeg1, 'Mpeg1'),
        (kMpeg2, 'Mpeg2'),
        (kMpeg25, 'Mpeg2.5'),
    )
    for versionId, name in names:
        if version == versionId:
            return name
    raise ValueError("Invalid audio version.")
#------------------------------------------------------------------------------
def _layerToStr(layer):
    """Return the display name of an mpeg audio layer id.

    A ValueError is raised if the id is not one of kLayerI, kLayerII or
    kLayerIII.
    """
    names = (
        (kLayerI, 'LayerI'),
        (kLayerII, 'LayerII'),
        (kLayerIII, 'LayerIII'),
    )
    for layerId, name in names:
        if layer == layerId:
            return name
    raise ValueError("Invalid audio layer.")
#------------------------------------------------------------------------------
def _bytesToStr(bytes_, wide=16):
chunk = lambda input_, size: map(None, *([iter(input_)] * size))
bytes_ += '\x00' * (wide - (len(bytes_) % wide))
return '\n'.join( \
map(lambda l: ' '.join( \
map(lambda s: ''.join( \
[x.encode('hex') for x in s]), chunk(l, 2))), chunk(bytes_, wide)))
#------------------------------------------------------------------------------
# Mp3FrameHeader
#------------------------------------------------------------------------------
class Mp3FrameHeader(object):
    """Four byte header found at the start of every mpeg audio frame.

    The header consists of information about the frame (bitrate, sampling
    rate, channel mode, etc) and because of that frames are independent items.
    Each of them can have its own characteristics. It is used eg. in variable
    bitrate files, where each frame can have a different bitrate.

    The header is kept as a 32 bit integer, fields are extracted from it with
    the bit masks defined below whenever they are requested.
    """

    #--------------------------------------------------------------------------
    # defines
    #--------------------------------------------------------------------------

    kSyncWord = 0xffe00000

    # header bit masks
    kSyncWordMask      = 0xffe00000
    kAudioVersionMask  = 0x00180000
    kLayerMask         = 0x00060000
    kProtectionMask    = 0x00010000
    kBitrateMask       = 0x0000f000
    kSamplingRateMask  = 0x00000c00
    kPaddingMask       = 0x00000200
    kPrivateMask       = 0x00000100
    kChannelModeMask   = 0x000000c0
    kModeExtensionMask = 0x00000030
    kCopyrightMask     = 0x00000008
    kOriginalMask      = 0x00000004
    kEmphasisMask      = 0x00000003

    kHeaderLength = 4

    # sampling rate index without an entry in the sampling rate table
    kSamplingRateReserved = 0b11

    #--------------------------------------------------------------------------
    # statics
    #--------------------------------------------------------------------------

    @staticmethod
    def _toInt(bytes_):
        """Convert a header into its integer value. Byte strings (and
        bytearrays) are read as a big endian 32 bit integer, integers are
        returned untouched. A ValueError is raised for any other type, a
        struct.error is raised if a byte string is not exactly 4 bytes long.
        """

        if isinstance(bytes_, (bytes, bytearray)):
            return struct.unpack('>I', bytes(bytes_))[0]

        # numbers.Integral also covers the 'long' type of python 2, which is
        # what struct hands back when the value does not fit a plain int
        if isinstance(bytes_, numbers.Integral):
            return bytes_

        raise ValueError("Invalid bytes type '%s', should be str[4] or int." %
            type(bytes_).__name__)

    #--------------------------------------------------------------------------

    @classmethod
    def hasSyncWord(cls, bytes_):
        """Check if the bytes contains the sync word identifier. Bytes can
        either be a 4 byte string or an integer, a ValueError is raised for
        any other type.
        """

        return (cls._toInt(bytes_) & cls.kSyncWordMask) == cls.kSyncWord

    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------

    def __init__(self, bytes_):
        """Bytes can either be a 4 byte string or an integer. A ValueError is
        raised if the header does not start with the sync word.
        """

        super(Mp3FrameHeader, self).__init__()

        # store as int, reference bits when header info requested
        header = Mp3FrameHeader._toInt(bytes_)

        # validate the header
        if not Mp3FrameHeader.hasSyncWord(header):
            raise ValueError("Frame header '%s' is invalid." % bytes_)

        self._header = header

    #--------------------------------------------------------------------------

    def __repr__(self):
        """Identify frame by audio version, layer, bitrate, sampling rate.
        """

        # free format frames have no bitrate, '%s' formats both the number
        # and the placeholder
        bitrate = self.bitrate
        return "MpegFrameHeader(%(version)s, %(layer)s, %(bitrate)skbps @ %(samplingrate)dhz)" % {
            'version'      : _versionToStr(self.version),
            'layer'        : _layerToStr(self.layer),
            'bitrate'      : '?' if bitrate is None else (bitrate // 1000),
            'samplingrate' : self.samplingRate
        }

    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------

    @property
    def version(self):
        """MPEG audio version, either MPEG-1, MPEG-2, or MPEG-2.5. An exception
        will be raised if audio version is set to reserved.
        """

        # validate value is not set to reserved
        value = self._getValue(Mp3FrameHeader.kAudioVersionMask)
        if value == kMpegReserved:
            raise ValueError("Mpeg audio version is set to reserved value.")

        return value

    #--------------------------------------------------------------------------

    @property
    def layer(self):
        """MPEG audio layer, either Layer I, Layer II, or Layer III. An
        exception will be raised if layer is set to reserved.
        """

        # validate value is not set to reserved
        value = self._getValue(Mp3FrameHeader.kLayerMask)
        if value == kLayerReserved:
            raise ValueError("Mpeg layer is set to reserved value.")

        return value

    #--------------------------------------------------------------------------

    @property
    def protection(self):
        """Protection bit signifying if a 16 bit CRC is present right after the
        header. Returns true if CRC is present (the bit itself is cleared in
        that case).
        """

        return not self._bitSet(Mp3FrameHeader.kProtectionMask)

    #--------------------------------------------------------------------------

    @property
    def bitrate(self):
        """Bitrate of the frame in bits per second. The bitrate index 1111 is
        reserved and should never be used. In the MPEG audio standard there is
        a free format described. This free format means that the file is
        encoded with a constant bitrate, which is not one of the predefined
        bitrates. None is returned if bitrate is free, an exception will be
        raised if the bitrate is set to reserved.
        """

        # validate value is not set to reserved
        value = self._getValue(Mp3FrameHeader.kBitrateMask)
        if value == kBitrateReserved:
            raise ValueError("Mpeg bitrate is set to reserved value.")

        # check if bitrate is set to free
        if value == kBitrateFree:
            return None

        # grab audio version, mpeg2.5 shares mpeg2 bitrates
        version = self.version
        if version == kMpeg25:
            version = kMpeg2

        # grab audio layer, layerII and layerIII share bitrates in mpeg2|2.5
        layer = self.layer
        if (version == kMpeg2) and (layer == kLayerIII):
            layer = kLayerII

        # lookup bitrate in table, index off by one because of free setting,
        # table is in kilobits
        return sBitrateLookupTable[version][layer][value - 1] * 1000

    #--------------------------------------------------------------------------

    @property
    def samplingRate(self):
        """The sampling rate specifies how many samples per second are
        recorded, value in hertz. Each MPEG version can handle different
        sampling rates. An exception will be raised if the sampling rate is
        set to reserved.
        """

        # validate value is not set to reserved, the field is only two bits
        # wide so it has its own reserved value
        value = self._getValue(Mp3FrameHeader.kSamplingRateMask)
        if value == Mp3FrameHeader.kSamplingRateReserved:
            raise ValueError("Mpeg sampling rate is set to reserved value.")

        # lookup sampling rate in table
        return sSamplingRateLookupTable[self.version][value]

    #--------------------------------------------------------------------------

    @property
    def padding(self):
        """If it is set, data is padded with one slot.
        """

        return self._bitSet(Mp3FrameHeader.kPaddingMask)

    #--------------------------------------------------------------------------

    @property
    def private(self):
        """Private bit (only informative).
        """

        return self._bitSet(Mp3FrameHeader.kPrivateMask)

    #--------------------------------------------------------------------------

    @property
    def channelMode(self):
        """Channel mode. Dual channel files are made of two independent mono
        channels. Each one uses exactly half the bitrate of the file. Most
        decoders output them as stereo, but it might not always be the case.
        """

        return self._getValue(Mp3FrameHeader.kChannelModeMask)

    #--------------------------------------------------------------------------

    @property
    def modeExtension(self):
        """Mode extension (only used in joint stereo). The raw two bit value
        of the field is returned, it is not interpreted any further.
        """

        return self._getValue(Mp3FrameHeader.kModeExtensionMask)

    #--------------------------------------------------------------------------

    @property
    def copyright(self):
        """Copyright on media (only informative). Returns true if media is
        copyrighted.
        """

        return self._bitSet(Mp3FrameHeader.kCopyrightMask)

    #--------------------------------------------------------------------------

    @property
    def original(self):
        """Original or copy of media (only informative). Returns true if
        original media.
        """

        return self._bitSet(Mp3FrameHeader.kOriginalMask)

    #--------------------------------------------------------------------------

    @property
    def emphasis(self):
        """The emphasis indication is here to tell the decoder that the file
        must be de-emphasized, that means the decoder must 're-equalize' the
        sound after a Dolby-like noise suppression. It is rarely used. An
        exception will be raised if emphasis set to reserved.
        """

        # validate value is not set to reserved
        value = self._getValue(Mp3FrameHeader.kEmphasisMask)
        if value == kEmphasisReserved:
            raise ValueError("Mpeg emphasis is set to reserved value.")

        return value

    #--------------------------------------------------------------------------
    # methods
    #--------------------------------------------------------------------------

    def slotSize(self):
        """Calculate the slot size based on the audio layer, value in bytes.
        """

        return 4 if self.layer == kLayerI else 1

    #--------------------------------------------------------------------------

    def samplesPerFrame(self):
        """Sample count in a frame is dependent on audio version and layer.
        """

        return sSamplesPerFrameLookupTable[self.version][self.layer]

    #--------------------------------------------------------------------------

    def frameSize(self):
        """Calculate the frame size (bytes, header included) based on bit and
        sampling rate. A ValueError is raised for free format frames, their
        size can not be derived from the header alone.
        """

        bitrate = self.bitrate
        if bitrate is None:
            raise ValueError("Frame size is unknown for a free format bitrate.")

        samplingRate = self.samplingRate
        samples      = self.samplesPerFrame()
        slotSize     = self.slotSize()
        padding      = 1 if self.padding else 0

        # calculate the size based on samples per frame in terms of slots,
        # partial slots are dropped; integer math keeps the result exact
        slots = (samples * bitrate) // (slotSize * 8 * samplingRate) + padding

        # convert into bytes
        return slots * slotSize

    #--------------------------------------------------------------------------

    def duration(self):
        """Calculate duration of the frame in milliseconds.
        """

        samples      = self.samplesPerFrame()
        samplingRate = self.samplingRate
        return samples / (samplingRate / 1000.0)

    #--------------------------------------------------------------------------

    def sideInformationSize(self):
        """Size of the side information, general decoding instructions for the
        frame, present in LayerIII frames. Value in bytes.
        """

        # mpeg2 and mpeg2.5 are the same
        version = self.version
        if version == kMpeg25:
            version = kMpeg2

        # only really care if there are single or dual channels
        channelMode = self.channelMode
        if channelMode != kChannelModeMono:
            channelMode = kChannelModeStereo

        return sSideInfoSizeLookupTable[version][channelMode]

    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------

    def _getValue(self, mask):
        """Extract the value from the header selected by the mask. A
        ValueError is raised if the mask does not select any bit.
        """

        # an empty mask would never terminate the shift loop below
        if not mask:
            raise ValueError("Mask must select at least one bit.")

        # grab the masked value from the header
        value = self._header & mask

        # shift the value to correct its power
        while not (mask & 0b1):
            mask  >>= 1
            value >>= 1

        return value

    #--------------------------------------------------------------------------

    def _bitSet(self, mask):
        """Returns true if all bits in mask are set, else false.
        """

        return (self._header & mask) == mask
#------------------------------------------------------------------------------
# ID3v1Tag
#------------------------------------------------------------------------------
class ID3v1Tag(object):
    """The tag is used to describe the MPEG Audio file. It contains information
    about artist, title, album, publishing year and genre. There is some extra
    space for comments. It is exactly 128 bytes long and is located at very end
    of the audio data.

    All text fields are returned as the raw, fixed width slices of the tag
    buffer, no padding is stripped and no decoding is done.
    """

    #--------------------------------------------------------------------------
    # defines
    #--------------------------------------------------------------------------

    # byte literal so the id compares equal to slices of binary buffers on
    # python 2 as well as python 3
    kTagId     = b'TAG'
    kTagLength = 128

    #--------------------------------------------------------------------------
    # statics
    #--------------------------------------------------------------------------

    @classmethod
    def hasTagId(cls, buffer_):
        """Check if the buffer starts with the proper tag identifier.
        """

        return buffer_[0:3] == cls.kTagId

    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------

    def __init__(self, buffer_):
        """Wrap the 128 byte buffer of an id3v1 tag. A ValueError is raised if
        the buffer does not start with the tag id or has the wrong length.
        """

        super(ID3v1Tag, self).__init__()

        # verify the buffer has the correct tag
        if not ID3v1Tag.hasTagId(buffer_):
            raise ValueError("Invalid tag id '%s' in buffer, expecting '%s'." % \
                (buffer_[0:3], ID3v1Tag.kTagId))

        # verify the length is correct
        length = len(buffer_)
        if length != ID3v1Tag.kTagLength:
            raise ValueError("Invalid tag buffer size %d, expecting %d." % \
                (length, ID3v1Tag.kTagLength))

        # looks like valid tag
        self._buffer = buffer_

    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------

    @property
    def identifier(self):
        """Tag identifier, the first 3 bytes of the tag."""
        return self._buffer[0:3]

    #--------------------------------------------------------------------------

    @property
    def title(self):
        """Title field, 30 bytes."""
        return self._buffer[3:33]

    #--------------------------------------------------------------------------

    @property
    def artist(self):
        """Artist field, 30 bytes."""
        return self._buffer[33:63]

    #--------------------------------------------------------------------------

    @property
    def album(self):
        """Album field, 30 bytes."""
        return self._buffer[63:93]

    #--------------------------------------------------------------------------

    @property
    def year(self):
        """Year field, 4 bytes."""
        return self._buffer[93:97]

    #--------------------------------------------------------------------------

    @property
    def comment(self):
        """Comment field, 30 bytes."""
        return self._buffer[97:127]

    #--------------------------------------------------------------------------

    @property
    def genre(self):
        """Genre field, the last byte of the tag as an integer."""

        # slice instead of index, indexing bytes yields an int on python 3
        # which struct can not unpack
        return struct.unpack('>B', self._buffer[127:128])[0]
#------------------------------------------------------------------------------
# ID3v2Tag
#------------------------------------------------------------------------------
class ID3v2Tag(object):
    """The ID3v2 offers a flexible way of storing information about an audio
    file within itself to determine its origin and contents. The information
    may be technical information, such as equalisation curves, as well as
    related meta information, such as title, performer, copyright etc.

    The tag starts with a 10 byte header (id, version, flags, size) which is
    followed by the tag frames and optional zero padding. Extended headers,
    unsynchronisation and the shorter frame headers of ID3v2.2 are not
    interpreted by this class.
    """

    #--------------------------------------------------------------------------
    # defines
    #--------------------------------------------------------------------------

    # byte literal so the id compares equal to slices of binary buffers on
    # python 2 as well as python 3
    kTagId             = b'ID3'
    kTagHeaderLength   = 10
    kFrameHeaderLength = 10

    # options present in flags field, stored in the top three bits
    kFlagUnsynchronisation     = 0x80
    kFlagExtendedHeader        = 0x40
    kFlagExperimentalIndicator = 0x20

    #--------------------------------------------------------------------------
    # statics
    #--------------------------------------------------------------------------

    @staticmethod
    def _synchsafeToInt(buffer_):
        """Decode a four byte synchsafe integer, only the lower seven bits of
        every byte carry a value. A struct.error is raised if the buffer is
        not exactly four bytes long.
        """

        value = 0
        for byte in struct.unpack('>BBBB', buffer_):
            value = (value << 7) | (byte & 0x7f)
        return value

    #--------------------------------------------------------------------------

    @classmethod
    def hasTagId(cls, buffer_):
        """Check if the buffer starts with the proper tag identifier.
        """

        return buffer_[0:3] == cls.kTagId

    #--------------------------------------------------------------------------

    @classmethod
    def parseSize(cls, buffer_):
        """Retrieve the size of the entire tag from the buffer, header
        included. The size field of the header is encoded with four bytes
        where the most significant bit (bit 7) is set to zero in every byte,
        making a total of 28 bits. It holds the size of the complete tag after
        unsynchronisation, including padding, excluding the header but not
        excluding the extended header; the header length is added to it here.
        """

        return cls.kTagHeaderLength + cls._synchsafeToInt(buffer_[6:10])

    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------

    def __init__(self, buffer_):
        """Parse the tag stored in the buffer. A ValueError is raised if the
        buffer does not start with the tag id or if its length does not match
        the size announced in the tag header.
        """

        super(ID3v2Tag, self).__init__()

        # verify the buffer has the correct tag
        if not ID3v2Tag.hasTagId(buffer_):
            raise ValueError("Invalid tag id '%s' in buffer, expecting '%s'." % \
                (buffer_[0:3], ID3v2Tag.kTagId))

        # verify the length is correct
        bufferLength = len(buffer_)
        tagLength    = ID3v2Tag.parseSize(buffer_)
        if bufferLength != tagLength:
            raise ValueError("Invalid tag buffer size %d, expecting %d." % \
                (bufferLength, tagLength))

        # looks like valid tag
        self._buffer = buffer_

        # parse tag frames, the encoding of the frame sizes depends on the
        # major version of the tag
        majorVersion, = struct.unpack('>B', buffer_[3:4])
        self._frames  = self._parseTagFrames(
            buffer_[ID3v2Tag.kTagHeaderLength:], majorVersion)

    #--------------------------------------------------------------------------

    def _parseTagFrames(self, buffer_, majorVersion=3):
        """Parse all of the tag frames from the buffer, which holds everything
        following the tag header. Every frame starts with a 10 byte header
        made up of a 4 byte frame id, a 4 byte size and 2 bytes of flags. The
        size is a plain big endian integer, starting with major version 4 it
        is stored as a synchsafe integer instead. Parsing stops at the first
        empty (all zero) frame id, which marks the padding.
        """

        frames       = []
        headerLength = ID3v2Tag.kFrameHeaderLength

        # quit parsing if there isn't enough buffer to read in a frame header
        # followed by at least one byte of content
        index = 0
        while index < len(buffer_) - headerLength:

            # split the header into its respective segments
            start = index + headerLength
            tag, size, flags = struct.unpack('>4s4sH', buffer_[index:start])

            # bail out if tag is invalid
            if tag == b'\x00' * 4:
                break

            # convert size into integer data
            if majorVersion >= 4:
                size = ID3v2Tag._synchsafeToInt(size)
            else:
                size, = struct.unpack('>I', size)

            # save frame
            end   = start + size
            frame = ID3v2Frame(tag, size, flags, buffer_[start:end])
            frames.append(frame)

            # update index
            index = end

        return frames

    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------

    @property
    def identifier(self):
        """Tag identifier, the first 3 bytes of the tag."""
        return self._buffer[0:3]

    #--------------------------------------------------------------------------

    @property
    def version(self):
        """Version of the tag as an ID3v2Version (major, revision)."""
        return ID3v2Version(*struct.unpack('>BB', self._buffer[3:5]))

    #--------------------------------------------------------------------------

    @property
    def flags(self):
        """Flags byte of the tag header as an integer."""

        # slice instead of index, indexing bytes yields an int on python 3
        # which struct can not unpack
        return struct.unpack('>B', self._buffer[5:6])[0]

    #--------------------------------------------------------------------------

    @property
    def size(self):
        """Size of the entire tag in bytes, header included."""
        return ID3v2Tag.parseSize(self._buffer)

    #--------------------------------------------------------------------------

    @property
    def frames(self):
        """List of the frames parsed from the tag."""
        return self._frames
#------------------------------------------------------------------------------
# XingHeader
#------------------------------------------------------------------------------
class XingHeader(object):
    """This header is often (but unfortunately not always) added to files which
    are encoded with variable bitrate mode.

    The header starts with a 4 byte id and a 4 byte flags field. The flags
    select which of the optional fields follow, the fields which are present
    are stored in the order of their flag values.
    """

    #--------------------------------------------------------------------------
    # defines
    #--------------------------------------------------------------------------

    # byte literals so the ids compare equal to slices of binary buffers on
    # python 2 as well as python 3
    kHeaderIdXing = b'Xing'
    kHeaderIdInfo = b'Info'

    # options present in flags field
    kFlagFrames           = 0x0001
    kFlagBytes            = 0x0002
    kFlagTableOfContents  = 0x0004
    kFlagQualityIndicator = 0x0008

    # size of fields if present
    sFieldSizes = {
        kFlagFrames           : 4,
        kFlagBytes            : 4,
        kFlagTableOfContents  : 100,
        kFlagQualityIndicator : 4
    }

    # size of headers with mandatory fields
    kHeaderMinLength = 8

    #--------------------------------------------------------------------------
    # statics
    #--------------------------------------------------------------------------

    @classmethod
    def hasHeaderId(cls, buffer_):
        """Check if the buffer starts with the proper identifier.
        """

        return buffer_[0:4] in (cls.kHeaderIdXing, cls.kHeaderIdInfo)

    #--------------------------------------------------------------------------

    @classmethod
    def calculateOffset(cls, buffer_, flag=None):
        """Calculate the offset of the field selected by flag by checking
        which of the fields stored in front of it are present in the header.
        If flag is not provided size of header will be returned. The buffer
        needs to hold at least the id and the flags field.
        """

        # set default flag if not provided, larger than any field flag so
        # every present field is counted
        if flag is None:
            flag = 0xffff

        # grab the flags field from the buffer
        flags, = struct.unpack('>I', buffer_[4:8])

        # add up the sizes of the present fields located in front of the
        # requested one
        offset = cls.kHeaderMinLength
        for fieldFlag, fieldSize in cls.sFieldSizes.items():
            if (flags & fieldFlag) != 0 and (fieldFlag < flag):
                offset += fieldSize

        return offset

    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------

    def __init__(self, buffer_):
        """Wrap the buffer of a xing header. A ValueError is raised if the
        buffer does not start with a valid id or if its length does not match
        the size implied by the flags field.
        """

        super(XingHeader, self).__init__()

        # verify the buffer has the correct header id
        if not XingHeader.hasHeaderId(buffer_):
            ids = [XingHeader.kHeaderIdXing, XingHeader.kHeaderIdInfo]
            raise ValueError("Invalid id '%s' in buffer, expecting '%s'." % \
                (buffer_[0:4], b' or '.join(ids)))

        # verify the length is correct
        headerLength = XingHeader.calculateOffset(buffer_)
        bufferLength = len(buffer_)
        if bufferLength != headerLength:
            raise ValueError("Invalid header buffer size %d, expecting %d." % \
                (bufferLength, headerLength))

        # looks like valid header
        self._buffer = buffer_

    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------

    @property
    def identifier(self):
        """Header identifier, the first 4 bytes of the header."""
        return self._buffer[0:4]

    #--------------------------------------------------------------------------

    @property
    def flags(self):
        """Flags field as an integer."""
        return struct.unpack('>I', self._buffer[4:8])[0]

    #--------------------------------------------------------------------------

    @property
    def frames(self):
        """Value of the frames field, None if the field is not present."""
        if self.hasFramesField():
            field = self._getField(XingHeader.kFlagFrames)
            return struct.unpack('>I', field)[0]
        return None

    #--------------------------------------------------------------------------

    @property
    def bytes(self):
        """Value of the bytes field, None if the field is not present."""
        if self.hasBytesField():
            field = self._getField(XingHeader.kFlagBytes)
            return struct.unpack('>I', field)[0]
        return None

    #--------------------------------------------------------------------------

    @property
    def tableOfContents(self):
        """Table of contents as a tuple with one integer per byte of the
        field, None if the field is not present.
        """
        if self.hasTableOfContentsField():
            field = self._getField(XingHeader.kFlagTableOfContents)
            return struct.unpack('>%dB' % len(field), field)
        return None

    #--------------------------------------------------------------------------

    @property
    def qualityIndicator(self):
        """Value of the quality indicator field, None if the field is not
        present.
        """
        if self.hasQualityIndicatorField():
            field = self._getField(XingHeader.kFlagQualityIndicator)
            return struct.unpack('>I', field)[0]
        return None

    #--------------------------------------------------------------------------
    # methods
    #--------------------------------------------------------------------------

    def hasFramesField(self):
        """Returns true if the frames field is present."""
        return self._isFlagSet(XingHeader.kFlagFrames)

    #--------------------------------------------------------------------------

    def hasBytesField(self):
        """Returns true if the bytes field is present."""
        return self._isFlagSet(XingHeader.kFlagBytes)

    #--------------------------------------------------------------------------

    def hasTableOfContentsField(self):
        """Returns true if the table of contents field is present."""
        return self._isFlagSet(XingHeader.kFlagTableOfContents)

    #--------------------------------------------------------------------------

    def hasQualityIndicatorField(self):
        """Returns true if the quality indicator field is present."""
        return self._isFlagSet(XingHeader.kFlagQualityIndicator)

    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------

    def _isFlagSet(self, flag):
        """Returns true if the flag is set in the flags field."""
        return (self.flags & flag) != 0

    #--------------------------------------------------------------------------

    def _getField(self, flag):
        """Slice the raw bytes of the field selected by flag out of the
        header buffer.
        """
        size   = XingHeader.sFieldSizes[flag]
        offset = XingHeader.calculateOffset(self._buffer, flag)
        return self._buffer[offset:offset+size]
#------------------------------------------------------------------------------
# Mp3Frame
#------------------------------------------------------------------------------
class Mp3Frame(object):
    """A lightweight object containing a header, reference to a buffer and a
    start position. The content of the frame is sliced out of the buffer on
    request only.
    """

    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------

    def __init__(self, header, buffer_, start):
        """Create a frame.

        header  -- frame header object describing the frame
        buffer_ -- buffer containing the frame
        start   -- index of the first byte of the frame header in the buffer
        """

        super(Mp3Frame, self).__init__()

        self._header = header
        self._buffer = buffer_
        self._start  = start

        # the xing header is looked up lazily, the flag tells a failed lookup
        # apart from a lookup which did not happen yet
        self._xing        = None
        self._xingChecked = False

    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------

    @property
    def header(self):
        """Header object of the frame."""
        return self._header

    #--------------------------------------------------------------------------

    @property
    def content(self):
        """Bytes of the frame following the frame header, up to the end of the
        frame as calculated from the header.
        """
        start = self._start
        end   = start + self._header.frameSize()
        return self._buffer[start+Mp3FrameHeader.kHeaderLength:end]

    #--------------------------------------------------------------------------
    # methods
    #--------------------------------------------------------------------------

    def getXingHeader(self):
        """Checks if the frame contains a xing header instead of actual audio
        data. The header is returned if found else None. The buffer is only
        examined on the first call, the result is remembered.
        """

        if not self._xingChecked:
            self._xing        = self._findXingHeader()
            self._xingChecked = True

        return self._xing

    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------

    def _findXingHeader(self):
        """Look for a xing header right behind the side information of the
        frame. Returns the header object or None if there is none.
        """

        # the xing header follows the frame header and the side information
        start  = self._start + Mp3FrameHeader.kHeaderLength
        start += self._header.sideInformationSize()
        end    = start + XingHeader.kHeaderMinLength
        header = self._buffer[start:end]

        # the id and the flags are required to size the header, a buffer
        # which ends before that can not hold a usable xing header
        if len(header) < XingHeader.kHeaderMinLength:
            return None

        # check for the xing header identifier
        if not XingHeader.hasHeaderId(header):
            return None

        # calculate the xing header size and extract from the buffer
        size = XingHeader.calculateOffset(header)
        return XingHeader(self._buffer[start:start + size])
#------------------------------------------------------------------------------
# Mp3Buffer
#------------------------------------------------------------------------------
class Mp3Buffer(object):
"""Abstracts parsing data out of a buffer. Methods available to examine
the buffer for mp3 specific properties such as sycn words, tags, and
headers.
"""
#--------------------------------------------------------------------------
# object
#--------------------------------------------------------------------------
def __init__(self, buffer_):
super(Mp3Buffer, self).__init__()
# set fields to default values
self._buffer = buffer_
self._start = 0
self._end = len(self._buffer)
self._index = 0
#--------------------------------------------------------------------------
# methods
#--------------------------------------------------------------------------
def isEOF(self):
"""Returns true if the current index is past the end of the buffer.
"""
return self._index >= self._end
#--------------------------------------------------------------------------
def reset(self):
"""Reset the index to the start of the buffer.
"""
self._index = self._start
#--------------------------------------------------------------------------
def availableBytes(self):
"""Returns the number of bytes from the current index to the end of the
buffer.
"""
return 0 if self.isEOF() else (self._end - self._index)
#--------------------------------------------------------------------------
def hasID3v1Tag(self):
"""Checks the end of the buffer for the presence of a ID3v1 tag.
"""
# calculate the start index of the id3v1 tag
start = self._end - ID3v1Tag.kTagLength
# check for a valid tag id
return ID3v1Tag.hasTagId(self._buffer[start:])
#--------------------------------------------------------------------------
def hasID3v2Tag(self):
"""Checks the start of the buffer for the presence of a ID3v2 tag.
"""
# check for a valid tag id
return ID3v2Tag.hasTagId(self._buffer)
#--------------------------------------------------------------------------
def popID3v1Tag(self):
"""Slices the ID3v1 tag out of the buffer and returns the tag object,
the size of the buffer will be updated.
"""
# calculate the start index of the id3v1 tag
start = self._end - ID3v1Tag.kTagLength
# create the tag object from the buffer
id3v1 = ID3v1Tag(self._buffer[start:])
# update the buffer to reflect the splicing
self._end = start
return id3v1
#--------------------------------------------------------------------------
def popID3v2Tag(self):
"""Slices the ID3v2 tag out of the buffer and returns the tag object,
the size of the buffer will be updated, index will be repositioned to
the start of the buffer if invalidated.
"""
# calculate the size of the id3v2 tag buffer
size = ID3v2Tag.parseSize(self._buffer)
# create the tag object from the buffer
id3v2 = ID3v2Tag(self._buffer[:size])
# update the buffer/index to reflect the splicing
self._start = size
self._index = max(self._index, self._start)
return id3v2
#--------------------------------------------------------------------------
def syncToMp3Frame(self):
"""Walk the index forward until a frame sync word marker is found. If
the index lies on a sync marker, index will not be updated. True will
will be returned if a sync word is encountered, otherwise false will
be returned when the available buffer space runs to low.
"""
# make sure there enough room for the frame header
while self.availableBytes() >= Mp3FrameHeader.kHeaderLength:
# check the current index for the sycn word
start = self._index
end = start + Mp3FrameHeader.kHeaderLength
if Mp3FrameHeader.hasSyncWord(self._buffer[start:end]):
return True
# step to the next byte
print "-> Skipping index %d: %s" % (start, self._buffer[start])
self._index += 1
# ran out of buffer space to accomodate a frame header
return False
#--------------------------------------------------------------------------
def extractMp3Frame(self):
    """Builds an mp3 frame object from the header bytes found at the current
    index, then moves the index forward by the frame size reported by that
    header.  The frame object is returned.
    """
    frameStart = self._index

    # parse the header out of the first kHeaderLength bytes of the frame
    headerBytes = self._buffer[frameStart:frameStart + Mp3FrameHeader.kHeaderLength]
    header = Mp3FrameHeader(headerBytes)

    # the frame is handed the whole buffer plus the offset it begins at
    mp3Frame = Mp3Frame(header, self._buffer, self._index)

    # step over the frame just extracted
    self._index = frameStart + header.frameSize()

    return mp3Frame
#--------------------------------------------------------------------------
def getLeftoverBuffer(self):
    """Returns everything in the underlying buffer from the current index
    onward, or None when isEOF() reports that nothing is left to read.
    """
    # NOTE(review): the slice is open ended, so it is not bounded by
    # self._end -- confirm that is the intended behaviour
    return None if self.isEOF() else self._buffer[self._index:]
#------------------------------------------------------------------------------
# Mp3File
#------------------------------------------------------------------------------
class Mp3File(object):
    """MPEG-1 / MPEG-2 Audio Layer III (.mp3) file parser.  On construction
    the file is read into memory and split into its optional ID3v1 / ID3v2
    tags, an optional Xing 'header' frame and the list of remaining frames.
    """

    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------

    def __init__(self, mp3Path):
        """Attempt to parse the frames and headers/tags found in the mp3 file.

        mp3Path -- path of the .mp3 file to load.

        Raises ValueError if nothing exists at mp3Path or if no frame sync
        word could be located in the file.
        """
        super(Mp3File, self).__init__()

        # make sure the path is valid
        if not os.path.exists(mp3Path):
            raise ValueError("MPEG .mp3 file not found at '%s'." % mp3Path)

        # open up the file and load the entire file into memory, optimize to
        # support streaming once functionality is added...
        with open(mp3Path, 'rb') as mp3File:
            self._buffer = Mp3Buffer(mp3File.read())

        # parse out the id3v1 tag if provided
        self._id3v1tag = None
        if self._buffer.hasID3v1Tag():
            self._id3v1tag = self._buffer.popID3v1Tag()

        # parse out the id3v2 tag if provided
        self._id3v2tag = None
        if self._buffer.hasID3v2Tag():
            self._id3v2tag = self._buffer.popID3v2Tag()

        # forward to the first frame
        if not self._buffer.syncToMp3Frame():
            raise ValueError("First mp3 frame could not be located.")

        # fields to track mp3 frames encountered
        self._headerFrame = None
        self._frames = []

        # first frame could be special 'header' frame, it is kept apart from
        # the regular frames when it carries a xing header
        firstFrame = self._buffer.extractMp3Frame()
        xingHeader = firstFrame.getXingHeader()
        if xingHeader is not None:
            self._headerFrame = firstFrame
        else:
            self._frames.append(firstFrame)

        # take advantage of the xing frame count if possible, without a xing
        # header parsing only stops when the buffer is exhausted
        def framesParsed():
            return (xingHeader is not None and
                    len(self._frames) == xingHeader.frames)

        # parse the rest of the frames from the buffer
        while not self._buffer.isEOF() and not framesParsed():

            # bail if another sync marker not found
            if not self._buffer.syncToMp3Frame():
                break

            # extract the frame
            self._frames.append(self._buffer.extractMp3Frame())

    #--------------------------------------------------------------------------
    # methods
    #--------------------------------------------------------------------------

    def totalDuration(self):
        """Returns the sum of header.duration() over all parsed frames, the
        xing 'header' frame is not part of that list.
        """
        return sum(frame.header.duration() for frame in self._frames)

    #--------------------------------------------------------------------------

    def printMp3Info(self):
        """Print out all available info found in tags and headers.  The frame
        header section describes the xing 'header' frame when one was found,
        otherwise the first parsed frame.
        """
        # frame header
        frame = self._headerFrame
        if frame is None:
            frame = self._frames[0]
        print('Mp3FrameHeader:')
        print(' - version : %s' % _versionToStr(frame.header.version))
        print(' - layer : %s' % _layerToStr(frame.header.layer))
        print(' - bitrate : %s' % frame.header.bitrate)
        print(' - sampling rate : %s' % frame.header.samplingRate)
        print(' - frame size : %s' % frame.header.frameSize())
        print(' - duration : %s' % frame.header.duration())
        print('')

        # id3v1 tag
        if self._id3v1tag is not None:
            print('ID3v1 Tag:')
            print(' - identifier : %s' % self._id3v1tag.identifier)
            print(' - title : %s' % self._id3v1tag.title)
            print(' - artist : %s' % self._id3v1tag.artist)
            print(' - album : %s' % self._id3v1tag.album)
            print(' - year : %s' % self._id3v1tag.year)
            print(' - comment : %s' % self._id3v1tag.comment)
            print(' - genre : %s' % self._id3v1tag.genre)
            print('')

        # id3v2 tag
        if self._id3v2tag is not None:
            print('ID3v2 Tag:')
            print(' - identifier : %s' % self._id3v2tag.identifier)
            # version is applied directly to a two slot format string
            print(' - version : %s.%s' % self._id3v2tag.version)
            print(' - flags : %s' % self._id3v2tag.flags)
            print(' - size : %s' % self._id3v2tag.size)
            print(' - frames :')
            for tagFrame in self._id3v2tag.frames:
                print(' - %s' % tagFrame.tag)
            print('')

        # xing header
        xingHeader = frame.getXingHeader()
        if xingHeader is not None:
            print('Xing Header:')
            print(' - identifier : %s' % xingHeader.identifier)
            print(' - frames : %s' % xingHeader.frames)
            print(' - bytes : %s' % xingHeader.bytes)
            print(' - toc : %s' % str(xingHeader.tableOfContents))
            print(' - quality : %s' % xingHeader.qualityIndicator)
            print('')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment