Skip to content

Instantly share code, notes, and snippets.

@tonosaman
Created October 4, 2011 09:08
Show Gist options
  • Save tonosaman/1261200 to your computer and use it in GitHub Desktop.
Save tonosaman/1261200 to your computer and use it in GitHub Desktop.
finagle ping pong
// Usage: from the Scala REPL, run `:load <thisfile>`.
// Stop with <Ctrl-C>.
import com.twitter.finagle.stream._
import com.twitter.finagle.Service
import com.twitter.concurrent._
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import org.jboss.netty.handler.codec.http.{HttpMethod, HttpVersion, DefaultHttpRequest, HttpRequest}
import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
import com.twitter.util.{Future, CountDownLatch, Promise, Return}
import com.twitter.conversions.time._
import java.nio.charset.Charset
// Duplex-stream service for the ping-pong demo.
// The channel passed to `apply` is published through `input`, and `output`
// is handed back to the caller as the response channel.
class SimpleService extends Service[Channel[ChannelBuffer], Channel[ChannelBuffer]] {
  // Fulfilled with the request channel when `apply` is invoked.
  var input: Promise[Channel[ChannelBuffer]] = new Promise[Channel[ChannelBuffer]]
  // Channel source returned (wrapped in a Future) as the result of `apply`.
  var output: ChannelSource[ChannelBuffer] = new ChannelSource[ChannelBuffer]
  // Stores the incoming channel in `input`, then answers with `output`.
  def apply(channel: Channel[ChannelBuffer]) = {
    input.update(Return(channel))
    Future.value(output)
  }
}
// Server: bind a SimpleService to a random local socket using the duplex stream codec.
// Chains are split with trailing dots so each statement stays incomplete until its
// last line, which keeps the file loadable line-by-line via the REPL's `:load`.
val address = com.twitter.util.RandomSocket()
val service = new SimpleService
val server = ServerBuilder().
  codec(new DuplexStreamCodec(true)).
  bindTo(address).
  name("SimpleService").
  build(service)
// Client: build a factory pointed at the same address, limited to one connection per host.
val clFactory: com.twitter.finagle.ServiceFactory[Channel[ChannelBuffer], Channel[ChannelBuffer]] = ClientBuilder().
  codec(new DuplexStreamCodec(true)).
  hosts(Seq(address)).
  hostConnectionLimit(1).
  buildFactory()
// `apply()` on each returned Future yields its value (it waits for the result).
val client = clFactory.make().apply()
val clOutbound = new ChannelSource[ChannelBuffer]
val clInbound = client(clOutbound).apply()
// Client side: print every message received from the server and send it back unchanged.
clInbound.respond { received =>
  val text = received.toString(Charset.defaultCharset)
  println("ping: " + text)
  clOutbound.send(ChannelBuffers.wrappedBuffer(text.getBytes))
  Future.Unit
}
// Server side: print every message received from the client and reply with it reversed.
service.input.apply().respond { received: ChannelBuffer =>
  val text = received.toString(Charset.defaultCharset)
  println("pong:" + text)
  service.output.send(ChannelBuffers.wrappedBuffer(text.reverse.getBytes))
  Future.Unit
}
// Start the exchange with an initial message from the client.
clOutbound.send(ChannelBuffers.wrappedBuffer("hoge".getBytes))
@tonosaman
Copy link
Author

// build.sbt

// Project coordinates: name, organization and version.
name := "finagle-pingpong"

organization := "net.tono"

version := "0.1"

// Scala version the project is compiled against.
scalaVersion := "2.8.1"

// Use the current project's id followed by "> " as the sbt shell prompt.
shellPrompt in ThisBuild := { state => Project.extract(state).currentRef.project + "> " }

// Additional repositories consulted when resolving dependencies.
resolvers ++= Seq(
"Scala Tools Releases" at "http://scala-tools.org/repo-releases",
"Scala Tools Snapshots" at "http://scala-tools.org/repo-snapshots",
"Twitter" at "http://maven.twttr.com/"
)

// specs2 dependency (resolved against the Scala version via %%).
// @see Dependencies at http://etorreborre.github.com/specs2/
libraryDependencies ++= Seq(
"org.specs2" %% "specs2" % "1.5"
)

// Register specs2 with sbt's test runner.
testFrameworks += TestFrameworks.Specs2

// Twitter libraries: util-codec and finagle-stream
// (the ping-pong script imports com.twitter.finagle.stream).
libraryDependencies ++= Seq(
"com.twitter" % "util-codec" % "1.11.7",
"com.twitter" % "finagle-stream" % "1.9.1"
)

// define the statements initially evaluated when entering 'console', 'console-quick', or 'console-project'
// The snippet defines `time`, which evaluates a by-name block, prints the elapsed
// time in seconds, and returns the block's result.
initialCommands := """
import System.{currentTimeMillis => now}
def time[T](f: => T): T = {
val start = now
try { f } finally { println("Elapsed: " + (now - start)/1000.0 + " s") }
}
"""

// Compiler flags: report details for deprecation and unchecked warnings.
scalacOptions ++= Seq(
"-deprecation",
"-unchecked"
)

// set the initial commands when entering 'console' only
// (disabled alternative kept for reference)
//initialCommands in console := "import org.specs2._"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment