Created
August 1, 2021 11:25
-
-
Save lenalytics/c1225258665d49b1f6fe05ebce439840 to your computer and use it in GitHub Desktop.
Airbnb ETL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import psycopg2 | |
from ast import literal_eval | |
from re import sub | |
def connect(): | |
connection = psycopg2.connect(user="xxx", | |
password="xxx", | |
host="xxx", | |
port="5432", | |
database="dbname") | |
return connection | |
def upload_to_table(filename, table): | |
"""This method reads files from ./data/filename, applies transformations from table, and inserts the result into the | |
database """ | |
conn = connect() | |
c = conn.cursor() | |
file = open(filename, "r") | |
reader = csv.reader(file, delimiter=',') | |
next(reader, None) # skip the headers | |
for row in reader: | |
table(row).insert(cursor=c) | |
conn.commit() | |
c.close() | |
upload_to_table("data/au/listings.csv", Listing) | |
upload_to_table("data/au/reviews.csv", Review) | |
upload_to_table("data/au/calendar.csv", Calendar) | |
## CLASS DEFINITIONS BELOW | |
class Listing: | |
def __init__(self, csv_row): | |
self.raw_csv_row = csv_row | |
self.id = toInt(csv_row[0]) | |
self.listing_url = csv_row[1] | |
self.scrape_id = toInt(csv_row[2]) | |
self.last_searched = toString(csv_row[3], '-infinity') | |
self.last_scraped = toString(csv_row[4], '-infinity') | |
self.name = csv_row[5] | |
self.description = csv_row[6] | |
self.neighborhood_overview = csv_row[7] | |
self.picture_url = csv_row[8] | |
self.host_id = toInt(csv_row[9]) | |
self.host_url = csv_row[10] | |
self.host_name = csv_row[11] | |
self.host_since = toString(csv_row[12], 'infinity') | |
self.host_location = csv_row[13] | |
self.host_about = csv_row[14] | |
self.host_response_time = csv_row[15] | |
self.host_response_rate = toInt(csv_row[16]) | |
self.host_acceptance_rate = toInt(csv_row[17]) | |
self.host_is_superhost = csv_row[18] == 't' | |
self.host_thumbnail_url = csv_row[19] | |
self.host_picture_url = csv_row[20] | |
self.host_neighbourhood = csv_row[21] | |
self.host_listings_count = toInt(csv_row[22]) | |
self.host_total_listings_count = toInt(csv_row[23]) | |
self.host_verifications = literal_eval(csv_row[24]) # array | |
self.host_has_profile_pic = csv_row[25] == 't' | |
self.host_identity_verified = csv_row[26] == 't' | |
self.neighbourhood = csv_row[27] | |
# self.location = csv_row[] | |
self.latitude = float(csv_row[28]) | |
self.longitude = float(csv_row[29]) | |
self.property_type = csv_row[30] | |
self.room_type = csv_row[31] | |
self.accommodates = toInt(csv_row[32]) | |
self.bathrooms = toInt(csv_row[33]) | |
self.bathrooms_text = csv_row[34] | |
self.bedrooms = toInt(csv_row[35]) | |
self.beds = toInt(csv_row[36]) | |
self.amenities = literal_eval(csv_row[37]) # array | |
self.price = toDecimal(csv_row[38]) | |
self.minimum_nights = toInt(csv_row[39]) | |
self.maximum_nights = toInt(csv_row[40]) | |
self.minimum_minimum_nights = toInt(csv_row[41]) | |
self.maximum_minimum_nights = toInt(csv_row[42]) | |
self.minimum_maximum_nights = toInt(csv_row[43]) | |
self.maximum_maximum_nights = toInt(csv_row[44]) | |
self.minimum_nights_avg_ntm = toInt(csv_row[45]) | |
self.maximum_nights_avg_ntm = toInt(csv_row[46]) | |
self.calendar_updated = csv_row[47] | |
self.has_availability = csv_row[48] == 't' | |
self.availability_30 = toInt(csv_row[49]) | |
self.availability_60 = toInt(csv_row[50]) | |
self.availability_90 = toInt(csv_row[51]) | |
self.availability_365 = toInt(csv_row[52]) | |
self.calendar_last_scraped = toString(csv_row[53], '-infinity') | |
self.number_of_reviews = toInt(csv_row[54]) | |
self.number_of_reviews_ltm = toInt(csv_row[55]) | |
self.number_of_reviews_l30d = toInt(csv_row[56]) | |
self.first_review = toString(csv_row[57], '-infinity') | |
self.last_review = toString(csv_row[58], '-infinity') | |
self.review_scores_rating = toInt(csv_row[59]) | |
self.review_scores_accuracy = toInt(csv_row[60]) | |
self.review_scores_cleanliness = toInt(csv_row[61]) | |
self.review_scores_checkin = toInt(csv_row[62]) | |
self.review_scores_communication = toInt(csv_row[63]) | |
self.review_scores_location = toInt(csv_row[64]) | |
self.review_scores_value = toInt(csv_row[65]) | |
self.requires_license = csv_row[66] == 't' | |
self.license = csv_row[67] | |
self.instant_bookable = csv_row[68] == 't' | |
self.calculated_host_listings_count = toInt(csv_row[69]) | |
self.calculated_host_listings_count_entire_homes = toInt(csv_row[70]) | |
self.calculated_host_listings_count_private_rooms = toInt(csv_row[71]) | |
self.calculated_host_listings_count_shared_rooms = toInt(csv_row[72]) | |
self.region_id = toInt(csv_row[73]) | |
self.region_name = csv_row[74] | |
self.region_parent_id = toInt(csv_row[75]) | |
self.region_parent_name = csv_row[76] | |
self.region_parent_parent_id = toInt(csv_row[77]) | |
self.region_parent_parent_name = csv_row[78] | |
self.reviews_per_month = toDecimal(csv_row[79]) | |
def insert(self, cursor=None): | |
sql = """INSERT INTO "listing"(id,listing_url,scrape_id,last_searched,last_scraped,name,description,neighborhood_overview,picture_url,host_id,host_url,host_name,host_since,host_location,host_about,host_response_time,host_response_rate,host_acceptance_rate,host_is_superhost,host_thumbnail_url,host_picture_url,host_neighbourhood, | |
host_listings_count,host_total_listings_count,host_verifications,host_has_profile_pic,host_identity_verified, | |
neighbourhood, location, latitude, longitude, | |
property_type,room_type,accommodates,bathrooms,bathrooms_text,bedrooms,beds,amenities,price, | |
minimum_nights,maximum_nights,minimum_minimum_nights,maximum_minimum_nights,minimum_maximum_nights,maximum_maximum_nights, | |
minimum_nights_avg_ntm,maximum_nights_avg_ntm,calendar_updated, | |
has_availability,availability_30,availability_60,availability_90,availability_365,calendar_last_scraped,number_of_reviews,number_of_reviews_ltm,number_of_reviews_l30d,first_review,last_review,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value, | |
requires_license,license,instant_bookable,calculated_host_listings_count,calculated_host_listings_count_entire_homes,calculated_host_listings_count_private_rooms,calculated_host_listings_count_shared_rooms,region_id,region_name,region_parent_id,region_parent_name,region_parent_parent_id,region_parent_parent_name,reviews_per_month) | |
VALUES(%s, %s, %s, %s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s, %s, %s, %s, %s, | |
POINT(%s, %s), %s, %s, | |
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, | |
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, | |
%s, %s, %s, %s, %s, %s, %s,%s, %s, %s, %s, %s, %s, %s);""" | |
try: | |
cursor.execute(sql, ( | |
self.id, self.listing_url, self.scrape_id, self.last_searched, self.last_scraped, self.name, | |
self.description, | |
self.neighborhood_overview, self.picture_url, self.host_id, self.host_url, self.host_name, | |
self.host_since, | |
self.host_location, self.host_about, self.host_response_time, self.host_response_rate, | |
self.host_acceptance_rate, self.host_is_superhost, self.host_thumbnail_url, self.host_picture_url, | |
self.host_neighbourhood, self.host_listings_count, self.host_total_listings_count, | |
self.host_verifications, | |
self.host_has_profile_pic, self.host_identity_verified, self.neighbourhood, | |
self.latitude, self.longitude, self.latitude, self.longitude, | |
self.property_type, self.room_type, self.accommodates, self.bathrooms, self.bathrooms_text, | |
self.bedrooms, | |
self.beds, self.amenities, self.price, | |
self.minimum_nights, self.maximum_nights, self.minimum_minimum_nights, self.maximum_minimum_nights, | |
self.minimum_maximum_nights, | |
self.maximum_maximum_nights, self.minimum_nights_avg_ntm, self.maximum_nights_avg_ntm, | |
self.calendar_updated, | |
self.has_availability, self.availability_30, self.availability_60, self.availability_90, | |
self.availability_365, self.calendar_last_scraped, self.number_of_reviews, self.number_of_reviews_ltm, | |
self.number_of_reviews_l30d, self.first_review, self.last_review, self.review_scores_rating, | |
self.review_scores_accuracy, self.review_scores_cleanliness, self.review_scores_checkin, | |
self.review_scores_communication, self.review_scores_location, self.review_scores_value, | |
self.requires_license, self.license, self.instant_bookable, self.calculated_host_listings_count, | |
self.calculated_host_listings_count_entire_homes, self.calculated_host_listings_count_private_rooms, | |
self.calculated_host_listings_count_shared_rooms, self.region_id, self.region_name, | |
self.region_parent_id, | |
self.region_parent_name, self.region_parent_parent_id, self.region_parent_parent_name, | |
self.reviews_per_month | |
)) | |
except Exception as error: | |
print(self.raw_csv_row) | |
print("Exception:", error) | |
class Review: | |
def __init__(self, csv_row): | |
self.raw_csv_row = csv_row | |
self.listing_id = toInt(csv_row[0]) | |
self.id = toInt(csv_row[1]) | |
self.date = toString(csv_row[2], '+infinity') | |
self.reviewer_id = toInt(csv_row[3]) | |
self.reviewer_name = csv_row[4] | |
self.comments = csv_row[5] | |
def insert(self, cursor=None): | |
sql = """INSERT INTO "review"(listing_id,id,date,reviewer_id,reviewer_name,comments) | |
VALUES(%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING;""" | |
try: | |
cursor.execute(sql, ( | |
self.listing_id, self.id, self.date, self.reviewer_id, self.reviewer_name, self.comments | |
)) | |
except Exception as error: | |
print(self.raw_csv_row) | |
print("Exception:", error) | |
class Calendar: | |
def __init__(self, csv_row): | |
self.raw_csv_row = csv_row | |
self.listing_id = toInt(csv_row[0]) | |
self.date = toString(csv_row[1], '+infinity') | |
self.available = csv_row[2] == 't' | |
self.price = toDecimal(csv_row[3]) | |
self.adjusted_price = toDecimal(csv_row[4]) | |
self.minimum_nights = toInt(csv_row[5]) | |
self.maximum_nights = toInt(csv_row[6]) | |
def insert(self, cursor=None): | |
sql = """INSERT INTO "airdata"."calendar"(listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights) | |
VALUES(%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING;""" | |
try: | |
cursor.execute(sql, ( | |
self.listing_id, self.date, self.available, self.price, self.adjusted_price, self.minimum_nights, | |
self.maximum_nights | |
)) | |
except Exception as error: | |
print(self.raw_csv_row) | |
print("Exception:", error) | |
def toInt(s): | |
cs = sub("[^0-9-.]", "", s) | |
try: | |
conv = int(cs) | |
except: | |
conv = 0 | |
return conv | |
def toDecimal(s): | |
cs = sub("[^0-9.-]", "", s) | |
try: | |
conv = float(cs) | |
except: | |
conv = 0 | |
return conv | |
def badString(s): | |
if not s: return True | |
return s.isspace() or s == '' | |
def toString(s, default): | |
return default if badString(s) else s | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment