Anand P V (AdroitAnandAI)

AdroitAnandAI / timeonRapids.py
Last active June 14, 2021 08:43
Get Timing Stats on Standalone vs Rapids
# Generate Tuples and Count in Spark
import string
import random
from time import time
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
if 'sc' not in globals():
    sc = SparkContext('local')
    spark = SparkSession(sc)
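The preview ends before any timing is taken. A minimal sketch of how the generate-and-count benchmark could continue, assuming random single-character keys and an illustrative record count (NUM_RECORDS and the key alphabet are not from the gist):

# Illustrative benchmark: time a distributed count over random (key, 1) tuples
# NUM_RECORDS and the ascii_lowercase alphabet are assumptions
NUM_RECORDS = 1000000
pairs = [(random.choice(string.ascii_lowercase), 1) for _ in range(NUM_RECORDS)]
start = time()
rdd = sc.parallelize(pairs)
print('count =', rdd.count(), 'in %.2fs' % (time() - start))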
AdroitAnandAI / reduceByKey.py
Created June 12, 2021 09:14
Reduce Operation Example
# Simple ReduceByKey example in python sourced from:
# https://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/
# creating PairRDD x with key value pairs
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
                    ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)
# Applying reduceByKey operation on x
y = x.reduceByKey(lambda accum, n: accum + n)
print(y.collect())
# [('b', 5), ('a', 3)]
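Worth noting: reduceByKey merges values locally within each partition before shuffling, so for an associative, commutative function like this sum it moves far less data across the network than groupByKey followed by a per-key sum.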
AdroitAnandAI / sparkReadS3.py
Last active June 11, 2021 17:21
Read S3 files from Spark EMR Notebook
import os
import boto3
import pandas as pd
import sys
if sys.version_info[0] < 3:
    from StringIO import StringIO  # Python 2.x
else:
    from io import StringIO  # Python 3.x
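The preview stops before the actual S3 call. One way the read could continue with boto3, assuming a CSV object; the bucket and key names below are placeholders:

# Hypothetical bucket/key names; replace with real values
s3 = boto3.client('s3')
obj = s3.get_object(Bucket='my-bucket', Key='data/input.csv')
df = pd.read_csv(StringIO(obj['Body'].read().decode('utf-8')))
print(df.head())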
AdroitAnandAI / readS3.py
Last active June 11, 2021 17:13
To read and write from S3
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("docker-numpy").getOrCreate()
sc = spark.sparkContext

# Read in a file from S3 with the s3a file protocol
text = spark.read.text("s3a://spark-anand/main.py")
print(text.take(5))

# Placeholder values: the preview cuts off mid-expression here
sentences = np.array(["sample sentence one", "sample sentence two"])
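The description also mentions writing; the write side uses the same s3a protocol (the output path below is a placeholder):

# Write a DataFrame back to S3 via s3a; the path is a placeholder
text.write.mode('overwrite').parquet('s3a://spark-anand/output/')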
AdroitAnandAI / trainRFSpark.py
Last active June 6, 2021 16:50
RandomForest in Spark
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

def trainRFmodel(trainingData):
    # Assumed body (the preview ends at the signature);
    # the 'features' and 'label' column names are assumptions
    rf = RandomForestRegressor(featuresCol='features', labelCol='label')
    return rf.fit(trainingData)
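RegressionEvaluator is imported but unused in the preview. A sketch of how evaluation could follow, assuming a held-out testData DataFrame with the same assumed column names:

# testData and the column names are assumptions, not from the gist
model = trainRFmodel(trainingData)
predictions = model.transform(testData)
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction',
                                metricName='rmse')
print('RMSE:', evaluator.evaluate(predictions))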
AdroitAnandAI / vectorAssembler.py
Created June 6, 2021 16:43
Combine Features for Spark
from pyspark.ml.feature import VectorAssembler

# Assemble all the features with VectorAssembler
def assembleFeatures(dfRDD):
    # Columns as listed in the preview; the original list may be longer
    required_features = ['week', 'temp', 'wind', 'rainfall', 'day']
    assembler = VectorAssembler(inputCols=required_features,
                                outputCol='features')
    return assembler.transform(dfRDD)
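Spark ML estimators expect all predictors in a single vector column, which is why this assembly step precedes the RandomForest training above. Usage is a one-liner:

assembled = assembleFeatures(df)  # df is a hypothetical DataFrame with the listed columns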
AdroitAnandAI / rf_timeseries.py
Created June 6, 2021 14:32
Random Forest Regressor to predict Time Series
# Fitting Random Forest Regression to the dataset
# To import sklearn's regressor
from sklearn.ensemble import RandomForestRegressor
# create regressor object
regressor = RandomForestRegressor(n_estimators=100, random_state=0)
# fit the regressor with x and y data
regressor.fit(train_X, train_y)
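A natural follow-up, assuming a test_X split that matches train_X, is prediction:

# test_X is assumed to exist alongside train_X / train_y
predictions = regressor.predict(test_X)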
AdroitAnandAI / newFeatures.py
Created June 6, 2021 14:10
Data Transformations
from datetime import datetime

def convert_to_date(date):
    date_as_string = str(date)
    date_as_string = date_as_string[2:]  # drop the leading century digits
    date_time_obj = datetime.strptime(date_as_string, '%y%m%d')
    return date_time_obj

def convert_to_daynum(date):
    date_as_string = str(date)
    date_as_string = date_as_string[2:]
    # Assumed completion (the preview ends here): return the weekday number
    return datetime.strptime(date_as_string, '%y%m%d').weekday()
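Assuming dates arrive as 8-digit integers like 20210606 (which the [2:] slice implies), usage looks like:

print(convert_to_date(20210606))    # 2021-06-06 00:00:00
print(convert_to_daynum(20210606))  # 6, i.e. Sunday with Monday = 0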
# Configure ML pipeline with three stages: tokenizer, CountVec, and LR
# https://spark.apache.org/docs/latest/ml-pipeline.html
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Refer: https://spark.apache.org/docs/latest/ml-features#tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# Refer: https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
cv = CountVectorizer(inputCol=tokenizer.getOutputCol(),
                     outputCol="features", minDF=2.0)
lr = LogisticRegression(maxIter=10, regParam=0.001)
# Chain the three stages (assumed completion; the preview stops before this)
pipeline = Pipeline(stages=[tokenizer, cv, lr])
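A hedged fit/transform sketch for the assembled pipeline; 'training' is a hypothetical DataFrame, not from the snippet:

# 'training' is an assumed DataFrame with 'text' and 'label' columns
model = pipeline.fit(training)
model.transform(training).select('text', 'prediction').show()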
AdroitAnandAI / sparksql.py
Created June 6, 2021 06:09
Spark SQL Example
# spark is an existing SparkSession
df = spark.read.json("./people.json")
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
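Registered temp views accept arbitrary SQL; a follow-up query, assuming the name and age columns of Spark's standard people.json example:

# Column names assume Spark's example people.json (name, age)
spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19").show()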