KafkaSearchSource.java
Timestamp policy example for a Kafka source with Apache Beam
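The PTransform below wraps KafkaIO.read() and attaches a CustomTimestampPolicyWithLimitedDelay, so element timestamps and the watermark are derived from an event-time field embedded in each record rather than from Kafka's log-append or processing time.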
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.*;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

import java.util.Optional;
public class KafkaSearchSource extends PTransform<PBegin, PCollection<KafkaRecord<Long, KafkaSearchSource.SearchQueryEvent>>> {
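
  // Event carried in each Kafka record. @DefaultSchema(JavaFieldSchema.class)
  // lets Beam infer a schema from the public fields, and @SchemaCreate marks
  // the constructor to use when rebuilding instances.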
  @DefaultSchema(JavaFieldSchema.class)
  public static class SearchQueryEvent {
    public final String queryString;
    public final Instant searchTimestamp;

    @SchemaCreate
    public SearchQueryEvent(String queryString, Instant searchTimestamp) {
      this.queryString = queryString;
      this.searchTimestamp = searchTimestamp;
    }
  }
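
  // Kafka value deserializer. Kafka instantiates it reflectively on the
  // workers, so it must be a public static class with a no-arg constructor.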
  public static class SearchQueryEventDeserializer implements Deserializer<SearchQueryEvent> {
    public SearchQueryEventDeserializer() {}

    @Override
    public SearchQueryEvent deserialize(String topic, byte[] data) {
      String queryString = "";
      Instant timestamp = Instant.now();
      // decode queryString and timestamp from binary data
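      // One possible decoding, assuming a hypothetical wire format of an
      // 8-byte big-endian epoch-millis timestamp followed by a UTF-8 query
      // string (adapt to whatever serialization the producer actually uses):
      //   java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(data);
      //   timestamp = new Instant(buf.getLong());
      //   byte[] rest = new byte[buf.remaining()];
      //   buf.get(rest);
      //   queryString = new String(rest, java.nio.charset.StandardCharsets.UTF_8);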
      return new SearchQueryEvent(queryString, timestamp);
    }
  }
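
  // Extracts the event-time timestamp from a record; the timestamp policy
  // below invokes this for every record read from a partition.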
  public static class GetSearchQueryTimestampFunction
      implements SerializableFunction<KafkaRecord<Long, SearchQueryEvent>, Instant> {
    @Override
    public Instant apply(KafkaRecord<Long, SearchQueryEvent> input) {
      return input.getKV().getValue().searchTimestamp;
    }
  }
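
  // Creates the per-partition timestamp policy. CustomTimestampPolicyWithLimitedDelay
  // assigns each record the timestamp extracted above and keeps the watermark
  // about 5 minutes behind the maximum timestamp seen, tolerating out-of-order records.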
  public static class SearchQueryTimestampPolicyFactory implements TimestampPolicyFactory<Long, SearchQueryEvent> {
    @Override
    public TimestampPolicy<Long, SearchQueryEvent> createTimestampPolicy(
        TopicPartition tp, Optional<Instant> previousWatermark) {
      return new CustomTimestampPolicyWithLimitedDelay<Long, SearchQueryEvent>(
          new GetSearchQueryTimestampFunction(),
          Duration.standardMinutes(5),
          previousWatermark);
    }
  }
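
  // Assembles the KafkaIO read: brokers, topic, deserializers, and the custom
  // timestamp policy defined above.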
  @Override
  public PCollection<KafkaRecord<Long, SearchQueryEvent>> expand(PBegin input) {
    // withCreateTime(...) installs its own timestamp policy, which the custom
    // factory below would override, so only the factory is set here.
    return input.apply(
        KafkaIO.<Long, SearchQueryEvent>read()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(SearchQueryEventDeserializer.class)
            .withTimestampPolicyFactory(new SearchQueryTimestampPolicyFactory()));
  }
  public static void main(String[] args) {
    // Read search query events from Kafka and write them out as CSV lines.
    Pipeline searchQueryToCsvPipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    KafkaSearchSource searchSource = new KafkaSearchSource();
    searchQueryToCsvPipeline
        .apply("Fetch from Kafka Source", searchSource)
        .apply("Turn to CSV String", ParDo.of(new DoFn<KafkaRecord<Long, SearchQueryEvent>, String>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            KafkaRecord<Long, SearchQueryEvent> nextElem = context.element();
            SearchQueryEvent event = nextElem.getKV().getValue();
            context.output(event.queryString + "," + event.searchTimestamp);
          }
        }))
        // Kafka is an unbounded source, so the output must be windowed and the
        // write must use windowed writes with an explicit shard count.
        .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("Write to a file",
            TextIO.write().to("search_queries").withSuffix(".csv").withWindowedWrites().withNumShards(1));
    searchQueryToCsvPipeline.run().waitUntilFinish();
  }
}
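
Note: KafkaIO ships in the beam-sdks-java-io-kafka artifact; the broker addresses (broker_1:9092, broker_2:9092) and the topic name above are placeholders, and running the pipeline additionally assumes a Kafka client library and a Beam runner on the classpath.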