Skip to content

Instantly share code, notes, and snippets.

@abdolence
Last active March 19, 2020 21:10
Show Gist options
  • Save abdolence/ad8cc478c2866887c1d09a92af29b3e9 to your computer and use it in GitHub Desktop.
Save abdolence/ad8cc478c2866887c1d09a92af29b3e9 to your computer and use it in GitHub Desktop.
package fizzbuzz
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets.{ RoundRobinPartitioner, StreamletShape }
import cloudflow.streamlets.avro._
class FizzBuzzProcessor extends AkkaServerStreamlet {
def basicFizzbuzzRules( x: Int ): String = {
( x % 3, x % 5 ) match {
case ( 0, 0 ) => "FizzBuzz"
case ( 0, _ ) => "Fizz"
case ( _, 0 ) => "Buzz"
case _ => x.toString
}
}
val in = AvroInlet[FizzBuzzNumber]( "in" )
val out =
AvroOutlet[FizzBuzzResult]( "out" ).withPartitioner( RoundRobinPartitioner )
val shape = StreamletShape( in, out )
// define a flow that makes it possible for cloudflow to commit reads
def flow = {
FlowWithOffsetContext[FizzBuzzNumber]
.map { number ⇒
new FizzBuzzResult( number.value, basicFizzbuzzRules( number.value ) )
}
}
// override createLogic to provide streamlet behavior
override def createLogic = new RunnableGraphStreamletLogic() {
def runnableGraph =
sourceWithOffsetContext( in )
.via( flow )
.to( sinkWithOffsetContext( out ) )
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment