Apache Spark Importer for generating Neo4j batch inserter CSV files from DBpedia RDF dumps
/**
* Copyright (C) 2014 Kenny Bastani
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
object Configuration {
  def HDFS_HOST: String = "hdfs://10.0.0.4:8020/"
  def PRIMARY_TOPIC_URL: String = "<http://xmlns.com/foaf/0.1/primaryTopic>"
  def RDF_LABEL_URL: String = "<http://www.w3.org/2000/01/rdf-schema#label>"
  def WIKI_PAGE_LINK_URL: String = "http://dbpedia.org/ontology/wikiPageWikiLink"
  def EXCLUDE_FILE_PATTERN: String = "http://dbpedia.org/resource/File:"
  def EXCLUDE_CATEGORY_PATTERN: String = "http://dbpedia.org/resource/Category:"
  def WIKI_LINKS_FILE_NAME: String = HDFS_HOST + "wikipedia_links_en.nt"
  def WIKI_NAMES_FILE_NAME: String = HDFS_HOST + "labels_en.nt"
  def PAGE_LINKS_FILE_NAME: String = HDFS_HOST + "page_links_en.nt"
  def PAGE_NODES_CSV_HEADER: String = "dbpedia\tid\tl:label\twikipedia\ttitle"
  def PAGE_LINKS_CSV_HEADER: String = "start\tend\ttype"
}
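
// The three dumps named above are N-Triples files. Below is a minimal,
// self-contained sketch (not part of the original gist) of the line shapes the
// importer's regular expressions are written against; the sample triples are
// illustrative, not copied from the dumps.
object TripleParsingSketch {
  def main(args: Array[String]): Unit = {
    // wikipedia_links_en.nt shape: <wikipedia-page> <foaf:primaryTopic> <resource> .
    // page_links_en.nt shape:      <resource> <dbo:wikiPageWikiLink> <resource> .
    // labels_en.nt shape:          <resource> <rdfs:label> "Label"@en .
    val labelLine = "<http://dbpedia.org/resource/Apache_Spark> " +
      "<http://www.w3.org/2000/01/rdf-schema#label> \"Apache Spark\"@en ."
    // Same split pattern used in DBpediaImporter.processWikiNames below
    labelLine.split("(?<=>)\\s(?=<)|(?<=>)\\s(?=\\\")|@en\\s\\.$").foreach(println)
    // Prints: <http://dbpedia.org/resource/Apache_Spark>
    //         <http://www.w3.org/2000/01/rdf-schema#label>
    //         "Apache Spark"
  }
}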
/**
* Copyright (C) 2014 Kenny Bastani
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
object DBpediaImporter {

  val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
    List("target/scala-2.10/simple-project_2.10-1.0.jar"))

  def main(args: Array[String]): Unit = {
    val wikiLinksFile = sc.textFile(Configuration.WIKI_LINKS_FILE_NAME)
    val wikiNamesFile = sc.textFile(Configuration.WIKI_NAMES_FILE_NAME)
    val pageLinksFile = sc.textFile(Configuration.PAGE_LINKS_FILE_NAME)

    val wikiLinksMap = processWikiLinks(wikiLinksFile)
    val wikiNamesMap = processWikiNames(wikiNamesFile)

    val pageNodeData = joinWikiNamesLinks(wikiLinksMap, wikiNamesMap)
    val pageNodeRows = generatePageNodes(pageNodeData)

    // zipWithUniqueId assigns ids deterministically by partition, so this index
    // lines up with the ids generated inside generatePageNodes
    val pageNodeIndex = pageNodeData.map(r => r._1).zipWithUniqueId().collectAsMap()

    val pageLinkRelationshipData = processPageLinks(pageLinksFile)
    val encodedPageLinks = encodePageLinks(pageLinkRelationshipData, pageNodeIndex)
    // Format the encoded links as tab-separated rows under PAGE_LINKS_CSV_HEADER
    val pageLinkRelationshipRows = generatePageLinkRelationships(encodedPageLinks)

    pageNodeRows.saveAsTextFile(Configuration.HDFS_HOST + "pagenodes")
    pageLinkRelationshipRows.saveAsTextFile(Configuration.HDFS_HOST + "pagerels")
  }
  /**
   * Process the Wikipedia links RDF file into "<dbpedia-resource> <wikipedia-url>" lines.
   * @param wikiLinksFile raw N-Triples lines of wikipedia_links_en.nt
   * @return an RDD[String] of filtered lines for import into Neo4j
   */
  def processWikiLinks(wikiLinksFile: RDD[String]): RDD[String] = {
    val wikiLinksMap = wikiLinksFile.filter(line =>
      line.contains(Configuration.PRIMARY_TOPIC_URL) &&
        !line.contains(Configuration.EXCLUDE_FILE_PATTERN))
      // Split into subject/predicate/object tokens and drop the primaryTopic predicate
      .map(e => {
        e.split("(?<=>)\\s(?=<)|\\s\\.$")
          .filter(a => !a.contains(Configuration.PRIMARY_TOPIC_URL))
      })
      // Swap to (dbpedia-resource, wikipedia-url) order
      .map(uri => (uri(1), uri(0)))
      .map(line => line._1 + " " + line._2)
    wikiLinksMap
  }
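
  // Illustrative transformation (sample line, not taken from the dump):
  //   in : <http://en.wikipedia.org/wiki/Apache_Spark> <http://xmlns.com/foaf/0.1/primaryTopic> <http://dbpedia.org/resource/Apache_Spark> .
  //   out: <http://dbpedia.org/resource/Apache_Spark> <http://en.wikipedia.org/wiki/Apache_Spark>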
  /**
   * Process the RDF labels file, extracting each resource's rdfs:label.
   * @param wikiNamesFile raw N-Triples lines of labels_en.nt
   * @return an RDD[String] of "<dbpedia-resource> "label"" lines
   */
  def processWikiNames(wikiNamesFile: RDD[String]): RDD[String] = {
    val wikiNamesMap = wikiNamesFile.filter(line => line.contains(Configuration.RDF_LABEL_URL))
      .filter(line => !line.contains(Configuration.EXCLUDE_FILE_PATTERN))
      // Split into subject/predicate/literal tokens and drop the rdfs:label predicate
      .map(e => {
        e.split("(?<=>)\\s(?=<)|(?<=>)\\s(?=\\\")|@en\\s\\.$")
          .filter(a => !a.contains(Configuration.RDF_LABEL_URL))
      })
      .map(uri => (uri(0), uri(1)))
      .map(line => line._1 + " " + line._2)
    wikiNamesMap
  }
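
  // Illustrative transformation (sample line, not taken from the dump):
  //   in : <http://dbpedia.org/resource/Apache_Spark> <http://www.w3.org/2000/01/rdf-schema#label> "Apache Spark"@en .
  //   out: <http://dbpedia.org/resource/Apache_Spark> "Apache Spark"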
  /**
   * Join the links and names maps on the DBpedia resource key.
   * @param wikiLinksMap lines produced by processWikiLinks
   * @param wikiNamesMap lines produced by processWikiNames
   * @return an RDD of (resource, values) pairs grouping each resource's
   *         Wikipedia URL and label
   */
  def joinWikiNamesLinks(wikiLinksMap: RDD[String], wikiNamesMap: RDD[String]): RDD[(String, Iterable[String])] = {
    // Union the two sets, split each line into (resource, value), and group on the resource key
    val joinedList = wikiLinksMap.union(wikiNamesMap)
      .map(line => {
        val items = line.split("^<|>\\s<|\\>\\s\\\"|\\\"$|>$")
          .filter(!_.isEmpty)
        if (items.length >= 2) (items(0), items(1)) else ("N/A", "N/A")
      })
      .filter(items => items._1 != "N/A")
      // Group both values under the resource key; the union order keeps the
      // Wikipedia URL ahead of the label, matching the node CSV columns
      .groupByKey()
    joinedList
  }
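
  // Illustrative grouping of the two line formats above (sample values):
  //   <http://dbpedia.org/resource/Apache_Spark> <http://en.wikipedia.org/wiki/Apache_Spark>
  //   <http://dbpedia.org/resource/Apache_Spark> "Apache Spark"
  // becomes
  //   (http://dbpedia.org/resource/Apache_Spark,
  //    [http://en.wikipedia.org/wiki/Apache_Spark, Apache Spark])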
  /**
   * Generate the page node CSV rows, one per DBpedia resource.
   * @param pageNodeData (resource, values) pairs from joinWikiNamesLinks
   * @return an RDD of tab-separated rows prefixed with PAGE_NODES_CSV_HEADER
   */
  def generatePageNodes(pageNodeData: RDD[(String, Iterable[String])]): RDD[String] = {
    val header = sc.parallelize(List(Configuration.PAGE_NODES_CSV_HEADER))
    val rows = pageNodeData.zipWithUniqueId().map(e => {
      // dbpedia-uri, unique node id, node label, then the grouped wikipedia/title values
      e._1._1 + "\t" + e._2 + "\t" + "Page\t" + e._1._2.toList.mkString("\t")
    })
    val result = header.union(rows)
    result
  }
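
  // Illustrative output (columns separated by \t, id assigned by zipWithUniqueId):
  //   dbpedia \t id \t l:label \t wikipedia \t title
  //   http://dbpedia.org/resource/Apache_Spark \t 0 \t Page \t http://en.wikipedia.org/wiki/Apache_Spark \t Apache Spark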
  /**
   * Encode page links as (startId, endId) pairs using the node index.
   * @param pageLinks source/target URI pairs from processPageLinks
   * @param pageNodeIndex map from resource URI to its unique node id
   * @return an RDD of (startId, endId) relationship pairs
   */
  def encodePageLinks(pageLinks: RDD[Array[String]], pageNodeIndex: scala.collection.Map[String, Long]): RDD[(Long, Long)] = {
    // Filter out links whose endpoints were not indexed as page nodes
    val encodedPageLinksResult = pageLinks.filter(uri =>
      pageNodeIndex.contains(uri(0)) && pageNodeIndex.contains(uri(1)))
      .map(uri => (pageNodeIndex(uri(0)), pageNodeIndex(uri(1))))
    encodedPageLinksResult
  }
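
  // Illustrative encoding, assuming the node index maps the two resources to ids 0 and 42:
  //   Array("http://dbpedia.org/resource/Apache_Spark",
  //         "http://dbpedia.org/resource/Scala_(programming_language)") -> (0L, 42L)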
  /**
   * Process the page links RDF file into source/target resource URI pairs.
   * @param pageLinksFile raw N-Triples lines of page_links_en.nt
   * @return an RDD of two-element arrays: Array(sourceResource, targetResource)
   */
  def processPageLinks(pageLinksFile: RDD[String]): RDD[Array[String]] = {
    val pageLinks = pageLinksFile.filter(line =>
      line.contains(Configuration.WIKI_PAGE_LINK_URL) &&
        !line.contains(Configuration.EXCLUDE_FILE_PATTERN) &&
        !line.contains(Configuration.EXCLUDE_CATEGORY_PATTERN))
      // Split into subject/predicate/object tokens and drop the wikiPageWikiLink predicate
      .map(e => {
        e.split("^<|>\\s<|\\>\\s\\\"|>\\s\\.$")
          .filter(!_.isEmpty)
          .filter(a => !a.contains(Configuration.WIKI_PAGE_LINK_URL))
      })
    pageLinks
  }
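
  // Illustrative transformation (sample line, not taken from the dump):
  //   in : <http://dbpedia.org/resource/Apache_Spark> <http://dbpedia.org/ontology/wikiPageWikiLink> <http://dbpedia.org/resource/Scala_(programming_language)> .
  //   out: Array("http://dbpedia.org/resource/Apache_Spark", "http://dbpedia.org/resource/Scala_(programming_language)")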
  /**
   * Generate the relationship CSV rows from encoded page links.
   * @param pageLinkResults (startId, endId) pairs from encodePageLinks
   * @return an RDD of tab-separated rows prefixed with PAGE_LINKS_CSV_HEADER
   */
  def generatePageLinkRelationships(pageLinkResults: RDD[(Long, Long)]): RDD[String] = {
    val relHeader = sc.parallelize(List(Configuration.PAGE_LINKS_CSV_HEADER))
    val relRows = pageLinkResults.map(line => line._1 + "\t" + line._2 + "\tHAS_LINK")
    val relResult = relHeader.union(relRows)
    relResult
  }
}
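
// The two saveAsTextFile calls write part files under /pagenodes and /pagerels in
// HDFS; merging them (e.g. with `hadoop fs -getmerge`) yields the two CSV files
// for the Neo4j batch inserter. A hypothetical invocation of Michael Hunger's
// batch-import tool follows; the jar name and argument order are assumptions,
// not part of this gist:
//   java -server -jar batch-import-jar-with-dependencies.jar graph.db pagenodes.csv pagerels.csv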