Created
January 28, 2016 12:36
-
-
Save kimutansk/dbdf02332f9a2c47337e to your computer and use it in GitHub Desktop.
Gearpumpの最小アプリケーションの構成は? ref: http://qiita.com/kimutansk/items/ec304a0b81dce8677d7d
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-option1 value1 -option2 value2 -flag1 -flag2 remainArg1 remainArg2...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Graph(split ~ partitioner ~> sum)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.gearpump.streaming.examples.wordcount | |
import java.util.concurrent.TimeUnit | |
import io.gearpump.streaming.task.{StartTime, Task, TaskContext} | |
import io.gearpump.Message | |
import io.gearpump.cluster.UserConfig | |
/**
 * Task that repeatedly emits every word of `Split.TEXT_TO_SPLIT` downstream.
 *
 * On start it sends itself a "start" message. Every message it receives triggers one pass over
 * the text. After each pass it schedules a "continue" message to itself 100 ms later, so the task
 * keeps re-emitting the text in a self-sustaining loop. The content of the incoming message is
 * ignored; it only acts as a trigger.
 */
class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import taskContext.{output, self}
  import scala.concurrent.duration.Duration

  // The text is a constant, so tokenize it once instead of on every pass. Splitting the whole text
  // on whitespace yields the same words, in the same order, as splitting line-by-line first (line
  // breaks are whitespace). It also avoids `String.lines`, which on JDK 11+ resolves to
  // java.lang.String#lines (a java.util.stream.Stream without `foreach`) instead of Scala's
  // StringOps#lines. The filter drops the empty token produced by leading whitespace.
  private val words: Array[String] = Split.TEXT_TO_SPLIT.split("\\s+").filter(_.nonEmpty)

  // Delay before the next self-addressed "continue" message.
  private val continueInterval = Duration(100, TimeUnit.MILLISECONDS)

  // 1. Kick off the emit loop by sending a start message to ourselves.
  override def onStart(startTime: StartTime): Unit = {
    self ! Message("start")
  }

  // 2. Send each non-empty word downstream, each stamped with the current time.
  // 3. Schedule the next pass by sending ourselves a message after a short delay.
  override def onNext(msg: Message): Unit = {
    words.foreach { word =>
      output(new Message(word, System.currentTimeMillis()))
    }
    taskContext.scheduleOnce(continueInterval)(self ! Message("continue", System.currentTimeMillis()))
  }
}
/**
 * Companion of the Split task. Holds the fixed sample text that the Split task breaks into words.
 */
object Split { | |
// Sample input text (the Apache License 2.0 notice). `stripMargin` removes each line's leading
// whitespace together with its `|` margin character.
// NOTE(review): the trailing ` | |` on these lines looks like web-scrape residue rather than part
// of the original literal (as written it would end up inside the string) — confirm against the
// source gist.
val TEXT_TO_SPLIT = | |
""" | |
| Licensed to the Apache Software Foundation (ASF) under one | |
| or more contributor license agreements. See the NOTICE file | |
| distributed with this work for additional information | |
| regarding copyright ownership. The ASF licenses this file | |
| to you under the Apache License, Version 2.0 (the | |
| "License"); you may not use this file except in compliance | |
| with the License. You may obtain a copy of the License at | |
| | |
| http://www.apache.org/licenses/LICENSE-2.0 | |
| | |
| Unless required by applicable law or agreed to in writing, software | |
| distributed under the License is distributed on an "AS IS" BASIS, | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| See the License for the specific language governing permissions and | |
| limitations under the License. | |
""".stripMargin | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.gearpump.streaming.examples.wordcount | |
import java.util.concurrent.TimeUnit | |
import akka.actor.Cancellable | |
import io.gearpump.streaming.task.{StartTime, Task, TaskContext} | |
import io.gearpump.Message | |
import io.gearpump.cluster.UserConfig | |
import scala.collection.mutable | |
import scala.concurrent.duration.FiniteDuration | |
/**
 * Task that counts the words it receives and periodically logs throughput.
 *
 * It keeps two counts: the total number of words received and a per-word count. Each incoming
 * message's payload must be a `String` word; a non-String payload throws ClassCastException.
 */
class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  // Number of times each distinct word has been received.
  private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
  // Total number of words received since the task started.
  private[wordcount] var wordCount: Long = 0

  // Time and total count at the previous report; the next report logs the delta from these.
  private var snapShotTime: Long = System.currentTimeMillis()
  private var snapShotWordCount: Long = 0

  // Handle of the periodic report task: None before onStart and after onStop.
  private var scheduler: Option[Cancellable] = None

  // 1. On start, schedule reportWordCount: first run after 5 seconds, then every 30 seconds.
  override def onStart(startTime: StartTime): Unit = {
    scheduler = Some(
      taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
        new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount()))
  }

  // 2. Count the received word: bump the total and its per-word counter. Null messages are ignored.
  override def onNext(msg: Message): Unit = {
    if (msg != null) {
      // Cast once up front; a non-String payload fails before any counter is touched.
      val word = msg.msg.asInstanceOf[String]
      wordCount += 1
      map(word) = map.getOrElse(word, 0L) + 1
    }
  }

  // 3. Stop the periodic report when the processor stops.
  override def onStop(): Unit = {
    scheduler.foreach(_.cancel())
    scheduler = None
  }

  /**
   * Logs the throughput since the previous report, then advances the snapshot.
   *
   * The logged value is a tuple: words received since the last snapshot, and elapsed time in
   * whole seconds (integer division).
   */
  def reportWordCount(): Unit = {
    val current: Long = System.currentTimeMillis()
    LOG.info(s"Task ${taskContext.taskId} Throughput: ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
    snapShotWordCount = wordCount
    snapShotTime = current
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.gearpump.streaming.examples.wordcount | |
import io.gearpump.cluster.local.LocalCluster | |
import io.gearpump.streaming.{StreamApplication, Processor} | |
import io.gearpump.cluster.UserConfig | |
import io.gearpump.cluster.client.ClientContext | |
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} | |
import io.gearpump.partitioner.HashPartitioner | |
import io.gearpump.util.Graph.Node | |
import io.gearpump.util.{AkkaApp, Graph, LogUtil} | |
import org.slf4j.Logger | |
/**
 * Entry point of the WordCount example.
 *
 * It parses command-line options, builds the Split -> Sum DAG and submits it to a cluster. When
 * the DEBUG system property is set, it uses an in-process LocalCluster instead.
 */
object WordCount extends AkkaApp with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)
  val RUN_FOR_EVER = -1

  // 1. Options read from the command line at startup.
  // Each entry gives the option's name, description, whether it is required, and its default.
  override val options: Array[(String, CLIOption[Any])] = Array(
    "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
  )

  /**
   * Builds the "wordCount" streaming application.
   *
   * `split` Split tasks feed `sum` Sum tasks, with messages routed between them by a
   * HashPartitioner.
   *
   * @param config parsed command-line options; supplies the "split" and "sum" task counts
   * @return the application, ready to submit
   */
  def application(config: ParseResult): StreamApplication = {
    // 2. Create the processors with the parallelism read from the command line.
    val splitNum = config.getInt("split")
    val sumNum = config.getInt("sum")
    val split = Processor[Split](splitNum)
    val sum = Processor[Sum](sumNum)
    // 3. Partitioner that decides how messages are distributed between the processors.
    val partitioner = new HashPartitioner
    // 4. Wire the processors and the partitioner into the DAG.
    StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty)
  }

  /**
   * Parses `args`, submits the application, then shuts down the client and any local cluster.
   *
   * @param akkaConf configuration used to create the client context or local cluster. Per the
   *                 original author's note, it is loaded with the precedence System Properties >
   *                 -Dgearpump.config.file > gear.conf > geardefault.conf > reference.conf.
   * @param args     command-line arguments, parsed against `options`
   */
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)

    // Setting the DEBUG system property (to any value) runs against an in-process cluster.
    val localCluster: Option[LocalCluster] =
      if (System.getProperty("DEBUG") != null) {
        val cluster = new LocalCluster(akkaConf)
        cluster.start
        Some(cluster)
      } else {
        None
      }

    // try/finally ensures the client and any local cluster are released even when building or
    // submitting the application throws.
    try {
      val context: ClientContext = localCluster match {
        case Some(local) => local.newClientContext
        case None => ClientContext(akkaConf)
      }
      try {
        // 5. Build the application to deploy, then 6. submit it to the cluster.
        context.submit(application(config))
      } finally {
        // 7. Shut down the submitting client.
        context.close()
      }
    } finally {
      // NOTE(review): in DEBUG mode the local cluster is stopped as soon as submit returns —
      // confirm this is intended.
      localCluster.foreach(_.stop)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment