Collect PropertyPal data
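A Python 2 script that crawls PropertyPal search results for a given area, scrapes each listing's price, address, estate agent, EPC rating, amenities and map location, and writes the collected records to propertyData.json.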
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Tue July 10 11:06:05 2018
Last updated Thu July 26 15:43:10 2018
@author: owen
"""
import os
import re
import json
import requests
import datetime
from bson import json_util
from bs4 import BeautifulSoup
from string import punctuation

# Enter the area you want to collect property data from; leave it blank ("") to search the whole of NI
SEARCH_AREA = "belfast"
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
BASIC_REQUEST = {
    "baseURL": "https://www.propertypal.com",
    "forSalePath": "/property-for-sale",
    "forRentPath": "/property-to-rent",
    "sortOptions": {
        "mostPopular": "/sort-hot",
        "recentlyAdded": "/sort-dateHigh",
        "recentlyUpdated": "/sort-updatedHigh",
        "priceLowHigh": "/sort-priceLow",
        "priceHighLow": "/sort-priceHigh"
    },
    "userAgent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:24.0) Gecko/20100101 Firefox/24.0"
}

POSTCODE_AREAS = {
    'BT1': 'central belfast',
    'BT2': 'central belfast',
    'BT3': 'central belfast',
    'BT4': 'east belfast',
    'BT5': 'east belfast',
    'BT6': 'east belfast',
    'BT7': 'central belfast',
    'BT8': 'south belfast',
    'BT9': 'south belfast',
    'BT10': 'south belfast',
    'BT11': 'west belfast',
    'BT12': 'west belfast',
    'BT13': 'north belfast',
    'BT14': 'north belfast',
    'BT15': 'central belfast',
    'BT16': 'east belfast',
    'BT17': 'south belfast',
    'BT18': 'holywood',
    'BT19': 'bangor',
    'BT20': 'bangor',
    'BT21': 'donaghadee',
    'BT22': 'strangford',
    'BT23': 'newtownards',
    'BT24': 'ballynahinch',
    'BT25': 'dromore',
    'BT26': 'hillsborough',
    'BT27': 'east lisburn',
    'BT28': 'lisburn',
    'BT29': 'crumlin',
    'BT30': 'downpatrick',
    'BT31': 'castlewellan',
    'BT32': 'banbridge',
    'BT33': 'newcastle',
    'BT34': 'hilltown',
    'BT35': 'newry',
    'BT36': 'glengormley',
    'BT37': 'whiteabbey',
    'BT38': 'carrickfergus',
    'BT39': 'ballyclare',
    'BT40': 'larne',
    'BT41': 'antrim',
    'BT42': 'ballymena',
    'BT43': 'north ballymena',
    'Belfast': 'central belfast'
}

def get_property_page(area, page_number, property_type, sort_by):
    type_url = 'forSalePath' if property_type == 'sale' else 'forRentPath'
    url = '{}{}'.format(
        BASIC_REQUEST['baseURL'],
        BASIC_REQUEST[type_url])
    if area:
        url += '/{}'.format(area)
    if sort_by is not None:
        url += BASIC_REQUEST['sortOptions'][sort_by]
    if page_number > 1:
        url += '/page-{}'.format(page_number)
    request_page = requests.get(
        url=url,
        headers={'User-Agent': BASIC_REQUEST['userAgent']})
    if request_page.status_code == 200:
        return BeautifulSoup(request_page.content, "lxml")
    else:
        raise ValueError(
            'Page not found: The request received a {} status code'.format(
                request_page.status_code))

def get_final_page_number(first_page_soup):
    raw_page_number = first_page_soup.find("li", {"class": "paging-last"}).get_text()
    clean_page_number = raw_page_number.encode('ascii', 'ignore').strip().replace(',', '')
    return int(clean_page_number)

def get_all_main_images(page_soup):
    # Collect the main thumbnail for each listing; None where there is no photo
    property_images = []
    all_images = page_soup.findAll("div", {"class": "propbox-img"})
    for image in all_images:
        string_image = str(image)
        if 'is-no-photo' in string_image:
            property_images.append(None)
        elif 'propbox-time' in string_image or 'openviewing' in string_image:
            property_images.append(
                image.find("img").attrs['data-lazy-src'])
    return property_images

def strip_punctuation(string):
    prep_string = string.replace('-', ' ')
    return ''.join(char for char in prep_string if char not in punctuation)

def clean_price(price):
    if price is None:
        end_price = price
    else:
        raw_price = price.get_text()
        cleaned_price = raw_price.encode('ascii', 'ignore').strip().replace(',', '')
        try:
            end_price = int(cleaned_price)
        except ValueError:
            end_price = cleaned_price
    return end_price

def get_currency(raw_price):
    currencies = {
        '£': 'pound',
        '€': 'euro',
        '$': 'dollar'
    }
    string_price = str(raw_price)
    for c, v in currencies.iteritems():
        if c in string_price:
            return v
    return 'unknown'

def get_price(page_soup):
    offer = page_soup.find("span", {"class": "price-offers"})
    if offer is not None:
        offer = offer.get_text().strip()
    price = page_soup.find("span", {"class": "price-value "})
    min_price = page_soup.find("span", {"class": "price-min"})
    max_price = page_soup.find("span", {"class": "price-max"})
    currency = 'unknown'
    if price is not None:
        currency = get_currency(price)
    elif min_price is not None:
        currency = get_currency(min_price)
    return {
        'offer': offer,
        'price': clean_price(price),
        'minPrice': clean_price(min_price),
        'maxPrice': clean_price(max_price),
        'currency': currency
    }

def get_property_id(html_string, clean_address, id_length=10):
    # The listing ID follows the address slug in the page HTML; read
    # characters until the closing quote of the href is reached
    property_id = ''
    start_string = html_string.find(clean_address)
    for i in range(id_length):
        new_char = html_string[start_string + len(clean_address) + i]
        if new_char != '"':
            property_id += new_char
        else:
            break
    return property_id

def get_clean_address(address, town):
    trans_address = strip_punctuation(address).lower().replace(' ', '-')
    return str('/{}-{}/'.format(
        trans_address,
        town.lower()))

def get_hyperlink(page_soup, address, town):
    if town is None or address is None:
        return None
    clean_address = get_clean_address(
        address=address,
        town=town)
    property_id = get_property_id(
        html_string=str(page_soup),
        clean_address=clean_address)
    return clean_address + property_id

def get_address(page_soup):
    raw_address = page_soup.find("span", {"class": "propbox-addr"}).get_text()
    clean_address = str(raw_address.rstrip(', '))
    return clean_address

def get_postcode(page_soup):
    raw_postcode = page_soup.find("span", {"class": "propbox-town"})
    clean_postcode = str(raw_postcode.get_text())
    return clean_postcode

def get_brief(page_soup):
    raw_brief = page_soup.find("p", {"class": "propbox-brief"})
    if raw_brief is None:
        return None
    else:
        clean_brief = str(raw_brief.get_text().strip())
        return clean_brief

def get_estate_agent(page_soup):
    agent = page_soup.find("p", {"class": "propbox-account"})
    agent_data = {'name': None, 'branch': None}
    if agent is not None:
        rep = {
            'Marketed by ': '',
            'Developed by ': ''
        }
        rep = dict((re.escape(k), v) for k, v in rep.iteritems())
        pattern = re.compile("|".join(rep.keys()))
        agent = pattern.sub(
            lambda m: rep[re.escape(m.group(0))],
            agent.get_text().strip())
        if '(' in agent:
            agent_data['name'] = str(agent.split(' (')[0])
            agent_data['branch'] = str(agent[agent.find("(") + 1:agent.find(")")])
        else:
            agent_data['name'] = str(agent)
    return agent_data

def property_location(detail_soup):
    # Check for the mini-map element before reading its attributes, so
    # listings without a map fall back to (0, 0)
    map_element = detail_soup.find('a', {'class': 'Mediabox-miniBoxMap'})
    if map_element is None:
        return {
            'lat': float(0),
            'lon': float(0)
        }
    map_data_options = json.loads(map_element.attrs['data-map-options'])
    return {
        'lat': float(map_data_options['lat']),
        'lon': float(map_data_options['lng'])
    }

def amenity_present(detail_page, amenity):
    return amenity in detail_page.lower()

def parse_epc_rating(epc_rating_list):
    # Split EPC ratings such as 'B85' into a band ('B') and a score (85)
    parsed_epc = {
        'actual': {
            'band': None,
            'score': None
        },
        'potential': {
            'band': None,
            'score': None
        }
    }
    if len(epc_rating_list) == 2:
        parsed_epc_values = []
        for epc in epc_rating_list:
            match = re.match(
                pattern=r"([a-z]+)([0-9]+)",
                string=epc,
                flags=re.I)
            if match:
                items = match.groups()
                for item in items:
                    parsed_epc_values.append(item)
        parsed_epc['actual']['band'] = parsed_epc_values[0]
        parsed_epc['actual']['score'] = int(parsed_epc_values[1])
        parsed_epc['potential']['band'] = parsed_epc_values[2]
        parsed_epc['potential']['score'] = int(parsed_epc_values[3])
    return parsed_epc

def get_property_details(hyperlink):
    if hyperlink is None:
        return None
    data = {}
    page_response = requests.get(
        url='{}{}'.format(
            BASIC_REQUEST['baseURL'],
            hyperlink),
        headers={'User-Agent': BASIC_REQUEST['userAgent']})
    if page_response.status_code == 200:
        detail_page = page_response.content
        detail_soup = BeautifulSoup(detail_page, "lxml")
        key_info_table = detail_soup.find("table", {"id": "key-info-table"})
        key_info_rows = key_info_table.find_all('tr')
        for row in key_info_rows:
            row_title = str(row.findAll('th')[0].get_text().lower())
            if row_title != 'stamp duty' and row_title != 'price':
                cols = row.findAll('td')
                cols = [ele.text.strip() for ele in cols]
                info = [ele for ele in cols if ele][0].encode('ascii', 'ignore').strip()
                if row_title == 'rates':
                    info = float(info.replace(' pa*', '').replace(',', ''))
                elif 'epc' in row_title:
                    info = info.split('\n', 1)[0]
                    info = parse_epc_rating(
                        epc_rating_list=info.split('/'))
                elif row_title in ['bathrooms', 'bedrooms', 'receptions']:
                    info = int(info)
                else:
                    info = str(info)
                data[to_camel_case(string=row_title)] = info
        data['amenities'] = {}
        for amenity in ['garden', 'garage', 'driveway', 'parking', 'bay window']:
            data['amenities'][to_camel_case(string=amenity)] = amenity_present(
                detail_page=detail_page,
                amenity=amenity)
        data['location'] = property_location(
            detail_soup=detail_soup)
        data['additionalInfo'] = str(detail_soup.find(
            "div", {"class": "prop-descr-text"}))
        return data
    else:
        return None

def to_camel_case(string):
    humped_camel = ''.join(x for x in string.title() if not x.isspace())
    return humped_camel[0].lower() + humped_camel[1:]

def generate_tags(taggables):
    tags = []
    for tag in taggables:
        if tag is not None:
            tags.append(tag.lower())
    return tags

def property_dataset(page_soup):
    dataset = []
    property_details = page_soup.findAll("div", {"class": "propbox-details"})
    property_images = get_all_main_images(page_soup)
    number_of_properties = len(property_details)
    number_of_images = len(property_images)
    if number_of_properties != number_of_images:
        raise ValueError(
            'Mismatch: We collected {} properties and {} images'.format(
                number_of_properties,
                number_of_images))
    for i in range(number_of_properties):
        address = get_address(property_details[i])
        postcode_split = get_postcode(property_details[i]).split()
        if len(postcode_split) > 1:
            town = postcode_split[0]
            postcode = postcode_split[1]
            # .get() avoids a KeyError for postcodes outside the lookup table
            area = POSTCODE_AREAS.get(postcode)
            hyperlink = get_hyperlink(
                page_soup=page_soup,
                address=address,
                town=town)
            property_id = get_property_id(
                html_string=str(page_soup),
                clean_address=get_clean_address(
                    address=address,
                    town=town))
        else:
            area = town = postcode = hyperlink = property_id = None
        tags = generate_tags(
            taggables=[address, town, postcode, area])
        dataset.append({
            'timeAdded': datetime.datetime.now(),
            'propertyId': property_id,
            'tags': tags,
            'address': address,
            'town': town,
            'area': area,
            'postcode': postcode,
            'priceInfo': get_price(property_details[i]),
            'brief': get_brief(property_details[i]),
            'estateAgent': get_estate_agent(property_details[i]),
            'hyperlink': '{}{}'.format(
                BASIC_REQUEST['baseURL'],
                hyperlink) if hyperlink is not None else None,
            'details': get_property_details(hyperlink),
            'image': property_images[i]
        })
    return dataset

def get_property_dataset(area, property_type, sort_by, page_limit=False):
    first_page = get_property_page(
        area=area,
        page_number=1,
        property_type=property_type,
        sort_by=sort_by)
    if page_limit:
        # Cap the crawl at the first few result pages (useful for testing)
        final_page_number = 3
    else:
        final_page_number = get_final_page_number(
            first_page)
    print('Parsing page 1 of {}...'.format(final_page_number))
    property_data = property_dataset(first_page)
    for page_number in range(2, final_page_number + 1):
        try:
            print('Parsing page {} of {}...'.format(
                page_number, final_page_number))
            property_page = get_property_page(
                area=area,
                page_number=page_number,
                property_type=property_type,
                sort_by=sort_by)
            property_data += property_dataset(property_page)
        except ValueError:
            print("Oops! Some parsing went wrong on search page {}".format(
                page_number))
    return property_data

if __name__ == '__main__':
    properties = get_property_dataset(
        area=SEARCH_AREA,
        property_type='sale',
        sort_by='recentlyAdded',
        page_limit=True)
    properties_json = json.dumps(properties, default=json_util.default)
    with open(DIR_PATH + '/propertyData.json', 'w') as json_file:
        json_file.write(properties_json)
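
To load the dump back into Python, bson.json_util can reverse the default encoder used above. A minimal sketch, assuming the script has already written propertyData.json to the same directory:

import json
from bson import json_util

# json_util.object_hook restores the datetime objects that
# json_util.default serialised when the scraper wrote the file
with open('propertyData.json') as json_file:
    properties = json.load(json_file, object_hook=json_util.object_hook)

print('{} properties collected'.format(len(properties)))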