Skip to content

Instantly share code, notes, and snippets.

@bnlucas
Last active October 17, 2015 22:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bnlucas/98858034fc1f974afbaf to your computer and use it in GitHub Desktop.
Save bnlucas/98858034fc1f974afbaf to your computer and use it in GitHub Desktop.
Move seamlessly through paginated API results without loading all pages at once. Only loads the pages needed.
'''
This example uses the Spotify Web API, which returns paginated results.
The Paginated class can be adapted for use with other paginated APIs.
output:
---------------------------------------------------------------------
top 5 search results:
Bear Vs. Shark
Shark?
Shark Tank
Shark City Click
Shark Week
bottom 5 search results:
Tom Johnson's Shark
Ka Mano - 'The Shark'
Shark Toys
Shark Alley Hobos
Chita Rivera, Marilyn Cooper, Reri Grist & Shark Girls
26/166 items loaded.
2/ 9 pages loaded.
'''
import Paginated
# base url that `api_call` prepends to every relative endpoint path.
API_URL = 'https://api.spotify.com/v1/'
# when true, `api_call` prints each request and its decoded response.
DEBUG = True
# market `search` falls back to when the caller does not pass one.
MARKET = 'US'
# object whose `.request(method, url, ...)` is used to send every call.
session = requests.api

# NOTE(review): `search` has to be defined (or imported) before this point
# for the demo below to run -- confirm how the pieces are assembled.
# the search type is passed explicitly; only the pages touched by the two
# slices below are actually requested.
artists = search('shark', 'artist')

print('top 5 search results:\n')
for artist in artists[:5]:
    print(artist['name'])

print('\n\nbottom 5 search results:\n')
for artist in artists[-5:]:
    print(artist['name'])

# `len(artists)` counts loaded items only; `artists.pages` holds one 0/1
# flag per page, so its sum is the number of pages fetched so far.
print('\n{:3d}/{:3d} items loaded.'.format(len(artists), artists.total))
print('{:3d}/{:3d} pages loaded.'.format(sum(artists.pages), len(artists.pages)))
def search(query, type='artist', limit=20, offset=0, market=None):
    '''
    set up a search against the API and return it as a Paginated object.

    no request is sent here; the Paginated object makes the API calls
    itself, page by page, when its items are accessed.

    :param query: the search text.
    :param type: the kind of item to search for. defaults to 'artist' so
        that `search('shark')` works. when a comma separated list is given,
        only the first type is paginated.
    :param limit: number of items per page.
    :param offset: offset sent with the first request.
    :param market: market to search in; falls back to `MARKET` when falsy.
    :returns: a Paginated object for the results.
    '''
    if not market:
        market = MARKET

    request = {
        'url': 'search',
        'params': {
            'query': query,
            'type': type,
            'limit': limit,
            'offset': offset,
            'market': market,
        }
    }

    # the API wraps results under the pluralised type ('artist' -> 'artists'),
    # so derive the wrapper from `type` instead of hard-coding 'artists'.
    wrapper = type.split(',')[0].strip() + 's'
    return Paginated(request, wrapper)
def encode(data, encoding='utf-8'):
    '''
    returns a copy of the decoded json object with every text string,
    including dictionary keys, encoded to `encoding`.

    dictionaries and lists are walked recursively; any other value is
    returned unchanged.

    :param data: the json object.
    :param encoding: which encoding to use. it is passed down to nested
                     values as well, not only applied at the top level.
    '''
    # `type(u'')` is the text type (`unicode` on python 2).
    text_type = type(u'')

    if isinstance(data, dict):
        return {encode(key, encoding): encode(value, encoding)
                for key, value in data.items()}
    if isinstance(data, list):
        return [encode(item, encoding) for item in data]
    if isinstance(data, text_type):
        return data.encode(encoding)
    return data
class SpotifyException(Exception):
    '''
    error raised when an API call comes back with a failing status.

    :param status_code: HTTP status code of the response.
    :param code: error code supplied by the caller.
    :param msg: human readable description of the failure.
    '''

    def __init__(self, status_code, code, msg):
        # hand the values to Exception as well so `args` is populated, which
        # repr(), pickling and logging all rely on.
        super(SpotifyException, self).__init__(status_code, code, msg)
        self.status_code = status_code
        self.code = code
        self.msg = msg

    def __str__(self):
        return 'status code: {}, code:{} - {}'.format(
            self.status_code, self.code, self.msg)
def api_call(method, url, payload, params):
    '''
    make the API call.

    :param method: HTTP method name, e.g. 'GET'.
    :param url: endpoint path, appended to `API_URL`.
    :param payload: object sent as the json request body; skipped when falsy.
    :param params: dictionary sent as the query string.
    :returns: the decoded response passed through `encode`, or None when
        the response has no body.
    :raises SpotifyException: when the response has a failing status.
    '''
    url = ''.join([API_URL, url])
    request_kwargs = {'params': params}
    headers = {'Content-Type': 'application/json'}

    body = None
    if payload:
        # serialise the payload itself (not the query params) and send it
        # as the request body through the `data` keyword.
        body = json.dumps(payload)
        request_kwargs['data'] = body

    response = session.request(method, url, headers=headers, **request_kwargs)

    if DEBUG:
        print('\n{} {}'.format(method, response.url))
        if body is not None:
            print('DATA {}'.format(body))

    try:
        response.raise_for_status()
    except Exception:
        # prefer the API's own error message; fall back to the raw body when
        # the error response is not the expected json document.
        try:
            message = response.json()['error']['message']
        except (ValueError, KeyError, TypeError):
            message = response.text
        raise SpotifyException(response.status_code, -1,
                               '{}:\n {}'.format(response.url, message))

    if not response.text:
        return None

    results = response.json()
    if DEBUG:
        print('\nRESP {}'.format(results))
    return encode(results)
def get(url, payload=None, **kwargs):
    '''
    send a GET request through `api_call`, retrying on failure.

    a SpotifyException is retried only for status 429 or 5xx; any other
    status is raised straight away. other errors are retried as well. the
    pause between attempts starts at one second and grows by one second
    each time. once `config.MAX_GET_RETRIES` attempts have failed, the last
    error is raised instead of being swallowed.

    :param url: endpoint path handed to `api_call`.
    :param payload: optional request body handed to `api_call`.
    :param kwargs: query string parameters.
    :returns: whatever `api_call` returns.
    '''
    max_tries = config.MAX_GET_RETRIES
    delay = 1

    for attempt in range(1, max_tries + 1):
        try:
            return api_call('GET', url, payload, kwargs)
        except SpotifyException as e:
            retryable = e.status_code == 429 or 500 <= e.status_code < 600
            if not retryable or attempt >= max_tries:
                raise
        except Exception:
            # `Exception` rather than a bare except, so KeyboardInterrupt
            # and SystemExit are never retried.
            if attempt >= max_tries:
                raise

        time.sleep(delay)
        delay += 1
import math
def mceil(n, m):
    '''
    rounds `n` up to the nearest multiple of `m`.
    example: mceil(21, 10) -> 30.0, the next multiple of 10.
    :param n: value to round up.
    :param m: the multiple to round to.
    '''
    # float division first, so the ceiling applies to the true ratio.
    ratio = n / float(m)
    steps = math.ceil(ratio)
    return m * steps
def mfloor(n, m):
    '''
    rounds `n` down to the nearest multiple of `m`.
    example: mfloor(29, 10) -> 20.0, the previous multiple of 10.
    :param n: value to round down.
    :param m: the multiple to round to.
    '''
    # float division first, so the floor applies to the true ratio.
    ratio = n / float(m)
    steps = math.floor(ratio)
    return m * steps
class Paginated(object):
    '''
    list-like view over a paginated API result set.

    items are fetched one page at a time, and only when an index or slice
    lands on a page that has not been loaded yet.
    '''

    def __init__(self, query, wrapper=None):
        '''
        prepares the paginated object. no API call is made here; the first
        page is requested on first access. this can be modified to fit any
        paginated API.

        the one element that is set for this specific example is
        `self.wrapper`, as the API being used wraps the results by the
        `type` parameter in the query string.

        example `results`:
            result = {
                artists: {
                    href: "/search?query=turtles&offset=0&limit=20&type=artist",
                    items: [..],
                    limit: 20,
                    next: "/search?query=turtles&offset=20&limit=20&type=artist",
                    offset: 0,
                    previous: null,
                    total: 24
                }
            }

        example `query`:
            query = {
                'url': '/url/to/call',
                'params': {
                    'limit': 10,
                    'offset': 0
                }
            }

        :param query: the query that will be made to the API.
        :param wrapper: does the API wrap the results inside a wrapper? if
                        so, `self.load_page` will remove this from each call.
        '''
        self.wrapper = wrapper
        self.query = query
        self.limit = query['params']['limit']
        # placeholder only; replaced by the real total on the first response.
        self.total = self.limit
        # None means no page has been requested yet, so the total is unknown.
        self.pages = None
        self.items = []

    def __len__(self):
        '''
        returns length of `self.items` minus all empty (None) elements,
        i.e. the number of items loaded so far.
        '''
        return len(self.items) - self.items.count(None)

    def __str__(self):
        '''
        returns the string form of the list of loaded items. loaded pages
        need not be adjacent, so empty slots are filtered out rather than
        assuming the loaded items sit at the front.
        '''
        return str([item for item in self.items if item is not None])

    def __getitem__(self, index):
        '''
        returns `self.items[index]` by calling `Paginated[index]`, loading
        whichever pages are needed. slices return a plain list.

        :param index: integer index or slice of `self.items` being obtained.
        :raises IndexError: when an integer index is out of range.
        '''
        if isinstance(index, slice):
            if self.pages is None:
                # slice bounds can only be resolved against the real total,
                # so one page must be fetched first. use the page holding
                # `start` when that is known, to avoid a wasted request.
                start = index.start
                known_start = start is not None and start >= 0
                self.load_page(start // self.limit if known_start else 0)
            bounds = index.indices(self.total)
            return [self._item(i) for i in range(*bounds)]
        return self._item(index)

    def _item(self, index):
        '''
        returns the single item at `index`, loading its page if needed.

        :param index: integer index; negative values count from the end.
        :raises IndexError: when `index` is outside the result set.
        '''
        if self.pages is None:
            # total still unknown: fetch the target page directly, or the
            # first page when a negative index needs the total to resolve.
            self.load_page(index // self.limit if index >= 0 else 0)
        if index < 0:
            index += self.total
        if not 0 <= index < self.total:
            raise IndexError('list index out of range')
        page = index // self.limit
        if not self.pages[page]:
            self.load_page(page)
        return self.items[index]

    def _build_pages(self, total):
        '''
        builds total pages. called from `self.load_page` if the pages have
        not yet been populated. creates empty lists `items[N]` and
        `pages[P]` where `N` is total number of items and `P` is total
        number of pages.

        :param total: total items of paginated results.
        '''
        self.total = total
        self.items = [None] * total
        # ceiling division: a partly filled last page still counts as a page.
        self.pages = [0] * (-(-total // self.limit))

    def load_page(self, page):
        '''
        loads the given page by making an API call with the offset for
        `page`, marks the page as loaded and stores its items.

        :param page: given page within the paginated object.
        '''
        offset = page * self.limit
        self.query['params']['offset'] = offset
        results = get(self.query['url'], **self.query['params'])
        if self.wrapper:
            results = results[self.wrapper]
        if self.pages is None:
            self._build_pages(results['total'])
        # a page past the end (e.g. an empty result set) has no flag to set.
        if 0 <= page < len(self.pages):
            self.pages[page] = 1
        for position, item in enumerate(results['items'], offset):
            # ignore anything reported beyond the known total.
            if position < len(self.items):
                self.items[position] = item
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment