Skip to content

Instantly share code, notes, and snippets.

@ticofab
Last active July 20, 2018 14:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ticofab/9a2636777771821f428022cb94a3dfd7 to your computer and use it in GitHub Desktop.
Save ticofab/9a2636777771821f428022cb94a3dfd7 to your computer and use it in GitHub Desktop.
val eventRequestFlow = Flow.fromGraph(GraphDSL.create() { implicit b =>
// flow to prepare the request from a token
val prepareRequest = Flow[Token].map(token => eventRequest(token))
// the filter for http responses
val requestFilter = b.add(GraphDSL.create() { implicit b =>
b.add(Partition[FlowHttpResponse](3, {
case (resp, _) =>
resp.status match {
case StatusCodes.OK => 0
case StatusCodes.Unauthorized => 1
case _ => 2
}
}))
})
// the fan-in stage
val merge = b.add(Merge[Token](2))
// linking all stages with the GraphDSL syntax
// you can see the custom tokenRequestFlow on the second line
merge.out ~> prepareRequest ~> httpFlow ~> divertHttpFailure ~> requestFilter
requestFilter.out(1) ~> tokenRequestFlow ~> merge.in(1)
requestFilter.out(2) ~> httpUnknownStatusSink
// inlet and outlet of this custom flow
FlowShape(merge.in(0), requestFilter.out(0))
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment