@calixtofelipe · Created February 13, 2021 11:11

Load Parquet files from an S3-compatible bucket with PySpark and append the rows to an Oracle table over JDBC.
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from config import (
    S3_ENDPOINT,
    S3_ACCESS,
    S3_SECRET,
    S3_BUCKET_READ,
    ORACLE_URL,
    ORACLE_TABLE,
    ORACLE_USER,
    ORACLE_PASSWORD,
)
def s3_2_oracle():
    spark = SparkSession.builder.getOrCreate()

    def load_config(spark_context: SparkContext):
        # Point the S3A filesystem at the S3-compatible endpoint and credentials.
        hadoop_conf = spark_context._jsc.hadoopConfiguration()
        hadoop_conf.set("fs.s3a.access.key", S3_ACCESS)
        hadoop_conf.set("fs.s3a.secret.key", S3_SECRET)
        hadoop_conf.set("fs.s3a.path.style.access", "true")
        hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        hadoop_conf.set("fs.s3a.endpoint", S3_ENDPOINT)
        hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")

    load_config(spark.sparkContext)

    # Read the Parquet files; `_id` is a struct (as in MongoDB exports)
    # whose `oid` field holds the document id as a string.
    df = spark.read.parquet(S3_BUCKET_READ)
    df = df.withColumn("id_string", F.col("_id.oid")).drop("_id")

    # Append the flattened rows to the Oracle table over JDBC.
    df.write.format("jdbc").options(
        url=ORACLE_URL,
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable=ORACLE_TABLE,
        user=ORACLE_USER,
        password=ORACLE_PASSWORD,
    ).mode("append").save()


if __name__ == "__main__":
    s3_2_oracle()
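
The script expects a sibling config.py module that supplies the connection settings. A minimal sketch of that module is below; every value is a placeholder assumption for illustration, not the author's actual configuration.

# config.py — placeholder values only; substitute your own endpoints and credentials.
S3_ENDPOINT = "http://localhost:9000"        # assumed: a local MinIO-style endpoint (SSL disabled above)
S3_ACCESS = "my-access-key"                  # assumed access key
S3_SECRET = "my-secret-key"                  # assumed secret key
S3_BUCKET_READ = "s3a://my-bucket/exports/"  # assumed s3a:// path to the Parquet files

ORACLE_URL = "jdbc:oracle:thin:@//localhost:1521/ORCLPDB1"  # assumed thin-driver JDBC URL
ORACLE_TABLE = "MY_SCHEMA.MY_TABLE"          # assumed target table
ORACLE_USER = "my_user"
ORACLE_PASSWORD = "my_password"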
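Neither the S3A filesystem nor the Oracle JDBC driver ships with Spark itself, so the job needs both on its classpath. A possible invocation, assuming the hadoop-aws package matching your Hadoop build and a locally downloaded ojdbc8.jar (paths and versions are illustrative):

spark-submit \
  --packages org.apache.hadoop:hadoop-aws:3.2.0 \
  --jars /path/to/ojdbc8.jar \
  s3_2_oracle.py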