kafka

1. Producer API

- partitionsFor(topic)

// transactional producer

- initTransactions()  // requires config transactional.id
- beginTransaction()
- commitTransaction()

// enable.idempotence=true also gives an idempotent (non-transactional) producer


- send(record)
    - ProducerRecord(topic, [partition], [timestamp], [key], value, [headers])
    - explicit partition, otherwise hash of the key, otherwise round-robin decides the partition
- sendOffsetsToTransaction(offsets, groupId)

- close()
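
A minimal sketch of the transactional producer calls above; the broker address, topic name and transactional.id are assumptions:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object TransactionalProducerSketch extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")       // assumption: local broker
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("transactional.id", "my-transactional-id")   // required by initTransactions()
      props.put("enable.idempotence", "true")                // idempotent producer

      val producer = new KafkaProducer[String, String](props)

      producer.initTransactions()
      try {
        producer.beginTransaction()
        producer.send(new ProducerRecord[String, String]("mytopic", "key", "value"))
        producer.commitTransaction()
      } catch {
        case e: Exception =>
          producer.abortTransaction()   // roll back the whole batch (fatal errors like a fenced producer would require close() instead)
          throw e
      } finally {
        producer.close()
      }
    }

In a consume-transform-produce loop, sendOffsetsToTransaction(offsets, groupId) would be called inside the same transaction so the consumed offsets commit together with the produced records.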

2. Consumer API

- subscribe(topic) // partitions are rebalanced automatically when consumers join/leave the group (group.id required)
- assign(partitions) // when you are interested in only some partitions (group.id not needed, but better to specify)

- seekToBeginning()
- seek(partition, offset)
- seekToEnd()

- poll(timeout) // read the next batch of records

- commitSync()
- commitAsync(offsets)

- close()
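
A minimal sketch of a subscribe + poll + commit loop using the calls above; the broker address, topic and group name are assumptions:

    import java.time.Duration
    import java.util.{Collections, Properties}
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer

    object ConsumerLoopSketch extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("group.id", "my-group")          // needed for subscribe() + automatic rebalancing
      props.put("enable.auto.commit", "false")   // we commit manually below
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(Collections.singletonList("mytopic"))

      try {
        while (true) {
          val records = consumer.poll(Duration.ofMillis(500))   // read the next batch
          for (r <- records.asScala)
            println(s"${r.partition()}/${r.offset()}: ${r.value()}")
          consumer.commitSync()                                  // commit only after processing
        }
      } finally {
        consumer.close()
      }
    }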

Consumer partition rebalancing

Happens because of:

  1. A delay in the next poll (the consumer is too slow processing the current batch), so the consumer is considered dead

  2. A rebalance is then triggered and its partitions are handed to the remaining consumers

RebalanceListener.java
- Map<TopicPartition, OffsetAndMetadata> offsets;

- addOffset(topic, partition, offset)
- getCurrentOffsets()
- onPartitionsRevoked()
    // used to commit current work (offsets) before you lose a partition,
    // so another consumer can continue from there
- onPartitionsAssigned()
    // called when new partitions are handed to this consumer
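
A sketch of such a listener; the String/String consumer type is an assumption, and the committed offset is offset + 1 (the next record to read):

    import java.util
    import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition

    class RebalanceListener(consumer: KafkaConsumer[String, String]) extends ConsumerRebalanceListener {

      private val currentOffsets = new util.HashMap[TopicPartition, OffsetAndMetadata]()

      // remember the offset of each record as it is processed
      def addOffset(topic: String, partition: Int, offset: Long): Unit =
        currentOffsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1))

      def getCurrentOffsets: util.Map[TopicPartition, OffsetAndMetadata] = currentOffsets

      // partitions are about to be taken away: commit current work so the next owner continues from there
      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
        consumer.commitSync(currentOffsets)
        currentOffsets.clear()
      }

      // new partitions were handed to this consumer
      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = ()
    }

Registered with consumer.subscribe(topics, new RebalanceListener(consumer)).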

3. Consumer Group

  • allows parallel processing of a topic (its partitions are split across the consumers)
  • automatically manages partition assignment when calling subscribe(topic) + group.id
    • if you are interested in only some partitions, use a custom producer partitioner and consumer assign()
    • but then you don't get auto rebalancing
  • detects entry / exit / failure of a consumer and performs partition rebalancing

Offsets

  • available message offsets in the kafka broker
    • beginning - offset of the first message currently available in kafka (older ones are removed by the retention policy)
    • ending - offset of the last message currently available in kafka
  • committed offset - the last offset committed by the consumer group, i.e. where it resumes after a restart
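
A sketch that reads those three offsets for one partition; the broker address, topic, partition and group are assumptions:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    object OffsetInspectionSketch extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("group.id", "my-group")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

      val consumer = new KafkaConsumer[String, String](props)
      val tp  = new TopicPartition("mytopic", 0)
      val tps = Collections.singletonList(tp)

      val beginning = consumer.beginningOffsets(tps).get(tp)  // first offset still retained
      val ending    = consumer.endOffsets(tps).get(tp)        // offset right after the last available message
      val committed = consumer.committed(tp)                  // last committed offset of this group (null if none)

      println(s"beginning=$beginning ending=$ending committed=$committed")
      consumer.close()
    }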

Offset committing types

  1. auto commit

    • enable.auto.commit commits at poll (read) time, so offsets can be committed before the records are processed (only an at-most-once guarantee when committed too early)
    • auto.commit.interval.ms controls how often
    • you should maintain partition offsets manually to have the freedom to commit whatever/whenever you want
  2. manual commit

    1. commit sync

      • blocking
      • if there is a problem committing offsets, it has built-in retry logic
    2. commit async

      • non blocking
      • sends the commit and continues processing messages, doesn't wait or retry
      • if commit(75) fails and a later commit(100) succeeds, we don't want to retry 75 again (see the sketch below)
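
A sketch combining both manual commit styles: commitAsync() inside the loop, commitSync() once on shutdown; broker, topic and group name are assumptions as above:

    import java.time.Duration
    import java.util.{Collections, Properties}
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
    import org.apache.kafka.common.TopicPartition

    object ManualCommitSketch extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("group.id", "my-group")
      props.put("enable.auto.commit", "false")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(Collections.singletonList("mytopic"))

      try {
        while (true) {
          val records = consumer.poll(Duration.ofMillis(500))
          for (r <- records.asScala) println(s"${r.offset()}: ${r.value()}")
          // non-blocking, no retries: a failed commit is simply superseded by a later one
          consumer.commitAsync(new OffsetCommitCallback {
            override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                                    exception: Exception): Unit =
              if (exception != null) println(s"async commit failed: ${exception.getMessage}")
          })
        }
      } finally {
        try consumer.commitSync()   // blocking, retried: last chance to save the position before closing
        finally consumer.close()
      }
    }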

Constraints for exactly-once processing

  1. Transactional or atomic commit of work and offsets together in the sink (e.g. a JDBC transaction commit; a sketch follows this list)

    transaction {
        save_result()
        save_offset()
    }

    offsets table:

      topic    partition  offset
      mytopic  0          72
      mytopic  1          136
      ...      ...        ...

    results table:

      ts   result
      1    a
      2    b
      ...  ...
    • if you store output results in your db but offsets in zookeeper/kafka, you cannot commit them atomically in one transaction
  2. otherwise, try using an idempotent output sink for the results

    • UNIQUE constraint or UPDATE/UPSERT
    • so you don't care about duplicates if they occur
    • you can still commit offsets to kafka/zookeeper; with at-least-once semantics you restart faster after a failure than by resetting to earliest
  3. At-most-once processing

    do {
        save_offset()       // offsets committed too early
        // !!! crash here => records lost (offset already committed, results not saved)
        save_results()
    }
  4. At-least-once processing

    do {
        save_results()
        // !!! crash here => records reprocessed (results saved, offset not yet committed)
        save_offset()       // offsets committed too late
    }

Assign topics/partitions/offsets manually/automatically to consumers at RESTART

  1. use auto.offset.reset

    • this means you are committing offsets to kafka / zookeeper (you don't have a single transactional sink for offsets and results, so you get at-most-once or at-least-once)
    • it is used only if there are no committed offsets for the group (e.g. on the first start)
    • offsets are stored internally in the kafka topic __consumer_offsets or in zookeeper (depends on the consumer version)
    • options for auto.offset.reset = earliest / latest available offset
      • earliest - the backlog can be huge
      • latest - we will lose some messages if the pipeline did not drain properly (the last poll was not processed completely)
  2. use manual committing offsets and load offsets from transactional sink (where you also save your results)

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.TopicPartition

    val tp0 = new TopicPartition(topicName, partitionNumber0)
    val tp1 = new TopicPartition(topicName, partitionNumber1)

    consumer.assign(List(tp0, tp1).asJava)  // no group rebalancing, we own exactly these partitions

    // resume from the offsets stored next to the results
    consumer.seek(tp0, loadOffsetFromDB(topicName, partitionNumber0))
    consumer.seek(tp1, loadOffsetFromDB(topicName, partitionNumber1))

    // after processing a polled batch, save results and offsets atomically in the same sink
    transaction {
        saveWorkToDB()
        saveOffsetToDB()
    }