Last active
October 17, 2015 22:54
-
-
Save bnlucas/98858034fc1f974afbaf to your computer and use it in GitHub Desktop.
Move seamlessly through paginated API results without loading all pages at once. Only loads the pages needed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
This example uses the Spotify Web API, which returns paginated results.
The Paginated class can be adapted for use with other paginated APIs.

Output:
---------------------------------------------------------------------
top 5 search results:

Bear Vs. Shark
Shark?
Shark Tank
Shark City Click
Shark Week

bottom 5 search results:

Tom Johnson's Shark
Ka Mano - 'The Shark'
Shark Toys
Shark Alley Hobos
Chita Rivera, Marilyn Cooper, Reri Grist & Shark Girls

 26/166 items loaded.
  2/  9 pages loaded.
'''
import json
import time

import Paginated
# base URL that every endpoint path is appended to (see `api_call`).
API_URL = 'https://api.spotify.com/v1/'
# when true, `api_call` prints each request and each decoded response.
DEBUG = True
# market used by `search` when the caller does not pass one.
MARKET = 'US'
# NOTE(review): `requests` is used here but no import of it is visible in this
# file - confirm it is imported before this line runs.
session = requests.api

# NOTE(review): this demo runs at import time and calls `search`, which is only
# defined further down the file; it has to move below the definitions (or
# behind an `if __name__ == '__main__':` guard at the end of the file) to run.
# `search` requires a `type`; 'artist' matches the 'artists' results wrapper.
artists = search('shark', 'artist')

print('top 5 search results:\n')
for artist in artists[:5]:
    print(artist['name'])

print('\n\nbottom 5 search results:\n')
for artist in artists[-5:]:
    print(artist['name'])

# `len(artists)` counts loaded items only; `artists.pages` holds one 0/1 flag
# per page, so its sum is the number of pages fetched so far.
print('\n{:3d}/{:3d} items loaded.'.format(len(artists), artists.total))
print('{:3d}/{:3d} pages loaded.'.format(sum(artists.pages), len(artists.pages)))
def search(query, type, limit=20, offset=0, market=None):
    '''
    set up a paginated search against the API. no request is sent here; the
    returned Paginated object makes the API calls as its items are accessed.

    :param query: the search terms.
    :param type: item type to search for, e.g. 'artist'. the name shadows the
                 builtin but is kept because it is part of the signature.
    :param limit: number of items per page.
    :param offset: offset of the first item requested.
    :param market: market code; falls back to MARKET when not given.
    :return: a Paginated object wrapping the search.
    '''
    if not market:
        market = MARKET

    request = {
        'url': 'search',
        'params': {
            'query': query,
            'type': type,
            'limit': limit,
            'offset': offset,
            'market': market,
        }
    }

    # the results come back wrapped under a key derived from the requested
    # type. this used to be hard-coded to 'artists', which broke every other
    # type. 'artist' still wins whenever it is requested, so existing callers
    # see the same wrapper as before.
    # NOTE(review): assumes the wrapper key is the plural of the type (as with
    # 'artist' -> 'artists') - confirm against the API for other types.
    kinds = [kind.strip() for kind in type.split(',')]
    kind = 'artist' if 'artist' in kinds else kinds[0]

    return Paginated(request, '{}s'.format(kind))
def encode(data, encoding='utf-8'):
    '''
    returns the json object returned by the API request with every text
    string converted to an encoded native string. dicts and lists are walked
    recursively; any other value is returned unchanged.

    :param data: the json object.
    :param encoding: which encoding to use.
    :return: a new structure of the same shape as `data`.
    '''
    try:
        text_type = unicode  # python 2: text strings need encoding
    except NameError:
        text_type = None  # python 3: `str` is already the native string type

    def convert(value):
        # `encoding` is taken from the enclosing call so nested values use the
        # requested encoding (the recursive calls used to drop it and always
        # fall back to utf-8).
        if isinstance(value, dict):
            return {convert(k): convert(v) for k, v in value.items()}
        if isinstance(value, list):
            return [convert(item) for item in value]
        if text_type is not None and isinstance(value, text_type):
            return value.encode(encoding)
        return value

    return convert(data)
class SpotifyException(Exception):
    '''
    raised by `api_call` when the API answers with an HTTP error status.

    :param status_code: HTTP status code of the failed response.
    :param code: error code; `api_call` always passes -1.
    :param msg: human readable description of the failure.
    '''

    def __init__(self, status_code, code, msg):
        # hand the values to Exception as well so `args` (and with it repr
        # and pickling) carry the details instead of being empty.
        super(SpotifyException, self).__init__(status_code, code, msg)
        self.status_code = status_code
        self.code = code
        self.msg = msg

    def __str__(self):
        return 'status code: {}, code:{} - {}'.format(
            self.status_code, self.code, self.msg)
def api_call(method, url, payload, params):
    '''
    send one request to the API and return the decoded response.

    :param method: HTTP method name, e.g. 'GET'.
    :param url: endpoint path; appended to API_URL.
    :param payload: optional request body; sent as JSON when truthy.
    :param params: dict of query string parameters.
    :return: the response JSON passed through `encode`, or None when the
             response body is empty.
    :raises SpotifyException: when the response has a 4xx or 5xx status.
    '''
    request_kwargs = {
        'params': params,
        'headers': {'Content-Type': 'application/json'},
    }
    if payload:
        # the body has to be the serialized *payload*, passed as `data`. the
        # previous code serialized the query parameters and passed them under
        # a `payload` keyword that `request` does not accept.
        request_kwargs['data'] = json.dumps(payload)

    response = session.request(
        method, ''.join([API_URL, url]), **request_kwargs)

    if DEBUG:
        print('\n{} {}'.format(method, response.url))
        if payload:
            print('DATA {}'.format(request_kwargs['data']))

    # the same status range `raise_for_status` treats as an error.
    if 400 <= response.status_code < 600:
        try:
            # `json` is a method on the response and must be called.
            message = response.json()['error']['message']
        except (ValueError, KeyError, TypeError):
            # the error body is not JSON or not in the expected shape.
            message = response.text
        raise SpotifyException(
            response.status_code, -1,
            '{}:\n {}'.format(response.url, message))

    if not response.text:
        return None

    results = response.json()
    if DEBUG:
        print('\nRESP {}'.format(results))
    return encode(results)
def get(url, payload=None, **kwargs):
    '''
    make a GET call to the API, retrying on failures.

    rate limiting (429), server errors (5xx) and any non-API exception are
    retried up to `config.MAX_GET_RETRIES` attempts in total, sleeping 1s, 2s,
    3s, ... between attempts. any other API error is raised immediately.

    :param url: endpoint path, passed on to `api_call`.
    :param payload: optional request body, passed on to `api_call`.
    :param kwargs: sent as the query string parameters.
    :return: whatever `api_call` returns. None when MAX_GET_RETRIES is not
             positive, as no attempt is made in that case.
    :raises SpotifyException: for a non-retryable API error, or for a
                              retryable one once the attempts are used up.
    '''
    # NOTE(review): `config` is not imported in the visible part of this
    # file - confirm it is in scope.
    max_tries = config.MAX_GET_RETRIES
    delay = 1

    for attempt in range(1, max_tries + 1):
        try:
            return api_call('GET', url, payload, kwargs)
        except SpotifyException as error:
            retryable = (error.status_code == 429 or
                         500 <= error.status_code < 600)
            # re-raise on the last attempt. the previous `tries > MAX` check
            # could never be true inside the loop, so an exhausted retry fell
            # out of the loop and returned None without any error.
            if not retryable or attempt >= max_tries:
                raise
        except Exception:
            # `Exception` rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed and retried.
            if attempt >= max_tries:
                raise

        time.sleep(delay)
        delay += 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
def mceil(n, m):
    '''
    rounds `n` up to the nearest multiple of `m`.
    example: mceil(21, 10) == 30, the next multiple of 10.

    :param n: value to round up.
    :param m: the multiple to round to.
    '''
    # true division first, so integer arguments are not floored early.
    multiples = math.ceil(n / float(m))
    return m * multiples
def mfloor(n, m):
    '''
    rounds `n` down to the nearest multiple of `m`.
    example: mfloor(29, 10) == 20, the previous multiple of 10.

    :param n: value to round down.
    :param m: the multiple to round to.
    '''
    # true division first, so negative integers round towards the lower
    # multiple the same way on every python version.
    multiples = math.floor(n / float(m))
    return m * multiples
class Paginated(object):
    '''
    list-like view over paginated API results. a page is only requested from
    the API when one of its items is accessed; loaded items are kept in
    `self.items` and `self.pages` records which pages have been fetched.
    '''

    def __init__(self, query, wrapper=None):
        '''
        sets up the paginated object. no API call is made here. this can be
        modified to any paginated API.

        the one element that is set for this specific example is
        `self.wrapper`, as the API being used wraps the results by the `type`
        parameter in the query string.

        example `results`:

            result = {
                artists: {
                    href: "/search?query=turtles&offset=0&limit=20&type=artist",
                    items: [..],
                    limit: 20,
                    next: "/search?query=turtles&offset=20&limit=20&type=artist",
                    offset: 0,
                    previous: null,
                    total: 24
                }
            }

            query = {
                'url': '/url/to/call',
                'params': {
                    'limit': 10,
                    'offset': 0
                }
            }

        :param query: the query that will be made to the API. its
                      `params['offset']` is overwritten by `self.load_page`.
        :param wrapper: does the API wrap the results inside a wrapper? if so,
                        `self.load_page` will remove this from each call.
        '''
        self.wrapper = wrapper
        self.query = query
        self.limit = query['params']['limit']
        # placeholder only: the real total is unknown until a page is loaded.
        self.total = self.limit
        # None until the first page is loaded, then one 0/1 flag per page.
        self.pages = None
        self.items = []

    def __len__(self):
        '''
        returns the number of items loaded so far, i.e. the length of
        `self.items` minus all empty (None) elements.
        '''
        return len(self.items) - self.items.count(None)

    def __str__(self):
        '''
        returns the string form of the list of loaded items.
        '''
        # filter instead of slicing `items[:len(self)]`: loaded pages need
        # not be contiguous, so a prefix slice would show None holes and
        # leave out items of pages loaded further back.
        return str([item for item in self.items if item is not None])

    def __getitem__(self, index):
        '''
        returns `self.items[index]` by calling `Paginated[index]`, loading
        the page that holds the item if needed. slices return a list.

        :param index: int index or slice of `self.items` being obtained.
        :raises IndexError: when an int index is outside the results.
        '''
        if isinstance(index, slice):
            # the slice bounds are resolved against the total, so the real
            # total must be known first. resolving them against the
            # placeholder gave wrong or empty results on a fresh object.
            self._ensure_total()
            return [self._fetch(i) for i in range(*index.indices(self.total))]

        return self._fetch(index)

    def _ensure_total(self):
        '''
        loads the first page if nothing has been loaded yet, which sets
        `self.total` to the real number of items.
        '''
        if self.pages is None:
            self.load_page(0)

    def _fetch(self, index):
        '''
        returns the item at int `index`, loading its page when needed.

        :param index: int index; negative values count from the end.
        :raises IndexError: when `index` is outside the results.
        '''
        # range checks are only meaningful against the real total. page 0 is
        # requested just once, not again for every negative index.
        self._ensure_total()

        if index < 0:
            # in a list of 23 items, paginated[-2] is paginated[21].
            index += self.total

        if not 0 <= index < self.total:
            raise IndexError('list index out of range')

        # integer division keeps the page number an int on every python.
        page = index // self.limit
        if not self.pages[page]:
            self.load_page(page)

        return self.items[index]

    def _build_pages(self, total):
        '''
        builds total pages. called from `self.load_page` if the pages have
        not yet been populated. creates empty lists `items[N]` and
        `pages[P]` where `N` is total number of items and `P` is total
        number of pages.

        :param total: total items of paginated results.
        '''
        self.total = total
        self.items = [None] * total
        # ceiling division: a partly filled last page still counts as a page.
        self.pages = [0] * ((total + self.limit - 1) // self.limit)

    def load_page(self, page):
        '''
        loads the given page by making an API call with the offset for
        `page`, marks `pages[page]` as loaded and stores the returned items
        at their positions in `self.items`.

        :param page: given page within the paginated object.
        '''
        offset = page * self.limit
        self.query['params']['offset'] = offset

        # NOTE(review): `get` is not defined or imported in the visible part
        # of this module - confirm it is in scope where this class lives.
        results = get(self.query['url'], **self.query['params'])

        if self.wrapper:
            results = results[self.wrapper]

        if self.pages is None:
            self._build_pages(results['total'])

        # an empty result set has no pages at all; there is nothing to mark.
        if 0 <= page < len(self.pages):
            self.pages[page] = 1

        for position, item in enumerate(results['items'], offset):
            self.items[position] = item
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment