Skip to content

Instantly share code, notes, and snippets.

View mostafam's full-sized avatar

Mostafa Majidpour mostafam

View GitHub Profile
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('string', PandasUDFType.SCALAR)
def get_zip_pdf_b(lat_series, lng_series):
pdf = brd_pdf.value
zip_series= []
for k in range(len(lat_series)):
lat = lat_series[k]
lng = lng_series[k]
@udf('string')
def get_zip_b(lat, lng):
pdf = brd_pdf.value
try:
out = pdf[(pdf['bounds_north']>=lat) &
(pdf['bounds_south']<=lat) &
(pdf['bounds_west']<=lng) &
(pdf['bounds_east']>=lng) ]
dist = [None]*len(out)
for i in range(len(out)):
@mostafam
mostafam / take4_3.py
Created June 3, 2020 01:17
Two stage procedure
# Stage 1: timed run with radius = 5.
# Adds a 'zip' column by applying the get_zip_d UDF to the latitude and
# longitude columns, then counts the rows that came back as 'bad'.
tic = time.time()
# NOTE(review): the DataFrame is marked with .cache(); presumably the count()
# on the next line is what actually triggers the UDF evaluation — confirm.
output_df1 = df.withColumn('zip', get_zip_d(col("latitude"),col("longitude"))).cache()
# Number of rows whose zip is the 'bad' sentinel string.
output_df1.filter("zip='bad'").count()
toc = time.time()
# Elapsed wall-clock time converted from seconds to minutes.
(toc-tic)/60 # about 515 minutes
# Observed result: about 4% of rows were left without a zip code.
# with radius = 100
tic = time.time()
@mostafam
mostafam / take4_2.py
Created June 3, 2020 01:15
uszipcode logic
import math
@udf('string')
def get_zip_d(lat, lng):
dist_btwn_lat_deg = 69.172
dist_btwn_lon_deg = math.cos(lat) * 69.172
radius = 5
lat_degr_rad = abs(radius / dist_btwn_lat_deg)
lon_degr_rad = abs(radius / dist_btwn_lon_deg)
@mostafam
mostafam / take4_1.py
Created June 3, 2020 01:14
Take4 Extracting db file
import sqlite3
# Build a SearchEngine pointed at /tmp/db.
# NOTE(review): presumably this is what places simple_db.sqlite under
# /tmp/db so it can be opened directly below — confirm against uszipcode.
search = SearchEngine(db_file_dir="/tmp/db")
# Open the sqlite file directly.
# NOTE(review): conn is never closed in the visible code.
conn = sqlite3.connect("/tmp/db/simple_db.sqlite")
# Load the zipcode, centre coordinates, radius and bounding-box columns of the
# simple_zipcode table into a pandas DataFrame.
pdf = pd.read_sql_query('''select zipcode, lat, lng, radius_in_miles,
bounds_west, bounds_east, bounds_north, bounds_south from
simple_zipcode''',conn)
# Broadcast the DataFrame through the SparkContext; it is read back via
# brd_pdf.value.
brd_pdf = sc.broadcast(pdf)
from joblib import Parallel, delayed
def producer(pdf):
    """Yield one ``(latitude, longitude)`` pair per row of ``pdf``.

    Args:
        pdf: pandas DataFrame with ``latitude`` and ``longitude`` columns.

    Yields:
        Tuples ``(latitude, longitude)``, one per row, in row order.
    """
    # Walk the two columns in lockstep by position instead of looking each
    # value up by index label: a label lookup returns a whole Series (not a
    # scalar) when the index contains duplicate labels, and it costs a
    # separate lookup per row.
    yield from zip(pdf['latitude'].to_numpy(), pdf['longitude'].to_numpy())
def srch(latitude,longitude):
try:
@mostafam
mostafam / take2_2.py
Last active June 3, 2020 01:10
Take2 modified
from pyspark.sql.functions import udf, col
@udf('string')
def get_zip_udf3(latitude, longitude):
search = SearchEngine(db_file_dir="/tmp/db")
try:
zip = search.by_coordinates(latitude, longitude, returns=1)[0].to_dict()["zipcode"]
except:
zip = 'bad'
@mostafam
mostafam / take2.py
Last active June 3, 2020 01:09
Take2
from pyspark.sql.functions import udf, col
@udf('string')
def get_zip_udf2(latitude, longitude):
    """Return the zipcode of the first uszipcode match for a coordinate.

    A ``SearchEngine`` is created inside the function on every call, so no
    engine object is captured from the enclosing scope.

    Args:
        latitude: Latitude passed to ``SearchEngine.by_coordinates``.
        longitude: Longitude passed to ``SearchEngine.by_coordinates``.

    Returns:
        The ``"zipcode"`` entry of the first result, or the sentinel string
        ``'bad'`` when the lookup raises (e.g. there is no first result to
        index).
    """
    search = SearchEngine()
    try:
        matches = search.by_coordinates(latitude, longitude, returns=1)
        zipcode = matches[0].to_dict()["zipcode"]
    except Exception:
        # Best-effort lookup: any ordinary failure maps to the 'bad'
        # sentinel, while KeyboardInterrupt/SystemExit still propagate
        # (a bare ``except:`` would have swallowed those too).
        zipcode = 'bad'
    return zipcode
from pyspark.sql.functions import udf, col
search = SearchEngine()
@udf('string')
def get_zip_udf1(latitude, longitude):
try:
zip = search.by_coordinates(latitude, longitude, returns=1)[0].to_dict()["zipcode"]
except:
zip = 'bad'
import pandas as pd
import numpy as np
import time
from uszipcode import SearchEngine
search = SearchEngine()
pdf = df.toPandas()
tic = time.time()
for idx in pdf.iloc[0:1000,:].index: