@calixtofelipe
Created February 12, 2021 14:30
from pyspark import SparkContext
from pyspark.sql import SparkSession

from config import MONGO_URI, S3_ENDPOINT, S3_BUCKET, S3_ACCESS, S3_SECRET, S3_COLLECTION


def mongo_2_s3():
    # Build a Spark session wired to MongoDB via the mongo-spark connector.
    spark_mongo = (
        SparkSession.builder.appName("spark_get_mongo")
        .config("spark.mongodb.input.uri", MONGO_URI)
        .config("spark.mongodb.output.uri", MONGO_URI)
        .config("spark.mongodb.input.collection", S3_COLLECTION)
        .config(
            "spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1"
        )
        .getOrCreate()
    )

    # Load the MongoDB collection into a DataFrame.
    df = spark_mongo.read.format("mongo").load()

    def load_config(spark_context: SparkContext):
        # Point the Hadoop S3A filesystem at the target endpoint and credentials.
        hadoop_conf = spark_context._jsc.hadoopConfiguration()
        hadoop_conf.set("fs.s3a.access.key", S3_ACCESS)
        hadoop_conf.set("fs.s3a.secret.key", S3_SECRET)
        hadoop_conf.set("fs.s3a.path.style.access", "true")
        hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        hadoop_conf.set("fs.s3a.endpoint", S3_ENDPOINT)
        hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")

    load_config(spark_mongo.sparkContext)

    # Write the collection to the bucket as Parquet (S3_BUCKET should be an s3a:// path).
    df.write.parquet(S3_BUCKET)


if __name__ == "__main__":
    mongo_2_s3()
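
The script imports its connection settings from a sibling config module that is not shown in the gist. A minimal sketch of what such a config.py might contain follows; every value here is a hypothetical placeholder (e.g. a local MinIO endpoint, which would explain why SSL is disabled above), not anything from the original.

# config.py -- hypothetical example values; replace with your own settings.
MONGO_URI = "mongodb://user:password@localhost:27017/mydb.mycollection"  # assumed URI shape
S3_ENDPOINT = "http://localhost:9000"       # e.g. a local MinIO endpoint
S3_BUCKET = "s3a://my-bucket/mongo-export"  # target Parquet path, s3a:// scheme
S3_ACCESS = "minio-access-key"
S3_SECRET = "minio-secret-key"
S3_COLLECTION = "mycollection"              # MongoDB collection to read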