import os
import pickle

import pandas as pd
import requests
from magniv.core import task

from upload_download_s3 import download_s3, upload_s3
# Load the serialized price prediction model from disk
with open("tasks/model/model_lin.p", "rb") as serialized_model:
    model = pickle.load(serialized_model)

url = os.getenv("DATA_URL")
def inference(data, model):
    """Preprocess the data and run price prediction inference with the given model."""
    # Derive month/day/age features from the last-review timestamp
    data['fields.last_review'] = pd.to_datetime(data['fields.last_review'])
    data['fields.last_review_month'] = data['fields.last_review'].dt.month
    data['fields.last_review_days'] = data['fields.last_review'].dt.day
    data['fields.last_review_year'] = 2022 - data['fields.last_review'].dt.year
    data = data.drop(['fields.last_review', 'fields.latitude', 'fields.longitude'], axis=1)
    # Encode the room type as an integer category
    data['fields.room_type'] = data['fields.room_type'].replace(
        {"Entire home/apt": 0, "Private room": 1, "Shared room": 2, "Hotel room": 3})
    return model.predict(data.values)
# Magniv decorator: run this task monthly and trigger the "second" task once it succeeds
@task(key="first", schedule="@monthly", on_success=["second"],
      description="Get Airbnb data and store it on S3")
def get_data():
    # Query the listings dataset for New York and flatten the JSON records
    payload = {'dataset': 'airbnb-listings', 'q': 'new york'}
    response = requests.request("POST", url, data=payload)
    result = response.json()
    result_ori = pd.json_normalize(result['records'])
    # Keep only the columns needed for inference and mapping
    res = result_ori[['fields.room_type', 'fields.minimum_nights', 'fields.number_of_reviews',
                      'fields.reviews_per_month', 'fields.host_listings_count',
                      'fields.availability_365', 'fields.latitude', 'fields.longitude',
                      'fields.last_review']]
    upload_s3(res)
    return res, result_ori
@task(key="second",schedule="@monthly",resources={"cpu": "2000m", "memory": "2Gi"},description=" preprocess data and run price prediction inference") | |
def merge_result_geo(): | |
data = download_s3() | |
pred = inference(model=model, data=data) | |
data['predicted_prices'] = pred | |
return data[['predicted_prices','fields.latitude', 'fields.longitude' ]].to_json() | |
if __name__ == '__main__':
    merge_result_geo()
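The tasks above import upload_s3 and download_s3 from an upload_download_s3 module that is not included in this gist. Below is a minimal sketch of what that module might contain, assuming boto3 for S3 access; the S3_BUCKET environment variable and the "airbnb/listings.csv" object key are illustrative assumptions, and the real helpers may differ.

# upload_download_s3.py -- hypothetical sketch, not part of the original gist
import io
import os

import boto3
import pandas as pd

S3_BUCKET = os.getenv("S3_BUCKET")      # assumed: bucket name supplied via environment
S3_KEY = "airbnb/listings.csv"          # assumed: object key for the listings snapshot


def upload_s3(df):
    """Serialize the dataframe to CSV and upload it to S3 (sketch)."""
    buffer = io.StringIO()
    df.to_csv(buffer, index=False)
    boto3.client("s3").put_object(Bucket=S3_BUCKET, Key=S3_KEY, Body=buffer.getvalue())


def download_s3():
    """Download the CSV snapshot from S3 and return it as a dataframe (sketch)."""
    obj = boto3.client("s3").get_object(Bucket=S3_BUCKET, Key=S3_KEY)
    return pd.read_csv(io.BytesIO(obj["Body"].read()))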