Skip to content

Instantly share code, notes, and snippets.

@billmetangmo
Created February 5, 2018 12:38
Show Gist options
  • Save billmetangmo/70820d3783d49095c424d52f132ad5df to your computer and use it in GitHub Desktop.
Save billmetangmo/70820d3783d49095c424d52f132ad5df to your computer and use it in GitHub Desktop.
Index Nutch data in ElasticSearch 5.x from spark-shell
import org.elasticsearch.hadoop._
import org.elasticsearch.spark._
import java.time._
import java.time.format._
// Get parameters
// args(0) = Namenode IP Address ( Resource Manager on MapR distribution)
// args(1) = nutch segment name
// args(2) = Elasticsearch index name
val args = sc.getConf.get("spark.driver.args").split("\\s+")
if (args.length != 3) { println ( "Missing arguments : arg0->Namnode Ip address & arg1->Segment name & arg3->Index name"); System.exit(1) }
// define inputs
val metadata=sc.sequenceFile("hdfs://"+args(0)+"/nutch/segments/"+args(1)+"/content/*/data",classOf[org.apache.hadoop.io.Text], classOf[org.apache.nutch.protocol.Content])
val parse_text=sc.sequenceFile("hdfs://"+args(0)+"/nutch/segments/"+args(1)+"/parse_text/*/data",classOf[org.apache.hadoop.io.Text], classOf[org.apache.nutch.parse.ParseText])
// extract (key,value) = (url,text) as String because default type are serializable by default spark serializer
val text = parse_text.map { case(key,text) => ( key.toString,text.getText)}
// change date format from RFC_1123_DATE_TIME to from ISO_LOCAL_DATE_TIME as described in Java 8 doc
// RFC_1123_DATE_TIME : EEE, dd MMM yyyy HH:mm:ss GMT
// ISO_LOCAL_DATE_TIME : yyyy-MM-dd'T'HH:MM:SS ( supported by Elasticsearch 5.1.1)
val formatDate: String => String = x => x match { case null => null;case _ => DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(DateTimeFormatter.RFC_1123_DATE_TIME.parse(x)) }
// replace mal formatted dates by null ( dates which doesn't contain GMT)
val filterDate: String => String = x => { x match { case null => null ; case _ => x.contains("GMT") match { case true => x; case _ => null}} }
// filter url with none text
val IterableTextisEmpty: Iterable[String] => Boolean = x => x.isEmpty match { case false => true; case _ => false }
// Get metadata for a given url from content object
val filterMetadata: org.apache.nutch.protocol.Content => (String,String,String,String,String,String) = metadata => (metadata.getUrl,formatDate(filterDate(metadata.getMetadata.get("Last-Modified"))),metadata.getMetadata().get("nutch.segment.name"),formatDate(filterDate(metadata.getMetadata.get("Date"))),metadata.getMetadata.get("Content-Type"),metadata.getMetadata.get("Content-Length"))
// Define data class useful which is used to create a map list of all fields
case class Data(url: String,content: String, lastModified: String, segment: String,date:String, contentType:String, contentLength:String)
// create an object Data from iter1 & iter2
// iter1 contains all metadata for a given url
// iter2 contains plain text for a given url
val createData: (Iterable[(String, String, String, String, String, String)],Iterable[String]) => Data = (iter1,iter2) => (Data(iter1.map(_._1).head,iter2.head.toString,iter1.map(_._2).head,iter1.map(_._3).head,iter1.map(_._4).head,iter1.map(_._5).head,iter1.map(_._6).head))
// Create Map(field -> value) from Data class
val DataToMap: Data => Map[String, String] = data => Map("url"->data.url,"content"->data.content,"lastModified"->data.lastModified,"segment"->data.segment,"date"->data.date,"contentType"->data.contentType,"contentLength"->data.contentLength)
// Store metadata & text from a given url to Elasticsearch
// metadata.map { case(key,metadata) => ( key.toString,filterMetadata(metadata))}
// .cogroup(text)
// .filter { case (a,(iterMeta,iterText)) => IterableTextisEmpty(iterText)}
// .map { case( a,(iterMeta,iterText)) => ( a,{ DataToMap(createData(iterMeta,iterText))})}
// .values
// .saveToEs(args(2)+"/docs", Map("es.mapping.id" -> "url"))
metadata.map { case(key,metadata) => ( key.toString,filterMetadata(metadata))}.cogroup(text).filter { case (a,(iterMeta,iterText)) => IterableTextisEmpty(iterText)}.map { case( a,(iterMeta,iterText)) => ( a,{ DataToMap(createData(iterMeta,iterText))})}.values.saveToEs(args(2)+"/docs", Map("es.mapping.id" -> "url"))
// close spark-shell
println("<<<<<<<<< Program executed >>>>>>>>>")
System.exit(0)
@billmetangmo
Copy link
Author

billmetangmo commented Feb 5, 2018

Mapping ( Crawled URL <-> ES document )

ES Document

"_index": "spark",
  "_type": "docs",
  "_id": "<url>",
  "_score": 1,
  "_source": {
    "url": "<url>",
    "lastModified": "2017-03-23T10:06:55",
    "contentLength": "13278",
    "segment": "20170323110328",
    "contentType": "text/html; charset=utf-8",
    "date": "2017-03-23T10:07:59",
    "content": "<text>"
  },
  "fields": {
    "date": [
      1490263679000
    ],
    "lastModified": [
      1490263615000
    ]
  }
}

Nutch document

######## Folder "<segment name>/parse_text/*/data" (contains text by URL) ######
<key,content> = <url,ParseText.getText()>
 
######## Folder "<segment name>/content/*/data" (contains metadata by URL ) ######
<key,(url,lastModified,segment,contentLength,segment,contentType,date)> = <url,xxx>
 
lastModified : Content.getMetadata().get("Last-Modified")
segment : Content.getMetadata().get("nutch.segment.name")
date : Content.getMetadata().get("Date")
contentType : Content.getMetadata().get("Content-Type")
contentLength : Content.getMetadata().get("Content-Length")
url : Content.getUrl()

@billmetangmo
Copy link
Author

billmetangmo commented Feb 5, 2018

Execution

You need additional jars :

Elasticsearch-spark : http://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-20_2.11/5.1.1/elasticsearch-spark-20_2.11-5.1.1.jar
Apache-nutch 1.12 : http://central.maven.org/maven2/org/apache/nutch/nutch/1.12/nutch-1.12.jar

Then:

#Execute script on yarn , if you want to execute it locally remove "--master yarn" options
#<arg1>: Resource manager IP Address
#<arg2>: Nutch segment name to parse
#<arg3>: Elasticsearch index name ( should match <index_name> defined on Elasticsearch configuration above)
 
./spark-2.0.0-bin/bin/spark-shell --master yarn --jars libraries/apache-nutch-1.12.jar,libraries/elasticsearch-spark-20_2.11-5.1.1.jar -i NutchToElastic.scala --conf spark.driver.args="<arg1> <arg2> <arg3>"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment