Skip to content

Instantly share code, notes, and snippets.

@carlleston
Last active August 23, 2020 12:09
Show Gist options
  • Save carlleston/6559e78f0f6b293e0f65067a9bd8abb6 to your computer and use it in GitHub Desktop.
pre-processing and linear model in pyspark
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import findspark\n",
"import pyspark\n",
"import sys\n",
"import ibmos2spark\n",
"\n",
"from pyspark.ml import Pipeline\n",
"from pyspark.ml.feature import VectorAssembler\n",
"from pyspark.ml.feature import Normalizer\n",
"from pyspark.ml.regression import LinearRegression\n",
"from pyspark.ml.feature import MinMaxScaler\n",
"from pyspark.ml.regression import LinearRegression\n",
"from pyspark.sql.functions import percent_rank\n",
"from pyspark.sql import Window\n",
"from pyspark.ml.evaluation import RegressionEvaluator\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import col, unix_timestamp, to_date\n",
"from pyspark.sql import Window\n",
"import pyspark.sql.functions as func\n",
"\n",
"from pyspark.sql.functions import udf,col\n",
"from pyspark.sql.types import IntegerType\n",
"\n",
"from pyspark import SparkContext, SparkConf\n",
"import sys\n",
"\n",
"if __name__ == \"__main__\":\n",
"\n",
" conf = SparkConf().setAppName(\"Spark lnmodel\")\n",
" sc = SparkContext(conf=conf)\n",
" spark=SparkSession(sc)\n",
" findspark.init()\n",
" findspark.find()\n",
"\n",
" df_ts = spark.read.format(\"csv\").option(\"header\", \"true\").load(sys.argv[1])\n",
" df_ts = df_ts[['Date','Itau_open','BVSP_open','USDBRL_open','Itau_Close','lag_1','lag_2','lag_3']]\n",
"\n",
" df_news = spark.read.format(\"csv\").option(\"header\", \"true\").load(sys.argv[2])\n",
" df_news = df_news[['Date', 'Class']]\n",
"\n",
" df = df_ts.alias('a').join(df_news.alias('b'), on = ['Date'], how = 'outer')\n",
" df = df.withColumn('Date',to_date(unix_timestamp(col('Date'), 'yyyy-MM-dd').cast(\"timestamp\"))).orderBy('Date')\n",
" df = df.withColumn('Itau_open', col('Itau_open').cast('double'))\n",
" df = df.withColumn('BVSP_open', col('BVSP_open').cast('double')) \n",
" df = df.withColumn('USDBRL_open', col('USDBRL_open').cast('double'))\n",
" df = df.withColumn('Itau_Close', col('Itau_Close').cast('double'))\n",
" df = df.withColumn('lag_1', col('lag_1').cast('double'))\n",
" df = df.withColumn('lag_2', col('lag_2').cast('double'))\n",
" df = df.withColumn('lag_3', col('lag_3').cast('double'))\n",
" window_ff = Window.orderBy('Date')\\\n",
" .rowsBetween(-sys.maxsize, 0)\n",
"\n",
" read_last = func.last(df['Class'],\n",
" ignorenulls=True)\\\n",
" .over(window_ff)\n",
" # add columns to the dataframe\n",
" df = df.withColumn('Class', read_last)\n",
" df = df.na.fill({'Class': 'NN'})\n",
" categories = df.select('Class').distinct().rdd.flatMap(lambda x : x).collect()\n",
" categories.sort()\n",
" for category in categories:\n",
" function = udf(lambda item: 1 if item == category else 0, IntegerType())\n",
" new_column_name = 'class'+'_'+category\n",
" df = df.withColumn(new_column_name, function(col('Class')))\n",
"\n",
" df = df[['Date','Itau_open','BVSP_open','USDBRL_open','Itau_Close','class_N','class_NN','class_P','lag_1','lag_2','lag_3']]\n",
"\n",
" FEATURES_COL1 = ['Itau_open','BVSP_open','USDBRL_open','lag_1','lag_2','lag_3']\n",
" vectorAssembler = VectorAssembler(inputCols=FEATURES_COL1,outputCol=\"features\")\n",
" vdf = vectorAssembler.transform(df.na.drop())\n",
" vdf = vdf.select(['Date','Itau_Close','features','class_N','class_NN','class_P'])\n",
" scale_features = MinMaxScaler(inputCol= 'features', outputCol= 'scaled_features')\n",
" model_scale = scale_features.fit(vdf)\n",
" df_scaled = model_scale.transform(vdf)\n",
" FEATURES_COL1 = ['scaled_features','class_N','class_NN','class_P']\n",
" vectorAssembler = VectorAssembler(inputCols=FEATURES_COL1,outputCol=\"Col_features\")\n",
" df_completed = vectorAssembler.transform(df_scaled)\n",
" df_completed = df_completed.select(['Date','Itau_Close','Col_features'])\n",
" df_completed = df_completed.withColumn(\"rank\", percent_rank().over(Window.partitionBy().orderBy(\"Date\")))\n",
" train_df = df_completed.where(\"rank <= .95\").drop(\"rank\")\n",
" test_df = df_completed.where(\"rank > .95\").drop(\"rank\")\n",
" lr = LinearRegression(featuresCol = 'Col_features', labelCol='Itau_Close')\n",
" lr_model = lr.fit(train_df)\n",
" lr_predictions = lr_model.transform(test_df)\n",
" lr_evaluator = RegressionEvaluator(predictionCol=\"prediction\", \\\n",
" labelCol=\"Itau_Close\",metricName=\"r2\")\n",
" test_result = lr_model.evaluate(test_df)\n",
" predictions = lr_model.transform(test_df)\n",
" predictions.select(\"prediction\",\"Itau_Close\").write.option(\"header\", \"true\").mode(\"overwrite\").parquet('pred.parquet')\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment