Last active
August 29, 2015 14:07
-
-
Save saranya-vatti/6f000981ab422796cd03 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
##get a way to flag wrong lyrics and search for ?? | |
##-the next search result next time the program is run | |
##-song name+artist | |
##-song name+album | |
##write an android app to insert link into the music player: | |
##-link to lyrics if present in local; else | |
##-link to google to search if net is connected; else | |
##-disabled link | |
try: | |
import urllib.request as urllib2 | |
except: | |
import urllib2 | |
import bs4 | |
from bs4 import BeautifulSoup | |
import re | |
import json | |
import os | |
import time | |
import urllib | |
import shutil | |
from tinytag import TinyTag | |
# --- configuration (Windows paths) ---
#sourceDir = 'E:\\Music\\English'
sourceDir = 'I:\\Music\\'        # folder scanned for .mp3 files
#destDir = 'E:\\Music\\Lyrics\\'
destDir = 'I:\\Lyrics\\'         # where per-song lyric .txt files are written
lyricRepo = 'E:\\Music\\Lyrics\\'  # secondary archive, checked before re-downloading
# Numeric log levels: a message is printed when LOGLVL <= its level.
LOG_LEVELS = {
    "DEBUG" : 100,
    "INFO" : 200,
    "ERROR" : 300,
    "NONE" : 400
}
LOGLVL=LOG_LEVELS["NONE"]  # NONE silences all logging output
unabletoparse=[]  # songs/URLs for which no lyrics could be fetched
def log_error(exception, location):
    """Report an exception and where it happened, if ERROR logging is on."""
    if LOGLVL > LOG_LEVELS["ERROR"]:
        return
    print("Exception encountered : " + str(type(exception)) + " in " + location)
    print("Exception : " + str(exception))
def log_info(string):
    """Print *string* if INFO logging is enabled."""
    if LOGLVL > LOG_LEVELS["INFO"]:
        return
    print(string)
def log_debug(string):
    """Print *string* if DEBUG logging is enabled."""
    if LOGLVL > LOG_LEVELS["DEBUG"]:
        return
    print(string)
def parseFilename(filename):
    """Derive a clean song title from an mp3 filename.

    Prefers the ID3 title tag when the file has one; otherwise cleans the
    filename itself: strips leading non-letters, (...) and [...] groups,
    website watermarks, and separator noise.  Returns the cleaned title.
    """
    # Keep newfilename defined so the second error handler below cannot
    # raise NameError when TinyTag.get() itself fails (previous bug).
    newfilename = None
    # check out id3 for recently downloaded files
    try:
        newfilename = TinyTag.get(sourceDir + filename).title
        if newfilename:
            log_info("Filename " + filename + " parsed to ")
            filename = newfilename
    except Exception as e:
        log_error(e, "parseFileName while parsing " + filename)
    try:
        # drop any leading track number / punctuation before the first letter
        filename = filename[re.search("[a-zA-Z]", filename).start():]
    except Exception as e:
        log_error(e, "parseFileName after parsing " + filename + " to " + str(newfilename))
    filename = re.sub(r'\([^)]*\)', '', filename)   # remove (...) groups
    filename = re.sub(r'\[[^\]]*\]', '', filename)  # remove [...] groups
    # Remove website watermarks.  Dots are escaped so only real domain
    # patterns match (the previous unescaped '.' matched any character).
    filename = re.sub(r'(?i)www\..*\.com', '', filename)
    filename = re.sub(r'(?i)www\..*\.eu', '', filename)
    filename = re.sub(r'(?i)www\..*\.pk', '', filename)
    filename = filename.split(',')[0]
    filename = filename.split('.mp3')[0]
    filename = re.sub(r'[-+_.]', ' ', filename)     # unify separators to spaces
    filename = re.sub(r' +', ' ', filename).strip()
    log_info(filename)
    return filename
def getQuery(filename):
    """Turn the parsed song title into a '+'-separated search query."""
    return parseFilename(filename).replace(' ', '+')
def createFile(newfilename, content):
    """Write the formatted lyrics to both the destination and repo folders.

    Failures are logged rather than raised, matching the best-effort
    behaviour of the rest of the script.
    """
    try:
        # format once instead of once per destination
        data = parseTextToFileFormat(content)
        for directory in (destDir, lyricRepo):
            path = directory + newfilename + '.txt'
            # 'with' guarantees the handle is closed even if write() raises
            with open(path, 'wb') as file:
                file.write(data)
            log_info("File created : " + path)
    except Exception as e:
        log_error(e, "createFile")
def getURLFromGoogleSearchAPI(filename):
    """Return the first result URL from the (legacy) Google AJAX search API.

    Returns False on any network/JSON failure so callers can fall back to
    scraping the regular search results page.
    """
    query = getQuery(filename) + "+lyrics"
    try:
        requestString = 'http://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=' + query
        req = urllib2.Request(requestString, headers={"User-Agent" : "Magic Browser"})
        response = urllib2.urlopen(req)
        # .read() works on both urllib2 (py2) and urllib.request (py3)
        # responses; .readall() exists on neither and always raised here.
        str_response = response.read().decode('utf-8')
        testjson = json.loads(str_response)
        return testjson["responseData"]["results"][0]["unescapedUrl"]
    except Exception as e:
        return False
def getURLFromGoogleSearch(filename):
    """Scrape the first organic result URL from a Google results page.

    Returns False on any network or parsing failure.
    """
    query = getQuery(filename) + "+lyrics"
    try:
        requestString = 'http://www.google.com/search?client=aff-maxthon-maxthon4&channel=t26&q=' + query
        req = urllib2.Request(requestString, headers={"User-Agent" : "Magic Browser"})
        response = urllib2.urlopen(req)
        try:
            # explicit parser keeps results stable across bs4 installs
            soup = BeautifulSoup(response.read(), "html.parser")
        finally:
            response.close()  # close even when parsing raises
        # result links look like /url?q=<target>&sa=...; extract <target>
        return soup.find(id="ires").find_all('a')[0].get('href').split("/url?q=")[1].split("&")[0]
    except Exception as e:
        return False
def parseLyricsMania(response):
    # NOTE(review): this parser is unfinished -- it only pretty-prints the
    # page and implicitly returns None, so the lyricsmania branch of
    # parseLyrics() ends up writing no usable lyrics.
    # NOTE(review): the caller has already read and closed this response
    # before calling us -- verify before relying on this function.
    soup = BeautifulSoup(response.read())
    response.close()
    print(soup.prettify())
def parseTextToFileFormat(text):
    """Normalise scraped lyric text into Windows-style plain-text bytes.

    Collapses literal escape sequences (backslash-r-backslash-n,
    backslash-n, escaped quotes) left over from scraped/JSON text,
    squeezes runs of blank lines, and returns the result UTF-8 encoded
    with CRLF line endings.  Returns None (after logging) on failure.

    The parameter was previously named ``str``, shadowing the builtin.
    """
    try:
        output = '\n'.join(text.split('\\r\\n'))
        output = re.sub(r'(\s\s)', '~', output)   # mark double whitespace
        output = '\n'.join(output.split('\\n\\n'))
        output = re.sub(r'(~)+\n', '\n', output)  # drop markers before newlines
        output = '\n'.join(output.split('~'))
        output = '\n'.join(output.split('\\n'))
        output = '\''.join(output.split('\\\''))  # un-escape quotes
        output = re.sub(r'\n\n*', '\n', output)   # squeeze blank lines
        output = output.strip()
        return output.encode('utf8').replace(b'\n', b'\r\n')
    except Exception as e:
        log_error(e, "parseTextToFileFormat")
def parseLyrics(filename):
    """Find the lyrics page for *filename* via Google and save the lyrics.

    Dispatches on the result URL's domain to a site-specific extraction;
    unknown domains are logged and recorded in ``unabletoparse``.
    """
    # NOTE(review): getURLFromGoogleSearch returns False on failure, which
    # makes urllib2.Request(url) raise below; the caller catches that.
    url=getURLFromGoogleSearch(filename)
    req = urllib2.Request(url, headers={"User-Agent" : "Magic Browser"})
    response = urllib2.urlopen(req)
    domain=url.split("http://")[1].split("/")[0]
    newfilename=parseFilename(filename)
    soup = BeautifulSoup(response.read())
    response.close()
    if "www.azlyrics.com" in url:
        # azlyrics wraps the lyrics in HTML comment markers
        createFile(newfilename,str(soup).split("<!-- start of lyrics -->")[1].split("<!-- end of lyrics -->")[0].replace("<br/>","\n"))
    elif "www.lyricsmasti.com" in url:
        createFile(newfilename,BeautifulSoup(str(soup.find(id="lcontent1"))).get_text("\n"))
    elif "www.lyricsintelugu.com" in url or "www.lyriclahari.com" in url or ".blogspot." in url:
        createFile(newfilename,BeautifulSoup(str(soup.find_all(class_="post-body entry-content")[0])).get_text("\n"))
    elif "www.lyricsmania.com" in url:
        # NOTE(review): the response was already read and closed above, and
        # parseLyricsMania() returns None -- this branch writes no lyrics.
        createFile(newfilename,parseLyricsMania(response))
    elif "www.lyricsmint.com" in url:
        createFile(newfilename,soup.find_all(class_="post-entry")[0].get_text("\n"))
    elif "www.glamsham.com" in url:
        createFile(newfilename,soup.find_all(class_="general")[6].get_text("\n"))
    elif "www.metrolyrics.com" in url:
        createFile(newfilename,soup.find(id="lyrics-body-text").get_text("\n"))
    elif "annamacharya-lyrics.blogspot" in url:
        # NOTE(review): unreachable -- the ".blogspot." branch above already
        # matches these URLs.
        createFile(newfilename,soup.find_all(class_="post-body")[0].get_text("\n"))
    elif "www.justsomelyrics.com" in url:
        createFile(newfilename,soup.find_all(class_="core-left")[0].get_text("\n"))
    elif "www.lyricsmode.com" in url:
        createFile(newfilename,soup.find(id="lyrics_text").get_text("\n"))
    elif "www.lyricsfreak.com" in url:
        createFile(newfilename,soup.find(id="content_h").get_text("\n"))
    elif "songlyrics.blogsplug.in" in url:
        createFile(newfilename,soup.find_all(class_="entry")[0].get_text("\n"))
    elif "www.thelyricarchive.com" in url:
        createFile(newfilename,soup.find_all("td")[9].get_text("\n"))
    elif "www.stlyrics.com" in url:
        createFile(newfilename,soup.find(id="page").get_text("\n"))
    elif "www.releaselyrics.com" in url:
        createFile(newfilename,soup.find(id="id-content").get_text("\n"))
    elif "www.songlyrics.com" in url:
        createFile(newfilename,soup.find(id="songLyricsDiv").get_text("\n"))
    elif "songmeanings.com" in url:
        createFile(newfilename,soup.find_all(class_="holder lyric-box")[0].get_text("\n"))
    elif "www.lyrster.com" in url:
        createFile(newfilename,soup.find(id="lyrics").get_text("\n"))
    elif "www.animelyrics.com" in url:
        createFile(newfilename,soup.find_all(class_="lyrics")[0].get_text("\n"))
    elif "www.bobdylan.com" in url:
        createFile(newfilename,soup.find_all(class_="field-items")[0].get_text("\n"))
    else:
        log_info("Domain is : " + domain + " . URL is : " + url + " .Unable to parse. Skipping...")
        unabletoparse.append("Domain : " + domain + ". URL :" + url)
# ---- main driver: fetch lyrics for every mp3 under sourceDir ----
songslist = [name for name in os.listdir(sourceDir) if name.endswith('.mp3')]
print("Extracting lyrics", end="")
for filename in songslist:
    print(".", end="")  # one progress dot per song
    newfilename = parseFilename(filename)
    if os.path.isfile(destDir + newfilename + '.txt'):
        log_info('Lyrics ' + destDir + newfilename + '.txt' + ' already present. Skipping..')
    elif os.path.isfile(lyricRepo + newfilename + '.txt'):
        # already downloaded previously -- reuse the archived copy
        log_info('Copying lyrics from repo for : ' + newfilename + '...')
        shutil.copyfile(lyricRepo + newfilename + '.txt', destDir + newfilename + '.txt')
    else:
        time.sleep(5)  # throttle requests so the search engine doesn't block us
        try:
            parseLyrics(filename)
        except Exception as e:
            log_error(e, "searching lyrics via google")
            unabletoparse.append(filename)
print("Lyrics not found for : ")
print(unabletoparse)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
# | |
# tinytag - an audio meta info reader | |
# Copyright (c) 2014 Tom Wallroth | |
# | |
# Sources on github: | |
# http://github.com/devsnd/tinytag/ | |
# | |
# licensed under GNU GPL version 3 (or later) | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/> | |
# | |
import codecs | |
import struct | |
import os | |
class TinyTag(object):
    """Base class for all tag types"""
    def __init__(self, filehandler, filesize):
        self._filehandler = filehandler
        self.filesize = filesize
        self.track = None        # track number (string)
        self.track_total = None  # total number of tracks (string)
        self.title = None
        self.artist = None
        self.album = None
        self.year = None
        self.duration = 0        # length in seconds
        self.audio_offset = 0    # byte position where audio data starts
        self.bitrate = 0.0  # must be float for later VBR calculations
        self.samplerate = 0

    def has_all_tags(self):
        """check if all tags are already defined. Useful for ID3 tags
        since multiple kinds of tags can be in one audio file
        """
        return all((self.track, self.track_total, self.title,
                    self.artist, self.album, self.year))

    @classmethod
    def get(cls, filename, tags=True, duration=True):
        """Open *filename* and return a parsed tag object.

        When called on TinyTag itself, the parser is chosen by file
        extension; when called on a subclass, that subclass parses
        directly.  Raises LookupError for unsupported extensions.
        """
        parser_class = None
        size = os.path.getsize(filename)
        if not size > 0:
            return TinyTag(None, 0)
        if cls == TinyTag:
            # choose which tag reader should be used by file extension;
            # single extensions are 1-tuples for consistency (previously
            # '.wav'/'.flac' were bare strings, which str.endswith also
            # accepts but reads like a mistake)
            mapping = {
                ('.mp3',): ID3,
                ('.oga', '.ogg'): Ogg,
                ('.wav',): Wave,
                ('.flac',): Flac,
            }
            for fileextension, tagclass in mapping.items():
                if filename.lower().endswith(fileextension):
                    parser_class = tagclass
        else:
            # use class on which the method was invoked as parser
            parser_class = cls
        if parser_class is None:
            raise LookupError('No tag reader found to support filetype! ')
        with open(filename, 'rb') as af:
            tag = parser_class(af, size)
            tag.load(tags=tags, duration=duration)
            return tag

    def __str__(self):
        public_attrs = ((k, v) for k, v in self.__dict__.items() if not k.startswith('_'))
        return str(dict(public_attrs))

    def __repr__(self):
        return str(self)

    def load(self, tags, duration):
        """default behavior of all tags. This method is called in the
        constructors of all tag readers
        """
        if tags:
            self._parse_tag(self._filehandler)
            self._filehandler.seek(0)
        if duration:
            self._determine_duration(self._filehandler)

    def _set_field(self, fieldname, bytestring, transfunc=None):
        """convienience function to set fields of the tinytag by name.
        the payload (bytestring) can be changed using the transfunc.
        An already-set (truthy) field is never overwritten."""
        if getattr(self, fieldname):
            return
        if transfunc:
            setattr(self, fieldname, transfunc(bytestring))
        else:
            setattr(self, fieldname, bytestring)

    def _determine_duration(self, fh):
        # subclasses must implement duration detection
        raise NotImplementedError()

    def _parse_tag(self, fh):
        # subclasses must implement tag parsing
        raise NotImplementedError()

    def update(self, other):
        """update the values of this tag with the values from another tag"""
        for key in ['track', 'track_total', 'title', 'artist',
                    'album', 'year', 'duration']:
            if not getattr(self, key) and getattr(other, key):
                setattr(self, key, getattr(other, key))
class ID3(TinyTag):
    """MP3 reader: ID3v1/v2 tags plus frame-walking duration estimation."""
    FID_TO_FIELD = {  # Mapping from Frame ID to a field of the TinyTag
        'TRCK': 'track', 'TRK': 'track',
        'TYER': 'year', 'TYE': 'year',
        'TALB': 'album', 'TAL': 'album',
        'TPE1': 'artist', 'TP1': 'artist',
        'TIT2': 'title', 'TT2': 'title',
    }
    # seconds' worth of frames to sample before estimating VBR duration
    _MAX_ESTIMATION_SEC = 30

    def __init__(self, filehandler, filesize):
        TinyTag.__init__(self, filehandler, filesize)
        # save position after the ID3 tag for duration mesurement speedup
        self._bytepos_after_id3v2 = 0

    @classmethod
    def set_estimation_precision(cls, estimation_in_seconds):
        """Tune how much audio is scanned before estimating the duration."""
        cls._MAX_ESTIMATION_SEC = estimation_in_seconds

    def _determine_duration(self, fh):
        """Walk MP3 frame headers, accumulating bitrate/samplerate/duration."""
        max_estimation_frames = (ID3._MAX_ESTIMATION_SEC*44100) // 1152
        frame_size_mean = 0
        # set sample rate from first found frame later, default to 44khz
        file_sample_rate = 44100
        # see this page for the magic values used in mp3:
        # http://www.mpgedit.org/mpgedit/mpeg_format/mpeghdr.htm
        bitrates = [0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192,
                    224, 256, 320]
        samplerates = [44100, 48000, 32000]
        header_bytes = 4
        frames = 0  # count frames for determining mp3 duration
        # seek to first position after id3 tag (speedup for large header)
        fh.seek(self._bytepos_after_id3v2)
        while True:
            # reading through garbage until 12 '1' bits are found
            b = fh.read(1)
            if len(b) == 0:
                break
            if b == b'\xff':
                b = fh.read(1)
                if b > b'\xf0':
                    bitrate_freq, rest = struct.unpack('BB', fh.read(2))
                    br_id = (bitrate_freq & 0xf0) >> 4  # biterate id
                    # NOTE(review): (x & 0x03) >> 2 always evaluates to 0;
                    # the sample-rate bits are probably meant to be
                    # (bitrate_freq & 0x0c) >> 2 -- confirm against the
                    # mpeghdr layout linked above before changing.
                    sr_id = (bitrate_freq & 0x03) >> 2  # sample rate id
                    # check if the values aren't just random
                    if br_id == 15 or br_id == 0 or sr_id == 3:
                        # invalid frame! roll back to last position
                        fh.seek(-2, os.SEEK_CUR)
                        continue
                    frames += 1  # it's most probably an mp3 frame
                    bitrate = bitrates[br_id]
                    samplerate = samplerates[sr_id]
                    # running average of bitrate
                    self.bitrate = (self.bitrate*(frames-1) + bitrate)/frames
                    if frames == 1:
                        # we already read the 4 bytes frame header
                        self.audio_offset = fh.tell() - 4
                        self.samplerate = samplerate
                    padding = 1 if bitrate_freq & 0x02 > 0 else 0
                    frame_length = (144000 * bitrate) // samplerate + padding
                    frame_size_mean += frame_length
                    if frames == max_estimation_frames:
                        # try to estimate duration from the mean frame size
                        fh.seek(-1, 2)  # jump to last byte
                        estimated_frame_count = fh.tell() / (frame_size_mean / frames)
                        samples = estimated_frame_count * 1152
                        self.duration = samples/float(self.samplerate)
                        return
                    if frame_length > 1:
                        # jump over current frame body
                        fh.seek(frame_length - header_bytes, os.SEEK_CUR)
        samples = frames * 1152  # 1152 is the default frame size for mp3
        if self.samplerate:
            self.duration = samples/float(self.samplerate)

    def _parse_tag(self, fh):
        """Read ID3v2 (file start), then ID3v1 (file end) for missing fields."""
        self._parse_id3v2(fh)
        if not self.has_all_tags():  # try to get more info using id3v1
            fh.seek(-128, 2)  # id3v1 occuppies the last 128 bytes
            self._parse_id3v1(fh)

    def _parse_id3v2(self, fh):
        # for info on the specs, see: http://id3.org/Developer%20Information
        header = struct.unpack('3sBBB4B', fh.read(10))
        tag = codecs.decode(header[0], 'ISO-8859-1')
        # check if there is an ID3v2 tag at the beginning of the file
        if tag == 'ID3':
            major, rev = header[1:3]
            # flag bits of the ID3v2 header byte
            unsync = (header[3] & 0x80) > 0
            extended = (header[3] & 0x40) > 0
            experimental = (header[3] & 0x20) > 0
            footer = (header[3] & 0x10) > 0
            size = self._calc_size_7bit_bytes(header[4:9])
            self._bytepos_after_id3v2 = size
            parsed_size = 0
            if extended:  # just read over the extended header.
                size_bytes = struct.unpack('4B', fh.read(6)[0:4])
                extd_size = self._calc_size_7bit_bytes(size_bytes)
                fh.read(extd_size - 6)
            while parsed_size < size:
                is_id3_v22 = major == 2
                frame_size = self._parse_frame(fh, is_v22=is_id3_v22)
                if frame_size == 0:
                    break
                parsed_size += frame_size

    def _parse_id3v1(self, fh):
        if fh.read(3) == b'TAG':  # check if this is an ID3 v1 tag
            asciidecode = lambda x: self._unpad(codecs.decode(x, 'ASCII'))
            self._set_field('title', fh.read(30), transfunc=asciidecode)
            self._set_field('artist', fh.read(30), transfunc=asciidecode)
            self._set_field('album', fh.read(30), transfunc=asciidecode)
            self._set_field('year', fh.read(4), transfunc=asciidecode)
            comment = fh.read(30)
            # ID3v1.1 stores the track number in the last comment byte,
            # marked by a zero byte just before it
            if b'\x00\x00' < comment[-2:] < b'\x01\x00':
                self._set_field('track', str(ord(comment[-1:])))

    def _parse_frame(self, fh, is_v22=False):
        """Read one ID3v2 frame; returns its size, or 0 at end of tag."""
        encoding = 'ISO-8859-1'  # default encoding used in most mp3 tags
        # ID3v2.2 especially ugly. see: http://id3.org/id3v2-00
        frame_header_size = 6 if is_v22 else 10
        frame_size_bytes = 3 if is_v22 else 4
        binformat = '3s3B' if is_v22 else '4s4B2B'
        frame_header_data = fh.read(frame_header_size)
        if len(frame_header_data) == 0:
            return 0
        frame = struct.unpack(binformat, frame_header_data)
        frame_id = self._decode_string(frame[0])
        frame_size = self._calc_size_7bit_bytes(frame[1:1+frame_size_bytes])
        if frame_size > 0:
            # flags = frame[1+frame_size_bytes:] # dont care about flags.
            content = fh.read(frame_size)
            fieldname = ID3.FID_TO_FIELD.get(frame_id)
            if fieldname:
                if fieldname == 'track':
                    self._parse_track(content)
                else:
                    self._set_field(fieldname, content, self._decode_string)
            return frame_size
        return 0

    def _decode_string(self, b):
        """Decode an ID3 text payload according to its encoding marker byte."""
        # it's not my fault, this is the spec.
        if b[:1] == b'\x00':  # marker 0x00: ISO-8859-1
            return self._unpad(codecs.decode(b[1:], 'ISO-8859-1'))
        if b[0:3] == b'\x01\xff\xfe':  # marker 0x01 + UTF-16 LE BOM
            bytestr = b[3:-1] if len(b) % 2 == 0 else b[3:]
            return codecs.decode(bytestr, 'UTF-16')
        return self._unpad(codecs.decode(b, 'ISO-8859-1'))

    def _unpad(self, s):
        # strings in mp3 _can_ be terminated with a zero byte at the end
        return s[:s.index('\x00')] if '\x00' in s else s

    def _parse_track(self, b):
        """Split a 'track/total' payload into the two tag fields."""
        track = self._decode_string(b)
        track_total = None
        if '/' in track:
            track, track_total = track.split('/')
        self._set_field('track', track)
        self._set_field('track_total', track_total)

    def _calc_size_7bit_bytes(self, bytestr):
        ret = 0                 # length of mp3 header fields is described
        for b in bytestr:       # by some "7-bit-bytes". The most significant
            ret <<= 7           # bit is always set to zero, so it has to be
            ret += b & 127      # removed.
        return ret              #
class StringWalker(object):
    """File-like wrapper over an in-memory string/bytes buffer.

    Each read() consumes from the front of the buffer, mimicking a
    sequential file handle.
    """
    def __init__(self, string):
        self.string = string

    def read(self, nbytes):
        """Return up to *nbytes* items and drop them from the buffer."""
        head = self.string[:nbytes]
        self.string = self.string[nbytes:]
        return head
class Ogg(TinyTag):
    """Ogg/Vorbis reader: tags from the vorbis comment packet, duration
    from the maximum page granule position divided by the sample rate."""
    def __init__(self, filehandler, filesize):
        TinyTag.__init__(self, filehandler, filesize)
        self._tags_parsed = False
        self._max_samplenum = 0  # maximum sample position ever read

    def _determine_duration(self, fh):
        MAX_PAGE_SIZE = 65536  # https://xiph.org/ogg/doc/libogg/ogg_page.html
        if not self._tags_parsed:
            self._parse_tag(fh)  # determine sample rate
            fh.seek(0)           # and rewind to start
        if self.filesize > MAX_PAGE_SIZE:
            fh.seek(-MAX_PAGE_SIZE, 2)  # go to last possible page position
        while True:
            b = fh.read(1)
            if len(b) == 0:
                return  # EOF
            if b == b'O':  # look for an ogg header
                if fh.read(3) == b'ggS':
                    fh.seek(-4, 1)  # parse the page header from start
                    for packet in self._parse_pages(fh):
                        pass  # parse all remaining pages
                    self.duration = self._max_samplenum / float(self.samplerate)
                else:
                    fh.seek(-3, 1)  # oops, no header, rewind selectah!

    def _parse_tag(self, fh):
        page_start_pos = fh.tell()  # set audio_offest later if its audio data
        for packet in self._parse_pages(fh):
            walker = StringWalker(packet)
            header = walker.read(7)
            if header == b"\x01vorbis":
                # identification header: channels, sample rate, bitrates
                (channels, self.samplerate, max_bitrate, bitrate,
                 min_bitrate) = struct.unpack("<B4i", packet[11:28])
                if not self.audio_offset:
                    self.bitrate = bitrate / 1024
                    self.audio_offset = page_start_pos
            elif header == b"\x03vorbis":
                # comment header carries the user-visible tags
                self._parse_vorbis_comment(walker)
            else:
                break
            page_start_pos = fh.tell()

    def _parse_vorbis_comment(self, fh):
        # for the spec, see: http://xiph.org/vorbis/doc/v-comment.html
        mapping = {'album': 'album', 'title': 'title', 'artist': 'artist',
                   'date': 'year', 'tracknumber': 'track'}
        vendor_length = struct.unpack('I', fh.read(4))[0]
        vendor = fh.read(vendor_length)  # vendor string is read and ignored
        elements = struct.unpack('I', fh.read(4))[0]
        for i in range(elements):
            length = struct.unpack('I', fh.read(4))[0]
            keyvalpair = codecs.decode(fh.read(length), 'UTF-8')
            if '=' in keyvalpair:
                splitidx = keyvalpair.index('=')
                key, value = keyvalpair[:splitidx], keyvalpair[splitidx+1:]
                fieldname = mapping.get(key.lower())
                if fieldname:
                    self._set_field(fieldname, value)

    def _parse_pages(self, fh):
        """Generator yielding the payload of each packet in the stream."""
        # for the spec, see: https://wiki.xiph.org/Ogg
        previous_page = b''  # contains data from previous (continuing) pages
        header_data = fh.read(27)  # read ogg page header
        while len(header_data) != 0:
            header = struct.unpack('<4sBBqIIiB', header_data)
            oggs, version, flags, pos, serial, pageseq, crc, segments = header
            # pos is the granule position; track the maximum for duration
            self._max_samplenum = max(self._max_samplenum, pos)
            if oggs != b'OggS' or version != 0:
                break  # not a valid ogg file
            segsizes = struct.unpack('B'*segments, fh.read(segments))
            total = 0
            for segsize in segsizes:  # read all segments
                total += segsize
                if total < 255:  # less than 255 bytes means end of page
                    yield previous_page + fh.read(total)
                    previous_page = b''
                    total = 0
            if total != 0:
                if total % 255 == 0:
                    # packet continues on the next page
                    previous_page += fh.read(total)
                else:
                    yield previous_page + fh.read(total)
                    previous_page = b''
            header_data = fh.read(27)
class Wave(TinyTag):
    """RIFF/WAVE reader: PCM duration plus an optional embedded ID3 chunk."""
    def __init__(self, filehandler, filesize):
        TinyTag.__init__(self, filehandler, filesize)
        self._duration_parsed = False

    def _determine_duration(self, fh):
        # see: https://ccrma.stanford.edu/courses/422/projects/WaveFormat/
        # and: https://en.wikipedia.org/wiki/WAV
        riff, size, fformat = struct.unpack('4sI4s', fh.read(12))
        if riff != b'RIFF' or fformat != b'WAVE':
            print('not a wave file!')
        channels, samplerate, bitdepth = 2, 44100, 16  # assume CD quality
        chunk_header = fh.read(8)
        while len(chunk_header) > 0:
            subchunkid, subchunksize = struct.unpack('4sI', chunk_header)
            if subchunkid == b'fmt ':
                _, channels, self.samplerate = struct.unpack('HHI', fh.read(8))
                _, _, bitdepth = struct.unpack('<IHH', fh.read(8))
                # keep the local in sync so the 'data' branch below uses the
                # real sample rate instead of the 44100 default (bugfix)
                samplerate = self.samplerate
                self.bitrate = self.samplerate * channels * bitdepth / 1024
            elif subchunkid == b'data':
                self.duration = subchunksize/channels/samplerate/(bitdepth/8)
                # was misspelled 'audio_offest', leaving audio_offset at 0
                self.audio_offset = fh.tell() - 8  # rewind to data header
                fh.seek(subchunksize, 1)
            elif subchunkid == b'id3 ' or subchunkid == b'ID3 ':
                id3 = ID3(fh, 0)
                id3._parse_id3v2(fh)
                self.update(id3)
            else:  # some other chunk, just skip the data
                fh.seek(subchunksize, 1)
            chunk_header = fh.read(8)
        self._duration_parsed = True

    def _parse_tag(self, fh):
        if not self._duration_parsed:
            self._determine_duration(fh)  # parse_whole file to determine tags :(
class Flac(TinyTag):
    """FLAC reader: duration from the STREAMINFO block, tags from the
    embedded vorbis comment block."""
    def load(self, tags, duration):
        # overrides the base implementation to validate the 'fLaC' marker
        # and re-seek past it between the tag and duration passes
        if self._filehandler.read(4) != b'fLaC':
            return  # not a flac file!
        if tags:
            self._parse_tag(self._filehandler)
            self._filehandler.seek(4)
        if duration:
            self._determine_duration(self._filehandler)

    def _determine_duration(self, fh):
        # for spec, see https://xiph.org/flac/ogg_mapping.html
        header_data = fh.read(4)
        while len(header_data):
            meta_header = struct.unpack('B3B', header_data)
            size = self._bytes_to_int(meta_header[1:4])
            # http://xiph.org/flac/format.html#metadata_block_streaminfo
            if meta_header[0] == 0:  # STREAMINFO
                stream_info_header = fh.read(size)
                if len(stream_info_header) < 34:  # invalid streaminfo
                    break
                header = struct.unpack('HH3s3s8B16s', stream_info_header)
                # From the ciph documentation:
                # py  | <bits>
                #----------------------------------------------
                # H   | <16>  The minimum block size (in samples)
                # H   | <16>  The maximum block size (in samples)
                # 3s  | <24>  The minimum frame size (in bytes)
                # 3s  | <24>  The maximum frame size (in bytes)
                # 8B  | <20>  Sample rate in Hz.
                #     | <3>   (number of channels)-1.
                #     | <5>   (bits per sample)-1.
                #     | <36>  Total samples in stream.
                # 16s | <128> MD5 signature
                #
                min_blk, max_blk, min_frm, max_frm = header[0:4]
                min_frm = self._bytes_to_int(struct.unpack('3B', min_frm))
                max_frm = self._bytes_to_int(struct.unpack('3B', max_frm))
                #                 channels-
                #                          `.  bits      total samples
                # |----- samplerate -----| |-||----| |---------~   ~----|
                # 0000 0000 0000 0000 0000 0000 0000 0000 0000      0000
                # #---4---# #---5---# #---6---# #---7---# #--8-~   ~-12-#
                self.samplerate = self._bytes_to_int(header[4:7]) >> 4
                channels = ((header[6] >> 1) & 0x07) + 1
                bit_depth = ((header[6] & 1) << 4) + ((header[7] & 0xF0) >> 4)
                bit_depth = (bit_depth + 1)
                total_sample_bytes = [(header[7] & 0x0F)] + list(header[8:12])
                total_samples = self._bytes_to_int(total_sample_bytes)
                md5 = header[12:]  # MD5 signature; read but unused
                self.duration = float(total_samples) / self.samplerate
                # average bitrate in kbit/s derived from size and duration
                self.bitrate = self.filesize/self.duration*8/1024
                return
            else:
                fh.seek(size, 1)  # skip other metadata block types
                header_data = fh.read(4)

    def _bytes_to_int(self, b):
        """Big-endian fold of a byte sequence into a single integer."""
        result = 0
        for byte in b:
            result = (result << 8) + byte
        return result

    def _parse_tag(self, fh):
        # for spec, see https://xiph.org/flac/ogg_mapping.html
        header_data = fh.read(4)
        while len(header_data):
            meta_header = struct.unpack('B3B', header_data)
            size = self._bytes_to_int(meta_header[1:4])
            if meta_header[0] == 4:  # VORBIS_COMMENT block
                # reuse the Ogg comment parser directly on the file handle
                oggtag = Ogg(fh, 0)
                oggtag._parse_vorbis_comment(fh)
                self.update(oggtag)
                return
            else:
                fh.seek(size, 1)
                header_data = fh.read(4)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment