Skip to content

Instantly share code, notes, and snippets.

@ts7i
Created December 14, 2011 18:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ts7i/1477900 to your computer and use it in GitHub Desktop.
Save ts7i/1477900 to your computer and use it in GitHub Desktop.
Akasaka.scala 26 Learning Netty Example
package org.ts7i.example.netty
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelDownstreamHandler
import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.ChannelUpstreamHandler
import org.jboss.netty.channel.Channels
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler
import org.jboss.netty.handler.codec.http.DefaultHttpResponse
import org.jboss.netty.handler.codec.http.HttpChunkAggregator
import org.jboss.netty.handler.codec.http.HttpHeaders
import org.jboss.netty.handler.codec.http.HttpRequest
import org.jboss.netty.handler.codec.http.HttpRequestDecoder
import org.jboss.netty.handler.codec.http.HttpResponseEncoder
import org.jboss.netty.handler.codec.http.HttpResponseStatus
import org.jboss.netty.handler.codec.http.HttpVersion
import org.jboss.netty.handler.codec.http.QueryStringDecoder
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
import org.jboss.netty.util.CharsetUtil
/** Entry point that starts the word-count HTTP server on TCP port 3000. */
object WordCountServer {

  /**
   * Builds an NIO server bootstrap backed by two cached thread pools, installs the
   * word-count pipeline factory and binds the listening socket.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val firstPool = Executors.newCachedThreadPool()
    val secondPool = Executors.newCachedThreadPool()
    val channelFactory = new NioServerSocketChannelFactory(firstPool, secondPool)

    val server = new ServerBootstrap(channelFactory)
    server.setPipelineFactory(new WordCountServerPipelineFactory)

    val listenAddress = new InetSocketAddress(3000)
    server.bind(listenAddress)
  }
}
/** Creates the handler pipeline used for every channel of the word-count server. */
class WordCountServerPipelineFactory extends ChannelPipelineFactory {

  /**
   * Assembles a fresh pipeline with new handler instances.
   *
   * The handlers are appended in exactly the order listed below; the order matters
   * because it determines how events flow between the HTTP codec stages and the
   * word-count stages.
   *
   * @return the newly populated pipeline
   */
  override def getPipeline(): ChannelPipeline = {
    val stages = List[(String, org.jboss.netty.channel.ChannelHandler)](
      ("simpleLoggingHandler", new SimpleLoggingHandler),
      ("httpRequestDecoder", new HttpRequestDecoder),
      ("httpChunkAggregator", new HttpChunkAggregator(65536)),
      ("httpResponseEncoder", new HttpResponseEncoder),
      ("wordCountRequestDecoder", new WordCountRequestDecoder),
      ("wordCountResponseEncoder", new WordCountResponseEncoder),
      ("wordCountHandler", new WordCountHandler)
    )

    val assembled = Channels.pipeline()
    stages.foreach { stage =>
      assembled.addLast(stage._1, stage._2)
    }
    assembled
  }
}
/**
 * Pass-through handler that prints every upstream and downstream event, together with
 * the thread handling it, before forwarding the event unchanged.
 */
class SimpleLoggingHandler extends ChannelUpstreamHandler with ChannelDownstreamHandler {

  /** Logs the event, then hands it to the next upstream handler. */
  override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    log(e)
    ctx.sendUpstream(e)
  }

  /** Logs the event, then hands it to the next downstream handler. */
  override def handleDownstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    log(e)
    ctx.sendDownstream(e)
  }

  /**
   * Prints one line describing the current thread and the given event to standard output.
   *
   * @param e the event to describe
   */
  def log(e: ChannelEvent): Unit = {
    val line = new StringBuilder
    line.append("thread=")
    line.append(Thread.currentThread())
    line.append(", event=")
    line.append(e)
    println(line.toString)
  }
}
/**
 * A request to count the words of a piece of text.
 *
 * @param body the text whose words should be counted
 */
case class WordCountRequest(body: String)

/**
 * Upstream decoder that turns an `HttpRequest` into a [[WordCountRequest]] by reading
 * the first value of the `body` query parameter from the request URI.
 */
class WordCountRequestDecoder extends OneToOneDecoder {

  /**
   * Decodes a single upstream message.
   *
   * @param ctx     the handler context (unused)
   * @param channel the channel the message arrived on (unused)
   * @param msg     the message produced by the previous pipeline stage
   * @return a [[WordCountRequest]] when `msg` is an `HttpRequest`; otherwise `msg` itself,
   *         unchanged, so that messages this decoder does not understand are passed along
   *         instead of failing with a `MatchError`
   */
  override def decode(ctx: ChannelHandlerContext, channel: Channel, msg: Any): AnyRef = {
    msg match {
      case httpRequest: HttpRequest =>
        val queryStringDecoder = new QueryStringDecoder(httpRequest.getUri())
        // The parameter map is a Java map: a missing key yields null, so wrap the lookup
        // in Option. A request without a "body" parameter is treated as an empty text
        // rather than raising a NullPointerException.
        val body = Option(queryStringDecoder.getParameters().get("body"))
          .filter(values => !values.isEmpty)
          .map(values => values.get(0))
          .getOrElse("")
        WordCountRequest(body)
      case other =>
        // Cast only bridges Scala's Any to the Object-typed return of the Java method.
        other.asInstanceOf[AnyRef]
    }
  }
}
/**
 * The result of a word count.
 *
 * @param words each distinct word mapped to its number of occurrences
 */
case class WordCountResponse(words: Map[String, Int])

/**
 * Downstream encoder that renders a [[WordCountResponse]] as a plain-text HTTP 200
 * response listing the (word, count) pairs in descending order of count.
 */
class WordCountResponseEncoder extends OneToOneEncoder {

  /**
   * Encodes a single downstream message.
   *
   * @param ctx     the handler context (unused)
   * @param channel the channel the message is being written to (unused)
   * @param msg     the message written by a later pipeline stage
   * @return an HTTP response when `msg` is a [[WordCountResponse]]; otherwise `msg` itself,
   *         unchanged, so that other messages are passed along instead of failing with a
   *         `MatchError`
   */
  override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: Any): AnyRef = {
    msg match {
      case WordCountResponse(words) =>
        val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
        // Media-type parameters are separated from the type by ';' (the original used ':',
        // which produced a malformed Content-Type header).
        response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8")
        // Most frequent words first.
        val result = words.toList.sortWith((a, b) => a._2 > b._2).toString
        response.setContent(ChannelBuffers.copiedBuffer(result + "\r\n", CharsetUtil.UTF_8))
        response
      case other =>
        // Cast only bridges Scala's Any to the Object-typed return of the Java method.
        other.asInstanceOf[AnyRef]
    }
  }
}
/**
 * Terminal upstream handler: counts the words of each [[WordCountRequest]], writes the
 * resulting [[WordCountResponse]] back to the channel and closes the channel once the
 * write has completed.
 */
class WordCountHandler extends SimpleChannelUpstreamHandler {

  /**
   * Handles a received message.
   *
   * A [[WordCountRequest]] body is split on runs of spaces, commas, periods and
   * exclamation marks. Empty tokens (produced when the body is empty or starts with a
   * delimiter) are discarded so they are not reported as a word. Any other message type
   * is delegated to the superclass implementation instead of failing with a `MatchError`.
   *
   * @param ctx the handler context
   * @param e   the event carrying the received message
   */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    e.getMessage match {
      case WordCountRequest(body) =>
        val words = body.split("[ ,.!]+").toList
          .filter(token => token.nonEmpty)
          .groupBy(identity)
          .map(t => (t._1, t._2.size))
        val wordCountResponse = WordCountResponse(words)
        val future = e.getChannel().write(wordCountResponse)
        // One request per connection: close as soon as the response has been written.
        future.addListener(ChannelFutureListener.CLOSE)
      case _ =>
        super.messageReceived(ctx, e)
    }
  }

  /**
   * Prints the stack trace of the failure and closes the affected channel.
   *
   * @param ctx the handler context (unused)
   * @param e   the event describing the exception
   */
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause().printStackTrace()
    e.getChannel().close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment