Last active
January 12, 2017 06:18
-
-
Save joesan/085023d9d2256f73daf5de48024dcd1a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Subscriber that forwards every element of a stream to an actor.
 *
 * Each received element is wrapped in a `Tick` built from `sourceName` and the
 * element, and sent to `actorRef`. `onNext` always answers `Continue`.
 *
 * @param actorRef   actor that receives one `Tick` per stream element
 * @param sourceName label attached to every `Tick` and used in the log messages
 * @param s          scheduler exposed to the stream through `scheduler`
 */
class MyObserver(actorRef: ActorRef, sourceName: String)
                (implicit s: Scheduler) extends Subscriber[String] {

  // Resolved once per instance rather than on every log statement.
  private[this] val logger = LoggerFactory.getLogger(this.getClass)

  override implicit def scheduler: Scheduler = s

  /**
   * Logs the failure that terminated the stream.
   *
   * The throwable is handed to the logger as the cause so that the stack trace
   * ends up in the log output; `printStackTrace()` returns `Unit` and must not be
   * interpolated into the message.
   */
  override def onError(ex: Throwable): Unit = {
    logger.error(
      s"error happened when processing the stream: error message << ${ex.getMessage} >>",
      ex
    )
  }

  /** Logs that the stream has finished normally. */
  override def onComplete(): Unit = {
    logger.info("stream completed")
  }

  /**
   * Logs the element, forwards it to the actor as a `Tick` and asks for the next one.
   *
   * @param elem element emitted by the source
   * @return always `Continue`
   */
  override def onNext(elem: String): Future[Ack] = {
    logger.info(s"message received from source $sourceName --> $elem")
    val tick = Tick(sourceName, elem)
    actorRef ! tick
    Continue
  }
}
/** Companion of the `MyObserver` class; offers a factory so callers can omit `new`. */
object MyObserver {

  /**
   * Creates a `MyObserver` for the given actor and source label.
   *
   * @param actorRef   actor the observer forwards stream elements to
   * @param sourceName label identifying the source of the elements
   * @param s          scheduler handed to the new observer
   */
  def apply(actorRef: ActorRef, sourceName: String)(implicit s: Scheduler) =
    new MyObserver(actorRef, sourceName)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Observable that pulls one line of text per second from a reader.
 *
 * The stream completes when the reader reports end of input.
 *
 * @param in reader the lines are read from; this class never closes it
 */
class SocketObservable(in: BufferedReader) extends Observable[String] {

  /**
   * Starts a one-second ticker and emits the line read from `in` on every tick.
   *
   * @param subscriber receiver of the lines
   * @return handle that stops the ticker when cancelled
   */
  override def unsafeSubscribeFn(subscriber: Subscriber[String]): Cancelable = {
    Observable.interval(1.second)
      // `readLine()` answers null once the input is exhausted. Wrapping it in an
      // Option keeps null out of the stream, and `takeWhile` ends the stream at
      // that point instead of emitting a null on every following tick.
      // Note that `readLine()` blocks the thread that runs this step.
      .map(_ => Option(in.readLine()))
      .takeWhile(_.isDefined)
      .collect { case Some(line) => line }
      .subscribe(subscriber)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Actor that opens a socket to `host:port` and, once the connection answers,
 * streams the lines read from it to itself through a `MyObserver`.
 *
 * Life cycle:
 *  - `preStart` sends `Init` to the actor itself;
 *  - on `Init` one connection attempt is made; a failed attempt is retried
 *    after 10 seconds;
 *  - `postStop` cancels the stream subscription and closes the connection.
 *
 * NOTE(review): no TLS handshake is visible in this class -- the connection is
 * created with `new Socket(host, port)`; confirm which `Socket` type is imported.
 * NOTE(review): `globalChannel` is not referenced anywhere in this class.
 * NOTE(review): `receive` handles only `Init`, while `MyObserver` sends `Tick`
 * messages to this actor; confirm whether a `Tick` handler is missing.
 *
 * @param host          host name to connect to
 * @param port          port to connect to
 * @param globalChannel currently unused
 * @param s             scheduler used by the stream and for scheduling retries
 */
class TLSConnectorActor(host: String, port: Int, globalChannel: GlobalOutputChannel)(implicit s: Scheduler)
  extends Actor with ActorLogging {

  // Handle of the running stream subscription. Cancelled only in `postStop`:
  // once cancelled, anything assigned to it later is cancelled immediately.
  private[this] val subscription = MultiAssignmentCancelable()

  // Disposes of the established connection. It accepts a single assignment, so
  // it is assigned only after a connection attempt has fully succeeded; failed
  // attempts close their own socket directly and leave this handle untouched.
  val socketCancelable = SingleAssignmentCancelable()

  /** Triggers the first connection attempt. */
  override def preStart(): Unit = {
    super.preStart()
    self ! Init
  }

  /** Stops the stream and releases the socket when the actor terminates. */
  override def postStop(): Unit = {
    subscription.cancel()
    // Without this the socket of an established connection stays open after
    // the actor has stopped.
    socketCancelable.cancel()
    log.info(s"cancelling all subscriptions :: isCancelled ${subscription.isCanceled}")
    super.postStop()
  }

  /**
   * Closes the reader and then the socket of `connection`.
   *
   * Failures while closing are logged and not propagated, so that clean-up
   * never aborts a retry or the actor shutdown.
   */
  private def closeConnection(connection: TLSConnection): Unit = {
    log.info(s"closing connection to $host:$port")
    try {
      blocking {
        try connection.in.close()
        finally connection.socket.close()
      }
    } catch {
      case NonFatal(ex) =>
        log.warning(s"failed to close the connection to $host:$port cleanly: ${ex.getMessage}")
    }
  }

  /**
   * Reports whether the remote side answers on `connection`.
   *
   * NOTE(review): the check consumes the first line and one further character
   * of the input, which are therefore never seen by the stream -- confirm that
   * this is intended.
   */
  private def isAvailable(connection: TLSConnection): Boolean = blocking {
    connection.in.readLine() != null && connection.in.read() != -1
  }

  /**
   * Makes a single connection attempt.
   *
   * @return the connection when the socket could be opened and the remote side
   *         answered; `None` otherwise. A connection that is not returned has
   *         already been closed, so the attempt can simply be repeated.
   */
  def doConnection: Option[TLSConnection] = {
    def openConnection(): TLSConnection = scala.concurrent.blocking {
      val socket = new Socket(host, port)
      val bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      TLSConnection(socket, bufferedReader)
    }

    try {
      val connection = openConnection()
      try {
        if (isAvailable(connection)) {
          // our cancelable must be able to cleanly dispose of our connection
          socketCancelable := BooleanCancelable { () => closeConnection(connection) }
          log.info(s"successfully connected")
          Some(connection)
        } else {
          log.info(s"still connection unavailable... will retry")
          closeConnection(connection)
          None
        }
      } catch {
        case NonFatal(ex) =>
          // The socket is already open at this point; release it before reporting.
          closeConnection(connection)
          throw ex
      }
    } catch {
      case NonFatal(ex) =>
        // Log with the cause so the stack trace is not lost.
        log.error(ex, s"connection to $host:$port failed: ${ex.getMessage}")
        None
    }
  }

  /** Subscribes a `MyObserver` targeting this actor to the lines of `connection`. */
  private def init(connection: TLSConnection): Unit = {
    // 1. our observer
    val tlsSubscriber = MyObserver(self, "cold-subscriber")
    // 2. our Observable
    val tlsObservable = new SocketObservable(connection.in)
    // 3. marry
    subscription := tlsObservable.unsafeSubscribeFn(tlsSubscriber)
  }

  override def receive: Receive = {
    case Init =>
      doConnection match {
        case Some(connection) => init(connection)
        case None =>
          // Nothing to cancel here: a failed attempt has already closed its
          // socket, and cancelling the shared handles would make every later
          // attempt fail as well.
          context.system.scheduler.scheduleOnce(10.seconds, self, Init)
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment