Last active
March 19, 2020 21:10
-
-
Save abdolence/ad8cc478c2866887c1d09a92af29b3e9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fizzbuzz | |
import cloudflow.akkastream._ | |
import cloudflow.akkastream.scaladsl._ | |
import cloudflow.streamlets.{ RoundRobinPartitioner, StreamletShape } | |
import cloudflow.streamlets.avro._ | |
/** Streamlet that reads `FizzBuzzNumber` records from the `in` inlet, applies the
  * FizzBuzz rules to each record's `value`, and writes the resulting
  * `FizzBuzzResult` records to the `out` outlet.
  *
  * NOTE(review): nothing visible in this class serves HTTP; confirm whether
  * `AkkaServerStreamlet` is actually required here rather than a plain `AkkaStreamlet`.
  */
class FizzBuzzProcessor extends AkkaServerStreamlet {

  /** Applies the classic FizzBuzz rules to a single number.
    *
    * @param x the number to classify
    * @return "FizzBuzz" when `x` is divisible by both 3 and 5, "Fizz" when divisible
    *         by 3 only, "Buzz" when divisible by 5 only, otherwise `x.toString`
    */
  def basicFizzbuzzRules( x: Int ): String = {
    val divisibleByThree = x % 3 == 0
    val divisibleByFive  = x % 5 == 0

    if ( divisibleByThree && divisibleByFive ) "FizzBuzz"
    else if ( divisibleByThree ) "Fizz"
    else if ( divisibleByFive ) "Buzz"
    else x.toString
  }

  // Avro inlet named "in" carrying the numbers to classify.
  val in = AvroInlet[FizzBuzzNumber]( "in" )

  // Avro outlet named "out", configured with the round-robin partitioner.
  val out =
    AvroOutlet[FizzBuzzResult]( "out" ).withPartitioner( RoundRobinPartitioner )

  // The streamlet shape: one inlet, one outlet.
  val shape = StreamletShape( in, out )

  // Context-carrying flow: the offset context travels alongside each element, which is
  // what makes it possible for cloudflow to commit reads.
  def flow =
    FlowWithOffsetContext[FizzBuzzNumber].map { input =>
      val n = input.value
      new FizzBuzzResult( n, basicFizzbuzzRules( n ) )
    }

  // Streamlet behavior: wire the inlet source through `flow` into the outlet sink.
  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = {
      val numbers = sourceWithOffsetContext( in )
      val results = numbers.via( flow )
      results.to( sinkWithOffsetContext( out ) )
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment