Instantly share code, notes, and snippets.

What would you like to do?
Natural Language Processing with Spark Code
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/* We use this to pass configuration properties to Vertica and CoreNLP */
import java.util.{Properties, UUID}
/* Scala Java object conversion */
import scala.collection.JavaConverters._
/* CoreNLP library components */
import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.ling.CoreAnnotations._
import edu.stanford.nlp.trees.Tree
import edu.stanford.nlp.trees.TreeCoreAnnotations.TreeAnnotation
import edu.stanford.nlp.dcoref.CorefChain
import edu.stanford.nlp.dcoref.CorefCoreAnnotations.CorefChainAnnotation
/* This class provides a wrapper for the CoreNLP process object. The wrapper
* extends Serializable so that Spark can serialize it and pass it to cluster nodes
* for parallel processing, and it will create a CoreNLP instance lazily
* Thanks to the original author of this idea (the attribution is missing from this copy). */
class StanfordCoreNLPWrapper(private val props: Properties) extends Serializable {
  /* The pipeline is marked transient so that it is never written out when this
   * wrapper is serialized; only `props` travels with the wrapper. Declaring it
   * lazy means it is constructed from `props` on the first call to `get` in
   * whichever JVM holds this copy of the wrapper, and that construction is
   * done at most once per copy. */
  @transient private lazy val coreNLP: StanfordCoreNLP = new StanfordCoreNLP(props)

  /* Returns the CoreNLP pipeline for this wrapper, creating it on first use. */
  def get: StanfordCoreNLP = coreNLP
}
/* Shortens `value` to at most `length` characters.
 *
 * value:  the string to shorten; may be null.
 * length: the maximum number of characters to keep.
 *
 * Returns the first `length` characters when `value` is longer than that.
 * A null value, or one that already fits, is returned unchanged. */
def truncate(value: String, length: Int): String =
  if (value != null && value.length > length) value.substring(0, length) else value
/* Builds one output Row of NLP annotations for a single input row.
*
* This method runs each comment from Vertica against the CoreNLP process
* and parses the results into the proper format for storing the info in Vertica.
* It can definitely use some cleaning up. Much of the redundant code in here was
* a result of trying to deal with comments that were oddly formatted in the database
* so that the whole process doesn't crash and terminate.
*
* wrapper: supplies the StanfordCoreNLP pipeline through wrapper.get.
* row:     the input row; the fields id, comment, ratedTimestamp, normalized_rating,
*          location_id and tenant_id are read from it by name.
*
* Returns a Row. A NullPointerException raised anywhere in the body is caught and
* replaced by a 12-field placeholder Row; no other exception type is caught here.
*
* NOTE(review): several statements of this method appear to be missing from this
* copy (closing braces, Row arguments, the loop headers). The notes below mark each
* gap; confirm against the complete source before relying on this text. */
def extract_entities(wrapper: StanfordCoreNLPWrapper, row: Row) : Row = {
try {
/* Pull in the Vertica data from this row. Use getOrElse to handle default values, and
* truncate values so that when we output back to Vertica we don't get errors about
* values being too large or unexpectedly null */
// getRowString / getRowTimestamp / getRowDouble are not defined in the visible text;
// from their use with getOrElse they presumably return an Option — TODO confirm.
val comment_id = truncate(getRowString(row,"id").getOrElse(""),63)
val comment = getRowString(row,"comment").getOrElse("")
val date = new java.util.Date()
val curr_time = new java.sql.Timestamp(date.getTime)
val curr_time_str = truncate(curr_time.toString(),63)
// A missing ratedTimestamp falls back to the current time.
val ratedTimestamp = truncate(getRowTimestamp(row,"ratedTimestamp").getOrElse(curr_time).toString(),63)
val normalized_rating = getRowDouble(row,"normalized_rating").getOrElse(0.0)
val location_id = truncate(getRowString(row,"location_id").getOrElse(""),2499)
val tenant_id = truncate(getRowString(row,"tenant_id").getOrElse(""),63)
// Empty comments return early without touching CoreNLP.
// NOTE(review): the arguments of this Row and the closing of the if are missing here.
if(comment.length() == 0) {
return Row(
val proc = wrapper.get
val doc = new Annotation(comment)
// NOTE(review): no call that runs `proc` over `doc` is visible before the
// annotations are read below — confirm whether one exists in the complete source.
// NOTE(review): valid_tags is not referenced anywhere in the visible text.
val valid_tags = Array[String]("ORGANIZATION","PERSON")
var entity_mentions = Array[String]()
// NOTE(review): nothing in the visible text ever appends to ne_tags.
var ne_tags = Array[String]()
val sents = doc.get(classOf[SentencesAnnotation]).asScala.toList
var pos_tags = Array[String]()
var lemmas = Array[String]()
/* TO-DO - pull in parse trees as well */
var parse_trees = Array[String]()
// NOTE(review): the next line is garbled; the loop header(s) and the declarations
// of `current_ne` and `word`, both used below, are not visible.
for(sent 0) {
// Flush the tokens collected in current_ne as one quoted, space-joined mention.
entity_mentions = entity_mentions :+ ("\""+current_ne.mkString(" ")+"\"")
current_ne = Array[String]()
pos_tags = pos_tags :+ word.get(classOf[PartOfSpeechAnnotation])
lemmas = lemmas :+ word.get(classOf[LemmaAnnotation])
// Flush any mention still being collected.
if(current_ne.length > 0) {
entity_mentions = entity_mentions :+ ("\""+current_ne.mkString(" ")+"\"")
current_ne = Array[String]()
// corefs is never assigned anything but the empty string in the visible text.
var corefs = ""
// Converting to a Set drops duplicate mentions.
var unique_entities = entity_mentions.toSet
// NOTE(review): only four of this Row's arguments are visible and its closing
// parenthesis is missing; the placeholder Row in the catch clause has 12 fields.
val result = Row(
truncate(ne_tags.mkString(" "),2499),
truncate(pos_tags.mkString(" "),2499),
truncate(lemmas.mkString(" "),2499),
truncate(parse_trees.mkString(" "),2499),
return result
} catch {
// Only NullPointerException is handled; the row is replaced by empty placeholders.
case npe: java.lang.NullPointerException => Row("","",0.0,"","","","","","","","","")
/* Entry point of the Spark job: loads a table over JDBC, runs extract_entities on
* every row, and appends the resulting rows to an output table over JDBC.
*
* args: command-line arguments; not read anywhere in the visible text.
*
* NOTE(review): parts of this method are garbled or missing in this copy (the
* DataFrame load, the mapPartitions body, the end of the schema literal and the
* closing braces). The notes below mark each gap. */
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Scala NER Parser Application")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// The JDBC URL, connection properties and table names are redacted as OMITTED.
val base_jdbc_url = OMITTED
val db_props = new Properties(OMITTED)
var table_str = OMITTED
// NOTE(review): the right-hand side of the next line is garbled; given the
// url/table/properties arguments it is presumably a JDBC read through sqlContext —
// TODO confirm against the complete source.
val df = = base_jdbc_url,table = table_str, properties = db_props)
/* Spark runs on a single partition by default, so if you want to force it to run
* in parallel, you have to tell it how many partitions you want to create. In order to
* handle this automatically, we pull information from the Spark config to figure out how
* many EMR Core nodes we have available (Amazon sets the spark.executor.instances value to
* the # of nodes and the spark.executor.memory value to the size of each node). */
// NOTE(review): no default is supplied here, so this presumably fails when
// spark.executor.instances is not set — confirm against the SparkConf documentation.
val num_exec = sc.getConf.get("spark.executor.instances").toInt
/* Here we use mapPartitions to ensure that the CoreNLP process is instantiated once per
* partition/node. */
val rdd = df.repartition(num_exec).mapPartitions(iter => {
val props = new Properties()
// CoreNLP annotators requested for the pipeline.
props.setProperty("annotators", "tokenize, ssplit, pos, lemma, ner")
// NOTE(review): the next line is garbled; the text between the wrapper construction
// and the `x => ...` function (presumably a map over `iter`) is missing.
// NOTE(review): the null-row fallback below appears to have 11 fields, while the
// schema declares 12 — confirm and align the two.
val wrapper = new StanfordCoreNLPWrapper(props){ x => if(x == null){ Row("","",0.0,"","","","","","","","") } else { extract_entities(wrapper, x) } }
// Output schema: 12 columns, all strings except normalized_rating.
// NOTE(review): the closing parentheses of this literal are missing in this copy.
val schema = StructType(Array[StructField](
StructField("comment_id", StringType, false),
StructField("ratedTimestamp", StringType, true),
StructField("normalized_rating", DoubleType, false),
StructField("tenant_id", StringType, true),
StructField("location_id", StringType, true),
StructField("entities", StringType, true),
StructField("ner_tags", StringType, true),
StructField("pos_tags", StringType, true),
StructField("lemmas", StringType, true),
StructField("parse_tree", StringType, true),
StructField("corefs", StringType, true),
StructField("taggedTimestamp", StringType, true)
val output_df = sqlContext.createDataFrame(rdd,schema)
/* It is important to persist the result of the output_df value here so that
* in the next step when we run the flatMap on output_df, it doesn't re-run
* the extract_entities step above (persist tells Spark to cache the values
* of the last run in memory and re-use them) */
// NOTE(review): the flatMap step mentioned above is not present in the visible text.
output_df.persist().write.mode("append").jdbc(url = base_jdbc_url, table = OMITTED, connectionProperties = db_props)

This comment has been minimized.

Copy link

raviranjan-innoplexus commented Feb 22, 2017

Hi, the code has certain errors in line 106. And further, what did you do to ne_tags? I can't find it changing after being declared as an empty array.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment