Skip to content

Instantly share code, notes, and snippets.

@dong77
Last active August 29, 2015 13:57
Show Gist options
  • Save dong77/9659988 to your computer and use it in GitHub Desktop.
Save dong77/9659988 to your computer and use it in GitHub Desktop.
persistent channel confirm() bug
package com.cc
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.pattern.ask
import akka.cluster.Cluster
import data._
import com.coinport.coinex.data._
import Implicits._
import Currency._
import akka.persistence._
import scala.concurrent.duration._
import akka.util.Timeout
import akka.contrib.pattern.ClusterSingletonManager
class FirstProcessor(path: ActorPath) extends Processor with ActorLogging {
override val processorId = "test_first"
val channel = context.actorOf(PersistentChannel.props(), "first_channel")
def receive = {
case p @ Persistent(i: Int, _) =>
case p @ Persistent(s: String, _) => channel ! Deliver(p.withPayload(s.toUpperCase), path)
}
}
class SecondProcessor(path: ActorPath) extends Processor with ActorLogging {
override val processorId = "test_second"
def receive = {
case p: ConfirmablePersistent => p.confirm()
}
}
import scala.concurrent.duration._
object TestApp extends App {
val port = if (args.length == 0) "0" else args(0)
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
.withFallback(ConfigFactory.load())
implicit val system = ActorSystem("test", config)
implicit val cluster = Cluster(system)
val firstRouter = system.actorOf(
common.ClusterSingletonProxy.props("/user/first/singleton", Some("first"), 1 second), "first_router")
val secondRouter = system.actorOf(
common.ClusterSingletonProxy.props("/user/second/singleton", Some("second"), 1 second), "second_router")
if (cluster.selfRoles.contains("first")) {
val first =
system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(new FirstProcessor(secondRouter.path)),
singletonName = "singleton",
terminationMessage = PoisonPill,
role = Some("first")),
name = "first")
}
if (cluster.selfRoles.contains("second")) {
val second =
system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(new SecondProcessor(firstRouter.path)),
singletonName = "singleton",
terminationMessage = PoisonPill,
role = Some("second")),
name = "second")
}
if (args.length == 2) {
firstRouter ! Persistent(12) // COMMENT OUT THIS LINE TO SEE THE LOG DIFFERENCE
firstRouter ! Persistent("hi")
}
}
> db.messages.find()
{ "_id" : ObjectId("532ab02be31876dceb18ec85"), "processorId" : "test_first", "sequenceNr" : NumberLong(1), "marker" : "A", "message" : BinData(0,"Cg0IARIJrO0ABXQAAmhpEAEaCnRlc3RfZmlyc3QgADAAQABaGWFra2E6Ly9jb2luZXgvZGVhZExldHRlcnM=") }
{ "_id" : ObjectId("532ab02be31876dceb18ec87"), "processorId" : "test_first", "sequenceNr" : NumberLong(1), "marker" : "C-/user/first/singleton/first_channel", "message" : BinData(0,"") }
{ "_id" : ObjectId("532ab02be31876dceb18ec88"), "processorId" : "test_second", "sequenceNr" : NumberLong(1), "marker" : "A", "message" : BinData(0,"Cg0IARIJrO0ABXQAAkhJEAEaC3Rlc3Rfc2Vjb25kIAAwAEABWkVha2thLnRjcDovL2NvaW5leEAxOTIuMTY4LjAuMTAzOjI1NTUyL3VzZXIvZmlyc3Qvc2luZ2xldG9uIzE3NjU3NDY3NTA=") }
> db.messages.find()
{ "_id" : ObjectId("532ab0d3e3182e6b37f5d05e"), "processorId" : "test_first", "sequenceNr" : NumberLong(1), "marker" : "A", "message" : BinData(0,"ClUIARJRrO0ABXNyABFqYXZhLmxhbmcuSW50ZWdlchLioKT3gYc4AgABSQAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAMEAEaCnRlc3RfZmlyc3QgADAAQABaGWFra2E6Ly9jb2luZXgvZGVhZExldHRlcnM=") }
{ "_id" : ObjectId("532ab0d3e3182e6b37f5d05f"), "processorId" : "test_first", "sequenceNr" : NumberLong(2), "marker" : "A", "message" : BinData(0,"Cg0IARIJrO0ABXQAAmhpEAIaCnRlc3RfZmlyc3QgADAAQABaGWFra2E6Ly9jb2luZXgvZGVhZExldHRlcnM=") }
{ "_id" : ObjectId("532ab0d3e3182e6b37f5d060"), "processorId" : "/user/first/singleton/first_channel", "sequenceNr" : NumberLong(1), "marker" : "A", "message" : BinData(0,"CoABCAcSYgo+Cg0IARIJrO0ABXQAAkhJEAIaCnRlc3RfZmlyc3QgADAAQABaGWFra2E6Ly9jb2luZXgvZGVhZExldHRlcnMSIGFra2E6Ly9jb2luZXgvdXNlci9zZWNvbmRfcm91dGVyGhhha2thLnBlcnNpc3RlbmNlLkRlbGl2ZXIQARojL3VzZXIvZmlyc3Qvc2luZ2xldG9uL2ZpcnN0X2NoYW5uZWwgADAAQABaRmFra2EudGNwOi8vY29pbmV4QDE5Mi4xNjguMC4xMDM6MjU1NTIvdXNlci9maXJzdC9zaW5nbGV0b24jLTIwMjAzNTcwODM=") }
{ "_id" : ObjectId("532ab0d3e3182e6b37f5d061"), "processorId" : "test_first", "sequenceNr" : NumberLong(2), "marker" : "C-/user/first/singleton/first_channel", "message" : BinData(0,"") }
{ "_id" : ObjectId("532ab0d4e3182e6b37f5d062"), "processorId" : "test_second", "sequenceNr" : NumberLong(1), "marker" : "A", "message" : BinData(0,"Cg0IARIJrO0ABXQAAkhJEAEaC3Rlc3Rfc2Vjb25kIAAwAEABWkZha2thLnRjcDovL2NvaW5leEAxOTIuMTY4LjAuMTAzOjI1NTUyL3VzZXIvZmlyc3Qvc2luZ2xldG9uIy0yMDIwMzU3MDgz") }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment