Skip to content

Instantly share code, notes, and snippets.

/** Programmatically builds a logback console appender named "console".
  *
  * NOTE(review): this snippet appears truncated. The visible lines create the
  * appender, a LayoutWrappingEncoder and a PatternLayout, but do not attach the
  * layout to the encoder, start any of the components, or add the appender to a
  * logger; that wiring (and the closing brace) is presumably in the part not shown.
  */
def createConsoleAppender(): Unit = {
// Cast the SLF4J logger factory to logback's LoggerContext; the cast throws
// ClassCastException at runtime if SLF4J is bound to a different backend.
val lc: LoggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
// Console appender, attached to the logger context and named "console"
val ca = new ConsoleAppender[ILoggingEvent]
ca.setContext(lc)
ca.setName("console")
// Encoder that delegates formatting of each event to a Layout
val encoder = new LayoutWrappingEncoder[ILoggingEvent]
encoder.setContext(lc)
// Pattern: time, level padded to 5 chars, logger name abbreviated to ~36 chars, message, newline
val layout = new PatternLayout
layout.setPattern("%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n")
<configuration>
<!-- Logback configuration fragment defining a file appender named FILE.
     NOTE(review): this fragment is cut off (no closing configuration tag is
     shown) and nothing in the visible lines attaches FILE to a logger or to
     the root element; presumably that wiring follows in the full file. -->
<!-- Writes log events to a file. The path is relative, so it is presumably
     resolved against the JVM working directory. NOTE(review): writing logs
     under src/main/resources puts runtime output into the source tree;
     consider a location outside src. -->
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>src/main/resources/testFile.log</file>
<!-- true: keep existing file contents and append to them instead of truncating -->
<append>true</append>
<encoder>
<!-- time, level padded to 5 chars, logger name abbreviated to about 36 chars, message, newline -->
<pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
// Sink selection: when no --output path was supplied, print the counts to
// stdout; otherwise write them as text to that path.
// NOTE(review): DataStream.writeAsText is deprecated in recent Flink releases
// in favour of FileSink; verify against the Flink version in use.
if (!params.has("output")) {
  println("Printing result to stdout. Use --output to specify output path.")
  counts.print()
} else {
  counts.writeAsText(params.get("output"))
}
val counts: DataStream[(String, Int)] = text
  // Lower-case each line with Locale.ROOT so the result does not depend on the
  // JVM default locale (e.g. Turkish dotted/dotless i), then split it on runs of
  // non-word characters. The (?U) flag makes \W Unicode-aware, so letters such
  // as "é" or "ü" stay inside their word instead of acting as separators;
  // ASCII-only input is split exactly as with a plain "\\W+".
  .flatMap(_.toLowerCase(java.util.Locale.ROOT).split("(?U)\\W+"))
  // split yields a leading "" when a line starts with a separator; drop it
  .filter(_.nonEmpty)
  // pair each word with an initial count of 1: (word, 1)
  .map((_, 1))
  // group by the word itself (a key selector replaces the deprecated positional
  // keyBy(0)) and sum the count field, tuple position 1, per word
  .keyBy(_._1)
  .sum(1)
// Input lines: the bundled sample WordCountData.WORDS when no --input path is
// given, otherwise the lines of the file at that path.
val text =
  if (!params.has("input")) {
    println("Executing WordCount example with default inputs data set.")
    println("Use --input to specify file input.")
    env.fromElements(WordCountData.WORDS: _*)
  } else {
    env.readTextFile(params.get("input"))
  }
// Run the pipeline: Kafka records -> intermediateFlow -> Elasticsearch sink.
// runWith yields the sink's materialized value and needs an implicit
// Materializer, presumably provided by the enclosing scope.
// NOTE(review): if the enclosing code discards this value, a stream failure
// goes unobserved; consider keeping it and handling its completion.
kafkaSource.via(intermediateFlow).runWith(esSink)
/** Turns each Kafka record into an Elasticsearch index message for a [[Company]].
  *
  * The record's String value is parsed as JSON and decoded with `.as[Company]`.
  * NOTE(review): if `Json` is play-json, both `Json.parse` and `.as` throw on
  * malformed or non-matching input, which would fail the whole stream; confirm
  * that fail-fast is intended.
  * NOTE(review): `location` is passed as the first argument of
  * `createIndexMessage`, which in Alpakka is the document id, so companies
  * sharing a location would overwrite each other; verify this is intended.
  */
val intermediateFlow = Flow[ConsumerRecord[Array[Byte], String]].map { record =>
  val parsedCompany = Json.parse(record.value()).as[Company]
  WriteMessage.createIndexMessage(parsedCompany.location, parsedCompany)
}
// Kafka consumer configuration: byte-array keys, String values, a local broker,
// a fixed consumer group, and auto.offset.reset = "earliest" (read from the start
// of the topic when the group has no committed offset).
val consumerSettings = {
  val keyDeserializer = new ByteArrayDeserializer
  val valueDeserializer = new StringDeserializer
  ConsumerSettings(system, keyDeserializer, valueDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("akka-stream-kafka-test")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
// Plain Kafka source over the single topic "kafkaTopic", using consumerSettings.
// NOTE(review): a plain source does not commit offsets itself; presumably this
// relies on the Kafka client's enable.auto.commit, so confirm that setting.
val kafkaSource: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = {
  val topicSubscription = Subscriptions.topics("kafkaTopic")
  Consumer.plainSource(consumerSettings, topicSubscription)
}
// Elasticsearch sink for Company documents: index "essink", mapping type "company".
// NOTE(review): mapping types are deprecated in Elasticsearch 7 and removed in 8;
// confirm the target cluster version. The implicit client and JSON writer this
// call requires are presumably provided by the enclosing scope.
val esSink =
  ElasticsearchSink.create[Company](typeName = "company", indexName = "essink")