Timestamp policy example: a Kafka source in Apache Beam
import com.google.api.services.cloudresourcemanager.CloudResourceManager;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import java.util.Optional;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.*;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
public class KafkaSearchSource extends PTransform<PBegin, PCollection<KafkaRecord<Long, KafkaSearchSource.SearchQueryEvent>>> { | |
@DefaultSchema(JavaFieldSchema.class) | |
public class SearchQueryEvent { | |
public final String queryString; | |
public final Instant searchTimestamp; | |
@SchemaCreate | |
public SearchQueryEvent(String queryString, Instant searchTimestamp) { | |
this.queryString = queryString; | |
this.searchTimestamp = searchTimestamp; | |
} | |
} | |
public class SearchQueryEventDeserializer implements Deserializer<SearchQueryEvent> { | |
public SearchQueryEventDeserializer() {} | |
@Override | |
public SearchQueryEvent deserialize(String topic, byte[] data) { | |
String queryString = ""; | |
Instant timestamp = Instant.now(); | |
// decode querySTring and timestamp from binary data | |
return new SearchQueryEvent(queryString, timestamp); | |
} | |
} | |
public class GetSearchQueryTimestampFunction implements SerializableFunction<KafkaRecord<Long, SearchQueryEvent>, Instant> { | |
@Override | |
public Instant apply(KafkaRecord<Long, SearchQueryEvent> input) { | |
return input.getKV().getValue().searchTimestamp; | |
} | |
} | |
public class SearchQueryTimestampPolicyFactory implements TimestampPolicyFactory<Long, SearchQueryEvent> { | |
@Override | |
public TimestampPolicy<Long, SearchQueryEvent> createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) { | |
return new CustomTimestampPolicyWithLimitedDelay<Long, SearchQueryEvent>( | |
new GetSearchQueryTimestampFunction(), | |
Duration.standardMinutes(5), | |
previousWatermark); | |
} | |
} | |
@Override | |
public PCollection<KafkaRecord<Long, SearchQueryEvent>> expand(PBegin input) { | |
return KafkaIO.<Long, SearchQueryEvent>read() | |
.withBootstrapServers("broker_1:9092,broker_2:9092") | |
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics. | |
.withKeyDeserializer(LongDeserializer.class) | |
.withValueDeserializer(SearchQueryEventDeserializer.class) | |
.withCreateTime(Duration.standardMinutes(5)) | |
.withTimestampPolicyFactory(new SearchQueryTimestampPolicyFactory()) | |
.expand(input); | |
} | |
public static void main(String[] args) { | |
// read search query events, write them to CSV | |
Pipeline searchQueryToCsvPipeline = Pipeline.create(); | |
KafkaSearchSource searchSource = new KafkaSearchSource(); | |
searchQueryToCsvPipeline | |
.apply("Fetch from Kafka Source", searchSource) | |
.apply("Turn to CSV String", ParDo.of( new DoFn<KafkaRecord<Long, SearchQueryEvent>, String>() { | |
@ProcessElement | |
public void processElement(ProcessContext context) { | |
KafkaRecord<Long, SearchQueryEvent> nextElem = context.element(); | |
String output = nextElem.getKV().getValue().queryString + "," + nextElem.getKV().getValue().searchTimestamp; | |
context.output(output); | |
} | |
})) | |
.apply("Write to a file", TextIO.write().to("search_queries.csv")); | |
searchQueryToCsvPipeline.run().waitUntilFinish(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment