Skip to content

Instantly share code, notes, and snippets.

@ymnk
Created January 19, 2011 08:48
Show Gist options
  • Save ymnk/785885 to your computer and use it in GitHub Desktop.
Save ymnk/785885 to your computer and use it in GitHub Desktop.
import scala.concurrent.stm._
import scala.actors.{Actor, Exit}
import scala.actors.Actor.actor
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis
import java.io.{File, PrintWriter}
trait EnsureStartActor {
def ensureStart(actor: Actor) {
actor.getState match {
case New => actor.start
case Terminated => actor.restart
case _ =>
}
}
}
object SampleForSTM4 extends EnsureStartActor {
def main (args: Array[String]) {
Tester.start
}
}
object Tester extends Actor with EnsureStartActor {
def act() {
test(
"actor only " -> new Supervisor with UseActor,
"actor with stm" -> new Supervisor with UseSTM
)
LoggerInActor.close()
LoggerInSTM.close()
}
def test (testCases: (String, Supervisor)*) {
val tryCount = 10
val testData = 1 to 256 map {n => "a"} mkString
100 to 400 by 50 foreach { workersCount =>
println("--<actor = %s, try count = %s>--".format(workersCount, tryCount))
testCases foreach { testCase =>
val (message, actor) = testCase
ensureStart(actor)
val workers = actor.startChildren(workersCount)
Counter.enroll(this, workersCount * tryCount)
val startTime = currentTimeMillis
workers foreach {_.write(tryCount, testData)}
receive {
case Finish =>
case unknown =>
println("Tester: unknown message [%s], ignoring" format unknown)
}
println("%s [%s]ms".format(message, currentTimeMillis - startTime))
actor.stop
}
}
}
}
trait WorkerFactory {
def makeWorker(): Worker = new Worker()
def getLogger(): Option[Actor] = None
}
trait UseSTM extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithSTM
}
trait UseActor extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithActor
override def getLogger(): Option[Actor] = Some(LoggerInActor)
}
class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
case class StartChildren(workersCount: Int)
case object Stop
trapExit = true
def act() = loop {
react {
case Exit(child: Actor, 'normal) =>
case Exit(child: Actor, reason) =>
restartChild(child)
case StartChildren(workersCount) =>
startLogger()
reply(startWorkers(workersCount))
case Stop =>
exit("stop")
case unknown =>
println("Supervisor: unknown message [%s], ignoring" format unknown)
}
}
def restartChild(child: Actor) {
link(child)
child.restart
}
def startLogger() {
getLogger() match {
case Some(logger) =>
ensureStart(logger)
link(logger)
case None =>
}
}
def startWorkers(workersCount: Int): Array[Worker] = {
1 to workersCount map {count =>
val worker = makeWorker()
link(worker)
worker.start
worker
} toArray
}
def startChildren(workersCount: Int): List[Worker] = {
val arrayWorker = this !? StartChildren(workersCount) match {
case workers: Array[Worker] => workers
case _ => Array()
}
arrayWorker.toList
}
def stop = this ! Stop
}
trait CanLogging {
def writeLog(line: String) {}
}
trait CanLoggingWithSTM extends CanLogging {
override def writeLog(line: String) {LoggerInSTM.write(line)}
}
trait CanLoggingWithActor extends CanLogging {
override def writeLog(line: String) {LoggerInActor.write(line)}
}
class Worker() extends Actor with CanLogging {
case class WriteLog(tryCount: Int, line: String)
def act() = loop {
react {
case WriteLog(tryCount, line) =>
1 to tryCount foreach {n => writeLog(line)}
case unknown =>
println("Worker: unknown message [%s], ignoring" format unknown)
}
}
def write(tryCount: Int, line: String) {
this ! WriteLog(tryCount, line)
}
}
trait HasWriter {
var hasWriter: Option[PrintWriter] = None
def open(name: String) {
hasWriter = Some(new PrintWriter(new File("./%s.log" format name)))
}
def close(): Unit = hasWriter match {
case Some(writer) => writer.close
case _ =>
}
def puts(line: String): Unit = hasWriter match {
case Some(writer) => writer.println(line)
case _ =>
}
}
object LoggerInSTM extends HasWriter {
open("stm")
val refLock = Ref(false)
def write(line: String) {
atomic { implicit txn =>
val lock = refLock()
if (lock) retry
refLock() = true
}
puts(line)
Counter.increment
atomic { implicit txn =>
refLock() = false
}
}
}
object LoggerInActor extends Actor with HasWriter {
case class WriteLog(line: String)
open("actor")
def act() = loop {
react {
case WriteLog(line) =>
puts(line)
Counter.increment
case unknown =>
println("LoggerInActor: unknown message [%s], ignoring" format unknown)
}
}
def write(line: String) {
this ! WriteLog(line)
}
}
case object Finish
object Counter {
var hasReceiver: Option[Actor] = None
var replyCount = 0
val refCounter = Ref(0)
def increment() {
atomic { implicit txn =>
refCounter() = refCounter() + 1
}
sendFinish()
}
def sendFinish() {
if (replyCount != 0 && replyCount <= refCounter.single()) {
hasReceiver match {
case Some(receiver) => receiver ! Finish
case _ =>
}
}
}
def enroll(setReceiver: Actor, setReplyCount: Int) {
hasReceiver = Some(setReceiver)
replyCount = setReplyCount
atomic { implicit txn =>
refCounter() = 0
}
}
}
import scala.concurrent.stm._
import scala.actors.{Actor, Exit}
import scala.actors.Actor.actor
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis
import java.io.{File, PrintWriter}
trait EnsureStartActor {
def ensureStart(actor: Actor) {
actor.getState match {
case New => actor.start
case Terminated => actor.restart
case _ =>
}
}
}
object SampleForSTM4Rev1 extends EnsureStartActor {
def main (args: Array[String]) {
args.headOption.foreach{n => Tester.tryCount = n.toInt }
Tester.start
}
}
object Tester extends Actor with EnsureStartActor {
var tryCount = 10
def act() {
test(
"actor only " -> new Supervisor with UseActor,
"actor with stm" -> new Supervisor with UseSTM
)
LoggerInActor.close()
LoggerInSTM.close()
}
def test (testCases: (String, Supervisor)*) {
val testData = 1 to 256 map {n => "a"} mkString
100 to 1000 by 50 foreach { workersCount =>
println("--<actor = %s, try count = %s>--".format(workersCount, tryCount))
testCases foreach { testCase =>
val (message, actor) = testCase
ensureStart(actor)
val workers = actor.startChildren(workersCount)
Counter.enroll(this, workersCount * tryCount)
val startTime = currentTimeMillis
workers foreach {_.write(tryCount, testData)}
receive {
case Finish =>
case unknown =>
println("Tester: unknown message [%s], ignoring" format unknown)
}
println("%s [%s]ms".format(message, currentTimeMillis - startTime))
actor.stop
}
}
}
}
trait WorkerFactory {
def makeWorker(): Worker = new Worker()
def getLogger(): Option[Actor] = None
}
trait UseSTM extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithSTM
}
trait UseActor extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithActor
override def getLogger(): Option[Actor] = Some(LoggerInActor)
}
class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
case class StartChildren(workersCount: Int)
case object Stop
trapExit = true
def act() = loop {
react {
case Exit(child: Actor, 'normal) =>
case Exit(child: Actor, reason) =>
restartChild(child)
case StartChildren(workersCount) =>
startLogger()
reply(startWorkers(workersCount))
case Stop =>
try{
exit("stop")
}
finally{
reply('ok)
}
case unknown =>
println("Supervisor: unknown message [%s], ignoring" format unknown)
}
}
def restartChild(child: Actor) {
link(child)
child.restart
}
def startLogger() {
getLogger() match {
case Some(logger) =>
ensureStart(logger)
link(logger)
case None =>
}
}
def startWorkers(workersCount: Int): Array[Worker] = {
1 to workersCount map {count =>
val worker = makeWorker()
link(worker)
worker.start
worker
} toArray
}
def startChildren(workersCount: Int): List[Worker] = {
val arrayWorker = this !? StartChildren(workersCount) match {
case workers: Array[Worker] => workers
case _ => Array()
}
arrayWorker.toList
}
def stop = this !? Stop match { case _ => }
}
trait CanLogging {
def writeLog(line: String) {}
}
trait CanLoggingWithSTM extends CanLogging {
override def writeLog(line: String) {LoggerInSTM.write(line)}
}
trait CanLoggingWithActor extends CanLogging {
override def writeLog(line: String) {LoggerInActor.write(line)}
}
class Worker() extends Actor with CanLogging {
case class WriteLog(tryCount: Int, line: String)
def act() = loop {
react {
case WriteLog(tryCount, line) =>
1 to tryCount foreach {n => writeLog(line)}
case unknown =>
println("Worker: unknown message [%s], ignoring" format unknown)
}
}
def write(tryCount: Int, line: String) {
this ! WriteLog(tryCount, line)
}
}
trait HasWriter {
var hasWriter: Option[PrintWriter] = None
def open(name: String) {
hasWriter = Some(new PrintWriter(new File("./%s.log" format name)))
}
def close(): Unit = hasWriter match {
case Some(writer) => writer.close
case _ =>
}
def puts(line: String): Unit = hasWriter match {
case Some(writer) => writer.println(line)
case _ =>
}
}
object LoggerInSTM extends HasWriter {
open("stm")
val refLock = Ref(false)
def write(line: String) {
atomic { implicit txn =>
val lock = refLock()
if (lock) retry
refLock() = true
}
puts(line)
Counter.increment
atomic { implicit txn =>
refLock() = false
}
}
}
object LoggerInActor extends Actor with HasWriter {
case class WriteLog(line: String)
open("actor")
def act() = loop {
react {
case WriteLog(line) =>
puts(line)
Counter.increment
case unknown =>
println("LoggerInActor: unknown message [%s], ignoring" format unknown)
}
}
def write(line: String) {
this ! WriteLog(line)
}
}
case object Finish
object Counter {
var hasReceiver: Option[Actor] = None
var replyCount = 0
val refCounter = Ref(0)
def increment() {
atomic { implicit txn =>
refCounter() = refCounter() + 1
}
sendFinish()
}
def sendFinish() {
if (replyCount != 0 && replyCount <= refCounter.single()) {
hasReceiver match {
case Some(receiver) => receiver ! Finish
case _ =>
}
}
}
def enroll(setReceiver: Actor, setReplyCount: Int) {
hasReceiver = Some(setReceiver)
replyCount = setReplyCount
atomic { implicit txn =>
refCounter() = 0
}
}
}
import scala.concurrent.stm._
import scala.actors.{Actor, Exit}
import scala.actors.Actor.actor
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis
import java.io.{File, PrintWriter}
trait EnsureStartActor {
def ensureStart(actor: Actor) {
actor.getState match {
case New => actor.start
case Terminated => actor.restart
case _ =>
}
}
}
object SampleForSTM4Rev2 extends EnsureStartActor {
def main (args: Array[String]) {
args.headOption.foreach{n => Tester.tryCount = n.toInt }
Tester.start
}
}
object Tester extends Actor with EnsureStartActor {
var tryCount = 10
def act() {
test(
"actor only " -> new Supervisor with UseActor,
"actor with stm" -> new Supervisor with UseSTM
)
LoggerInActor.close()
LoggerInSTM.close()
}
def test (testCases: (String, Supervisor)*) {
val testData = 1 to 256 map {n => "a"} mkString
100 to 1000 by 50 foreach { workersCount =>
println("--<actor = %s, try count = %s>--".format(workersCount, tryCount))
testCases foreach { testCase =>
val (message, actor) = testCase
ensureStart(actor)
val workers = actor.startChildren(workersCount)
Counter.enroll(this, workersCount * tryCount)
val startTime = currentTimeMillis
workers foreach {_.write(tryCount, testData)}
receive {
case Finish =>
case unknown =>
println("Tester: unknown message [%s], ignoring" format unknown)
}
println("%s [%s]ms".format(message, currentTimeMillis - startTime))
actor.unlinkWorkers(workers)
}
}
testCases foreach { case(_, actor) => actor.stop }
}
}
trait WorkerFactory {
def makeWorker(): Worker = new Worker()
def getLogger(): Option[Actor] = None
}
trait UseSTM extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithSTM
}
trait UseActor extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithActor
override def getLogger(): Option[Actor] = Some(LoggerInActor)
}
class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
case class UnlinkWorkers(w: List[Worker])
case class StartChildren(workersCount: Int)
case object Stop
trapExit = true
def act() = loop {
react {
case Exit(child: Actor, 'normal) =>
case Exit(child: Actor, reason) =>
restartChild(child)
case StartChildren(workersCount) =>
startLogger()
reply(startWorkers(workersCount))
case UnlinkWorkers(ws) =>
ws.foreach { w => {unlink(w); w ! 'exit }}
case Stop =>
exit("stop")
case unknown =>
println("Supervisor: unknown message [%s], ignoring" format unknown)
}
}
def restartChild(child: Actor) {
link(child)
child.restart
}
def startLogger() {
getLogger() match {
case Some(logger) =>
ensureStart(logger)
link(logger)
case None =>
}
}
def startWorkers(workersCount: Int): Array[Worker] = {
1 to workersCount map {count =>
val worker = makeWorker()
link(worker)
worker.start
worker
} toArray
}
def startChildren(workersCount: Int): List[Worker] = {
val arrayWorker = this !? StartChildren(workersCount) match {
case workers: Array[Worker] => workers
case _ => Array()
}
arrayWorker.toList
}
def unlinkWorkers(w: List[Worker]) = this ! UnlinkWorkers(w)
def stop = this ! Stop
}
trait CanLogging {
def writeLog(line: String) {}
}
trait CanLoggingWithSTM extends CanLogging {
override def writeLog(line: String) {LoggerInSTM.write(line)}
}
trait CanLoggingWithActor extends CanLogging {
override def writeLog(line: String) {LoggerInActor.write(line)}
}
class Worker() extends Actor with CanLogging {
case class WriteLog(tryCount: Int, line: String)
def act() = loop {
react {
case 'exit => exit
case WriteLog(tryCount, line) =>
1 to tryCount foreach {n => writeLog(line)}
case unknown =>
println("Worker: unknown message [%s], ignoring" format unknown)
}
}
def write(tryCount: Int, line: String) {
this ! WriteLog(tryCount, line)
}
}
trait HasWriter {
var hasWriter: Option[PrintWriter] = None
def open(name: String) {
hasWriter = Some(new PrintWriter(new File("./%s.log" format name)))
}
def close(): Unit = hasWriter match {
case Some(writer) => writer.close
case _ =>
}
def puts(line: String): Unit = hasWriter match {
case Some(writer) => writer.println(line)
case _ =>
}
}
object LoggerInSTM extends HasWriter {
open("stm")
val refLock = Ref(false)
def write(line: String) {
atomic { implicit txn =>
val lock = refLock()
if (lock) retry
refLock() = true
}
puts(line)
Counter.increment
atomic { implicit txn =>
refLock() = false
}
}
}
object LoggerInActor extends Actor with HasWriter {
case class WriteLog(line: String)
open("actor")
def act() = loop {
react {
case WriteLog(line) =>
puts(line)
Counter.increment
case unknown =>
println("LoggerInActor: unknown message [%s], ignoring" format unknown)
}
}
def write(line: String) {
this ! WriteLog(line)
}
}
case object Finish
object Counter {
var hasReceiver: Option[Actor] = None
var replyCount = 0
val refCounter = Ref(0)
def increment() {
atomic { implicit txn =>
refCounter() = refCounter() + 1
}
sendFinish()
}
def sendFinish() {
if (replyCount != 0 && replyCount <= refCounter.single()) {
hasReceiver match {
case Some(receiver) => receiver ! Finish
case _ =>
}
}
}
def enroll(setReceiver: Actor, setReplyCount: Int) {
hasReceiver = Some(setReceiver)
replyCount = setReplyCount
atomic { implicit txn =>
refCounter() = 0
}
}
}
import scala.concurrent.stm._
import scala.actors.{Actor, Exit}
import scala.actors.Actor.actor
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis
import java.io.{File, PrintWriter}
trait EnsureStartActor {
def ensureStart(actor: Actor) {
actor.getState match {
case New => actor.start
case Terminated => actor.restart
case _ =>
}
}
}
object SampleForSTM4Rev3 extends EnsureStartActor {
def main (args: Array[String]) {
args.headOption.foreach{n => Tester.tryCount = n.toInt }
Tester.start
}
}
object Tester extends Actor with EnsureStartActor {
var tryCount = 10
def act() {
test(
"actor only " -> new Supervisor with UseActor,
"actor with stm" -> new Supervisor with UseSTM,
"actor with synchronized" -> new Supervisor with UseSynchronized
)
LoggerInActor.close()
LoggerInSTM.close()
}
def test (testCases: (String, Supervisor)*) {
val testData = 1 to 256 map {n => "a"} mkString
100 to 1000 by 50 foreach { workersCount =>
println("--<actor = %s, try count = %s>--".format(workersCount, tryCount))
testCases foreach { testCase =>
System.gc
val (message, actor) = testCase
ensureStart(actor)
val workers = actor.startChildren(workersCount)
Counter.enroll(this, workersCount * tryCount)
val startTime = currentTimeMillis
workers foreach {_.write(tryCount, testData)}
receive {
case Finish =>
case unknown =>
println("Tester: unknown message [%s], ignoring" format unknown)
}
println("%s [%s]ms".format(message, currentTimeMillis - startTime))
actor.stop
}
}
}
}
trait WorkerFactory {
def makeWorker(): Worker = new Worker()
def getLogger(): Option[Actor] = None
}
trait UseSTM extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithSTM
}
trait UseActor extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithActor
override def getLogger(): Option[Actor] = Some(LoggerInActor)
}
trait UseSynchronized extends WorkerFactory {
override def makeWorker(): Worker = new Worker() with CanLoggingWithSynchronized
}
class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
case class StartChildren(workersCount: Int)
case object Stop
trapExit = true
def act() = loop {
react {
case Exit(child: Actor, 'normal) =>
case Exit(child: Actor, reason) =>
restartChild(child)
case StartChildren(workersCount) =>
startLogger()
reply(startWorkers(workersCount))
case Stop =>
try{
exit("stop")
}
finally{
reply('ok)
}
case unknown =>
println("Supervisor: unknown message [%s], ignoring" format unknown)
}
}
def restartChild(child: Actor) {
link(child)
child.restart
}
def startLogger() {
getLogger() match {
case Some(logger) =>
ensureStart(logger)
link(logger)
case None =>
}
}
def startWorkers(workersCount: Int): Array[Worker] = {
1 to workersCount map {count =>
val worker = makeWorker()
link(worker)
worker.start
worker
} toArray
}
def startChildren(workersCount: Int): List[Worker] = {
val arrayWorker = this !? StartChildren(workersCount) match {
case workers: Array[Worker] => workers
case _ => Array()
}
arrayWorker.toList
}
def stop = this !? Stop match { case _ => }
}
trait CanLogging {
def writeLog(line: String) {}
}
trait CanLoggingWithSTM extends CanLogging {
override def writeLog(line: String) {LoggerInSTM.write(line)}
}
trait CanLoggingWithActor extends CanLogging {
override def writeLog(line: String) {LoggerInActor.write(line)}
}
trait CanLoggingWithSynchronized extends CanLogging {
override def writeLog(line: String) {LoggerInSynchronized.write(line)}
}
class Worker() extends Actor with CanLogging {
case class WriteLog(tryCount: Int, line: String)
def act() = loop {
react {
case WriteLog(tryCount, line) =>
1 to tryCount foreach {n => writeLog(line)}
case unknown =>
println("Worker: unknown message [%s], ignoring" format unknown)
}
}
def write(tryCount: Int, line: String) {
this ! WriteLog(tryCount, line)
}
}
trait HasWriter {
var hasWriter: Option[PrintWriter] = None
def open(name: String) {
hasWriter = Some(new PrintWriter(new File("./%s.log" format name)))
}
def close(): Unit = hasWriter match {
case Some(writer) => writer.close
case _ =>
}
def puts(line: String): Unit = hasWriter match {
case Some(writer) => writer.println(line)//;try{ Thread.sleep(1); }catch{ case e =>}
case _ =>
}
}
object LoggerInSTM extends HasWriter {
open("stm")
val refLock = Ref(false)
def write(line: String) {
atomic { implicit txn =>
val lock = refLock()
if (lock) retry
refLock() = true
}
puts(line)
Counter.increment
atomic { implicit txn =>
refLock() = false
}
}
}
object LoggerInActor extends Actor with HasWriter {
var counter = 0
case class WriteLog(line: String)
open("actor")
def act() = loop {
react {
case WriteLog(line) =>
puts(line)
counter += 1
import Counter._
if (replyCount != 0 && replyCount <= counter) {
hasReceiver match {
case Some(receiver) => counter = 0; receiver ! Finish
case _ =>
}
}
case unknown =>
println("LoggerInActor: unknown message [%s], ignoring" format unknown)
}
}
def write(line: String) {
this ! WriteLog(line)
}
}
object LoggerInSynchronized extends HasWriter {
open("synchronized")
def write(line: String) = synchronized {
puts(line)
Counter.increment
}
}
case object Finish
object Counter {
var hasReceiver: Option[Actor] = None
var replyCount = 0
val refCounter = Ref(0)
def increment() {
atomic { implicit txn =>
refCounter() = refCounter() + 1
}
sendFinish()
}
def sendFinish() {
if (replyCount != 0 && replyCount <= refCounter.single()) {
hasReceiver match {
case Some(receiver) => receiver ! Finish
case _ =>
}
}
}
def enroll(setReceiver: Actor, setReplyCount: Int) {
hasReceiver = Some(setReceiver)
replyCount = setReplyCount
atomic { implicit txn =>
refCounter() = 0
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment