Skip to content

Instantly share code, notes, and snippets.

@hikoz
Created May 19, 2012 08:19
Show Gist options
  • Save hikoz/2730062 to your computer and use it in GitHub Desktop.
Save hikoz/2730062 to your computer and use it in GitHub Desktop.
finagle failover
import java.net.InetSocketAddress
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.Channels
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder
import org.jboss.netty.handler.codec.frame.Delimiters
import org.jboss.netty.handler.codec.string.StringDecoder
import org.jboss.netty.handler.codec.string.StringEncoder
import org.jboss.netty.util.CharsetUtil
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.builder.ServerBuilder
import com.twitter.finagle.Codec
import com.twitter.finagle.CodecFactory
import com.twitter.finagle.Service
import com.twitter.util.TimeConversions.intToTimeableNumber
import com.twitter.util.Future
/**
 * One-shot, line-oriented echo client used to demonstrate Finagle failover
 * across the two local endpoints `localhost:8080` and `localhost:8081`.
 */
object EchoClient {
  /**
   * Sends `message` followed by a newline through a freshly built client.
   *
   * A new client is built on every call and released once the request has
   * completed, whether it succeeded or failed.
   *
   * @param message text to send; "\n" is appended before it is dispatched
   * @return the future produced by the client for this request; a failed
   *         request surfaces to the caller as a failed future
   */
  def send(message: String): Future[String] = {
    val client: Service[String, String] = ClientBuilder()
      .codec(StringCodec)
      .hosts("localhost:8080,localhost:8081")
      .hostConnectionLimit(1)
      // these 2 required for failover
      .expFailFast(true)
      .retries(2)
      .build()
    val f = client(message + "\n")
    f onSuccess { result =>
      println("Received result asynchronously: " + result)
    } onFailure { e =>
      // Report the failure instead of silently discarding it; the exception
      // itself still reaches the caller through `f`.
      System.err.println("Request failed: " + e)
    } ensure {
      // Runs on both outcomes so the per-call client is always released.
      client.release()
    }
    f
  }
}
/** Factory for labelled echo servers speaking the `StringCodec` protocol. */
object EchoServer {
  /**
   * Function value that builds a server with `ServerBuilder`.
   *
   * The first argument is the TCP port to bind to. The second is a label:
   * every reply is the label, a colon, and then the request as received by
   * the service. The result is whatever `ServerBuilder.build` returns.
   */
  val server = (port: Int, label: String) => {
    // Service that answers each request immediately with "<label>:<request>".
    val echo = new Service[String, String] {
      def apply(request: String) = Future.value(label + ":" + request)
    }
    val address = new InetSocketAddress(port)
    ServerBuilder()
      .codec(StringCodec)
      .bindTo(address)
      .name("echoserver")
      .build(echo)
  }
}
/** Shared, ready-to-use instance of the string codec factory. */
object StringCodec extends StringCodec

/**
 * Codec factory for exchanging UTF-8 strings over Netty pipelines.
 *
 * The server side splits the inbound byte stream on line delimiters before
 * decoding; the client side only encodes and decodes strings.
 */
class StringCodec extends CodecFactory[String, String] {
  /**
   * Builds the server pipeline: line framing (100 is passed as the frame
   * decoder's length limit), then UTF-8 string decoding and encoding.
   */
  private def lineFramedPipeline() = {
    val stages = Channels.pipeline()
    val framer = new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*)
    stages.addLast("line", framer)
    stages.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
    stages.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
    stages
  }

  /**
   * Builds the client pipeline: UTF-8 string encoding and decoding only.
   */
  private def unframedPipeline() = {
    // NOTE(review): there is no frame decoder on this side, so a reply is
    // presumably expected to arrive in a single read -- confirm.
    val stages = Channels.pipeline()
    stages.addLast("stringEncode", new StringEncoder(CharsetUtil.UTF_8))
    stages.addLast("stringDecode", new StringDecoder(CharsetUtil.UTF_8))
    stages
  }

  /** Server-side codec; the configuration argument is ignored. */
  def server = Function.const {
    new Codec[String, String] {
      def pipelineFactory = new ChannelPipelineFactory {
        def getPipeline = lineFramedPipeline()
      }
    }
  }

  /** Client-side codec; the configuration argument is ignored. */
  def client = Function.const {
    new Codec[String, String] {
      def pipelineFactory = new ChannelPipelineFactory {
        def getPipeline = unframedPipeline()
      }
    }
  }
}
import org.junit.runner.RunWith
import org.specs2.mutable._
import org.specs2.runner.JUnitRunner
import scala.actors.Futures._
import com.twitter.util.Duration
import com.twitter.conversions.time.RichWholeNumber
import com.twitter.finagle.WriteException
/**
 * End-to-end check of client failover against echo servers started on local
 * ports 8080 and 8081.
 */
@RunWith(classOf[JUnitRunner])
object EchoSpec extends Specification {
  "failover" should {
    "a" in {
      // Sends one message and blocks until the reply (or failure) is available.
      def reply(message: String) = EchoClient.send(message)()

      val primary = EchoServer.server(8080, "1")
      val secondary = EchoServer.server(8081, "2")
      reply("foo") === "1:foo"

      // With the server on 8080 closed, the server on 8081 is expected to answer.
      val oneSecond = (1: RichWholeNumber).second
      primary.close(oneSecond)
      reply("bar") === "2:bar"

      // A new server on 8080 is expected to be picked up again.
      val replacement = EchoServer.server(8080, "3")
      reply("baz") === "3:baz"

      // With no server left, the request is expected to fail with a WriteException.
      secondary.close()
      replacement.close()
      reply("quux") must throwA[WriteException]
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment