Skip to content

Instantly share code, notes, and snippets.

@TanUkkii007
Created July 16, 2015 07:26
Show Gist options
  • Save TanUkkii007/dacbf45a65492808c53f to your computer and use it in GitHub Desktop.
Save TanUkkii007/dacbf45a65492808c53f to your computer and use it in GitHub Desktop.
サンプルコード in "プログラミング言語のパラダイムシフトーScalaから見る関数型と並列性時代の幕開けー http://www.slideshare.net/TanUkkii/functional-and-concurencyinscala"
import akka.actor._
import akka.routing.{SmallestMailboxPool}
import akka.testkit.{TestKit, ImplicitSender}
import com.typesafe.config.ConfigFactory
import jp.trifort.ifu.StopSystemAfterAll
import org.scalatest.{MustMatchers, WordSpecLike}
import scala.concurrent.{Await, Future}
import akka.util.Timeout
import scala.concurrent.duration._
/**
 * Actor that keeps a running count of the letter 'A' (case-insensitively)
 * across every text message it receives.
 *
 * Messages:
 *  - `"How many?"`     : replies to the sender with the current count.
 *  - any other String  : adds the number of 'a'/'A' characters to the count.
 */
class CountAActor extends Actor {
  // Running count; only touched from within receive.
  var totalA = 0

  def receive: Receive = {
    case "How many?" =>
      sender() ! totalA
    case text: String =>
      // Upper-case first so that both 'a' and 'A' are counted.
      val occurrences = text.toUpperCase().count(ch => ch == 'A')
      totalA += occurrences
  }
}
/** Companion holding the Props factory for [[CountAActor]]. */
object CountAActor {
  /** Props that create a fresh CountAActor. */
  def props = {
    Props(new CountAActor)
  }
}
/**
 * Supplies the router configuration used by CountARouter.
 *
 * The pool size lives in one overridable member so that the number of routees
 * and the number of replies the Reducer waits for cannot drift apart.
 */
trait RouterCreator {
  /** Number of CountAActor routees in the pool. */
  def poolSize: Int = 100

  /** Props for a SmallestMailboxPool of `poolSize` CountAActor routees. */
  def createRouter = SmallestMailboxPool(poolSize).props(CountAActor.props)
}

/**
 * Front actor that spreads text messages over a pool of CountAActor routees
 * and aggregates their counts on demand.
 *
 * Messages:
 *  - `"How many?"` : broadcasts the question to every routee; the replies are
 *                    collected by a one-shot Reducer which sends the sum back
 *                    to the original sender.
 *  - anything else : forwarded to the pool unchanged.
 */
class CountARouter extends Actor with RouterCreator {
  val countARouter = context.actorOf(createRouter)

  def receive: Receive = {
    case hm @ "How many?" =>
      import akka.routing.Broadcast
      // The Reducer must expect exactly as many replies as there are routees,
      // hence poolSize rather than a second hard-coded literal.
      val reducer = context.actorOf(Reducer.props(sender(), poolSize))
      // Use the reducer as the sender so each routee replies to it.
      countARouter.tell(Broadcast(hm), reducer)
    case msg => countARouter forward msg
  }
}
/** Companion holding the Props factory for [[CountARouter]]. */
object CountARouter {
  /** Props that create a fresh CountARouter. */
  def props = {
    Props(new CountARouter)
  }
}
/**
 * One-shot aggregator: sums incoming Int messages and, once `maxCount` of
 * them have arrived, sends the sum to `sendTo` and asks itself to stop.
 *
 * @param sendTo   actor that receives the final sum
 * @param maxCount number of Int messages to wait for before replying
 */
class Reducer(sendTo: ActorRef, maxCount: Int) extends Actor {
  var total = 0
  var count = 0

  def receive: Receive = {
    case sum: Int =>
      total += sum
      count += 1
      val allRepliesIn = count == maxCount
      if (allRepliesIn) {
        sendTo ! total
        // Stop via the mailbox once the result has been handed over.
        self ! PoisonPill
      }
  }
}
/** Companion holding the Props factory for [[Reducer]]. */
object Reducer {
  /**
   * Props that create a Reducer.
   *
   * @param sendTo   actor that receives the final sum
   * @param maxCount number of Int messages the Reducer waits for
   */
  def props(sendTo: ActorRef, maxCount: Int) = {
    Props(new Reducer(sendTo, maxCount))
  }
}
/**
 * Actor that throws when it receives `"Crash!!!"` and logs the name of each
 * lifecycle hook as it is invoked, so the hook order can be read from the log.
 */
class CrashActor extends Actor with ActorLogging {
  def receive: Receive = {
    // Deliberate failure used to trigger the lifecycle hooks below.
    case "Crash!!!" => throw new Exception("crashed!")
  }

  // Hooks use explicit `: Unit =` instead of procedure syntax, which is deprecated.

  override def preStart(): Unit = {
    log.info("preStart")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info("preRestart")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info("postRestart")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info("postStop")
  }
}
/** Companion holding the Props factory for [[CrashActor]]. */
object CrashActor {
  /** Props that create a fresh CrashActor. */
  def props = {
    Props(new CrashActor)
  }
}
/**
 * Parent of a single CrashActor child; every message it receives is
 * forwarded to that child with the original sender preserved.
 */
class Supervisor extends Actor {
  val crashActor = context.actorOf(CrashActor.props)

  def receive: Receive = {
    case msg =>
      crashActor.forward(msg)
  }
}
/** Companion holding the Props factory for [[Supervisor]]. */
object Supervisor {
  /** Props that create a fresh Supervisor. */
  def props = {
    Props(new Supervisor)
  }
}
/**
 * Specs for the samples in this file: Future composition, parallel
 * collections, and the counting / routing / crashing actors.
 *
 * Runs inside a TestKit backed by an ActorSystem named "SampleSystem".
 * NOTE(review): StopSystemAfterAll is defined outside this file; presumably it
 * shuts the system down after the suite — confirm.
 */
class SampleTest extends TestKit(ActorSystem("SampleSystem", ConfigFactory.empty()))
  with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll {

  "Future" must {
    import scala.concurrent.ExecutionContext.Implicits.global

    "map and flatMap" in {
      val futureMessage = Future {
        Thread.sleep(1000); 1
      }.flatMap(value => Future {
        Thread.sleep(1000); value + 1
      }).map(s => s"This is a value of future after $s seconds")
      Await.result(futureMessage, 5.seconds) must be("This is a value of future after 2 seconds")
    }

    "for comprehension" in {
      val futureMessage = for {
        s1 <- Future {
          Thread.sleep(1000); 1
        }
        s2 <- Future {
          Thread.sleep(1000); s1 + 1
        }
      } yield s"This is a value of future after $s2 seconds"
      Await.result(futureMessage, 5.seconds) must be("This is a value of future after 2 seconds")
    }
  }

  "parallel collection" must {
    "behave same as standard one" in {
      val list = (0 to 9999).toList
      val sequential = list.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
      val parallel = list.par.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
      // The expected value must be passed to `be(...)` in the same expression.
      // A trailing `must be` followed by the value on the next line is split
      // into two statements by semicolon inference and asserts nothing.
      sequential must be(parallel)
    }
  }

  "CountAActor" must {
    "count A and a" in {
      val countAActor = system.actorOf(CountAActor.props, "countAActor")
      countAActor ! "na" * 16
      countAActor ! "BATMAN!"
      countAActor ! "How many?"
      // 16 from "na" * 16 plus 2 from "BATMAN!".
      expectMsg(18)
    }

    "count A and a in parallel" in {
      implicit val dispatcher = system.dispatcher
      implicit val timeout = Timeout(5.seconds)
      import akka.pattern.ask
      val countAActor1 = system.actorOf(CountAActor.props, "countAActor1")
      val countAActor2 = system.actorOf(CountAActor.props, "countAActor2")
      countAActor1 ! "na" * 16
      countAActor2 ! "BATMAN!"
      // Ask both actors, then combine the two partial counts.
      val futures = Seq(countAActor1, countAActor2).map(_ ? "How many?").map(_.mapTo[Int])
      val result = Future.sequence(futures).map(_.reduce(_ + _))
      Await.result(result, 5.seconds) must be(18)
    }
  }

  "Router" must {
    "route messages" in {
      val router = system.actorOf(CountARouter.props, "CountARouter")
      Stream.fill(10000)("BATMAN!").foreach(router ! _)
      router ! "How many?"
      // Each "BATMAN!" contributes two A's.
      expectMsg(10000 * 2)
    }
  }

  "Supervisor" must {
    "crush CrashActor" in {
      val supervisor = system.actorOf(Supervisor.props, "supervisor")
      supervisor ! "Crash!!!"
      // No assertion here: the sleep only leaves time for the child's
      // lifecycle log lines to be emitted before the test returns.
      Thread.sleep(1000)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment