Skip to content

Instantly share code, notes, and snippets.

@dragos
Created June 26, 2015 16:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dragos/f2b48ef15e6ef7049693 to your computer and use it in GitHub Desktop.
Save dragos/f2b48ef15e6ef7049693 to your computer and use it in GitHub Desktop.
Wikidata filtering: count items whose English and German Wikipedia titles are identical
package test
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import SQLContext._
import org.apache.spark.sql.types._
/** A Wikidata entity reduced to its identifier and a string-to-string map of sites.
  *
  * NOTE(review): not referenced anywhere in the visible code. The JSON schema built in
  * `Foo.main` reads `sitelinks` as a map of maps, which does not match `sites` here —
  * confirm whether this class is still needed.
  *
  * @param id    the entity identifier
  * @param sites presumably site key -> title; verify against intended use
  */
case class WikidataElement(
  id: String,
  sites: Map[String, String]
)
/** Counts Wikidata items whose English and German Wikipedia article titles are identical.
  *
  * Reads a Wikidata JSON dump with Spark SQL using a hand-written schema, projects the
  * `enwiki` and `dewiki` site-link titles, and counts the rows where both are equal.
  */
object Foo {
  /** Default location of the Wikidata JSON dump; overridden by the first program argument if given. */
  final val WikiJsonFile = "/Volumes/Thunderbolt_SSD/dragos/Documents/wikidata/20150622.json"
  // final val WikiJsonFile = "/Volumes/Thunderbolt_SSD/dragos/Documents/wikidata/head.json"

  // NOTE(review): not used by the query in `main`, which hard-codes `enwiki`/`dewiki`.
  val langs = Seq("en", "de")

  /** Entry point.
    *
    * @param args optional; when present, `args(0)` is the path of the JSON dump to read
    *             (defaults to [[WikiJsonFile]]).
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(WikiJsonFile)

    // explicit stripped-down schema to skip schema inference,
    // which would take a full scan
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("sitelinks",
        MapType(StringType,
          MapType(StringType, StringType)))))

    val conf = new SparkConf()
      .setAppName("wiki-data")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      // This is a DataFrame, not an RDD.
      val wikidata = sqlContext.read.schema(schema).json(inputPath)
      wikidata.registerTempTable("wikidata")

      // `sitelinks` is a map of maps (see schema): `sitelinks.enwiki.title` selects the
      // "enwiki" entry of the outer map and then its "title" entry.
      val titles = sqlContext.sql("""
        SELECT id,
          sitelinks.enwiki.title AS entitle,
          sitelinks.dewiki.title AS detitle
        FROM wikidata
        WHERE sitelinks IS NOT NULL
        """).cache()

      // Rows where either title is missing compare as null and are therefore not counted.
      val sameTitles = titles.filter('entitle === 'detitle)

      // cache() is lazy, so this timed count also includes reading and parsing the JSON input.
      val start = System.currentTimeMillis()
      val count = sameTitles.count()
      val elapsedMs = System.currentTimeMillis() - start

      println("Titles the same in english and german: " + count)
      // Floating-point division keeps sub-second precision (the old integer division truncated it).
      println(f"Duration: ${elapsedMs / 1000.0}%.2f seconds")
    } finally {
      // Always release the Spark context, even if reading or querying fails.
      sc.stop()
    }
  }
}
@nraychaudhuri
Copy link

Very cool.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment