Last active
January 26, 2016 11:03
-
-
Save willycs40/078b97d73c3097241b8d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
# This code is borrowed, with edits for python 3, from https://github.com/scraperwiki/zoopla | |
# License: https://github.com/scraperwiki/zoopla/blob/master/LICENCE | |
import requests | |
import requests_cache | |
from urllib.parse import urlencode | |
import logging | |
L = logging.getLogger(__name__) | |
BASE_URL = 'http://api.zoopla.co.uk/api/v1/' | |
def api(version, **kwargs):
    """Build an API client for the requested API version.

    Only version 1 is supported; all keyword arguments are forwarded
    unchanged to the version-specific client constructor.

    Raises:
        ValueError: if ``version`` is anything other than 1.
    """
    if version != 1:
        raise ValueError("Unsupported API version: '{}'".format(version))
    return _ApiVersion1(**kwargs)
class PropertyListing(object):
    """Expose the entries of one listing mapping as instance attributes."""

    def __init__(self, member_variables):
        # The mapping itself becomes the instance namespace (it is not
        # copied), so every key is readable as an attribute.
        setattr(self, '__dict__', member_variables)
class _ApiVersion1(object): | |
def __init__(self, api_key, session_id=None, cache_seconds=(12 * 60 * 60)): | |
self.api_key = self._validate_api_key(api_key) | |
if cache_seconds: | |
install_cache(cache_seconds) | |
def _validate_api_key(self, api_key): | |
if len(api_key) < 24: | |
raise ValueError("Invalid API key(?): '{}'".format(api_key)) | |
return api_key | |
def _make_url(self, command, arguments): | |
arguments['api_key'] = self.api_key | |
url = "{}{}.js?{}".format(BASE_URL, command, | |
urlencode(sort_dict(arguments))) | |
L.debug(url) | |
return url | |
def _call_api(self, command, arguments): | |
validate_query_arguments(arguments) | |
url = self._make_url('property_listings', arguments) | |
f = download_url(url) | |
parsed = f | |
if 'error_code' in parsed: | |
# TODO: define an ApiError | |
raise RuntimeError("Error {}: {}".format(parsed['error_code'], | |
parsed['error_string'])) | |
return parsed | |
def zed_index(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def area_value_graphs(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def property_rich_list(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def average_area_sold_price(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def area_zed_indices(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def zoopla_estimates(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def average_sold_prices(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def _call_api_paged(self, command, args, max_results, result_processor): | |
""" | |
There are a few conditions where we need to stop paging | |
1) We've yielded max_results | |
2) We've yielded result_count | |
""" | |
num_yielded = 0 | |
num_yielded_in_loop = 0 | |
args['page_size'] = 100 | |
args['page_number'] = 1 | |
result_count = None | |
def reached_limit(number, limit): | |
return number >= limit if limit is not None else False | |
def finished(): | |
L.debug("yielded: {}, max_results: {}, result_count: {}".format( | |
num_yielded, max_results, result_count)) | |
if reached_limit(num_yielded, max_results): | |
L.debug("Stop paging, yielded={}, max_results={}".format( | |
num_yielded, max_results)) | |
return True | |
elif reached_limit(num_yielded, result_count): | |
L.debug("Stop paging, yielded={}, result_count={}".format( | |
num_yielded, result_count)) | |
return True | |
else: | |
return False | |
while not finished(): | |
response = self._call_api('property_listings', args) | |
result_count = response['result_count'] - 1 # note I added a -1 here because I was getting an off-by-one causing looping meaning I used up all my quota | |
num_yielded_in_loop = 0 | |
for listing in result_processor(response): | |
yield listing | |
num_yielded += 1 | |
num_yielded_in_loop += 1 | |
if finished(): | |
break | |
if num_yielded_in_loop < 5: # an extra safety, this breaks the loop if we didn't get at least 5 results... | |
break | |
args['page_number'] += 1 | |
def property_listings(self, max_results=100, **kwargs): | |
L.debug('property_listings(max_results={}, {})'.format( | |
max_results, kwargs)) | |
result_processor = self._create_listings | |
if 'page_size' not in kwargs and 'page_number' not in kwargs: | |
L.debug("Automatically paging this request.") | |
generator = self._call_api_paged( | |
'property_listings', | |
kwargs, | |
max_results, | |
result_processor) | |
else: | |
L.debug("Not paging this request.") | |
generator = self.create_listings( | |
self._call_api('property_listings', kwargs)) | |
for listing in generator: | |
yield listing | |
def _create_listings(self, api_response): | |
response_meta = dict(api_response) | |
del response_meta['listing'] | |
L.debug("response meta: {}".format(response_meta)) | |
listings = api_response['listing'] | |
L.debug("{} listings".format(len(listings))) | |
for listing in listings: | |
listing['meta'] = response_meta | |
yield PropertyListing(listing) | |
def get_session_id(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def refine_estimate(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def arrange_viewing(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def local_info_graphs(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def property_historic_listings(self): | |
raise NotImplementedError("This method isn't yet implemented.") | |
def install_cache(expire_after):
    """Install the process-wide requests cache for GET requests.

    Args:
        expire_after: cache lifetime in seconds, passed straight through to
            ``requests_cache.install_cache``.
    """
    message = "Installing cache, valid for {} seconds.".format(expire_after)
    L.info(message)
    requests_cache.install_cache(allowable_methods=('GET',),
                                 expire_after=expire_after)
def download_url(url, timeout=30):
    """Fetch ``url`` with a GET request and return the decoded JSON body.

    Args:
        url: address to fetch.
        timeout: seconds to wait for the server before giving up; without
            a timeout a stalled connection would block forever.

    Raises:
        requests.HTTPError: if the response status indicates an error.
        requests.Timeout: if the server does not answer within ``timeout``.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()
def sort_dict(some_dict):
    """
    Return the dictionary's (key, value) pairs as a list in sorted order.

    >>> sort_dict({'b': 1, 'a': 2})
    [('a', 2), ('b', 1)]
    """
    pairs = list(some_dict.items())
    pairs.sort()
    return pairs
def validate_query_arguments(arguments):
    """Validate every query argument and return the validated mapping.

    Each value is passed through ``validate_argument``; any exception a
    validator raises propagates to the caller. The input mapping is not
    modified.

    Returns:
        A new dict mapping each argument name to its validated value.
    """
    return {
        argument: validate_argument(argument, value)
        for argument, value in arguments.items()
    }
def validate_argument(name, value):
    """Run the module-level ``validate_<name>`` function on ``value``.

    When no such function exists in this module's globals the value is
    returned unchanged; otherwise the validator's return value is returned.
    """
    validator_name = 'validate_' + name
    module_namespace = globals()

    if validator_name not in module_namespace:
        L.debug("No function {}(..), returning '{}' as '{}'".format(
            validator_name, name, value))
        return value

    validator = module_namespace[validator_name]
    L.debug("Calling {}({})".format(validator_name, value))
    return validator(value)
def validate_area(area):
    """Accept any ``area`` value and return it unchanged.

    No checks are performed yet. The value itself is returned (rather than
    ``True``) so that this validator follows the same contract as the
    other validators: the validated value is the result.
    """
    return area
def validate_lat_min(value):
    """Return ``value`` unchanged after checking it converts to a float.

    Raises:
        ValueError / TypeError: whatever ``float(value)`` raises when the
            value is not numeric.
    """
    # Only the conversion attempt matters; the converted number is discarded.
    _ = float(value)
    return value
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from api import api | |
import csv | |
import logging | |
from time import sleep | |
class Parameters:
    """Static configuration for the export script."""

    # Output path template; the placeholder receives the run label/outcode.
    BASE_OUTPUT_FILE_NAME = 'output/output_{}.csv'

    # NOTE(review): credential committed to source control — consider
    # loading it from the environment or a local config file instead.
    API_KEY = '85g6qverxyyjqe37gxkgqp6n'

    # Keyword arguments passed to the client's property_listings() call.
    BASE_API_PARAMETERS = {
        'postcode': 'b23',
        'order_by': 'age',
        'max_results': 2000,
        # 'listing_status': 'rent',  # rent / sale
        'include_sold': 1,
        'include_rented': 1,
        'summarised': 'true',
    }

    # Outcodes iterated by the looped run.
    OUTCODES = [
        'B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8', 'B9', 'B10',
        'B11', 'B12', 'B13', 'B14', 'B15', 'B16', 'B17', 'B18', 'B19', 'B20',
        'B21', 'B23', 'B24', 'B25', 'B26', 'B27', 'B28', 'B29', 'B30', 'B31',
        'B32', 'B33', 'B34', 'B35', 'B36', 'B37', 'B38', 'B40', 'B42', 'B43',
        'B44', 'B45', 'B46', 'B47', 'B48', 'B49', 'B50', 'B60', 'B61', 'B62',
        'B63', 'B64', 'B65', 'B66', 'B67', 'B68', 'B69', 'B70', 'B71', 'B72',
        'B73', 'B74', 'B75', 'B76', 'B77', 'B78', 'B79', 'B80', 'B90', 'B91',
        'B92', 'B93', 'B94', 'B95', 'B96', 'B97', 'B98',
    ]

    # Listing attributes written to the CSV, in column order.
    CSV_FIELD_LIST = [
        'listing_id',
        'outcode',
        'displayable_address',
        'num_bathrooms',
        'num_bedrooms',
        'num_floors',
        'num_recepts',
        'listing_status',
        'status',
        'price',
        'property_type',
        'new_home',
        'latitude',
        'longitude',
        'first_published_date',
        'last_published_date',
    ]
def get_data(api_instance, parameters, file_name, write_header,
             field_list=None):
    """Write the listings returned for ``parameters`` to a CSV file.

    Args:
        api_instance: object whose ``property_listings(**parameters)``
            yields listing objects.
        parameters: keyword arguments for ``property_listings``.
        file_name: destination path; it is overwritten, and its parent
            directory is created when missing.
        write_header: when true, the first row holds the field names.
        field_list: listing attributes to export, in column order;
            defaults to ``Parameters.CSV_FIELD_LIST``.

    A listing that lacks a field gets an empty string in that column.
    """
    import os

    if field_list is None:
        field_list = Parameters.CSV_FIELD_LIST

    directory = os.path.dirname(file_name)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # The context manager closes the file even if the API call or a write
    # fails part-way through.
    with open(file_name, 'w', encoding='utf8', newline='') as outputfile:
        writer = csv.writer(outputfile, quoting=csv.QUOTE_ALL)
        if write_header:
            writer.writerow(field_list)
        for listing in api_instance.property_listings(**parameters):
            writer.writerow(
                [getattr(listing, field, '') for field in field_list])
def main():
    """Create the API client and export listings for the configured outcodes."""
    logging.info('Getting API')
    client = api(version=1, api_key=Parameters.API_KEY)

    # Alternative: a single export using the base parameters as they are.
    # single_run(client, Parameters.BASE_API_PARAMETERS,
    #            Parameters.BASE_OUTPUT_FILE_NAME.format('single'))
    delay_between_requests = 5
    looped_run(
        client,
        Parameters.BASE_API_PARAMETERS,
        Parameters.BASE_OUTPUT_FILE_NAME,
        delay_between_requests,
    )
def single_run(api_instance, api_params, file_name):
    """Export one query to ``file_name``, including the CSV header row."""
    get_data(api_instance, api_params, file_name, write_header=True)
def looped_run(api_instance, api_params, file_name, delay_spacer):
    """Export one header-less CSV per outcode, pausing between requests.

    Args:
        api_instance: API client passed through to ``get_data``.
        api_params: base query parameters; copied for every outcode, so the
            caller's mapping is never modified.
        file_name: output path template with one ``{}`` placeholder that
            receives the outcode.
        delay_spacer: seconds to sleep after each outcode.
    """
    # NOTE(review): only the first two outcodes are processed — presumably
    # a deliberate limit to save API quota; confirm before widening.
    for outcode in Parameters.OUTCODES[0:2]:
        params = dict(api_params)
        params['postcode'] = outcode
        output_path = file_name.format(outcode)
        get_data(api_instance, params, output_path, False)
        sleep(delay_spacer)
# Script entry point: turn on debug-level logging, then run the export.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment