Skip to content

Instantly share code, notes, and snippets.

View albertoandreottiATgmail's full-sized avatar
💭
organizing the bits and bytes of the universe :)

Jose Pablo Alberto Andreotti albertoandreottiATgmail

💭
organizing the bits and bytes of the universe :)
  • JohnSnowLabs
  • Argentina
View GitHub Profile
~/spark_stream$ cp src/main/resources/people2.json target/scala-2.11/classes/
// Describe the sink first: re-check the source every 2 seconds and
// print whatever arrives to the console.
val consoleWriter = ageEvents.writeStream
  .trigger(ProcessingTime("2 seconds"))
  .format("console")
// Starting the writer launches the query and hands back its handle.
val streamingQuery = consoleWriter.start()
// Block for at most 120000 ms (2 minutes) waiting for the query to finish.
streamingQuery.awaitTermination(120000)
// Keep the name and derive a "birthyear" column computed as (-age + 2017).
val ageEvents = rawRecords.select(
  $"name",
  (-$"age" + 2017).as("birthyear")
)
// A streaming JSON source is given its schema up front here, so one file is
// read in batch mode first and its inferred schema is reused below.
val firstFile = spark.read.json(s"$fullPath")
val inferredSchema = firstFile.schema
// Stream every *.json file found under the resources directory.
val rawRecords = spark.readStream.schema(inferredSchema).json(s"$resourcesPath/*.json")
+-----------+--------------------+-------------------+
| title| title| score|
+-----------+--------------------+-------------------+
| Mad Max| Moonlight| 0.5|
| Mad Max| The Handmaiden| 0.2857142857142857|
| Spotlight|Manchester by the...| 0.3333333333333333|
| Spotlight| Hell or High Water| 0.3333333333333333|
| Carol| Hell or High Water| 0.3333333333333333|
| Carol| The Handmaiden| 0.2857142857142857|
| Inside out| Hell or High Water| 0.5|
@albertoandreottiATgmail
albertoandreottiATgmail / main.scala
Last active May 10, 2017 04:35
put_all_together
// Local Spark session (master "local[8]") for the "Custom Joins" app.
// The explicit type annotation keeps implicit resolution predictable: an
// un-annotated implicit val can fail to resolve depending on definition order
// in Scala 2 and is rejected by Scala 3.
// NOTE(review): presumably declared implicit so that loadDF can pick it up — confirm against loadDF's signature.
implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("Custom Joins")
  .master("local[8]")
  .getOrCreate()
val sqlContext = spark.sqlContext
// Load each CSV under /resources and register it as a temp view so that
// SQL statements can refer to it by name.
val huluDf = loadDF(fsPath("/hulu_movies.csv"))
huluDf.createOrReplaceTempView("hulumovies")
val netflixDf = loadDF(fsPath("/netflix_movies.csv"))
netflixDf.createOrReplaceTempView("netflixmovies")
//define our UDF
/** Scores how much two space-separated tag strings overlap.
  *
  * The score is the number of tags present in both inputs divided by the sum
  * of the two distinct-tag counts, so it ranges from 0.0 (nothing shared) to
  * 0.5 (identical tag sets).
  *
  * @param desc1 first tag list, tags separated by single spaces; null or blank is treated as "no tags"
  * @param desc2 second tag list, same format
  * @return the overlap ratio, or 0.0 when neither input contains any tag
  */
def similar(desc1: String, desc2: String): Double = {
  // Option(...) absorbs null inputs instead of throwing a NullPointerException.
  // Empty tokens (from "", leading or repeated spaces) are dropped so that ""
  // is never counted as a tag shared by both sides.
  def tagsOf(desc: String): Set[String] =
    Option(desc).fold(Set.empty[String])(_.split(" ").map(_.trim).filter(_.nonEmpty).toSet)

  val tags1 = tagsOf(desc1)
  val tags2 = tagsOf(desc2)
  val total = tags1.size + tags2.size
  // Guard the 0/0 case, which would otherwise evaluate to NaN.
  if (total == 0) 0.0 else tags1.intersect(tags2).size.toDouble / total
}
-- Pair every Netflix movie with every Hulu movie whose tag-similarity score exceeds 0.2.
SELECT
    NetflixMovies.name,
    HuluMovies.name
FROM NetflixMovies
INNER JOIN HuluMovies
    ON similar(HuluMovies.tags, NetflixMovies.tags) > 0.2;
-- Pair every Netflix movie with every Hulu movie that has the same genre.
SELECT
    NetflixMovies.name,
    HuluMovies.name
FROM NetflixMovies
INNER JOIN HuluMovies
    ON HuluMovies.genre = NetflixMovies.genre;