@rionmonster
Created November 5, 2019 22:25
Examples of Custom KStream-KTable Joins to Handle Slowly Loading KTables
package streams.common.kafka.transformers

import java.time.Duration
import java.time.Instant
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.kstream.TransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.ValueAndTimestamp

/**
 * Stream-side half of a custom KStream-KTable join that tolerates a slowly loading KTable:
 * stream records that arrive before their table-side counterpart are buffered (for up to
 * roughly five minutes of stream time) instead of immediately producing an empty join result.
 */
@Suppress("UNCHECKED_CAST")
class GuaranteedStreamSideJoinTransformer<K, V>(
    private val streamBufferStoreName: String,
    private val tableStoreName: String
) : TransformerSupplier<K, V, KeyValue<K, KeyValue<K, V>>> {

    private val approxMaxWaitTimePerRecordForTableData = Duration.ofMinutes(5)
    private val frequencyToCheckForExpiredWaitTimes = Duration.ofSeconds(5)

    override fun get(): Transformer<K, V, KeyValue<K, KeyValue<K, V>>> {
        return object : Transformer<K, V, KeyValue<K, KeyValue<K, V>>> {
            private var streamBufferStore: KeyValueStore<K, KeyValue<V, Instant>>? = null
            private var tableStore: KeyValueStore<K, ValueAndTimestamp<V>>? = null
            private var context: ProcessorContext? = null

            override fun init(context: ProcessorContext) {
                streamBufferStore = context.getStateStore(streamBufferStoreName) as KeyValueStore<K, KeyValue<V, Instant>>
                tableStore = context.getStateStore(tableStoreName) as KeyValueStore<K, ValueAndTimestamp<V>>
                this.context = context
                this.context!!.schedule(frequencyToCheckForExpiredWaitTimes, PunctuationType.STREAM_TIME) { timestamp ->
                    punctuate(timestamp)
                }
            }

            override fun transform(key: K, value: V): KeyValue<K, KeyValue<K, V>>? {
                sendAnyWaitingRecordForKey(key)
                return sendFullJoinRecordOrWaitForTableSide(key, value, context!!.timestamp())
            }

            /**
             * In this example we opt to force-forward any waiting record for a given key when a new record
             * for that key arrives. Alternatively, we could decide to keep buffering such records until
             * either their wait times expire or a table-side record is received.
             */
            private fun sendAnyWaitingRecordForKey(key: K) {
                val streamValue = streamBufferStore!!.get(key)
                if (streamValue != null) {
                    // No need to check whether a table-side record exists: if it did, the table side would
                    // have already triggered a join update and removed this stream record from the buffer.
                    val joinedValue = KeyValue(streamValue.key, null)
                    context!!.forward<K, Any>(key, joinedValue)
                    streamBufferStore!!.delete(key)
                }
            }

            private fun sendFullJoinRecordOrWaitForTableSide(key: K, value: V, streamRecordTimestamp: Long): KeyValue<K, KeyValue<K, V>>? {
                val tableValue = tableStore!!.get(key)
                return if (tableValue != null) {
                    // The table side is already loaded: emit the full join result immediately.
                    KeyValue.pair(key, KeyValue(key, tableValue.value()))
                } else {
                    // No table-side data yet: buffer the stream record together with its timestamp.
                    streamBufferStore!!.put(key, KeyValue(value, Instant.ofEpochMilli(streamRecordTimestamp)))
                    null
                }
            }

            private fun punctuate(timestamp: Long) {
                sendAndPurgeAnyWaitingRecordsThatHaveExceededWaitTime(timestamp)
            }

            private fun sendAndPurgeAnyWaitingRecordsThatHaveExceededWaitTime(currentStreamTime: Long) {
                streamBufferStore!!.all().use { iterator ->
                    while (iterator.hasNext()) {
                        val record = iterator.next()
                        if (waitTimeExpired(record.value.value, currentStreamTime)) {
                            // The wait time elapsed without table-side data: forward the record unjoined.
                            val joinedValue = KeyValue(record.value.key, null)
                            context!!.forward<Any, Any>(record.key, joinedValue)
                            streamBufferStore!!.delete(record.key)
                        }
                    }
                }
            }

            private fun waitTimeExpired(recordTimestamp: Instant, currentStreamTime: Long): Boolean {
                return Duration.between(recordTimestamp, Instant.ofEpochMilli(currentStreamTime)) > approxMaxWaitTimePerRecordForTableData
            }

            override fun close() {}
        }
    }
}
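
For context, the buffer store referenced by streamBufferStoreName has to be registered with the topology before the transformer can look it up; in this gist that is delegated to the project-specific TopicManager.getStoreForGuaranteedJoins (see the extension function at the end). A minimal sketch of the equivalent plain Kafka Streams registration, where keySerde and bufferValueSerde are assumed names rather than anything defined here (one possible bufferValueSerde is sketched after the table-side transformer below):

import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.state.Stores

val builder = StreamsBuilder()
// Persistent, changelog-backed store holding the buffered stream record per key
builder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("guaranteed_buffer_for_my_table"), // hypothetical name
        keySerde,         // assumed: Serde<K>
        bufferValueSerde  // assumed: Serde<KeyValue<V, Instant>>
    )
)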

package streams.common.kafka.transformers

import java.time.Duration
import java.time.Instant
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.ValueTransformerWithKey
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

/**
 * Table-side half of the custom join: whenever a table-side record arrives, check the shared
 * stream-side buffer for a waiting record with the same key and, if one is found within the
 * acceptable time bounds, emit the join result and clear the buffer entry.
 */
@Suppress("UNCHECKED_CAST")
class GuaranteedTableSideJoinTransformer<K, V>(
    private val streamBufferStoreName: String
) : ValueTransformerWithKeySupplier<K, V, KeyValue<K, V>> {

    private val approxMaxWaitTimePerRecordForTableData = Duration.ofMinutes(5)

    override fun get(): ValueTransformerWithKey<K, V, KeyValue<K, V>> {
        return object : ValueTransformerWithKey<K, V, KeyValue<K, V>> {
            // Note: the buffered entry's first element is really the stream-side value, typed
            // loosely here (and erased at runtime); this is what the @Suppress above papers over.
            private var streamBufferStore: KeyValueStore<K, KeyValue<K, Instant>>? = null
            private var context: ProcessorContext? = null

            override fun init(context: ProcessorContext) {
                streamBufferStore = context.getStateStore(streamBufferStoreName) as KeyValueStore<K, KeyValue<K, Instant>>
                this.context = context
            }

            override fun transform(key: K, value: V?): KeyValue<K, V>? {
                return possiblySendFullJoinRecord(key, value, context!!.timestamp())
            }

            private fun possiblySendFullJoinRecord(key: K, tableValue: V?, tableRecordTimestamp: Long): KeyValue<K, V>? {
                if (tableValue == null) return null
                val streamValue = streamBufferStore!!.get(key) ?: return null
                // You can also incorporate the records' timestamps into your join logic, as shown here.
                if (!withinAcceptableBounds(streamValue.value, Instant.ofEpochMilli(tableRecordTimestamp))) return null
                streamBufferStore!!.delete(key)
                return KeyValue(streamValue.key, tableValue)
            }

            private fun withinAcceptableBounds(streamRecordTimestamp: Instant, tableRecordTimestamp: Instant): Boolean {
                return Duration.between(streamRecordTimestamp, tableRecordTimestamp) <= approxMaxWaitTimePerRecordForTableData
            }

            override fun close() {}
        }
    }
}
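
The buffer store also needs a serde for its KeyValue<V, Instant> values so buffered records survive restarts and changelog restoration. The gist does not include one; the following is a minimal sketch, where the encoding, the bufferValueSerde name, and the assumption that a Serde<V> already exists are all mine:

package streams.common.kafka.serdes

import java.nio.ByteBuffer
import java.time.Instant
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.streams.KeyValue

// Encodes the buffered stream value, followed by its timestamp as 8 trailing bytes.
fun <V> bufferValueSerde(valueSerde: Serde<V>): Serde<KeyValue<V, Instant>> {
    val serializer = Serializer<KeyValue<V, Instant>> { topic, data ->
        if (data == null) null
        else {
            val valueBytes = valueSerde.serializer().serialize(topic, data.key)
            ByteBuffer.allocate(valueBytes.size + Long.SIZE_BYTES)
                .put(valueBytes)
                .putLong(data.value.toEpochMilli())
                .array()
        }
    }
    val deserializer = Deserializer<KeyValue<V, Instant>> { topic, bytes ->
        if (bytes == null) null
        else {
            val valueBytes = ByteArray(bytes.size - Long.SIZE_BYTES)
            val buffer = ByteBuffer.wrap(bytes)
            buffer.get(valueBytes)
            KeyValue(valueSerde.deserializer().deserialize(topic, valueBytes), Instant.ofEpochMilli(buffer.long))
        }
    }
    return Serdes.serdeFrom(serializer, deserializer)
}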

package streams.common.kafka.extensions

import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.ValueJoiner
import streams.common.kafka.TopicManager
import streams.common.kafka.transformers.GuaranteedStreamSideJoinTransformer
import streams.common.kafka.transformers.GuaranteedTableSideJoinTransformer

fun <K, V : SpecificRecord, VO> KStream<K, V>.guaranteedJoin(ktable: KTable<K, VO>, joiner: ValueJoiner<K, V, K>, topicManager: TopicManager): KStream<K, V> {
    // Name the buffer store after the table it guards
    val streamBufferName = "guaranteed_buffer_for_${ktable.queryableStoreName()}"
    // Ensure that the backing store exists before the transformers look it up
    topicManager.getStoreForGuaranteedJoins<V>(streamBufferName)

    val transformedStream = this.transform(
        GuaranteedStreamSideJoinTransformer(streamBufferName, ktable.queryableStoreName()),
        streamBufferName,
        ktable.queryableStoreName())

    val transformedTable = ktable
        .transformValues(GuaranteedTableSideJoinTransformer(streamBufferName), streamBufferName)
        .toStream()
        .filter { _, value -> value != null }

    val joined = transformedStream
        // This line complains about a type mismatch, in that it expects a KStream<K, KeyValue<K, V>>
        // as opposed to the KStream<K, KeyValue<K, VO>> produced by the table side
        // (nothing is returned yet for the same reason; one possible fix is sketched below)
        .merge(transformedTable)
}
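
The type mismatch flagged above stems from the two transformers describing the joined value with different type parameters. One hedged way to resolve it (an assumption, not the gist's actual API) is to give each transformer separate type parameters for the stream and table values so both branches emit the same joined-pair type, at which point merge compiles; the unused joiner parameter could then be applied over the merged pairs with a mapValues:

// Sketch only: assumes the two transformers have been re-parameterised as
// GuaranteedStreamSideJoinTransformer<K, V, VO> and GuaranteedTableSideJoinTransformer<K, V, VO>,
// with both branches emitting the joined pair KeyValue<V, VO?>.
fun <K, V : SpecificRecord, VO> KStream<K, V>.guaranteedJoinAligned(
    ktable: KTable<K, VO>,
    topicManager: TopicManager
): KStream<K, KeyValue<V, VO?>> {
    val streamBufferName = "guaranteed_buffer_for_${ktable.queryableStoreName()}"
    topicManager.getStoreForGuaranteedJoins<V>(streamBufferName)

    // Stream side: emits KeyValue<V, VO?> (table side null when the wait time expires)
    val streamSide: KStream<K, KeyValue<V, VO?>> = this.transform(
        GuaranteedStreamSideJoinTransformer<K, V, VO>(streamBufferName, ktable.queryableStoreName()),
        streamBufferName,
        ktable.queryableStoreName())

    // Table side: emits the same KeyValue<V, VO?> shape, so the value types now line up
    val tableSide: KStream<K, KeyValue<V, VO?>> = ktable
        .transformValues(GuaranteedTableSideJoinTransformer<K, V, VO>(streamBufferName), streamBufferName)
        .toStream()
        .filter { _, value -> value != null }

    return streamSide.merge(tableSide)
}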