Skip to content

Instantly share code, notes, and snippets.

@LearningJournal
Last active September 10, 2022 23:39
Show Gist options
  • Save LearningJournal/2800118a0d2da0350eec7c5718ddd5e2 to your computer and use it in GitHub Desktop.
# Download Oracle JDK 8u144 (license accepted via the cookie header).
wget -c --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u144-b01/090f390dda5b47b9b721c7dfaa008135/jdk-8u144-linux-x64.rpm
# Install the RPM downloaded above (fixed: was jdk-8u121, which was never downloaded).
yum localinstall jdk-8u144-linux-x64.rpm
# Download Spark 2.2.0 pre-built for Hadoop 2.7.
wget -c https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
mkdir spark
# Extract the archive downloaded above (fixed: was hadoop2.6, which was never downloaded).
tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz -C spark/
# Add SPARK_HOME and the Spark binaries to PATH (append these lines in .bash_profile).
vi .bash_profile
export SPARK_HOME=~/spark/spark-2.2.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
spark-shell
val df = spark.read.json("data/people.json")
df.filter("age > 21").select("name", "age").show()
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people where age > 21").show()
pyspark
df = spark.read.json("data/people.json")
df.filter("age > 21").select("name", "age").show()
#You can use SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people where age > 21").show()
wget -c https://repo.continuum.io/archive/Anaconda3-5.0.0.1-Linux-x86_64.sh
bash Anaconda3-5.0.0.1-Linux-x86_64.sh
pip install toree-0.2.0.dev1.tar.gz
jupyter toree install --spark_home=$SPARK_HOME --interpreters=Scala,PySpark,SQL --user
-- Inspect the table definition plus storage details (provider, location, schema).
describe table extended mysparkdb.spark_surveys;
-- Parquet-backed managed table for the mental-health-in-tech survey data.
-- NOTE(review): LONG is a Spark SQL alias for BIGINT; this DDL is Spark-specific.
CREATE TABLE IF NOT EXISTS mysparkdb.spark_surveys(
TIME_STAMP TIMESTAMP,
AGE LONG,
GENDER STRING,
COUNTRY STRING,
STATE STRING,
SELF_EMPLOYED STRING,
FAMILY_HISTORY STRING,
TREATMENT STRING,
WORK_INTERFERE STRING,
NO_EMPLOYEES STRING,
REMOTE_WORK STRING,
TECH_COMPANY STRING,
BENEFITS STRING,
CARE_OPTIONS STRING,
WELLNESS_PROGRAM STRING,
SEEK_HELP STRING,
ANONYMITY STRING,
LEAVE STRING,
MENTAL_HEALTH_CONSEQUENCE STRING,
PHYS_HEALTH_CONSEQUENCE STRING,
COWORKERS STRING,
SUPERVISOR STRING,
MENTAL_HEALTH_INTERVIEW STRING,
PHYS_HEALTH_INTERVIEW STRING,
MENTAL_VS_PHYSICAL STRING,
OBS_CONSEQUENCE STRING,
COMMENTS STRING)
USING PARQUET;
-- Database whose files live under the given filesystem location.
-- NOTE(review): in a single executable script this would need to run before the
-- CREATE TABLE above; these appear to be independent tutorial snippets.
CREATE DATABASE IF NOT EXISTS mysparkdb
LOCATION '/home/prashant/mysparkdb/';
-- CSV-backed table: schema-on-read over the file at OPTIONS(path); no data is copied.
CREATE TABLE IF NOT EXISTS mysparkdb.surveys(
TIME_STAMP TIMESTAMP,
AGE LONG,
GENDER STRING,
COUNTRY STRING,
STATE STRING,
SELF_EMPLOYED STRING,
FAMILY_HISTORY STRING,
TREATMENT STRING,
WORK_INTERFERE STRING,
NO_EMPLOYEES STRING,
REMOTE_WORK STRING,
TECH_COMPANY STRING,
BENEFITS STRING,
CARE_OPTIONS STRING,
WELLNESS_PROGRAM STRING,
SEEK_HELP STRING,
ANONYMITY STRING,
LEAVE STRING,
MENTAL_HEALTH_CONSEQUENCE STRING,
PHYS_HEALTH_CONSEQUENCE STRING,
COWORKERS STRING,
SUPERVISOR STRING,
MENTAL_HEALTH_INTERVIEW STRING,
PHYS_HEALTH_INTERVIEW STRING,
MENTAL_VS_PHYSICAL STRING,
OBS_CONSEQUENCE STRING,
COMMENTS STRING)
USING CSV
-- header row is skipped; the literal string 'NA' is read back as NULL.
OPTIONS ( header='true',
nullvalue='NA',
timestampFormat="yyyy-MM-dd'T'HH:mm:ss",
path='/home/prashant/survey.csv');
-- Aggregated view: number of respondents per (AGE, REMOTE_WORK) pair.
CREATE OR REPLACE VIEW mysparkdb.filtered_surveys
as SELECT AGE, REMOTE_WORK, COUNT(*) AGE_COUNT
FROM mysparkdb.surveys
GROUP BY AGE, REMOTE_WORK;
spark-sql -f myddl.sql
gs://dataproc-initialization-actions/zeppelin/zeppelin.sh
gcloud compute ssh --zone=us-east1-c --ssh-flag="-D" --ssh-flag="10000" --ssh-flag="-N" "spark-03-m"
"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" "http://spark-03-m:8080" --proxy-server="socks5://localhost:10000" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp/spark-03-m
%sh gsutil ls gs://pkp-gcp-bucket
%sh hadoop fs -cp gs://pkp-gcp-bucket/survey.csv /home/prashant/
%sh hadoop fs -ls /home/prashant/
jupyter notebook --no-browser
%sql CREATE DATABASE IF NOT EXISTS mysparkdb LOCATION '/home/prashant/mysparkdb/'
%sql
CREATE TABLE IF NOT EXISTS mysparkdb.surveys(
TIME_STAMP TIMESTAMP,
AGE LONG,
GENDER STRING,
COUNTRY STRING,
STATE STRING,
SELF_EMPLOYED STRING,
FAMILY_HISTORY STRING,
TREATMENT STRING,
WORK_INTERFERE STRING,
NO_EMPLOYEES STRING,
REMOTE_WORK STRING,
TECH_COMPANY STRING,
BENEFITS STRING,
CARE_OPTIONS STRING,
WELLNESS_PROGRAM STRING,
SEEK_HELP STRING,
ANONYMITY STRING,
LEAVE STRING,
MENTAL_HEALTH_CONSEQUENCE STRING,
PHYS_HEALTH_CONSEQUENCE STRING,
COWORKERS STRING,
SUPERVISOR STRING,
MENTAL_HEALTH_INTERVIEW STRING,
PHYS_HEALTH_INTERVIEW STRING,
MENTAL_VS_PHYSICAL STRING,
OBS_CONSEQUENCE STRING,
COMMENTS STRING)
USING CSV
OPTIONS ( header='true',
nullvalue='NA',
timestampFormat="yyyy-MM-dd'T'HH:mm:ss",
path='/home/prashant/survey.csv')
%sql select age, count(*) frequency from mysparkdb.surveys where age between 20 and 65 group by age;
beeline
!connect jdbc:hive2://localhost:10000
#You can start the thrift server from your Spark Home sbin directory
spark/spark-2.2.0-bin-hadoop2.7/sbin/start-thriftserver.sh
val df = spark.sql(
"""select age, count(*) from mysparkdb.surveys where age between 20 and 65 group by age"""
)
df.show
val df = spark.sql(
"""create temporary view age_count as select age, count(*)
from mysparkdb.surveys where age between 20 and 65 group by age"""
)
df.show
val df1 = spark.sql("""select * from age_count""")
df1.show
val sch = spark.table("mysparkdb.spark_surveys").schema
val df = spark.read
.format("csv")
.schema(sch)
.option("header", "true")
.option("mode", "failfast")
.load("/home/prashant/survey.csv")
spark.table("mysparkdb.spark_surveys").count
df.write.insertInto("mysparkdb.spark_surveys")
spark.table("mysparkdb.spark_surveys").count
val sch = spark.table("mysparkdb.spark_surveys").schema
val df = spark.read
.format("csv")
.schema(sch)
.option("header", "true")
.option("mode", "failfast")
.load("/home/prashant/survey.csv")
spark.table("mysparkdb.spark_surveys").count
df.write
.mode("append")
.saveAsTable("mysparkdb.spark_surveys")
spark.table("mysparkdb.spark_surveys").count
jupyter notebook --ip=0.0.0.0 --port=8888 --no-browser
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("/home/prashant/survey.csv")
df.write
.format("parquet")
.saveAsTable("mysparkdb.new_surveys")
spark.table("mysparkdb.new_surveys").count
ShortType.sql
//gives you SMALLINT.
StringType.sql
//gives you STRING,
DateType.sql
//gives you DATE
TimestampType.sql
//gives you TIMESTAMP
VarcharType(10).sql
// gives you VARCHAR(10)
DecimalType(6, 2).sql
//gives DECIMAL(6,2)
SHOW DATABASES;
SHOW TABLES IN mysparkdb;
SHOW CREATE TABLE mysparkdb.surveys;
SHOW COLUMNS FROM mysparkdb.surveys;
SHOW COLUMNS IN mysparkdb.surveys;
DESCRIBE DATABASE mysparkdb;
DESCRIBE TABLE mysparkdb.surveys;
DESCRIBE TABLE EXTENDED mysparkdb.surveys;
spark.catalog.listDatabases
// same as SHOW DATABASES
//This API gives you a dataset for the list of all databases. You can display the list using the show method.
spark.catalog.listDatabases.show
//You can collect it back to the master node as a Scala Array.
val dbs = spark.catalog.listDatabases.collect
//Then you can loop through the array and apply a function on each element. Let's apply the println.
dbs.foreach(println)
spark.catalog.listColumns("mysparkdb","surveys").show
spark.catalog
.listColumns("mysparkdb", "surveys")
.foreach(x => println(x.name + "-->" + x.dataType.toUpperCase))
name := "spark Test App"
version := "0.1"
organization := "guru.learningjournal"
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
org.apache.spark:spark-core_2.11:2.2.0
org.apache.spark:spark-sql_2.11:2.2.0
gcloud compute ssh --zone=us-east1-c --ssh-flag="-D" --ssh-flag="10000" --ssh-flag="-N" "spark-6-m"
"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" "http://spark-6-m:8088" --proxy-server="socks5://localhost:10000" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp/spark-6-m
name := "spark Test App"
version := "0.1"
organization := "guru.learningjournal"
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"io.confluent" % "kafka-avro-serializer" % "3.1.1"
)
resolvers += "confluent" at "http://packages.confluent.io/maven/"
io.confluent:kafka-avro-serializer:3.1.1
http://packages.confluent.io/maven/
package guru.learningjournal.examples
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Reads a survey JSON file (path in args(0)), counts respondents per age for
// ages 20-65, and persists the result as the managed table "survey_frequency".
object SparkTestApp {
  def main(args: Array[String]): Unit = {
    // Hive support so saveAsTable registers the table in the metastore.
    val spark = SparkSession.builder()
      .appName("SparkTestApp")
      .enableHiveSupport()
      .getOrCreate()
    val df = spark.read.json(args(0))
    df.createTempView("surveys")
    // fixed: the SQL text was split as "" + "..." + "" + ", which is not a
    // valid Scala string literal; it must be one triple-quoted """...""" block.
    spark.sql(
      """select age, count(*) as frequency
         from surveys where age between 20 and 65 group by age"""
    )
      .write
      .saveAsTable("survey_frequency")
    spark.stop()
  }
}
spark-submit --master local --class guru.learningjournal.examples.SparkTestApp target/scala-2.11/spark-test-app-2.11-0.1.jar /home/prashant/spark-data/mental-health-in-tech-survey/json-data/surveys.json
--master spark://host:port
--master mesos://host:port
--master yarn
spark.udf.register(
"pgender",
(s: String) =>
if (List("f", "female", "woman").contains(s.toLowerCase)) "Female"
else "Male"
)
spark.sql("select pgender('female')").show
+-------------------+
|UDF:pgender(female)|
+-------------------+
| Female |
+-------------------+
# Submit a Spark Job in client mode
spark-submit --class org.apache.spark.examples.SparkPi spark-home/spark-2.2.0-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.2.0.jar 1000
# Start an SSH tunnel (SOCKS proxy on local port 10000 to the master node)
gcloud compute ssh --zone=us-east1-c --ssh-flag="-D" --ssh-flag="10000" --ssh-flag="-N" "spark22-notebook-m"
# Start the chrome browser using the SSH tunnel
# (fixed: URL host must be the tunneled master node spark22-notebook-m, not spark4-m;
#  quoted the cd path so the spaces in "Program Files (x86)" are handled)
cd "C:\Program Files (x86)\Google\Chrome\Application"
chrome.exe "http://spark22-notebook-m:4040" --proxy-server="socks5://localhost:10000" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp/spark22-notebook
# Start a Spark shell with three executors
spark-shell --num-executors 3
# Submit a Spark Job in cluster mode
spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster file:///usr/lib/spark/examples/jars/spark-examples.jar 1000
spark.sql("select PGENDER('Woman')").show
/*
+------------------+
|UDF:PGENDER(Woman)|
+------------------+
| Female|
+------------------+
*/
name := "learningjournal-examples"
version := "1.0"
organization := "guru.learningjournal"
scalaVersion := "2.11.8"
val sparkVersion = "2.3.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
package guru.learningjournal.examples
import org.apache.spark.sql._
import org.apache.spark.sql.types._
object SparkUDF {
def main(args: Array[String]) = {
//Create a Spark session
val spark = SparkSession.builder()
.appName("SparkLocalUDF")
.enableHiveSupport()
.getOrCreate()
//Load data and register a view
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("gs://pkp-gcp-bucket/survey.csv")
df.createTempView("surveys")
//Create a local function
val parseGender = (s: String) => {
if (List("cis female", "f", "female", "woman", "femake", "female ",
"cis-female/femme", "female (cis)", "femail").contains(s.toLowerCase))
"Female"
else if (List("male", "m", "male-ish", "maile", "mal", "male (cis)",
"make", "male ", "man", "msle", "mail", "malr", "cis man", "cis male").contains(s.toLowerCase))
"Male"
else
"Transgender"
}
//Register the function as UDF
spark.udf.register("PGENDER", parseGender)
//Apply the UDF
spark.sql("select PGENDER(gender) as parsed_gender, * from surveys")
.write
.mode("overwrite")
.saveAsTable("transformed_survey")
spark.stop()
}
}
spark-submit --master yarn --class guru.learningjournal.examples.SparkUDF target/scala-2.11/learningjournal-examples_2.11-1.0.jar
select gender, parsed_gender from transformed_survey limit 20;
name := "spark-udf-lib"
version := "0.1"
organization := "guru.learningjournal"
scalaVersion := "2.11.8"
val sparkVersion = "2.3.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
package guru.learningjournal.SparkUDFLib
import org.apache.spark.sql.api.java.UDF1
// Java-style Spark UDF (UDF1[String, String]) that buckets free-text gender
// responses from the survey into "Male" / "Female" / "Transgender".
class ParseGender extends UDF1[String, String] {
  // Known spellings observed in the raw survey data, already lower-cased.
  private val femaleLabels = Set("cis female", "f", "female", "woman", "femake",
    "female ", "cis-female/femme", "female (cis)", "femail")
  private val maleLabels = Set("male", "m", "male-ish", "maile", "mal",
    "male (cis)", "make", "male ", "man", "msle", "mail", "malr",
    "cis man", "cis male")

  def call(s: String): String = {
    val normalized = s.toLowerCase
    if (femaleLabels.contains(normalized)) "Female"
    else if (maleLabels.contains(normalized)) "Male"
    else "Transgender"
  }
}
spark-shell --jars ./target/scala-2.11/spark-udf-lib_2.11-0.1.jar
import guru.learningjournal.SparkUDFLib._
val pgender_f = new ParseGender().call _
spark.udf.register("PGENDER", pgender_f)
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("gs://pkp-gcp-bucket/survey.csv")
df.createTempView("surveys")
spark.sql("select pgender(gender) as parsed_gender, age from surveys limit 10").show
pyspark --jars ./target/scala-2.11/spark-udf-lib_2.11-0.1.jar
# Shell command: write every filesystem path into flist.txt
# (fixed: 'find \ -name *' — the start path should be '/', and '*' must be
#  quoted so the shell does not glob-expand it before find sees it)
find / -name "*" > flist.txt
# Start Spark Shell
spark-shell
# Load the data file
val flistRDD = sc.textFile("flist.txt")
# Check the number of partitions
flistRDD.getNumPartitions
# Redefine the partitions
val flistRDD = sc.textFile("flist.txt", 5)
# The second parameter in the above API is the number of partitions.
# Verify the new partitions
flistRDD.getNumPartitions
# You can iterate to all partitions and count the number of elements in each partition.
flistRDD.foreachPartition(p => println("No of Items in partition-" + p.count(y => true)))
spark.udf.registerJavaFunction(
"PGENDER",
"guru.learningjournal.SparkUDFLib.ParseGender"
)
df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("gs://pkp-gcp-bucket/survey.csv")
df.createTempView("surveys")
spark
.sql("select PGENDER(gender) as parsed_gender, age from surveys limit 10")
.show()
sqlContext.registerJavaFunction("PGENDER","guru.learningjournal.SparkUDF.ParseGender")
val flistRDD = sc.textFile("flist.txt", 5)
flistRDD.count()
//-------------------Scala Code---------------------
val flistRDD = sc.textFile("/home/prashant/flist.txt", 5)
val arrayRDD = flistRDD.map(x => x.split("/"))
val kvRDD = arrayRDD.map(a => (a(1), 1))
val fcountRDD = kvRDD.reduceByKey((x, y) => x + y)
fcountRDD.collect()
#------------------Python Code------------------
flistRDD = sc.textFile("/home/prashant/flist.txt", 5)
arrayRDD = flistRDD.map(lambda x: x.split("/"))
kvRDD = arrayRDD.map(lambda a: (a[1],1))
fcountRDD= kvRDD.reduceByKey(lambda x,y: x+y)
fcountRDD.collect()
Timestamp Age Gender Country state self_employed family_history treatment work_interfere no_employees remote_work tech_company benefits care_options wellness_program seek_help anonymity leave mental_health_consequence phys_health_consequence coworkers supervisor mental_health_interview phys_health_interview mental_vs_physical obs_consequence comments
2014-08-27 11:29:31 37 Female United States IL NA No Yes Often 6-25 No Yes Yes Not sure No Yes Yes Somewhat easy No No Some of them Yes No Maybe Yes No NA
2014-08-27 11:29:37 44 M United States IN NA No No Rarely More than 1000 No No Don't know No Don't know Don't know Don't know Don't know Maybe No No No No No Don't know No NA
2014-08-27 11:29:44 32 Male Canada NA NA No No Rarely 6-25 No Yes No No No No Don't know Somewhat difficult No No Yes Yes Yes Yes No No NA
2014-08-27 11:29:46 31 Male United Kingdom NA NA Yes Yes Often 26-100 No Yes No Yes No No No Somewhat difficult Yes Yes Some of them No Maybe Maybe No Yes NA
2014-08-27 11:30:22 31 Male United States TX NA No No Never 100-500 Yes Yes Yes No Don't know Don't know Don't know Don't know No No Some of them Yes Yes Yes Don't know No NA
2014-08-27 11:31:22 33 Male United States TN NA Yes No Sometimes 6-25 No Yes Yes Not sure No Don't know Don't know Don't know No No Yes Yes No Maybe Don't know No NA
2014-08-27 11:31:50 35 Female United States MI NA Yes Yes Sometimes 1-5 Yes Yes No No No No No Somewhat difficult Maybe Maybe Some of them No No No Don't know No NA
2014-08-27 11:32:05 39 M Canada NA NA No No Never 1-5 Yes Yes No Yes No No Yes Don't know No No No No No No No No NA
2014-08-27 11:32:39 42 Female United States IL NA Yes Yes Sometimes 100-500 No Yes Yes Yes No No No Very difficult Maybe No Yes Yes No Maybe No No NA
// Scala: read the survey CSV passing all options as a single Map
// (fixed: timestampFormat contained a stray '?' — "HH:mm?:ss" — which does not
//  match the file's timestamps and is used correctly elsewhere in this gist).
val df = spark.read
  .options(
    Map(
      "header" -> "true",
      "inferSchema" -> "true",
      "nullValue" -> "NA",
      "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
      "mode" -> "failfast"
    )
  )
  .csv("/home/prashant/spark-data/survey.csv")
# Python: the same read expressed with keyword arguments (same typo fixed).
df = spark.read.options(header = "true", \
    inferSchema = "true", \
    nullValue = "NA", \
    timestampFormat = "yyyy-MM-dd'T'HH:mm:ss", \
    mode = "failfast").csv("/home/prashant/spark-data/survey.csv")
// Scala: per-option variant of the CSV read; path supplied via option + load()
// (fixed: timestampFormat contained a stray '?' — "HH:mm?:ss").
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NA")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("mode", "failfast")
  .option("path", "/home/prashant/spark-data/survey.csv")
  .load()
# Python: equivalent chained read (same typo fixed).
df = spark.read\
    .format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .option("nullValue", "NA")\
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")\
    .option("mode", "failfast")\
    .option("path", "/home/prashant/spark-data/survey.csv")\
    .load()
df.rdd.getNumPartitions
val df5= df.repartition(5).toDF
df5.rdd.getNumPartitions
df.select("Timestamp", "Age","remote_work","leave").filter("Age > 30").show
df5.printSchema
// Read the survey CSV (fixed: timestampFormat had a stray '?' — "HH:mm?:ss").
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NA")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("mode", "failfast")
  .option(
    "path",
    "/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv"
  )
  .load()
// Keep only the two columns the aggregation needs.
val df1 = df.select($"Gender", $"treatment")
df1.show
// Turn the Yes/No treatment column into two 0/1 indicator columns.
val df2 = df1.select(
  $"Gender",
  (when($"treatment" === "Yes", 1).otherwise(0)).alias("All-Yes"),
  (when($"treatment" === "No", 1).otherwise(0)).alias("All-Nos")
)
// Same projection built directly from df (the shell transcript redefines df2).
val df2 = df.select(
  $"Gender",
  (when($"treatment" === "Yes", 1).otherwise(0)).alias("All-Yes"),
  (when($"treatment" === "No", 1).otherwise(0)).alias("All-Nos")
)
// Sum the indicators per raw gender value.
val df3 = df2
  .groupBy("Gender")
  .agg(sum($"All-Yes"), sum($"All-Nos"))
// Normalizes a free-text gender response to "Male", "Female", or "Transgender".
def parseGender(g: String) = {
  // Spellings observed in the raw survey data, already lower-cased.
  val maleSpellings = List("male", "m", "male-ish", "maile", "mal", "male (cis)",
    "make", "male ", "man", "msle", "mail", "malr", "cis man", "cis male")
  val femaleSpellings = List("cis female", "f", "female", "woman", "femake",
    "female ", "cis-female/femme", "female (cis)", "femail")
  val normalized = g.toLowerCase
  if (maleSpellings.contains(normalized)) "Male"
  else if (femaleSpellings.contains(normalized)) "Female"
  else "Transgender"
}
//Wrap the local function as a Spark UDF usable in DataFrame expressions.
val parseGenderUDF = udf( parseGender _ )
//Replace the raw Gender column with its normalized value.
val df3 = df2.select(
(parseGenderUDF($"Gender")).alias("Gender"),
$"All-Yes",
$"All-Nos"
)
//Aggregate the yes/no treatment counts per normalized gender.
val df4 = df3.groupBy("Gender").agg( sum($"All-Yes"),sum($"All-Nos"))
// Reduce shuffle parallelism for this small data set (default is 200).
spark.conf.set("spark.sql.shuffle.partitions", 2)
// Read the survey CSV (fixed: timestampFormat had a stray '?' — "HH:mm?:ss").
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NA")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("mode", "failfast")
  .option(
    "path",
    "/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv"
  )
  .load()
// Keep only the two columns the aggregation needs.
val df1 = df.select($"Gender", $"treatment")
// Turn the Yes/No treatment column into two 0/1 indicator columns.
val df2 = df.select(
  $"Gender",
  (when($"treatment" === "Yes", 1).otherwise(0)).alias("All-Yes"),
  (when($"treatment" === "No", 1).otherwise(0)).alias("All-Nos")
)
// Maps a raw survey gender string onto one of three canonical buckets.
def parseGender(g: String) = {
  // Spellings observed in the raw data, compared after lower-casing.
  val males = Set("male", "m", "male-ish", "maile", "mal", "male (cis)", "make",
    "male ", "man", "msle", "mail", "malr", "cis man", "cis male")
  val females = Set("cis female", "f", "female", "woman", "femake", "female ",
    "cis-female/femme", "female (cis)", "femail")
  g.toLowerCase match {
    case s if males(s)   => "Male"
    case s if females(s) => "Female"
    case _               => "Transgender"
  }
}
//Register the pattern-match function as a DataFrame UDF.
val parseGenderUDF = udf(parseGender _)
//Normalize the Gender column while keeping the 0/1 indicator columns.
val df3 = df2.select(
(parseGenderUDF($"Gender")).alias("Gender"),
$"All-Yes",
$"All-Nos"
)
//Aggregate per gender, drop the Transgender bucket, then collect to the driver.
val df4 = df3.groupBy("Gender").agg(sum($"All-Yes"), sum($"All-Nos"))
val df5 = df4.filter($"Gender" =!= "Transgender")
df5.collect
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("/home/prashant/spark-data/survey.csv")
//Then we applied a select transformation and a filter condition.
val sel =
df.select("Timestamp", "Age", "remote_work", "leave").filter("Age > 30")
select timestamp, age,remote_work,leave
from survey_tbl
where age > 30;
select gender, sum(all_yes), sum(all_nos)
from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal',
'male (cis)','make','male','man','msle',
'mail', 'malr','cis man', 'cis male')
then 'Male'
when lower(trim(gender)) in ('cis female','f','female','woman',
'femake','female ','cis-female/femme',
'female (cis)','femail')
then 'Female'
else 'Transgender'
end as gender,
case when treatment == 'Yes' then 1 else 0 end as all_yes,
case when treatment == 'No' then 1 else 0 end as all_nos
from survey_tbl)
where gender != 'Transgender'
group by gender
//You can create a Schema for survey data set using below code.
//Each StructField is (name, type, nullable=true).
import org.apache.spark.sql.types._
val surveySchema = StructType(
Array(
StructField("timestamp", TimestampType, true),
StructField("age", LongType, true),
StructField("gender", StringType, true),
StructField("country", StringType, true),
StructField("state", StringType, true),
StructField("self_employed", StringType, true),
StructField("family_history", StringType, true),
StructField("treatment", StringType, true),
StructField("work_interfere", StringType, true),
StructField("no_employees", StringType, true),
StructField("remote_work", StringType, true),
StructField("tech_company", StringType, true),
StructField("benefits", StringType, true),
StructField("care_options", StringType, true),
StructField("wellness_program", StringType, true),
StructField("seek_help", StringType, true),
StructField("anonymity", StringType, true),
StructField("leave", StringType, true),
StructField("mental_health_consequence", StringType, true),
StructField("phys_health_consequence", StringType, true),
StructField("coworkers", StringType, true),
StructField("supervisor", StringType, true),
StructField("mental_health_interview", StringType, true),
StructField("phys_health_interview", StringType, true),
StructField("mental_vs_physical", StringType, true),
StructField("obs_consequence", StringType, true),
StructField("comments", StringType, true)
)
)
//You can load the data using above schema (explicit schema, so no inferSchema pass).
val df = spark.read
.format("csv")
.schema(surveySchema)
.option("header", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("/home/prashant/spark-data/survey.csv")
//Session-scoped view: visible only to this SparkSession.
df.createOrReplaceTempView("survey_tbl")
spark.catalog.listTables.show
//Global view: shared across sessions via the global_temp database.
df.createOrReplaceGlobalTempView("survey_gtbl")
spark.catalog.listTables("global_temp").show
//Query the session-scoped view with plain SQL.
spark.sql("""select timestamp, age,remote_work,leave
from survey_tbl
where age > 30""")
// Count treatment yes/no answers per normalized gender, excluding 'Transgender'.
// fixed: the female spellings list had 'female' duplicated where the sibling
// queries in this gist use 'femake' (a misspelling present in the raw data).
spark.sql("""select gender, sum(yes), sum(no)
from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal',
'male (cis)','make','male ','man','msle',
'mail','malr','cis man','cis male')
then 'Male'
when lower(trim(gender)) in ('cis female','f','female','woman',
'femake','female ','cis-female/femme',
'female (cis)','femail')
then 'Female'
else 'Transgender'
end as gender,
case when treatment == 'Yes' then 1 else 0 end as yes,
case when treatment == 'No' then 1 else 0 end as no
from survey_tbl)
where gender != 'Transgender'
group by gender""").show
spark.read.schema("a INT, b STRING, c DOUBLE")
//Read CSV into Data Frame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv")
//Write Data Frame to Parquet
df.write
.format("parquet")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/parquet-data/")
//Read Parquet into Data Frame
val df = spark.read
.format("parquet")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/parquet-data/")
//Write Data Frame to JSON
df.write
.format("json")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/json-data/")
//Read JSON into Data Frame
val df = spark.read
.format("json")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/json-data/")
//Write Data Frame to ORC
df.write
.format("orc")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/orc-data/")
spark-shell --packages com.databricks:spark-xml_2.11:0.4.1,com.databricks:spark-avro_2.11:4.0.0
//Read ORC into Data Frame
val df = spark.read
.format("orc")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/orc-data/")
//Write Data Frame to XML
df.write
.format("com.databricks.spark.xml")
.option("rootTag", "survey")
.option("rowTag", "survey-row")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/xml-data/")
//Read XML into Data Frame
val df = spark.read
.format("com.databricks.spark.xml")
.option("rowTag", "survey-row")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/xml-data/")
//Write Data Frame to AVRO
df.write
.format("com.databricks.spark.avro")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/avro-data/")
//Read AVRO into Data Frame
val df = spark.read
.format("com.databricks.spark.avro")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/avro-data/")
//Write Data Frame to CSV
df.write
.format("csv")
.option("header", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.mode("overwrite")
.save("/home/prashant/spark-data/mental-health-in-tech-survey/csv-data/")
# Switch to the postgres admin OS user.
sudo su postgres
# Create the application role (fixed: flags are --pwprompt / --interactive, no inner space).
createuser --pwprompt --interactive prashant
exit
createdb sparkDB
# Connect from a client (fixed: --dbname takes a separate argument, was --dbnamesparkDB).
psql --host 10.128.0.4 -U prashant --dbname sparkDB --password
hostname -I
# Login to your Postgres server machine as Postgres admin user (fixed typo: was "portgres").
sudo su postgres
# Connect to the database and check hba file location.
psql
show hba_file;
# Open hba file and add below line at the end (allow md5 auth from any host).
vi /etc/postgresql/10/main/pg_hba.conf
host all all 0.0.0.0/0 md5
# Exit admin user
exit
# Restart PostgreSQL server
sudo /etc/init.d/postgresql restart
# Go back to the remote machine and test your connection (same --dbname fix).
psql --host 10.128.0.4 -U prashant --dbname sparkDB --password
spark-shell --packages org.postgresql:postgresql:9.4.1207
//Read CSV into Data Frame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv")
df.createOrReplaceTempView("survey_tbl")
val dfout = spark.sql(
"""select gender, sum(yes) sum_yes, sum(no) sum_no
from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal','male (cis)',
'make','male ','man','msle','mail','malr','cis man',
'cis male') then 'Male'
when lower(trim(gender)) in ('cis female','f','female','woman','femake','female ',
'cis-female/femme','female (cis)','femail') then 'Female'
else 'Transgender'
end as gender,
case when treatment == 'Yes' then 1 else 0 end as yes,
case when treatment == 'No' then 1 else 0 end as no
from survey_tbl)
where gender != 'Transgender'
group by gender"""
)
dfout.show
+------+-------+------+
|gender|sum_yes|sum_no|
+------+-------+------+
|Female| 170| 77|
| Male| 450| 541|
+------+-------+------+
//Write Data Frame to JDBC
dfout.write
.format("jdbc")
.mode("overwrite")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://10.128.0.4:5432/sparkDB")
.option("dbtable", "survey_results")
.option("user", "prashant")
.option("password", "pandey")
.save()
psql --host 10.128.0.4 -U prashant --dbname sparkDB --password
\d+ survey_results
select * from survey_results;
//Write Data Frame to JDBC
dfout.write
.format("jdbc")
.mode("overwrite")
.option("truncate", "true")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://10.128.0.4:5432/sparkDB")
.option("dbtable", "survey_results")
.option("user", "prashant")
.option("password", "pandey")
.save()
//spark-shell --packages org.postgresql:postgresql:9.4.1207
val pgDF_table = spark.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://10.128.0.4:5432/sparkDB")
.option("dbtable", "survey_results")
.option("user", "prashant")
.option("password", "pandey")
.load()
pgDF_table.show
//spark-shell --packages org.postgresql:postgresql:9.4.1207
val pgDF_table = spark.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://10.128.0.4:5432/sparkDB")
.option("dbtable", "(select * from survey_results limit 1) as survey_results")
.option("user", "prashant")
.option("password", "pandey")
.load()
pgDF_table.show
SELECT cluster_name, listen_address FROM system.local;
create KEYSPACE sparkdb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
create table sparkdb.survey_results( gender text, sum_yes int, sum_no int, primary key (gender));
select * from sparkdb.survey_results;
spark-shell --packages datastax:spark-cassandra-connector:2.0.1-s_2.11
//Read CSV into Data Frame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "failfast")
.load("/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv")
df.createOrReplaceTempView("survey_tbl")
val dfout = spark.sql(
"""select gender, sum(yes) sum_yes, sum(no) sum_no
from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal','male (cis)',
'make','male ','man','msle','mail','malr','cis man',
'cis male') then 'Male'
when lower(trim(gender)) in ('cis female','f','female','woman','femake','female ',
'cis-female/femme','female (cis)','femail') then 'Female'
else 'Transgender'
end as gender,
case when treatment == 'Yes' then 1 else 0 end as yes,
case when treatment == 'No' then 1 else 0 end as no
from survey_tbl)
where gender != 'Transgender'
group by gender"""
)
dfout.write
.format("org.apache.spark.sql.cassandra")
.mode("overwrite")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "10.142.0.3")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "sparkdb")
.option("table", "survey_results")
.save()
dfout.write
.format("org.apache.spark.sql.cassandra")
.mode("overwrite")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "10.142.0.3")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "sparkdb")
.option("table", "survey_results")
.save()
dfout.write
.format("org.apache.spark.sql.cassandra")
.mode("overwrite")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "10.142.0.3")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "sparkdb")
.option("table", "survey_results")
.save()
import org.apache.spark.sql.cassandra._
spark.setCassandraConf(
Map(
"spark.cassandra.connection.host" -> "10.142.0.3",
"spark.cassandra.connection.port" -> "9042"
)
)
// Repeat of the session-level Cassandra configuration above (shell replay).
import org.apache.spark.sql.cassandra._
spark.setCassandraConf(
Map(
"spark.cassandra.connection.host" -> "10.142.0.3",
"spark.cassandra.connection.port" -> "9042"
)
)
// Same overwrite-write as before, but now relying on the session-level
// connection host/port set via setCassandraConf — note the absence of the
// spark.cassandra.connection.* options here.
dfout.write
.format("org.apache.spark.sql.cassandra")
.mode("overwrite")
.option("confirm.truncate", "true")
.option("keyspace", "sparkdb")
.option("table", "survey_results")
.save()
// Read the rows back from Cassandra to verify the round trip, supplying
// connection and target-table settings as a single options map.
val cassandraReadOptions = Map(
  "spark.cassandra.connection.host" -> "10.142.0.3",
  "spark.cassandra.connection.port" -> "9042",
  "keyspace" -> "sparkdb",
  "table" -> "survey_results"
)
val df_read = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(cassandraReadOptions)
  .load()
df_read.show
# Launch the Spark SQL CLI in silent mode.
spark-sql -S
-- Create a database backed by a local filesystem directory.
CREATE DATABASE mysparkdb
LOCATION '/home/prashant/mysparkdb/';
DESCRIBE DATABASE mysparkdb;
-- Show the configured default warehouse directory.
SET spark.sql.warehouse.dir;
# Inspect the database location from the filesystem side.
hadoop fs -ls /home/prashant/
-- Same idea, but backed by a Google Cloud Storage bucket.
CREATE DATABASE gs_sparkdb
LOCATION 'gs://pkp-gcp-bucket/gs_sparkdb/';
-- Hive-format managed table for the raw survey CSV, parsed with OpenCSVSerde.
-- NOTE(review): AGE is declared LONG — a Spark SQL alias for BIGINT; plain
-- Hive DDL would expect BIGINT. Also, OpenCSVSerde is commonly reported to
-- surface every column as STRING regardless of the declared types — confirm
-- before relying on TIME_STAMP/AGE typing downstream.
CREATE TABLE IF NOT EXISTS mysparkdb.hive_surveys(
TIME_STAMP TIMESTAMP,
AGE LONG,
GENDER STRING,
COUNTRY STRING,
STATE STRING,
SELF_EMPLOYED STRING,
FAMILY_HISTORY STRING,
TREATMENT STRING,
WORK_INTERFERE STRING,
NO_EMPLOYEES STRING,
REMOTE_WORK STRING,
TECH_COMPANY STRING,
BENEFITS STRING,
CARE_OPTIONS STRING,
WELLNESS_PROGRAM STRING,
SEEK_HELP STRING,
ANONYMITY STRING,
LEAVE STRING,
MENTAL_HEALTH_CONSEQUENCE STRING,
PHYS_HEALTH_CONSEQUENCE STRING,
COWORKERS STRING,
SUPERVISOR STRING,
MENTAL_HEALTH_INTERVIEW STRING,
PHYS_HEALTH_INTERVIEW STRING,
MENTAL_VS_PHYSICAL STRING,
OBS_CONSEQUENCE STRING,
COMMENTS STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE;
-- NOTE(review): LOAD DATA INPATH moves (not copies) the source file into the
-- table's warehouse directory in Hive semantics — confirm for this setup.
LOAD DATA INPATH '/home/prashant/spark-data/csv/surveys.csv'
INTO TABLE mysparkdb.hive_surveys;
-- Quick sanity check on the loaded data.
select * from mysparkdb.hive_surveys limit 5;
-- Spark-native (USING CSV) managed table with the same survey schema.
-- NOTE(review): header='false' here, while the interactive DataFrame reads in
-- this gist use header "true" — confirm whether the target file really lacks a
-- header row, otherwise the header line will be ingested as a data row.
CREATE TABLE IF NOT EXISTS mysparkdb.spark_surveys(
TIME_STAMP TIMESTAMP,
AGE LONG,
GENDER STRING,
COUNTRY STRING,
STATE STRING,
SELF_EMPLOYED STRING,
FAMILY_HISTORY STRING,
TREATMENT STRING,
WORK_INTERFERE STRING,
NO_EMPLOYEES STRING,
REMOTE_WORK STRING,
TECH_COMPANY STRING,
BENEFITS STRING,
CARE_OPTIONS STRING,
WELLNESS_PROGRAM STRING,
SEEK_HELP STRING,
ANONYMITY STRING,
LEAVE STRING,
MENTAL_HEALTH_CONSEQUENCE STRING,
PHYS_HEALTH_CONSEQUENCE STRING,
COWORKERS STRING,
SUPERVISOR STRING,
MENTAL_HEALTH_INTERVIEW STRING,
PHYS_HEALTH_INTERVIEW STRING,
MENTAL_VS_PHYSICAL STRING,
OBS_CONSEQUENCE STRING,
COMMENTS STRING)
USING CSV
OPTIONS ( header='false',
nullvalue='NA',
timestampFormat="yyyy-MM-dd'T'HH:mm:ss");
// Load surveys.csv into a DataFrame; the source path is supplied through the
// "path" option rather than as a load() argument, and the reader options are
// gathered into a single map. Behavior matches the chained-option original.
val readerOptions = Map(
  "header" -> "true",
  "inferSchema" -> "true",
  "nullValue" -> "NA",
  "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
  "mode" -> "failfast",
  "path" -> "/home/prashant/spark-data/csv/surveys.csv"
)
val df = spark.read
  .format("csv")
  .options(readerOptions)
  .load()
-- Unmanaged (external) Spark CSV table: the data stays at the 'path' option's
-- location instead of being copied into the warehouse directory.
-- NOTE(review): header='false' — see the header-row caveat on spark_surveys;
-- confirm against the actual file.
CREATE TABLE IF NOT EXISTS mysparkdb.external_spark_surveys(
TIME_STAMP TIMESTAMP,
AGE LONG,
GENDER STRING,
COUNTRY STRING,
STATE STRING,
SELF_EMPLOYED STRING,
FAMILY_HISTORY STRING,
TREATMENT STRING,
WORK_INTERFERE STRING,
NO_EMPLOYEES STRING,
REMOTE_WORK STRING,
TECH_COMPANY STRING,
BENEFITS STRING,
CARE_OPTIONS STRING,
WELLNESS_PROGRAM STRING,
SEEK_HELP STRING,
ANONYMITY STRING,
LEAVE STRING,
MENTAL_HEALTH_CONSEQUENCE STRING,
PHYS_HEALTH_CONSEQUENCE STRING,
COWORKERS STRING,
SUPERVISOR STRING,
MENTAL_HEALTH_INTERVIEW STRING,
PHYS_HEALTH_INTERVIEW STRING,
MENTAL_VS_PHYSICAL STRING,
OBS_CONSEQUENCE STRING,
COMMENTS STRING)
USING CSV
OPTIONS (header='false',
nullvalue='NA',
timestampFormat="yyyy-MM-dd'T'HH:mm:ss",
path='/home/prashant/spark-data/csv/surveys.csv');
-- Recreate the external table using the LOCATION clause instead of the
-- 'path' option — the two are equivalent ways to declare an unmanaged table.
DROP TABLE IF EXISTS mysparkdb.external_spark_surveys;
-- NOTE(review): LOCATION conventionally points at a directory, but here it is
-- given a single .csv file — verify Spark resolves this as intended.
CREATE TABLE IF NOT EXISTS mysparkdb.external_spark_surveys(
TIME_STAMP TIMESTAMP,
AGE LONG,
GENDER STRING,
COUNTRY STRING,
STATE STRING,
SELF_EMPLOYED STRING,
FAMILY_HISTORY STRING,
TREATMENT STRING,
WORK_INTERFERE STRING,
NO_EMPLOYEES STRING,
REMOTE_WORK STRING,
TECH_COMPANY STRING,
BENEFITS STRING,
CARE_OPTIONS STRING,
WELLNESS_PROGRAM STRING,
SEEK_HELP STRING,
ANONYMITY STRING,
LEAVE STRING,
MENTAL_HEALTH_CONSEQUENCE STRING,
PHYS_HEALTH_CONSEQUENCE STRING,
COWORKERS STRING,
SUPERVISOR STRING,
MENTAL_HEALTH_INTERVIEW STRING,
PHYS_HEALTH_INTERVIEW STRING,
MENTAL_VS_PHYSICAL STRING,
OBS_CONSEQUENCE STRING,
COMMENTS STRING)
USING CSV
OPTIONS (header='false',
nullvalue='NA',
timestampFormat="yyyy-MM-dd'T'HH:mm:ss")
LOCATION '/home/prashant/spark-data/csv/surveys.csv';
-- Show full metadata (incl. managed vs external and location) for the table.
describe table extended mysparkdb.external_spark_surveys;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment