Skip to content

Instantly share code, notes, and snippets.

@darionyaphet
Created October 13, 2017 07:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save darionyaphet/2c988cf89a2801f787f175a5fc720007 to your computer and use it in GitHub Desktop.
Save darionyaphet/2c988cf89a2801f787f175a5fc720007 to your computer and use it in GitHub Desktop.
package org.darion.yaphet.gearpump
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.streaming.{Processor, StreamApplication}
import org.apache.gearpump.streaming.partitioner.HashPartitioner
import org.apache.gearpump.streaming.source.DataSourceProcessor
import org.apache.gearpump.util.{AkkaApp, Graph}
import org.apache.gearpump.util.Graph.Node
// Fully-qualified name of the entry point: org.darion.yaphet.gearpump.GearpumpMain
/**
 * Entry point that builds and submits the "wordCount" streaming application.
 *
 * The topology is a source processor backed by `WordSource` (labelled "Split")
 * connected through a `HashPartitioner` to `WordSink` tasks.
 *
 * Positional arguments (`args(0)` is not read by this object):
 *  - `args(1)`: parallelism of the source processor
 *  - `args(2)`: parallelism of the sink processor
 */
object GearpumpMain extends AkkaApp {

  /**
   * Builds the word-count graph and submits it to the cluster.
   *
   * @param akkaConf configuration used to create the client context
   * @param args     positional arguments, see the object documentation
   * @throws IllegalArgumentException if fewer than three arguments are given
   * @throws NumberFormatException    if a parallelism argument is not an integer
   */
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    require(
      args.length >= 3,
      s"expected at least 3 arguments (<unused> <sourceParallelism> <sinkParallelism>) " +
        s"but got ${args.length}")

    // Parse before opening the client context so that a malformed argument
    // cannot leave an open context behind.
    val sourceParallelism = args(1).toInt
    val sinkParallelism = args(2).toInt

    val context = ClientContext(akkaConf)
    try {
      implicit val actorSystem = context.system

      val sourceProcessor = DataSourceProcessor(new WordSource, sourceParallelism, "Split")
      val sink = Processor[WordSink](sinkParallelism)
      val partitioner = new HashPartitioner

      val computation = sourceProcessor ~ partitioner ~> sink
      val app = StreamApplication("wordCount", Graph(computation), UserConfig.empty)
      context.submit(app)
    } finally {
      // Release the client-side resources whether or not the submission succeeded.
      context.close()
    }
  }

  /** Prints a one-line usage summary. */
  override def help(): Unit = {
    Console.println("Usage: GearpumpMain <unused> <sourceParallelism> <sinkParallelism>")
  }
}
package org.darion.yaphet.gearpump
import java.time.Instant
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import scala.collection.mutable
/**
 * Sink task that keeps an in-memory count of how many times each word has been received.
 *
 * Every incoming message value is cast to `String` and its counter is incremented.
 * The counts are only held in memory; the periodic dump to stdout is currently disabled.
 *
 * @param taskContext task context passed through to `Task`
 * @param conf        user configuration passed through to `Task`
 */
class WordSink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {

  /** Number of occurrences seen so far, keyed by word. */
  private val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()

  /** Wall-clock time (ms) recorded at start; only read by the disabled dump below. */
  private var snapShotTime: Long = System.currentTimeMillis()

  /** Records the wall-clock time at which the task started. */
  override def onStart(startTime: Instant): Unit = {
    snapShotTime = System.currentTimeMillis()
  }

  /**
   * Increments the counter of the word carried by `msg`.
   *
   * @param msg message whose value is expected to be a `String`
   * @throws ClassCastException if the message value is not a `String`
   */
  override def onNext(msg: Message): Unit = {
    // Cast once and reuse the result for both the lookup and the store.
    val word = msg.value.asInstanceOf[String]
    map.put(word, map.getOrElse(word, 0L) + 1)

    // Disabled periodic dump of all counters (every 30 seconds):
    // if ((System.currentTimeMillis() - snapShotTime) > 1000 * 30) {
    //   for ((k, v) <- map) {
    //     println(k + " " + v)
    //   }
    // }
  }
}
package org.darion.yaphet.gearpump
import java.io.File
import java.nio.charset.Charset
import java.time.Instant
import java.util.Random
import org.apache.gearpump.Message
import org.apache.gearpump.com.google.common.io.Files
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.TaskContext
import scala.collection.mutable.ArrayBuffer
/**
 * Data source that emits randomly chosen words from a text file.
 *
 * The file is read once in `open` (one word per line); each `read` then returns a
 * message carrying one randomly selected line and the current wall-clock time.
 *
 * @param path location of the word file; defaults to `/data/words`
 */
class WordSource(path: String = "/data/words") extends DataSource {

  /** Words loaded by `open`; empty until then. */
  private val wordList = ArrayBuffer[String]()

  /**
   * Single generator reused across reads. Seeding a fresh generator with the
   * current millisecond on every call makes all reads within the same
   * millisecond pick the same word.
   */
  private val random = new Random()

  /**
   * Loads every line of the word file into memory and prints the loaded
   * words and their count to stdout.
   *
   * @param context   task context (not used)
   * @param startTime start time (not used)
   */
  override def open(context: TaskContext, startTime: Instant): Unit = {
    val words = Files.readLines(new File(path), Charset.defaultCharset)

    // Start from a clean buffer so that a repeated open does not duplicate entries.
    wordList.clear()
    for (index <- 0.until(words.size())) {
      wordList += words.get(index)
    }

    println(wordList)
    println(wordList.size)
  }

  /**
   * Returns a message holding a randomly chosen word, stamped with the current
   * time in milliseconds.
   *
   * @return the next message, or `null` when no words are loaded
   */
  override def read(): Message = {
    if (wordList.isEmpty) {
      // NOTE(review): relies on the DataSource contract allowing `null` for
      // "no message available" - confirm against the Gearpump version in use.
      null
    } else {
      val timeStamp = System.currentTimeMillis()
      val index = random.nextInt(wordList.size)
      Message(wordList(index), timeStamp)
    }
  }

  /** Nothing to release: the file is fully read in `open`. */
  override def close(): Unit = {
  }

  /** Returns the current instant as the watermark. */
  override def getWatermark: Instant = {
    Instant.now()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment