Apache Spark querying a CSV using SQL Context
Using Apache Spark to query a CSV file with SQL-like syntax.

Launch the Spark shell with the spark-csv package for CSV parsing:

./bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0
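If your Spark build uses Scala 2.11 rather than 2.10, swap in the matching artifact (assuming a 2.11 build of spark-csv is published for your version):

./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.1.0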

In the Scala REPL, enter the following, substituting the path to your CSV file:

import org.apache.spark.sql.SQLContext

val dataFile = "/Users/teg/Desktop/vewProductName.csv"
val sqlContext = new SQLContext(sc)

// Load the CSV via spark-csv (treating the first row as a header) and
// register it as a temporary table named "products"
sqlContext.load("com.databricks.spark.csv", Map("path" -> dataFile, "header" -> "true")).registerTempTable("products")
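Before saving anything, you can sanity-check the table directly in the shell. A minimal sketch, assuming the products table registered above:

sqlContext.sql("SELECT * FROM products LIMIT 5").show()

Once the data looks right, run your query and write the matching rows back out through spark-csv: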


sqlContext.sql("""select * from products WHERE SKU = 'ZZ888806041'""").save("/tmp/agg.csv", "com.databricks.spark.csv")
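spark-csv accepts further options in the same Map. For example, to load a pipe-delimited file (the path and table name below are hypothetical; "delimiter" is a documented spark-csv option, and later spark-csv releases also support "inferSchema"):

// Hypothetical pipe-delimited input; "delimiter" overrides the default comma
sqlContext.load("com.databricks.spark.csv", Map("path" -> "/tmp/other.csv", "header" -> "true", "delimiter" -> "|")).registerTempTable("other")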

Spark writes the result as a directory of fragments: /tmp/agg.csv is a directory containing one CSV part file per partition. To get the results of the query as a single file, merge the fragments back together with Hadoop's FileUtil.copyMerge:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

// Concatenate all part files under srcPath into a single file at dstPath.
// Signature: copyMerge(srcFS, srcDir, dstFS, dstFile, deleteSource, conf, addString)
def merge(srcPath: String, dstPath: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

merge("/tmp/agg.csv", "agg.csv")

If you then open agg.csv, you will see the merged results of your query.
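If the result set is small, you can skip the merge step entirely by collapsing the DataFrame to a single partition before saving. A sketch, assuming Spark 1.4+ where DataFrame.coalesce is available (also handy because FileUtil.copyMerge was removed in Hadoop 3); the output is still a directory, but it holds a single part file, and all rows funnel through one executor:

// Write the query result as one part file (small results only)
sqlContext.sql("""select * from products WHERE SKU = 'ZZ888806041'""")
  .coalesce(1)
  .save("/tmp/agg_single.csv", "com.databricks.spark.csv")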
