Skip to content

Instantly share code, notes, and snippets.

@nfarah86
Created June 11, 2022 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nfarah86/83edcee6ed28b833c966b1d61f0a4d1b to your computer and use it in GitHub Desktop.
# Third-party dependencies: PySpark for the ETL step, python-dotenv for
# local credential loading, Rockset client for collection / query-lambda setup.
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col
import os
from dotenv import load_dotenv
from rockset import Client, Q, F
# Load ACCESS_KEY / SECRET_KEY / NADINE_KEY from a local .env file into
# os.environ so the functions below can read them.
load_dotenv()
def create_spark_session():
    """Create (or reuse) a SparkSession configured for S3A access.

    Returns:
        SparkSession: a session with the hadoop-aws connector jar on the
        classpath and the S3AFileSystem implementation registered, so that
        ``s3a://`` URIs can be read and written.
    """
    spark = (
        SparkSession.builder
        # Pull the AWS connector jar at startup so s3a:// paths resolve.
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
        .config("spark.hadoop.fs.s3a.impl",
                "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .getOrCreate()
    )
    return spark
def write_to_s3(rdata):
    """Write the transformed DataFrame to S3 as CSV with a header row.

    Args:
        rdata: Spark DataFrame produced by ``read_data``.

    NOTE(review): ``save`` raises if the target path already exists; add
    ``.mode("overwrite")`` if this job is expected to be re-runnable.
    """
    rdata.write.format("csv").option("header", "true").save(
        "s3a://spark-rockset-public-nadine/OutputMoviesTwitch.csv"
    )
def read_data(spark):
    """Read the movies CSV from S3 and cast ``vote_count`` to int.

    S3 credentials are taken from the ``ACCESS_KEY`` / ``SECRET_KEY``
    environment variables (loaded from .env at import time) and injected
    into the session's Hadoop configuration.

    Args:
        spark: an S3A-capable SparkSession (see ``create_spark_session``).

    Returns:
        Spark DataFrame of the movies CSV with ``vote_count`` as int.

    Raises:
        RuntimeError: if the S3 credentials are not set in the environment
            (clearer than letting the JVM reject a ``None`` config value).
    """
    access_key = os.environ.get('ACCESS_KEY')
    secret_key = os.environ.get('SECRET_KEY')
    if not access_key or not secret_key:
        raise RuntimeError(
            "ACCESS_KEY and SECRET_KEY must be set in the environment"
        )

    # _jsc is a private handle into the JVM SparkContext; it is the standard
    # way to reach the Hadoop configuration from PySpark.
    hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_configuration.set("fs.s3a.access.key", access_key)
    hadoop_configuration.set("fs.s3a.secret.key", secret_key)
    hadoop_configuration.set("fs.s3a.endpoint", "s3.amazonaws.com")
    hadoop_configuration.set("fs.s3a.impl",
                             "org.apache.hadoop.fs.s3a.S3AFileSystem")

    rdata = spark.read.options(header='True', delimiter=',').csv(
        "s3a://spark-rockset-public-nadine/movies.csv"
    )
    # CSV columns arrive as strings; downstream filtering expects an int.
    rdata = rdata.withColumn("vote_count", col("vote_count").cast("int"))
    return rdata
def create_collection(rs):
    """Create a Rockset collection sourced from the S3 CSV output.

    Requires the 'sparkTwitchSessionS3' integration to already exist in the
    Rockset console (created manually — MUST DO before running this).

    Args:
        rs: authenticated ``rockset.Client``.

    Returns:
        The newly created collection object. Note the collection is created
        asynchronously; poll it before querying (see upcoming blog post).
    """
    integration = rs.Integration.retrieve('sparkTwitchSessionS3')

    # Describe the CSV layout; column names come from the file's first line.
    csv = rs.Source.csv_params(
        separator=',',
        encoding='UTF-8',
        first_line_as_column_names=True,
        column_names=None,
        column_types=None,
    )

    # Spark writes the CSV as a directory of part files, hence the
    # trailing-slash prefix rather than a single object key.
    s3 = rs.Source.s3(
        bucket='spark-rockset-public-nadine',
        prefix='OutputMoviesTwitch.csv/',
        integration=integration,
        format_params=csv,
    )

    return rs.Collection.create(name='SparkS3Twtich', sources=[s3])
def create_query_lambda(rs):
    """Create and execute a query lambda over the Spark-fed collection.

    Builds a query against the 'SparkS3Twtich' collection (one row where
    video == "True"), saves it as the 'twitchQLSpark' query lambda in the
    'commons' workspace, executes it once, and prints the results.

    Args:
        rs: authenticated ``rockset.Client``.

    Returns:
        The query-lambda execution results.
    """
    # NOTE(review): "video" was read from CSV, so it is a string column —
    # compare against the string "True", not a boolean.
    q = Q("SparkS3Twtich").where(F["video"] == "True").limit(1)
    print(q)

    qlambda = rs.QueryLambda.create('twitchQLSpark', workspace='commons', query=q)
    results = qlambda.execute()
    print(results)
    return results
def main():
    """Run the Rockset half of the pipeline.

    The full pipeline is: Spark reads/transforms the movies CSV, writes it
    back to S3, then Rockset ingests it and serves a query lambda. The Spark
    and collection-creation steps are one-time setup and are left commented
    out; only the query-lambda step runs on each invocation.
    """
    # One-time setup steps (uncomment to re-run the ETL / ingestion):
    # spark_session = create_spark_session()
    # rdata = read_data(spark_session)
    # write_to_s3(rdata)

    rs = Client(api_server='api.rs2.usw2.rockset.com',
                api_key=os.environ.get('NADINE_KEY'))
    print(rs)
    # create_collection(rs)
    create_query_lambda(rs)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment