val props = new Properties ()
props.put(" bootstrap.servers" , " localhost:9092" )
props.put(" client.id" , " x" )
props.put(" enable.idempotence" , " true" )
props.put(" key.serializer" , " org.apache.kafka.common.serialization.StringSerializer" )
props.put(" value.serializer" , " org.apache.kafka.common.serialization.StringSerializer" )
val producer = new KafkaProducer [String , String ](props)
val record = new ProducerRecord [String , String ](" test" , " 1" )
producer.send(record)
producer.close
InitProducerIdRequest (v0)
PIDを取得するためのリクエスト (apikey: 22)
InitProducerId Response (v0)
が返される
javaクライアントの場合、 enable.idempotence=true
が指定されるとproduce前に自動的に実行される
00000000 00 00 00 11 00 16 00 00 00 00 00 02 00 01 78 ff |..............x.|
00000010 ff 7f ff ff ff |.....|
InitProducerIdRequest (Version: 0)
(0000000) Int16[2](api_key) -> 22
(0000002) Int16[2](api_version) -> 0
(0000004) Int32[4](correlation_id) -> 2
(0000008) String[2](client_id) -> (1)"x"
(0000011) String[2](transactional_id) -> (null)
(0000013) Int32[4](transaction_timeout_ms) -> 2147483647
00000000 00 00 00 14 00 00 00 02 00 00 00 00 00 00 00 00 |................|
00000010 00 00 00 00 03 ed 00 00 |........|
InitProducerIdResponse (Version: 0)
(0000000) Int32[4](correlation_id) -> 2
(0000004) Int32[4](throttle_time_ms) -> 0
(0000008) Int16[2](error_code) -> 0
(0000010) Int64[8](producer_id) -> 1005
(0000018) Int16[2](producer_epoch) -> 0
メッセージを送信するためのリクエスト (apikey: 0)
Produce Response (v5)
が返される
v3: TransactionalId => nullableString
が追加
webのプロトコル仕様書の record_set => RECORDS
の詳細が不明
00000000 00 00 00 6e 00 00 00 05 00 00 00 04 00 01 78 ff |...n..........x.|
00000010 ff ff ff 00 00 75 30 00 00 00 01 00 04 74 65 73 |.....u0......tes|
00000020 74 00 00 00 01 00 00 00 00 00 00 00 45 00 00 00 |t...........E...|
00000030 00 00 00 00 00 00 00 00 39 ff ff ff ff 02 a7 c8 |........9.......|
00000040 47 5d 00 00 00 00 00 00 00 00 01 62 17 5b da 8b |G].........b.[..|
00000050 00 00 01 62 17 5b da 8b 00 00 00 00 00 00 03 ed |...b.[..........|
00000060 00 00 00 00 00 00 00 00 00 01 0e 00 00 00 01 02 |................|
00000070 31 00 |1.|
ProduceRequest (Version: 5)
(0000000) Int16[2](api_key) -> 0
(0000002) Int16[2](api_version) -> 5
(0000004) Int32[4](correlation_id) -> 4
(0000008) String[2](client_id) -> (1)"x"
(0000011) String[2](transactional_id) -> (null)
(0000013) Int16[2](acks) -> -1
(0000015) Int32[4](timeout) -> 30000
(0000019) Array[4](TopicDataV5) -> 1
TopicDataV5
(0000023) String[2](topic) -> (4)"test"
(0000029) Array[4](PartitionRecordSetV5) -> 1
PartitionRecordSetV5
(0000033) Int32[4](partition) -> 0
RecordBatchV2
# この後のバイナリデータ(record_set => RECORDS 部分)が不明
00000000 00 00 00 34 00 00 00 04 00 00 00 01 00 04 74 65 |...4..........te|
00000010 73 74 00 00 00 01 00 00 00 00 00 00 00 00 00 00 |st..............|
00000020 00 00 01 51 ff ff ff ff ff ff ff ff 00 00 00 00 |...Q............|
00000030 00 00 00 00 00 00 00 00 |........|
ProduceResponse (Version: 5)
(0000000) Int32[4](correlation_id) -> 4
(0000004) Array[4](TopicPartitionResponseV5) -> 1
TopicPartitionResponse (Version: 5)
(0000008) String[2](topic) -> (4)"test"
(0000014) Array[4](PartitionResponseV5) -> 1
PartitionResponse (Version: 5)
(0000018) Int32[4](partition) -> 0
(0000022) Int16[2](error_code) -> 0
(0000024) Int64[8](base_offset) -> 337
(0000032) Int64[8](log_append_time) -> -1
(0000040) Int64[8](log_start_offset) -> 0
(0000048) Int32[4](throttle_time_ms) -> 0
FetchRequest v4 : ReplicaId MaxWaitTime MinBytes IsolationLevel [TopicName [Partition FetchOffset MaxBytes]]