Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Timestamp policy example with Kafka source with Apache BEAM
import com.google.api.services.cloudresourcemanager.CloudResourceManager;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.*;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
public class KafkaSearchSource extends PTransform<PBegin, PCollection<KafkaRecord<Long, KafkaSearchSource.SearchQueryEvent>>> {
@DefaultSchema(JavaFieldSchema.class)
public class SearchQueryEvent {
public final String queryString;
public final Instant searchTimestamp;
@SchemaCreate
public SearchQueryEvent(String queryString, Instant searchTimestamp) {
this.queryString = queryString;
this.searchTimestamp = searchTimestamp;
}
}
public class SearchQueryEventDeserializer implements Deserializer<SearchQueryEvent> {
public SearchQueryEventDeserializer() {}
@Override
public SearchQueryEvent deserialize(String topic, byte[] data) {
String queryString = "";
Instant timestamp = Instant.now();
// decode querySTring and timestamp from binary data
return new SearchQueryEvent(queryString, timestamp);
}
}
public class GetSearchQueryTimestampFunction implements SerializableFunction<KafkaRecord<Long, SearchQueryEvent>, Instant> {
@Override
public Instant apply(KafkaRecord<Long, SearchQueryEvent> input) {
return input.getKV().getValue().searchTimestamp;
}
}
public class SearchQueryTimestampPolicyFactory implements TimestampPolicyFactory<Long, SearchQueryEvent> {
@Override
public TimestampPolicy<Long, SearchQueryEvent> createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
return new CustomTimestampPolicyWithLimitedDelay<Long, SearchQueryEvent>(
new GetSearchQueryTimestampFunction(),
Duration.standardMinutes(5),
previousWatermark);
}
}
@Override
public PCollection<KafkaRecord<Long, SearchQueryEvent>> expand(PBegin input) {
return KafkaIO.<Long, SearchQueryEvent>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(SearchQueryEventDeserializer.class)
.withCreateTime(Duration.standardMinutes(5))
.withTimestampPolicyFactory(new SearchQueryTimestampPolicyFactory())
.expand(input);
}
public static void main(String[] args) {
// read search query events, write them to CSV
Pipeline searchQueryToCsvPipeline = Pipeline.create();
KafkaSearchSource searchSource = new KafkaSearchSource();
searchQueryToCsvPipeline
.apply("Fetch from Kafka Source", searchSource)
.apply("Turn to CSV String", ParDo.of( new DoFn<KafkaRecord<Long, SearchQueryEvent>, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
KafkaRecord<Long, SearchQueryEvent> nextElem = context.element();
String output = nextElem.getKV().getValue().queryString + "," + nextElem.getKV().getValue().searchTimestamp;
context.output(output);
}
}))
.apply("Write to a file", TextIO.write().to("search_queries.csv"));
searchQueryToCsvPipeline.run().waitUntilFinish();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment