Skip to content

Instantly share code, notes, and snippets.

@mind1m
Created October 11, 2019 16:24
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mind1m/07c275254f344edff746f09fca6611ae to your computer and use it in GitHub Desktop.
import glob
import time
import math
import base64
import requests
import argparse
import concurrent.futures
import numpy as np
import pandas as pd
from functools import partial
# Deployment credentials -- all four values come from the deployment's
# Integrations tab in the DataRobot UI.
API_KEY = 'INSERT' # From Integrations tab of deployment (beginning of the code)
USERNAME = 'INSERT' # From Integrations tab of deployment (beginning of the code)
DEPLOYMENT_ID = 'INSERT' # From Integrations tab of deployment (beginning of the code)
DATAROBOT_KEY = 'INSERT' # From Integrations tab of deployment (search for datarobot-key in the code)
ROWS_PER_PREDICTION = 50  # rows sent per prediction request (chunk size)
THREADS = 10  # max parallel workers used by the executor in main()
def _make_datarobot_deployment_predictions(data):
    """
    Make predictions on data provided using DataRobot deployment_id provided.

    See docs for details:
    https://app.datarobot.com/docs/users-guide/predictions/api/new-prediction-api.html

    Parameters
    ----------
    data : str
        CSV text to score, e.g.::

            Feature1,Feature2
            numeric_value,string

    Returns
    -------
    dict
        Parsed JSON response following the schema in:
        https://app.datarobot.com/docs/users-guide/predictions/api/new-prediction-api.html#response-schema

    Raises
    ------
    requests.HTTPError
        If the prediction server answers with a non-2xx status code.
    """
    # Set HTTP headers. The charset should match the contents of the file.
    headers = {'Content-Type': 'text/plain; charset=UTF-8', 'datarobot-key': DATAROBOT_KEY}
    url = f'https://farmers.dynamic.orm.datarobot.com/predApi/v1.0/deployments/{DEPLOYMENT_ID}/predictions'
    # Make API request for predictions
    predictions_response = requests.post(
        url, auth=(USERNAME, API_KEY), data=data, headers=headers)
    # Fail loudly on HTTP errors instead of attempting to JSON-decode an
    # error page. (The original docstring promised an exception on failure
    # but the code never raised one.)
    predictions_response.raise_for_status()
    # Return a Python dict following the schema in the documentation
    return predictions_response.json()
def _find_csv(dataset_folder):
csvs = glob.glob(f'{dataset_folder}/*.csv')
if len(csvs) != 1:
raise ValueError(f'Cannot find one csv in {dataset_folder}, found {csvs}')
return csvs[0]
def _replace_image(dataset_folder, val):
# convert single cell to base64 image, if image
if not isinstance(val, str):
# not string, skip
return val
is_image = ('.jpg' in val.lower()) or ('.png' in val.lower())
if not is_image:
# not image, skip
return val
with open(f'{dataset_folder}/{val}', 'rb') as f:
b64img = base64.b64encode(f.read()).decode('utf-8')
return b64img
def _to_csv(df, dataset_folder):
    """Serialize *df* to CSV text, inlining image-file cells as base64."""
    encode_cell = partial(_replace_image, dataset_folder)
    encoded_df = df.applymap(encode_cell)
    return encoded_df.to_csv()
def _predict_chunk(df, dataset_folder):
    """Score one DataFrame chunk against the DataRobot deployment.

    Parameters
    ----------
    df : pandas.DataFrame
        Chunk of the input dataset; image-filename cells are inlined as
        base64 before sending.
    dataset_folder : str
        Folder the image paths are relative to.

    Returns
    -------
    pandas.DataFrame
        A copy of *df* with a 'predicted_probas' column holding the
        probability of the positive class (label == 1) for each row.
    """
    # Plain strings here -- the originals were f-strings with no placeholders.
    print('Adding images to chunk...')
    data = _to_csv(df, dataset_folder)
    print('Sending prediction request...')
    start_t = time.time()
    pred_data = _make_datarobot_deployment_predictions(data)
    print(f'Finished request in {int(time.time() - start_t)} sec')

    def _positive_class_proba(prediction_values):
        # Pick the probability reported for class label 1.
        for res in prediction_values:
            if res['label'] == 1:
                return res['value']
        raise ValueError('Not found')

    # Work on a copy: chunks can be views produced by np.array_split, and
    # assigning a new column into a view triggers pandas' SettingWithCopy
    # behavior instead of reliably storing the result.
    df = df.copy()
    df['predicted_probas'] = [
        _positive_class_proba(pred['predictionValues']) for pred in pred_data['data']
    ]
    return df
def main(dataset_folder, output_csv):
    """Predict every row of the csv found in *dataset_folder*.

    The input csv is split into chunks of ROWS_PER_PREDICTION rows that are
    scored in parallel worker processes; predicted chunks are concatenated
    (in completion order, not input order) and written to *output_csv*.

    Parameters
    ----------
    dataset_folder : str
        Folder containing exactly one csv plus any referenced image files.
    output_csv : str
        Path the result csv is written to.
    """
    in_df = pd.read_csv(_find_csv(dataset_folder))
    print(f'Loaded dataset with {len(in_df)} rows')
    chunks_count = math.ceil(len(in_df) / ROWS_PER_PREDICTION)
    predicted_chunks = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=THREADS) as executor:
        futures = [
            executor.submit(_predict_chunk, chunk, dataset_folder)
            for chunk in np.array_split(in_df, chunks_count)
        ]
        for future in concurrent.futures.as_completed(futures):
            predicted_chunks.append(future.result())
            done_rows = sum(len(chunk) for chunk in predicted_chunks)
            print(f'Predicted rows: {done_rows}/{len(in_df)}')
    # DataFrame.append was removed in pandas 2.0 and was quadratic anyway;
    # build the result with a single concat over the collected chunks.
    out_df = pd.concat(predicted_chunks)
    out_df.to_csv(output_csv, index=False)
    print(f'Saved result to {output_csv}')
if __name__ == '__main__':
    # CLI entry point: python script.py <dataset_folder> <output_csv>
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('dataset_folder')
    arg_parser.add_argument('output_csv')
    cli_args = arg_parser.parse_args()
    main(cli_args.dataset_folder, cli_args.output_csv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment