Pyspark Streamlit demo for university (by @andfanilo)
from pyspark.rdd import RDD
from pyspark.sql import Row
import streamlit as st
from utils import _initialize_spark
st.write("# :tada: Hello Pyspark")
spark, sc = _initialize_spark()
st.write("[Link to Spark window](http://localhost:4040)")
st.write("## Create RDD from a Python list")
l = list(range(10))
# st.write(l)
rdd = sc.parallelize(l)
rdd.cache()  # keep the RDD in memory so the actions below do not recompute it
st.write(rdd)
st.write("## Get results through actions")
st.write(rdd.collect())
st.write(rdd.take(3))
st.write(rdd.count())
st.write("## Transform RDDs")
st.write(rdd.filter(lambda x: x % 2 == 0).collect())  # lazy evaluation: filter alone only builds a new RDD, nothing runs until an action is called (see the small sketch below)
st.write(rdd.map(lambda x: x * 2).collect())
st.write(rdd.map(lambda x: x * 2).reduce(lambda x, y: x + y))  # reduce is an action, so it evaluates the whole chain of RDDs
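# A minimal sketch of lazy evaluation (the lazy_rdd name is only for illustration):
# transformations such as filter and map just record a lineage, no Spark job runs yet.
lazy_rdd = rdd.filter(lambda x: x % 2 == 0)  # returns immediately, nothing computed
st.write(type(lazy_rdd).__name__)            # typically a PipelinedRDD: a recipe, not data
st.write(lazy_rdd.collect())                 # the action is what actually triggers the job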
# Compare the two following: map keeps one list per element, flatMap flattens them into a single list
st.write(rdd.map(lambda x: list(range(x))).collect())
st.write(rdd.flatMap(lambda x: list(range(x))).collect())
st.write("## Wordcount")
file_rdd = sc.textFile("lorem.txt")
st.write(file_rdd.collect())  # so, what's inside? each element of the RDD is one line of the file
st.write(
    file_rdd.flatMap(lambda sentence: sentence.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    .collect()
)
st.write()
# spark.stop()
import pandas as pd
import pyspark.pandas as ps
import streamlit as st
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import SQLTransformer
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql.functions import pandas_udf, upper
from pyspark.sql.types import *
from utils import _initialize_spark
spark, sc = _initialize_spark()
st.write("# :tada: Hello Pyspark DataFrame")
rdd = spark.sparkContext.parallelize([('1', 'a'), ('2', 'b'), ('3', 'c'), ('4', 'd'), ('5', 'e'), ('6', 'f')])
schema = StructType([StructField('ID', StringType(), True), StructField('letter', StringType(), True)])
df = spark.createDataFrame(rdd, schema)
st.write(df)
### INIT: load the Titanic CSV with pandas, then convert it to a Spark DataFrame
df = pd.read_csv("titanic.csv")
sdf = spark.createDataFrame(df)
### PySpark DataFrame
st.write(sdf.columns)
st.dataframe(sdf.toPandas())
st.write(sdf.select("PassengerId", "Survived", "Sex"))
st.write(sdf.withColumn('Name_Upper', upper(sdf.Name)))
st.write(sdf.groupby('Survived').count())
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Plus one using the pandas Series API; Spark calls this on batches of rows.
    return series + 1
st.write(sdf.select(pandas_plus_one(sdf.Age)))
sdf.createOrReplaceTempView("titanic")
st.write(spark.sql('SELECT * FROM titanic WHERE Survived = 1'))
st.write(spark.sql("SELECT count(*) from titanic GROUP BY Survived"))
spark.udf.register("add_one", pandas_plus_one)
st.write(spark.sql("SELECT add_one(Age) FROM titanic"))
### pandas-on-Spark DataFrame
#psdf = ps.from_pandas(df)
psdf = sdf.pandas_api()
st.write(psdf["Age"])
st.write(psdf[psdf["Survived"]==1].to_pandas())
st.write(psdf.groupby('Survived').count())
st.write(psdf.groupby('Survived')["PassengerId"].count().to_pandas())
### Titanic Final
titanic = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load("titanic.csv")
)
titanic.createOrReplaceTempView('titanic')
st.subheader("Machine Learning")
mean_age = spark.sql('SELECT MEAN(Age) FROM titanic').collect()[0][0]
titanic = titanic.na.fill(mean_age, ['Age'])
binarizer = Binarizer(threshold=50, inputCol="Age", outputCol="binarized_age")
titanic_bin = binarizer.transform(titanic)
st.write(titanic_bin.toPandas())
# Extract the civility ("Mr") from the Name column; the double quotes in the regex
# variable become the string literal delimiters inside the generated SQL statement.
regex = '"(Mr)"'
sqlTrans = SQLTransformer(
    statement=f"SELECT *, regexp_extract(Name, {regex}) AS Civility FROM __THIS__"
)
st.write(sqlTrans.transform(titanic).toPandas())

Prerequisites

conda create -n pyspark-demo
conda activate pyspark-demo
pip install streamlit "pyspark>=3.0.0"
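
To run one of the demo apps (the filename below is an assumption; use whatever name the script is saved under):

streamlit run hello_pyspark.py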
from typing import Tuple

import numpy as np
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession


def _initialize_spark() -> Tuple[SparkSession, SparkContext]:
    """Create the Spark session (and its SparkContext) used by the Streamlit apps."""
    conf = SparkConf().setAppName("lecture-lyon2").setMaster("local")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark, spark.sparkContext