Skip to content

Instantly share code, notes, and snippets.

@simbo1905
Created April 23, 2015 20:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save simbo1905/facb9cef0e8ceabb3715 to your computer and use it in GitHub Desktop.
Save simbo1905/facb9cef0e8ceabb3715 to your computer and use it in GitHub Desktop.
org.scala-lang.modules:scala-pickling_2.11:0.10.0.jar crashes Oracle Java OSX 1.8.0_45
/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -Didea.launcher.port=7535 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA 14 CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Users/simbo/Library/Application Support/IdeaIC14/Scala/lib/scala-plugin-runners.jar:/Users/simbo/Library/Application Support/IdeaIC14/Scala/lib/Runners.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/lib/tools.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/
lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Users/simbo/projects/trex/trex/core/target/scala-2.11/test-classes:/Users/simbo/projects/trex/trex/core/target/scala-2.11/classes:/Users/simbo/.ivy2/cache/com.chuusai/shapeless_2.11/bundles/shapeless_2.11-2.0.0.jar:/Users/simbo/.ivy2/cache/com.typesafe/config/bundles/config-1.2.1.jar:/Users/simbo/.ivy2/cache/com.typesafe.akka/akka-actor_2.11/jars/akka-actor_2.11-2.3.9.jar:/Users/simbo/.ivy2/cache/com.typesafe.akka/akka-testkit_2.11/jars/akka-testkit_2.11-2.3.9.jar:/Users/simbo/.ivy2/cache/org.mapdb/mapdb/bundles/mapdb-1.0.6.jar:/Users/simbo/.ivy2/cache/org.scala-lang/scala-compiler/jars/scala-compiler-2.11.4.jar:/Users/simbo/.ivy2/cache/org.scala-lang.modules/scala-parser-combinators_2.11/bundles/scala-parser-combinators_2.11-1.0.2.jar:/Users/simbo/.ivy2/cache/org.scala-lang.modules/scala-xml_2.11/bundles/scala-xml_2.11-1.0.2.jar:/Users/simbo/.ivy2/cache/org.scalamock/scalamock-core_2.11/jars/scalamock-core_2.11-3.2.1.jar:/Users/simbo/.ivy2/cache/org.scalamock/scalamock-scalatest-support_2.11/jars/scalamock-scalatest-support_2.11-3.2.1.jar:/Users/simbo/.ivy2/cache/org.scalatest/scalatest_2.11/bundles/scalatest_2.11-2.2.4.jar:/Users/simbo/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.11.5.jar:/Users/simbo/.ivy2/cache/org.scala-lang/scala-reflect/jars/scala-reflect-2.11.4.jar:/Users/simbo/.ivy2/cache/org.scala-lang.modules
/scala-pickling_2.11/jars/scala-pickling_2.11-0.10.0.jar:/Users/simbo/.ivy2/cache/org.scoverage/scalac-scoverage-plugin_2.11/jars/scalac-scoverage-plugin_2.11-1.0.4.jar:/Users/simbo/.ivy2/cache/org.scoverage/scalac-scoverage-runtime_2.11/jars/scalac-scoverage-runtime_2.11-1.0.4.jar:/Applications/IntelliJ IDEA 14 CE.app/Contents/lib/idea_rt.jar" com.intellij.rt.execution.application.AppMain org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner -s com.github.simbo1905.trex.internals.CoreSpec -showProgressMessages true -C org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestReporter
Testing started at 21:14 ...
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGBUS (0xa) at pc=0x00007fff925dc331, pid=19228, tid=4867
#
# JRE version: Java(TM) SE Runtime Environment (8.0_45-b14) (build 1.8.0_45-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.45-b02 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# C [libsystem_platform.dylib+0x1331] _platform_memmove$VARIANT$Ivybridge+0x31
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/simbo/projects/trex/trex/hs_err_pid19228.log
#
# If you would like to submit a bug report, please visit:
# http://bugreport.java.com/bugreport/crash.jsp
#
Process finished with exit code 134
package com.github.simbo1905.trex.internals
import java.nio.ByteBuffer
import akka.util.ByteString
/**
 * Binary codec for the Paxos messages in this package.
 *
 * A pickled message is a single leading tag byte, which identifies the message class via
 * [[Pickle.config]], followed by the scala-pickling binary payload of the message.
 */
object Pickle {

  import scala.pickling.Defaults._
  import scala.pickling.shareNothing._
  import scala.pickling.binary._

  // One statically typed decoder per message class; each expects the payload WITHOUT the tag byte.
  def unpickleAccept(b: ByteString): Accept = bytes(b).unpickle[Accept]

  def unpickleAcceptAck(b: ByteString): AcceptAck = bytes(b).unpickle[AcceptAck]

  def unpickleAcceptNack(b: ByteString): AcceptNack = bytes(b).unpickle[AcceptNack]

  def unpickleCommit(b: ByteString): Commit = bytes(b).unpickle[Commit]

  def unpickleNotLeader(b: ByteString): NotLeader = bytes(b).unpickle[NotLeader]

  def unpicklePrepare(b: ByteString): Prepare = bytes(b).unpickle[Prepare]

  def unpicklePrepareNack(b: ByteString): PrepareNack = bytes(b).unpickle[PrepareNack]

  def unpicklePrepareAck(b: ByteString): PrepareAck = bytes(b).unpickle[PrepareAck]

  def unpickleRetransitRequest(b: ByteString): RetransmitRequest = bytes(b).unpickle[RetransmitRequest]

  def unpickleRetransmitResponse(b: ByteString): RetransmitResponse = bytes(b).unpickle[RetransmitResponse]

  /** Tag byte -> (message class, decoder for that class). The single source of truth for both lookup tables below. */
  val config: Map[Byte, (Class[_], ByteString => AnyRef)] = Map(
    0x0.toByte -> (classOf[Accept] -> unpickleAccept _),
    0x1.toByte -> (classOf[AcceptAck] -> unpickleAcceptAck _),
    0x2.toByte -> (classOf[AcceptNack] -> unpickleAcceptNack _),
    0x3.toByte -> (classOf[Commit] -> unpickleCommit _),
    0x4.toByte -> (classOf[NotLeader] -> unpickleNotLeader _),
    0x5.toByte -> (classOf[Prepare] -> unpicklePrepare _),
    0x6.toByte -> (classOf[PrepareNack] -> unpicklePrepareNack _),
    0x7.toByte -> (classOf[PrepareAck] -> unpicklePrepareAck _),
    0x8.toByte -> (classOf[RetransmitRequest] -> unpickleRetransitRequest _),
    0x9.toByte -> (classOf[RetransmitResponse] -> unpickleRetransmitResponse _)
  )

  /** Message class -> tag byte, used when pickling. */
  val toMap: Map[Class[_], Byte] = config.map {
    case (tag, (messageClass, _)) => messageClass -> tag
  }.toMap

  /** Tag byte -> decoder, used when unpickling. */
  val fromMap: Map[Byte, ByteString => AnyRef] = config.map {
    case (tag, (_, decoder)) => tag -> decoder
  }

  /**
   * Decodes a message produced by [[pickle]]: the first byte selects the decoder, the remainder is the payload.
   *
   * @param b the tag byte followed by the binary payload
   * @return the decoded message
   */
  def unpickle(b: ByteString): AnyRef = {
    val decoder = fromMap(b.head)
    decoder(b.tail)
  }

  /**
   * Encodes a message as its tag byte followed by its binary pickle.
   *
   * @param a a message whose runtime class is registered in [[config]]
   * @return the tag byte followed by the binary payload
   */
  def pickle(a: AnyRef): ByteString = {
    val tag = toMap(a.getClass)
    ByteString(tag) ++ a.pickle.value
  }

  /** Copies the content of the byte string into a fresh array. */
  def bytes(b: ByteString): Array[Byte] = b.toArray
}
package com.github.simbo1905.trex.internals
import scala.compat.Platform
// TODO move to internals package
/**
 * The logical number used to discriminate messages as either higher or lower. Numbers must be unique to _both_ the
 * node in the cluster *and* the paxos prepare. Physically it is 64 bits, with the high 32 bits an epoch number and the
 * low 32 bits a node unique identifier. The number will be fixed for a stable leader so it also represents a leader's term.
 *
 * Ordering is lexicographic: `counter` is compared first and `nodeIdentifier` breaks ties.
 *
 * @param counter Used by candidate leaders to "go higher" than prior or competing leaders. No guarantees are made as
 *                to this number; there may be gaps between values issued by a node and there may be collisions
 *                between dueling leaders.
 * @param nodeIdentifier Node unique number which must be unique to an agent within the cluster (e.g. set from unique
 *                       configuration or parsed from DNS name 'node0', 'node1'). This value is used to tie break
 *                       between dueling leaders. Safety of the algorithm requires that this value is unique per cluster.
 */
case class ProposalNumber(counter: Int, nodeIdentifier: Int) {

  // Negative, zero or positive when this number is lower than, equal to or higher than `that`.
  private def compareWith(that: ProposalNumber): Int = {
    val byCounter = Integer.compare(this.counter, that.counter)
    if (byCounter != 0) byCounter
    else Integer.compare(this.nodeIdentifier, that.nodeIdentifier)
  }

  def >(that: ProposalNumber): Boolean = compareWith(that) > 0

  def >=(that: ProposalNumber): Boolean = compareWith(that) >= 0

  def <(that: ProposalNumber): Boolean = compareWith(that) < 0

  def <=(that: ProposalNumber): Boolean = compareWith(that) <= 0

  // The counter is printed shifted by Int.MinValue so that it is displayed as a non-negative number.
  override def toString = s"N(c=${counter.toLong - Int.MinValue},n=$nodeIdentifier)"
}
/**
 * Identifies a unique leader epoch and log index "slot" into which a value may be proposed. Each leader must only
 * propose a single value into any given slot and must change the [[ProposalNumber]] to propose a different value at
 * the same slot. The identifier used for a given slot will be shared across prepare, accept and commit messages
 * during a leader take-over. The ordering of identifiers is defined by their log index order, which is used to commit
 * accepted values in order.
 *
 * @param from The node sending the message.
 * @param number The paxos proposal number used for comparing messages, or values, as higher or lower than each
 *               other. This value will be fixed for a stable leadership.
 * @param logIndex The contiguous log stream position, or "slot", into which values are proposed and committed in order.
 */
case class Identifier(from: Int, number: ProposalNumber, logIndex: Long) {
  override def toString = s"I(f=$from,n=$number,s=$logIndex)"
}
/**
 * The progress of a node: its highest promise number together with its highest committed message.
 *
 * @param highestPromised Highest promise made by a node.
 * @param highestCommitted Highest position and highest number committed by a node.
 */
case class Progress(highestPromised: ProposalNumber, highestCommitted: Identifier) {
  override def toString = s"P(p=$highestPromised,c=$highestCommitted)"
}
/**
 * Prepare is only sent either to establish a leader or to probe for the uncommitted values of a previous leader
 * during the leader take-over phase. Followers must:
 *
 * 1. Check the [[ProposalNumber]] of the [[Identifier]] against the highest value previously acknowledged; if the
 *    request is lower, negatively acknowledge ("nack") it.
 * 1. Check the logIndex of the [[Identifier]] against the highest committed logIndex; if the request is lower, nack it.
 * 1. If the [[ProposalNumber]] of the [[Identifier]] is higher than the one previously acknowledged, the node must
 *    make the new number durable and promise to nack any messages with a lower [[ProposalNumber]]. The positive
 *    acknowledgement ("ack") must return the highest uncommitted [[Accept]] message with the same log index, or None
 *    if there is no uncommitted value at that slot.
 *
 * @param id Identifier carrying the proposal number and the log index being prepared.
 */
case class Prepare(id: Identifier) {
  /**
   * @return The unique identifier of the sender within the cluster, taken from the proposal number of `id`.
   */
  def from: Int = id.number.nodeIdentifier
}
/**
* Base type for a response to a [[Prepare]] message. It provides additional information beyond that prescribed by the core Paxos algorithm, which is used during the leader takeover protocol and to prevent unnecessary leader failover attempts.
*/
sealed trait PrepareResponse {
/**
* @return The identifier of the [[Prepare]] message being acknowledged.
*/
def request: Identifier
/**
* @return The node responding.
*/
def from: Int
/**
* @return The high commit mark and high promise mark of the responding node.
*/
def progress: Progress
/**
* @return The highest slot at which this node has accepted a message. Used by a new leader during the leader takeover protocol to expand the range of slots it is recovering.
*/
def highestAcceptedIndex: Long
/**
* @return The last seen leader heartbeat. Used to detect a working leader behind a partial network partition to prevent unnecessary leader failover attempts.
*/
def leaderHeartbeat: Long
}
/**
 * Positively acknowledge a [[Prepare]] message. See [[PrepareResponse]].
 *
 * @param request The identifier of the [[Prepare]] message being acknowledged.
 * @param from The node responding.
 * @param progress The high commit mark and high promise mark of the responding node.
 * @param highestAcceptedIndex The highest uncommitted log index accepted by the responding node.
 * @param leaderHeartbeat The last leader heartbeat seen by the responding node.
 * @param highestUncommitted The highest uncommitted [[Accept]] at the prepared log index, or None if there is no
 *                           uncommitted value at that slot.
 */
case class PrepareAck(
  request: Identifier,
  from: Int,
  progress: Progress,
  highestAcceptedIndex: Long,
  leaderHeartbeat: Long,
  highestUncommitted: Option[Accept]
) extends PrepareResponse
/**
 * Negatively acknowledge a [[Prepare]] message. See [[PrepareResponse]].
 *
 * @param request The identifier of the [[Prepare]] message being rejected.
 * @param from The node responding.
 * @param progress The high commit mark and high promise mark of the responding node.
 * @param highestAcceptedIndex The highest slot at which the responding node has accepted a message.
 * @param leaderHeartbeat The last leader heartbeat seen by the responding node.
 */
case class PrepareNack(
  request: Identifier,
  from: Int,
  progress: Progress,
  highestAcceptedIndex: Long,
  leaderHeartbeat: Long
) extends PrepareResponse
/**
 * A member of the cluster, as carried by a [[MembershipValue]].
 */
case class ClusterMember(nodeUniqueId: Int, location: String, active: Boolean)

/**
 * A value has a binary representation used to journal it to disk.
 *
 *  - [[ClientValue]]s are from the host application.
 *  - [[MembershipValue]]s are used to change the cluster as per Dynamic Paxos.
 *  - [[NoOperationValue]]s are used during the leader takeover phase to fill lost slots.
 */
trait Value {
  val emptyArray: Array[Byte] = Array()
}

/**
 * A value supplied by the host application.
 *
 * Equality and hashing are defined on the CONTENT of `bytes`. The equals/hashCode that the compiler would generate
 * for a case class compare an `Array` field by reference, so two values holding identical bytes would otherwise be
 * unequal and hash differently.
 *
 * @param bytes The opaque payload.
 */
case class ClientValue(bytes: Array[Byte]) extends Value {
  override def equals(other: Any): Boolean = other match {
    case that: ClientValue => java.util.Arrays.equals(this.bytes, that.bytes)
    case _ => false
  }

  // Must agree with equals, hence it is content-based as well.
  override def hashCode: Int = java.util.Arrays.hashCode(bytes)
}

/**
 * A value describing the cluster membership.
 *
 * @param members The cluster members.
 */
case class MembershipValue(members: Seq[ClusterMember]) extends Value

/**
 * A value which carries no payload.
 */
case class NoOperationValue() extends Value {
  /**
   * @return The empty array.
   */
  def bytes: Array[Byte] = emptyArray
}
/**
 * Accept proposes a value into a log index position. Followers must:
 *
 * 1. Nack if a promise has been made to a [[Prepare]] request with a higher [[ProposalNumber]].
 * 1. Request retransmission of lost messages if the logIndex leads to a gap in the log index sequence.
 * 1. If responding with an ack, journal the accept message at the log index if its number is higher than that of
 *    the message currently stored at this index.
 *
 * @param id Unique identifier for this request.
 * @param value The value to accept at the slot position indicated by the id.
 */
case class Accept(id: Identifier, value: Value) {
  /**
   * @return The unique identifier of the sender within the cluster, taken from the proposal number of `id`.
   */
  def from: Int = id.number.nodeIdentifier
}
/**
* Base type for a response to an [[Accept]] message; extended by both [[AcceptAck]] and [[AcceptNack]].
*/
sealed trait AcceptResponse {
/**
* @return The request being acknowledged, either positively or negatively.
*/
def request: Identifier
/**
* @return The unique identifier of the respondent
*/
def from: Int
/**
* @return The high commit mark and high promise mark of the responding node.
*/
def progress: Progress
}
/**
 * Positive acknowledgement that the request has been made durable by the respondent.
 *
 * @param request The request being positively acknowledged.
 * @param from The unique identifier of the respondent.
 * @param progress The high commit mark and high promise mark of the responding node.
 */
case class AcceptAck(
  request: Identifier,
  from: Int,
  progress: Progress
) extends AcceptResponse
/**
 * Negative acknowledgement that the request has been rejected by the respondent. The progress in the reply gives an
 * indication of the reason: either the respondent has made a higher promise, or the respondent has already committed
 * the proposed slot.
 *
 * @param request The request being negatively acknowledged.
 * @param from The unique identifier of the respondent.
 * @param progress The high commit mark and last promise of the responding node.
 */
case class AcceptNack(
  request: Identifier,
  from: Int,
  progress: Progress
) extends AcceptResponse
/**
 * Commit messages indicate the highest committed log stream number. The leader shall periodically heartbeat this
 * message to indicate that it is alive. Followers must:
 *
 * 1. Commit the specified message in the log index if-and-only-if all previous values have been committed in order.
 * 1. Request retransmission of any messages not known to have been committed at a lower log index slot.
 *
 * Note that the leader must commit messages in log index order itself, which implies that any prior slots under the
 * same leader number have also been committed by the leader.
 *
 * @param identifier Identifies the unique accept message, and hence unique value, which is being committed into the
 *                   identified slot.
 * @param heartbeat A value which changes as a heartbeat, used by nodes cut off from the leader, but not from the
 *                  leader's majority, to learn that a stable leader is making progress.
 */
case class Commit(identifier: Identifier, heartbeat: Long) {

  /** Creates a commit whose heartbeat is the current time in milliseconds. */
  def this(identifier: Identifier) = this(identifier, System.currentTimeMillis())

  // Only the identifier is rendered; the heartbeat is left out of the text.
  override def toString = s"Commit($identifier)"

  /**
   * @return The unique identifier of the sender within the cluster, taken from the proposal number of `identifier`.
   */
  def from: Int = identifier.number.nodeIdentifier
}

object Commit {
  /** Creates a commit whose heartbeat is the current time in milliseconds. */
  def apply(identifier: Identifier): Commit = new Commit(identifier)
}
/**
 * Requests retransmission of accept messages higher than a given log index.
 *
 * @param from The node unique id which is sending the request.
 * @param to The node unique id to which the request is to be routed.
 * @param logIndex The log index last committed by the requester.
 */
case class RetransmitRequest(
  from: Int,
  to: Int,
  logIndex: Long
)
/**
 * Response to a [[RetransmitRequest]].
 *
 * @param from The node unique id which is sending the response.
 * @param to The node unique id to which the response is to be routed.
 * @param committed A contiguous sequence of committed accept messages in ascending order.
 * @param uncommitted A contiguous sequence of proposed but uncommitted accept messages in ascending order.
 */
// FIXME now has routing info doesn't need broadcasting
case class RetransmitResponse(
  from: Int,
  to: Int,
  committed: Seq[Accept],
  uncommitted: Seq[Accept]
)
/**
 * Response to a client when the node is not currently the leader. The client should retry the message against
 * another node in the cluster. Note that the leader may have crashed and the responding node may become the leader next.
 *
 * @param nodeId The node replying that it is not the leader.
 * @param msg The message which the node is responding to.
 */
// FIXME value could be many meg so should use GUID as correlation id
case class NotLeader(nodeId: Int, msg: Value)
/**
* Paxos node state machine constants. The trait is sealed, so the three case objects below are the only states.
*/
sealed trait NodeState
// NOTE(review): the role names presumably map onto the follower / leader take-over / stable leader phases described in the message docs of this file; confirm against the state machine that consumes them.
case object Follower extends NodeState
case object Recoverer extends NodeState
case object Leader extends NodeState
package com.github.simbo1905.trex.internals
import org.scalatest.{WordSpecLike, Matchers}
import java.util.Arrays.{equals => bequals}
/**
 * Round-trip tests for the [[Pickle]] codec.
 */
class CoreSpec extends WordSpecLike with Matchers {
  "Pickling" should {
    val bytes1 = Array[Byte](5, 6)
    val bytes2 = Array[Byte](7, 8)
    "roundrip RetransmitResponse" in {
      val a1 = Accept(Identifier(1, ProposalNumber(2, 3), 4L), ClientValue(bytes1))
      val a2 = Accept(Identifier(5, ProposalNumber(6, 7), 8L), ClientValue(bytes2))
      val r = RetransmitResponse(10, 11, Seq(a1), Seq(a2))
      val b = Pickle.pickle(r)
      // Every match ends in an explicit `fail` so that a bad round trip is reported as a test failure
      // describing the offending value rather than as a bare scala.MatchError.
      Pickle.unpickle(b) match {
        // Fresh names are used for the decoded messages so they do not shadow the a1/a2 fixtures.
        case RetransmitResponse(10, 11, Seq(committed), Seq(uncommitted)) =>
          committed match {
            // The payload is compared with Arrays.equals since arrays have no structural ==.
            case Accept(Identifier(1, ProposalNumber(2, 3), 4L), ClientValue(bout)) =>
              assert(bequals(bytes1, bout))
            case other =>
              fail(s"unexpected committed accept after round trip: $other")
          }
          uncommitted match {
            case Accept(Identifier(5, ProposalNumber(6, 7), 8L), ClientValue(bout)) =>
              assert(bequals(bytes2, bout))
            case other =>
              fail(s"unexpected uncommitted accept after round trip: $other")
          }
        case other =>
          fail(s"unexpected message after round trip: $other")
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment