@bobquest33
Last active March 9, 2020 05:07
The script below loads data into a database using PySpark. I used it to load the data below, and it created a new table with appropriate data types in Postgres. This is a feature of PySpark DataFrames that I really liked.
import os
import sys

# Point the script at the local Spark install and the Postgres JDBC driver.
# These must be set before pyspark is imported for the paths to take effect.
os.environ['SPARK_HOME'] = "C:/Users/USER1/rcs/spark-2.1.0-bin-hadoop2.6"
os.environ['SPARK_CLASSPATH'] = "C:/Users/USER1/Documents/python/test/100_script_30_day_challenge/pyspark/postgresql-42.1.1.jre6.jar"

# Append pyspark to the Python path
sys.path.append("C:/Users/USER1/rcs/spark-2.1.0-bin-hadoop2.6/python")
sys.path.append("C:/Users/USER1/rcs/spark-2.1.0-bin-hadoop2.6/python/lib/py4j-0.10.4-src.zip")

from pyspark.sql import SparkSession

# One SparkSession is all Spark 2.x needs; it replaces the separate
# SparkContext/SQLContext pair, so those are not created here.
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('My App') \
    .config('spark.sql.warehouse.dir', 'file:///C:/temp') \
    .getOrCreate()
# Read the CSV; spark.read returns a DataFrame directly, not an RDD
accounts_df = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load('test_bank_dat.csv')

# Rename the columns; toDF(*cols) returns a new DataFrame with these names
cols = ('ACCOUNT_ID', 'STREET_ADDRESS', 'SECONDARY_ADDRESS', 'POSTAL_CODE', 'CITY', 'COUNTRY', 'COUNTRY_CODE',
        'ZIP_CODE', 'SWIFT_ADDR', 'TEL_NUM', 'EMAIL_ADDR', 'CNTCT_PRSN', 'CMPNY_NAME', 'FAX_NUM')
accounts_df.show()  # show() prints the rows itself and returns None, so it needs no print()
df = accounts_df.toDF(*cols)
df.show()
# Define JDBC properties for the DB connection; add a "password" entry
# here if your Postgres server requires one.
url = "jdbc:postgresql://localhost/postgres"
properties = {
    "user": "pridash4",
    "driver": "org.postgresql.Driver"
}
# Write the DataFrame to the database table test_bics1
df.write.mode("overwrite").jdbc(url=url, table="test_bics1", properties=properties)

val1 = df.count()
print("count:", val1)

# Read the table back to verify that every row made it into Postgres
df_check = spark.read.jdbc(url=url, table='test_bics1', properties=properties)
val2 = df_check.count()
print(val2)
if val1 == val2:
    print("All records uploaded")
else:
    print("Record mismatch")