Skip to content

Instantly share code, notes, and snippets.

@maiha
Last active April 4, 2018 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maiha/d2942659a378bd001a853c7c9b1f3491 to your computer and use it in GitHub Desktop.
Save maiha/d2942659a378bd001a853c7c9b1f3491 to your computer and use it in GitHub Desktop.
kafka-1.0-exactly-once

情報

概要

  • kafka-0.11 で導入された idempotent producer によって同一パーティション内の冪等性を保証
  • produce request v3 で導入された transaction 機能により、複数メッセージのアトミック処理を保証

チートシート(擬似コード)

- producer = Producer.new
+ producer = Producer.new(pid: Broker.findCoordinatorRequest.initPidRequest)

- producer.send_message(topic, partition, ...)
+ if producer.send_message(topic, partition, pid, sequence_no, ...)
+   producer.max_seqs[topic, partition] += 1

- broker.on_receive(msg) {|msg| broker.store_msg(msg)}
+ broker.on_receive(msg) {|msg|
+   max = broker.max_seqs[msg.pid, msg.topic, msg.partition]
+   case msg.sequence_no
+     0..max -> # ignore
+     max+1  -> broker.store_msg(msg); max+=1
+     _      -> raise "out-of-sequence"

producerの一意性

  • producer は初期化時に broker から識別子として PID を割り当てられる (PID = Producer ID)
  • producer はトピックパーティション別に シーケンス番号 を保持し、初期化時に0に設定する
  • producer はメッセージ送信が成功する度に該当トピックパーティションの シーケンス番号 を1増加させる

メッセージの一意性

  • producer はメッセージに対して トピックパーティションPIDシーケンス番号 を設定し、その3組の値で一意性を提供する

冪等性の保証

  • broker は トピックパーティションPID の組毎に最後にコミットしたメッセージの シーケンス番号 を保持しておく
  • broker は受け取ったメッセージ内の シーケンス番号 と自分が保持している シーケンス番号 を比較し、以下の処理を行う
  • +1 の値を受け取った)→ メッセージを受理する
  • <= の値を受け取った)→ メッセージを無視する(冪等性が担保)
  • +1 より大きい値を受け取った)→ out-of-sequence エラーを発生させる

トランザクション性の保証

  • 以下によって、複数のメッセージをアトミックに処理する機構を加える
  • Consumer Coordinator を導入してConsumerグループをステート管理したように、 Transaction Coordinator という概念を導入してトランザクション処理をステート管理する
  • ステートの永続化のために、コンシューマオフセット トピックと同じように、 トランザクションログ トピックを導入する
  • 同じ Consumer group 内のconsumerではオフセット値が共有されたように、同じ トランザクションログID を持つproducerではトランザクション状態を共有でき、トランザクションの再開や中止ができる
  • 有効なトランザクションを singleton にするために、 producer epoch という概念をさらに導入する

broker

設定 説明 デフォルト
transactional.id.timeout.ms トランザクションIDの有効期間 604800000 (7日)
max.transaction.timeout.ms トランザクション実行のタイムアウト 900000 (15分)
transaction.state.log.replication.factor トランザクション状態を管理するトピックのレプリカ数 3
transaction.state.log.num.partitions 同トピックのパーティション数 50
transaction.state.log.min.isr 同トピックのISR最小数 2
transaction.state.log.segment.bytes 同トピックのセグメントサイズ 104857600バイト

producer

設定 説明 デフォルト
enable.idempotence 冪等性を有効にするかどうか false
transaction.timeout.ms トランザクション実行のタイムアウト 60000 (1分)
transactional.id トランザクション実行に利用するID "" (なし)

consumer

設定 説明 デフォルト
isolation.level トランザクションの分離レベル "read_uncommitted"

サンプルコード (Scala)

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "x")
props.put("enable.idempotence", "true")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
val record   = new ProducerRecord[String, String]("test", "1")

producer.send(record)
producer.close

InitProducerIdRequest (v0)

  • PIDを取得するためのリクエスト (apikey: 22)
  • InitProducerId Response (v0) が返される
  • javaクライアントの場合、 enable.idempotence=true が指定されるとproduce前に自動的に実行される

パケット (リクエスト)

00000000  00 00 00 11 00 16 00 00  00 00 00 02 00 01 78 ff  |..............x.|
00000010  ff 7f ff ff ff                                    |.....|
           InitProducerIdRequest (Version: 0)
(0000000)    Int16[2](api_key) -> 22
(0000002)    Int16[2](api_version) -> 0
(0000004)    Int32[4](correlation_id) -> 2
(0000008)    String[2](client_id) -> (1)"x"
(0000011)    String[2](transactional_id) -> (null)
(0000013)    Int32[4](transaction_timeout_ms) -> 2147483647

パケット (レスポンス)

00000000  00 00 00 14 00 00 00 02  00 00 00 00 00 00 00 00  |................|
00000010  00 00 00 00 03 ed 00 00                           |........|
         InitProducerIdResponse (Version: 0)
(0000000)  Int32[4](correlation_id) -> 2
(0000004)  Int32[4](throttle_time_ms) -> 0
(0000008)  Int16[2](error_code) -> 0
(0000010)  Int64[8](producer_id) -> 1005
(0000018)  Int16[2](producer_epoch) -> 0

Produce Request (v5)

  • メッセージを送信するためのリクエスト (apikey: 0)
  • Produce Response (v5) が返される
  • v3: TransactionalId => nullableString が追加
  • webのプロトコル仕様書の record_set => RECORDS の詳細が不明

パケット (リクエスト)

00000000  00 00 00 6e 00 00 00 05  00 00 00 04 00 01 78 ff  |...n..........x.|
00000010  ff ff ff 00 00 75 30 00  00 00 01 00 04 74 65 73  |.....u0......tes|
00000020  74 00 00 00 01 00 00 00  00 00 00 00 45 00 00 00  |t...........E...|
00000030  00 00 00 00 00 00 00 00  39 ff ff ff ff 02 a7 c8  |........9.......|
00000040  47 5d 00 00 00 00 00 00  00 00 01 62 17 5b da 8b  |G].........b.[..|
00000050  00 00 01 62 17 5b da 8b  00 00 00 00 00 00 03 ed  |...b.[..........|
00000060  00 00 00 00 00 00 00 00  00 01 0e 00 00 00 01 02  |................|
00000070  31 00                                             |1.|
           ProduceRequest (Version: 5)
(0000000)    Int16[2](api_key) -> 0
(0000002)    Int16[2](api_version) -> 5
(0000004)    Int32[4](correlation_id) -> 4
(0000008)    String[2](client_id) -> (1)"x"
(0000011)    String[2](transactional_id) -> (null)
(0000013)    Int16[2](acks) -> -1
(0000015)    Int32[4](timeout) -> 30000
(0000019)    Array[4](TopicDataV5) -> 1
               TopicDataV5
(0000023)        String[2](topic) -> (4)"test"
(0000029)        Array[4](PartitionRecordSetV5) -> 1
                   PartitionRecordSetV5
(0000033)            Int32[4](partition) -> 0
                     RecordBatchV2
# この後のバイナリデータ(record_set => RECORDS 部分)が不明

パケット (レスポンス)

00000000  00 00 00 34 00 00 00 04  00 00 00 01 00 04 74 65  |...4..........te|
00000010  73 74 00 00 00 01 00 00  00 00 00 00 00 00 00 00  |st..............|
00000020  00 00 01 51 ff ff ff ff  ff ff ff ff 00 00 00 00  |...Q............|
00000030  00 00 00 00 00 00 00 00                           |........|
         ProduceResponse (Version: 5)
(0000000)  Int32[4](correlation_id) -> 4
(0000004)  Array[4](TopicPartitionResponseV5) -> 1
             TopicPartitionResponse (Version: 5)
(0000008)      String[2](topic) -> (4)"test"
(0000014)      Array[4](PartitionResponseV5) -> 1
                 PartitionResponse (Version: 5)
(0000018)          Int32[4](partition) -> 0
(0000022)          Int16[2](error_code) -> 0
(0000024)          Int64[8](base_offset) -> 337
(0000032)          Int64[8](log_append_time) -> -1
(0000040)          Int64[8](log_start_offset) -> 0
(0000048)  Int32[4](throttle_time_ms) -> 0

Fetch Request

  • FetchRequest v4 : ReplicaId MaxWaitTime MinBytes IsolationLevel [TopicName [Partition FetchOffset MaxBytes]]

パフォーマンス

  • FetchRequest v4(READ_COMMITED) でない古いアクセスがあると、broker は READ_UNCOMMITTED 分離レベルで動作する。
  • この場合、ゼロコピーができなくなり、メッセージセットの変換がオンタイムに発生するためCPU負荷が上がる。(圧縮データの場合はよりインパクト大)

概要

  • kafkaと他システムとの連携に特化したconsuemrを扱うためのフレームワーク
  • 概念的にはFlowが意識されており、source(データソース)とsink(送信先)とput(処理)によって抽象化される

リバランスに時間がかる場合

対応

  • サーバを見直す
    • connectorとbrokerを同居させない
  • 設定を見直す
    • num.standby.replicas 1にすると StandbyTasks が有効になる
    • consumer.fetch.min.bytes
    • consumer.fetch.wait.max.ms
    • fetch.min.bytes
    • fetch.max.wait.ms
    • max.poll.records の値を増やす (or consumer. prefixがない)
    • max.poll.interval.ms
    • session.timeout.ms
  • バージョンを上げる (connector側だけ上げても効果あり)
    • 0.10 リバランス可能なだけ
    • 0.11 再送処理(リバランス時に追いつく処理)がバルクで実行可能
    • 1.0 さらに高速化
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment