Last active
February 20, 2023 07:42
-
-
Save spyoungtech/d6f759622765d88761146ed88fc9cf34 to your computer and use it in GitHub Desktop.
Pandora Scraper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from selenium.webdriver.common.keys import Keys | |
import time | |
from browsermobproxy import Server | |
import ast | |
import urllib2 as UR | |
import os | |
import eyed3 as ED3 | |
from django.utils.encoding import force_text | |
import re | |
from selenium import webdriver | |
def get_valid_filename(s):
    """Turn arbitrary text into a string that is safe to use as a file name.

    The input is coerced to text with Django's ``force_text`` and stripped of
    surrounding whitespace; every remaining space becomes an underscore.
    Any character that is not a word character, a dash or a dot is then
    removed.
    """
    trimmed = force_text(s).strip()
    underscored = '_'.join(trimmed.split(' '))
    illegal_chars = re.compile(r'(?u)[^-\w.]')
    return illegal_chars.sub('', underscored)
# ---------------------------------------------------------------------------
# Pandora scraper main script (Python 2: urllib2, unicode).
#
# Each pass of the endless loop below starts a BrowserMob proxy and a Chrome
# session routed through it, signs in to Pandora, and then walks the station
# list.  For every track it sees, it reads the player bar, looks in the HAR
# (network log) captured by the proxy for the MP3 request, and downloads that
# URL.  Downloaded tracks are remembered in 'songdictionary.txt' so they are
# not fetched twice.
# ---------------------------------------------------------------------------

# Station ids to scrape.  The list is small enough to have been copied by hand
# rather than collected programmatically.
stationlist = [
    "2972469754180727516",
    "2968611593648665308",
    "2968611185626772188",
    "2968608441142670044",
    "2969569594693972700",
    "2969576896138375900",  # a missing comma used to glue this id to the next one
    "2969155933508793052",
    "2969575809511650012",
    "2969575002057798364",
    "2969573662028002012",
    "2969572957653365468",
    "2968608170559730396",
    "2968607466185093852",
    "2965864545451045596",
    "2965839952468308700",
    "2963232164290294492",
    "2963224420464259804",
    "2963122741408494300",
    "2931546043064054492",
    "2931356781035188956",
    "2917932624528339676",
    "2917278857491444444",
    "2915354407200158428",
    "2915320975174726364",
    "2915283488700166876",
]


def _load_library():
    """Return the song dictionary stored in 'songdictionary.txt'.

    The file holds the ``str()`` of a dict mapping MP3 file paths to
    ``[title, artist, album, artwork file name]``.  If it cannot be read or
    parsed (for instance on the first run) an empty dict is returned.
    """
    try:
        with open('songdictionary.txt') as library_file:
            library = ast.literal_eval(library_file.read())
    except Exception:
        # Exception, not a bare except, so Ctrl-C still stops the script.
        print("No Existing Library Found. Creating New Library")
        return {}
    print("Successfully Loaded Existing Library")
    return library


def _save_library(songdictionary):
    """Write the song dictionary back to 'songdictionary.txt'."""
    with open('songdictionary.txt', 'wb') as library_file:
        library_file.write(str(songdictionary))


def _read_now_playing(driver):
    """Read the player bar of the page currently loaded in *driver*.

    Returns a tuple ``(artist, title, album, artwork_url, time_remaining)``
    where every item is the text (or ``src`` attribute, for the artwork) of
    the corresponding player-bar element.  Any Selenium lookup error
    propagates to the caller.
    """
    artist_text = driver.find_element_by_class_name('playerBarArtist').text
    title_text = driver.find_element_by_class_name('playerBarSong').text
    album_text = driver.find_element_by_class_name('playerBarAlbum').text
    artwork = driver.find_element_by_class_name('playerBarArt')
    timeremaining = driver.find_element_by_class_name('remainingTime').text
    artwork_url = artwork.get_attribute('src')
    return artist_text, title_text, album_text, artwork_url, timeremaining


def _prepare_paths(artist_text, title_text, album_text):
    """Build the artwork and MP3 paths for a track and create their folders.

    Returns ``(artworkfilename, filepath)``.  The paths are assembled exactly
    as before (explicit backslashes) so that keys already stored in the
    saved song dictionary keep matching.
    """
    artisttext = get_valid_filename(artist_text)
    titletext = get_valid_filename(title_text)
    albumtext = get_valid_filename(album_text)
    artwork_dir = os.path.join(os.getcwd(), "album_artwork")
    artistdir = os.path.join(os.getcwd(), artisttext)
    # The artwork folder was never created before, so the first artwork
    # write failed unless the folder had been made by hand.
    for directory in (artwork_dir, artistdir):
        if not os.path.exists(directory):
            os.makedirs(directory)
    artworkfilename = artwork_dir + "\\" + albumtext + ".jpg"
    filepath = str(os.getcwd()) + "\\" + artisttext + "\\" + titletext + ".mp3"
    return artworkfilename, filepath


def _find_audio_url(har):
    """Return the first request URL in *har* that looks like an MP3, or None.

    A URL matches when it contains "http://audio" or ".com/access/".  The
    original author notes this pattern may differ for accounts that are not
    Pandora One subscribers.
    """
    for entry in har['log']['entries']:
        url = entry['request']['url']
        if "http://audio" in url or ".com/access/" in url:
            return url
    return None


def _download(url, destination):
    """Fetch *url* with urllib2 and write the response body to *destination*."""
    response = UR.urlopen(url)
    try:
        with open(destination, 'wb') as out_file:
            out_file.write(response.read())
    finally:
        response.close()


def _write_id3_tags(filepath, artist_text, title_text, album_text):
    """Write artist, album and title ID3 tags to the MP3 at *filepath*."""
    audiofile = ED3.load(filepath)
    if audiofile is None:
        # NOTE(review): eyed3.load is documented to return None for files it
        # does not recognise - confirm against the installed eyed3 version.
        print("Could not read the saved file as audio; ID3 tags not written")
        return
    audiofile.initTag(version=(2, 4, 0))
    audiofile.tag.artist = unicode(artist_text)
    audiofile.tag.album = unicode(album_text)
    audiofile.tag.title = unicode(title_text)
    audiofile.tag.save(filepath, version=(1, None, None))


def _download_track(proxy, songdictionary, now_playing, artworkfilename, filepath):
    """Save artwork and MP3 for the track described by *now_playing*.

    *now_playing* is the tuple returned by ``_read_now_playing``.  The MP3
    URL is taken from the proxy's current HAR.  The track is added to
    *songdictionary* (and the dictionary persisted) only after the MP3 has
    actually been written, so a failed attempt is retried the next time the
    track comes up.

    Returns True when an MP3 was saved, False when no matching URL was found.
    """
    artist_text, title_text, album_text, artwork_url, _ = now_playing
    _download(artwork_url, artworkfilename)
    # The HAR is dumped to disk and parsed back, as the original script did.
    with open('PandaProxy.txt', 'wb') as har_file:
        har_file.write(str(proxy.har))  # returns a HAR JSON blob
    with open('PandaProxy.txt') as har_file:
        har = ast.literal_eval(har_file.read())
    audio_url = _find_audio_url(har)
    if audio_url is None:
        print("No MP3 URL found in the captured traffic; skipping this track")
        return False
    _download(audio_url, filepath)
    print("Successfully saved MP3")
    songdictionary[filepath] = [title_text, artist_text, album_text, artworkfilename]
    _save_library(songdictionary)
    _write_id3_tags(filepath, artist_text, title_text, album_text)
    return True


while True:  # loop forever
    songdictionary = _load_library()

    # Local path to the BrowserMob Proxy launcher.
    server = Server(r'C:\Python27\Lib\site-packages\browsermobproxy\browsermob-proxy-2.1.0-beta-3\bin\browsermob-proxy')
    server.start()
    driver = None
    try:
        # The proxy records a HAR containing the URLs the MP3s are loaded from.
        proxy = server.create_proxy()
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("--proxy-server={0}".format(proxy.proxy))
        # chromedriver is downloaded separately; raw string keeps the
        # backslashes in the Windows path literal.
        driver = webdriver.Chrome(
            r'C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe',
            chrome_options=chrome_options)

        # Sign in.
        # NOTE(review): credentials are hard-coded placeholders; consider
        # reading them from the environment instead of the source file.
        driver.get("http://www.pandora.com/account/sign-in")
        userelem = driver.find_element_by_name("email")
        pwelem = driver.find_element_by_name("password")
        userelem.send_keys("MyPandoraUserName@example.com")
        pwelem.send_keys("MyPandoraPassword")
        loginelem = driver.find_element_by_xpath('//*[@id="welcomeSearch"]/div/div/div[2]/div[1]/div/div/form/div[3]/input')
        loginelem.send_keys(Keys.RETURN)
        proxy.new_har("pandora")  # start recording network traffic
        time.sleep(7)

        # First, try to scrape whatever station loads right after login.
        try:
            now_playing = _read_now_playing(driver)
            artworkfilename, filepath = _prepare_paths(*now_playing[:3])
            if filepath in songdictionary:
                print("Already had that first one!")
            else:
                _download_track(proxy, songdictionary, now_playing,
                                artworkfilename, filepath)
        except Exception:
            # Best effort only: the station loop below does the real work.
            print("Could not capture the track playing at login; moving on to the station list")

        # Then visit every station in turn.
        for station_id in stationlist:
            proxy.new_har("pandora")
            stationurl = "http://www.pandora.com/station/play/" + str(station_id)
            driver.get(stationurl)
            time.sleep(5)
            try:
                now_playing = _read_now_playing(driver)
            except Exception:
                # Give the player another five seconds, then give up on
                # this station.
                time.sleep(5)
                try:
                    now_playing = _read_now_playing(driver)
                except Exception:
                    continue

            artworkfilename, filepath = _prepare_paths(*now_playing[:3])
            if filepath in songdictionary:
                print("Already had that second one!")
                continue
            _download_track(proxy, songdictionary, now_playing,
                            artworkfilename, filepath)

            # Wait for the current song to finish, then capture the next one
            # on the same station.
            try:
                proxy.new_har("pandora")
                timeremaining = now_playing[4].replace('-', '')
                waittext = timeremaining.split(':')
                waittime = int(waittext[0]) * 60 + int(waittext[1])
                print("Sleeping " + str(waittime) + " seconds")
                time.sleep(int(waittime))
                print("Safety 10 second sleep")
                time.sleep(10)
                now_playing = _read_now_playing(driver)
                artworkfilename, filepath = _prepare_paths(*now_playing[:3])
                if filepath in songdictionary:
                    print("Already had that one!")
                else:
                    _download_track(proxy, songdictionary, now_playing,
                                    artworkfilename, filepath)
            except Exception:
                # Back off so that rapid requests do not get the account
                # locked by Pandora.
                print("EXCEPTION Sleeping 3 minutes until next request")
                time.sleep(180)
                continue
    finally:
        # Always shut the proxy and the browser down, even when an error or
        # Ctrl-C ends the session early.
        try:
            server.stop()
        finally:
            if driver is not None:
                driver.quit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment