Created
January 5, 2016 20:35
-
-
Save thomaspouncy/e620d32d636f764df261 to your computer and use it in GitHub Desktop.
Natural Language Processing with Spark Code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkContext | |
import org.apache.spark.SparkContext._ | |
import org.apache.spark.SparkConf | |
import org.apache.spark.sql._ | |
import org.apache.spark.sql.types._ | |
/* We use this to pass configuration properties to Vertica and CoreNLP */ | |
import java.util.{Properties, UUID} | |
/* Scala Java object conversion */ | |
import scala.collection.JavaConverters._ | |
import scala.io.Source._ | |
/* CoreNLP library components */ | |
import edu.stanford.nlp.pipeline._ | |
import edu.stanford.nlp.pipeline.StanfordCoreNLP | |
import edu.stanford.nlp.ling.CoreAnnotations._ | |
import edu.stanford.nlp.trees.Tree | |
import edu.stanford.nlp.trees.TreeCoreAnnotations.TreeAnnotation | |
import edu.stanford.nlp.dcoref.CorefChain | |
import edu.stanford.nlp.dcoref.CorefCoreAnnotations.CorefChainAnnotation | |
/* Serializable wrapper around a CoreNLP pipeline so that Spark can ship it to
 * executor nodes for parallel processing; the actual StanfordCoreNLP instance
 * is built lazily, once per node, on first use.
 * Idea borrowed from https://github.com/databricks/spark-corenlp */
class StanfordCoreNLPWrapper(private val props: Properties) extends Serializable {

  /* @transient keeps Scala from trying to serialize this field: the pipeline
   * holds process-local state (CoreNLP spins up a companion JVM process), so a
   * serialized handle would be meaningless on another node. Each deserialized
   * copy therefore starts out null and rebuilds the pipeline on demand. */
  @transient private var coreNLP: StanfordCoreNLP = _

  /** Returns the node-local pipeline, instantiating it on first access. */
  def get: StanfordCoreNLP = {
    if (coreNLP eq null) {
      coreNLP = new StanfordCoreNLP(props)
    }
    coreNLP
  }
}
/** Returns `value` unchanged unless it is non-null and longer than `length`,
  * in which case it is cut down to its first `length` characters.
  * A null input is passed through as null (callers rely on this when padding
  * Vertica columns). Rewritten as a single expression: the original used a
  * mutable `var`, an explicit `return`, and a stray semicolon. */
def truncate(value: String, length: Int): String =
  if (value != null && value.length > length) value.substring(0, length)
  else value
/* Runs one Vertica row's comment through the CoreNLP pipeline and packs the
 * extracted annotations into a 12-column Row matching the output schema.
 * Much of the defensive getOrElse/truncate handling exists because comments
 * in the database were sometimes oddly formatted, and a single bad row must
 * not crash the whole job.
 *
 * NOTE(review): the sentence/token loop below was truncated in the pasted
 * source (the line `for(sent 0) {` lost its generator and the opening of the
 * token loop, along with the declaration of `current_ne`). It has been
 * reconstructed from the surviving fragments — the two `current_ne` flush
 * blocks and the pos/lemma appends — so confirm it against the original.
 * The reconstruction also appends to `ne_tags`, which the pasted version
 * declared but never populated. */
def extract_entities(wrapper: StanfordCoreNLPWrapper, row: Row): Row = {
  try {
    /* Pull in the Vertica data from this row. getOrElse supplies defaults and
     * truncate keeps values inside Vertica column limits so the write-back
     * does not fail on oversized or unexpectedly null values. */
    val comment_id = truncate(getRowString(row, "id").getOrElse(""), 63)
    val comment = getRowString(row, "comment").getOrElse("")
    val date = new java.util.Date()
    val curr_time = new java.sql.Timestamp(date.getTime)
    val curr_time_str = truncate(curr_time.toString(), 63)
    val ratedTimestamp = truncate(getRowTimestamp(row, "ratedTimestamp").getOrElse(curr_time).toString(), 63)
    val normalized_rating = getRowDouble(row, "normalized_rating").getOrElse(0.0)
    val location_id = truncate(getRowString(row, "location_id").getOrElse(""), 2499)
    val tenant_id = truncate(getRowString(row, "tenant_id").getOrElse(""), 63)

    /* Empty comment: skip the (expensive) NLP pass and emit a row with blank
     * annotation columns. */
    if (comment.length() == 0) {
      return Row(
        comment_id,
        ratedTimestamp,
        normalized_rating,
        tenant_id,
        location_id,
        "",
        "",
        "",
        "",
        "",
        "",
        curr_time_str
      )
    }

    val proc = wrapper.get
    val doc = new Annotation(comment)
    proc.annotate(doc)

    /* Only these NER labels are treated as entity mentions. */
    val valid_tags = Array[String]("ORGANIZATION", "PERSON")
    var entity_mentions = Array[String]()
    var ne_tags = Array[String]()
    val sents = doc.get(classOf[SentencesAnnotation]).asScala.toList
    var pos_tags = Array[String]()
    var lemmas = Array[String]()
    /* TO-DO - pull in parse trees as well */
    var parse_trees = Array[String]()

    for (sent <- sents) {
      /* Tokens of the named-entity mention currently being accumulated;
       * consecutive tokens with an interesting NER tag form one mention. */
      var current_ne = Array[String]()
      for (word <- sent.get(classOf[TokensAnnotation]).asScala) {
        val ne_tag = word.get(classOf[NamedEntityTagAnnotation])
        if (valid_tags.contains(ne_tag)) {
          ne_tags = ne_tags :+ ne_tag
          current_ne = current_ne :+ word.get(classOf[TextAnnotation])
        } else {
          /* Tag run ended: flush the accumulated mention, quoted so commas in
           * the final mkString(",") stay unambiguous. */
          if (current_ne.length > 0) {
            entity_mentions = entity_mentions :+ ("\"" + current_ne.mkString(" ") + "\"")
            current_ne = Array[String]()
          }
        }
        pos_tags = pos_tags :+ word.get(classOf[PartOfSpeechAnnotation])
        lemmas = lemmas :+ word.get(classOf[LemmaAnnotation])
      }
      /* Flush a mention that runs to the end of the sentence. */
      if (current_ne.length > 0) {
        entity_mentions = entity_mentions :+ ("\"" + current_ne.mkString(" ") + "\"")
        current_ne = Array[String]()
      }
    }

    /* Coreference resolution not wired up yet; column is emitted empty. */
    var corefs = ""
    /* Deduplicate repeated mentions of the same entity. */
    var unique_entities = entity_mentions.toSet
    val result = Row(
      comment_id,
      ratedTimestamp,
      normalized_rating,
      tenant_id,
      location_id,
      truncate(unique_entities.mkString(","), 2499),
      truncate(ne_tags.mkString(" "), 2499),
      truncate(pos_tags.mkString(" "), 2499),
      truncate(lemmas.mkString(" "), 2499),
      truncate(parse_trees.mkString(" "), 2499),
      truncate(corefs, 2499),
      curr_time_str
    )
    return result
  } catch {
    /* Badly formatted rows occasionally NPE inside the pipeline; emit an
     * all-blank 12-column row rather than killing the whole job. */
    case npe: java.lang.NullPointerException => Row("", "", 0.0, "", "", "", "", "", "", "", "", "")
  }
}
/* Entry point: reads comments from Vertica over JDBC, tags them with CoreNLP
 * in parallel (one pipeline per partition), and appends the tagged rows back
 * to Vertica. */
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Scala NER Parser Application")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  /* Connection details omitted from the published gist. */
  val base_jdbc_url = OMITTED
  val db_props = new Properties(OMITTED)
  val table_str = OMITTED
  val df = sqlContext.read.jdbc(url = base_jdbc_url, table = table_str, properties = db_props)

  /* Spark runs on a single partition by default, so to force parallelism we
   * must say how many partitions to create. We derive that from the Spark
   * config: on EMR, Amazon sets spark.executor.instances to the number of
   * core nodes (and spark.executors.memory to each node's size). */
  val num_exec = sc.getConf.get("spark.executor.instances").toInt

  /* mapPartitions ensures the CoreNLP pipeline is instantiated once per
   * partition/node rather than once per row. */
  val rdd = df.repartition(num_exec).mapPartitions(iter => {
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, pos, lemma, ner")
    val wrapper = new StanfordCoreNLPWrapper(props)
    /* BUG FIX: the null-row placeholder previously had only 10 columns while
     * the schema below (and extract_entities) produce 12, so a null input row
     * would have broken createDataFrame. */
    iter.map { x =>
      if (x == null) Row("", "", 0.0, "", "", "", "", "", "", "", "", "")
      else extract_entities(wrapper, x)
    }
  })

  /* Output schema: one row per comment, all annotation columns serialized as
   * (truncated) strings. */
  val schema = StructType(Array[StructField](
    StructField("comment_id", StringType, false),
    StructField("ratedTimestamp", StringType, true),
    StructField("normalized_rating", DoubleType, false),
    StructField("tenant_id", StringType, true),
    StructField("location_id", StringType, true),
    StructField("entities", StringType, true),
    StructField("ner_tags", StringType, true),
    StructField("pos_tags", StringType, true),
    StructField("lemmas", StringType, true),
    StructField("parse_tree", StringType, true),
    StructField("corefs", StringType, true),
    StructField("taggedTimestamp", StringType, true)
  )
  )
  val output_df = sqlContext.createDataFrame(rdd, schema)

  /* persist() caches the computed rows so any downstream reuse of output_df
   * does not re-run the expensive extract_entities pass. */
  output_df.persist().write.mode("append").jdbc(url = base_jdbc_url, table = OMITTED, connectionProperties = db_props)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi — the code has errors around line 106: the `for(sent ...)` loop header appears to have been cut off in the paste, so the sentence/token iteration (and the declaration of `current_ne`) is missing. Also, what happened to `ne_tags`? I can't find it being modified anywhere after it is declared as an empty array.