Skip to content

Instantly share code, notes, and snippets.

@pochi
Last active August 29, 2015 14:08
Show Gist options
  • Save pochi/3571b982c7d42899aa33 to your computer and use it in GitHub Desktop.
Save pochi/3571b982c7d42899aa33 to your computer and use it in GitHub Desktop.
Spark Streaming Reading

Description

Spark Streamingのサンプルコードがどのように動作しているか確認する。 なお、今回はKafkaを利用してStreamingし、Elastic Searchのインデキシングする例を考える。

前提

  • Spark Streaming: v1.1.0
  • Local environemnt on Mac OX

以下のソースコードをベースに考えますが私の環境では動いていないコードです。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka._

object Elastic {
  def main(args: Array[String]) {
    val numThreads = 1
    val zookeeperQuorum = "localhost:2181"
    val groupId = "test-consumer-group"
    val topic = Array("test").map((_, numThreads)).toMap

    val sc = new SparkContext("local", "Elastic Search Indexer App", System.getenv("SPARK_HOME"),
      List("target/scala-2.10/elastic-search-index-project_2.10-0.0.1.jar"))
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("checkpoint")

    val logs = KafkaUtils.createStream(ssc, zookeeperQuorum, groupId, topic).map(_._2)
    logs.print();

    ssc.start()
    ssc.awaitTermination()

  }
}

このコードを見ていきます。

val sc = new SparkContext("local", "Elastic Search Indexer App", System.getenv("SPARK_HOME"),
    List("target/scala-2.10/elastic-search-index-project_2.10-0.0.1.jar"))

/core/src/main/scala/org/apache/spark/SparkContext.scala ソースコードは上記にありますね。 Sparkはコメントがいっぱい書いてあります。

  /**
   * Alternative constructor that allows setting common Spark properties directly
   *
   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
   * @param appName A name for your application, to display on the cluster web UI.
   * @param sparkHome Location where Spark is installed on cluster nodes.
   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
   */
  private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
    this(master, appName, sparkHome, jars, Map(), Map())

そしてコンストラクタの処理を再度呼び出します。

  def this(
      master: String,
      appName: String,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map(),
      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
  {
    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
    this.preferredNodeLocationData = preferredNodeLocationData
  }

SparkContext.updatedConfはSparkConfインスタンスの更新を一括で行ってくれるものです。

  /**
   * Creates a modified version of a SparkConf with the parameters that can be passed separately
   * to SparkContext, to make it easier to write SparkContext's constructors. This ignores
   * parameters that are passed as the default value of null, instead of throwing an exception
   * like SparkConf would.
   */
  private[spark] def updatedConf(
      conf: SparkConf,
      master: String,
      appName: String,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map()): SparkConf =
  {
    val res = conf.clone()
    res.setMaster(master)
    res.setAppName(appName)
    if (sparkHome != null) {
      res.setSparkHome(sparkHome)
    }
    if (jars != null && !jars.isEmpty) {
      res.setJars(jars)
    }
    res.setExecutorEnv(environment.toSeq)
    res
  }

続いて以下のコードを見ていきます。

val ssc = new StreamingContext(sc, Seconds(10))

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

ここにソースコードがあります。

  /**
   * Create a StreamingContext using an existing SparkContext.
   * @param sparkContext existing SparkContext
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }

これを作っておくとDStream(Sparkで扱うStreamingの単位)を扱えるようになります。 次にソースコードでcheckpointを作っています。 メソッド経由で呼ぶとディレクトリ作ってくれるの便利です。

ssc.checkpoint("checkpoint")
  /**
   * Set the context to periodically checkpoint the DStream operations for driver
   * fault-tolerance.
   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
   *                  Note that this must be a fault-tolerant file system like HDFS for
   */
  def checkpoint(directory: String) {
    if (directory != null) {
      val path = new Path(directory)
      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
      fs.mkdirs(path)
      val fullPath = fs.getFileStatus(path).getPath().toString
      sc.setCheckpointDir(fullPath)
      checkpointDir = fullPath
    } else {
      checkpointDir = null
    }
  }

次にKafkaUtilsでDStreamを生成する部分を見ていきます。

val logs = KafkaUtils.createStream(ssc, zookeeperQuorum, groupId, topic).map(_._2)
  /**
   * Create an input stream that pulls messages from a Kafka Broker.
   * @param ssc       StreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @param storageLevel  Storage level to use for storing the received objects
   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   */
  def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from a Kafka Broker.
   * @param ssc         StreamingContext object
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                    in its own thread.
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)] = {
    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
  }

このKafkaInputDStreamがReceiverInputDStreamを継承しているのでさらにみていきます。 ここでチェックしておいたほうがいいのがStorageLevel。 これがデフォルトだと性能重視でメモリにしか書き出さないようになっているので、 デバックしたいときなどは引数に、StorageLevel.MEMORY_AND_DISK_SERを渡すといい。

abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  /** Keeps all received blocks information */
  private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]

  /** This is an unique identifier for the network input stream. */
  val id = ssc.getNewReceiverStreamId()
}

StreamingContextからgetNewReceiverStreamId()を呼び出してReceiverに登録しているように見えます。 実際のSparkContextのメソッドを見てみましょう。

  private[streaming] def getNewReceiverStreamId() = nextReceiverInputStreamId.getAndIncrement()

nextReceiverInputStreamIdを見てみます。

  private val nextReceiverInputStreamId = new AtomicInteger(0)

ふむー、番号ふっていただけのようですね。。。 さらにInputDStreamを継承しているので見ていきます。

  ssc.graph.addInputStream(this)

ssc.graphはDStreamGraphオブジェクトだったはずなのでその中を見に行きます。

  def addInputStream(inputStream: InputDStream[_]) {
    this.synchronized {
      inputStream.setGraph(this)
      inputStreams += inputStream
    }
  }

これを見るとinputStreamsがinputDStreamをもった配列なので一つのSparkContextが管理する inputStreamに登録していることになりますね。 さらに継承しているDStreamは特に初期処理なにもしていないのでここで一旦KafkaUtils.createStreamは終了です。 これでSparkContextが監視する入力が登録できたということですね。 次はとうとう処理を初めて行きます。

ssc.start()
  /**
   * Start the execution of the streams.
   */
  def start(): Unit = synchronized {
    // Throw exception if the context has already been started once
    // or if a stopped context is being started again
    if (state == Started) {
      throw new SparkException("StreamingContext has already been started")
    }
    if (state == Stopped) {
      throw new SparkException("StreamingContext has already been stopped")
    }
    validate()
    scheduler.start()
    state = Started
  }

ふむふむ、validateしてschedulerをスタートして状態を変更しているようです。 まずはvalidateを見てみます。

  private def validate() {
    assert(graph != null, "Graph is null")
    graph.validate()

    assert(
      checkpointDir == null || checkpointDuration != null,
      "Checkpoint directory has been set, but the graph checkpointing interval has " +
        "not been set. Please use StreamingContext.checkpoint() to set the interval."
    )
  }

graphはDStreamを表すオブジェクトのようです。 下半分はcheckpointDirが設定されていること、checkpointDurationが設定されていないことをチェックしています。

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

checkpointに関する設定があればそのオブジェクトに自身をセットして なければDStreamGraphを作ってbatchDur_をセットしているようです。 cp_はinitializeのタイミングで入れてないので多分else節が呼ばれていると思っています。 このbatchDur_は今回の例で言うとSeconds(10)になります。

つづいて、scheduler.start()です。

  private[streaming] val scheduler = new JobScheduler(this)

JobSchedulerに自身を設定しています。 このクラスは streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala にあります。

このクラスは streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala にあります。

  def start(): Unit = synchronized {
    if (eventActor != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobSchedulerEvent => processEvent(event)
      }
    }), "JobScheduler")

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }

ssc.envはSparkEnvインスタンスが入っていてactorSystemはAkkaが選択されます。 (正確にはAkkaUtils.createActorSystemでAkka周りの設定を行ったインスタンス。 ここではとりあえずこれ以上追わないことにします) つまりeventActorにはイベント処理を定義していることになります。 processEventに処理を委譲しているようです。

  private def processEvent(event: JobSchedulerEvent) {
    try {
      event match {
        case JobStarted(job) => handleJobStart(job)
        case JobCompleted(job) => handleJobCompletion(job)
        case ErrorReported(m, e) => handleError(m, e)
      }
    } catch {
      case e: Throwable =>
        reportError("Error in job scheduler", e)
    }
  }

このprocessEventでは処理の始まり、終了、エラーを判定してそれぞれのハンドラに渡しています。 それぞれを簡単に追っておきます。

  private def handleJobStart(job: Job) {
    val jobSet = jobSets.get(job.time)
    if (!jobSet.hasStarted) {
      listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
    }
    jobSet.handleJobStart(job)
    logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
    SparkEnv.set(ssc.env)
  }

jobSesとjobSetという見慣れない関数が呼ばれています。

  private val jobSets = new ConcurrentHashMap[Time, JobSet]

時間とJobSetを持ったHashMapがjobSetsでした。

handleJobStartではjobSetsからgetしているのでどこかでJobSetをputしていることになります。 putはsubmitJobSetという部分でやっています。どこで呼んでいるんだろう。。。

JobSetはJobを管理するクラスのようです。主に経過時間やジョブのステータスを管理しています。 ここではJobSetはあまり深く触れないものとします。

JobクラスはSparkで実際に実行されるクラスのように見えます。すごくシンプルで自分が誰かを 管理するのと引数でもらった関数を実行するのがメインの仕事のようです。

さて、ここでJobScheduler#startに戻ることにします。 eventActorを指定した後今度はlistenerBus.startを呼んでいます。 そもそもlisternerBusはStreamingListernerBusクラスのインスタンスのようです。

  val listenerBus = new StreamingListenerBus()

このクラスはStreamingListenersにStreamingListenerEventsを渡してあげるんだとコメントに書いています。

/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */

さて、startメソッドはどういう実装になっているかというととてもシンプルです。

  def start() {
    listenerThread.start()
  }

listenerThreadってのがメインみたいです。

  val listenerThread = new Thread("StreamingListenerBus") {
    setDaemon(true)
    override def run() {
      while (true) {
        val event = eventQueue.take
        event match {
          case receiverStarted: StreamingListenerReceiverStarted =>
            listeners.foreach(_.onReceiverStarted(receiverStarted))
          case receiverError: StreamingListenerReceiverError =>
            listeners.foreach(_.onReceiverError(receiverError))
          case receiverStopped: StreamingListenerReceiverStopped =>
            listeners.foreach(_.onReceiverStopped(receiverStopped))
          case batchSubmitted: StreamingListenerBatchSubmitted =>
            listeners.foreach(_.onBatchSubmitted(batchSubmitted))
          case batchStarted: StreamingListenerBatchStarted =>
            listeners.foreach(_.onBatchStarted(batchStarted))
          case batchCompleted: StreamingListenerBatchCompleted =>
            listeners.foreach(_.onBatchCompleted(batchCompleted))
          case StreamingListenerShutdown =>
            // Get out of the while loop and shutdown the daemon thread
            return
          case _ =>
        }
      }
    }
  }

Scala読んだことほとんどないのでちょっとつらくなってきました。 でもやっているのは多分以下のことです。

  • Threadを立ち上げてエンドレスにぶん回す
  • eventQueueからeventを取得する
  • listenersにそれぞれイベントを投げる

それではeventQueueとlistenersをそれぞれ見ていきます。

  private val EVENT_QUEUE_CAPACITY = 10000
  private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)

上限10000個のキューを持っています。 ここでStreamingListenerEventクラスを引数にとっていて多分ここがKafkaのログにあたるんじゃないかと思っています。 listenersはStreamingListenerを管理するもののようです。 なのでlistenerBusをstartするということは登録してあるeventListerたちにイベントがあったらどんどん受け渡すことを 始めることを意味しています。

次にJobScheduler#startで呼ばれるのが以下の部分です。

    receiverTracker = new ReceiverTracker(ssc)
    receiverTracker.start()

コメントを見ると、NetworkInputDStreamsの管理をするクラスだと記載されています。

  def start() = synchronized {
    if (actor != null) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
        "ReceiverTracker")
      receiverExecutor.start()
      logInfo("ReceiverTracker started")
    }
  }

ここではAkkaにReceiverTrackerActorを登録して、receiverExecutorをstartしています。 ReceiverTrackerActorは以下の動きをします。

  private class ReceiverTrackerActor extends Actor {
    def receive = {
      case RegisterReceiver(streamId, typ, host, receiverActor) =>
        registerReceiver(streamId, typ, host, receiverActor, sender)
        sender ! true
      case AddBlock(receivedBlockInfo) =>
        addBlocks(receivedBlockInfo)
      case ReportError(streamId, message, error) =>
        reportError(streamId, message, error)
      case DeregisterReceiver(streamId, message, error) =>
        deregisterReceiver(streamId, message, error)
        sender ! true
    }
  }

Receiverからメッセージを受け取るためのActorのようです。 一回ここでの受信メッセージについてはおいておきます。 receiveExecutorは実質ReceiveLancherが呼ばれています。 ReceiveLauncherはクラスタ上のReceiver全てが呼び出すものになっています。

  val receiverExecutor = new ReceiverLauncher()

ReceiverLauncher#startは以下のような実装になっています。

    def start() {
      thread.start()
    }

またthreadですね。中身は以下のようになっています。

    @transient val thread  = new Thread() {
      override def run() {
        try {
          SparkEnv.set(env)
          startReceivers()
        } catch {
          case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
        }
      }
    }

envはSparckContextのenvなので環境変数を共有しているイメージでしょうか。 startReceiversは以下のような実装です。

    /**
     * Get the receivers from the ReceiverInputDStreams, distributes them to the
     * worker nodes as a parallel collection, and runs them.
     */
    private def startReceivers() {
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })

      // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

      // Create the parallel collection of receivers to distributed them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }

      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException(
            "Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
        executor.start()
        executor.awaitTermination()
      }
      // Run the dummy Spark job to ensure that all slaves have registered.
      // This avoids all the receivers to be scheduled on the same node.
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

      // Distribute the receivers and start them
      logInfo("Starting " + receivers.length + " receivers")
      ssc.sparkContext.runJob(tempRDD, startReceiver)
      logInfo("All of the receivers have been terminated")
    }

長い。。。コメントを見るとReceiverInputDStreamsから受け取ってそれらをノードに渡すんだといっています。 大事そうなので細かくみていきます。 まずはreceiversから。

      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })

receiverInputStreamsはクラス初期化の際にssc.graph.getReceiverInputStreams()が代入されます。 最初のほうにssc.graphはDStreamGraphをインスタンス化したものが代入されていました。

  def getReceiverInputStreams() = this.synchronized {
    inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
      .map(_.asInstanceOf[ReceiverInputDStream[_]])
      .toArray
  }

ということでinputDStreamの配列が帰ってくるイメージです。 次にmapの中で呼ばれているnis.getReceiver()ですがこれは入力ソースによって違ってくるものです。 KafkaInputDStreamを利用する場合KafkaReceiverのインスタンスがかえります。

  def getReceiver(): Receiver[(K, V)] = {
    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
        .asInstanceOf[Receiver[(K, V)]]
  }

ここで次のreceiveTrackerの処理に進みます。

      // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

ここではもし各Receiverは自分の好きなホストを指定していればそこを受け取るようにしています。 今回はメインの流れとは関係しないためスキップします。(preferredLocationはKafkaReceiverの親クラスであるReceiverクラスでオーバーライドできるようになっています)

次。

      // Create the parallel collection of receivers to distributed them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }

コメントを見るとワーカーノードにあるReceiverたちに配布するためのコレクションを作るとしています。 ここでStreamingContextが持つSpackContextに対してRDDをつくるよう指示しています。

  /** Distribute a local Scala collection to form an RDD.
   *
   * This method is identical to `parallelize`.
   */
  def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
    parallelize(seq, numSlices)
  }
  
   /** Distribute a local Scala collection to form an RDD.
   *
   * @note Parallelize acts lazily. If `seq` is a mutable collection and is
   * altered after the call to parallelize and before the first action on the
   * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
   * the argument to avoid this.
   */
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
  

Receiverの数分並列にRDDの形でデータを渡すようにしています。 ParallelCollectionRDDはまた後で見ます。 次にいきます。

      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException(
            "Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
        executor.start()
        executor.awaitTermination()
      }

各Receiverに対してstartよんでるんだとおもうんだけど、あくまで関数として保存しているだけ。 また後で呼ばれるんだろうということだがreceiverに対してReceiverSupervisorImplしてるとこは見ておく。 と思ったけど実装が長いのであとで必要なとこだけ見てみる。 やってるのはAkkaの振る舞いとかの実装。ここで見るのはexecutor.start()とexecutor.awaitTermination()とする。

  /** Start the supervisor */
  def start() {
    onStart()
    startReceiver()
  }
  
  override protected def onStart() {
    blockGenerator.start()
  }

blockGeneratorは後で見ます。 もう一つのstartReceiverメソッドはReciverSupervisor.scalaにあります。

  /** Start receiver */
  def startReceiver(): Unit = synchronized {
    try {
      logInfo("Starting receiver")
      receiver.onStart()
      logInfo("Called receiver onStart")
      onReceiverStart()
      receiverState = Started
    } catch {
      case t: Throwable =>
        stop("Error starting receiver " + streamId, Some(t))
    }
  }

ここでonStartが呼ばれるがReceiverインスタンスはKafkaReceiverなのでそこにあるonStart実装を見てみる。

  def onStart() {

    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

    // Kafka connection properties
    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))

    val zkConnect = kafkaParams("zookeeper.connect")
    // Create the connection to the cluster
    logInfo("Connecting to Zookeeper: " + zkConnect)
    val consumerConfig = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConfig)
    logInfo("Connected to " + zkConnect)

    // When auto.offset.reset is defined, it is our responsibility to try and whack the
    // consumer group zk node.
    if (kafkaParams.contains("auto.offset.reset")) {
      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    }

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

    // Create Threads for each Topic/Message Stream we are listening
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    try {
      // Start the messages handler for each partition
      topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
      }
    } finally {
      executorPool.shutdown() // Just causes threads to terminate after work is done
    }
  }

ZooKeeperにつなげてがんばってそうです。 さらにstartReceiverのなかでonReceiverStartをよんでいるので実装をみてみます。

  override protected def onReceiverStart() {
    val msg = RegisterReceiver(
      streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
    val future = trackerActor.ask(msg)(askTimeout)
    Await.result(future, askTimeout)
  }

はっ、とうとうここでResigerReceiverしています! RegisterReceiverはReceiverTrackerActorの中で持っているAkkaイベントでここで送り先を登録していそうです。

さて、ここまででやっとreceiverExecutorにどうやってreceiverが登録されるかがわかりました。 今見てきたメソッドはまだ呼ばれていませんが次の処理で呼ばれるようになります。

      // Distribute the receivers and start them
      logInfo("Starting " + receivers.length + " receivers")
      ssc.sparkContext.runJob(tempRDD, startReceiver)
      logInfo("All of the receivers have been terminated")

Receiverを実際に動かす部分です。

  /**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.size, false)
  }
  
  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (dagScheduler == null) {
      throw new SparkException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    val start = System.nanoTime
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    logInfo(
      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
    rdd.doCheckpoint()
  }

ここでdagschedulerから実行してるぽい。

すごく長い道のりでしたが、JobScheduler#startでまだ最後に呼ばれるものがあります。

    jobGenerator.start()

これで一応最後です。 まずはjobGeneratorが何なのかというところから。

  private val jobGenerator = new JobGenerator(this)

ということでJobGeneratorクラスを見に行きます。 初期化するときにJobSchedulerからenvとかgraphとかをいろいろ設定しておきます。

  /** Start generation of jobs */
  def start(): Unit = synchronized {
    if (eventActor != null) return // generator has already been started

    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobGeneratorEvent =>  processEvent(event)
      }
    }), "JobGenerator")
    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

コメントにはここでjobたちを生成しているとの意味深なコメントが。 ぱっとみるとJobGeneratorのAkkaがいて、次にsscの状態によってrestartかstartFirsttimeが呼ばれています。 ちなみにJobGenerator Akkaは以下の処理をしてくれるみたい。

  • GenerateJobs
  • ClearMetadata
  • DoCheckpoint
  • ClearCheckpintData
  /** Starts the generator for the first time */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

  /** Restarts the generator based on the information in checkpoint */
  private def restart() {
    // If manual clock is being used for testing, then
    // either set the manual clock to the last checkpointed time,
    // or if the property is defined set it to that time
    if (clock.isInstanceOf[ManualClock]) {
      val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
      clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
    }

    val batchDuration = ssc.graph.batchDuration

    // Batches when the master was down, that is,
    // between the checkpoint and current restart time
    val checkpointTime = ssc.initialCheckpoint.checkpointTime
    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
    val downTimes = checkpointTime.until(restartTime, batchDuration)
    logInfo("Batches during down time (" + downTimes.size + " batches): "
      + downTimes.mkString(", "))

    // Batches that were unprocessed before failure
    val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
    logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
      pendingTimes.mkString(", "))
    // Reschedule jobs for these times
    val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
    logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
      timesToReschedule.mkString(", "))
    timesToReschedule.foreach(time =>
      jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
    )

    // Restart the timer
    timer.start(restartTime.milliseconds)
    logInfo("Restarted JobGenerator at " + restartTime)
  }

初回時はgraph.startをメインでやっているようですね。 graphは確かDStreamGraphだったので、、

  def start(time: Time) {
    this.synchronized {
      if (zeroTime != null) {
        throw new Exception("DStream graph computation already started")
      }
      zeroTime = time
      startTime = time
      outputStreams.foreach(_.initialize(zeroTime))
      outputStreams.foreach(_.remember(rememberDuration))
      outputStreams.foreach(_.validate)
      inputStreams.par.foreach(_.start())
    }
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment