Skip to content

Instantly share code, notes, and snippets.

@KristoforMaynard
Created May 30, 2016 18:55
Show Gist options
  • Save KristoforMaynard/3514c9111809a3f09f62cf03ae1cb244 to your computer and use it in GitHub Desktop.
Save KristoforMaynard/3514c9111809a3f09f62cf03ae1cb244 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
#
# retrieved from:
# https://github.com/ocelma/python-itunes/blob/e3adf9e837ccbf9ded9d7352ad8e45b56ab410eb/itunes/__init__.py
#
# edited by KristoforMaynard@github to be python 3 compliant and add
# :py:class:`EmptyReturner`, :py:class:`ResultWrapper`, and :py:func:`wrap_results`
"""A python interface to search iTunes Store"""
from __future__ import division, print_function, unicode_literals
import io
import json
import os
import sys
try:
    from urllib.parse import quote_plus, urlparse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
except ImportError:
    from urllib import quote_plus
    from urllib2 import Request, urlopen, HTTPError
    from urlparse import urlparse
try:
    from hashlib import md5
except ImportError:
    from md5 import md5
# True when the running interpreter is Python 3 or newer.
PY3K = sys.version_info[0] >= 3
if PY3K:
    # Python 3 has no separate ``unicode`` type, so alias the name to ``str``.
    unicode = str
# __name__ = 'pyitunes'
# __doc__ = 'A python interface to search iTunes Store'
__author__ = 'Oscar Celma'
__version__ = '0.1'
__license__ = 'GPL'
__maintainer__ = 'Oscar Celma'
__email__ = 'ocelma@bmat.com'
__status__ = 'Beta'
API_VERSION = '2' # iTunes API version
COUNTRY = 'US' # ISO Country Store
# Base URL; the request method name and the query string are appended to it.
HOST_NAME = 'http://itunes.apple.com/'
# Enable cache? if set to True, make sure that __cache_dir exists! (e.g. $ mkdir ./cache)
# Both values are module state rewritten by enable_caching() / disable_caching().
__cache_enabled = False
__cache_dir = './cache'
class ServiceException(Exception):
    """Exception related to the web service.

    :param type: short category label of the failure (e.g. ``'Error'``).
    :param message: human readable description, usually the text returned
        by the web service.
    """

    def __init__(self, type, message):
        # Forward to Exception so ``args``, ``repr()`` and pickling carry
        # the details instead of being empty.
        super(ServiceException, self).__init__(type, message)
        self._type = type
        self._message = message

    def __str__(self):
        # %-formatting (rather than ``+``) keeps this from raising when the
        # message is not a text string, e.g. a raw bytes response body.
        return '%s: %s' % (self._type, self._message)

    def get_message(self):
        """Return the message the exception was created with."""
        return self._message

    def get_type(self):
        """Return the type label the exception was created with."""
        return self._type
class _Request(object):
"""Representing an abstract web service operation."""
def __init__(self, method_name, params):
self.params = params
self.method = method_name
def _download_response(self):
"""Returns a response"""
data = []
for name in list(self.params.keys()):
value = self.params[name]
if isinstance(value, int) or isinstance(value, float) or isinstance(value, int):
value = str(value)
try:
data.append('='.join((name, quote_plus(value.replace('&', '&').encode('utf8')))))
except UnicodeDecodeError:
data.append('='.join((name, quote_plus(value.replace('&', '&')))))
data = '&'.join(data)
url = HOST_NAME
parsed_url = urlparse(url)
if not parsed_url.scheme:
url = "http://" + url
url += self.method + '?'
url += data
#print url
request = Request(url)
response = urlopen(request)
return response.read().decode('utf-8')
def execute(self, cacheable=False):
try:
if is_caching_enabled() and cacheable:
response = self._get_cached_response()
else:
response = self._download_response()
return json.loads(response)
except HTTPError as e:
raise self._get_error(e.fp.read())
def _get_cache_key(self):
"""Cache key"""
keys = list(self.params.keys())[:]
keys.sort()
string = self.method
for name in keys:
string += name
if isinstance(self.params[name], int) or isinstance(self.params[name], float):
self.params[name] = str(self.params[name])
string += self.params[name]
return get_md5(string)
def _is_cached(self):
"""Returns True if the request is available in the cache."""
return os.path.exists(os.path.join(_get_cache_dir(), self._get_cache_key()))
def _get_cached_response(self):
"""Returns a file object of the cached response."""
if not self._is_cached():
response = self._download_response()
response_file = open(os.path.join(_get_cache_dir(), self._get_cache_key()), "w")
response_file.write(response)
response_file.close()
return open(os.path.join(_get_cache_dir(), self._get_cache_key()), "r").read()
def _get_error(self, text):
return ServiceException(type='Error', message=text)
raise
# Webservice BASE OBJECT
class _BaseObject(object):
"""An abstract webservices object."""
def __init__(self, method):
self._method = method
self._search_terms = dict()
def _request(self, method_name=None, params = None, cacheable = False):
if not method_name:
method_name = self._method
if not params:
params = self._get_params()
return _Request(method_name, params).execute(cacheable)
def _get_params(self):
params = {}
for key in list(self._search_terms.keys()):
params[key] = self._search_terms[key]
return params
def get(self):
self._json_results = self._request(cacheable=is_caching_enabled())
if 'errorMessage' in self._json_results:
raise ServiceException(type='Error', message=self._json_results['errorMessage'])
self._num_results = self._json_results['resultCount']
l = []
for json in self._json_results['results']:
type = None
if 'wrapperType' in json:
type = json['wrapperType']
elif 'kind' in json:
type = json['kind']
if type == 'artist':
id = json['artistId']
item = Artist(id)
elif type == 'collection':
id = json['collectionId']
item = Album(id)
elif type == 'track':
id = json['trackId']
item = Track(id)
elif type == 'audiobook':
id = json['collectionId']
item = Audiobook(id)
elif type == 'software':
id = json['trackId']
item = Software(id)
else:
if 'collectionId' in json:
id = json['collectionId']
elif 'artistId' in json:
id = json['artistId']
item = Item(id)
item._set(json)
l.append(item)
return l
# SEARCH
class Search(_BaseObject):
    """ Search iTunes Store """

    def __init__(self, query, country=COUNTRY, media='all', entity=None, attribute=None, limit=50, lang='en_us', version=API_VERSION, explicit='Yes'):
        _BaseObject.__init__(self, 'search')
        terms = dict()
        terms['term'] = query
        terms['country'] = country  # ISO Country code for iTunes Store
        terms['media'] = media  # The media type to search for
        # Optional filters are only sent when a value was supplied.
        if entity:
            terms['entity'] = entity  # Type of results, relative to the media type
        if attribute:
            terms['attribute'] = attribute  # Attribute to search in, relative to the media type
        trailing = (
            ('limit', limit),        # Results limit
            ('lang', lang),          # Language of the returned results
            ('version', version),    # Search result key version
            ('explicit', explicit),  # Whether to include explicit content
        )
        for key, value in trailing:
            terms[key] = value
        self._search_terms = terms
        self._json_results = None
        self._num_results = None

    def num_results(self):
        """Return the stored result count (None until one has been set)."""
        return self._num_results
# LOOKUP
class Lookup(_BaseObject):
    """ Lookup of a single id in the iTunes Store """

    def __init__(self, id, entity=None, limit=50):
        _BaseObject.__init__(self, 'lookup')
        self.id = id
        terms = self._search_terms
        terms['id'] = id
        if entity:
            # The type of results to return, relative to the looked-up id
            terms['entity'] = entity
        terms['limit'] = limit  # Results limit
# RESULT ITEM
class Item(object):
    """ Item result class

    Generic result wrapper; the typed results (Artist, Album, Track, ...)
    derive from it. ``_set`` copies the fields of one JSON result onto the
    instance.
    """

    def __init__(self, id):
        self.id = id
        self.name = None
        self.url = None
        # Defined up front so the accessors work before _set() is called.
        self.json = {}
        self.type = None

    # JSON SETTERs
    def _set(self, json):
        """Populate the item from one JSON result dict."""
        self.json = json
        if 'kind' in json:
            self.type = json['kind']
        else:
            # Results carrying neither key get type None instead of
            # raising KeyError.
            self.type = json.get('wrapperType')
        # Item information
        self._set_genre(json)
        self._set_release(json)
        self._set_country(json)
        self._set_artwork(json)
        self._set_url(json)

    def _set_genre(self, json):
        self.genre = json.get('primaryGenreName', None)

    def _set_release(self, json):
        # Keep only the date part of the 'releaseDate' timestamp.
        self.release_date = None
        if json.get('releaseDate'):
            self.release_date = json['releaseDate'].split('T')[0]

    def _set_country(self, json):
        self.country_store = json.get('country', None)

    def _set_artwork(self, json):
        # Maps artwork size (as a string) to its URL, for the sizes present.
        self.artwork = dict()
        for size in ('30', '60', '100', '512'):
            key = 'artworkUrl' + size
            if key in json:
                self.artwork[size] = json[key]

    def _set_url(self, json):
        # First available of the track, collection and artist view URLs.
        self.url = None
        for key in ('trackViewUrl', 'collectionViewUrl', 'artistViewUrl'):
            if key in json:
                self.url = json[key]
                break

    def _resolve_name(self):
        """Return the name, filling it in from the JSON data if unset."""
        if not self.name:
            if 'collectionName' in self.json:
                self._set_name(self.json['collectionName'])
            elif 'artistName' in self.json:
                self._set_name(self.json['artistName'])
        return self.name

    # REPR, EQ, NEQ, HASH
    def __repr__(self):
        name = self._resolve_name()
        if name is None:
            # __repr__ must return a string; use a placeholder for
            # items that have no name at all.
            name = '<%s id=%r>' % (type(self).__name__, self.id)
        if PY3K:
            return name
        else:
            return name.encode('utf8')

    def __eq__(self, other):
        # Items compare by id; anything without an ``id`` is "not
        # comparable" rather than an AttributeError.
        try:
            return self.id == other.id
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        try:
            return self.id != other.id
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        # Defining __eq__ removes the default hash on Python 3; hash by id
        # to stay consistent with equality.
        return hash(self.id)

    def _set_name(self, name):
        self.name = name

    # GETTERs
    def get_id(self):
        """ Returns the Item's id, looking it up in the JSON data if unset """
        if not self.id:
            if 'collectionId' in self.json:
                self.id = self.json['collectionId']
            elif 'artistId' in self.json:
                self.id = self.json['artistId']
        return self.id

    def get_name(self):
        """ Returns the Item's name (None when the item has no name) """
        name = self._resolve_name()
        if name is None or PY3K:
            return name
        return name.encode('utf8')

    def get_url(self):
        """ Returns the iTunes Store URL of the Item """
        return self.url

    def get_genre(self):
        """ Returns the primary genre of the Item """
        return self.genre

    def get_release_date(self):
        """ Returns the release date of the Item """
        return self.release_date

    def get_artwork(self):
        """ Returns the artwork (a dict) of the item """
        return self.artwork

    def get_tracks(self, limit=500):
        """ Returns the tracks of the Item

        A song returns itself. Raises ServiceException when the lookup
        returns nothing.
        """
        if self.type == 'song':
            return self
        items = Lookup(id=self.id, entity='song', limit=limit).get()
        if not items:
            raise ServiceException(type='Error', message='Nothing found!')
        # The first lookup result is skipped; the rest are the tracks.
        return items[1:]

    def get_albums(self, limit=200):
        """ Returns the albums of the Item

        A collection returns itself and a song returns its album. Raises
        ServiceException when no album is found.
        """
        if self.type == 'collection':
            return self
        if self.type == 'song':
            return self.get_album()
        # Skip the first lookup result exactly once (the original sliced
        # twice and so also dropped the first album).
        items = Lookup(id=self.id, entity='album', limit=limit).get()[1:]
        if not items:
            raise ServiceException(type='Error', message='Nothing found!')
        return items

    def get_album(self):
        """ Returns the album of the Item

        Raises ServiceException when the lookup yields no album.
        """
        if self.type == 'collection':
            return self
        items = Lookup(id=self.id, entity='album', limit=1).get()
        if not items or len(items) == 1:
            raise ServiceException(type='Error', message='Nothing found!')
        return items[1]
# ARTIST
class Artist(Item):
    """ Artist result class """

    def __init__(self, id):
        super(Artist, self).__init__(id)

    def _set(self, json):
        """Populate the artist from one JSON result dict."""
        Item._set(self, json)
        self.name = json['artistName']
        self.amg_id = json.get('amgArtistId')
        # Prefer the view URL; fall back to the link URL, then None.
        fallback_url = json.get('artistLinkUrl')
        self.url = json.get('artistViewUrl', fallback_url)

    # GETTERs
    def get_amg_id(self):
        """ Returns the value of the 'amgArtistId' field (None if absent) """
        return self.amg_id
# ALBUM
class Album(Item):
    """ Album class """

    def __init__(self, id):
        Item.__init__(self, id)

    def _set(self, json):
        """Populate the album from one JSON result dict.

        :raises KeyError: when 'collectionName' is missing.
        """
        super(Album, self)._set(json)
        # Collection information
        self.name = json['collectionName']
        self.url = json.get('collectionViewUrl', None)
        self.amg_id = json.get('amgAlbumId', None)
        # Price, currency and track count are read leniently so a result
        # without them does not abort the whole result list; a missing or
        # null price becomes 0, as a null price already did.
        self.price = round(json.get('collectionPrice') or 0, 4)
        self.price_currency = json.get('currency', None)
        self.track_count = json.get('trackCount', None)
        self.copyright = json.get('copyright', None)
        self._set_artist(json)

    def _set_artist(self, json):
        # Build the Artist from the same JSON dict when it names one.
        self.artist = None
        if json.get('artistId'):
            self.artist = Artist(json['artistId'])
            self.artist._set(json)

    # GETTERs
    def get_amg_id(self):
        """ Returns the value of the 'amgAlbumId' field (None if absent) """
        return self.amg_id

    def get_copyright(self):
        """ Returns the copyright notice of the album """
        return self.copyright

    def get_price(self):
        """ Returns the collection price, rounded to 4 decimals """
        return self.price

    def get_track_count(self):
        """ Returns the number of tracks of the album """
        return self.track_count

    def get_artist(self):
        """ Returns the Artist of the album (None if unknown) """
        return self.artist
# TRACK
class Track(Item):
    """ Track class """

    def __init__(self, id):
        Item.__init__(self, id)

    def _set(self, json):
        """Populate the track from one JSON result dict.

        :raises KeyError: when 'trackName' is missing.
        """
        super(Track, self)._set(json)
        # Track information
        self.name = json['trackName']
        self.url = json.get('trackViewUrl', None)
        self.preview_url = json.get('previewUrl', None)
        price = json.get('trackPrice')
        self.price = round(price, 4) if price is not None else None
        self.number = json.get('trackNumber', None)
        # Duration is stored in seconds; the API reports milliseconds.
        millis = json.get('trackTimeMillis')
        self.duration = round(millis / 1000.0, 2) if millis is not None else None
        # Artist/album data is optional: a missing key just leaves it None.
        try:
            self._set_artist(json)
        except KeyError:
            self.artist = None
        try:
            self._set_album(json)
        except KeyError:
            self.album = None

    def _set_artist(self, json):
        self.artist = None
        if json.get('artistId'):
            self.artist = Artist(json['artistId'])
            self.artist._set(json)

    def _set_album(self, json):
        # Always define the attribute; the original left ``self.album``
        # unset when the result had no 'collectionId'.
        self.album = None
        if 'collectionId' in json:
            self.album = Album(json['collectionId'])
            self.album._set(json)

    # GETTERs
    def get_preview_url(self):
        """ Returns the preview URL of the track """
        return self.preview_url

    def get_disc_number(self):
        """ Returns the value read from the 'trackNumber' field

        NOTE(review): despite the method name this is the track number,
        not a disc number; kept as is for backward compatibility.
        """
        return self.number

    def get_duration(self):
        """ Returns the duration in seconds, rounded to 2 decimals """
        return self.duration

    def get_artist(self):
        """ Returns the Artist of the track (None if unknown) """
        return self.artist

    def get_price(self):
        """ Returns the track price, rounded to 4 decimals (None if unknown) """
        return self.price
# Audiobook
class Audiobook(Album):
    """ Audiobook result class; behaves exactly like an Album """

    def __init__(self, id):
        super(Audiobook, self).__init__(id)
# Software
class Software(Track):
    """ Software result class: a Track with extra software fields """

    def __init__(self, id):
        Track.__init__(self, id)

    def _set(self, json):
        """Populate the item from one JSON result dict."""
        super(Software, self)._set(json)
        # Run the software-specific setters in their fixed order.
        setters = (
            self._set_version,
            self._set_price,
            self._set_description,
            self._set_screenshots,
            self._set_genres,
            self._set_seller_url,
            self._set_languages,
            self._set_avg_rating,
            self._set_num_ratings,
        )
        for setter in setters:
            setter(json)

    def _set_version(self, json):
        self.version = json.get('version', None)

    def _set_price(self, json):
        # Replaces the price set by Track._set with the 'price' field.
        self.price = json.get('price', None)

    def _set_description(self, json):
        self.description = json.get('description', None)

    def _set_screenshots(self, json):
        self.screenshots = json.get('screenshotUrls', None)

    def _set_genres(self, json):
        self.genres = json.get('genres', None)

    def _set_seller_url(self, json):
        self.seller_url = json.get('sellerUrl', None)

    def _set_languages(self, json):
        self.languages = json.get('languageCodesISO2A', None)

    def _set_avg_rating(self, json, only_current_version=False):
        # Pick the field for the current version or for all versions.
        key = 'averageUserRatingForCurrentVersion' if only_current_version else 'averageUserRating'
        self.avg_rating = json.get(key, None)

    def _set_num_ratings(self, json, only_current_version=False):
        key = 'userRatingCountForCurrentVersion' if only_current_version else 'userRatingCount'
        self.num_ratings = json.get(key, None)

    # GETTERs
    def get_version(self):
        """ Returns the value of the 'version' field """
        return self.version

    def get_description(self):
        """ Returns the value of the 'description' field """
        return self.description

    def get_screenshots(self):
        """ Returns the value of the 'screenshotUrls' field """
        return self.screenshots

    def get_genres(self):
        """ Returns the value of the 'genres' field """
        return self.genres

    def get_seller_url(self):
        """ Returns the value of the 'sellerUrl' field """
        return self.seller_url

    def get_languages(self):
        """ Returns the value of the 'languageCodesISO2A' field """
        return self.languages

    def get_avg_rating(self):
        """ Returns the stored average user rating """
        return self.avg_rating

    def get_num_ratings(self):
        """ Returns the stored user rating count """
        return self.num_ratings
# CACHE
def enable_caching(cache_dir = None):
    """Turn response caching on.

    :param cache_dir: directory for the cache files. It is created
        (including missing parent directories) when it does not exist.
        When None, a fresh temporary directory is used.
    :raises OSError: when the directory cannot be created.
    """
    global __cache_dir
    global __cache_enabled
    if cache_dir is None:
        import tempfile
        __cache_dir = tempfile.mkdtemp()
    else:
        if not os.path.isdir(cache_dir):
            try:
                # makedirs (not mkdir) so nested paths work too.
                os.makedirs(cache_dir)
            except OSError:
                # Tolerate another creator winning the race; re-raise
                # genuine failures.
                if not os.path.isdir(cache_dir):
                    raise
        __cache_dir = cache_dir
    __cache_enabled = True
def disable_caching():
    """Turn response caching off; the cache directory setting is kept."""
    global __cache_enabled
    __cache_enabled = False
def is_caching_enabled():
    """Tell whether response caching is currently switched on."""
    # Reading a module-level name needs no ``global`` declaration.
    return __cache_enabled
def _get_cache_dir():
    """Give back the directory that holds the cache files."""
    # Reading a module-level name needs no ``global`` declaration.
    return __cache_dir
def get_md5(text):
    """Returns the md5 hash of a string.

    :param text: text (hashed as UTF-8) or an already encoded byte string.
    :returns: the hexadecimal digest as a string.
    """
    # Byte strings are hashed as they are. The original called ``.encode``
    # unconditionally, which fails for bytes on Python 3.
    if not isinstance(text, bytes):
        text = text.encode('utf8')
    return md5(text).hexdigest()
#SEARCHES
def search_track(query, limit=100, store=COUNTRY):
    """Search the given store for songs matching ``query``."""
    request = Search(query=query, media='music', entity='song', limit=limit, country=store)
    return request.get()
def search_album(query, limit=100, store=COUNTRY):
    """Search the given store for albums matching ``query``."""
    request = Search(query=query, media='music', entity='album', limit=limit, country=store)
    return request.get()
def search_artist(query, limit=100, store=COUNTRY):
    """Search the given store for music artists matching ``query``."""
    request = Search(query=query, media='music', entity='musicArtist', limit=limit, country=store)
    return request.get()
def search(query, media='all', limit=100, store=COUNTRY):
    """Search the given store for ``query`` within the given media type."""
    request = Search(query=query, media=media, limit=limit, country=store)
    return request.get()
#LOOKUP
def lookup(id):
    """Look up ``id`` and return the first result.

    Raises ServiceException when the lookup returns no results.
    """
    found = Lookup(id).get()
    if found:
        return found[0]
    raise ServiceException(type='Error', message='Nothing found!')
class EmptyReturner(object):
    """Placeholder that answers every attribute access and call with ``value``.

    :param value: the object returned for any attribute lookup or call.
    """

    def __init__(self, value=""):
        self.value = value

    def __getattr__(self, name):
        # Only reached when normal lookup fails. Special (dunder) names and
        # ``value`` itself must raise: otherwise protocol probes made by
        # copy/pickle/hasattr receive ``value`` instead of "not supported",
        # and a not-yet-initialised instance recurses forever.
        if name == 'value' or (name.startswith('__') and name.endswith('__')):
            raise AttributeError(name)
        return self.value

    def __call__(self, *args, **kwargs):
        # Arguments are accepted and ignored.
        return self.value
class ResultWrapper(object):
    """Proxy that substitutes a default for missing values.

    Attribute access is forwarded to ``obj``. Missing attributes and
    attributes that are None yield ``default``. Callable attributes are
    wrapped so that a ServiceException raised by the call yields
    ``default`` as well.

    :param obj: the wrapped object.
    :param default: the value returned in place of missing data.
    """

    def __init__(self, obj, default=""):
        self.obj = obj
        self.default = default

    def __getattr__(self, name):
        # Only reached when normal lookup fails. Own attributes and special
        # (dunder) names must raise, so copy/pickle/hasattr probes work and
        # a not-yet-initialised instance does not recurse forever.
        if name in ('obj', 'default') or (name.startswith('__') and name.endswith('__')):
            raise AttributeError(name)
        attr = getattr(self.obj, name, None)
        if attr is None:
            # Missing and None-valued attributes are treated alike.
            return self.default
        if not callable(attr):
            return attr

        def wrapper(*args, **kwargs):
            try:
                return attr(*args, **kwargs)
            except ServiceException:
                return self.default
        return wrapper
def wrap_results(results):
    """Wrap every result in a ResultWrapper with its own EmptyReturner default."""
    wrapped = []
    for result in results:
        wrapped.append(ResultWrapper(result, default=EmptyReturner()))
    return wrapped
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment