Last active
January 5, 2022 02:28
-
-
Save magg/576bf3381c9c0501b9761b54e9d86375 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mx.klar.balance.history.streams; | |
import static mx.klar.balance.history.config.KafkaConfig.BALANCE_HISTORY_STORE; | |
import io.micrometer.core.instrument.Counter; | |
import java.time.Duration; | |
import java.time.Instant; | |
import java.util.ArrayList; | |
import java.util.Comparator; | |
import java.util.List; | |
import java.util.Spliterator; | |
import java.util.Spliterators; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.stream.StreamSupport; | |
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import mx.klar.balance.history.common.Protos.BalanceHistoryEvent; | |
import mx.klar.balance.history.common.Protos.TimestampedBalanceHistoryEvent; | |
import org.apache.kafka.streams.KeyValue; | |
import org.apache.kafka.streams.kstream.Transformer; | |
import org.apache.kafka.streams.processor.ProcessorContext; | |
import org.apache.kafka.streams.processor.PunctuationType; | |
import org.apache.kafka.streams.state.TimestampedKeyValueStore; | |
import org.apache.kafka.streams.state.ValueAndTimestamp; | |
@Slf4j | |
@RequiredArgsConstructor | |
public class BalanceHistoryEventSortTransformer | |
implements | |
Transformer<String, BalanceHistoryEvent, Iterable<KeyValue<String, BalanceHistoryEvent>>> { | |
private ProcessorContext context; | |
private TimestampedKeyValueStore<String, TimestampedBalanceHistoryEvent> kvStore; | |
private final Counter stateStoreCollision; | |
private final Duration duration; | |
private final Duration delay; | |
private final Duration enforceDuration; | |
private final long messageTtl; | |
private static final AtomicLong MAX = new AtomicLong(0L); | |
@Override | |
public void init(ProcessorContext context) { | |
this.context = context; | |
this.kvStore = context.getStateStore(BALANCE_HISTORY_STORE); | |
MAX.set(findMaxInStore(kvStore)); | |
context.schedule(duration, PunctuationType.WALL_CLOCK_TIME, this::punctuate); | |
context.schedule(enforceDuration, PunctuationType.WALL_CLOCK_TIME, this::enforceTtl); | |
} | |
@Override | |
public Iterable<KeyValue<String, BalanceHistoryEvent>> transform(String key, | |
BalanceHistoryEvent value) { | |
String eventKey = generateKey(value); | |
final var event = kvStore.get(eventKey); | |
if (event == null) { | |
long msgTimestamp = context.timestamp(); | |
long insertTimestamp = Instant.now().toEpochMilli(); | |
log.trace("event saved {} {}", eventKey, msgTimestamp); | |
kvStore.put(eventKey, | |
ValueAndTimestamp.make(TimestampedBalanceHistoryEvent | |
.newBuilder() | |
.setBalanceHistoryEvent(value) | |
.setTimestamp(insertTimestamp).build(), | |
msgTimestamp)); | |
MAX.set(Math.max(MAX.get(), msgTimestamp)); | |
} else { | |
stateStoreCollision.increment(); | |
log.warn("Collision found in state store with key {}", eventKey); | |
} | |
return null; | |
} | |
void punctuate(final long timestamp) { | |
if (MAX.get() == 0) { | |
return; | |
} | |
final long internalMax = MAX.get() - delay.toMillis(); | |
try (var it = kvStore.all()) { | |
final List<KeyValue<String, | |
ValueAndTimestamp<TimestampedBalanceHistoryEvent>>> | |
result = new ArrayList<>(); | |
it.forEachRemaining( | |
kv -> { | |
if (kv.value.timestamp() <= internalMax) { | |
result.add(kv); | |
} | |
}); | |
if (!result.isEmpty()) { | |
sortAndSend(result); | |
} | |
} | |
} | |
@Override | |
public void close() { | |
} | |
private String generateKey(BalanceHistoryEvent bhe) { | |
return (bhe.hasTransactionEvent() ? bhe.getBalanceEvent().getId() + "_" | |
+ bhe.getTransactionEvent().getVersion() : bhe.getBalanceEvent().getId()); | |
} | |
private Long findMaxInStore(TimestampedKeyValueStore<String, | |
TimestampedBalanceHistoryEvent> kvStore) { | |
return StreamSupport.stream( | |
Spliterators.spliteratorUnknownSize(kvStore.all(), Spliterator.ORDERED), | |
false).mapToLong(v -> v.value.timestamp()).max().orElse(0); | |
} | |
private void enforceTtl(Long timestamp) { | |
try (var it = kvStore.all()) { | |
final List<KeyValue<String, | |
ValueAndTimestamp<TimestampedBalanceHistoryEvent>>> | |
result = new ArrayList<>(); | |
it.forEachRemaining( | |
kv -> { | |
Instant lastUpdated = Instant.ofEpochMilli(kv.value.value().getTimestamp()); | |
long secondsSinceUpdate = Duration.between(lastUpdated, | |
Instant.ofEpochMilli(timestamp)).toSeconds(); | |
if (secondsSinceUpdate >= messageTtl) { | |
result.add(kv); | |
} | |
}); | |
if (!result.isEmpty()) { | |
sortAndSend(result); | |
} | |
} | |
} | |
private void sortAndSend(List<KeyValue<String, | |
ValueAndTimestamp<TimestampedBalanceHistoryEvent>>> result) { | |
result.sort(Comparator.comparing(kv -> kv.value.timestamp())); | |
result.forEach(kv -> { | |
log.trace("event sent {} {}", kv.key, kv.value.timestamp()); | |
var bhe = kv.value.value().getBalanceHistoryEvent(); | |
context.forward(bhe.getBalanceEvent().getUserId(), bhe); | |
context.commit(); | |
kvStore.delete(kv.key); | |
}); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mx.klar.balance.history.streams; | |
import static mx.klar.balance.history.config.KafkaConfig.BALANCE_HISTORY_STORE; | |
import io.micrometer.core.instrument.Counter; | |
import java.time.Duration; | |
import java.time.Instant; | |
import java.util.ArrayList; | |
import java.util.Comparator; | |
import java.util.List; | |
import java.util.Spliterator; | |
import java.util.Spliterators; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.stream.StreamSupport; | |
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import mx.klar.balance.history.common.Protos.BalanceHistoryEvent; | |
import mx.klar.balance.history.common.Protos.TimestampedBalanceHistoryEvent; | |
import org.apache.kafka.streams.KeyValue; | |
import org.apache.kafka.streams.kstream.Transformer; | |
import org.apache.kafka.streams.processor.ProcessorContext; | |
import org.apache.kafka.streams.processor.PunctuationType; | |
import org.apache.kafka.streams.state.TimestampedKeyValueStore; | |
import org.apache.kafka.streams.state.ValueAndTimestamp; | |
@Slf4j | |
@RequiredArgsConstructor | |
public class BalanceHistoryEventSortTransformer | |
implements | |
Transformer<String, BalanceHistoryEvent, Iterable<KeyValue<String, BalanceHistoryEvent>>> { | |
private ProcessorContext context; | |
private TimestampedKeyValueStore<String, TimestampedBalanceHistoryEvent> kvStore; | |
private final Counter stateStoreCollision; | |
private final Duration duration; | |
private final Duration delay; | |
private final Duration enforceDuration; | |
private final long messageTtl; | |
private final Integer punctuatorBatch; | |
private static final AtomicLong MAX = new AtomicLong(0L); | |
@Override | |
public void init(ProcessorContext context) { | |
// keep the processor context locally because we need it in punctuate() and commit() | |
this.context = context; | |
// retrieve the key-value store | |
this.kvStore = context.getStateStore(BALANCE_HISTORY_STORE); | |
MAX.set(findMaxInStore(kvStore)); | |
context.schedule(duration, PunctuationType.STREAM_TIME, this::punctuate); | |
context.schedule(enforceDuration, PunctuationType.WALL_CLOCK_TIME, this::enforceTtl); | |
} | |
@Override | |
public Iterable<KeyValue<String, BalanceHistoryEvent>> transform(String key, | |
BalanceHistoryEvent value) { | |
String eventKey = generateKey(value); | |
final var event = kvStore.get(eventKey); | |
if (event == null) { | |
long msgTimestamp = context.timestamp(); | |
long insertTimestamp = Instant.now().toEpochMilli(); | |
log.trace("event saved {} {}", eventKey, msgTimestamp); | |
kvStore.put(eventKey, | |
ValueAndTimestamp.make(TimestampedBalanceHistoryEvent | |
.newBuilder() | |
.setBalanceHistoryEvent(value) | |
.setTimestamp(insertTimestamp).build(), | |
msgTimestamp)); | |
MAX.set(Math.max(MAX.get(), msgTimestamp)); | |
} else { | |
stateStoreCollision.increment(); | |
log.warn("Collision found in state store with key {}", eventKey); | |
} | |
return null; | |
} | |
void punctuate(final long timestamp) { | |
if (MAX.get() == 0) { | |
return; | |
} | |
final long internalMax = MAX.get() - delay.toMillis(); | |
try (var it = kvStore.all()) { | |
final List<KeyValue<String, | |
ValueAndTimestamp<TimestampedBalanceHistoryEvent>>> | |
result = new ArrayList<>(); | |
it.forEachRemaining( | |
kv -> { | |
if (kv.value == null) { | |
return; | |
} | |
if (kv.value.timestamp() <= internalMax) { | |
result.add(kv); | |
} | |
}); | |
if (!result.isEmpty() && result.size() > punctuatorBatch) { | |
sortAndSend(result); | |
} | |
} | |
} | |
@Override | |
public void close() { | |
} | |
private String generateKey(BalanceHistoryEvent bhe) { | |
return (bhe.hasTransactionEvent() ? bhe.getBalanceEvent().getId() + "_" | |
+ bhe.getTransactionEvent().getVersion() : bhe.getBalanceEvent().getId()); | |
} | |
private Long findMaxInStore(TimestampedKeyValueStore<String, | |
TimestampedBalanceHistoryEvent> kvStore) { | |
return StreamSupport.stream( | |
Spliterators.spliteratorUnknownSize(kvStore.all(), Spliterator.ORDERED), | |
false).mapToLong(v -> v.value.timestamp()).max().orElse(0); | |
} | |
private void enforceTtl(Long timestamp) { | |
try (var it = kvStore.all()) { | |
final List<KeyValue<String, | |
ValueAndTimestamp<TimestampedBalanceHistoryEvent>>> | |
result = new ArrayList<>(); | |
it.forEachRemaining( | |
kv -> { | |
if (kv.value == null) { | |
return; | |
} | |
Instant lastUpdated = Instant.ofEpochMilli(kv.value.value().getTimestamp()); | |
long secondsSinceUpdate = Duration.between(lastUpdated, | |
Instant.ofEpochMilli(timestamp)).toSeconds(); | |
if (secondsSinceUpdate >= messageTtl) { | |
result.add(kv); | |
} | |
}); | |
if (!result.isEmpty()) { | |
sortAndSend(result); | |
} | |
} | |
} | |
private void sortAndSend(List<KeyValue<String, | |
ValueAndTimestamp<TimestampedBalanceHistoryEvent>>> result) { | |
result.sort(Comparator.comparing(kv -> kv.value.timestamp())); | |
result.forEach(kv -> { | |
log.trace("event sent {} {}", kv.key, kv.value.timestamp()); | |
var bhe = kv.value.value().getBalanceHistoryEvent(); | |
context.forward(bhe.getBalanceEvent().getUserId(), bhe); | |
var value = kvStore.get(kv.key); | |
if (value != null && value.value() != null) { | |
kvStore.delete(kv.key); | |
} | |
}); | |
context.commit(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var storeBuilder = | |
Stores.timestampedKeyValueStoreBuilder( | |
Stores.persistentTimestampedKeyValueStore(BALANCE_HISTORY_STORE), | |
Serdes.String(), TIMESTAMPED_BALANCE_HISTORY_EVENT_SERDE); | |
streamsBuilder.addStateStore(storeBuilder); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment