Skip to content

Instantly share code, notes, and snippets.

View robvadai's full-sized avatar

Rob Vadai robvadai

View GitHub Profile

Keybase proof

I hereby claim:

  • I am robvadai on github.
  • I am robvadai (https://keybase.io/robvadai) on keybase.
  • I have a public key ASBNTQ6e3ZmnSxJfQzjC3LsZTEgM8kYmWUtmQ9jQNQYPbQo

To claim this, I am signing this object:

@robvadai
robvadai / BlockingActor.scala
Created August 6, 2018 09:50
Non-blocking Actor calls
import akka.actor.{Actor, ActorLogging}
class BlockingActor extends Actor with ActorLogging {
override def receive: Receive = {
case someNum: Int =>
log.info(s"Blocking actor received: $someNum")
Thread.sleep(2000L)
log.info(s"Blocking actor finished: $someNum")
}
}
@robvadai
robvadai / test-queue.log
Created April 21, 2018 20:35
Queue implementation logs
2018-04-21T21:25:14,599 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-2] - msg=[Received 1]
2018-04-21T21:25:15,098 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-2] - msg=[Received 2]
2018-04-21T21:25:15,600 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-3] - msg=[Received 3]
2018-04-21T21:25:16,104 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-3] - msg=[Received 4]
2018-04-21T21:25:16,105 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-2] - msg=[Writing file of 4 messages]
2018-04-21T21:25:16,112 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-2] - msg=[Messages: Vector(1, 2, 3, 4)]
2018-04-21T21:25:16,609 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-4] - msg=[Received 5]
2018-04-21T21:25:17,111 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-4] - msg=[Received 6]
2018-04-21T21:25:17,616 level=[INFO] sourceThread=[default-akka.actor.default-dispatcher-3] - msg=[Rec
@robvadai
robvadai / TestQueue.scala
Last active May 8, 2018 21:34
Queue implementation in Akka Streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.logging.log4j.LogManager
import scala.concurrent.Future
object TestQueue extends App {
implicit val system = ActorSystem()
@robvadai
robvadai / KafkaStreamsRunner.scala
Last active October 20, 2017 20:26
An example to start Kafka Streams
val kafkaStreamsConfig: Properties = new Properties()
kafkaStreamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, consumerGroup)
kafkaStreamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
// Assuming streamBuilder is already defined with the transformation pipeline
val kafkaStreams = new KafkaStreams(streamBuilder, kafkaStreamsConfig)
Try {
Logger.info("Starting Kafka Streams...")
kafkaStreams.start()
@robvadai
robvadai / KafkaStreamsSimpleDataPipeline.java
Last active October 20, 2017 20:27
Simple Kafka Streams data pipeline
streamBuilder
.stream(
STRING_SERDE, STRING_SERDE, upstreamTopicName
).filter((messageKey, messageBody) -> {
try {
Double.parseDouble(messageBody);
return true;
} catch (Throwable e) {
return false;
}
import scala.annotation.tailrec
def findNumberOfSteps(start: Int, target: Int, distance: Int): Int = {
@tailrec
def progressUntilTargetReached(numberOfSteps: Int, currentPosition: Int): Int = {
if (currentPosition >= target) {
numberOfSteps
} else {
progressUntilTargetReached(numberOfSteps + 1, currentPosition + distance)