Skip to content

Instantly share code, notes, and snippets.

View vmohanan1's full-sized avatar

Vinod Mohanan vmohanan1

View GitHub Profile
public class MasterOrgBucketAssigner implements BucketAssigner<MeteringEventsWrapper, String> {
/**
 * Computes the bucket id for an element as {@code schema + "/" + masterOrg + "/" + partition}.
 *
 * <p>The partition comes from {@code getPartitionFromProcessingTime(context)}. The master org is
 * read from the first event of {@code element} under {@code Constants.EVENT_MASTER_ORG_KEY}. The
 * schema is read from the wrapper metadata under {@code Constants.EVENT_SCHEMA_ID_KEY}.
 *
 * @param element the wrapper whose first event and metadata determine the bucket
 * @param context the bucket-assigner context passed to the partition helper
 * @return the bucket id string
 * @throws java.util.NoSuchElementException if the wrapper has no events, or if the first event's
 *     lookup of the master-org key returns null
 */
@Override public String getBucketId(final MeteringEventsWrapper element, final Context context) {
    final String partition = getPartitionFromProcessingTime(context);
    // Optional.map yields an empty Optional when the lookup returns null, so a missing key
    // is reported the same way as an empty event list, but with an actionable message
    // instead of the bare "No value present" from an unchecked get().
    final String masterOrg = (String) element.getEvents().stream().findFirst()
        .map(event -> event.get(Constants.EVENT_MASTER_ORG_KEY))
        .orElseThrow(() -> new java.util.NoSuchElementException(
            "Cannot assign bucket: no events or no master org value under key '"
                + Constants.EVENT_MASTER_ORG_KEY + "' on the first event"));
    final String schema = element.getMetadata().get(Constants.EVENT_SCHEMA_ID_KEY);
    return schema + "/" + masterOrg + "/" + partition;
}
private String getPartitionFromProcessingTime(final Context context) {
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.Encoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
public class JsonEncoder<T> implements Encoder<T> {
public class DeduplicatingFlatmap extends RichFlatMapFunction<EventsWrapper, EventsWrapper> {
private LoadingCache<String, Boolean> dedupeCache;
private final long cacheExpirationTimeMs;
private final int cacheSize;
/**
 * Stores the cache settings for this flat-map.
 *
 * <p>NOTE(review): these values are presumably used to build {@code dedupeCache} elsewhere
 * in the class (not visible here). The "Ms" suffix suggests milliseconds; confirm.
 *
 * @param cacheExpirationTimeMs expiration time for cache entries
 * @param cacheSize maximum size for the cache
 */
public DeduplicatingFlatmap(final long cacheExpirationTimeMs, final int cacheSize) {
    this.cacheSize = cacheSize;
    this.cacheExpirationTimeMs = cacheExpirationTimeMs;
}
### Keybase proof
I hereby claim:
* I am vmohanan1 on github.
* I am vmohanan (https://keybase.io/vmohanan) on keybase.
* I have a public key ASCuGKT-ljVCVyFf0ZvVtL11RUv2l0HER0cjnZ46__iV2wo
To claim this, I am signing this object:
package com.mulesoft.mdas.ingestion;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.SneakyThrows;
package events.piepline;
import events.model.Event;
import events.model.EventsWrapper;
import java.util.ArrayList;
import java.util.List;
public class Driver {
package events.piepline;
import events.model.EventsWrapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import java.util.List;
public class EventsPipeline {
package events.piepline;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import events.model.Event;
import events.model.EventsWrapper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
@vmohanan1
vmohanan1 / Events.java
Last active December 27, 2021 04:40
The model files used in the Flink pipeline.
package events.model;
import java.io.Serializable;
// Events class
public class Event implements Serializable {
private final String eventId;
private final String name;