Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Natural Language Processing with Spark Code
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/* We use this to pass configuration properties to Vertica and CoreNLP */
import java.util.{Properties, UUID}
/* Scala Java object conversion */
import scala.collection.JavaConverters._
import scala.io.Source._
/* CoreNLP library components */
import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.ling.CoreAnnotations._
import edu.stanford.nlp.trees.Tree
import edu.stanford.nlp.trees.TreeCoreAnnotations.TreeAnnotation
import edu.stanford.nlp.dcoref.CorefChain
import edu.stanford.nlp.dcoref.CorefCoreAnnotations.CorefChainAnnotation
/* This class provides a wrapper for the CoreNLP process object. The wrapper
* extends Serializable so that Spark can serialize it and pass it to cluster nodes
* for parallel processing, and it will create a CoreNLP instance lazily
* Thanks to https://github.com/databricks/spark-corenlp for this idea */
class StanfordCoreNLPWrapper(private val props: Properties) extends Serializable {
/* The transient annotation here tells Scala not to attempt to serialize the value
* of the coreNLP variable. CoreNLP starts up a parallel JVM process and returns
* information for connecting to that process, so even if the object were serializable
* it wouldn't make any sense to pass those values since the process would not be
* be guaranteed to be running on the other nodes. */
@transient private var coreNLP: StanfordCoreNLP = _
def get: StanfordCoreNLP = {
if (coreNLP == null) {
coreNLP = new StanfordCoreNLP(props)
}
coreNLP
}
}
def truncate(value: String,length: Int) : String = {
var return_val = value
if (value != null && value.length() > length) {
return_val = value.substring(0, length)
}
return return_val;
}
/* This method runs each comment from Vertica against the CoreNLP process
* and parses the results into the proper format for storing the info in Vertica
* It can definitely use some cleaning up. Much of the redundant code in here was
* a result of trying to deal with comments that were oddly formatted in the database
* so that the whole process doesn’t crash and terminate. */
def extract_entities(wrapper: StanfordCoreNLPWrapper, row: Row) : Row = {
try {
/* Pull in the Vertica data from this row. Use getOrElse to handle default values, and
* truncate values so that when we output back to Vertica we don't get errors about
* values being too large or unexpectedly null */
val comment_id = truncate(getRowString(row,"id").getOrElse(""),63)
val comment = getRowString(row,"comment").getOrElse("")
val date = new java.util.Date()
val curr_time = new java.sql.Timestamp(date.getTime)
val curr_time_str = truncate(curr_time.toString(),63)
val ratedTimestamp = truncate(getRowTimestamp(row,"ratedTimestamp").getOrElse(curr_time).toString(),63)
val normalized_rating = getRowDouble(row,"normalized_rating").getOrElse(0.0)
val location_id = truncate(getRowString(row,"location_id").getOrElse(""),2499)
val tenant_id = truncate(getRowString(row,"tenant_id").getOrElse(""),63)
if(comment.length() == 0) {
return Row(
comment_id,
ratedTimestamp,
normalized_rating,
tenant_id,
location_id,
"",
"",
"",
"",
"",
"",
curr_time_str
)
}
val proc = wrapper.get
val doc = new Annotation(comment)
proc.annotate(doc)
val valid_tags = Array[String]("ORGANIZATION","PERSON")
var entity_mentions = Array[String]()
var ne_tags = Array[String]()
val sents = doc.get(classOf[SentencesAnnotation]).asScala.toList
var pos_tags = Array[String]()
var lemmas = Array[String]()
/* TO-DO - pull in parse trees as well */
var parse_trees = Array[String]()
for(sent 0) {
entity_mentions = entity_mentions :+ ("\""+current_ne.mkString(" ")+"\"")
current_ne = Array[String]()
}
}
pos_tags = pos_tags :+ word.get(classOf[PartOfSpeechAnnotation])
lemmas = lemmas :+ word.get(classOf[LemmaAnnotation])
}
if(current_ne.length > 0) {
entity_mentions = entity_mentions :+ ("\""+current_ne.mkString(" ")+"\"")
current_ne = Array[String]()
}
}
var corefs = ""
var unique_entities = entity_mentions.toSet
val result = Row(
comment_id,
ratedTimestamp,
normalized_rating,
tenant_id,
location_id,
truncate(unique_entities.mkString(","),2499),
truncate(ne_tags.mkString(" "),2499),
truncate(pos_tags.mkString(" "),2499),
truncate(lemmas.mkString(" "),2499),
truncate(parse_trees.mkString(" "),2499),
truncate(corefs,2499),
curr_time_str
)
return result
} catch {
case npe: java.lang.NullPointerException => Row("","",0.0,"","","","","","","","","")
}
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Scala NER Parser Application")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val base_jdbc_url = OMITTED
val db_props = new Properties(OMITTED)
var table_str = OMITTED
val df = sqlContext.read.jdbc(url = base_jdbc_url,table = table_str, properties = db_props)
/* Spark runs on a single partition by default, so if you want to force it to run in
* in parallel, you have to tell it how many partitions you want to create. In order to
* handle this automatically, we pull information from the Spark config to figure out how
* many EMR Core nodes we have available (Amazon sets the spark.executor.instances value to
* the # of nodes and the spark.executors.memory value to the size of each node). */
val num_exec = sc.getConf.get("spark.executor.instances").toInt
/* Here we use mapPartitions to ensure that the CoreNLP process is instantiated once per
* partition/node. */
val rdd = df.repartition(num_exec).mapPartitions(iter => {
val props = new Properties()
props.setProperty("annotators", "tokenize, ssplit, pos, lemma, ner")
val wrapper = new StanfordCoreNLPWrapper(props)
iter.map{ x => if(x == null){ Row("","",0.0,"","","","","","","","") } else { extract_entities(wrapper, x) } }
})
val schema = StructType(Array[StructField](
StructField("comment_id", StringType, false),
StructField("ratedTimestamp", StringType, true),
StructField("normalized_rating", DoubleType, false),
StructField("tenant_id", StringType, true),
StructField("location_id", StringType, true),
StructField("entities", StringType, true),
StructField("ner_tags", StringType, true),
StructField("pos_tags", StringType, true),
StructField("lemmas", StringType, true),
StructField("parse_tree", StringType, true),
StructField("corefs", StringType, true),
StructField("taggedTimestamp", StringType, true)
)
)
val output_df = sqlContext.createDataFrame(rdd,schema)
/* It is important to persist the result of the output_df value here so that
* in the next step when we run the flatMap on output_df, it doesn't re-run
* the extract_entities step above (persist tells Spark to cache the values
* of the last run in memory and re-use them) */
output_df.persist().write.mode("append").jdbc(url = base_jdbc_url, table = OMITTED, connectionProperties = db_props)
}
@raviranjan-innoplexus

This comment has been minimized.

Copy link

commented Feb 22, 2017

Hi, the code has certain errors in line 106. And further, what did you do to ne_tags? I can't find it changing after being declared as an empty array.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.