Skip to content

Instantly share code, notes, and snippets.

@hsyed
Created February 2, 2014 17:43
Show Gist options
  • Save hsyed/8771986 to your computer and use it in GitHub Desktop.
Save hsyed/8771986 to your computer and use it in GitHub Desktop.
package edu.hsyed.nlp
import edu.hsyed.nlp.pgforumdb.forumstats.MyPostgresDriver
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import forumavroschema._
import MyPostgresDriver.simple._
import Database.threadLocalSession
import edu.hsyed.nlp.pgforumdb.CrawlerModel.DB.{ForumPosts, ForumUsers, ForumTopics}
import edu.hsyed.nlp.pgforumdb.DBCrawler
import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat}
import parquet.avro._
import org.apache.hadoop.fs.Path
import edu.hsyed.nlp.pgforumdb.CrawlerModel.Extractor.TopicCrawlState
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.Job
import java.util.concurrent.atomic.AtomicInteger
import java.util.Properties
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.trees.TreeCoreAnnotations.TreeAnnotation
import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation
import org.apache.avro.specific.SpecificData
import edu.hsyed.nlp.pgforumdb.CrawlerModel.Extractor.ForumTopic
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
/**
 * Kryo registrator for the Avro-generated forum record classes.
 *
 * It is referenced by its fully qualified name ("edu.hsyed.nlp.MyRegistrator") through the
 * `spark.kryo.registrator` system property set in `genForum.sparkNLPTransformation`.
 */
class MyRegistrator extends KryoRegistrator {
  /**
   * Registers `User`, `Quote`, `Post` and `Topic` with the supplied Kryo instance,
   * in exactly that order.
   *
   * @param kryo the Kryo instance to register the record classes with
   */
  override def registerClasses(kryo: Kryo): Unit = {
    val recordClasses: Seq[Class[_]] =
      Seq(classOf[User], classOf[Quote], classOf[Post], classOf[Topic])
    recordClasses.foreach(recordClass => kryo.register(recordClass))
  }
}
object genForum {
/**
 * Looks up a single forum user in the crawler database and converts the row into an
 * Avro `User` record.
 *
 * A missing name or join date is stored as the empty string.
 * NOTE(review): the lookup uses `first()`, so presumably the user row is expected to
 * exist -- confirm what should happen for unknown users.
 *
 * @param forumId id of the forum the user belongs to
 * @param userId  id of the user within that forum
 * @return the populated `User` record
 */
private def makeUser(forumId: Int, userId: Int): User =
  DBCrawler.db.withSession {
    val matchingUsers =
      Query(ForumUsers).filter(u => u.forumId === forumId && u.id === userId)
    val row = matchingUsers.first()
    val displayName = row.name.getOrElse("")
    val joinedOn = row.joinDate.map(_.toString).getOrElse("")
    new User(forumId, userId, displayName, joinedOn)
  }
/**
 * Exports every topic whose crawl state is `TopicCrawlState.Done` from the crawler
 * database into the Parquet file "posts.parq", one Avro `Topic` record (with its posts
 * and quotes) per topic.
 *
 * Topics are converted and written from a parallel collection, and a percentage progress
 * indicator is printed to standard output after each topic. The writer is closed even
 * when the query, the conversion or a write fails.
 */
def genForum: Unit = {
  // Writes are issued concurrently from the parallel loop below, so each call is
  // serialised on the writer instance.
  class MyWriter extends AvroParquetWriter[Topic](new Path("posts.parq"), Topic.getClassSchema) {
    override def write(t: Topic): Unit = synchronized {
      super.write(t)
    }
  }

  // Builds the Avro Topic for one topic row, loading its posts from the database.
  def makeTopic(x: ForumTopic): Topic = {
    val topic = new Topic()
    topic.setForumId(x.forumId)
    topic.setId(x.id)
    topic.setCategoryId(x.categoryId)
    topic.setLastPostAt(x.lastPostAt.map(_.toString).getOrElse(""))
    topic.setIsSticky(x.isSticky)
    topic.setIsPoll(x.isPoll)
    topic.setHasPrefix(x.hasPrefix)
    topic.setTitle(x.title)
    topic.setUrl("")
    topic.setPostCount(x.postCount)
    topic.setViewCount(x.viewCount)
    topic.setStartedBy(makeUser(x.forumId, x.startedBy))
    topic.setStartedAt(x.startedAt.map(_.toString).getOrElse(""))

    val postRows =
      DBCrawler.db.withSession {
        Query(ForumPosts).filter(p => {
          p.forumId === x.forumId && p.topicId === x.id
        }).list()
      }

    val posts = postRows.map(row => {
      val p = new Post()
      p.setForumId(row.forumId)
      p.setId(row.id)
      p.setTopicId(row.topicId)
      p.setCategoryId(row.categoryId)
      p.setIndex(row.index)
      p.setPostPage(row.postPage)
      p.setPostBy(makeUser(row.forumId, row.userId))
      p.setPostedAt(row.postedAt.toString)
      p.setPostText(row.postText)
      // The quotes field is only set when the post actually has quotes.
      if (row.quotes.nonEmpty) p.setQuotes(row.quotes.map(q => {
        new Quote(q.index, q.startOffset, q.endOffset, q.isDirect, q.quotedId)
      }).asJava)
      p
    }).asJava

    topic.setPosts(posts)
    topic
  }

  val writer = new MyWriter
  //val writer = new AvroParquetWriter[Topic](new Path("posts.parq"), Topic.getClassSchema, CompressionCodecName.SNAPPY, ParquetWriter.DEFAULT_BLOCK_SIZE / 2, ParquetWriter.DEFAULT_PAGE_SIZE)
  try {
    val doneTopics =
      DBCrawler.db.withSession {
        Query(ForumTopics).filter(t => t.crawlState === TopicCrawlState.Done).list()
      }
    val total = doneTopics.size
    val written = new AtomicInteger(0)
    doneTopics.par.foreach { doneTopic =>
      writer.write(makeTopic(doneTopic))
      val count = written.incrementAndGet()
      print(f"\r${count.toFloat * 100 / total}%4.2f%%")
    }
  } finally {
    // Always release the underlying file handle, even if the export aborted part-way.
    writer.close()
  }
}
/**
 * Thread-local holder for a `StanfordCoreNLP` pipeline: each thread that calls `get`
 * receives its own instance, created on first access from `props`.
 */
object MyAnnotator extends ThreadLocal[StanfordCoreNLP] {
  /** Pipeline configuration; selects the annotators to run. */
  val props: Properties = {
    val config = new Properties()
    config.put("annotators", "tokenize,ssplit,pos,lemma,parse")
    config
  }

  /** Creates the pipeline for the calling thread. */
  override def initialValue: StanfordCoreNLP = new StanfordCoreNLP(props)
}
/**
 * Placeholder: the body is empty apart from a commented-out `AvroParquetReader`
 * construction, so calling this currently does nothing.
 */
def localNLPTransformation: Unit = {
  //new AvroParquetReader[Topic]()
}
/**
 * Runs the NLP annotation job on a local 8-thread Spark context.
 *
 * Reads `Topic` records from the Parquet file "forum_dataset.parq", runs the thread-local
 * Stanford CoreNLP pipeline (`MyAnnotator`) over the text of every post, attaches the
 * resulting per-sentence parse trees to a copy of each post, and writes the annotated
 * topics to "annotated_posts.parq". The Spark context is stopped when the job finishes
 * or fails.
 *
 * NOTE(review): `genForum` writes "posts.parq" while this job reads "forum_dataset.parq"
 * -- confirm the input path is intentional.
 */
def sparkNLPTransformation(): Unit = {
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.kryo.registrator", "edu.hsyed.nlp.MyRegistrator")
  val sc = new SparkContext("local[8]", "forumAddNlp")
  try {
    // io configuration
    val job = new Job()
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[Topic]])
    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
    AvroParquetOutputFormat.setSchema(job, Topic.getClassSchema)

    // annotator function: returns a copy of the topic whose posts carry parse trees
    def annotatePosts(top: Topic): Topic = {
      val new_p = top.getPosts.map {
        x =>
          val at = new Annotation(x.getPostText.toString)
          MyAnnotator.get.annotate(at)
          val t = at.get(classOf[SentencesAnnotation]).map(_.get(classOf[TreeAnnotation])).toList
          // Work on a deep copy so the input record is left untouched.
          val r = SpecificData.get().deepCopy[Post](x.getSchema, x)
          // The trees field is only set when at least one sentence was parsed.
          if (t.nonEmpty) r.setTrees(t)
          r
      }
      val new_t = SpecificData.get().deepCopy[Topic](top.getSchema, top)
      new_t.setPosts(new_p)
      new_t
    }

    // transformation
    val ds = sc.newAPIHadoopFile("forum_dataset.parq", classOf[ParquetInputFormat[Topic]], classOf[Void], classOf[Topic], job.getConfiguration).repartition(16)
    val new_ds = ds.map(x => (null, annotatePosts(x._2)))
    // new_ds.foreach(println(_))
    new_ds.saveAsNewAPIHadoopFile("annotated_posts.parq",
      classOf[Void],
      classOf[Topic],
      classOf[ParquetOutputFormat[Topic]],
      job.getConfiguration
    )
  } finally {
    // Release the context's resources whether the job succeeded or threw.
    sc.stop()
  }
}
}
/**
 * Application entry point: runs the Spark NLP annotation job.
 *
 * An explicit `main` method is used instead of `extends App`, because `App` defers the
 * object body through `DelayedInit` and the Spark documentation warns that subclasses of
 * `scala.App` may not work correctly.
 */
object Start {
  /**
   * Launches `genForum.sparkNLPTransformation()`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    genForum.sparkNLPTransformation()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment