Skip to content

Instantly share code, notes, and snippets.

View bgweber's full-sized avatar

Ben Weber bgweber

View GitHub Profile
# sklearn version
from sklearn.ensemble import RandomForestRegressor as RFR
from multiprocessing.pool import ThreadPool
# hyperparameters to test out (n_trees)
parameters = [10, 20, 50]
# thread pool with 5 worker threads, so at most 5 tasks run concurrently
pool = ThreadPool(processes=5)
# spark version
from pyspark.ml.regression import RandomForestRegressor
# define a function to train a RF model and return metrics
def mllib_random_forest(trees, boston_train, boston_test):
# train a random forest regressor with the specified number of trees
rf = RandomForestRegressor(numTrees = trees, labelCol="target")
model = rf.fit(boston_train)
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
# setup the spark data frame as a table
boston_sp.createOrReplaceTempView("boston")
# add train/test label and expand the data set by 3x (each num trees parameter)
full_df = spark.sql("""
select *
from (
# import pandas, keras and tensorflow
import pandas as pd
import tensorflow as tf
import keras
from keras import models, layers
# Read the sample data set, then separate the 'label' column (y)
# from all of the remaining columns (x)
df = pd.read_csv(
    "https://github.com/bgweber/Twitch/raw/master/Recommendations/games-expand.csv"
)
y = df['label']
x = df.drop(columns=['label'])
# Load libraries
import flask
import pandas as pd
import tensorflow as tf
import keras
from keras.models import load_model
# instantiate flask
app = flask.Flask(__name__)
# load Flask
import flask
app = flask.Flask(__name__)
# define a predict function as an endpoint
@app.route("/predict", methods=["GET","POST"])
def predict():
data = {"success": False}
# get the request parameters
# schema of the result set: one row per user with the user ID and the
# model prediction (both fields are declared nullable)
schema = StructType([
    StructField('user_id', LongType(), True),
    StructField('prediction', DoubleType(), True),
])
# define the Pandas UDF
# NOTE(review): only the beginning of this function is visible in this
# excerpt; the rest of the body is not shown here.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_pd):
    # run the model on the partitioned data set: read the user IDs from the
    # pandas data frame passed in as `sample_pd` (the previous code referenced
    # `sample_df`, which is not a name defined by this function)
    ids = sample_pd['user_id']
# collect the whole Spark data frame on the driver node as a pandas data frame
sample_df = spark_df.toPandas()
# create a prediction for each user: the model input is every column except
# 'label', 'user_id' and 'partition_id'
x_train = sample_df.drop(columns=['label', 'user_id', 'partition_id'])
ids = sample_df['user_id']
pred = model.predict_proba(x_train)
# pair each user ID with column 1 of the predict_proba output
result_df = pd.DataFrame({'user_id': ids, 'prediction': pred[:, 1]})
# display the results
# pull everything to the driver node before training, leaving out the
# 'user_id' and 'partition_id' columns
df = spark_df.toPandas().drop(columns=['user_id', 'partition_id'])
# inputs are every remaining column except 'label'; 'label' is the target
x_train = df.drop(columns=['label'])
y_train = df['label']
# fit a logistic regression model
model = LogisticRegression()
model.fit(x_train, y_train)
# read the CSV with pandas, then convert the result into a Spark data frame
pandas_df = pd.read_csv(
    "https://github.com/bgweber/Twitch/raw/master/Recommendations/games-expand.csv"
)
spark_df = spark.createDataFrame(pandas_df)
# assign a user ID and a partition ID using Spark SQL
spark_df.createOrReplaceTempView("spark_df")
spark_df = spark.sql("""
select *, user_id%10 as partition_id
from (