Created
February 5, 2018 12:38
-
-
Save billmetangmo/70820d3783d49095c424d52f132ad5df to your computer and use it in GitHub Desktop.
Index Nutch data in ElasticSearch 5.x from spark-shell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.elasticsearch.hadoop._ | |
import org.elasticsearch.spark._ | |
import java.time._ | |
import java.time.format._ | |
// Read driver arguments passed to spark-shell via -conf spark.driver.args
// args(0) = Namenode IP address (Resource Manager on MapR distribution)
// args(1) = Nutch segment name
// args(2) = Elasticsearch index name
val args = sc.getConf.get("spark.driver.args").split("\\s+")
// Exit early with a usage message when the argument count is wrong.
// (Fixed: message used to say "Namnode" and "arg3" for the index-name slot, which is args(2).)
if (args.length != 3) { println("Missing arguments : arg0->Namenode IP address & arg1->Segment name & arg2->Index name"); System.exit(1) }
// define inputs: both are Hadoop SequenceFiles written by a Nutch crawl segment.
// metadata   : url -> org.apache.nutch.protocol.Content  (headers, content type, fetch metadata)
// parse_text : url -> org.apache.nutch.parse.ParseText   (extracted plain text of the page)
val metadata=sc.sequenceFile("hdfs://"+args(0)+"/nutch/segments/"+args(1)+"/content/*/data",classOf[org.apache.hadoop.io.Text], classOf[org.apache.nutch.protocol.Content])
val parse_text=sc.sequenceFile("hdfs://"+args(0)+"/nutch/segments/"+args(1)+"/parse_text/*/data",classOf[org.apache.hadoop.io.Text], classOf[org.apache.nutch.parse.ParseText])
// Convert each (url, ParseText) pair into plain Strings: the Hadoop Writable
// wrappers are not handled by the default Spark serializer.
val text = parse_text.map { case (url, parsed) => (url.toString, parsed.getText) }
// Convert a date string from RFC_1123_DATE_TIME to ISO_LOCAL_DATE_TIME
// (the format accepted by Elasticsearch 5.1.1):
//   RFC_1123_DATE_TIME  : EEE, dd MMM yyyy HH:mm:ss GMT
//   ISO_LOCAL_DATE_TIME : yyyy-MM-dd'T'HH:mm:ss
// A null input yields null, so absent headers stay absent.
val formatDate: String => String = raw =>
  Option(raw)
    .map(d => DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(DateTimeFormatter.RFC_1123_DATE_TIME.parse(d)))
    .orNull
// Replace malformed dates (ones that don't contain "GMT") by null so that
// formatDate is never asked to parse a non-RFC-1123 value. null passes through.
val filterDate: String => String = raw =>
  Option(raw).filter(_.contains("GMT")).orNull
// Predicate used to keep only urls that actually have parsed text.
// Returns true when the text group is non-empty.
// (Original spelled this as `x.isEmpty match { case false => true; case _ => false }`,
// which is exactly `nonEmpty`.)
val IterableTextisEmpty: Iterable[String] => Boolean = _.nonEmpty
// Extract the indexed metadata fields for one url from a Nutch Content object.
// Dates go through filterDate/formatDate so malformed values become null and
// valid ones are converted to the Elasticsearch-friendly ISO format.
// Tuple order: (url, lastModified, segment, date, contentType, contentLength)
val filterMetadata: org.apache.nutch.protocol.Content => (String, String, String, String, String, String) = content => {
  val meta = content.getMetadata
  (content.getUrl,
    formatDate(filterDate(meta.get("Last-Modified"))),
    meta.get("nutch.segment.name"),
    formatDate(filterDate(meta.get("Date"))),
    meta.get("Content-Type"),
    meta.get("Content-Length"))
}
// Record holding every field indexed into Elasticsearch for a single url.
// All fields are Strings; absent values are represented as null upstream.
case class Data(
  url: String,
  content: String,
  lastModified: String,
  segment: String,
  date: String,
  contentType: String,
  contentLength: String)
// Build a Data record from the two cogrouped iterables for one url:
//   iter1 = metadata tuples (url, lastModified, segment, date, contentType, contentLength)
//   iter2 = plain text for that url
// Only the first element of each group is used (one record per url is expected).
// (Original traversed iter1 six times via `map(_._N).head`; destructuring the
// head tuple once gives the same values with a single access.)
val createData: (Iterable[(String, String, String, String, String, String)], Iterable[String]) => Data =
  (iter1, iter2) => {
    val (url, lastModified, segment, date, contentType, contentLength) = iter1.head
    Data(url, iter2.head.toString, lastModified, segment, date, contentType, contentLength)
  }
// Flatten a Data record into the field-name -> value map that saveToEs indexes.
val DataToMap: Data => Map[String, String] = d =>
  Map(
    "url"           -> d.url,
    "content"       -> d.content,
    "lastModified"  -> d.lastModified,
    "segment"       -> d.segment,
    "date"          -> d.date,
    "contentType"   -> d.contentType,
    "contentLength" -> d.contentLength
  )
// Store metadata & text for every crawled url into Elasticsearch.
// Pipeline (the commented lines below show the same chain one step per line):
//   1. map      : (url, Content) -> (url, metadata tuple)
//   2. cogroup  : join with the text RDD on url -> (url, (metaGroup, textGroup))
//   3. filter   : drop urls that have no parsed text
//   4. map      : merge both groups into a field->value Map per url
//   5. values   : keep only the Maps
//   6. saveToEs : index into <indexName>/docs, using the "url" field as the ES document id
// metadata.map { case(key,metadata) => ( key.toString,filterMetadata(metadata))}
// .cogroup(text)
// .filter { case (a,(iterMeta,iterText)) => IterableTextisEmpty(iterText)}
// .map { case( a,(iterMeta,iterText)) => ( a,{ DataToMap(createData(iterMeta,iterText))})}
// .values
// .saveToEs(args(2)+"/docs", Map("es.mapping.id" -> "url"))
metadata.map { case(key,metadata) => ( key.toString,filterMetadata(metadata))}.cogroup(text).filter { case (a,(iterMeta,iterText)) => IterableTextisEmpty(iterText)}.map { case( a,(iterMeta,iterText)) => ( a,{ DataToMap(createData(iterMeta,iterText))})}.values.saveToEs(args(2)+"/docs", Map("es.mapping.id" -> "url"))
// close spark-shell: report success on stdout, then terminate the JVM so the
// shell does not stay open after the batch job completes.
println("<<<<<<<<< Program executed >>>>>>>>>")
System.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Execution
You need additional jars :
Elasticsearch-spark : https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-20_2.11/5.1.1/elasticsearch-spark-20_2.11-5.1.1.jar
Apache-nutch 1.12 : https://repo1.maven.org/maven2/org/apache/nutch/nutch/1.12/nutch-1.12.jar
Then: