Skip to content

Instantly share code, notes, and snippets.

@magg
Last active January 5, 2022 02:28
Show Gist options
  • Save magg/576bf3381c9c0501b9761b54e9d86375 to your computer and use it in GitHub Desktop.
Save magg/576bf3381c9c0501b9761b54e9d86375 to your computer and use it in GitHub Desktop.
package mx.klar.balance.history.streams;
import static mx.klar.balance.history.config.KafkaConfig.BALANCE_HISTORY_STORE;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mx.klar.balance.history.common.Protos.BalanceHistoryEvent;
import mx.klar.balance.history.common.Protos.TimestampedBalanceHistoryEvent;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
/**
 * Buffers incoming {@link BalanceHistoryEvent}s in the {@code BALANCE_HISTORY_STORE} state store
 * and re-emits them downstream ordered by their record timestamp.
 *
 * <p>{@link #transform} never forwards anything itself (it always returns {@code null}); events
 * leave the store only through the two punctuators scheduled in {@link #init}:
 * <ul>
 *   <li>{@code punctuate} releases every event whose record timestamp is at least {@code delay}
 *       older than the highest record timestamp seen so far;</li>
 *   <li>{@code enforceTtl} releases every event that has been sitting in the store for
 *       {@code messageTtl} seconds or longer.</li>
 * </ul>
 */
@Slf4j
@RequiredArgsConstructor
public class BalanceHistoryEventSortTransformer
    implements
    Transformer<String, BalanceHistoryEvent, Iterable<KeyValue<String, BalanceHistoryEvent>>> {

  private ProcessorContext context;
  private TimestampedKeyValueStore<String, TimestampedBalanceHistoryEvent> kvStore;

  /** Incremented whenever an incoming event maps to a key that is already buffered. */
  private final Counter stateStoreCollision;
  /** Interval at which {@code punctuate} is scheduled. */
  private final Duration duration;
  /** How far behind the highest seen record timestamp an event must be to get released. */
  private final Duration delay;
  /** Interval at which {@code enforceTtl} is scheduled. */
  private final Duration enforceDuration;
  /** Maximum time, in seconds, an event may stay buffered before it is force-released. */
  private final long messageTtl;

  /**
   * Highest record timestamp observed so far; {@code 0} means "nothing seen yet". The field is
   * static, so it is shared by every instance of this transformer in the JVM and must only be
   * updated atomically and monotonically.
   */
  private static final AtomicLong MAX = new AtomicLong(0L);

  /**
   * Captures the processor context and state store, seeds {@link #MAX} from the events already
   * buffered in the store, and schedules both wall-clock punctuators.
   *
   * @param context the processor context supplied by Kafka Streams
   */
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.kvStore = context.getStateStore(BALANCE_HISTORY_STORE);
    // Only ever raise the shared maximum: a plain set() would let an instance with an empty
    // (or older) store lower the value another instance has already established.
    MAX.accumulateAndGet(findMaxInStore(kvStore), Math::max);
    context.schedule(duration, PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    context.schedule(enforceDuration, PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
  }

  /**
   * Buffers the event under its generated key unless that key is already present, in which case
   * the collision counter is incremented and a warning is logged.
   *
   * @param key   the incoming record key (unused; the store key is derived from the value)
   * @param value the incoming event
   * @return always {@code null}; buffered events are forwarded later by the punctuators
   */
  @Override
  public Iterable<KeyValue<String, BalanceHistoryEvent>> transform(String key,
      BalanceHistoryEvent value) {
    String eventKey = generateKey(value);
    final var event = kvStore.get(eventKey);
    if (event == null) {
      long msgTimestamp = context.timestamp();
      // Wall-clock insertion time, kept inside the stored value so enforceTtl can age it.
      long insertTimestamp = Instant.now().toEpochMilli();
      log.trace("event saved {} {}", eventKey, msgTimestamp);
      kvStore.put(eventKey,
          ValueAndTimestamp.make(TimestampedBalanceHistoryEvent
                  .newBuilder()
                  .setBalanceHistoryEvent(value)
                  .setTimestamp(insertTimestamp).build(),
              msgTimestamp));
      // Atomic "raise to at least msgTimestamp"; get-then-set could lose a concurrent update.
      MAX.accumulateAndGet(msgTimestamp, Math::max);
    } else {
      stateStoreCollision.increment();
      log.warn("Collision found in state store with key {}", eventKey);
    }
    return null;
  }

  /**
   * Releases every buffered event whose record timestamp is not newer than
   * {@code MAX - delay}. Does nothing while no timestamp has been observed yet.
   *
   * @param timestamp the punctuation timestamp (unused)
   */
  void punctuate(final long timestamp) {
    final long highestSeen = MAX.get();
    if (highestSeen == 0) {
      return;
    }
    final long internalMax = highestSeen - delay.toMillis();
    try (var it = kvStore.all()) {
      final List<KeyValue<String,
          ValueAndTimestamp<TimestampedBalanceHistoryEvent>>>
          result = new ArrayList<>();
      it.forEachRemaining(
          kv -> {
            if (kv.value.timestamp() <= internalMax) {
              result.add(kv);
            }
          });
      if (!result.isEmpty()) {
        sortAndSend(result);
      }
    }
  }

  @Override
  public void close() {
  }

  /**
   * Builds the store key for an event: the balance event id, suffixed with
   * {@code "_" + version} of the transaction event when one is present.
   */
  private String generateKey(BalanceHistoryEvent bhe) {
    final String id = bhe.getBalanceEvent().getId();
    return bhe.hasTransactionEvent()
        ? id + "_" + bhe.getTransactionEvent().getVersion()
        : id;
  }

  /**
   * Returns the highest record timestamp among the events currently in the store, or {@code 0}
   * when the store is empty.
   */
  private long findMaxInStore(TimestampedKeyValueStore<String,
      TimestampedBalanceHistoryEvent> kvStore) {
    // The iterator returned by all() must be closed, hence the try-with-resources.
    try (var it = kvStore.all()) {
      return StreamSupport.stream(
              Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
          .mapToLong(kv -> kv.value.timestamp())
          .max()
          .orElse(0L);
    }
  }

  /**
   * Releases every buffered event whose insertion time is {@code messageTtl} seconds or more
   * before the punctuation timestamp.
   *
   * @param timestamp the punctuation timestamp in epoch milliseconds
   */
  private void enforceTtl(Long timestamp) {
    final Instant now = Instant.ofEpochMilli(timestamp);
    try (var it = kvStore.all()) {
      final List<KeyValue<String,
          ValueAndTimestamp<TimestampedBalanceHistoryEvent>>>
          result = new ArrayList<>();
      it.forEachRemaining(
          kv -> {
            Instant lastUpdated = Instant.ofEpochMilli(kv.value.value().getTimestamp());
            long secondsSinceUpdate = Duration.between(lastUpdated, now).toSeconds();
            if (secondsSinceUpdate >= messageTtl) {
              result.add(kv);
            }
          });
      if (!result.isEmpty()) {
        sortAndSend(result);
      }
    }
  }

  /**
   * Sorts the given entries by record timestamp (ascending), forwards each event keyed by its
   * user id, removes it from the store, and finally requests a commit for the whole batch.
   */
  private void sortAndSend(List<KeyValue<String,
      ValueAndTimestamp<TimestampedBalanceHistoryEvent>>> result) {
    result.sort(Comparator.comparingLong(kv -> kv.value.timestamp()));
    result.forEach(kv -> {
      log.trace("event sent {} {}", kv.key, kv.value.timestamp());
      var bhe = kv.value.value().getBalanceHistoryEvent();
      context.forward(bhe.getBalanceEvent().getUserId(), bhe);
      kvStore.delete(kv.key);
    });
    // One commit request per batch is enough; there is no need to repeat it for every record.
    context.commit();
  }
}
package mx.klar.balance.history.streams;
import static mx.klar.balance.history.config.KafkaConfig.BALANCE_HISTORY_STORE;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mx.klar.balance.history.common.Protos.BalanceHistoryEvent;
import mx.klar.balance.history.common.Protos.TimestampedBalanceHistoryEvent;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
/**
 * Buffers incoming {@link BalanceHistoryEvent}s in the {@code BALANCE_HISTORY_STORE} state store
 * and re-emits them downstream ordered by their record timestamp.
 *
 * <p>{@link #transform} never forwards anything itself (it always returns {@code null}); events
 * leave the store only through the two punctuators scheduled in {@link #init}:
 * <ul>
 *   <li>{@code punctuate} (stream time) releases the events whose record timestamp is at least
 *       {@code delay} older than the highest record timestamp seen so far, but only once more
 *       than {@code punctuatorBatch} such events have accumulated;</li>
 *   <li>{@code enforceTtl} (wall-clock time) releases every event that has been sitting in the
 *       store for {@code messageTtl} seconds or longer.</li>
 * </ul>
 */
@Slf4j
@RequiredArgsConstructor
public class BalanceHistoryEventSortTransformer
    implements
    Transformer<String, BalanceHistoryEvent, Iterable<KeyValue<String, BalanceHistoryEvent>>> {

  private ProcessorContext context;
  private TimestampedKeyValueStore<String, TimestampedBalanceHistoryEvent> kvStore;

  /** Incremented whenever an incoming event maps to a key that is already buffered. */
  private final Counter stateStoreCollision;
  /** Interval at which {@code punctuate} is scheduled. */
  private final Duration duration;
  /** How far behind the highest seen record timestamp an event must be to get released. */
  private final Duration delay;
  /** Interval at which {@code enforceTtl} is scheduled. */
  private final Duration enforceDuration;
  /** Maximum time, in seconds, an event may stay buffered before it is force-released. */
  private final long messageTtl;
  /** {@code punctuate} flushes only when more than this many events are ready. */
  private final Integer punctuatorBatch;

  /**
   * Highest record timestamp observed so far; {@code 0} means "nothing seen yet". The field is
   * static, so it is shared by every instance of this transformer in the JVM and must only be
   * updated atomically and monotonically.
   */
  private static final AtomicLong MAX = new AtomicLong(0L);

  /**
   * Captures the processor context and state store, seeds {@link #MAX} from the events already
   * buffered in the store, and schedules both punctuators.
   *
   * @param context the processor context supplied by Kafka Streams
   */
  @Override
  public void init(ProcessorContext context) {
    // keep the processor context locally because we need it in punctuate() and commit()
    this.context = context;
    // retrieve the key-value store
    this.kvStore = context.getStateStore(BALANCE_HISTORY_STORE);
    // Only ever raise the shared maximum: a plain set() would let an instance with an empty
    // (or older) store lower the value another instance has already established.
    MAX.accumulateAndGet(findMaxInStore(kvStore), Math::max);
    context.schedule(duration, PunctuationType.STREAM_TIME, this::punctuate);
    context.schedule(enforceDuration, PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
  }

  /**
   * Buffers the event under its generated key unless that key is already present, in which case
   * the collision counter is incremented and a warning is logged.
   *
   * @param key   the incoming record key (unused; the store key is derived from the value)
   * @param value the incoming event
   * @return always {@code null}; buffered events are forwarded later by the punctuators
   */
  @Override
  public Iterable<KeyValue<String, BalanceHistoryEvent>> transform(String key,
      BalanceHistoryEvent value) {
    String eventKey = generateKey(value);
    final var event = kvStore.get(eventKey);
    if (event == null) {
      long msgTimestamp = context.timestamp();
      // Wall-clock insertion time, kept inside the stored value so enforceTtl can age it.
      long insertTimestamp = Instant.now().toEpochMilli();
      log.trace("event saved {} {}", eventKey, msgTimestamp);
      kvStore.put(eventKey,
          ValueAndTimestamp.make(TimestampedBalanceHistoryEvent
                  .newBuilder()
                  .setBalanceHistoryEvent(value)
                  .setTimestamp(insertTimestamp).build(),
              msgTimestamp));
      // Atomic "raise to at least msgTimestamp"; get-then-set could lose a concurrent update.
      MAX.accumulateAndGet(msgTimestamp, Math::max);
    } else {
      stateStoreCollision.increment();
      log.warn("Collision found in state store with key {}", eventKey);
    }
    return null;
  }

  /**
   * Collects every buffered event whose record timestamp is not newer than {@code MAX - delay}
   * and releases them if there are more than {@code punctuatorBatch} of them. Does nothing
   * while no timestamp has been observed yet.
   *
   * @param timestamp the punctuation timestamp (unused)
   */
  void punctuate(final long timestamp) {
    final long highestSeen = MAX.get();
    if (highestSeen == 0) {
      return;
    }
    final long internalMax = highestSeen - delay.toMillis();
    try (var it = kvStore.all()) {
      final List<KeyValue<String,
          ValueAndTimestamp<TimestampedBalanceHistoryEvent>>>
          result = new ArrayList<>();
      it.forEachRemaining(
          kv -> {
            // Entries without a value are skipped.
            if (kv.value != null && kv.value.timestamp() <= internalMax) {
              result.add(kv);
            }
          });
      if (!result.isEmpty() && result.size() > punctuatorBatch) {
        sortAndSend(result);
      }
    }
  }

  @Override
  public void close() {
  }

  /**
   * Builds the store key for an event: the balance event id, suffixed with
   * {@code "_" + version} of the transaction event when one is present.
   */
  private String generateKey(BalanceHistoryEvent bhe) {
    final String id = bhe.getBalanceEvent().getId();
    return bhe.hasTransactionEvent()
        ? id + "_" + bhe.getTransactionEvent().getVersion()
        : id;
  }

  /**
   * Returns the highest record timestamp among the events currently in the store, or {@code 0}
   * when the store holds no usable entry.
   */
  private long findMaxInStore(TimestampedKeyValueStore<String,
      TimestampedBalanceHistoryEvent> kvStore) {
    // The iterator returned by all() must be closed, hence the try-with-resources.
    try (var it = kvStore.all()) {
      return StreamSupport.stream(
              Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
          // Same null-value guard the punctuators apply when scanning the store.
          .filter(kv -> kv.value != null)
          .mapToLong(kv -> kv.value.timestamp())
          .max()
          .orElse(0L);
    }
  }

  /**
   * Releases every buffered event whose insertion time is {@code messageTtl} seconds or more
   * before the punctuation timestamp.
   *
   * @param timestamp the punctuation timestamp in epoch milliseconds
   */
  private void enforceTtl(Long timestamp) {
    final Instant now = Instant.ofEpochMilli(timestamp);
    try (var it = kvStore.all()) {
      final List<KeyValue<String,
          ValueAndTimestamp<TimestampedBalanceHistoryEvent>>>
          result = new ArrayList<>();
      it.forEachRemaining(
          kv -> {
            if (kv.value == null) {
              return;
            }
            Instant lastUpdated = Instant.ofEpochMilli(kv.value.value().getTimestamp());
            long secondsSinceUpdate = Duration.between(lastUpdated, now).toSeconds();
            if (secondsSinceUpdate >= messageTtl) {
              result.add(kv);
            }
          });
      if (!result.isEmpty()) {
        sortAndSend(result);
      }
    }
  }

  /**
   * Sorts the given entries by record timestamp (ascending), forwards each event keyed by its
   * user id, removes it from the store if it is still present, and finally requests a commit
   * for the whole batch.
   */
  private void sortAndSend(List<KeyValue<String,
      ValueAndTimestamp<TimestampedBalanceHistoryEvent>>> result) {
    result.sort(Comparator.comparingLong(kv -> kv.value.timestamp()));
    result.forEach(kv -> {
      log.trace("event sent {} {}", kv.key, kv.value.timestamp());
      var bhe = kv.value.value().getBalanceHistoryEvent();
      context.forward(bhe.getBalanceEvent().getUserId(), bhe);
      // Delete only entries that still exist with a non-null payload.
      var value = kvStore.get(kv.key);
      if (value != null && value.value() != null) {
        kvStore.delete(kv.key);
      }
    });
    context.commit();
  }
}
// Register a persistent, timestamped key-value store named BALANCE_HISTORY_STORE on the
// topology, with String keys and TIMESTAMPED_BALANCE_HISTORY_EVENT_SERDE for the values.
var storeSupplier = Stores.persistentTimestampedKeyValueStore(BALANCE_HISTORY_STORE);
var storeBuilder = Stores.timestampedKeyValueStoreBuilder(
    storeSupplier,
    Serdes.String(),
    TIMESTAMPED_BALANCE_HISTORY_EVENT_SERDE);
streamsBuilder.addStateStore(storeBuilder);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment