Skip to content

Instantly share code, notes, and snippets.

@torao
Created August 23, 2016 12:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save torao/1e4a6bf3d2dbdd318c84918cf5c49c9c to your computer and use it in GitHub Desktop.
Save torao/1e4a6bf3d2dbdd318c84918cf5c49c9c to your computer and use it in GitHub Desktop.
WebSocket Client Example for Scala 2.11 with Netty 4
// sbt build definition for the Netty 4 WebSocket client example.

organization := "org.koiroha"

name := "websocket-test"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.11.7"

// Compiler diagnostics enabled for this project, one flag per line.
scalacOptions ++= Seq(
  "-deprecation",
  "-feature",
  "-unchecked",
  "-Xlint",
  "-Ywarn-dead-code",
  "-Ywarn-numeric-widen",
  "-Ywarn-unused",
  "-Ywarn-unused-import"
)

// NOTE(review): "4.+" is a dynamic revision, so the resolved Netty release may
// differ between builds — consider pinning an exact version.
libraryDependencies ++= Seq(
  "io.netty" % "netty-all" % "4.+"
)
// WebSocket Client Example for Scala 2.11 with Netty 4
// http://netty.io/4.0/xref/io/netty/example/http/websocketx/client/WebSocketClient.html
package org.koiroha.websocket
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.{Channel,ChannelFuture,ChannelHandlerContext,ChannelInitializer,ChannelPipeline,ChannelPromise,EventLoopGroup,SimpleChannelInboundHandler}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.{DefaultHttpHeaders,FullHttpResponse,HttpClientCodec,HttpObjectAggregator}
import io.netty.handler.codec.http.websocketx.{CloseWebSocketFrame,PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame,WebSocketClientHandshaker,WebSocketClientHandshakerFactory,WebSocketFrame,WebSocketVersion}
import io.netty.handler.ssl.{SslContext,SslContextBuilder}
import io.netty.handler.ssl.util.{InsecureTrustManagerFactory,SelfSignedCertificate}
import io.netty.util.CharsetUtil
import java.io.{BufferedReader,InputStreamReader}
import java.net.URI
import scala.annotation.tailrec
/**
 * Interactive command-line WebSocket client.
 *
 * Usage: the first program argument is the `ws://` or `wss://` URL to connect to
 * (defaults to `ws://echo.websocket.org`). After the handshake completes, every
 * line read from standard input is sent as a text frame, except:
 *
 *  - `ping` sends a Ping frame,
 *  - `quit` sends a Close frame and waits for the channel to close,
 *  - end of input terminates the loop.
 */
object WebSocketClient extends App {

  val WSURL = if(args.length == 0) "ws://echo.websocket.org" else args(0)
  val uri = new URI(WSURL)

  // URI accessors return null / -1 for missing components, so fall back to defaults.
  val scheme = Option(uri.getScheme).getOrElse("ws").toLowerCase
  val host = Option(uri.getHost).getOrElse("127.0.0.1")
  val port = if(uri.getPort < 0){
    scheme match {
      case "ws" => 80
      case "wss" => 443
      case _ => -1
    }
  } else uri.getPort

  if(scheme != "ws" && scheme != "wss"){
    System.err.println(s"ERROR: unsupported schema: $scheme")
    System.exit(1)
  }

  val secure = scheme == "wss"

  // SECURITY NOTE(review): InsecureTrustManagerFactory accepts ANY server certificate
  // without validation. Acceptable for a local example only; do not use against
  // untrusted networks or in production.
  val sslContext = if(secure){
    Some(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build())
  } else None

  val group = new NioEventLoopGroup()
  try {
    val handler = new WebSocketClientHandler(
      WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))

    val bootstrap = new Bootstrap()
    bootstrap.group(group)
      .channel(classOf[NioSocketChannel])
      .handler(new ChannelInitializer[SocketChannel](){
        override def initChannel(ch:SocketChannel):Unit = {
          val pipeline = ch.pipeline()
          // TLS handler must come first in the pipeline when connecting over wss.
          sslContext.foreach{ s => pipeline.addLast(s.newHandler(ch.alloc(), host, port)) }
          pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler)
        }
      })

    // Connect using the null-safe `host` (not uri.getHost, which may be null) so the
    // connect target always matches the host handed to the SSL handler above.
    val ch = bootstrap.connect(host, port).sync().channel()

    // Block until the handler reports the handshake outcome; throws if it failed.
    handler.handshakeFuture.sync()

    val in = new BufferedReader(new InputStreamReader(System.in))

    /** Reads console lines and forwards them as frames until `quit` or end of input. */
    @tailrec
    def _loop():Unit = {
      in.readLine() match {
        case null => ()
        case "quit" =>
          ch.writeAndFlush(new CloseWebSocketFrame())
          ch.closeFuture().sync()
          ()
        case "ping" =>
          ch.writeAndFlush(new PingWebSocketFrame(Unpooled.wrappedBuffer(Array[Byte](8,1,8,1))))
          _loop()
        case msg =>
          ch.writeAndFlush(new TextWebSocketFrame(msg))
          _loop()
      }
    }
    _loop()
  } finally {
    // Always release the event loop threads, even if connect or handshake failed.
    group.shutdownGracefully()
  }
}
/**
 * Inbound handler that performs the client side of the WebSocket opening handshake
 * and afterwards prints the frames received from the server.
 *
 * @param handshaker handshaker used to start the handshake when the channel becomes
 *                   active and to finish it when the HTTP response arrives
 */
private class WebSocketClientHandler(handshaker:WebSocketClientHandshaker) extends SimpleChannelInboundHandler[AnyRef] {

  /**
   * Promise completed with success once the handshake finishes, or with failure if an
   * exception is caught or the channel closes first. Created in `handlerAdded`.
   */
  var handshakeFuture:ChannelPromise = _

  override def handlerAdded(ctx:ChannelHandlerContext):Unit = handshakeFuture = ctx.newPromise()

  override def channelActive(ctx:ChannelHandlerContext):Unit = handshaker.handshake(ctx.channel())

  override def channelInactive(ctx:ChannelHandlerContext):Unit = {
    System.out.println("WebSocket Client Disconnected")
    // If the connection drops before the handshake completes, nothing else would ever
    // complete the promise and a caller blocked in handshakeFuture.sync() would hang.
    if(! handshakeFuture.isDone){
      handshakeFuture.tryFailure(new java.nio.channels.ClosedChannelException())
    }
  }

  /**
   * Finishes the handshake on the first HTTP response, then dispatches WebSocket frames.
   *
   * @throws IllegalStateException if a non-HTTP message arrives before the handshake
   *                               completes, or an HTTP response arrives after it
   */
  override def channelRead0(ctx:ChannelHandlerContext, msg:AnyRef):Unit = {
    val ch = ctx.channel()
    if(! handshaker.isHandshakeComplete){
      msg match {
        case res:FullHttpResponse =>
          // Any exception from finishHandshake propagates to exceptionCaught,
          // which fails handshakeFuture.
          handshaker.finishHandshake(ch, res)
          System.out.println("WebSocket Client Connected")
          handshakeFuture.setSuccess()
        case other =>
          throw new IllegalStateException(s"ERROR: Unexpected message before handshake completion: ${other.getClass.getName}")
      }
    } else {
      msg match {
        case res:FullHttpResponse =>
          throw new IllegalStateException(s"ERROR: Unexpected FullHttpResponse (status=${res.status.code}, content=${res.content().toString(CharsetUtil.UTF_8)})")
        case text:TextWebSocketFrame =>
          System.out.println(s"<< ${text.text()}")
        case _:PongWebSocketFrame =>
          System.out.println("!! PONG")
        case ping:PingWebSocketFrame =>
          System.out.println("!! PING")
          // A Pong must echo the Ping's application data (RFC 6455 section 5.5.3).
          // retain() is required because SimpleChannelInboundHandler releases `msg`
          // after channelRead0 returns, while the write completes asynchronously.
          ch.writeAndFlush(new PongWebSocketFrame(ping.content().retain()))
        case _:CloseWebSocketFrame =>
          System.out.println("WebSocket Client Received Closing")
          ch.close()
        case other =>
          // e.g. binary/continuation frames: report instead of failing with a MatchError,
          // which would otherwise tear down the connection via exceptionCaught.
          System.out.println(s"!! Unhandled message: ${other.getClass.getSimpleName}")
      }
    }
  }

  /** Logs the error, fails the handshake promise if still pending, and closes the channel. */
  override def exceptionCaught(ctx:ChannelHandlerContext, cause:Throwable):Unit = {
    cause.printStackTrace()
    if(! handshakeFuture.isDone()){
      handshakeFuture.setFailure(cause)
    }
    ctx.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment