Skip to content

Instantly share code, notes, and snippets.

@kimutansk
Created January 28, 2016 12:36
Show Gist options
  • Save kimutansk/dbdf02332f9a2c47337e to your computer and use it in GitHub Desktop.
Save kimutansk/dbdf02332f9a2c47337e to your computer and use it in GitHub Desktop.
Gearpumpの最小アプリケーションの構成は? ref: http://qiita.com/kimutansk/items/ec304a0b81dce8677d7d
-option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
Graph(split ~ partitioner ~> sum)
package io.gearpump.streaming.examples.wordcount
import java.util.concurrent.TimeUnit
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
/**
 * Source task of the word-count example.
 *
 * Each time it receives a message it splits the fixed text held in
 * `Split.TEXT_TO_SPLIT` into words, sends every non-empty word downstream,
 * and then schedules another message to itself so the cycle repeats.
 *
 * @param taskContext context used for output, self-messaging and scheduling
 * @param conf        user configuration passed through to `Task`
 */
class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import taskContext.{output, self}

  // 1. Send a "start" message to ourselves so that onNext gets triggered.
  override def onStart(startTime : StartTime) : Unit = self ! Message("start")

  // The content of the incoming message is not inspected; it only acts as a trigger.
  override def onNext(msg : Message) : Unit = {
    // 2. Break the text into words, drop empty tokens, and send each word downstream.
    for {
      line <- Split.TEXT_TO_SPLIT.lines
      word <- line.split("[\\s]+")
      if word.nonEmpty
    } output(new Message(word, System.currentTimeMillis()))

    // 3. Schedule the next self-addressed message, which triggers the following round.
    import scala.concurrent.duration._
    val delay = Duration(100, TimeUnit.MILLISECONDS)
    taskContext.scheduleOnce(delay)(self ! Message("continue", System.currentTimeMillis()))
  }
}
/** Companion object holding the fixed sample text that the `Split` task tokenizes. */
object Split {
  /**
   * Apache License header used as the input text.
   *
   * `stripMargin` removes the leading whitespace and the `|` marker from each
   * line, so the value starts with an empty line and ends with a newline.
   */
  val TEXT_TO_SPLIT: String =
    """
      | Licensed to the Apache Software Foundation (ASF) under one
      | or more contributor license agreements. See the NOTICE file
      | distributed with this work for additional information
      | regarding copyright ownership. The ASF licenses this file
      | to you under the Apache License, Version 2.0 (the
      | "License"); you may not use this file except in compliance
      | with the License. You may obtain a copy of the License at
      |
      | http://www.apache.org/licenses/LICENSE-2.0
      |
      | Unless required by applicable law or agreed to in writing, software
      | distributed under the License is distributed on an "AS IS" BASIS,
      | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      | See the License for the specific language governing permissions and
      | limitations under the License.
      |""".stripMargin
}
package io.gearpump.streaming.examples.wordcount
import java.util.concurrent.TimeUnit
import akka.actor.Cancellable
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
/**
 * Sink task of the word-count example.
 *
 * Counts how many times each word has been received and periodically logs
 * the number of words received since the previous report.
 *
 * @param taskContext context used for scheduling and for the task id in logs
 * @param conf        user configuration passed through to `Task`
 */
class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  // Number of times each word has been received.
  private[wordcount] val map : mutable.HashMap[String, Long] = mutable.HashMap.empty[String, Long]
  // Total number of words received so far.
  private[wordcount] var wordCount : Long = 0
  // Time and total word count captured at the previous report.
  private var snapShotTime : Long = System.currentTimeMillis()
  private var snapShotWordCount : Long = 0
  // Handle of the periodic report job; None until onStart has scheduled it.
  private var reporter : Option[Cancellable] = None

  override def onStart(startTime : StartTime) : Unit = {
    // 1. On start-up, schedule the status report: first after 5 seconds, then every 30 seconds.
    val initialDelay = new FiniteDuration(5, TimeUnit.SECONDS)
    val interval = new FiniteDuration(30, TimeUnit.SECONDS)
    reporter = Option(taskContext.schedule(initialDelay, interval)(reportWordCount))
  }

  override def onNext(msg : Message) : Unit = {
    // Null messages are ignored.
    if (msg != null) {
      // 2. Take the word from the message, then bump the total and the per-word counters.
      val word = msg.msg.asInstanceOf[String]
      val seenSoFar = map.getOrElse(word, 0L)
      wordCount += 1
      map.put(word, seenSoFar + 1)
    }
  }

  // 3. Cancel the status report job when the processor stops.
  override def onStop() : Unit = reporter.foreach(_.cancel())

  // 4. Log the words received and the whole seconds elapsed since the last report,
  //    then take a new snapshot.
  def reportWordCount() : Unit = {
    val now : Long = System.currentTimeMillis()
    val words = wordCount - snapShotWordCount
    val seconds = (now - snapShotTime) / 1000
    LOG.info(s"Task ${taskContext.taskId} Throughput: ${(words, seconds)} (words, second)")
    snapShotWordCount = wordCount
    snapShotTime = now
  }
}
package io.gearpump.streaming.examples.wordcount
import io.gearpump.cluster.local.LocalCluster
import io.gearpump.streaming.{StreamApplication, Processor}
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.partitioner.HashPartitioner
import io.gearpump.util.Graph.Node
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
import org.slf4j.Logger
/**
 * Entry point of the word-count example.
 *
 * Builds a two-processor application (`Split` feeding `Sum` through a
 * `HashPartitioner`) and submits it through a `ClientContext`. When the
 * `DEBUG` system property is set, a `LocalCluster` is started first and the
 * client context is obtained from it.
 */
object WordCount extends AkkaApp with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)
  val RUN_FOR_EVER = -1

  // 1. Items read from the command line: name, description, required/optional, default value.
  override val options: Array[(String, CLIOption[Any])] = Array(
    "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
  )

  /**
   * Builds the word-count application from the parsed command-line options.
   *
   * @param config parsed options; "split" and "sum" are passed to the respective
   *               `Processor` factory calls
   * @return the application, named "wordCount", whose graph connects split to sum
   */
  def application(config: ParseResult) : StreamApplication = {
    // 2. Create the processors using the values read from the command line.
    val splitNum = config.getInt("split")
    val sumNum = config.getInt("sum")
    val split = Processor[Split](splitNum)
    val sum = Processor[Sum](sumNum)
    // 3. Create the partitioner that distributes messages between the processors.
    val partitioner = new HashPartitioner
    // 4. Build the DAG from the processors and the partitioner.
    StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty)
  }

  /**
   * Parses the arguments, submits the application, and releases the client.
   *
   * The client context is closed and the local cluster (if one was started) is
   * stopped even when building or submitting the application throws.
   *
   * @param akkaConf 5. configuration object loaded with the priority
   *                 System Properties > -Dgearpump.config.file > gear.conf >
   *                 geardefault.conf > reference.conf
   * @param args     command-line arguments, parsed according to `options`
   */
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)
    // A local cluster is started only when the DEBUG system property is set.
    val localCluster = Option(System.getProperty("DEBUG")).map { _ =>
      val cluster = new LocalCluster(akkaConf)
      cluster.start
      cluster
    }
    try {
      val context: ClientContext = localCluster match {
        case Some(local) => local.newClientContext
        case None => ClientContext(akkaConf)
      }
      try {
        // 6. Create the application to deploy.
        val app = application(config)
        // 7. Submit it to the cluster.
        context.submit(app)
      } finally {
        // 8. Shut down the submitting client, whether or not the submission succeeded.
        context.close()
      }
    } finally {
      // foreach rather than map: the call is made purely for its side effect.
      localCluster.foreach(_.stop)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment