Created
October 4, 2011 09:08
-
-
Save tonosaman/1261200 to your computer and use it in GitHub Desktop.
finagle ping pong
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// scala>:load <thisfile> | |
// stop by <Ctrl-C> | |
import com.twitter.finagle.stream._ | |
import com.twitter.finagle.Service | |
import com.twitter.concurrent._ | |
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder} | |
import org.jboss.netty.handler.codec.http.{HttpMethod, HttpVersion, DefaultHttpRequest, HttpRequest} | |
import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer} | |
import com.twitter.util.{Future, CountDownLatch, Promise, Return} | |
import com.twitter.conversions.time._ | |
import java.nio.charset.Charset | |
class SimpleService extends Service[Channel[ChannelBuffer], Channel[ChannelBuffer]] { | |
var input: Promise[Channel[ChannelBuffer]] = new Promise[Channel[ChannelBuffer]] | |
var output: ChannelSource[ChannelBuffer] = new ChannelSource[ChannelBuffer] | |
def apply(channel: Channel[ChannelBuffer]) = { | |
input() = Return(channel) | |
Future.value(output) | |
} | |
} | |
val address = com.twitter.util.RandomSocket() | |
val service = new SimpleService | |
val server = ServerBuilder().codec(new DuplexStreamCodec(true)).bindTo(address).name("SimpleService").build(service) | |
val clFactory: com.twitter.finagle.ServiceFactory[Channel[ChannelBuffer], Channel[ChannelBuffer]] = ClientBuilder().codec(new DuplexStreamCodec(true)).hosts(Seq(address)).hostConnectionLimit(1).buildFactory() | |
val client = clFactory.make()() | |
val clOutbound = new ChannelSource[ChannelBuffer] | |
val clInbound = client(clOutbound)() | |
clInbound.respond { buf => val msg = buf.toString(Charset.defaultCharset); println("ping: " + msg); clOutbound.send(ChannelBuffers.wrappedBuffer(msg.getBytes)); Future.Unit } | |
service.input().respond { buf: ChannelBuffer => val msg = buf.toString(Charset.defaultCharset); println("pong:" + msg); service.output.send(ChannelBuffers.wrappedBuffer(msg.reverse.getBytes)); Future.Unit } | |
clOutbound.send(ChannelBuffers.wrappedBuffer("hoge".getBytes)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
// build.sbt
name := "finagle-pingpong"
organization := "net.tono"
version := "0.1"
scalaVersion := "2.8.1"
shellPrompt in ThisBuild := { state => Project.extract(state).currentRef.project + "> " }
resolvers ++= Seq(
"Scala Tools Releases" at "http://scala-tools.org/repo-releases",
"Scala Tools Snapshots" at "http://scala-tools.org/repo-snapshots",
"Twitter" at "http://maven.twttr.com/"
)
// @see Dependencies at http://etorreborre.github.com/specs2/
libraryDependencies ++= Seq(
"org.specs2" %% "specs2" % "1.5"
)
testFrameworks += TestFrameworks.Specs2
libraryDependencies ++= Seq(
"com.twitter" % "util-codec" % "1.11.7",
"com.twitter" % "finagle-stream" % "1.9.1"
)
// define the statements initially evaluated when entering 'console', 'console-quick', or 'console-project'
initialCommands := """
import System.{currentTimeMillis => now}
def time[T](f: => T): T = {
val start = now
try { f } finally { println("Elapsed: " + (now - start)/1000.0 + " s") }
}
"""
scalacOptions ++= Seq(
"-deprecation",
"-unchecked"
)
// set the initial commands when entering 'console' only
//initialCommands in console := "import org.specs2._"