Skip to content

Instantly share code, notes, and snippets.

@ocadaruma
Created April 15, 2016 04:23
Show Gist options
  • Save ocadaruma/2391715d80f8bfe681269f87db1ad78c to your computer and use it in GitHub Desktop.
Save ocadaruma/2391715d80f8bfe681269f87db1ad78c to your computer and use it in GitHub Desktop.

class: center, middle

ふつうのSparkプログラミング

OptTechnologies 社内勉強会 2016/04/15

テクノロジー開発2部 岡田 遥来


目次

  1. 分散並列処理について
  2. Sparkについて
  3. 具体例(アクセスログのセッション化)

分散並列処理の必要性

ある処理の実行時間を短縮したいとき

マシンの増強 (スケールアップ)

  • DBへのアクセスがm回あって、それぞれ通信にn秒かかる場合、どんなに頑張ってもm×n秒かかる
  • ハイスペックなマシンは高い
  • 限界がある

処理を分割し、複数マシンで実行する (スケールアウト)


例: WordCount

テキストファイルに含まれる単語と、その出現頻度を数える

INPUT

Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Quisque consectetur euismod hendrerit.
Suspendisse quis posuere justo, et hendrerit ipsum.
Cras porttitor metus ac libero vehicula, nec congue lectus consequat.
....

OUTPUT

Map[String, Int] // Map(cursus -> 103372, eros -> 119604, tincidunt -> 196561,...

WordCount: シーケンシャル

import collection.{mutable => mu}
val histogram: mu.Map[String, Int] = mu.HashMap.empty

io.Source.fromFile("/path/to/text").getLines().foreach { line =>
  line.replaceAll("[,.]", "").split(" ").map(_.toLowerCase.trim).foreach { word =>
    val i = histogram.getOrElseUpdate(word, 0)
    histogram(word) = i + 1
  }
}

WordCount: マルチスレッド

import collection.{mutable => mu}
import collection.{concurrent => con}
val histogram: mu.Map[String, Int] = con.TrieMap.empty

io.Source.fromFile("/path/to/text").getLines().toStream.par.foreach { line =>
  line.replaceAll("[,.]", "").split(" ").map(_.toLowerCase.trim).foreach { word =>
    val i = histogram.getOrElseUpdate(word, 0)
    histogram(word) = i + 1
  }
}

WordCount: 複数マシン

???

分散並列処理は難しい

  • そもそもロジックが並列化しづらい
    • 円周率やGCD計算などの普通の計算をどう並列化するのか
    • データセットに対する処理の場合、一定の単位で区切って振り分ける
  • ワーカー間のデータ転送
  • 各ワーカーへの処理の割り当て & 結果の集約
  • あるワーカーが落ちたときのリトライ処理
    • 台数が増えるということは故障確率も増える

Sparkとは

  • Scalaで実装された分散並列処理フレームワーク
  • Apacheトップレベルプロジェクトの一つ
  • Standaloneクラスタや、YARN, Mesosクラスタ上で動作
  • マネージドサービスもある
    • AWS: ElasticMapReduce
    • GCP: Cloud Dataproc

Hadoopとの違い

  • インメモリ処理を基本とする
  • RDDに定義された高階関数(map, filter,...)を使って処理を記述
    • Scalaのコレクション操作をする要領でロジックを書けば、勝手に分散並列化されるイメージ

RDDとは

  • Resilient Distributed Dataset (耐障害性分散データセット)
  • 遅延評価されるimmutableなScalaのコレクションのように作られている
    • map, filter, flatMap, groupByなどは "Transform"
      • これだけを書いても実行されない。RDDを生成する
    • foreach, count, saveAsTextFileなどは "Action"
      • 実行され、結果を生成する

例: アクセスログのセッション化

INPUT

タイムスタンプ IPアドレス

2016-04-15T00:00:01 203.0.113.0
2016-04-15T00:00:22 203.0.113.42
2016-04-15T00:05:01 203.0.113.2
2016-04-15T00:28:01 203.0.113.0
...

OUTPUT

それぞれのIPアドレスについて、30分以上間の空いてないアクセスの塊にまとめる。

IPアドレス アクセス回数 滞在時間(秒)

203.0.113.0 2 1680
203.0.113.42 1 0
203.0.113.2 1 0
...

実装例

case class Access(timestamp: DateTime, ipAddress: String)
case class Session(ipAddress: String, accessCount: Int, duration: Long)

object Main {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext()
    val accessLogs: RDD[String] = sc.textFile("/path/to/access_log/*")
    accessLogs
      .map { line =>
        val fields = line.split(' ')
        Access(DateTime.parse(fields(0)), fields(1))
      }
      .groupBy(_.ipAddress)
      .flatMap { case (ipAddress, accesses) =>
        val sorted = accesses.toList.sortBy(_.timestamp)
        sessionize(sorted)
      }
      .map(s => s"${s.ipAddress} ${s.accessCount} ${s.duration}")
      .saveAsTextFile("/path/to/output")
  }
  
  def sessionize(sortedAccesses: Seq[Access]): Seq[Session] = ???
}
  

デモ


注意点

  • Transformは複数回実行されることがあるため、外部へI/Oを行う場合は冪等にする必要がある
    • データ欠損時のリトライや、RDD#cache() しないでRDDを使い回す場合
  • 2016/4/15現在、Spark公式で配布されているバイナリはScala 2.10でビルドされている
    • Scala 2.11で書いたアプリが動かない
    • Spark on EMRも同様

まとめ

  • 分散並列処理を書く際には検討の価値あり
  • 以下のような場合は、分散シェルやタスクキュー等、素朴なツールでも十分と思う
    • データセット全体でキーごとにgroupingする、などのシャッフル処理がない
    • 各マシンで処理した結果を集計する必要がない
    • 「バカパラ」と呼ぶらしい (@shunsukeaihara さん談。ネガティブな意味ではない)

参考文献

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment