Skip to content

Instantly share code, notes, and snippets.

@caniszczyk
Last active August 29, 2015 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save caniszczyk/8920758 to your computer and use it in GitHub Desktop.
Save caniszczyk/8920758 to your computer and use it in GitHub Desktop.
Connect to remote host Finagle / Netty
/**
 * Connects to a remote address and yields a [[Transport]] bound to the
 * underlying channel.
 *
 * The connector is itself a function `SocketAddress => Future[Transport[In, Out]]`.
 *
 * @param newChannel   factory invoked once per `apply` call to create the channel
 * @param newTransport builds the transport on top of the freshly created channel
 * @tparam In  first type parameter of the produced [[Transport]]
 * @tparam Out second type parameter of the produced [[Transport]]
 */
private[netty3] class ChannelConnector[In, Out](
  newChannel: () => Channel,
  newTransport: Channel => Transport[In, Out]
) extends (SocketAddress => Future[Transport[In, Out]]) {

  /**
   * Starts an asynchronous connect to `addr`.
   *
   * Outcomes, as implemented below:
   *  - `newChannel()` fails with a non-fatal exception: that exception is
   *    returned as a failed future and no connect is attempted.
   *  - the connect succeeds: the future is satisfied with the transport.
   *  - the connect is cancelled: the future fails with a `WriteException`
   *    wrapping a `CancelledConnectionException`.
   *  - the connect fails otherwise: the future fails with a `WriteException`
   *    wrapping the connect future's cause.
   *
   * On any failure of the promise the channel is closed. Exceptions thrown by
   * `newTransport` or `ch.connect` are not caught here and propagate to the
   * caller.
   *
   * @param addr the remote address to connect to; must not be null
   * @return a future of the transport bound to the connected channel
   * @throws IllegalArgumentException if `addr` is null
   */
  def apply(addr: SocketAddress): Future[Transport[In, Out]] = {
    require(addr != null)

    val ch = try newChannel() catch {
      case NonFatal(exc) => return Future.exception(exc) // #1
    }

    // Transport is now bound to the channel; this is done prior to
    // it being connected so we don't lose any messages.
    val transport = newTransport(ch) // #2
    val connectFuture = ch.connect(addr) // #3

    val promise = new Promise[Transport[In, Out]] // #4
    promise setInterruptHandler { case _cause =>
      // Propagate cancellations onto the netty future.
      connectFuture.cancel()
    }

    connectFuture.addListener(new ChannelFutureListener {
      def operationComplete(f: ChannelFuture): Unit = { // #5
        if (f.isSuccess) {
          promise.setValue(transport)
        } else if (f.isCancelled) {
          promise.setException(
            WriteException(new CancelledConnectionException))
        } else {
          promise.setException(WriteException(f.getCause))
        }
      }
    })

    // The value of this expression is what `apply` returns; the handler
    // closes the channel whenever the promise fails.
    promise onFailure { _ =>
      Channels.close(ch)
    }
  }
}
// #1 Try to create a new Channel. If that fails, wrap the exception in a failed Future and return immediately
// #2 Create a new Transport with the Channel
// #3 Connect to the remote host asynchronously. The returned ChannelFuture will be notified once the connection completes
// #4 Create a new Promise which will be notified once the connect attempt has finished
// #5 Handle the completion of the connectFuture by fulfilling the created promise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment