Skip to content

Instantly share code, notes, and snippets.

@ymnk
Created December 29, 2010 04:59
Show Gist options
  • Save ymnk/758197 to your computer and use it in GitHub Desktop.
Save ymnk/758197 to your computer and use it in GitHub Desktop.
import scala.actors.{Actor, TIMEOUT, Exit}
import scala.actors.Actor.State.{New, Terminated}
import scala.util.logging.{Logged, ConsoleLogger}
import scala.util.control.Exception.allCatch
import scala.collection.JavaConversions._
import scala.util.Random
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.Charset
import java.io.IOException
/** Entry point: starts the supervisor, lets the server run for one minute, then requests a stop. */
object EchoServer {
  /** How long the server is left running before the stop request, in milliseconds. */
  private val RunMillis = 60000

  def main(args: Array[String]): Unit = {
    // Mixing in the console factory overrides makeLogger() for this supervisor instance.
    val supervisor = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    supervisor.start()
    Thread.sleep(RunMillis)
    supervisor.stop
  }
}
/** Base type of the control messages handled by EchoServerSupervisor. */
sealed abstract class SupervisorMessage
/** Asks the supervisor to link itself to `childActor`. */
case class Link(childActor: Actor) extends SupervisorMessage
/**
 * Shutdown request. The supervisor reacts by calling `acceptor.stop()`; the
 * acceptor closes its server channel and selector when it receives it.
 */
case object Stop extends SupervisorMessage
/**
 * Root actor of the server. It creates the logger and the acceptor, links to
 * every actor announced through `Link`, and restarts linked actors that exit
 * for any reason other than `'normal`.
 *
 * @param port TCP port handed to the acceptor (10000 unless overridden)
 */
class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {
  // Turn exits of linked actors into Exit messages instead of dying with them.
  trapExit = true
  val logger = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  /** Starts the children, then serves supervisor messages in a react loop. */
  def act(): Unit = {
    startChildren()
    loop {
      react {
        case Link(child: Actor) =>
          link(child)
        case Exit(child: Actor, 'normal) =>
          // A normal exit of the acceptor ends the supervisor; any other
          // normal exit needs no action.
          if (child == acceptor) exit("stop")
        case Exit(child: Actor, reason) =>
          // Exceptions are reported by message, everything else as-is.
          val detail = reason match {
            case failure: Exception => failure.getMessage
            case other => other
          }
          logger.write("receive Exit: %s" format detail)
          restartChild(child)
        case Stop =>
          acceptor.stop()
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** Starts the logger and the acceptor if new; gives up if either has already terminated. */
  def startChildren(): Unit = {
    for (child <- Seq(logger, acceptor)) {
      val state = child.getState
      if (state == New) startChild(child)
      else if (state == Terminated) exit("Could not restart server.")
    }
  }

  /** Links to `child` and starts it. */
  def startChild(child: Actor): Unit = {
    link(child)
    child.start()
  }

  /** Links to `child` again and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Asynchronously asks this supervisor to shut the server down. */
  def stop = this ! Stop
}
/** Base type of the messages handled by EchoServerLogger. */
sealed abstract class LoggerMessage
/** Carries one piece of text that the logger actor passes to `log`. */
case class Log(message: String) extends LoggerMessage
/**
 * Logger actor: each `Log` message it receives is handed to `log` from the
 * mixed-in `Logged` trait.
 */
class EchoServerLogger extends Actor with Logged {
  /** Processes messages forever; messages other than `Log` are reported through `log` as well. */
  def act = loop {
    react {
      case Log(text) => log(text)
      case other => log("unknown message [%s], ignoring" format other)
    }
  }

  /**
   * Queues `message` for logging. The message is silently dropped when 100 or
   * more messages are already waiting in this actor's mailbox.
   */
  def write(message: String): Unit = {
    val hasRoom = this.mailboxSize < 100
    if (hasRoom) this ! Log(message)
  }
}
/** Factory hook for the logger actor; the default builds a plain `EchoServerLogger`. */
trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger = new EchoServerLogger()
}
/** Factory variant whose logger additionally mixes in `ConsoleLogger`. */
trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger = {
    val consoleLogger = new EchoServerLogger() with ConsoleLogger
    consoleLogger
  }
}
/**
 * Actor that owns the NIO selector and the listening socket. It accepts
 * connections, creates one `EchoServerHandler` per connection and forwards
 * readiness events to the handlers as `Read` / `Write` messages.
 *
 * @param supervisor actor that is asked (via `Link`) to link to each new handler
 * @param logger     logger actor used for status messages
 * @param port       TCP port the listening socket is bound to
 */
class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger: EchoServerLogger,
  port: Int
) extends Actor {
  val selector = Selector.open()
  // Non-blocking listening channel, bound during construction.
  val serverChannel = {
    val listening = ServerSocketChannel.open()
    listening.configureBlocking(false)
    val listeningSocket = listening.socket
    listeningSocket.setReuseAddress(true)
    listeningSocket.bind(new InetSocketAddress(port))
    listening
  }
  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  val random = Random
  logger.write("Start echo server. listen port is %d" format port)

  /** Select loop: wait for events, dispatch them, then poll the mailbox once. */
  def act: Unit = {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      // Test hook: on average once per 500000 iterations the loop throws on purpose.
      val injectFault = random.nextInt(500000) == 0
      if (injectFault) {
        throw new Exception("acceptor exception test")
      }
    }
  }

  /** Takes at most one message from the mailbox without blocking; `Stop` shuts the acceptor down. */
  def receiveStop(): Unit = {
    receiveWithin(0) {
      case Stop =>
        serverChannel.close()
        selector.close()
        logger.write("Stop echo server.")
        exit()
      case TIMEOUT =>
        // Mailbox is empty: nothing to do.
      case other =>
        logger.write("unknown message [%s], ignoring" format other)
    }
  }

  /** Sends `Stop` to this actor and wakes the selector so the select loop notices it. */
  def stop() = {
    this ! Stop
    selector.wakeup()
  }

  /** Dispatches every valid selected key, then empties the selected-key set. */
  def handleKeys(): Unit = {
    val ready = selector.selectedKeys
    val keys = ready.iterator
    while (keys.hasNext) {
      val key = keys.next()
      if (key.isValid) handleKey(key)
    }
    ready.clear()
  }

  /** Accepts on the server key; otherwise forwards readiness to the attached handler. */
  def handleKey(key: SelectionKey): Unit = {
    val listenerReady = serverKey == key && key.isAcceptable
    if (listenerReady) {
      accept()
    } else {
      val handler = key.attachment.asInstanceOf[EchoServerHandler]
      if (key.isReadable) handler.sendMessage(Read)
      if (key.isWritable) handler.sendMessage(Write)
    }
  }

  /** Accepts one pending connection, if any, and wires up a handler actor for it. */
  def accept(): Unit = {
    val channel = serverChannel.accept()
    // accept() yields null on a non-blocking channel when nothing is pending.
    if (channel != null) {
      val remoteAddress = channel.socket.getRemoteSocketAddress.toString
      logger.write("connect from [%s]" format remoteAddress)
      channel.configureBlocking(false)
      val handler = new EchoServerHandler(logger, channel)
      supervisor ! Link(handler)
      handler.start
      val interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE
      channel.register(selector, interest, handler)
    }
  }
}
/** Base type of the readiness messages accepted by EchoServerHandler.sendMessage. */
sealed abstract class HandlerMessage
/** Sent by the acceptor when the connection's selection key is readable. */
case object Read extends HandlerMessage
/** Sent by the acceptor when the connection's selection key is writable. */
case object Write extends HandlerMessage
/**
 * Actor that serves one client connection as a small state machine:
 * `ack` (send the greeting), then alternating `read` and `write` states.
 * Each state is a partial function handed to `react` / `reactWithin`; the
 * `doXxx()` methods perform the I/O and return the next state.
 *
 * @param logger  logger actor used for diagnostics
 * @param channel the non-blocking client channel served by this actor
 */
class EchoServerHandler(
  logger: EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  /** A state is the message handler used for the next `react`. */
  type State = PartialFunction[Any, Unit]
  val buffer = ByteBuffer.allocate(1024)
  // Despite the name this is a Charset; it is only used to decode the buffer.
  val decoder = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  // Captures the line content, allowing up to two trailing CR/LF characters.
  val MessageLine = """^(.*)[\r\n]{0,2}$""".r

  /** Enqueues `message` unless 5000 or more messages are already waiting; excess messages are dropped. */
  def sendMessage(message: HandlerMessage): Unit = {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  /** Entry point: waits up to 100 ms for the first `Write` before greeting the client. */
  def act = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  /** Initial state: greet on the first `Write`, close on timeout. */
  def ack: State = {
    case Write => react(doAck())
    case TIMEOUT => close()
  }

  /** Waiting for input; `Write` notifications are ignored in this state. */
  def read: State = {
    case Read => reactWithin(100)(doRead())
    case Write => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown)
      react(read)
  }

  /** Waiting for the channel to become writable so the buffer can be sent. */
  def write: State = {
    case Write => react(doWrite())
    case TIMEOUT => react(read)
  }

  /** Puts the greeting into the buffer and sends it. */
  def doAck(): State = {
    buffer.put("hello\r\n".getBytes("US-ASCII"))
    buffer.flip()
    writeBuffer()
  }

  /** Reads from the channel and picks the next state from the result. */
  def doRead(): State = {
    allCatch opt channel.read(buffer) match {
      case Some(0) => read
      case Some(-1) => close()
      case Some(_) =>
        buffer.flip()
        handleMessage()
      case None =>
        logger.write("read error")
        close()
    }
  }

  /** Closes the channel, logs the disconnect and terminates this actor; never returns. */
  def close(): Nothing = {
    channel.close()
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  /** Interprets the received text: "exit" closes, "test" throws on purpose, anything else is echoed. */
  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _ => write
    }
  }

  /** Logs the pending bytes and tries to send them. */
  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  /**
   * Decodes the bytes between the buffer's position and limit.
   *
   * A duplicate of the buffer is decoded so the buffer's own position is left
   * untouched; rewinding it would resend bytes that a previous partial write
   * already delivered.
   */
  def getMessage: String = {
    decoder.decode(buffer.duplicate()).toString
  }

  /** Writes one log line containing the thread, this actor, `state`, the peer address and `message`. */
  def printLog(state: String, message: String): Unit = {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  /**
   * Sends the buffer with a single non-blocking write.
   *
   * A non-blocking write may transfer only part of the buffer. When bytes
   * remain the handler stays in the `write` state so the next `Write` message
   * continues from the current position; the buffer is cleared, and the
   * handler returns to `read`, only once everything has been sent.
   */
  def writeBuffer(): State = {
    allCatch opt channel.write(buffer) match {
      case Some(_) if buffer.hasRemaining =>
        write
      case Some(_) =>
        buffer.clear()
        read
      case None =>
        logger.write("write error")
        close()
    }
  }
}
import scala.actors.{Actor, TIMEOUT, Exit}
import scala.actors.Actor.State.{New, Terminated}
import scala.util.logging.{Logged, ConsoleLogger}
import scala.util.control.Exception.allCatch
import scala.collection.JavaConversions._
import scala.util.Random
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.Charset
import java.io.IOException
/** Entry point: starts the supervisor, lets the server run for one minute, then requests a stop. */
object EchoServerRev1 {
  /** How long the server is left running before the stop request, in milliseconds. */
  private val RunMillis = 60000

  def main(args: Array[String]): Unit = {
    // Mixing in the console factory overrides makeLogger() for this supervisor instance.
    val supervisor = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    supervisor.start()
    Thread.sleep(RunMillis)
    supervisor.stop
  }
}
/** Base type of the control messages handled by EchoServerSupervisor. */
sealed abstract class SupervisorMessage
/** Asks the supervisor to link itself to `childActor`. */
case class Link(childActor: Actor) extends SupervisorMessage
/**
 * Shutdown request. The supervisor reacts by calling `acceptor.stop()`; the
 * acceptor closes its server channel and selector when it receives it.
 */
case object Stop extends SupervisorMessage
/**
 * Root actor of the server. It creates the logger and the acceptor, links to
 * every actor announced through `Link`, and restarts linked actors that exit
 * for any reason other than `'normal`.
 *
 * @param port TCP port handed to the acceptor (10000 unless overridden)
 */
class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {
  // Turn exits of linked actors into Exit messages instead of dying with them.
  trapExit = true
  val logger = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  /** Starts the children, then serves supervisor messages in a react loop. */
  def act(): Unit = {
    startChildren()
    loop {
      react {
        case Link(child: Actor) =>
          link(child)
        case Exit(child: Actor, 'normal) =>
          // A normal exit of the acceptor ends the supervisor; any other
          // normal exit needs no action.
          if (child == acceptor) exit("stop")
        case Exit(child: Actor, reason) =>
          // Exceptions are reported by message, everything else as-is.
          val detail = reason match {
            case failure: Exception => failure.getMessage
            case other => other
          }
          logger.write("receive Exit: %s" format detail)
          restartChild(child)
        case Stop =>
          acceptor.stop()
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** Starts the logger and the acceptor if new; gives up if either has already terminated. */
  def startChildren(): Unit = {
    for (child <- Seq(logger, acceptor)) {
      val state = child.getState
      if (state == New) startChild(child)
      else if (state == Terminated) exit("Could not restart server.")
    }
  }

  /** Links to `child` and starts it. */
  def startChild(child: Actor): Unit = {
    link(child)
    child.start()
  }

  /** Links to `child` again and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Asynchronously asks this supervisor to shut the server down. */
  def stop = this ! Stop
}
/** Base type of the messages handled by EchoServerLogger. */
sealed abstract class LoggerMessage
/** Carries one piece of text that the logger actor passes to `log`. */
case class Log(message: String) extends LoggerMessage
/**
 * Logger actor: each `Log` message it receives is handed to `log` from the
 * mixed-in `Logged` trait.
 */
class EchoServerLogger extends Actor with Logged {
  /** Processes messages forever; messages other than `Log` are reported through `log` as well. */
  def act = loop {
    react {
      case Log(text) => log(text)
      case other => log("unknown message [%s], ignoring" format other)
    }
  }

  /**
   * Queues `message` for logging. The message is silently dropped when 100 or
   * more messages are already waiting in this actor's mailbox.
   */
  def write(message: String): Unit = {
    val hasRoom = this.mailboxSize < 100
    if (hasRoom) this ! Log(message)
  }
}
/** Factory hook for the logger actor; the default builds a plain `EchoServerLogger`. */
trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger = new EchoServerLogger()
}
/** Factory variant whose logger additionally mixes in `ConsoleLogger`. */
trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger = {
    val consoleLogger = new EchoServerLogger() with ConsoleLogger
    consoleLogger
  }
}
/**
 * Asks the acceptor to set the interest ops of `socket`'s selection key.
 * Despite its name, `pos` carries the SelectionKey ops value (callers pass
 * OP_READ or OP_WRITE).
 */
case class ChangeRequest(socket:SocketChannel, pos: Int)
/** Asks the acceptor to cancel `socket`'s selection key and close the channel. */
case class CloseChannel(socket:SocketChannel)
/**
 * Actor that owns the NIO selector and the listening socket. It accepts
 * connections, creates one `EchoServerHandler` per connection, forwards
 * readiness events to the handlers as `Read` / `Write` messages, and applies
 * the interest-op changes and close requests the handlers send back.
 *
 * @param supervisor actor that is asked (via `Link`) to link to each new handler
 * @param logger     logger actor used for status messages
 * @param port       TCP port the listening socket is bound to
 */
class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger: EchoServerLogger,
  port: Int
) extends Actor {
  val selector = Selector.open()
  // Non-blocking listening channel, bound during construction.
  val serverChannel = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)
    val socket = channel.socket
    socket.setReuseAddress(true)
    socket.bind(new InetSocketAddress(port))
    channel
  }
  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  val random = Random
  logger.write("Start echo server. listen port is %d" format port)

  /** Select loop: wait for events, dispatch them, then process the mailbox. */
  def act: Unit = {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      // Test hook: on average once per 500000 iterations the loop throws on purpose.
      if (random.nextInt(500000) == 0) {
        throw new Exception("acceptor exception test")
      }
    }
  }

  /**
   * Processes every message currently in the mailbox without blocking.
   *
   * The mailbox is drained completely because several `wakeup` calls issued
   * between two selections have the effect of a single one: handling just one
   * message per select cycle could leave a request waiting while `select()`
   * blocks. `Stop` closes the server channel and the selector and terminates
   * this actor.
   */
  def receiveStop(): Unit = {
    var mailboxDrained = false
    while (!mailboxDrained) {
      receiveWithin(0) {
        case Stop =>
          serverChannel.close()
          selector.close()
          logger.write("Stop echo server.")
          exit()
        case ChangeRequest(sc, ops) =>
          // keyFor yields null for an unregistered channel; a cancelled key
          // would throw on interestOps. Such requests are ignored.
          Option(sc.keyFor(selector)) filter { _.isValid } foreach { _.interestOps(ops) }
        case CloseChannel(sc) =>
          Option(sc.keyFor(selector)) foreach { _.cancel() }
          sc.close()
        case TIMEOUT =>
          // Nothing left in the mailbox.
          mailboxDrained = true
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** Sends `Stop` to this actor and wakes the selector so the select loop notices it. */
  def stop() = {
    this ! Stop
    selector.wakeup
  }

  /** Dispatches every valid selected key, then empties the selected-key set. */
  def handleKeys(): Unit = {
    val ready = selector.selectedKeys
    val keys = ready.iterator
    while (keys.hasNext) {
      val key = keys.next()
      if (key.isValid) handleKey(key)
    }
    ready.clear()
  }

  /**
   * Accepts on the server key; otherwise notifies the attached handler and
   * clears the key's interest set until the handler asks for more events.
   */
  def handleKey(key: SelectionKey): Unit = {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      val handler = key.attachment.asInstanceOf[EchoServerHandler]
      if (key.isReadable) {
        handler.sendMessage(Read)
        key.interestOps(0)
      }
      if (key.isWritable) {
        handler.sendMessage(Write)
        key.interestOps(0)
      }
    }
  }

  /** Accepts one pending connection, if any, and wires up a handler actor for it. */
  def accept(): Unit = {
    serverChannel.accept() match {
      case channel: SocketChannel =>
        val remoteAddress = channel.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]" format remoteAddress)
        channel.configureBlocking(false)
        val handler = new EchoServerHandler(this, logger, channel)
        supervisor ! Link(handler)
        handler.start
        channel.register(
          selector,
          SelectionKey.OP_READ,
          handler
        )
        // Sent after registration, so the key exists before the handler requests changes.
        handler ! Ack
      case _ =>
        // accept() returned null: no connection was pending.
    }
  }
}
/** Base type of the messages the acceptor sends to an EchoServerHandler. */
sealed abstract class HandlerMessage
/** Sent by the acceptor when the connection's selection key is readable. */
case object Read extends HandlerMessage
/** Sent by the acceptor when the connection's selection key is writable. */
case object Write extends HandlerMessage
/** Sent once by the acceptor after the connection is registered; the handler answers with its greeting. */
case object Ack extends HandlerMessage
/**
 * Actor that serves one client connection as a small state machine:
 * `ack` (prepare the greeting), then alternating `read` and `write` states.
 * Each state is a partial function handed to `react` / `reactWithin`; the
 * `doXxx()` methods perform the I/O and return the next state. Interest-op
 * changes are requested from the acceptor with `ChangeRequest` followed by a
 * selector wakeup.
 *
 * @param acceptor actor owning the selector this connection is registered with
 * @param logger   logger actor used for diagnostics
 * @param channel  the non-blocking client channel served by this actor
 */
class EchoServerHandler(
  acceptor: EchoServerAcceptor,
  logger: EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  /** A state is the message handler used for the next `react`. */
  type State = PartialFunction[Any, Unit]
  val buffer = ByteBuffer.allocate(1024)
  // Despite the name this is a Charset; it is only used to decode the buffer.
  val decoder = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  // Captures the line content, allowing up to two trailing CR/LF characters.
  val MessageLine = """^(.*)[\r\n]{0,2}$""".r

  /** Enqueues `message` unless 5000 or more messages are already waiting; excess messages are dropped. */
  def sendMessage(message: HandlerMessage): Unit = {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  /** Entry point: waits up to 100 ms for `Ack` before greeting the client. */
  def act = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  /** Initial state: prepare the greeting on `Ack`, close on timeout. */
  def ack: State = {
    case Ack => react(doAck())
    case TIMEOUT => close()
  }

  /** Waiting for input; `Write` notifications are ignored in this state. */
  def read: State = {
    case Read => reactWithin(100)(doRead())
    case Write => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown)
      react(read)
  }

  /** Waiting for the channel to become writable so the buffer can be sent. */
  def write: State = {
    case Write => react(doWrite())
    case TIMEOUT => react(read)
  }

  /** Puts the greeting into the buffer and asks the acceptor for write readiness. */
  def doAck(): State = {
    buffer.put("hello\r\n".getBytes("US-ASCII"))
    buffer.flip()
    acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
    acceptor.selector.wakeup
    write
  }

  /** Reads from the channel and picks the next state from the result. */
  def doRead(): State = {
    allCatch opt channel.read(buffer) match {
      case Some(0) =>
        // Nothing arrived: re-arm read interest and keep waiting.
        acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
        acceptor.selector.wakeup
        read
      case Some(-1) => close()
      case Some(_) =>
        buffer.flip()
        handleMessage()
      case None =>
        logger.write("read error")
        close()
    }
  }

  /** Asks the acceptor to close the channel, logs the disconnect and terminates this actor; never returns. */
  def close(): Nothing = {
    acceptor ! CloseChannel(channel)
    acceptor.selector.wakeup
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  /** Interprets the received text: "exit" closes, "test" throws on purpose, anything else is echoed. */
  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _ =>
        acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
        acceptor.selector.wakeup
        write
    }
  }

  /** Logs the pending bytes and tries to send them. */
  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  /**
   * Decodes the bytes between the buffer's position and limit.
   *
   * A duplicate of the buffer is decoded so the buffer's own position is left
   * untouched; rewinding it would resend bytes that a previous partial write
   * already delivered.
   */
  def getMessage: String = {
    decoder.decode(buffer.duplicate()).toString
  }

  /** Writes one log line containing the thread, this actor, `state`, the peer address and `message`. */
  def printLog(state: String, message: String): Unit = {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  /**
   * Sends the buffer with a single non-blocking write.
   *
   * A non-blocking write may transfer only part of the buffer. When bytes
   * remain, write interest is requested again and the handler stays in the
   * `write` state; the buffer is cleared and read interest restored only once
   * everything has been sent.
   */
  def writeBuffer(): State = {
    allCatch opt channel.write(buffer) match {
      case Some(_) if buffer.hasRemaining =>
        acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
        acceptor.selector.wakeup
        write
      case Some(_) =>
        buffer.clear()
        acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
        acceptor.selector.wakeup
        read
      case None =>
        logger.write("write error")
        close()
    }
  }
}
import scala.actors.{Actor, TIMEOUT, Exit}
import scala.actors.Actor.State.{New, Terminated}
import scala.util.logging.{Logged, ConsoleLogger}
import scala.util.control.Exception.allCatch
import scala.collection.JavaConversions._
import scala.util.Random
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.Charset
import java.io.IOException
/** Entry point: starts the supervisor, lets the server run for one minute, then requests a stop. */
object EchoServerRev2 {
  /** How long the server is left running before the stop request, in milliseconds. */
  private val RunMillis = 60000

  def main(args: Array[String]): Unit = {
    // Mixing in the console factory overrides makeLogger() for this supervisor instance.
    val supervisor = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    supervisor.start()
    Thread.sleep(RunMillis)
    supervisor.stop
  }
}
/** Base type of the control messages handled by EchoServerSupervisor. */
sealed abstract class SupervisorMessage
/** Asks the supervisor to link itself to `childActor`. */
case class Link(childActor: Actor) extends SupervisorMessage
/**
 * Shutdown request. The supervisor reacts by calling `acceptor.stop()`; the
 * acceptor closes its server channel and selector when it receives it.
 */
case object Stop extends SupervisorMessage
/**
 * Root actor of the server. It creates the logger and the acceptor, links to
 * every actor announced through `Link`, and restarts linked actors that exit
 * for any reason other than `'normal`.
 *
 * @param port TCP port handed to the acceptor (10000 unless overridden)
 */
class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {
  // Turn exits of linked actors into Exit messages instead of dying with them.
  trapExit = true
  val logger = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  /** Starts the children, then serves supervisor messages in a react loop. */
  def act(): Unit = {
    startChildren()
    loop {
      react {
        case Link(child: Actor) =>
          link(child)
        case Exit(child: Actor, 'normal) =>
          // A normal exit of the acceptor ends the supervisor; any other
          // normal exit needs no action.
          if (child == acceptor) exit("stop")
        case Exit(child: Actor, reason) =>
          // Exceptions are reported by message, everything else as-is.
          val detail = reason match {
            case failure: Exception => failure.getMessage
            case other => other
          }
          logger.write("receive Exit: %s" format detail)
          restartChild(child)
        case Stop =>
          acceptor.stop()
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** Starts the logger and the acceptor if new; gives up if either has already terminated. */
  def startChildren(): Unit = {
    for (child <- Seq(logger, acceptor)) {
      val state = child.getState
      if (state == New) startChild(child)
      else if (state == Terminated) exit("Could not restart server.")
    }
  }

  /** Links to `child` and starts it. */
  def startChild(child: Actor): Unit = {
    link(child)
    child.start()
  }

  /** Links to `child` again and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Asynchronously asks this supervisor to shut the server down. */
  def stop = this ! Stop
}
/** Base type of the messages handled by EchoServerLogger. */
sealed abstract class LoggerMessage
/** Carries one piece of text that the logger actor passes to `log`. */
case class Log(message: String) extends LoggerMessage
/**
 * Logger actor: each `Log` message it receives is handed to `log` from the
 * mixed-in `Logged` trait.
 */
class EchoServerLogger extends Actor with Logged {
  /** Processes messages forever; messages other than `Log` are reported through `log` as well. */
  def act = loop {
    react {
      case Log(text) => log(text)
      case other => log("unknown message [%s], ignoring" format other)
    }
  }

  /**
   * Queues `message` for logging. The message is silently dropped when 100 or
   * more messages are already waiting in this actor's mailbox.
   */
  def write(message: String): Unit = {
    val hasRoom = this.mailboxSize < 100
    if (hasRoom) this ! Log(message)
  }
}
/** Factory hook for the logger actor; the default builds a plain `EchoServerLogger`. */
trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger = new EchoServerLogger()
}
/** Factory variant whose logger additionally mixes in `ConsoleLogger`. */
trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger = {
    val consoleLogger = new EchoServerLogger() with ConsoleLogger
    consoleLogger
  }
}
/**
 * Asks the acceptor to set the interest ops of `socket`'s selection key.
 * Despite its name, `pos` carries the SelectionKey ops value (callers pass
 * OP_READ or OP_WRITE).
 */
case class ChangeRequest(socket:SocketChannel, pos: Int)
/** Asks the acceptor to cancel `socket`'s selection key and close the channel. */
case class CloseChannel(socket:SocketChannel)
/**
 * Actor that owns the NIO selector and the listening socket. It accepts
 * connections, creates one `EchoServerHandler` per connection, forwards
 * readiness events to the handlers as `Read` / `Write` messages, and applies
 * the interest-op changes and close requests the handlers send back.
 *
 * @param supervisor actor that is asked (via `Link`) to link to each new handler
 * @param logger     logger actor used for status messages
 * @param port       TCP port the listening socket is bound to
 */
class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger: EchoServerLogger,
  port: Int
) extends Actor {
  val selector = Selector.open()
  // Non-blocking listening channel, bound during construction.
  val serverChannel = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)
    val socket = channel.socket
    socket.setReuseAddress(true)
    socket.bind(new InetSocketAddress(port))
    channel
  }
  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  val random = Random
  logger.write("Start echo server. listen port is %d" format port)

  /** Select loop: wait for events, dispatch them, then process the mailbox. */
  def act: Unit = {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      // Test hook: on average once per 500000 iterations the loop throws on purpose.
      if (random.nextInt(500000) == 0) {
        throw new Exception("acceptor exception test")
      }
    }
  }

  /**
   * Processes every message currently in the mailbox without blocking.
   *
   * The mailbox is drained completely because several `wakeup` calls issued
   * between two selections have the effect of a single one: handling just one
   * message per select cycle could leave a request waiting while `select()`
   * blocks. `Stop` closes the server channel and the selector and terminates
   * this actor.
   */
  def receiveStop(): Unit = {
    var mailboxDrained = false
    while (!mailboxDrained) {
      receiveWithin(0) {
        case Stop =>
          serverChannel.close()
          selector.close()
          logger.write("Stop echo server.")
          exit()
        case ChangeRequest(sc, ops) =>
          // keyFor yields null for an unregistered channel; a cancelled key
          // would throw on interestOps. Such requests are ignored.
          Option(sc.keyFor(selector)) filter { _.isValid } foreach { _.interestOps(ops) }
        case CloseChannel(sc) =>
          Option(sc.keyFor(selector)) foreach { _.cancel() }
          sc.close()
        case TIMEOUT =>
          // Nothing left in the mailbox.
          mailboxDrained = true
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** Sends `Stop` to this actor and wakes the selector so the select loop notices it. */
  def stop() = {
    this ! Stop
    selector.wakeup
  }

  /** Dispatches every valid selected key, then empties the selected-key set. */
  def handleKeys(): Unit = {
    val ready = selector.selectedKeys
    val keys = ready.iterator
    while (keys.hasNext) {
      val key = keys.next()
      if (key.isValid) handleKey(key)
    }
    ready.clear()
  }

  /**
   * Accepts on the server key; otherwise notifies the attached handler and
   * clears the key's interest set until the handler asks for more events.
   */
  def handleKey(key: SelectionKey): Unit = {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      val handler = key.attachment.asInstanceOf[EchoServerHandler]
      if (key.isReadable) {
        handler.sendMessage(Read)
        key.interestOps(0)
      }
      if (key.isWritable) {
        handler.sendMessage(Write)
        key.interestOps(0)
      }
    }
  }

  /** Accepts one pending connection, if any, and wires up a handler actor for it. */
  def accept(): Unit = {
    serverChannel.accept() match {
      case channel: SocketChannel =>
        val remoteAddress = channel.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]" format remoteAddress)
        channel.configureBlocking(false)
        val handler = new EchoServerHandler(this, logger, channel)
        supervisor ! Link(handler)
        handler.start
        channel.register(
          selector,
          SelectionKey.OP_READ,
          handler
        )
        // Sent after registration, so the key exists before the handler requests changes.
        handler ! Ack
      case _ =>
        // accept() returned null: no connection was pending.
    }
  }
}
/** Base type of the messages the acceptor sends to an EchoServerHandler. */
sealed abstract class HandlerMessage
/** Sent by the acceptor when the connection's selection key is readable. */
case object Read extends HandlerMessage
/** Sent by the acceptor when the connection's selection key is writable. */
case object Write extends HandlerMessage
/** Sent once by the acceptor after the connection is registered; the handler answers with its greeting. */
case object Ack extends HandlerMessage
/**
 * Actor that serves one client connection as a small state machine:
 * `ack` (prepare the greeting), then alternating `read` and `write` states.
 * Each state is a partial function handed to `react` / `reactWithin`; the
 * `doXxx()` methods perform the I/O and return the next state. Interest-op
 * changes are requested from the acceptor with `ChangeRequest` followed by a
 * selector wakeup.
 *
 * @param acceptor actor owning the selector this connection is registered with
 * @param logger   logger actor used for diagnostics
 * @param channel  the non-blocking client channel served by this actor
 */
class EchoServerHandler(
  acceptor: EchoServerAcceptor,
  logger: EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  /** A state is the message handler used for the next `react`. */
  type State = PartialFunction[Any, Unit]
  val buffer = ByteBuffer.allocate(1024)
  // Despite the name this is a Charset; it is only used to decode the buffer.
  val decoder = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  // Captures the line content, allowing up to two trailing CR/LF characters.
  val MessageLine = """^(.*)[\r\n]{0,2}$""".r

  /**
   * A sample method to demonstrate reading structured data: the input counts
   * as complete once more than 10 bytes were received or it starts with "exit".
   *
   * Only the bytes received so far (up to `position`) are inspected; the rest
   * of the backing array may still hold data from an earlier message.
   */
  def isEnough(buffer: ByteBuffer): Boolean = {
    buffer.position > 10 ||
      new String(buffer.array, 0, buffer.position).startsWith("exit")
  }

  /** Enqueues `message` unless 5000 or more messages are already waiting; excess messages are dropped. */
  def sendMessage(message: HandlerMessage): Unit = {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  /** Entry point: waits up to 100 ms for `Ack` before greeting the client. */
  def act = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  /** Initial state: prepare the greeting on `Ack`, close on timeout. */
  def ack: State = {
    case Ack => react(doAck())
    case TIMEOUT => close()
  }

  /** Waiting for input; `Write` notifications are ignored in this state. */
  def read: State = {
    case Read => reactWithin(100)(doRead())
    case Write => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown)
      react(read)
  }

  /** Waiting for the channel to become writable so the buffer can be sent. */
  def write: State = {
    case Write => react(doWrite())
    case TIMEOUT => react(read)
  }

  /** Puts the greeting into the buffer and asks the acceptor for write readiness. */
  def doAck(): State = {
    buffer.put("hello\r\n".getBytes("US-ASCII"))
    buffer.flip()
    acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
    acceptor.selector.wakeup
    write
  }

  /** Reads from the channel; handles the message once `isEnough`, otherwise waits for more input. */
  def doRead(): State = {
    allCatch opt channel.read(buffer) match {
      case Some(0) =>
        acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
        acceptor.selector.wakeup
        read
      case Some(-1) => close()
      case Some(_) =>
        if (isEnough(buffer)) {
          buffer.flip()
          handleMessage()
        } else {
          // We have not gotten enough data yet; keep the bytes and re-arm read interest.
          acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
          acceptor.selector.wakeup
          read
        }
      case None =>
        logger.write("read error")
        close()
    }
  }

  /** Asks the acceptor to close the channel, logs the disconnect and terminates this actor; never returns. */
  def close(): Nothing = {
    acceptor ! CloseChannel(channel)
    acceptor.selector.wakeup
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  /** Interprets the received text: "exit" closes, "test" throws on purpose, anything else is echoed. */
  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _ =>
        acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
        acceptor.selector.wakeup
        write
    }
  }

  /** Logs the pending bytes and tries to send them. */
  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  /**
   * Decodes the bytes between the buffer's position and limit.
   *
   * A duplicate of the buffer is decoded so the buffer's own position is left
   * untouched. `doWrite` calls this before resuming a stalled write; rewinding
   * the buffer there would resend bytes that were already delivered.
   */
  def getMessage: String = {
    decoder.decode(buffer.duplicate()).toString
  }

  /** Writes one log line containing the thread, this actor, `state`, the peer address and `message`. */
  def printLog(state: String, message: String): Unit = {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  /**
   * Writes until the buffer is empty or the channel accepts no more bytes.
   *
   * When a write transfers 0 bytes, write interest is requested again and the
   * handler stays in the `write` state with the unsent bytes kept in place.
   * After a complete send the buffer is cleared and read interest restored.
   * Only `Exception`s are treated as write errors, so the control throwable
   * raised by `exit()` inside `close()` is not intercepted here.
   */
  def writeBuffer(): State = {
    try {
      var stalled = false
      while (buffer.hasRemaining && !stalled) {
        val written = channel.write(buffer)
        if (written < 0) {
          // Defensive: kept from the original logic for a negative count.
          logger.write("write error")
          close()
        } else if (written == 0) {
          stalled = true
        }
      }
      if (stalled) {
        acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
        acceptor.selector.wakeup
        write
      } else {
        buffer.clear()
        acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
        acceptor.selector.wakeup
        read
      }
    } catch {
      case e: Exception =>
        logger.write("write error")
        close()
    }
  }
}
import scala.actors.{Actor, TIMEOUT, Exit}
import scala.actors.Actor.State.{New, Terminated}
import scala.util.logging.{Logged, ConsoleLogger}
import scala.util.control.Exception.allCatch
import scala.collection.JavaConversions._
import scala.util.Random
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.Charset
import java.io.IOException
/** Entry point: starts the supervisor, lets the server run for one minute, then requests a stop. */
object EchoServerRev3 {
  /** How long the server is left running before the stop request, in milliseconds. */
  private val RunMillis = 60000

  def main(args: Array[String]): Unit = {
    // Mixing in the console factory overrides makeLogger() for this supervisor instance.
    val supervisor = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    supervisor.start()
    Thread.sleep(RunMillis)
    supervisor.stop
  }
}
/** Base type of the control messages handled by EchoServerSupervisor. */
sealed abstract class SupervisorMessage
/** Asks the supervisor to link itself to `childActor`. */
case class Link(childActor: Actor) extends SupervisorMessage
/**
 * Shutdown request. The supervisor reacts by calling `acceptor.stop()`; the
 * acceptor closes its server channel and selector when it receives it.
 */
case object Stop extends SupervisorMessage
/**
 * Root actor of the server. It creates the logger and the acceptor, links to
 * every actor announced through `Link`, and restarts linked actors that exit
 * for any reason other than `'normal`.
 *
 * @param port TCP port handed to the acceptor (10000 unless overridden)
 */
class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {
  // Turn exits of linked actors into Exit messages instead of dying with them.
  trapExit = true
  val logger = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  /** Starts the children, then serves supervisor messages in a react loop. */
  def act(): Unit = {
    startChildren()
    loop {
      react {
        case Link(child: Actor) =>
          link(child)
        case Exit(child: Actor, 'normal) =>
          // A normal exit of the acceptor ends the supervisor; any other
          // normal exit needs no action.
          if (child == acceptor) exit("stop")
        case Exit(child: Actor, reason) =>
          // Exceptions are reported by message, everything else as-is.
          val detail = reason match {
            case failure: Exception => failure.getMessage
            case other => other
          }
          logger.write("receive Exit: %s" format detail)
          restartChild(child)
        case Stop =>
          acceptor.stop()
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** Starts the logger and the acceptor if new; gives up if either has already terminated. */
  def startChildren(): Unit = {
    for (child <- Seq(logger, acceptor)) {
      val state = child.getState
      if (state == New) startChild(child)
      else if (state == Terminated) exit("Could not restart server.")
    }
  }

  /** Links to `child` and starts it. */
  def startChild(child: Actor): Unit = {
    link(child)
    child.start()
  }

  /** Links to `child` again and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Asynchronously asks this supervisor to shut the server down. */
  def stop = this ! Stop
}
/** Base type of the messages handled by EchoServerLogger. */
sealed abstract class LoggerMessage
/** Carries one piece of text that the logger actor passes to `log`. */
case class Log(message: String) extends LoggerMessage
/**
 * Logger actor: each `Log` message it receives is handed to `log` from the
 * mixed-in `Logged` trait.
 */
class EchoServerLogger extends Actor with Logged {
  /** Processes messages forever; messages other than `Log` are reported through `log` as well. */
  def act = loop {
    react {
      case Log(text) => log(text)
      case other => log("unknown message [%s], ignoring" format other)
    }
  }

  /**
   * Queues `message` for logging. The message is silently dropped when 100 or
   * more messages are already waiting in this actor's mailbox.
   */
  def write(message: String): Unit = {
    val hasRoom = this.mailboxSize < 100
    if (hasRoom) this ! Log(message)
  }
}
/** Factory hook for the logger actor; the default builds a plain `EchoServerLogger`. */
trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger = new EchoServerLogger()
}
/** Factory variant whose logger additionally mixes in `ConsoleLogger`. */
trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger = {
    val consoleLogger = new EchoServerLogger() with ConsoleLogger
    consoleLogger
  }
}
/**
 * Asks the acceptor to set the interest ops of `socket`'s selection key.
 * Despite its name, `pos` carries the SelectionKey ops value (callers pass
 * OP_READ or OP_WRITE).
 */
case class ChangeRequest(socket:SocketChannel, pos: Int)
/** Asks the acceptor to cancel `socket`'s selection key and close the channel. */
case class CloseChannel(socket:SocketChannel)
/**
 * Actor that owns the NIO selector and the listening socket. It accepts
 * connections, creates one `EchoServerHandler` per connection, forwards
 * readiness events to the handlers as `Read` / `Write` messages, and applies
 * the interest-op changes and close requests the handlers send back.
 *
 * @param supervisor actor that is asked (via `Link`) to link to each new handler
 * @param logger     logger actor used for status messages
 * @param port       TCP port the listening socket is bound to
 */
class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger: EchoServerLogger,
  port: Int
) extends Actor {
  val selector = Selector.open()
  // Non-blocking listening channel, bound during construction.
  val serverChannel = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)
    val socket = channel.socket
    socket.setReuseAddress(true)
    socket.bind(new InetSocketAddress(port))
    channel
  }
  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  val random = Random
  logger.write("Start echo server. listen port is %d" format port)

  /** Select loop: wait for events, dispatch them, then process the mailbox. */
  def act: Unit = {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      // Test hook: on average once per 500000 iterations the loop throws on purpose.
      if (random.nextInt(500000) == 0) {
        throw new Exception("acceptor exception test")
      }
    }
  }

  /**
   * Processes every message currently in the mailbox without blocking.
   *
   * The mailbox is drained completely because several `wakeup` calls issued
   * between two selections have the effect of a single one: handling just one
   * message per select cycle could leave a request waiting while `select()`
   * blocks. `Stop` closes the server channel and the selector and terminates
   * this actor.
   */
  def receiveStop(): Unit = {
    var mailboxDrained = false
    while (!mailboxDrained) {
      receiveWithin(0) {
        case Stop =>
          serverChannel.close()
          selector.close()
          logger.write("Stop echo server.")
          exit()
        case ChangeRequest(sc, ops) =>
          // keyFor yields null for an unregistered channel; a cancelled key
          // would throw on interestOps. Such requests are ignored.
          Option(sc.keyFor(selector)) filter { _.isValid } foreach { _.interestOps(ops) }
        case CloseChannel(sc) =>
          Option(sc.keyFor(selector)) foreach { _.cancel() }
          sc.close()
        case TIMEOUT =>
          // Nothing left in the mailbox.
          mailboxDrained = true
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** Sends `Stop` to this actor and wakes the selector so the select loop notices it. */
  def stop() = {
    this ! Stop
    selector.wakeup
  }

  /** Dispatches every valid selected key, then empties the selected-key set. */
  def handleKeys(): Unit = {
    val ready = selector.selectedKeys
    val keys = ready.iterator
    while (keys.hasNext) {
      val key = keys.next()
      if (key.isValid) handleKey(key)
    }
    ready.clear()
  }

  /**
   * Accepts on the server key; otherwise notifies the attached handler and
   * clears the key's interest set until the handler asks for more events.
   */
  def handleKey(key: SelectionKey): Unit = {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      val handler = key.attachment.asInstanceOf[EchoServerHandler]
      if (key.isReadable) {
        handler.sendMessage(Read)
        key.interestOps(0)
      }
      if (key.isWritable) {
        handler.sendMessage(Write)
        key.interestOps(0)
      }
    }
  }

  /** Accepts one pending connection, if any, and wires up a handler actor for it. */
  def accept(): Unit = {
    serverChannel.accept() match {
      case channel: SocketChannel =>
        val remoteAddress = channel.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]" format remoteAddress)
        channel.configureBlocking(false)
        val handler = new EchoServerHandler(this, logger, channel)
        supervisor ! Link(handler)
        handler.start
        channel.register(
          selector,
          SelectionKey.OP_READ,
          handler
        )
        // Sent after registration, so the key exists before the handler requests changes.
        handler ! Ack
      case _ =>
        // accept() returned null: no connection was pending.
    }
  }
}
/** Base type of the messages the acceptor sends to an EchoServerHandler. */
sealed abstract class HandlerMessage
/** Sent by the acceptor when the connection's selection key is readable. */
case object Read extends HandlerMessage
/** Sent by the acceptor when the connection's selection key is writable. */
case object Write extends HandlerMessage
/** Sent once by the acceptor after the connection is registered; the handler answers with its greeting. */
case object Ack extends HandlerMessage
/**
 * Actor that serves one client connection as a small state machine:
 * `ack` (prepare the greeting), then alternating `read` and `write` states.
 * Each state is a partial function handed to `react` / `reactWithin`; the
 * `doXxx()` methods perform the I/O and return the next state. Interest-op
 * changes are requested from the acceptor with `ChangeRequest` followed by a
 * selector wakeup. While a message is incomplete or a write is stalled, the
 * handler first waits briefly on a private, temporary selector before handing
 * the channel back to the acceptor.
 *
 * @param acceptor actor owning the selector this connection is registered with
 * @param logger   logger actor used for diagnostics
 * @param channel  the non-blocking client channel served by this actor
 */
class EchoServerHandler(
  acceptor: EchoServerAcceptor,
  logger: EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  /** A state is the message handler used for the next `react`. */
  type State = PartialFunction[Any, Unit]
  val buffer = ByteBuffer.allocate(1024)
  // Despite the name this is a Charset; it is only used to decode the buffer.
  val decoder = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  // Captures the line content, allowing up to two trailing CR/LF characters.
  val MessageLine = """^(.*)[\r\n]{0,2}$""".r

  /**
   * A sample method to demonstrate reading structured data: the input counts
   * as complete once more than 10 bytes were received or it starts with "exit".
   *
   * Only the bytes received so far (up to `position`) are inspected; the rest
   * of the backing array may still hold data from an earlier message.
   */
  def isEnough(buffer: ByteBuffer): Boolean = {
    buffer.position > 10 ||
      new String(buffer.array, 0, buffer.position).startsWith("exit")
  }

  /** Enqueues `message` unless 5000 or more messages are already waiting; excess messages are dropped. */
  def sendMessage(message: HandlerMessage): Unit = {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  /** Entry point: waits up to 100 ms for `Ack` before greeting the client. */
  def act = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  /** Initial state: prepare the greeting on `Ack`, close on timeout. */
  def ack: State = {
    case Ack => react(doAck())
    case TIMEOUT => close()
  }

  /** Waiting for input; `Write` notifications are ignored in this state. */
  def read: State = {
    case Read => reactWithin(100)(doRead())
    case Write => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown)
      react(read)
  }

  /** Waiting for the channel to become writable so the buffer can be sent. */
  def write: State = {
    case Write => react(doWrite())
    case TIMEOUT => react(read)
  }

  /** Puts the greeting into the buffer and asks the acceptor for write readiness. */
  def doAck(): State = {
    buffer.put("hello\r\n".getBytes("US-ASCII"))
    buffer.flip()
    acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
    acceptor.selector.wakeup
    write
  }

  /**
   * Reads from the channel. If the message is not complete yet, keeps reading
   * through `readUntilEnough`; when that times out, read interest is re-armed
   * with the acceptor and the handler goes back to the `read` state.
   */
  def doRead(): State = {
    allCatch opt channel.read(buffer) match {
      case Some(0) =>
        acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
        acceptor.selector.wakeup
        read
      case Some(-1) => close()
      case Some(_) =>
        if (isEnough(buffer) || readUntilEnough()) {
          buffer.flip()
          handleMessage()
        } else {
          // timeout
          acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
          acceptor.selector.wakeup
          read
        }
      case None =>
        logger.write("read error")
        close()
    }
  }

  /**
   * Waits on a private selector and reads until `isEnough(buffer)` holds.
   *
   * It will be better to implement a selector pool and retrieve an instance
   * from it. The loop may use the current thread exclusively for a long time.
   * The temporary selector is always closed on the way out, which also removes
   * the channel's registration with it; the selected-key set is cleared after
   * each successful select so the next select reports fresh readiness.
   *
   * @return true once enough data has arrived, false if a one-second select timed out
   */
  private def readUntilEnough(): Boolean = {
    val selector = Selector.open()
    try {
      channel.register(selector, SelectionKey.OP_READ)
      var timedOut = false
      while (!timedOut && !isEnough(buffer)) {
        if (selector.select(1000) == 0) {
          timedOut = true
        } else {
          selector.selectedKeys.clear()
          // End of stream: close() terminates the actor and does not return.
          if (channel.read(buffer) == -1) close()
        }
      }
      !timedOut
    } catch {
      case e: IOException =>
        // Same treatment as a failed first read in doRead.
        logger.write("read error")
        close()
    } finally {
      selector.close()
    }
  }

  /** Asks the acceptor to close the channel, logs the disconnect and terminates this actor; never returns. */
  def close(): Nothing = {
    acceptor ! CloseChannel(channel)
    acceptor.selector.wakeup
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  /** Interprets the received text: "exit" closes, "test" throws on purpose, anything else is echoed. */
  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _ =>
        acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
        acceptor.selector.wakeup
        write
    }
  }

  /** Logs the pending bytes and tries to send them. */
  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  /**
   * Decodes the bytes between the buffer's position and limit.
   *
   * A duplicate of the buffer is decoded so the buffer's own position is left
   * untouched. `doWrite` calls this before resuming a stalled write; rewinding
   * the buffer there would resend bytes that were already delivered.
   */
  def getMessage: String = {
    decoder.decode(buffer.duplicate()).toString
  }

  /** Writes one log line containing the thread, this actor, `state`, the peer address and `message`. */
  def printLog(state: String, message: String): Unit = {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  /**
   * Writes until the buffer is empty.
   *
   * When a write transfers 0 bytes the handler waits up to 100 ms on a
   * private selector (opened on first need) for the channel to become
   * writable. If that select times out, write interest is handed back to the
   * acceptor and the handler stays in the `write` state with the unsent bytes
   * kept in place. The temporary selector is closed on every exit path. Only
   * `Exception`s are treated as write errors, so the control throwable raised
   * by `exit()` inside `close()` is not intercepted here.
   */
  def writeBuffer(): State = {
    var waitSelector = Option.empty[Selector]
    try {
      var stalled = false
      while (buffer.hasRemaining && !stalled) {
        val written = channel.write(buffer)
        if (written < 0) {
          // Defensive: kept from the original logic for a negative count.
          logger.write("write error")
          close()
        } else if (written == 0) {
          Thread.`yield`()
          val selector = waitSelector getOrElse {
            val opened = Selector.open()
            // Remember it before registering so it is closed even if register fails.
            waitSelector = Some(opened)
            channel.register(opened, SelectionKey.OP_WRITE)
            opened
          }
          // The following select may use the current thread exclusively for a while.
          if (selector.select(100) == 0) {
            stalled = true // timeout
          } else {
            selector.selectedKeys.clear()
          }
        }
      }
      if (stalled) {
        acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
        acceptor.selector.wakeup
        write
      } else {
        buffer.clear()
        acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
        acceptor.selector.wakeup
        read
      }
    } catch {
      case e: Exception =>
        logger.write("write error")
        close()
    } finally {
      waitSelector foreach { _.close() }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment