Skip to content

Instantly share code, notes, and snippets.

@sully90
Created March 13, 2019 12:48
Show Gist options
  • Save sully90/868275a2d6492d266b967a8f0e81073d to your computer and use it in GitHub Desktop.
Save sully90/868275a2d6492d266b967a8f0e81073d to your computer and use it in GitHub Desktop.
LightGBM Issue
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import os

from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Extra packages (pyspark-cassandra and MMLSpark) requested through the
# submit arguments; this is set before the SparkContext is constructed below.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages anguenot:pyspark-cassandra:0.9.0,Azure:mmlspark:0.16 pyspark-shell"

# Application configuration: master URL and application name.
conf = (
    SparkConf()
    .setMaster("spark://spark-host:7077")
    .setAppName("GLIBCXX Test")
)

# `sc` and `spark` are the handles used by every later cell.
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
# In[2]:
# Imported for its side effects: `sc.cassandraTable` used below is not part of
# stock PySpark. NOTE(review): presumably this import patches the already
# created `sc`, so it has to stay after the SparkContext setup — confirm.
import pyspark_cassandra
# Marketplace Infringement
# ========================
#
# Load rows from the marketplace infringement table.
# In[3]:
# "keyspace" and "table" look like placeholder names — TODO confirm real ones.
mp = sc.cassandraTable("keyspace", "table")
# In[4]:
# Columns to read from Cassandra, each paired with the Spark SQL type it is
# given in the DataFrame schema. Every field is declared nullable.
from pyspark.sql import types as T
column_types = [
    ("id", T.StringType()),
    ("actioned", T.BooleanType()),
    ("feature1", T.FloatType()),
    ("feature2", T.FloatType()),
    ("feature3", T.FloatType()),
    ("feature4", T.FloatType()),
]

# Restrict the Cassandra read to exactly the listed columns, in order.
cassandraRDD = mp.select(*(name for name, _ in column_types))

# Schema with one nullable field per selected column.
schema = T.StructType(
    [T.StructField(name, dtype, True) for name, dtype in column_types]
)
# In[5]:
# Turn the selected Cassandra rows into a DataFrame using the explicit schema.
df = spark.createDataFrame(data=cassandraRDD, schema=schema)
# Machine Learning
# ================
# In[6]:
# Assemble the four numeric feature columns into a single "features" vector.
from pyspark.ml.feature import VectorAssembler
features = ["feature1", "feature2", "feature3", "feature4"]
assembler = VectorAssembler(
    inputCols=features,
    outputCol="features",
    handleInvalid="skip",  # rows with invalid feature values are dropped
)
# The assembler is the only pipeline stage.
stages = [assembler]
# In[7]:
from pyspark.ml import Pipeline
# Fit the feature pipeline on the full DataFrame.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)

# Apply the fitted pipeline and keep only the label and the feature vector.
selectedCols = ["actioned", "features"]
ml_df = pipelineModel.transform(df).select(selectedCols)
ml_df.printSchema()
# In[8]:
# 70/30 train/test split with a fixed seed
splits = ml_df.randomSplit(weights=[0.7, 0.3], seed=1234)
train, test = splits
# ML
# ========
# In[9]:
from mmlspark import TrainClassifier, LightGBMClassifier
# Wrap a default LightGBM classifier in TrainClassifier, using the
# "actioned" column as the label, and fit it on the training split.
classifier = LightGBMClassifier()
trainer = TrainClassifier(model=classifier, labelCol="actioned")
model = trainer.fit(train)
# In[ ]:
from mmlspark import ComputeModelStatistics, TrainedClassifierModel
# Score the held-out split with the trained model.
prediction = model.transform(test)

# Compute evaluation statistics over the scored rows.
evaluator = ComputeModelStatistics()
metrics = evaluator.transform(prediction)

# Bring at most ten rows to the driver as a pandas frame; the bare expression
# is what a notebook cell displays.
metrics_head = metrics.limit(10)
metrics_head.toPandas()
# In[ ]:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment