Skip to content

Instantly share code, notes, and snippets.

View bgweber's full-sized avatar

Ben Weber bgweber

View GitHub Profile
@bgweber
bgweber / Dockerfile
Last active February 17, 2020 17:04
# Pin the base image so rebuilds are reproducible. A floating `latest` tag
# changes the Python/pip versions underneath the image, and newer Ubuntu
# releases mark the system Python as externally managed (PEP 668), which makes
# the plain `pip3 install` steps below fail.
FROM ubuntu:22.04
# MAINTAINER is deprecated; record the author as a label instead.
LABEL maintainer="Ben Weber"
# Install Python 3 with pip, expose the interpreter as `python`, and remove the
# apt package lists in the same layer so they do not end up in the image.
RUN apt-get update \
&& apt-get install -y --no-install-recommends python3-dev python3-pip \
&& ln -s /usr/bin/python3 /usr/local/bin/python \
&& rm -rf /var/lib/apt/lists/*
# Install the Python dependencies in a single layer without keeping pip's
# download cache.
RUN pip3 install --no-cache-dir flask pandas
@bgweber
bgweber / request.py
Last active February 17, 2020 16:53
import requests

# Request payload: G1 is '1', G2 through G10 are '0'.
payload = {
    'G1': '1', 'G2': '0', 'G3': '0', 'G4': '0', 'G5': '0',
    'G6': '0', 'G7': '0', 'G8': '0', 'G9': '0', 'G10': '0',
}

# POST the payload as JSON to the service on localhost:5000.
result = requests.post("http://localhost:5000", json=payload)

# Show the response object, then its decoded JSON body.
print(result)
print(result.json())
# connect to the logging service
# NOTE(review): `credentials` is not defined in this snippet; it must already
# be in scope when this runs — confirm where it is created.
from google.cloud import logging

logging_client = logging.Client(
    project='serving-268422',
    credentials=credentials,
)
logger = logging_client.logger('model_service')

# log a message to Stackdriver
logger.log_text('Hello World!')
# connect to the monitoring service
from google.cloud import monitoring_v3
from google.oauth2 import service_account
import time

# Load service-account credentials from the local key file.
credentials = service_account.Credentials.from_service_account_file('serving.json')
client = monitoring_v3.MetricServiceClient(credentials = credentials)

# Build the project resource name directly instead of calling
# `client.project_path(...)`: that helper was removed from MetricServiceClient
# in google-cloud-monitoring 2.x, and "projects/<project id>" is the string it
# returned, so this form works with both old and new library versions.
project_id = 'serving-268422'
project_name = f"projects/{project_id}"
# create a custom metric
@bgweber
bgweber / serving.py
Last active February 17, 2020 16:28
import pandas as pd
from sklearn.linear_model import LogisticRegression
import flask

# Load the training data from the hosted CSV.
df = pd.read_csv("https://github.com/bgweber/Twitch/raw/master/Recommendations/games-expand.csv")

# Fit a logistic regression: 'label' is the target, every other column is a
# feature.
model = LogisticRegression()
model.fit(df.drop(columns=['label']), df['label'])

# Create the Flask application object.
app = flask.Flask(__name__)
# load pandas, sklearn, and pyspark types and functions
import pandas as pd
from sklearn.linear_model import LogisticRegression
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
# load the CSV as a Spark data frame
# NOTE(review): `spark` is not defined in this snippet; presumably the session
# object provided by the notebook environment — confirm.
csv_url = "https://github.com/bgweber/Twitch/raw/master/Recommendations/games-expand.csv"
pandas_df = pd.read_csv(csv_url)
spark_df = spark.createDataFrame(pandas_df)

# assign a user ID and a partition ID using Spark SQL
# (the data frame is first registered as the temp view "spark_df")
spark_df.createOrReplaceTempView("spark_df")
spark_df = spark.sql("""
select *, user_id%10 as partition_id
from (
# train a model, but first, pull everything to the driver node
# (the ID columns are dropped so they are not used as features)
df = spark_df.toPandas().drop(columns=['user_id', 'partition_id'])

# split into the target ('label') and the features (every other column)
y_train = df['label']
x_train = df.drop(columns=['label'])

# use logistic regression
model = LogisticRegression()
model.fit(x_train, y_train)
# pull all data to the driver node
sample_df = spark_df.toPandas()

# create a prediction for each user
ids = sample_df['user_id']
# the label and the two ID columns are removed before scoring
x_train = sample_df.drop(columns=['label', 'user_id', 'partition_id'])
pred = model.predict_proba(x_train)

# pair each user ID with column 1 of the predict_proba output
result_df = pd.DataFrame({'user_id': ids, 'prediction': pred[:, 1]})
# display the results
# define a schema for the result set, the user ID and model prediction
result_fields = [
    StructField('user_id', LongType(), True),
    StructField('prediction', DoubleType(), True),
]
schema = StructType(result_fields)
# define the Pandas UDF
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_pd):
# run the model on the partitioned data set
ids = sample_df['user_id']