PySpark JDBC samples

Fetching data via JDBC using PySpark.

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Database used - MySQL

Data used - academia.stackexchange.com.7z from https://archive.org/details/stackexchange

The academia.stackexchange.com folder contains data for multiple tables; I used PostHistory.xml.

MySQL commands:

CREATE DATABASE stackexchange;

USE stackexchange;

CREATE TABLE post_history (
    Id INT NOT NULL PRIMARY KEY,
    PostHistoryTypeId SMALLINT NOT NULL,
    PostId INT NOT NULL,
    RevisionGUID VARCHAR(36),
    CreationDate DATETIME,
    UserId INT,
    Text TEXT
);

I created a new user for the test:

CREATE USER 'devender'@'localhost' IDENTIFIED BY 'dev_mysql';

GRANT ALL ON stackexchange.* TO 'devender'@'localhost';

FLUSH PRIVILEGES;

Then I tried to load the PostHistory.xml data into the post_history table using:

LOAD XML INFILE '<path to xml>/PostHistory.xml' INTO TABLE post_history ROWS IDENTIFIED BY '<row>';

I got an error:

ERROR 1290 (HY000): The MySQL server is running with the --secure-file-priv option so it cannot execute this statement

So I moved the XML file to the /var/lib/mysql-files directory and tried:

LOAD XML INFILE '/var/lib/mysql-files/PostHistory.xml' INTO TABLE post_history ROWS IDENTIFIED BY '<row>';

It worked 👌
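
To sanity-check the load before timing anything in Spark, a quick row count can be run from Python. A minimal sketch (not part of the original gist), assuming the mysql-connector-python package is installed:

import mysql.connector

# Connect as the test user created above and count the loaded rows.
conn = mysql.connector.connect(host="localhost", user="devender",
                               password="dev_mysql", database="stackexchange")
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM post_history")
print("rows loaded:", cur.fetchone()[0])
cur.close()
conn.close()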

Below is the JDBC code, first with partitioning and then without.

Command:

spark-submit --jars <path-to-connector>/mysql-connector-java-5.1.36.jar --driver-class-path <path-to-connector>/mysql-connector-java-5.1.36.jar spark-jdbc-partition.py

With partitioning (spark-jdbc-partition.py):

from pyspark.sql import SparkSession
import time as t

t1 = t.time()
spark = SparkSession.builder.master("local[4]").appName("Test-JDBC").getOrCreate()

url = "jdbc:mysql://localhost:3306/stackexchange"
properties = {"user": "devender", "password": "dev_mysql", "driver": "com.mysql.jdbc.Driver"}

# Fetch the id range first to use as partitioning bounds.
r = spark.read.jdbc(url, "(select min(id), max(id) from post_history) as ph", properties=properties).head()
minId, maxId = r[0], r[1]

# Read the table split into 4 range partitions on the id column.
ds = spark.read.jdbc(url, "(select * from post_history) as ph",
                     properties=properties, numPartitions=4, column="id",
                     lowerBound=minId, upperBound=maxId)
count = ds.count()
t2 = t.time()

# It took less time with partitioning.
print("Total time taken by spark - " + str(t2 - t1) + " seconds")
print(count)
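
As an aside (not in the original gist), the same split can also be expressed with explicit per-partition WHERE clauses via the predicates argument of read.jdbc, instead of column bounds. A minimal sketch reusing url and properties from above, with illustrative id ranges:

# One WHERE clause per partition; Spark opens one JDBC connection per predicate.
# The ranges below are hypothetical - derive them from the real min/max ids.
predicates = [
    "id >= 1 AND id < 50000",
    "id >= 50000 AND id < 100000",
    "id >= 100000 AND id < 150000",
    "id >= 150000",
]
ds = spark.read.jdbc(url, "(select * from post_history) as ph",
                     predicates=predicates, properties=properties)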

Without partitioning:

from pyspark.sql import SparkSession
import time as t

t1 = t.time()
spark = SparkSession.builder.master("local[1]").appName("Test-JDBC").getOrCreate()

url = "jdbc:mysql://localhost:3306/stackexchange"
# fetchsize could also be set here, e.g. "fetchsize": "10000"
properties = {"user": "devender", "password": "dev_mysql", "driver": "com.mysql.jdbc.Driver"}

# Read the whole table over a single JDBC connection (one partition).
ds = spark.read.jdbc(url, "(select * from post_history) as ph", properties=properties)
count = ds.count()
t2 = t.time()

print("Total time taken by spark - " + str(t2 - t1) + " seconds")
print(count)
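
The fetchsize comment above hints at the builder-style reader API; the same read can also be written with spark.read.format("jdbc") and option() calls. A minimal equivalent sketch (not part of the original gist):

ds = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/stackexchange")
      .option("dbtable", "(select * from post_history) as ph")
      .option("user", "devender")
      .option("password", "dev_mysql")
      .option("driver", "com.mysql.jdbc.Driver")
      .load())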

Author's note: fetchsize, an optional parameter, can be used to improve ingestion speed.
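
Per the Spark JDBC documentation, fetchsize can be passed in the connection properties dict; a minimal sketch with an illustrative value:

ds = spark.read.jdbc(
    "jdbc:mysql://localhost:3306/stackexchange",
    "(select * from post_history) as ph",
    properties={"user": "devender", "password": "dev_mysql",
                "driver": "com.mysql.jdbc.Driver",
                "fetchsize": "10000"})  # rows fetched per round trip; tune per workload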
