Created
December 14, 2011 18:44
-
-
Save ts7i/1477900 to your computer and use it in GitHub Desktop.
Akasaka.scala 26 Learning Netty Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.ts7i.example.netty | |
import java.net.InetSocketAddress | |
import java.util.concurrent.Executors | |
import org.jboss.netty.bootstrap.ServerBootstrap | |
import org.jboss.netty.buffer.ChannelBuffers | |
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory | |
import org.jboss.netty.channel.Channel | |
import org.jboss.netty.channel.ChannelDownstreamHandler | |
import org.jboss.netty.channel.ChannelEvent | |
import org.jboss.netty.channel.ChannelFuture | |
import org.jboss.netty.channel.ChannelFutureListener | |
import org.jboss.netty.channel.ChannelHandlerContext | |
import org.jboss.netty.channel.ChannelPipeline | |
import org.jboss.netty.channel.ChannelPipelineFactory | |
import org.jboss.netty.channel.ChannelUpstreamHandler | |
import org.jboss.netty.channel.Channels | |
import org.jboss.netty.channel.ExceptionEvent | |
import org.jboss.netty.channel.MessageEvent | |
import org.jboss.netty.channel.SimpleChannelUpstreamHandler | |
import org.jboss.netty.handler.codec.http.DefaultHttpResponse | |
import org.jboss.netty.handler.codec.http.HttpChunkAggregator | |
import org.jboss.netty.handler.codec.http.HttpHeaders | |
import org.jboss.netty.handler.codec.http.HttpRequest | |
import org.jboss.netty.handler.codec.http.HttpRequestDecoder | |
import org.jboss.netty.handler.codec.http.HttpResponseEncoder | |
import org.jboss.netty.handler.codec.http.HttpResponseStatus | |
import org.jboss.netty.handler.codec.http.HttpVersion | |
import org.jboss.netty.handler.codec.http.QueryStringDecoder | |
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder | |
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder | |
import org.jboss.netty.util.CharsetUtil | |
/**
 * Entry point of the word-count example server.
 *
 * Builds a Netty NIO server bootstrap, installs [[WordCountServerPipelineFactory]]
 * as the per-connection pipeline factory and binds it to TCP port 3000.
 */
object WordCountServer {

  /**
   * Starts the server.
   *
   * @param args command-line arguments (not read)
   */
  def main(args: Array[String]): Unit = {
    // Two independent cached thread pools: one handed to the factory as the
    // boss executor, the other as the worker executor.
    val bossExecutor = Executors.newCachedThreadPool()
    val workerExecutor = Executors.newCachedThreadPool()
    val channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor)

    val server = new ServerBootstrap(channelFactory)
    server.setPipelineFactory(new WordCountServerPipelineFactory)
    server.bind(new InetSocketAddress(3000))
  }
}
/**
 * Creates the handler pipeline used for every accepted connection.
 *
 * Handlers are registered in this order: logging, HTTP request decoding,
 * chunk aggregation, HTTP response encoding, word-count request decoding,
 * word-count response encoding and finally the business handler.
 */
class WordCountServerPipelineFactory extends ChannelPipelineFactory {

  /** @return a freshly built pipeline containing new handler instances */
  override def getPipeline(): ChannelPipeline = {
    val stages = Channels.pipeline

    // Logs every event passing in either direction.
    stages.addLast("simpleLoggingHandler", new SimpleLoggingHandler)

    // HTTP layer; aggregated request content is capped at 65536 bytes.
    stages.addLast("httpRequestDecoder", new HttpRequestDecoder)
    stages.addLast("httpChunkAggregator", new HttpChunkAggregator(65536))
    stages.addLast("httpResponseEncoder", new HttpResponseEncoder)

    // Application layer: HTTP <-> word-count messages, then the handler itself.
    stages.addLast("wordCountRequestDecoder", new WordCountRequestDecoder)
    stages.addLast("wordCountResponseEncoder", new WordCountResponseEncoder)
    stages.addLast("wordCountHandler", new WordCountHandler)

    stages
  }
}
/**
 * Pass-through handler that prints every upstream and downstream event to
 * standard output before forwarding it unchanged.
 */
class SimpleLoggingHandler extends ChannelUpstreamHandler with ChannelDownstreamHandler {

  /**
   * Prints the current thread and the given event on one line.
   *
   * @param e the event to report
   */
  def log(e: ChannelEvent): Unit = {
    val current = Thread.currentThread
    println("thread=" + current + ", event=" + e)
  }

  /** Logs the event, then forwards it upstream via the context. */
  override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    log(e)
    ctx.sendUpstream(e)
  }

  /** Logs the event, then forwards it downstream via the context. */
  override def handleDownstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    log(e)
    ctx.sendDownstream(e)
  }
}
/**
 * Application-level request.
 *
 * @param body the raw text whose words are to be counted
 */
case class WordCountRequest(body: String)

/**
 * Decoder that converts an `HttpRequest` into a [[WordCountRequest]].
 *
 * The text is taken from the first value of the `body` query-string parameter
 * of the request URI.
 */
class WordCountRequestDecoder extends OneToOneDecoder {

  /**
   * @param ctx     handler context (not used)
   * @param channel channel the message arrived on (not used)
   * @param msg     the incoming message
   * @return a [[WordCountRequest]] for an `HttpRequest`; any other message is
   *         returned unchanged instead of failing with a `MatchError`
   * @throws IllegalArgumentException if the request URI carries no `body`
   *                                  query parameter
   */
  override def decode(ctx: ChannelHandlerContext, channel: Channel, msg: Any): AnyRef = {
    msg match {
      case httpRequest: HttpRequest =>
        val parameters = new QueryStringDecoder(httpRequest.getUri()).getParameters()
        // java.util.Map#get yields null for an absent key; wrap it so a missing
        // parameter is reported explicitly rather than as a NullPointerException.
        val bodyValues = Option(parameters.get("body")).filterNot(_.isEmpty)
        bodyValues match {
          case Some(values) =>
            WordCountRequest(values.get(0))
          case None =>
            throw new IllegalArgumentException("missing required query parameter: body")
        }
      case _ =>
        // Not ours to decode: hand the original object back untouched.
        msg.asInstanceOf[AnyRef]
    }
  }
}
/**
 * Application-level response.
 *
 * @param words number of occurrences keyed by word
 */
case class WordCountResponse(words: Map[String, Int])

/**
 * Encoder that renders a [[WordCountResponse]] as a plain-text HTTP 200
 * response.
 *
 * The payload is the `toString` of the (word, count) pairs sorted by
 * descending count, followed by CRLF and encoded as UTF-8.
 */
class WordCountResponseEncoder extends OneToOneEncoder {

  /**
   * @param ctx     handler context (not used)
   * @param channel channel being written to (not used)
   * @param msg     the outgoing message
   * @return an HTTP response for a [[WordCountResponse]]; any other message is
   *         returned unchanged instead of failing with a `MatchError`
   */
  override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: Any): AnyRef = {
    msg match {
      case WordCountResponse(words) =>
        val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
        // Media-type parameters are separated by ';' (the original used ':',
        // which produced a malformed Content-Type value).
        response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8")
        val ranked = words.toList.sortWith((a, b) => a._2 > b._2)
        response.setContent(ChannelBuffers.copiedBuffer(ranked.toString + "\r\n", CharsetUtil.UTF_8))
        response
      case _ =>
        // Not ours to encode: hand the original object back untouched.
        msg.asInstanceOf[AnyRef]
    }
  }
}
/**
 * Business handler: counts the words of each [[WordCountRequest]], writes the
 * result back as a [[WordCountResponse]] and closes the channel once the write
 * has completed.
 */
class WordCountHandler extends SimpleChannelUpstreamHandler {

  /**
   * Handles one incoming message.
   *
   * @param ctx handler context
   * @param e   the message event; a [[WordCountRequest]] payload is answered,
   *            anything else is delegated to the superclass implementation
   *            instead of failing with a `MatchError`
   */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    e.getMessage match {
      case WordCountRequest(body) =>
        val future = e.getChannel().write(WordCountResponse(countWords(body)))
        // One request per connection: close as soon as the reply is flushed.
        future.addListener(ChannelFutureListener.CLOSE)
      case _ =>
        super.messageReceived(ctx, e)
    }
  }

  /**
   * Prints the failure's stack trace and closes the channel.
   *
   * @param ctx handler context (not used)
   * @param e   the exception event
   */
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause().printStackTrace()
    e.getChannel().close()
  }

  /**
   * Splits `text` on runs of space, comma, period or exclamation mark and
   * counts how often each token occurs.
   *
   * `String.split` emits a leading empty token when the text is empty or
   * begins with a delimiter; those are dropped so "" is never reported as a
   * word.
   */
  private def countWords(text: String): Map[String, Int] =
    text.split("[ ,.!]+").toList
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment