Created
December 29, 2010 04:59
-
-
Save ymnk/758197 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.actors.{Actor, TIMEOUT, Exit} | |
import scala.actors.Actor.State.{New, Terminated} | |
import scala.util.logging.{Logged, ConsoleLogger} | |
import scala.util.control.Exception.allCatch | |
import scala.collection.JavaConversions._ | |
import scala.util.Random | |
import java.net.InetSocketAddress | |
import java.nio.ByteBuffer | |
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel} | |
import java.nio.charset.Charset | |
import java.io.IOException | |
/** Entry point: starts the supervisor, lets the server run for 60 seconds, then asks it to stop. */
object EchoServer {
  def main(args: Array[String]): Unit = {
    val root = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    root.start()
    Thread.sleep(60000)
    root.stop
  }
}
/** Messages understood by the supervisor. */
sealed abstract class SupervisorMessage

/** Asks the supervisor to link itself to `childActor`. */
final case class Link(childActor: Actor) extends SupervisorMessage

/** Requests shutdown; the supervisor forwards it to the acceptor. */
case object Stop extends SupervisorMessage
/**
 * Root actor of the echo server: owns the logger and the acceptor, links to
 * every connection handler it is told about, and restarts any linked actor
 * that exits with a reason other than 'normal.
 *
 * @param port TCP port handed to the acceptor (defaults to 10000)
 */
class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {

  // Linked actors' terminations arrive as Exit messages in act().
  trapExit = true

  val logger = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  /** Starts the children, then supervises until the acceptor exits normally. */
  def act(): Unit = {
    startChildren()
    loop {
      react {
        case Link(handler: Actor) =>
          link(handler)
        case Exit(from: Actor, 'normal) if from == acceptor =>
          // The acceptor only exits normally after a Stop, so the server is done.
          exit("stop")
        case Exit(_: Actor, 'normal) =>
          // Any other actor finished cleanly; nothing to restart.
        case Exit(from: Actor, cause: Exception) =>
          logger.write("receive Exit: %s".format(cause.getMessage))
          restartChild(from)
        case Exit(from: Actor, cause) =>
          logger.write("receive Exit: %s".format(cause))
          restartChild(from)
        case Stop =>
          acceptor.stop()
        case other =>
          logger.write("unknown message [%s], ignoring".format(other))
      }
    }
  }

  /** Starts logger and acceptor if they are new; gives up if either has already terminated. */
  def startChildren(): Unit = {
    for (child <- Seq(logger, acceptor)) {
      child.getState match {
        case New        => startChild(child)
        case Terminated => exit("Could not restart server.")
        case _          =>
      }
    }
  }

  /** Links to `child` and starts it. */
  def startChild(child: Actor): Unit = {
    link(child)
    child.start()
  }

  /** Re-links to `child` and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Asynchronously asks this supervisor to shut the server down. */
  def stop = this ! Stop
}
/** Messages understood by the logger actor. */
sealed abstract class LoggerMessage

/** Carries one line of text to be logged. */
final case class Log(message: String) extends LoggerMessage
/** Actor that passes every received `Log` line to `Logged.log`. */
class EchoServerLogger extends Actor with Logged {

  /** Logs each `Log` message; anything else is reported as unknown. */
  def act(): Unit = loop {
    react {
      case Log(text) => log(text)
      case other     => log("unknown message [%s], ignoring".format(other))
    }
  }

  /** Queues `message` for logging; it is silently dropped once 100 messages are pending. */
  def write(message: String): Unit = {
    val hasRoom = this.mailboxSize < 100
    if (hasRoom) this ! Log(message)
  }
}
/** Supplies the logger used by the server; by default one without an output mixin. */
trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger = new EchoServerLogger
}
/** Factory variant whose logger mixes in `ConsoleLogger`. */
trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger = new EchoServerLogger with ConsoleLogger
}
/**
 * Actor that owns the NIO selector: accepts connections on `port`, creates one
 * handler actor per client and forwards read/write readiness to it.
 *
 * @param supervisor actor asked to link to every new handler
 * @param logger     actor that receives log lines
 * @param port       TCP port to listen on
 */
class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger: EchoServerLogger,
  port: Int
) extends Actor {
  val selector = Selector.open()

  // Non-blocking listening channel bound to `port` with SO_REUSEADDR set.
  val serverChannel = {
    val listening = ServerSocketChannel.open()
    listening.configureBlocking(false)
    val listeningSocket = listening.socket
    listeningSocket.setReuseAddress(true)
    listeningSocket.bind(new InetSocketAddress(port))
    listening
  }
  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  val random = Random
  logger.write("Start echo server. listen port is %d".format(port))

  /** Selector loop: wait for readiness, dispatch the keys, then poll the mailbox once. */
  def act(): Unit = {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      // Randomly thrown test exception (roughly 1 in 500000 iterations).
      if (random.nextInt(500000) == 0) throw new Exception("acceptor exception test")
    }
  }

  /** Takes at most one message from the mailbox without blocking; `Stop` shuts the acceptor down. */
  def receiveStop(): Unit = {
    receiveWithin(0) {
      case Stop =>
        serverChannel.close()
        selector.close()
        logger.write("Stop echo server.")
        exit()
      case TIMEOUT =>
        // Mailbox empty: go back to selecting.
      case other =>
        logger.write("unknown message [%s], ignoring".format(other))
    }
  }

  /** Queues a `Stop` and wakes the selector so the loop notices it. */
  def stop() = {
    this ! Stop
    selector.wakeup()
  }

  /** Dispatches every still-valid selected key, then empties the selected-key set. */
  def handleKeys(): Unit = {
    val ready = selector.selectedKeys
    for (key <- ready) {
      if (key.isValid) handleKey(key)
    }
    ready.clear()
  }

  /** Accepts on the listening key; otherwise forwards readiness to the key's handler. */
  def handleKey(key: SelectionKey): Unit = {
    val isListenerEvent = serverKey == key && key.isAcceptable
    if (isListenerEvent) {
      accept()
    } else {
      val handler = key.attachment.asInstanceOf[EchoServerHandler]
      if (key.isReadable) handler.sendMessage(Read)
      if (key.isWritable) handler.sendMessage(Write)
    }
  }

  /** Accepts one pending connection, if any, and wires up a started, linked handler for it. */
  def accept(): Unit = {
    serverChannel.accept() match {
      case client: SocketChannel =>
        val peer = client.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]".format(peer))
        client.configureBlocking(false)
        val handler = new EchoServerHandler(logger, client)
        supervisor ! Link(handler)
        handler.start()
        client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, handler)
      case _ =>
        // accept() returned null: nothing was pending.
    }
  }
}
/** Readiness events the acceptor forwards to a connection handler. */
sealed abstract class HandlerMessage

/** The handler's channel has data to read. */
case object Read extends HandlerMessage

/** The handler's channel can accept more output. */
case object Write extends HandlerMessage
/**
 * Per-connection actor implementing the echo protocol as a small state machine.
 *
 * Each state (`ack`, `read`, `write`) is a partial function handed to
 * `react`/`reactWithin`. The `doXxx()` helpers perform their I/O immediately
 * and return the next state.
 *
 * @param logger  actor that receives log lines
 * @param channel socket of the connected client
 */
class EchoServerHandler(
  logger: EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  type State = PartialFunction[Any, Unit]

  val buffer = ByteBuffer.allocate(1024)
  val decoder = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  // Captures a line's text without its trailing CR/LF.
  val MessageLine = """^(.*)[\r\n]{0,2}$""".r

  /** Delivers a readiness event; it is dropped when 5000 or more messages are already queued. */
  def sendMessage(message: HandlerMessage): Unit = {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  /** Starts in the `ack` state, waiting up to 100 ms for the first `Write`. */
  def act(): Unit = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  /** Initial state: send the greeting on the first `Write`, or give up on timeout. */
  def ack: State = {
    case Write   => react(doAck())
    case TIMEOUT => close()
  }

  /** Waiting for input; `Write` events and timeouts are ignored. */
  def read: State = {
    case Read    => reactWithin(100)(doRead())
    case Write   => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown)
      react(read)
  }

  /** Waiting to send the buffered message. */
  def write: State = {
    case Write   => react(doWrite())
    // NOTE(review): a timeout goes back to `read` without sending or clearing
    // the pending buffer; confirm this is intended.
    case TIMEOUT => react(read)
  }

  /** Buffers the greeting and tries to send it. */
  def doAck(): State = {
    buffer.put("hello\r\n".getBytes("US-ASCII"))
    buffer.flip()
    writeBuffer()
  }

  /** Reads once from the channel and picks the next state from the result. */
  def doRead(): State = {
    allCatch.opt(channel.read(buffer)) match {
      case Some(0)  => read
      case Some(-1) => close() // end of stream
      case Some(_)  =>
        buffer.flip()
        handleMessage()
      case None     =>
        logger.write("read error")
        close()
    }
  }

  /** Closes the channel, logs the disconnect and terminates this actor. */
  def close(): Nothing = {
    channel.close()
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  /** Interprets the received text: "exit" disconnects, "test" throws, anything else is echoed. */
  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _                   => write
    }
  }

  /** Logs the pending text and tries to send it. */
  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  /**
   * Decodes the unread part of the buffer as UTF-8.
   * Works on a duplicate so the buffer's position and limit stay untouched;
   * this keeps the position correct when a short write is retried.
   */
  def getMessage: String = decoder.decode(buffer.duplicate()).toString

  /** Logs `message` together with the current thread, this actor, `state` and the peer address. */
  def printLog(state: String, message: String): Unit = {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  /**
   * Writes the buffer once. A non-blocking write may accept only part of the
   * data; in that case the unsent bytes are kept and the handler stays in
   * `write` until the next `Write` event instead of discarding them.
   */
  def writeBuffer(): State = {
    allCatch.opt(channel.write(buffer)) match {
      case Some(_) if buffer.hasRemaining =>
        write
      case Some(_) =>
        buffer.clear()
        read
      case None =>
        logger.write("write error")
        close()
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.actors.{Actor, TIMEOUT, Exit} | |
import scala.actors.Actor.State.{New, Terminated} | |
import scala.util.logging.{Logged, ConsoleLogger} | |
import scala.util.control.Exception.allCatch | |
import scala.collection.JavaConversions._ | |
import scala.util.Random | |
import java.net.InetSocketAddress | |
import java.nio.ByteBuffer | |
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel} | |
import java.nio.charset.Charset | |
import java.io.IOException | |
/** Entry point: starts the supervisor, lets the server run for 60 seconds, then asks it to stop. */
object EchoServerRev1 {
  def main(args: Array[String]): Unit = {
    val root = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    root.start()
    Thread.sleep(60000)
    root.stop
  }
}
/** Messages understood by the supervisor. */
sealed abstract class SupervisorMessage

/** Asks the supervisor to link itself to `childActor`. */
final case class Link(childActor: Actor) extends SupervisorMessage

/** Requests shutdown; the supervisor forwards it to the acceptor. */
case object Stop extends SupervisorMessage
/**
 * Root actor of the echo server: owns the logger and the acceptor, links to
 * every connection handler it is told about, and restarts any linked actor
 * that exits with a reason other than 'normal.
 *
 * @param port TCP port handed to the acceptor (defaults to 10000)
 */
class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {

  // Linked actors' terminations arrive as Exit messages in act().
  trapExit = true

  val logger = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  /** Starts the children, then supervises until the acceptor exits normally. */
  def act(): Unit = {
    startChildren()
    loop {
      react {
        case Link(handler: Actor) =>
          link(handler)
        case Exit(from: Actor, 'normal) if from == acceptor =>
          // The acceptor only exits normally after a Stop, so the server is done.
          exit("stop")
        case Exit(_: Actor, 'normal) =>
          // Any other actor finished cleanly; nothing to restart.
        case Exit(from: Actor, cause: Exception) =>
          logger.write("receive Exit: %s".format(cause.getMessage))
          restartChild(from)
        case Exit(from: Actor, cause) =>
          logger.write("receive Exit: %s".format(cause))
          restartChild(from)
        case Stop =>
          acceptor.stop()
        case other =>
          logger.write("unknown message [%s], ignoring".format(other))
      }
    }
  }

  /** Starts logger and acceptor if they are new; gives up if either has already terminated. */
  def startChildren(): Unit = {
    for (child <- Seq(logger, acceptor)) {
      child.getState match {
        case New        => startChild(child)
        case Terminated => exit("Could not restart server.")
        case _          =>
      }
    }
  }

  /** Links to `child` and starts it. */
  def startChild(child: Actor): Unit = {
    link(child)
    child.start()
  }

  /** Re-links to `child` and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Asynchronously asks this supervisor to shut the server down. */
  def stop = this ! Stop
}
/** Messages understood by the logger actor. */
sealed abstract class LoggerMessage

/** Carries one line of text to be logged. */
final case class Log(message: String) extends LoggerMessage
/** Actor that passes every received `Log` line to `Logged.log`. */
class EchoServerLogger extends Actor with Logged {

  /** Logs each `Log` message; anything else is reported as unknown. */
  def act(): Unit = loop {
    react {
      case Log(text) => log(text)
      case other     => log("unknown message [%s], ignoring".format(other))
    }
  }

  /** Queues `message` for logging; it is silently dropped once 100 messages are pending. */
  def write(message: String): Unit = {
    val hasRoom = this.mailboxSize < 100
    if (hasRoom) this ! Log(message)
  }
}
/** Supplies the logger used by the server; by default one without an output mixin. */
trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger = new EchoServerLogger
}
/** Factory variant whose logger mixes in `ConsoleLogger`. */
trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger = new EchoServerLogger with ConsoleLogger
}
/**
 * Asks the acceptor to replace the interest set of `socket`'s selection key.
 * Despite its name, `pos` carries the `SelectionKey.OP_*` bit mask.
 */
final case class ChangeRequest(socket: SocketChannel, pos: Int)

/** Asks the acceptor to cancel `socket`'s selection key and close the channel. */
final case class CloseChannel(socket: SocketChannel)
/**
 * Actor that owns the NIO selector: accepts connections on `port`, creates one
 * handler actor per client, forwards readiness events to it, and applies the
 * interest-set changes and close requests the handlers send back.
 *
 * @param supervisor actor asked to link to every new handler
 * @param logger     actor that receives log lines
 * @param port       TCP port to listen on
 */
class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger: EchoServerLogger,
  port: Int
) extends Actor {
  val selector = Selector.open()

  // Non-blocking listening channel bound to `port` with SO_REUSEADDR set.
  val serverChannel = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)
    val socket = channel.socket
    socket.setReuseAddress(true)
    socket.bind(new InetSocketAddress(port))
    channel
  }
  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  val random = Random
  logger.write("Start echo server. listen port is %d" format port)

  /** Selector loop: wait for readiness or a wake-up, dispatch keys, then drain the mailbox. */
  def act(): Unit = {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      // Randomly thrown test exception (roughly 1 in 500000 iterations).
      if (random.nextInt(500000) == 0) {
        throw new Exception("acceptor exception test")
      }
    }
  }

  /**
   * Processes every message currently queued, without blocking.
   *
   * The whole mailbox is drained because several `Selector.wakeup` calls made
   * between two selections count as a single wake-up; handling one message per
   * wake-up could leave a handler's request stranded in the mailbox.
   */
  def receiveStop(): Unit = {
    var mailboxEmpty = false
    while (!mailboxEmpty) {
      receiveWithin(0) {
        case Stop =>
          serverChannel.close()
          selector.close()
          logger.write("Stop echo server.")
          exit()
        case ChangeRequest(sc, ops) =>
          registeredKey(sc).foreach(_.interestOps(ops))
        case CloseChannel(sc) =>
          registeredKey(sc).foreach(_.cancel())
          sc.close()
        case TIMEOUT =>
          mailboxEmpty = true
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** The channel's key for this selector, or None when it is unregistered or already cancelled. */
  private def registeredKey(sc: SocketChannel): Option[SelectionKey] =
    Option(sc.keyFor(selector)).filter(_.isValid)

  /** Queues a `Stop` and wakes the selector so the loop notices it. */
  def stop() = {
    this ! Stop
    selector.wakeup()
  }

  /** Dispatches every still-valid selected key, then empties the selected-key set. */
  def handleKeys(): Unit = {
    selector.selectedKeys foreach { key =>
      if (key.isValid) handleKey(key)
    }
    selector.selectedKeys.clear()
  }

  /**
   * Accepts on the listening key; otherwise forwards readiness to the attached
   * handler and clears the key's interest set. The handler re-arms it with a
   * `ChangeRequest` once it has dealt with the event.
   */
  def handleKey(key: SelectionKey): Unit = {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      key.attachment match {
        case handler: EchoServerHandler =>
          if (key.isReadable) {
            handler.sendMessage(Read)
            key.interestOps(0)
          }
          if (key.isWritable) {
            handler.sendMessage(Write)
            key.interestOps(0)
          }
        case _ =>
          // No handler attached: nothing to notify.
      }
    }
  }

  /** Accepts one pending connection, if any, wires up a linked handler and sends it `Ack`. */
  def accept(): Unit = {
    serverChannel.accept() match {
      case channel: SocketChannel =>
        val remoteAddress = channel.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]" format remoteAddress)
        channel.configureBlocking(false)
        val handler = new EchoServerHandler(this, logger, channel)
        supervisor ! Link(handler)
        handler.start()
        channel.register(selector, SelectionKey.OP_READ, handler)
        handler ! Ack
      case _ =>
        // accept() returned null: nothing was pending.
    }
  }
}
/** Events delivered to a connection handler. */
sealed abstract class HandlerMessage

/** The handler's channel has data to read. */
case object Read extends HandlerMessage

/** The handler's channel can accept more output. */
case object Write extends HandlerMessage

/** Sent by the acceptor once the channel is registered; triggers the greeting. */
case object Ack extends HandlerMessage
/**
 * Per-connection actor implementing the echo protocol as a small state machine.
 *
 * Each state (`ack`, `read`, `write`) is a partial function handed to
 * `react`/`reactWithin`. The `doXxx()` helpers perform their I/O immediately
 * and return the next state. The acceptor clears the key's interest set after
 * every event, so each step asks it for the next event of interest.
 *
 * @param acceptor actor owning the selector this channel is registered with
 * @param logger   actor that receives log lines
 * @param channel  socket of the connected client
 */
class EchoServerHandler(
  acceptor: EchoServerAcceptor,
  logger: EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  type State = PartialFunction[Any, Unit]

  val buffer = ByteBuffer.allocate(1024)
  val decoder = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  // Captures a line's text without its trailing CR/LF.
  val MessageLine = """^(.*)[\r\n]{0,2}$""".r

  /** Delivers a readiness event; it is dropped when 5000 or more messages are already queued. */
  def sendMessage(message: HandlerMessage): Unit = {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  /** Starts in the `ack` state, waiting up to 100 ms for the acceptor's `Ack`. */
  def act(): Unit = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  /** Initial state: prepare the greeting on `Ack`, or give up on timeout. */
  def ack: State = {
    case Ack     => react(doAck())
    case TIMEOUT => close()
  }

  /** Waiting for input; `Write` events and timeouts are ignored. */
  def read: State = {
    case Read    => reactWithin(100)(doRead())
    case Write   => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown)
      react(read)
  }

  /** Waiting for the channel to become writable. */
  def write: State = {
    case Write   => react(doWrite())
    // NOTE(review): a timeout goes back to `read` without sending or clearing
    // the pending buffer; confirm this is intended.
    case TIMEOUT => react(read)
  }

  /** Asks the acceptor to watch the channel for `ops` and wakes its selector. */
  private def requestInterest(ops: Int): Unit = {
    acceptor ! ChangeRequest(channel, ops)
    acceptor.selector.wakeup()
  }

  /** Buffers the greeting and waits for the channel to become writable. */
  def doAck(): State = {
    buffer.put("hello\r\n".getBytes("US-ASCII"))
    buffer.flip()
    requestInterest(SelectionKey.OP_WRITE)
    write
  }

  /** Reads once from the channel and picks the next state from the result. */
  def doRead(): State = {
    allCatch.opt(channel.read(buffer)) match {
      case Some(0)  =>
        requestInterest(SelectionKey.OP_READ)
        read
      case Some(-1) => close() // end of stream
      case Some(_)  =>
        buffer.flip()
        handleMessage()
      case None     =>
        logger.write("read error")
        close()
    }
  }

  /** Asks the acceptor to close the channel, logs the disconnect and terminates this actor. */
  def close(): Nothing = {
    acceptor ! CloseChannel(channel)
    acceptor.selector.wakeup()
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  /** Interprets the received text: "exit" disconnects, "test" throws, anything else is echoed. */
  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _ =>
        requestInterest(SelectionKey.OP_WRITE)
        write
    }
  }

  /** Logs the pending text and tries to send it. */
  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  /**
   * Decodes the unread part of the buffer as UTF-8.
   * Works on a duplicate so the buffer's position and limit stay untouched;
   * this keeps the position correct when a short write is retried.
   */
  def getMessage: String = decoder.decode(buffer.duplicate()).toString

  /** Logs `message` together with the current thread, this actor, `state` and the peer address. */
  def printLog(state: String, message: String): Unit = {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  /**
   * Writes the buffer once. A non-blocking write may accept only part of the
   * data; in that case the unsent bytes are kept and the handler waits for the
   * next `Write` event instead of discarding them.
   */
  def writeBuffer(): State = {
    allCatch.opt(channel.write(buffer)) match {
      case Some(_) if buffer.hasRemaining =>
        requestInterest(SelectionKey.OP_WRITE)
        write
      case Some(_) =>
        buffer.clear()
        requestInterest(SelectionKey.OP_READ)
        read
      case None =>
        logger.write("write error")
        close()
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.actors.{Actor, TIMEOUT, Exit} | |
import scala.actors.Actor.State.{New, Terminated} | |
import scala.util.logging.{Logged, ConsoleLogger} | |
import scala.util.control.Exception.allCatch | |
import scala.collection.JavaConversions._ | |
import scala.util.Random | |
import java.net.InetSocketAddress | |
import java.nio.ByteBuffer | |
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel} | |
import java.nio.charset.Charset | |
import java.io.IOException | |
/** Entry point: starts the supervisor, lets the server run for 60 seconds, then asks it to stop. */
object EchoServerRev2 {
  def main(args: Array[String]): Unit = {
    val root = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    root.start()
    Thread.sleep(60000)
    root.stop
  }
}
/** Messages understood by the supervisor. */
sealed abstract class SupervisorMessage

/** Asks the supervisor to link itself to `childActor`. */
final case class Link(childActor: Actor) extends SupervisorMessage

/** Requests shutdown; the supervisor forwards it to the acceptor. */
case object Stop extends SupervisorMessage
/**
 * Root actor of the echo server: owns the logger and the acceptor, links to
 * every connection handler it is told about, and restarts any linked actor
 * that exits with a reason other than 'normal.
 *
 * @param port TCP port handed to the acceptor (defaults to 10000)
 */
class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {

  // Linked actors' terminations arrive as Exit messages in act().
  trapExit = true

  val logger = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  /** Starts the children, then supervises until the acceptor exits normally. */
  def act(): Unit = {
    startChildren()
    loop {
      react {
        case Link(handler: Actor) =>
          link(handler)
        case Exit(from: Actor, 'normal) if from == acceptor =>
          // The acceptor only exits normally after a Stop, so the server is done.
          exit("stop")
        case Exit(_: Actor, 'normal) =>
          // Any other actor finished cleanly; nothing to restart.
        case Exit(from: Actor, cause: Exception) =>
          logger.write("receive Exit: %s".format(cause.getMessage))
          restartChild(from)
        case Exit(from: Actor, cause) =>
          logger.write("receive Exit: %s".format(cause))
          restartChild(from)
        case Stop =>
          acceptor.stop()
        case other =>
          logger.write("unknown message [%s], ignoring".format(other))
      }
    }
  }

  /** Starts logger and acceptor if they are new; gives up if either has already terminated. */
  def startChildren(): Unit = {
    for (child <- Seq(logger, acceptor)) {
      child.getState match {
        case New        => startChild(child)
        case Terminated => exit("Could not restart server.")
        case _          =>
      }
    }
  }

  /** Links to `child` and starts it. */
  def startChild(child: Actor): Unit = {
    link(child)
    child.start()
  }

  /** Re-links to `child` and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Asynchronously asks this supervisor to shut the server down. */
  def stop = this ! Stop
}
/** Messages understood by the logger actor. */
sealed abstract class LoggerMessage

/** Carries one line of text to be logged. */
final case class Log(message: String) extends LoggerMessage
/** Actor that passes every received `Log` line to `Logged.log`. */
class EchoServerLogger extends Actor with Logged {

  /** Logs each `Log` message; anything else is reported as unknown. */
  def act(): Unit = loop {
    react {
      case Log(text) => log(text)
      case other     => log("unknown message [%s], ignoring".format(other))
    }
  }

  /** Queues `message` for logging; it is silently dropped once 100 messages are pending. */
  def write(message: String): Unit = {
    val hasRoom = this.mailboxSize < 100
    if (hasRoom) this ! Log(message)
  }
}
/** Supplies the logger used by the server; by default one without an output mixin. */
trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger = new EchoServerLogger
}
/** Factory variant whose logger mixes in `ConsoleLogger`. */
trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger = new EchoServerLogger with ConsoleLogger
}
/**
 * Asks the acceptor to replace the interest set of `socket`'s selection key.
 * Despite its name, `pos` carries the `SelectionKey.OP_*` bit mask.
 */
final case class ChangeRequest(socket: SocketChannel, pos: Int)

/** Asks the acceptor to cancel `socket`'s selection key and close the channel. */
final case class CloseChannel(socket: SocketChannel)
/**
 * Actor that owns the NIO selector: accepts connections on `port`, creates one
 * handler actor per client, forwards readiness events to it, and applies the
 * interest-set changes and close requests the handlers send back.
 *
 * @param supervisor actor asked to link to every new handler
 * @param logger     actor that receives log lines
 * @param port       TCP port to listen on
 */
class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger: EchoServerLogger,
  port: Int
) extends Actor {
  val selector = Selector.open()

  // Non-blocking listening channel bound to `port` with SO_REUSEADDR set.
  val serverChannel = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)
    val socket = channel.socket
    socket.setReuseAddress(true)
    socket.bind(new InetSocketAddress(port))
    channel
  }
  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  val random = Random
  logger.write("Start echo server. listen port is %d" format port)

  /** Selector loop: wait for readiness or a wake-up, dispatch keys, then drain the mailbox. */
  def act(): Unit = {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      // Randomly thrown test exception (roughly 1 in 500000 iterations).
      if (random.nextInt(500000) == 0) {
        throw new Exception("acceptor exception test")
      }
    }
  }

  /**
   * Processes every message currently queued, without blocking.
   *
   * The whole mailbox is drained because several `Selector.wakeup` calls made
   * between two selections count as a single wake-up; handling one message per
   * wake-up could leave a handler's request stranded in the mailbox.
   */
  def receiveStop(): Unit = {
    var mailboxEmpty = false
    while (!mailboxEmpty) {
      receiveWithin(0) {
        case Stop =>
          serverChannel.close()
          selector.close()
          logger.write("Stop echo server.")
          exit()
        case ChangeRequest(sc, ops) =>
          registeredKey(sc).foreach(_.interestOps(ops))
        case CloseChannel(sc) =>
          registeredKey(sc).foreach(_.cancel())
          sc.close()
        case TIMEOUT =>
          mailboxEmpty = true
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** The channel's key for this selector, or None when it is unregistered or already cancelled. */
  private def registeredKey(sc: SocketChannel): Option[SelectionKey] =
    Option(sc.keyFor(selector)).filter(_.isValid)

  /** Queues a `Stop` and wakes the selector so the loop notices it. */
  def stop() = {
    this ! Stop
    selector.wakeup()
  }

  /** Dispatches every still-valid selected key, then empties the selected-key set. */
  def handleKeys(): Unit = {
    selector.selectedKeys foreach { key =>
      if (key.isValid) handleKey(key)
    }
    selector.selectedKeys.clear()
  }

  /**
   * Accepts on the listening key; otherwise forwards readiness to the attached
   * handler and clears the key's interest set. The handler re-arms it with a
   * `ChangeRequest` once it has dealt with the event.
   */
  def handleKey(key: SelectionKey): Unit = {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      key.attachment match {
        case handler: EchoServerHandler =>
          if (key.isReadable) {
            handler.sendMessage(Read)
            key.interestOps(0)
          }
          if (key.isWritable) {
            handler.sendMessage(Write)
            key.interestOps(0)
          }
        case _ =>
          // No handler attached: nothing to notify.
      }
    }
  }

  /** Accepts one pending connection, if any, wires up a linked handler and sends it `Ack`. */
  def accept(): Unit = {
    serverChannel.accept() match {
      case channel: SocketChannel =>
        val remoteAddress = channel.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]" format remoteAddress)
        channel.configureBlocking(false)
        val handler = new EchoServerHandler(this, logger, channel)
        supervisor ! Link(handler)
        handler.start()
        channel.register(selector, SelectionKey.OP_READ, handler)
        handler ! Ack
      case _ =>
        // accept() returned null: nothing was pending.
    }
  }
}
/** Events delivered to a connection handler. */
sealed abstract class HandlerMessage

/** The handler's channel has data to read. */
case object Read extends HandlerMessage

/** The handler's channel can accept more output. */
case object Write extends HandlerMessage

/** Sent by the acceptor once the channel is registered; triggers the greeting. */
case object Ack extends HandlerMessage
/**
 * Per-connection actor implementing the echo protocol as a small state machine.
 *
 * Each state (`ack`, `read`, `write`) is a partial function handed to
 * `react`/`reactWithin`. The `doXxx()` helpers perform their I/O immediately
 * and return the next state. The acceptor clears the key's interest set after
 * every event, so each step asks it for the next event of interest.
 *
 * @param acceptor actor owning the selector this channel is registered with
 * @param logger   actor that receives log lines
 * @param channel  socket of the connected client
 */
class EchoServerHandler(
  acceptor: EchoServerAcceptor,
  logger: EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  type State = PartialFunction[Any, Unit]

  val buffer = ByteBuffer.allocate(1024)
  val decoder = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  // Captures a line's text without its trailing CR/LF.
  val MessageLine = """^(.*)[\r\n]{0,2}$""".r

  /**
   * Sample framing rule: a message is complete once more than 10 bytes have
   * arrived, or as soon as the received bytes begin with "exit".
   * Only bytes received so far are inspected; the backing array may still hold
   * data from an earlier message beyond the current position.
   */
  def isEnough(buffer: ByteBuffer): Boolean = {
    val received = buffer.position
    received > 10 ||
      (received >= 4 && new String(buffer.array, 0, 4, "UTF-8") == "exit")
  }

  /** Delivers a readiness event; it is dropped when 5000 or more messages are already queued. */
  def sendMessage(message: HandlerMessage): Unit = {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  /** Starts in the `ack` state, waiting up to 100 ms for the acceptor's `Ack`. */
  def act(): Unit = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  /** Initial state: prepare the greeting on `Ack`, or give up on timeout. */
  def ack: State = {
    case Ack     => react(doAck())
    case TIMEOUT => close()
  }

  /** Waiting for input; `Write` events and timeouts are ignored. */
  def read: State = {
    case Read    => reactWithin(100)(doRead())
    case Write   => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown)
      react(read)
  }

  /** Waiting for the channel to become writable. */
  def write: State = {
    case Write   => react(doWrite())
    // NOTE(review): a timeout goes back to `read` without sending or clearing
    // the pending buffer; confirm this is intended.
    case TIMEOUT => react(read)
  }

  /** Asks the acceptor to watch the channel for `ops` and wakes its selector. */
  private def requestInterest(ops: Int): Unit = {
    acceptor ! ChangeRequest(channel, ops)
    acceptor.selector.wakeup()
  }

  /** Buffers the greeting and waits for the channel to become writable. */
  def doAck(): State = {
    buffer.put("hello\r\n".getBytes("US-ASCII"))
    buffer.flip()
    requestInterest(SelectionKey.OP_WRITE)
    write
  }

  /** Reads once; handles the message when `isEnough`, otherwise waits for more input. */
  def doRead(): State = {
    allCatch.opt(channel.read(buffer)) match {
      case Some(0)  =>
        requestInterest(SelectionKey.OP_READ)
        read
      case Some(-1) => close() // end of stream
      case Some(_)  =>
        if (isEnough(buffer)) {
          buffer.flip()
          handleMessage()
        } else {
          // Not enough data yet: keep what was read and wait for the rest.
          requestInterest(SelectionKey.OP_READ)
          read
        }
      case None     =>
        logger.write("read error")
        close()
    }
  }

  /** Asks the acceptor to close the channel, logs the disconnect and terminates this actor. */
  def close(): Nothing = {
    acceptor ! CloseChannel(channel)
    acceptor.selector.wakeup()
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  /** Interprets the received text: "exit" disconnects, "test" throws, anything else is echoed. */
  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _ =>
        requestInterest(SelectionKey.OP_WRITE)
        write
    }
  }

  /** Logs the pending text and tries to send it. */
  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  /**
   * Decodes the unread part of the buffer as UTF-8.
   * Works on a duplicate so the buffer's position is left alone; rewinding it
   * here would resend already-written bytes when a partial write is retried.
   */
  def getMessage: String = decoder.decode(buffer.duplicate()).toString

  /** Logs `message` together with the current thread, this actor, `state` and the peer address. */
  def printLog(state: String, message: String): Unit = {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  /**
   * Writes as much of the buffer as the socket accepts right now.
   * When the socket stops accepting bytes, the rest is kept and the handler
   * waits for the next `Write` event. Only I/O failures are treated as write
   * errors, so the control flow of `close()` is never intercepted here.
   */
  def writeBuffer(): State = {
    // None = I/O failure, Some(true) = fully flushed, Some(false) = socket buffer full.
    val flushed = allCatch.opt {
      var accepted = 1
      while (accepted > 0 && buffer.hasRemaining) {
        accepted = channel.write(buffer)
      }
      !buffer.hasRemaining
    }
    flushed match {
      case Some(true) =>
        buffer.clear()
        requestInterest(SelectionKey.OP_READ)
        read
      case Some(false) =>
        requestInterest(SelectionKey.OP_WRITE)
        write
      case None =>
        logger.write("write error")
        close()
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.actors.{Actor, TIMEOUT, Exit} | |
import scala.actors.Actor.State.{New, Terminated} | |
import scala.util.logging.{Logged, ConsoleLogger} | |
import scala.util.control.Exception.allCatch | |
import scala.collection.JavaConversions._ | |
import scala.util.Random | |
import java.net.InetSocketAddress | |
import java.nio.ByteBuffer | |
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel} | |
import java.nio.charset.Charset | |
import java.io.IOException | |
/** Entry point: starts the supervisor, lets the server run for 60 seconds, then asks it to stop. */
object EchoServerRev3 {
  def main(args: Array[String]): Unit = {
    val root = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
    root.start()
    Thread.sleep(60000)
    root.stop
  }
}
/** Messages understood by the supervisor. */
sealed abstract class SupervisorMessage

/** Asks the supervisor to link itself to `childActor`. */
final case class Link(childActor: Actor) extends SupervisorMessage

/** Requests shutdown; the supervisor forwards it to the acceptor. */
case object Stop extends SupervisorMessage
/**
 * Root actor of the echo server: owns the logger and the acceptor, links to
 * every connection handler it is told about, and restarts any linked actor
 * that exits with a reason other than 'normal.
 *
 * @param port TCP port handed to the acceptor (defaults to 10000)
 */
class EchoServerSupervisor(
  port: Int = 10000
) extends Actor with EchoServerLoggerFactory {

  // Linked actors' terminations arrive as Exit messages in act().
  trapExit = true

  val logger = makeLogger()
  val acceptor = new EchoServerAcceptor(this, logger, port)

  /** Starts the children, then supervises until the acceptor exits normally. */
  def act(): Unit = {
    startChildren()
    loop {
      react {
        case Link(handler: Actor) =>
          link(handler)
        case Exit(from: Actor, 'normal) if from == acceptor =>
          // The acceptor only exits normally after a Stop, so the server is done.
          exit("stop")
        case Exit(_: Actor, 'normal) =>
          // Any other actor finished cleanly; nothing to restart.
        case Exit(from: Actor, cause: Exception) =>
          logger.write("receive Exit: %s".format(cause.getMessage))
          restartChild(from)
        case Exit(from: Actor, cause) =>
          logger.write("receive Exit: %s".format(cause))
          restartChild(from)
        case Stop =>
          acceptor.stop()
        case other =>
          logger.write("unknown message [%s], ignoring".format(other))
      }
    }
  }

  /** Starts logger and acceptor if they are new; gives up if either has already terminated. */
  def startChildren(): Unit = {
    for (child <- Seq(logger, acceptor)) {
      child.getState match {
        case New        => startChild(child)
        case Terminated => exit("Could not restart server.")
        case _          =>
      }
    }
  }

  /** Links to `child` and starts it. */
  def startChild(child: Actor): Unit = {
    link(child)
    child.start()
  }

  /** Re-links to `child` and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Asynchronously asks this supervisor to shut the server down. */
  def stop = this ! Stop
}
/** Messages understood by the logger actor. */
sealed abstract class LoggerMessage

/** Carries one line of text to be logged. */
final case class Log(message: String) extends LoggerMessage
/** Actor that passes every received `Log` line to `Logged.log`. */
class EchoServerLogger extends Actor with Logged {

  /** Logs each `Log` message; anything else is reported as unknown. */
  def act(): Unit = loop {
    react {
      case Log(text) => log(text)
      case other     => log("unknown message [%s], ignoring".format(other))
    }
  }

  /** Queues `message` for logging; it is silently dropped once 100 messages are pending. */
  def write(message: String): Unit = {
    val hasRoom = this.mailboxSize < 100
    if (hasRoom) this ! Log(message)
  }
}
/** Supplies the logger used by the server; by default one without an output mixin. */
trait EchoServerLoggerFactory {
  def makeLogger(): EchoServerLogger = new EchoServerLogger
}
/** Factory variant whose logger mixes in `ConsoleLogger`. */
trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  override def makeLogger(): EchoServerLogger = new EchoServerLogger with ConsoleLogger
}
/**
 * Asks the acceptor to replace the interest set of `socket`'s selection key.
 * Despite its name, `pos` carries the `SelectionKey.OP_*` bit mask.
 */
final case class ChangeRequest(socket: SocketChannel, pos: Int)

/** Asks the acceptor to cancel `socket`'s selection key and close the channel. */
final case class CloseChannel(socket: SocketChannel)
/**
 * Actor that owns the NIO selector: accepts connections on `port`, creates one
 * handler actor per client, forwards readiness events to it, and applies the
 * interest-set changes and close requests the handlers send back.
 *
 * @param supervisor actor asked to link to every new handler
 * @param logger     actor that receives log lines
 * @param port       TCP port to listen on
 */
class EchoServerAcceptor(
  supervisor: EchoServerSupervisor,
  logger: EchoServerLogger,
  port: Int
) extends Actor {
  val selector = Selector.open()

  // Non-blocking listening channel bound to `port` with SO_REUSEADDR set.
  val serverChannel = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)
    val socket = channel.socket
    socket.setReuseAddress(true)
    socket.bind(new InetSocketAddress(port))
    channel
  }
  val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  val random = Random
  logger.write("Start echo server. listen port is %d" format port)

  /** Selector loop: wait for readiness or a wake-up, dispatch keys, then drain the mailbox. */
  def act(): Unit = {
    while (true) {
      selector.select()
      handleKeys()
      receiveStop()
      // Randomly thrown test exception (roughly 1 in 500000 iterations).
      if (random.nextInt(500000) == 0) {
        throw new Exception("acceptor exception test")
      }
    }
  }

  /**
   * Processes every message currently queued, without blocking.
   *
   * The whole mailbox is drained because several `Selector.wakeup` calls made
   * between two selections count as a single wake-up; handling one message per
   * wake-up could leave a handler's request stranded in the mailbox.
   */
  def receiveStop(): Unit = {
    var mailboxEmpty = false
    while (!mailboxEmpty) {
      receiveWithin(0) {
        case Stop =>
          serverChannel.close()
          selector.close()
          logger.write("Stop echo server.")
          exit()
        case ChangeRequest(sc, ops) =>
          registeredKey(sc).foreach(_.interestOps(ops))
        case CloseChannel(sc) =>
          registeredKey(sc).foreach(_.cancel())
          sc.close()
        case TIMEOUT =>
          mailboxEmpty = true
        case unknown =>
          logger.write("unknown message [%s], ignoring" format unknown)
      }
    }
  }

  /** The channel's key for this selector, or None when it is unregistered or already cancelled. */
  private def registeredKey(sc: SocketChannel): Option[SelectionKey] =
    Option(sc.keyFor(selector)).filter(_.isValid)

  /** Queues a `Stop` and wakes the selector so the loop notices it. */
  def stop() = {
    this ! Stop
    selector.wakeup()
  }

  /** Dispatches every still-valid selected key, then empties the selected-key set. */
  def handleKeys(): Unit = {
    selector.selectedKeys foreach { key =>
      if (key.isValid) handleKey(key)
    }
    selector.selectedKeys.clear()
  }

  /**
   * Accepts on the listening key; otherwise forwards readiness to the attached
   * handler and clears the key's interest set. The handler re-arms it with a
   * `ChangeRequest` once it has dealt with the event.
   */
  def handleKey(key: SelectionKey): Unit = {
    if (serverKey == key && key.isAcceptable) {
      accept()
    } else {
      key.attachment match {
        case handler: EchoServerHandler =>
          if (key.isReadable) {
            handler.sendMessage(Read)
            key.interestOps(0)
          }
          if (key.isWritable) {
            handler.sendMessage(Write)
            key.interestOps(0)
          }
        case _ =>
          // No handler attached: nothing to notify.
      }
    }
  }

  /** Accepts one pending connection, if any, wires up a linked handler and sends it `Ack`. */
  def accept(): Unit = {
    serverChannel.accept() match {
      case channel: SocketChannel =>
        val remoteAddress = channel.socket.getRemoteSocketAddress.toString
        logger.write("connect from [%s]" format remoteAddress)
        channel.configureBlocking(false)
        val handler = new EchoServerHandler(this, logger, channel)
        supervisor ! Link(handler)
        handler.start()
        channel.register(selector, SelectionKey.OP_READ, handler)
        handler ! Ack
      case _ =>
        // accept() returned null: nothing was pending.
    }
  }
}
/** Events delivered to a connection handler. */
sealed abstract class HandlerMessage

/** The handler's channel has data to read. */
case object Read extends HandlerMessage

/** The handler's channel can accept more output. */
case object Write extends HandlerMessage

/** Sent by the acceptor once the channel is registered; triggers the greeting. */
case object Ack extends HandlerMessage
/**
 * Per-connection actor implementing the echo protocol as a small state machine.
 *
 * Each state (`ack`, `read`, `write`) is a partial function handed to
 * `react`/`reactWithin`. The `doXxx()` helpers perform their I/O immediately
 * and return the next state. The acceptor clears the key's interest set after
 * every event, so each step asks it for the next event of interest. Short
 * reads and stalled writes are first retried on a private, temporary selector
 * before the work is handed back to the acceptor.
 *
 * @param acceptor actor owning the selector this channel is registered with
 * @param logger   actor that receives log lines
 * @param channel  socket of the connected client
 */
class EchoServerHandler(
  acceptor: EchoServerAcceptor,
  logger: EchoServerLogger,
  channel: SocketChannel
) extends Actor {
  type State = PartialFunction[Any, Unit]

  val buffer = ByteBuffer.allocate(1024)
  val decoder = Charset.forName("UTF-8")
  val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  // Captures a line's text without its trailing CR/LF.
  val MessageLine = """^(.*)[\r\n]{0,2}$""".r

  /**
   * Sample framing rule: a message is complete once more than 10 bytes have
   * arrived, or as soon as the received bytes begin with "exit".
   * Only bytes received so far are inspected; the backing array may still hold
   * data from an earlier message beyond the current position.
   */
  def isEnough(buffer: ByteBuffer): Boolean = {
    val received = buffer.position
    received > 10 ||
      (received >= 4 && new String(buffer.array, 0, 4, "UTF-8") == "exit")
  }

  /** Delivers a readiness event; it is dropped when 5000 or more messages are already queued. */
  def sendMessage(message: HandlerMessage): Unit = {
    if (this.mailboxSize < 5000) {
      this ! message
    }
  }

  /** Starts in the `ack` state, waiting up to 100 ms for the acceptor's `Ack`. */
  def act(): Unit = {
    buffer.clear()
    reactWithin(100)(ack)
  }

  /** Initial state: prepare the greeting on `Ack`, or give up on timeout. */
  def ack: State = {
    case Ack     => react(doAck())
    case TIMEOUT => close()
  }

  /** Waiting for input; `Write` events and timeouts are ignored. */
  def read: State = {
    case Read    => reactWithin(100)(doRead())
    case Write   => react(read)
    case TIMEOUT => react(read)
    case unknown =>
      logger.write("unknown message [%s], ignoring" format unknown)
      react(read)
  }

  /** Waiting for the channel to become writable. */
  def write: State = {
    case Write   => react(doWrite())
    // NOTE(review): a timeout goes back to `read` without sending or clearing
    // the pending buffer; confirm this is intended.
    case TIMEOUT => react(read)
  }

  /** Asks the acceptor to watch the channel for `ops` and wakes its selector. */
  private def requestInterest(ops: Int): Unit = {
    acceptor ! ChangeRequest(channel, ops)
    acceptor.selector.wakeup()
  }

  /** Buffers the greeting and waits for the channel to become writable. */
  def doAck(): State = {
    buffer.put("hello\r\n".getBytes("US-ASCII"))
    buffer.flip()
    requestInterest(SelectionKey.OP_WRITE)
    write
  }

  /**
   * Reads from the channel. If the first read does not yield a complete
   * message, waits on a temporary selector (up to 1 s per round) for the rest;
   * on timeout the remaining wait is handed back to the acceptor.
   * The loop may keep the current thread busy for a long time.
   */
  def doRead(): State = {
    allCatch.opt(channel.read(buffer)) match {
      case Some(0)  =>
        requestInterest(SelectionKey.OP_READ)
        read
      case Some(-1) => close() // end of stream
      case Some(_)  =>
        if (!isEnough(buffer)) {
          val tempSelector = Selector.open()
          try {
            channel.register(tempSelector, SelectionKey.OP_READ)
            do {
              if (tempSelector.select(1000) == 0) {
                // Timed out: let the acceptor watch the channel again.
                requestInterest(SelectionKey.OP_READ)
                return read
              }
              // Clear the selected set so the next select() reports readiness again.
              tempSelector.selectedKeys.clear()
              if (channel.read(buffer) == -1) return close()
            } while (!isEnough(buffer))
          } finally {
            // Closing the selector also deregisters the temporary key.
            tempSelector.close()
          }
        }
        buffer.flip()
        handleMessage()
      case None     =>
        logger.write("read error")
        close()
    }
  }

  /** Asks the acceptor to close the channel, logs the disconnect and terminates this actor. */
  def close(): Nothing = {
    acceptor ! CloseChannel(channel)
    acceptor.selector.wakeup()
    logger.write("disconnect from [%s]" format remoteAddress)
    exit()
  }

  /** Interprets the received text: "exit" disconnects, "test" throws, anything else is echoed. */
  def handleMessage(): State = {
    val message = getMessage
    printLog("read", message)
    message match {
      case MessageLine("exit") => close()
      case MessageLine("test") => throw new Exception("handler exception test")
      case _ =>
        requestInterest(SelectionKey.OP_WRITE)
        write
    }
  }

  /** Logs the pending text and tries to send it. */
  def doWrite(): State = {
    printLog("write", getMessage)
    writeBuffer()
  }

  /**
   * Decodes the unread part of the buffer as UTF-8.
   * Works on a duplicate so the buffer's position is left alone; rewinding it
   * here would resend already-written bytes when a partial write is retried.
   */
  def getMessage: String = decoder.decode(buffer.duplicate()).toString

  /** Logs `message` together with the current thread, this actor, `state` and the peer address. */
  def printLog(state: String, message: String): Unit = {
    logger.write("%s-Actor[%s] %s %s: %s".format(
      Thread.currentThread,
      this,
      state,
      remoteAddress,
      message
    ))
  }

  /**
   * Writes the whole buffer. When the socket accepts no bytes, waits up to
   * 100 ms on a temporary selector for it to become writable; on timeout the
   * rest of the write is handed back to the acceptor. Only real exceptions are
   * reported as write errors, so the control flow of `close()` is never
   * intercepted, and the temporary selector is always closed.
   */
  def writeBuffer(): State = {
    var tempSelector = Option.empty[Selector]
    try {
      while (buffer.hasRemaining) {
        if (channel.write(buffer) == 0) {
          Thread.`yield`()
          val waitSelector = tempSelector.getOrElse {
            val opened = Selector.open()
            tempSelector = Some(opened)
            channel.register(opened, SelectionKey.OP_WRITE)
            opened
          }
          // This select may keep the current thread busy for up to 100 ms.
          if (waitSelector.select(100) == 0) {
            requestInterest(SelectionKey.OP_WRITE)
            return write
          }
          // Clear the selected set so the next select() reports readiness again.
          waitSelector.selectedKeys.clear()
        }
      }
      buffer.clear()
      requestInterest(SelectionKey.OP_READ)
      read
    } catch {
      case _: Exception =>
        logger.write("write error")
        close()
    } finally {
      // Closing the selector also deregisters the temporary key.
      tempSelector.foreach(_.close())
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment