Skip to content

Instantly share code, notes, and snippets.

@caniszczyk caniszczyk/gist:8920758
Last active Aug 29, 2015

Embed
What would you like to do?
Connect to remote host Finagle / Netty
private[netty3] class ChannelConnector[In, Out](
newChannel: () => Channel,
newTransport: Channel => Transport[In, Out]
) extends (SocketAddress => Future[Transport[In, Out]]) {
def apply(addr: SocketAddress): Future[Transport[In, Out]] = {
require(addr != null)
val ch = try newChannel() catch {
case NonFatal(exc) => return Future.exception(exc) #1
}
// Transport is now bound to the channel; this is done prior to
// it being connected so we don't lose any messages.
val transport = newTransport(ch) #2 val connectFuture = ch.connect(addr) #3
val promise = new Promise[Transport[In, Out]] #4
promise setInterruptHandler { case _cause =>
// Propagate cancellations onto the netty future.
connectFuture.cancel()
}
connectFuture.addListener(new ChannelFutureListener {
def operationComplete(f: ChannelFuture) { #5
if (f.isSuccess) {
promise.setValue(transport)
} else if (f.isCancelled) {
promise.setException(
WriteException(new CancelledConnectionException))
} else {
promise.setException(WriteException(f.getCause)) }
}
})
promise onFailure { _ =>
Channels.close(ch)
}
}
}
// #1 Try to create a new Channel. If it fails wrap the exception in a Future and return immediately
// #2 Create a new Transport with the Channel
// #3 Connect the remote host asynchronously. The returned ChannelFuture will be notified once the connection completes
// #4 Create a new Promise which will be notified once the connect attempt finished
// #5 Handle the completion of the connectFuture by fulfilling the created promise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.