Geocode with addok/BAN (adresse.data.gouv.fr) in DSS & Python 2.7
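The recipe below reads a DSS input dataset, submits its addresses in chunks to the BAN bulk CSV endpoint (/search/csv), and writes the rows back with the geocoding columns (longitude, latitude, score, ...) appended.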
# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
import dataiku
from dataiku.customrecipe import get_input_names_for_role
from dataiku.customrecipe import get_output_names_for_role
from dataiku.customrecipe import get_recipe_config
import itertools
import logging
import pandas as pd
import numpy as np
import requests
import StringIO
import sys, time, traceback
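# Pipeline: read the input dataset in chunks of `lines_per_request` rows, POST
# each chunk as CSV to the addok /search/csv bulk endpoint from a small pool of
# worker processes, and append the returned geocoding columns to the output.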
# Proxy and server config
PROXY_PRIVATE_LAB = 'http://192.168.99.4:3128'
SERVER_GOUV_FR = 'https://api-adresse.data.gouv.fr'
http_proxy = PROXY_PRIVATE_LAB
server_address = SERVER_GOUV_FR
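# Point http_proxy / server_address at another proxy or addok instance here;
# the request code below only reads these two variables.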
# Input fields configuration
columns = ['aue_adresse','aue_commune']
post_code = 'aue_code_postal'
city_code = None
# Output fields configuration
output_prefix = 'aue_ban_'
error_prefix = 'error'
error_col = '{}{}'.format(output_prefix, error_prefix) if error_prefix else None
# Process config
lines_per_request = 50
verbosechunksize = 5000
threads = 3
timeout = 60
maxtries = 3
limit = None
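# limit=None geocodes the whole dataset; set an integer row count to test on a
# sample. `threads` bounds the number of chunks in flight at any one time.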
def err():
    """Formats the current exception as a short one-line summary."""
    exc_type, exc_obj, exc_tb = sys.exc_info()
    return "{} : {} line {}".format(exc_type.__name__, exc_obj, exc_tb.tb_lineno)
def datas():
    """Returns the columns composing the address"""
    result = {'columns': columns}
    cols = list(columns)
    if post_code:
        result['postcode'] = post_code
        cols.append(post_code)
    if city_code:
        result['citycode'] = city_code
        cols.append(city_code)
    return (result, cols)
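# With the configuration above, datas() returns:
#   ({'columns': ['aue_adresse', 'aue_commune'], 'postcode': 'aue_code_postal'},
#    ['aue_adresse', 'aue_commune', 'aue_code_postal'])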
def process_chunk(i, df, process_queue, write_queue, schema_check=[]):
    try:
        df = adresse_submit(df, i, schema_check)
        if ((((i+1)*lines_per_request) % verbosechunksize) == 0):
            print("chunk {}-{} ok".format(i*lines_per_request+1, (i+1)*lines_per_request))
    except:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        # traceback.format_exception (not print_exception, which returns None)
        # so the traceback actually ends up in the log message
        logging.warning("chunk {}-{} failed - {}".format(
            i*lines_per_request+1, (i+1)*lines_per_request,
            "".join(traceback.format_exception(exc_type, exc_obj, exc_tb))))
    write_queue.put(df)
    process_queue.get()  # free one slot in the throttling queue
def correct_addr(df, cols):
    """Cleans the address fields in place so the CSV submission stays parseable."""
    df[cols] = df[cols].replace(np.nan, "xxxxx", regex=True)
    df[cols] = df[cols].replace(r'^\s*$', "xxxxx", regex=True)
    df[cols] = df[cols].replace(r'"', "", regex=True)
    df[cols] = df[cols].replace(r"'", "", regex=True)
    df[cols] = df[cols].replace(r"\.,", "", regex=True)
    df[cols] = df[cols].replace(r"_", ",", regex=True)
def adresse_submit(df, i=0, schema_check=[]):
    """Does the actual request to the geocoding server"""
    string_io = StringIO.StringIO()
    data, cols = datas()
    response = None
    if not isinstance(df, pd.DataFrame):
        return df
    df.reset_index(inplace=True)
    correct_addr(df, cols)
    df[cols].to_csv(string_io, encoding="utf-8", index=False)
    kwargs = {
        'data': data,
        'files': {'data': string_io.getvalue()},
        'timeout': timeout,
        'url': "{}/search/csv".format(server_address)
    }
    kwargs['proxies'] = {
        'http': http_proxy,
        'https': http_proxy}
    tries = 1
    failed = True
    # retry with exponential backoff (3s, 9s, ...) until success or maxtries
    while failed and (tries <= maxtries):
        try:
            response = requests.post(**kwargs)
            status_code = response.status_code
        except requests.exceptions.ReadTimeout:
            status_code = "timeout"
        except requests.exceptions.RequestException:
            status_code = "request error"
        if status_code == 200:
            failed = False
        else:
            tries += 1
            if tries <= maxtries:
                time.sleep(3 ** (tries-1))
    if not failed:
        content = StringIO.StringIO(response.content.decode('utf-8-sig'))
        result = pd.read_csv(content, dtype=object)
        if error_col:
            df[error_col] = None
        result = result.rename(columns={'longitude': 'result_longitude',
                                        'latitude': 'result_latitude'})
        if tries > 1:
            logging.warning("chunk {}-{} needed {} tries".format(i*lines_per_request+1, (i+1)*lines_per_request, tries))
        # copy every new result_* column from the response, renamed with output_prefix
        diff = [x for x in result.axes[1].difference(df.axes[1]) if x.startswith('result_')]
        for result_column in diff:
            new_column = result_column.replace("result_", output_prefix)
            df[new_column] = result[result_column]
    if failed:
        tries -= 1
        logging.warning("chunk {}-{} failed after {} tries".format(i*lines_per_request+1, (i+1)*lines_per_request, tries))
        df[output_prefix+'score'] = -1
        if error_col:
            df[error_col] = "HTTP Status: {}".format(status_code)
    # align failed or partial chunks on the expected output schema (a pandas Index)
    if len(schema_check) > len(df.axes[1]):
        for col in schema_check.difference(df.axes[1]):
            df[col] = None
    df[cols] = df[cols].replace(r"^xxxxx$", np.nan, regex=True)
    return df
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return itertools.izip_longest(*args, fillvalue=fillvalue)
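# (grouper is the classic itertools recipe; it is not called anywhere in this
# script and is kept only as a spare chunking helper.)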
def geocode(ids, ods):
    '''
    Geocodes each row in an input dataset, and produces a row in the output dataset with additional fields.
    ids: the input dataset
    ods: the output dataset
    '''
    # First a small pass to produce the output schema
    small = ids.get_dataframe(sampling='head', limit=3, infer_with_pandas=False)
    initial_index = small.axes[1]
    geocoded = adresse_submit(small)
    output_index = geocoded.axes[1]
    if '{}longitude'.format(output_prefix) not in output_index:
        raise Exception('Geocoding failed: unable to make a sample request')
    schema = ids.read_schema()
    floats = [output_prefix + column for column in ['longitude', 'latitude', 'score']]
    for column in output_index.difference(initial_index):
        schema.append({'name': column, 'type': 'float' if column in floats else 'string'})
    ods.write_schema(schema)
    ow = ods.get_writer()
    # Then the full pass
    dataset_iter = ids.iter_dataframes(chunksize=lines_per_request, infer_with_pandas=False, limit=limit)
    process_queue = Queue(threads)
    write_queue = Queue()
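    # process_queue throttles like a semaphore: put() below blocks once
    # `threads` chunks are in flight, and each worker get()s when done.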
    for i, chunk in enumerate(dataset_iter):
        process_queue.put(i)
        thread = Process(target=process_chunk, args=[i, chunk, process_queue, write_queue, output_index])
        thread.start()
        # drain finished chunks as we go so write_queue does not grow unbounded
        while (write_queue.qsize() > 0):
            ow.write_dataframe(write_queue.get())
    print("waiting {} chunk processes".format(process_queue.qsize()))
    while (process_queue.qsize() > 0):
        time.sleep(1)
    print("flushing {} chunks".format(write_queue.qsize()))
    while (write_queue.qsize() > 0):
        ow.write_dataframe(write_queue.get())
    ow.close()
# The dataset names are hardcoded here; in a packaged custom recipe they would
# come from get_input_names_for_role / get_output_names_for_role (imported above).
ids = dataiku.Dataset("input")
ods = dataiku.Dataset("output")
geocode(ids, ods)
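To check connectivity before running the recipe, the same bulk call can be tried standalone, outside DSS. This is a minimal sketch assuming direct network access (no proxy); the CSV column names and the sample address are illustrative, not taken from the recipe configuration.
# Minimal standalone sketch of the /search/csv bulk call (illustrative values)
import StringIO
import pandas as pd
import requests

sample = pd.DataFrame({'adresse': ['8 boulevard du port'], 'cp': ['80000']})
buf = StringIO.StringIO()
sample.to_csv(buf, encoding='utf-8', index=False)
r = requests.post('https://api-adresse.data.gouv.fr/search/csv',
                  data={'columns': ['adresse'], 'postcode': 'cp'},
                  files={'data': buf.getvalue()},
                  timeout=60)
r.raise_for_status()
print(pd.read_csv(StringIO.StringIO(r.content.decode('utf-8-sig'))))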