I hereby claim:
- I am robvadai on github.
- I am robvadai (https://keybase.io/robvadai) on keybase.
- I have a public key ASBNTQ6e3ZmnSxJfQzjC3LsZTEgM8kYmWUtmQ9jQNQYPbQo
To claim this, I am signing this object:
import scala.annotation.tailrec | |
def findNumberOfSteps(start: Int, target: Int, distance: Int): Int = { | |
@tailrec | |
def progressUntilTargetReached(numberOfSteps: Int, currentPosition: Int): Int = { | |
if (currentPosition >= target) { | |
numberOfSteps | |
} else { | |
progressUntilTargetReached(numberOfSteps + 1, currentPosition + distance) |
streamBuilder | |
.stream( | |
STRING_SERDE, STRING_SERDE, upstreamTopicName | |
).filter((messageKey, messageBody) -> { | |
try { | |
Double.parseDouble(messageBody); | |
return true; | |
} catch (Throwable e) { | |
return false; | |
} |
val kafkaStreamsConfig: Properties = new Properties() | |
kafkaStreamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, consumerGroup) | |
kafkaStreamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers) | |
// Assuming streamBuilder is already defined with the transformation pipeline | |
val kafkaStreams = new KafkaStreams(streamBuilder, kafkaStreamsConfig) | |
Try { | |
Logger.info("Starting Kafka Streams...") | |
kafkaStreams.start() |
import akka.actor.ActorSystem | |
import akka.stream.{ActorMaterializer, OverflowStrategy} | |
import akka.stream.scaladsl.{Sink, Source} | |
import org.apache.logging.log4j.LogManager | |
import scala.concurrent.Future | |
object TestQueue extends App { | |
implicit val system = ActorSystem() |
2018-04-21T21:25:14,599 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-2] - msg=[Received 1] | |
2018-04-21T21:25:15,098 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-2] - msg=[Received 2] | |
2018-04-21T21:25:15,600 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-3] - msg=[Received 3] | |
2018-04-21T21:25:16,104 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-3] - msg=[Received 4] | |
2018-04-21T21:25:16,105 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-2] - msg=[Writing file of 4 messages] | |
2018-04-21T21:25:16,112 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-2] - msg=[Messages: Vector(1, 2, 3, 4)] | |
2018-04-21T21:25:16,609 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-4] - msg=[Received 5] | |
2018-04-21T21:25:17,111 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-4] - msg=[Received 6] | |
2018-04-21T21:25:17,616 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-3] - msg=[Rec |
import akka.actor.{Actor, ActorLogging} | |
class BlockingActor extends Actor with ActorLogging { | |
override def receive: Receive = { | |
case someNum: Int => | |
log.info(s"Blocking actor received: $someNum") | |
Thread.sleep(2000L) | |
log.info(s"Blocking actor finished: $someNum") | |
} | |
} |
I hereby claim:
To claim this, I am signing this object: