akka-stream retry flow

# Thoughts on retry in streams

## This document is a WIP

Too often, when writing akka-stream code, I find myself in need of retrying some logic.

A very common scenario is when I use connection pools, e.g. mySource.via(Http().superPool()). In this case, what we get from superPool is a flow of type Flow[(HttpRequest,T),(Try[HttpResponse],T),NotUsed]. We can't do anything with this flow, let alone retry based on some smart decision.
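To make the scenario concrete, here is a minimal sketch (assuming akka-http 10.x; the `Ctx` context type and the example URI are made up for illustration):

```scala
import scala.util.Try
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}

object SuperPoolScenario {
  implicit val system: ActorSystem    = ActorSystem("retry-example")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // hypothetical per-request context carried through the pool
  final case class Ctx(uri: String, attempt: Int)

  // the pool turns (request, context) pairs into (Try[response], context) pairs
  val pool: Flow[(HttpRequest, Ctx), (Try[HttpResponse], Ctx), NotUsed] =
    Http().superPool[Ctx]()

  // once a request fails there is no built-in way to feed it back into the pool;
  // all we get downstream is the failed Try paired with its context
  val responses: Source[(Try[HttpResponse], Ctx), NotUsed] =
    Source.single(HttpRequest(uri = "http://example.com/") -> Ctx("http://example.com/", 1))
      .via(pool)
}
```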

So why not have a generic retry combinator?

Since akka-http connection pools yield flows that look like Flow[(A,S),(Try[B],S),_], I decided to base the API on flows of this kind.

Assume the existence of a stage with this FanOutShape2, which would be easy to implement:

class PartitionWith[In,Out0,Out1](p: In => Either[Out0,Out1]) extends GraphStage[FanOutShape2[In,Out0,Out1]]

and would work very similarly to a method I've written in my personal util to operate on collections (included at the bottom of this gist).
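For illustration, a minimal GraphStage sketch of such a PartitionWith stage could look like the following (a sketch only, under my own assumptions about port names and handler wiring, not the actual implementation):

```scala
import akka.stream.{Attributes, FanOutShape2, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

class PartitionWith[In, Out0, Out1](p: In => Either[Out0, Out1])
    extends GraphStage[FanOutShape2[In, Out0, Out1]] {

  private val in   = Inlet[In]("PartitionWith.in")
  private val out0 = Outlet[Out0]("PartitionWith.out0")
  private val out1 = Outlet[Out1]("PartitionWith.out1")

  override val shape = new FanOutShape2(in, out0, out1)

  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // after an element has been emitted, pull again only if some outlet still has demand
      private def pullIfNeeded(): Unit =
        if (!hasBeenPulled(in) && (isAvailable(out0) || isAvailable(out1))) pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = p(grab(in)) match {
          case Left(a)  => emit(out0, a, () => pullIfNeeded()) // route Left to the first outlet
          case Right(b) => emit(out1, b, () => pullIfNeeded()) // route Right to the second outlet
        }
      })

      setHandler(out0, new OutHandler {
        override def onPull(): Unit = if (!hasBeenPulled(in)) pull(in)
      })
      setHandler(out1, new OutHandler {
        override def onPull(): Unit = if (!hasBeenPulled(in)) pull(in)
      })
    }
}
```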

With this stage and two other trivial stages, we can come up with a fairly reasonable API:

def retry[I,O,S,M](flow: Flow[(I,S),(Try[O],S),M])(retryWith: S => Option[(I,S)]): Flow[(I,S),Try[O],M]

which could be implemented as in the following graph diagram:

[graph diagram]

The custom increment and decrement stages are needed for graceful stream completion. The increment stage will intercept upstream completion, and will only complete itself if there are no retry leftovers cycling in the graph. If there are such leftovers, it will wait until enough decrement counter messages are received.

The PartitionWith stage is created using retryWith as follows:

{
  case (t @ Success(_), _) => Right(t)                                // success: emit the result downstream
  case (t, s)              => retryWith(s).fold(Right(t))(Left.apply) // failure: retry with a new (I,S) if available, otherwise emit the failed Try
}

The generic type S should be some state object from which the user can extract another (I,S) input-and-state tuple, similar to how you might use unfold.
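To make the intended usage concrete, here is a hypothetical application of the proposed combinator to an akka-http super pool (the `retry` stub below is just the proposed signature left unimplemented, and `RetryState` is a made-up state type):

```scala
import scala.util.Try
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}

object RetryUsage {
  // the proposed combinator, left unimplemented here
  def retry[I, O, S, M](flow: Flow[(I, S), (Try[O], S), M])
                       (retryWith: S => Option[(I, S)]): Flow[(I, S), Try[O], M] = ???

  // hypothetical state: the original request plus a retry budget
  final case class RetryState(request: HttpRequest, attemptsLeft: Int)

  def withRetries(pool: Flow[(HttpRequest, RetryState), (Try[HttpResponse], RetryState), NotUsed])
      : Flow[(HttpRequest, RetryState), Try[HttpResponse], NotUsed] =
    retry(pool) { s =>
      if (s.attemptsLeft > 0) Some(s.request -> s.copy(attemptsLeft = s.attemptsLeft - 1)) // re-feed the same request
      else None                                                                            // give up: emit the failed Try
    }
}
```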

## Bonus

Also, we can consider a "family" of retries, such as:

def retryConcat[I,O,S,M](flow: Flow[(I,S),(Try[O],S),M])(retryWith: S => Option[Seq[(I,S)]]): Flow[(I,S),Try[O],M]

// or (an empty sequence means: do not retry again):
def retryConcat[I,O,S,M](flow: Flow[(I,S),(Try[O],S),M])(retryWith: S => Seq[(I,S)]): Flow[(I,S),Try[O],M]

which might be handy if the input elements I can be broken down into smaller pieces that may succeed where the larger object failed. E.g. an HttpRequest with a heavy body, sent to some web service's bulk API, may be broken down into multiple requests against the regular API.
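As a sketch of that idea (the bulk/regular endpoints, `BulkState` and `singleItemRequest` are all hypothetical), a Seq-based retryWith might split a failed bulk request into per-item requests:

```scala
import akka.http.scaladsl.model.HttpRequest

object BulkSplitRetry {
  // hypothetical state: the items that went into the bulk request, and whether we already split
  final case class BulkState(items: List[String], alreadySplit: Boolean)

  // hypothetical helper building a regular (non-bulk) request for a single item
  def singleItemRequest(item: String): HttpRequest =
    HttpRequest(uri = s"http://example.com/api/items/$item")

  // on failure: split a not-yet-split bulk request into single-item retries;
  // an empty sequence means "do not retry again"
  val retryWith: BulkState => Seq[(HttpRequest, BulkState)] = {
    case BulkState(items, false) if items.size > 1 =>
      items.map(item => singleItemRequest(item) -> BulkState(List(item), alreadySplit = true))
    case _ => Nil
  }
}
```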

def retryAsync[I,O,S,M](flow: Flow[(I,S),(Try[O],S),M])(retryWith: S => Future[Option[(I,S)]]): Flow[(I,S),Try[O],M]

which is pretty obvious: if the base version is analogous to unfold, this may be its unfoldAsync equivalent.

Feedback, comments and criticism are very welcome.

package hochgi.util

import scala.collection.TraversableLike
import scala.collection.generic.CanBuildFrom

package object collections {

  /**
   * `partition` and `map` combined.
   * for a given collection, and a function from the collection elements to `Either[A,B]`,
   * generates a tuple of 2 collections of types `A` and `B`
   *
   * @param xs the collection of elements
   * @param f a function that converts an element to an `Either[A,B]`
   * @tparam A left collection elements' type
   * @tparam B right collection elements' type
   * @tparam T original collection elements' type
   * @tparam Coll collection's type
   */
  def partitionWith[A, B, T, Coll[_]]
                   (xs: Coll[T])
                   (f: T => Either[A,B])
                   (implicit ev: Coll[T] <:< TraversableLike[T,Coll[T]],
                             cbf1: CanBuildFrom[Coll[T], A, Coll[A]],
                             cbf2: CanBuildFrom[Coll[T], B, Coll[B]]): (Coll[A],Coll[B]) = {
    val b1 = cbf1(xs)
    val b2 = cbf2(xs)
    for(x <- xs) f(x) match {
      case Left(a)  => b1 += a
      case Right(b) => b2 += b
    }
    b1.result() -> b2.result()
  }
}
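For reference, a small usage example (assuming Scala 2.12-era collections, which the code above targets):

```scala
import hochgi.util.collections._

object PartitionWithExample extends App {
  // split numbers into evens (Left) and stringified odds (Right) in a single pass
  val (evens, odds) = partitionWith(List(1, 2, 3, 4, 5)) { i =>
    if (i % 2 == 0) Left(i) else Right(i.toString)
  }
  println(evens) // List(2, 4)
  println(odds)  // List(1, 3, 5)
}
```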

oridag commented Aug 16, 2017

That's interesting. Do you have an implementation for "Custom Increment" and "Custom Decrement" stages?


hochgi commented Jan 2, 2018

@oridag it was implemented in a much simpler way using BidiFlow. Check implementation at:
https://github.com/akka/akka-stream-contrib

There was no need for the custom increment & decrement.
