@tallpsmith
Created March 13, 2017 22:42
Apache Flink Batch Processing Sink behaviour
[2017-02-22 00:00:00,055 INFO ][it.timing.LoggerSink][http-60600-Processor6777 izfjg9oh ][124.176.244.224][1476499903][134218675] Top CrossProjectSearch:search CrossProjectSearch:search Elapsed time: own 172.672232 ms, total 172.672559 ms Total CPU: own 98.022589 ms, total 98.025673 ms User CPU: own 80.0 ms, total 80.0 ms System CPU: own 18.108476 ms, total 18.144984 ms Heap Bytes: own 30335152 By, total 30335200 By Blocked count: own 0, total 0 Blocked time: own 0 ms, total 0 ms Wait count: own 0, total 0 Wait time: own 0 ms, total 0 ms DB time: own 24 ms, total 24 ms DB count: own 37, total 37 DB lread: own 0, total 0 DB pread: own 0, total 0 DB CPU time: own 0 ms, total 0 ms DB in bytes: own 782693 By, total 782693 By DB out bytes: own 2390 By, total 2390 By Error Pages: own 0, total 0 SS entity count: own 25, total 25 SS count: own 1, total 1 SS bytes: own 425 By, total 425 By SS time: own 46 ms, total 46 ms AC count: own 0, total 0 AC time: own 0.0 ms, total 0.0 ms ES Count: own 0, total 0 ES Time: own 0.0 ms, total 0.0 ms ES Hits: own 0, total 0 MCS time: own 0 ms, total 0 ms CAS time: own 0 ms, total 0 ms GMS time: own 0 ms, total 0 ms MRS time: own 0 ms, total 0 ms PKG time: own 0 ms, total 0 ms
// Use event time (timestamps taken from the log lines) rather than processing time.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Bounded source: one day's metrics log, read line by line.
DataStream<String> streamSource = env.readTextFile("/Users/psmith/temp/au1/mel-metrics.2017-02-22");

// Elasticsearch transport address and bulk-flush settings for the sink.
List<InetSocketAddress> esHosts = Arrays.asList(new InetSocketAddress("127.0.0.1", 9300));
HashMap<String, String> userConfig = Maps.newHashMap();
userConfig.put("bulk.flush.max.size.mb", "1");     // flush once 1 MB of requests is buffered
userConfig.put("bulk.flush.interval.ms", "5000");  // and at least every 5 seconds
userConfig.put("cluster.name", "flink");

streamSource
        .flatMap(new LogMetricParserFlatMapFunction())
        .assignTimestampsAndWatermarks(new LogMetricTupleTimeExtractor())
        .keyBy("identifierType", "identifier")
        .timeWindow(Time.minutes(15))   // 15-minute tumbling event-time windows per key
        .fold(new LogMetricTuple(), new LogMetricTupleLogMetricTupleFoldFunction(), new LogMetricTupleWindowFunction())
        .addSink(new ElasticsearchSink<LogMetricTuple>(userConfig, esHosts, new LogMetricTupleElasticsearchSinkFunction()))
        //.setParallelism(1)
        ;

env.execute("Babylon Log Metric Parsing");
}
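
The pipeline above relies on LogMetricParserFlatMapFunction and LogMetricTupleTimeExtractor, neither of which is shown in the gist. As a rough illustration of what such helpers might need to pull out of a log line like the sample above, here is a minimal plain-Java sketch. The class name LogLineSketch, its method names, and the choice to treat timestamps as UTC are all assumptions made for illustration; this is not the gist author's parser.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the gist: extracts the event time and the
// "own" elapsed time from a metrics log line of the form shown above.
public class LogLineSketch {
    // Matches the leading "[yyyy-MM-dd HH:mm:ss,SSS " prefix of a log line.
    private static final Pattern TIMESTAMP =
            Pattern.compile("^\\[(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) ");
    // Matches "Elapsed time: own <number> ms".
    private static final Pattern ELAPSED_OWN =
            Pattern.compile("Elapsed time: own ([0-9.]+) ms");
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Event time in epoch milliseconds, or -1 if the line has no timestamp prefix. */
    static long eventTimeMillis(String line) {
        Matcher m = TIMESTAMP.matcher(line);
        if (!m.find()) {
            return -1L;
        }
        // Assumption: the log carries no time zone, so UTC is used here.
        return LocalDateTime.parse(m.group(1), FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    /** "Own" elapsed time in milliseconds, or NaN if the field is absent. */
    static double elapsedOwnMillis(String line) {
        Matcher m = ELAPSED_OWN.matcher(line);
        return m.find() ? Double.parseDouble(m.group(1)) : Double.NaN;
    }
}
```

A timestamp extractor for event-time windowing could return something like eventTimeMillis(line) for each record, while a parser could populate a metric tuple from fields such as elapsedOwnMillis(line). How the real classes map these onto LogMetricTuple is not known from the gist.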