This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MasterOrgBucketAssigner implements BucketAssigner<MeteringEventsWrapper, String> { | |
@Override public String getBucketId(final MeteringEventsWrapper element, final Context context) { | |
final String partition = getPartitionFromProcessingTime(context); | |
final String masterOrg = (String) element.getEvents().stream().findFirst() | |
.map(event -> event.get(Constants.EVENT_MASTER_ORG_KEY)).get(); | |
final String schema = element.getMetadata().get(Constants.EVENT_SCHEMA_ID_KEY); | |
return schema + "/" + masterOrg + "/" + partition; | |
} | |
private String getPartitionFromProcessingTime(final Context context) { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.ObjectMapper; | |
import org.apache.flink.api.common.serialization.Encoder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import java.nio.charset.StandardCharsets; | |
public class JsonEncoder<T> implements Encoder<T> { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DeduplicatingFlatmap extends RichFlatMapFunction<EventsWrapper, EventsWrapper> { | |
private LoadingCache<String, Boolean> dedupeCache; | |
private final long cacheExpirationTimeMs; | |
private final int cacheSize; | |
public DeduplicatingFlatmap(final long cacheExpirationTimeMs, final int cacheSize) { | |
this.cacheExpirationTimeMs = cacheExpirationTimeMs; | |
this.cacheSize = cacheSize; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### Keybase proof | |
I hereby claim: | |
* I am vmohanan1 on github. | |
* I am vmohanan (https://keybase.io/vmohanan) on keybase. | |
* I have a public key ASCuGKT-ljVCVyFf0ZvVtL11RUv2l0HER0cjnZ46__iV2wo | |
To claim this, I am signing this object: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mulesoft.mdas.ingestion; | |
import com.esotericsoftware.kryo.Kryo; | |
import com.esotericsoftware.kryo.Serializer; | |
import com.esotericsoftware.kryo.io.Input; | |
import com.esotericsoftware.kryo.io.Output; | |
import com.google.common.cache.Cache; | |
import com.google.common.cache.CacheBuilder; | |
import lombok.SneakyThrows; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package events.piepline; | |
import events.model.Event; | |
import events.model.EventsWrapper; | |
import java.util.ArrayList; | |
import java.util.List; | |
public class Driver { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package events.piepline; | |
import events.model.EventsWrapper; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; | |
import java.util.List; | |
public class EventsPipeline { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package events.piepline; | |
import com.google.common.cache.Cache; | |
import com.google.common.cache.CacheBuilder; | |
import com.google.common.cache.CacheLoader; | |
import com.google.common.cache.LoadingCache; | |
import events.model.Event; | |
import events.model.EventsWrapper; | |
import org.apache.flink.api.common.functions.RichFlatMapFunction; | |
import org.apache.flink.api.common.state.ValueState; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package events.model; | |
import java.io.Serializable; | |
// Events class | |
public class Event implements Serializable { | |
private final String eventId; | |
private final String name; |