Skip to content

Instantly share code, notes, and snippets.

@tarsa
Created August 6, 2016 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tarsa/2b7531d1726b960a64303f6c70de735f to your computer and use it in GitHub Desktop.
Save tarsa/2b7531d1726b960a64303f6c70de735f to your computer and use it in GitHub Desktop.
ReconnectingProxy
package akka.stream
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.stage._
import akka.util.ByteString
import scala.concurrent.{Future, Promise}
/**
 * A source stage that runs `seed ~> flow` as an inner sub-stream and forwards the
 * elements it produces downstream. Whenever the inner stream fails, a fresh inner
 * stream is materialized; its seed is derived from the last element that was
 * actually pushed downstream (or is `seedSource` if nothing has been emitted yet).
 *
 * The stage completes when the inner stream completes normally. Inner-stream
 * failures are never propagated downstream: they only trigger a reconnect, and
 * there is no limit on the number of reconnect attempts.
 *
 * The materialized `Future[Done]` is completed when the stage stops, whether because
 * downstream cancelled, the inner stream finished, or the stage was torn down.
 *
 * This class lives in package `akka.stream` because it needs access to the
 * package-private `interpreter` of `GraphStageLogic`.
 *
 * @param flow the flow that every (re)connection attempt is run through; its
 *             materialized value is not exposed
 * @param seedSource the source fed into `flow` on the very first attempt
 * @param newSeedSourceFromLastElement builds the source fed into `flow` on a
 *             reconnect, given the last element that was pushed downstream
 * @tparam T type of the elements emitted by this source
 * @tparam M materialized value type of `flow`
 */
final class ReconnectingProxy[T, M](
    flow: Graph[FlowShape[ByteString, T], M],
    seedSource: Graph[SourceShape[ByteString], NotUsed],
    newSeedSourceFromLastElement: T => Graph[SourceShape[ByteString], NotUsed])
  extends GraphStageWithMaterializedValue[SourceShape[T], Future[Done]] {

  val out: Outlet[T] = Outlet[T]("lol")

  override def shape: SourceShape[T] = SourceShape[T](out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    val p = Promise[Done]()

    val logic = new GraphStageLogic(shape) {
      // Last element pushed downstream; used to derive the seed of the next attempt.
      private var lastElement: Option[T] = None

      // A handler must be registered for every port, but the real one can only be
      // built together with the sub-sink in init(). Doing nothing here is safe:
      // demand stays recorded on the port and is re-checked through isAvailable(out)
      // when the sub-sink receives an element.
      setHandler(out, new OutHandler {
        override def onPull(): Unit = ()
      })

      override def preStart(): Unit = init()

      // Guarantees the materialized future is completed however the stage ends.
      // trySuccess is used because a downstream cancel may have completed it already.
      override def postStop(): Unit = {
        p.trySuccess(Done)
      }

      /** Materializes a new inner stream and rewires the out-handler to it. */
      def init(): Unit = {
        val sinkIn = new SubSinkInlet[T]("RecoverWithSink")

        // Forwards the element buffered in the sub-sink, then either requests the
        // next one or, if the inner stream has already ended, completes the stage.
        def pushOut(): Unit = {
          val grabbed = sinkIn.grab()
          // Recorded only for elements that are really emitted, so an element that
          // is still buffered when the inner stream fails is not skipped by the
          // next attempt.
          lastElement = Some(grabbed)
          push(out, grabbed)
          if (!sinkIn.isClosed) sinkIn.pull()
          else completeStage()
        }

        sinkIn.setHandler(new InHandler {
          // Without downstream demand the element stays buffered in sinkIn until
          // the out-handler's onPull picks it up.
          override def onPush(): Unit =
            if (isAvailable(out)) pushOut()

          // If an element is still buffered, completion is deferred to pushOut().
          override def onUpstreamFinish(): Unit =
            if (!sinkIn.isAvailable) completeStage()

          // The failure itself is dropped; it only triggers a reconnect.
          override def onUpstreamFailure(ex: Throwable): Unit =
            init()
        })

        val outHandler = new OutHandler {
          override def onPull(): Unit =
            if (sinkIn.isAvailable) pushOut()

          override def onDownstreamFinish(): Unit = {
            sinkIn.cancel()
            p.trySuccess(Done)
            completeStage()
          }
        }

        val sourceGraph =
          lastElement.fold(seedSource)(newSeedSourceFromLastElement)

        Source.fromGraph(sourceGraph)
          .via(Flow.fromGraph(flow))
          // interpreter is not visible outside 'akka' package
          .runWith(sinkIn.sink)(interpreter.subFusingMaterializer)

        setHandler(out, outHandler)
        sinkIn.pull()
      }
    }

    (logic, p.future)
  }

  override def toString: String = "ReconnectingProxy"
}
package sss
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Framing, Sink, Source, Tcp}
import akka.stream.{ActorMaterializer, ReconnectingProxy}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
/**
 * Demo application for `ReconnectingProxy`.
 *
 * It starts a deliberately unreliable TCP server on `host`:`port` and a client that
 * talks to it through a `ReconnectingProxy`, printing the first 123 numbers received.
 */
object Stream {
  implicit val system: ActorSystem = ActorSystem("abcd")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val host = "127.0.0.1"
  val port = 8888

  val prng = new Random()

  /** Runs the demo and always shuts the actor system down afterwards. */
  def main(args: Array[String]): Unit = {
    try {
      logic()
    } finally {
      system.terminate()
    }
  }

  /** Simulates a server-side fault by throwing. */
  def fail: Nothing =
    throw new Exception("fail")

  /**
   * Binds the unreliable server, then streams 123 elements from it through a
   * `ReconnectingProxy`, blocking until they have all been printed.
   */
  def logic(): Unit = {
    // Server protocol: read a single framed number, then answer with an endless
    // increasing sequence starting at that number. Each element has a 5% chance
    // of failing the connection's stream instead of being sent.
    val wonkyServerFlow = Flow[ByteString]
      .via(Framing.simpleFramingProtocolDecoder(1000))
      .take(1)
      .map(_.utf8String.toLong)
      .flatMapConcat(start => Source.unfold(start)(s => Some((s + 1, s))))
      .map(element => if (prng.nextDouble() < 0.05) fail else element)
      .map(i => ByteString(i.toString))
      .via(Framing.simpleFramingProtocolEncoder(1000))

    val serverBinding = Tcp().bind(host, port)
      .to(Sink.foreach[Tcp.IncomingConnection] { connection =>
        println(s"New connection from: ${connection.remoteAddress}")
        connection.handleWith(wonkyServerFlow)
      })
      .run()

    // Fail fast when the port cannot be bound; otherwise the client below would
    // keep reconnecting forever against a server that never started.
    Await.result(serverBinding, 10.seconds)

    val reconnectingProxy = {
      // The request sent on (re)connect is the number following the last one received.
      def newSeedSourceFromLastElement(element: String) =
        Source.single(ByteString((element.toLong + 1).toString))
          .via(Framing.simpleFramingProtocolEncoder(1000))

      // "-1" makes the very first request ask for 0.
      val initialSource = newSeedSourceFromLastElement("-1")

      val connection = Tcp().outgoingConnection(host, port)
        .via(Framing.simpleFramingProtocolDecoder(1000))
        .map(_.utf8String)

      Source.fromGraph(new ReconnectingProxy(
        connection, initialSource, newSeedSourceFromLastElement))
    }

    val doneFut = reconnectingProxy.take(123).runWith(Sink.foreach(println))
    Await.result(doneFut, Duration.Inf)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment