Skip to content

Instantly share code, notes, and snippets.

@joesan
Last active January 12, 2017 06:18
Show Gist options
  • Save joesan/085023d9d2256f73daf5de48024dcd1a to your computer and use it in GitHub Desktop.
Save joesan/085023d9d2256f73daf5de48024dcd1a to your computer and use it in GitHub Desktop.
class MyObserver(actorRef: ActorRef, sourceName: String)
(implicit s: Scheduler) extends Subscriber[String] {
private[this] def logger = LoggerFactory.getLogger(this.getClass)
override implicit def scheduler: Scheduler = s
override def onError(ex: Throwable): Unit = {
logger.error(s"error happened when processing the stream: error message << ${ex.printStackTrace()} >>")
}
override def onComplete(): Unit = {
logger.info("stream completed")
}
override def onNext(elem: String): Future[Ack] = {
logger.info(s"message received from source $sourceName --> $elem")
val tick = Tick(sourceName, elem)
actorRef ! tick
Continue
}
}
object MyObserver {
def apply(actorRef: ActorRef, sourceName: String)(implicit s: Scheduler) = {
new MyObserver(actorRef, sourceName)(s)
}
}
class SocketObservable(in: BufferedReader) extends Observable[String] {
override def unsafeSubscribeFn(subscriber: Subscriber[String]): Cancelable = {
Observable.interval(1.second).map(_ => in.readLine()).subscribe(subscriber)
}
}
class TLSConnectorActor(host: String, port: Int, globalChannel: GlobalOutputChannel)(implicit s: Scheduler)
extends Actor with ActorLogging {
private[this] val subscription = MultiAssignmentCancelable()
override def preStart = {
super.preStart
self ! Init
}
override def postStop = {
subscription.cancel()
log.info(s"cancelling all subscriptions :: isCancelled ${subscription.isCanceled}")
}
val socketCancelable = SingleAssignmentCancelable()
def doConnection: Option[TLSConnection] = {
def initiateTLSConnection = scala.concurrent.blocking {
val socket = new Socket(host, port)
val bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream))
TLSConnection(socket, bufferedReader)
}
try {
val tLSConnection = initiateTLSConnection
// our cancelable must be able to cleanly dispose of our connection
socketCancelable := BooleanCancelable { () =>
blocking {
println("closing shit !!!!!")
try tLSConnection.in.close()
finally tLSConnection.socket.close()
}
}
if (tLSConnection.in.readLine() == null || tLSConnection.in.read() == -1) {
log.info(s"still connection unavailable... will retry")
None
} else {
log.info(s"successfully connected")
Some(tLSConnection)
}
} catch {
case NonFatal(ex) =>
log.error(ex.getMessage)
socketCancelable.cancel()
None
}
}
private def init(tLSConnection: TLSConnection) = {
// 1. our observer
val tlsSubscriber = MyObserver.apply(self, "cold-subscriber")
// 2. our Observable
val tlsObservable = new SocketObservable(tLSConnection.in)
// 3. marry
subscription := tlsObservable.unsafeSubscribeFn(tlsSubscriber)
}
override def receive: Receive = {
case Init =>
doConnection match {
case Some(tLSConnection) => init(tLSConnection)
case None =>
socketCancelable.cancel()
subscription.cancel()
context.system.scheduler.scheduleOnce(10.seconds, self, Init)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment