Skip to content

Instantly share code, notes, and snippets.

@roger-link
Last active June 15, 2020 17:34
Show Gist options
  • Save roger-link/3a6c8c6c51181c00932dcee8c86b8ae2 to your computer and use it in GitHub Desktop.
Save roger-link/3a6c8c6c51181c00932dcee8c86b8ae2 to your computer and use it in GitHub Desktop.
Beam-Pipeline.java
package com.redacted.redacted.processing.beam.errorrate;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.extensions.jackson.ParseJsons;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.joda.time.Duration;
import com.redacted.redacted.processing.beam.common.newrelic.NewRelicRecord;
import com.redacted.redacted.processing.beam.common.options.OptionsParser;
import com.redacted.redacted.processing.beam.common.kinesis.WriteKinesisRecords;
import com.redacted.redacted.processing.beam.common.kinesis.KinesisClientsProvider;
import com.redacted.redacted.processing.beam.common.kafka.Kafka;
import com.redacted.redacted.processing.beam.common.coders.Coders;
/**
 * Entry point of the error-rate streaming pipeline.
 *
 * <p>The pipeline reads records from a Kafka topic, parses each value as a JSON
 * {@code NewRelicRecord}, assigns the records to fixed windows, keys them by application name,
 * combines each key's records into an {@code ErrorRateResult}, and hands the encoded results to
 * the {@code WriteKinesisRecords} transform.
 */
public final class ErrorRate {
    /** Logger for this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ErrorRate.class);

    /** Not instantiable: this class only exposes static entry points. */
    private ErrorRate() { }

    /**
     * Parses the pipeline options from the command line and runs the pipeline.
     *
     * @param args command-line arguments, parsed into {@code ErrorRatePipelineOptions}
     * @throws Exception if a problem occurs.
     */
    public static void main(final String[] args) throws Exception {
        LOGGER.info("redacted-Logger: Entering main");
        final ErrorRatePipelineOptions options =
            new OptionsParser<ErrorRatePipelineOptions>()
                .getOptions(args, ErrorRatePipelineOptions.class);
        LOGGER.info("redacted-Logger: Entering runErrorRate and running with {}", options);
        runErrorRate(options);
    }

    /**
     * Builds the error-rate pipeline and blocks until it finishes.
     *
     * <p>The window size and allowed lateness are read from {@code options} and interpreted as
     * seconds. The Kafka brokers and topic are currently taken from {@code Config} constants
     * rather than from the options (see the note inside the method).
     *
     * @param options pipeline options used to create the pipeline, size the windows and build
     *     the Kinesis clients provider
     */
    private static void runErrorRate(final ErrorRatePipelineOptions options) {
        LOGGER.info("redacted-Logger: Options are: {}", options);
        // need to figure out how to get rid of these and use kafkaOptions
        // but I'm getting weird error
        final String brokers = Config.KAFKA_BROKERS;
        final String topic = Config.KAFKA_TOPIC;
        final Pipeline pipeline = Pipeline.create(options);

        LOGGER.info("redacted-Logger: Retrieving New Relic records.");
        final PCollection<NewRelicRecord> newRelicRecords =
            pipeline
                .apply(Kafka.readKafkaTopic(brokers, topic))
                .apply(
                    ParDo.of(
                        new DoFn<KafkaRecord<Long, String>, String>() {
                            /** Emits only the value of each Kafka record; the key is dropped. */
                            @ProcessElement
                            public void processElement(final ProcessContext processContext) {
                                final KafkaRecord<Long, String> record = processContext.element();
                                // Read the value once and reuse it for both logging and output.
                                final String value = record.getKV().getValue();
                                // NOTE(review): this logs every record's full value at INFO level;
                                // confirm this is intended outside of debugging.
                                LOGGER.info("redacted-Logger: Found Kafka value of {}", value);
                                processContext.output(value);
                            }
                        }))
                .apply(ParseJsons.of(NewRelicRecord.class))
                .setCoder(SerializableCoder.of(NewRelicRecord.class));

        // adds a key of application name to New Relic events. this is needed to key by
        // application name later
        // https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys
        final PCollection<KV<String, NewRelicRecord>> keyedNewRelicRecords =
            newRelicRecords
                .apply(
                    Window.<NewRelicRecord>into(
                            FixedWindows.of(Duration.standardSeconds(options.getFlinkWindow())))
                        .discardingFiredPanes()
                        .withAllowedLateness(
                            Duration.standardSeconds(options.getFlinkAllowedLateness())))
                .apply(
                    WithKeys.of(
                        new SerializableFunction<NewRelicRecord, String>() {
                            @Override
                            public String apply(final NewRelicRecord record) {
                                return record.getAppname();
                            }
                        }));

        // perform aggregations on new relic records, one result per key
        final PCollection<KV<String, ErrorRateResult>> aggregatedRecords =
            keyedNewRelicRecords.apply(
                Combine.<String, NewRelicRecord, ErrorRateResult>perKey(
                    new NewRelicRecordAggregatorFn()));

        // encode kv of <string,errorrateresult> to byte array
        final PCollection<byte[]> encodedRecords =
            aggregatedRecords.apply(
                "Map to byte array for KinesisStream",
                MapElements.via(new MapKVToByteArray()));

        encodedRecords.apply(
            "WriteToKinesis",
            new WriteKinesisRecords(new KinesisClientsProvider(options.as(AwsOptions.class))));

        // Blocks the calling thread until the pipeline reaches a terminal state.
        pipeline.run().waitUntilFinish();
    }

    /**
     * Maps an aggregated key/value pair to the byte array produced by
     * {@code Coders.encodeInputElement} for its value. Only the value is encoded; the key is
     * not part of the output.
     */
    private static final class MapKVToByteArray
            extends SimpleFunction<KV<String, ErrorRateResult>, byte[]> {
        @Override
        public byte[] apply(final KV<String, ErrorRateResult> input) {
            return Coders.encodeInputElement(input.getValue());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment