This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def createConsoleAppender(): Unit = { | |
val lc: LoggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] | |
val ca = new ConsoleAppender[ILoggingEvent] | |
ca.setContext(lc) | |
ca.setName("console") | |
val encoder = new LayoutWrappingEncoder[ILoggingEvent] | |
encoder.setContext(lc) | |
val layout = new PatternLayout | |
layout.setPattern("%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<configuration> | |
<appender name="FILE" class="ch.qos.logback.core.FileAppender"> | |
<file>src/main/resources/testFile.log</file> | |
<append>true</append> | |
<encoder> | |
<pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern> | |
</encoder> | |
</appender> | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if (params.has("output")) { | |
counts.writeAsText(params.get("output")) | |
} else { | |
println("Printing result to stdout. Use --output to specify output path.") | |
counts.print() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val counts: DataStream[(String, Int)] = text | |
// split up the lines in pairs (2-tuples) containing: (word,1) | |
.flatMap(_.toLowerCase.split("\\W+")) | |
.filter(_.nonEmpty) | |
.map((_, 1)) | |
// group by the tuple field "0" and sum up tuple field "1" | |
.keyBy(0) | |
.sum(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val text = | |
// read the text file from given input path | |
if (params.has("input")) { | |
env.readTextFile(params.get("input")) | |
} else { | |
println("Executing WordCount example with default inputs data set.") | |
println("Use --input to specify file input.") | |
// get default test text data | |
env.fromElements(WordCountData.WORDS: _*) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
kafkaSource | |
.via(intermediateFlow) | |
.runWith(esSink) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val intermediateFlow =Flow[ConsumerRecord[Array[Byte], String]].map { kafkaMessage => | |
// Parsing the record as Company Object | |
val company = Json.parse(kafkaMessage.value()).as[Company] | |
val companyLocation = company.location | |
// Transform message so that we can write to elastic | |
WriteMessage.createIndexMessage(companyLocation, company) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) | |
.withBootstrapServers("localhost:9092") | |
.withGroupId("akka-stream-kafka-test") | |
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") | |
val kafkaSource: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = | |
Consumer.plainSource(consumerSettings, Subscriptions.topics("kafkaTopic")) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val esSink = ElasticsearchSink.create[Company]( | |
indexName = "essink", | |
typeName = "company" | |
) | |