Created
June 11, 2022 14:25
-
-
Save nfarah86/83edcee6ed28b833c966b1d61f0a4d1b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os

from dotenv import load_dotenv
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from rockset import Client, Q, F

# Pull ACCESS_KEY / SECRET_KEY / NADINE_KEY from a local .env file.
load_dotenv()
def create_spark_session():
    """Build and return a SparkSession able to talk to S3 via the s3a scheme.

    Pulls in the hadoop-aws package so the S3A filesystem implementation is
    available on the driver/executors.
    """
    builder = SparkSession.builder
    builder = builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0"
    )
    builder = builder.config(
        "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    return builder.getOrCreate()
def write_to_s3(rdata):
    """Persist the DataFrame to the public S3 bucket as a headered CSV."""
    writer = rdata.write.format("csv").option("header", "true")
    writer.save("s3a://spark-rockset-public-nadine/OutputMoviesTwitch.csv")
def read_data(spark):
    """Read movies.csv from S3 and return it as a DataFrame.

    Configures the Hadoop S3A connector on the session's SparkContext using
    credentials from the environment (loaded earlier via dotenv), reads the
    headered CSV, and casts ``vote_count`` from string to int so numeric
    operations work downstream.

    Args:
        spark: an active SparkSession.

    Returns:
        The movies DataFrame with ``vote_count`` cast to int.
    """
    hadoop_configuration = spark.sparkContext._jsc.hadoopConfiguration()
    # Credentials come from .env; os.environ.get returns None if unset,
    # which would surface as an auth failure on the first S3 read.
    hadoop_configuration.set("fs.s3a.access.key", os.environ.get('ACCESS_KEY'))
    hadoop_configuration.set("fs.s3a.secret.key", os.environ.get('SECRET_KEY'))
    hadoop_configuration.set("fs.s3a.endpoint", "s3.amazonaws.com")
    hadoop_configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    rdata = spark.read.options(header='True', delimiter=',').csv(
        "s3a://spark-rockset-public-nadine/movies.csv"
    )
    # CSV columns are all strings by default; make vote_count numeric.
    rdata = rdata.withColumn("vote_count", col("vote_count").cast("int"))
    return rdata
def create_collection(rs):
    """Create a Rockset collection backed by the Spark output CSV in S3.

    Requires the 'sparkTwitchSessionS3' S3 integration to already exist in
    the Rockset console.

    Args:
        rs: an authenticated rockset.Client.

    Returns:
        The newly created collection object (callers can poll it until the
        initial ingest finishes), instead of discarding it.
    """
    integration = rs.Integration.retrieve('sparkTwitchSessionS3')
    # Describe the CSV layout: comma-separated, UTF-8, header row supplies
    # the column names, types left for Rockset to infer.
    csv = rs.Source.csv_params(
        separator=',',
        encoding='UTF-8',
        first_line_as_column_names=True,
        column_names=None,
        column_types=None
    )
    s3 = rs.Source.s3(
        bucket='spark-rockset-public-nadine',
        prefix='OutputMoviesTwitch.csv/',
        integration=integration,
        format_params=csv
    )
    # NOTE: 'SparkS3Twtich' matches the (misspelled) name queried elsewhere
    # in this script; do not "fix" one without the other.
    return rs.Collection.create(name='SparkS3Twtich', sources=[s3])
def create_query_lambda(rs):
    """Create a Rockset query lambda over the Spark collection and run it.

    Builds a query that fetches one document where video == "True",
    registers it as the 'twitchQLSpark' lambda in the 'commons' workspace,
    executes it once, and returns the results (previously they were only
    printed and discarded).

    Args:
        rs: an authenticated rockset.Client.

    Returns:
        The execution results of the newly created query lambda.
    """
    q = Q("SparkS3Twtich").where(F["video"] == "True").limit(1)
    print(q)
    qlambda = rs.QueryLambda.create('twitchQLSpark', workspace='commons', query=q)
    results = qlambda.execute()
    print(results)
    return results
def main():
    """Entry point: connect to Rockset and exercise the query lambda step."""
    # Earlier pipeline stages (already run once, so left disabled):
    # spark_session = create_spark_session()
    # rdata = read_data(spark_session)
    # write_to_s3(rdata)
    rs = Client(
        api_server='api.rs2.usw2.rockset.com',
        api_key=os.environ.get('NADINE_KEY'),
    )
    print(rs)
    # create_collection(rs)
    create_query_lambda(rs)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment