Created
March 13, 2017 22:42
Apache Flink Batch Processing Sink behaviour
[2017-02-22 00:00:00,055 INFO ][it.timing.LoggerSink][http-60600-Processor6777 izfjg9oh ][124.176.244.224][1476499903][134218675] Top CrossProjectSearch:search CrossProjectSearch:search Elapsed time: own 172.672232 ms, total 172.672559 ms Total CPU: own 98.022589 ms, total 98.025673 ms User CPU: own 80.0 ms, total 80.0 ms System CPU: own 18.108476 ms, total 18.144984 ms Heap Bytes: own 30335152 By, total 30335200 By Blocked count: own 0, total 0 Blocked time: own 0 ms, total 0 ms Wait count: own 0, total 0 Wait time: own 0 ms, total 0 ms DB time: own 24 ms, total 24 ms DB count: own 37, total 37 DB lread: own 0, total 0 DB pread: own 0, total 0 DB CPU time: own 0 ms, total 0 ms DB in bytes: own 782693 By, total 782693 By DB out bytes: own 2390 By, total 2390 By Error Pages: own 0, total 0 SS entity count: own 25, total 25 SS count: own 1, total 1 SS bytes: own 425 By, total 425 By SS time: own 46 ms, total 46 ms AC count: own 0, total 0 AC time: own 0.0 ms, total 0.0 ms ES Count: own 0, total 0 ES Time: own 0.0 ms, total 0.0 ms ES Hits: own 0, total 0 MCS time: own 0 ms, total 0 ms CAS time: own 0 ms, total 0 ms GMS time: own 0 ms, total 0 ms MRS time: own 0 ms, total 0 ms PKG time: own 0 ms, total 0 ms
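The gist does not include the parser or timestamp extractor that the job applies to lines like the one above. As a rough illustration only, here is a minimal JDK-only sketch of how the leading timestamp and the `name: own X, total Y` groups might be pulled out of such a line. The class name `LogMetricLineSketch`, its method names, the regular expressions, and the choice of UTC for the timestamp are all assumptions made for this sketch; they are not taken from the original `LogMetricParserFlatMapFunction` or `LogMetricTupleTimeExtractor`.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the gist: parses one metrics log line.
final class LogMetricLineSketch {

    // Leading "[yyyy-MM-dd HH:mm:ss,SSS" prefix of the line.
    private static final Pattern TIMESTAMP =
            Pattern.compile("^\\[(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})");

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // "<name>: own <number>[ unit], total <number>[ ms|By]".
    // The name must follow whitespace so "CrossProjectSearch:search" is skipped,
    // and the trailing unit is consumed so it does not leak into the next name.
    private static final Pattern METRIC = Pattern.compile(
            "(?<=\\s)([A-Za-z][A-Za-z ]*?): own (\\d+(?:\\.\\d+)?)(?: [A-Za-z]+)?,"
            + " total (\\d+(?:\\.\\d+)?)(?: (?:ms|By)\\b)?");

    // Event time in epoch milliseconds. The log line carries no zone,
    // so UTC is assumed here purely for illustration.
    static long parseEventTimeMillis(String line) {
        Matcher m = TIMESTAMP.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("No leading timestamp in line");
        }
        return LocalDateTime.parse(m.group(1), FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Metric name -> {own, total}, in the order the metrics appear in the line.
    static Map<String, double[]> parseMetrics(String line) {
        Map<String, double[]> metrics = new LinkedHashMap<>();
        Matcher m = METRIC.matcher(line);
        while (m.find()) {
            double own = Double.parseDouble(m.group(2));
            double total = Double.parseDouble(m.group(3));
            metrics.put(m.group(1), new double[] {own, total});
        }
        return metrics;
    }

    public static void main(String[] args) {
        String line = "[2017-02-22 00:00:00,055 INFO ][it.timing.LoggerSink] Top"
                + " CrossProjectSearch:search Elapsed time: own 172.672232 ms, total 172.672559 ms"
                + " DB count: own 37, total 37";
        System.out.println(parseEventTimeMillis(line));
        parseMetrics(line).forEach((name, v) ->
                System.out.println(name + " own=" + v[0] + " total=" + v[1]));
    }
}
```

In a Flink job, logic along these lines would sit inside the flat-map function and the timestamp extractor. The units (`ms`, `By`) are discarded here for brevity; a real parser would probably keep them.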
// Use event time, so windows are driven by the timestamps parsed from the log lines.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Source: a text file of metrics log lines.
DataStream<String> streamSource = env.readTextFile("/Users/psmith/temp/au1/mel-metrics.2017-02-22");

// Elasticsearch transport address and bulk-flush settings for the sink.
List<InetSocketAddress> esHosts = Arrays.asList(new InetSocketAddress("127.0.0.1", 9300));
HashMap<String, String> userConfig = Maps.newHashMap();
userConfig.put("bulk.flush.max.size.mb", "1");
userConfig.put("bulk.flush.interval.ms", "5000");
userConfig.put("cluster.name", "flink");

// Parse each line, assign event timestamps, fold into 15-minute windows per key,
// and write each window result to Elasticsearch.
streamSource
    .flatMap(new LogMetricParserFlatMapFunction())
    .assignTimestampsAndWatermarks(new LogMetricTupleTimeExtractor())
    .keyBy("identifierType", "identifier")
    .timeWindow(Time.minutes(15))
    .fold(new LogMetricTuple(), new LogMetricTupleLogMetricTupleFoldFunction(), new LogMetricTupleWindowFunction())
    .addSink(new ElasticsearchSink<LogMetricTuple>(userConfig, esHosts, new LogMetricTupleElasticsearchSinkFunction()))
    // .setParallelism(1)
    ;
env.execute("Babylon Log Metric Parsing");
}