Skip to content

Instantly share code, notes, and snippets.

@senorcarbone
Last active November 14, 2017 04:11
Show Gist options
  • Save senorcarbone/5c960ee27a67ec8b6bd42c33303fdcd2 to your computer and use it in GitHub Desktop.
Save senorcarbone/5c960ee27a67ec8b6bd42c33303fdcd2 to your computer and use it in GitHub Desktop.
ID2203.1x Exercise Lib
package se.kth.edx.id2203.core
import java.net.{ InetAddress, InetSocketAddress }
import se.kth.edx.id2203.core.ExercisePrimitives.PerfectP2PLink._
import se.kth.edx.id2203.core.Ports._
import se.sics.kompics.{ Init, KompicsEvent }
import se.sics.kompics.network.{ Address, Network, Transport }
import se.sics.kompics.sl.{ ComponentDefinition, _ }
object ExercisePrimitives {
object AddressUtils {
def toAddress(x: Int): Address = {
TAddress(new InetSocketAddress(InetAddress.getByName("192.168.2." + x.toString), 1000))
}
def toRank(addr: Address): Int = {
return addr.getIp().getAddress()(3).toInt;
}
}
object PerfectP2PLink {
case class PerfectLinkInit(selfAddr: Address) extends Init[PerfectP2PLink];
case class PerfectLinkMessage(src: Address, dest: Address, payload: KompicsEvent) extends TMessage(THeader(src, dest, Transport.TCP));
}
class PerfectP2PLink(pp2pInit: PerfectLinkInit) extends ComponentDefinition {
val pLink = provides[PerfectLink];
val network = requires[Network];
val self = pp2pInit.selfAddr;
pLink uponEvent {
case PL_Send(dest, payload) => handle {
trigger(PerfectLinkMessage(self, dest, payload) -> network);
}
}
network uponEvent {
case PerfectLinkMessage(src, dest, payload) => handle {
trigger(PL_Deliver(src, payload) -> pLink);
}
}
}
case class VectorClock(var vc: Map[Address, Int]) {
def inc(addr: Address) = {
vc = vc + ((addr, vc.get(addr).get + 1));
}
def set(addr: Address, value: Int) = {
vc = vc + ((addr, value));
}
def <=(that: VectorClock): Boolean = vc.foldLeft[Boolean](true)((leq, entry) => leq & (entry._2 <= that.vc.getOrElse(entry._1, entry._2)))
}
object VectorClock {
def empty(topology: scala.Seq[Address]): VectorClock = {
VectorClock(topology.foldLeft[Map[Address, Int]](Map[Address, Int]())((mp, addr) => mp + ((addr, 0))))
}
def apply(that: VectorClock): VectorClock = {
VectorClock(that.vc);
}
}
}
import se.sics.kompics.sl._;
import se.sics.kompics.{ Kompics, KompicsEvent, Start };
import spray.json._
object State {
type Initializer = ((Int, Int) => State);
}
trait State
case object Alive extends State {
override def toString(): String = "x";
}
case object Dead extends State {
override def toString(): String = " ";
}
case object Unknown extends State {
override def toString(): String = "?";
}
case class BroadcastState(val generation: Long, val x: Int, val y: Int, val state: State) extends KompicsEvent
case class Progress(val generation: Long) extends KompicsEvent
object EnvironmentPort extends Port {
indication[BroadcastState];
indication[Progress];
}
case class GameOfLifeInit(
initializer: State.Initializer,
cellType: Class[Cell], cellInit: Cell.Initializer,
numGenerations: Int = 100,
size: Int = 20) extends se.sics.kompics.Init[ParentC]
object Cell {
type Initializer = Tuple3[Int, Int, State.Initializer] => se.sics.kompics.Init[Cell]
}
abstract class Cell extends ComponentDefinition {
}
class ParentC(init: GameOfLifeInit) extends ComponentDefinition {
import StateJsonProtocol._
val envOut = provides(EnvironmentPort);
val envIn = requires(EnvironmentPort);
val size = init.size;
val grid = Array.tabulate(size, size)((i, j) => {
create(init.cellType, init.cellInit((i, j, init.initializer)));
});
val wrap = stateWrap(size);
for (i <- 0 until size) {
for (j <- 0 until size) {
val centre = grid(i)(j);
connect(EnvironmentPort)(this.getComponentCore -> centre);
connect(EnvironmentPort)(centre -> this.getComponentCore);
connect(EnvironmentPort)(grid(wrap(i - 1))(wrap(j - 1)) -> centre);
connect(EnvironmentPort)(grid(wrap(i - 1))(j) -> centre);
connect(EnvironmentPort)(grid(wrap(i - 1))(wrap(j + 1)) -> centre);
connect(EnvironmentPort)(grid(i)(wrap(j - 1)) -> centre);
connect(EnvironmentPort)(grid(i)(wrap(j + 1)) -> centre);
connect(EnvironmentPort)(grid(wrap(i + 1))(wrap(j - 1)) -> centre);
connect(EnvironmentPort)(grid(wrap(i + 1))(j) -> centre);
connect(EnvironmentPort)(grid(wrap(i + 1))(wrap(j + 1)) -> centre);
}
}
val stateGrid = Array.fill[State](size, size) { Unknown }
val stateHistory = collection.mutable.ArrayBuffer.empty[Array[Array[State]]];
val gridSize = size * size;
var broadcastCount = 0l;
private var generation = -1l
ctrl uponEvent {
case _: Start => handle {
println("Starting generation 0...");
generation = 0;
trigger(Progress(generation) -> envOut);
}
}
envIn uponEvent {
case BroadcastState(gen, x, y, state) => handle {
if (x >= 0 && y >= 0 && x < size && y < size) {
stateGrid(x)(y) = state;
broadcastCount += 1;
}
if (broadcastCount >= gridSize) {
val sgc = stateGrid.map(_.clone);
stateHistory += sgc;
broadcastCount = 0l;
if (generation < init.numGenerations) {
generation += 1;
trigger(Progress(generation) -> envOut);
} else {
val hist = stateHistory.toList;
val js = hist.toJson.compactPrint;
HTMLRenderer.render(js);
Kompics.asyncShutdown();
}
}
}
}
}
package se.kth.edx.id2203.core
import java.net.InetAddress;
import java.net.InetSocketAddress;
import se.sics.kompics.network.{ Address, Header, Msg, Transport };
import se.sics.kompics.KompicsEvent;
final case class TAddress(isa: InetSocketAddress) extends Address {
override def asSocket(): InetSocketAddress = isa;
override def getIp(): InetAddress = isa.getAddress;
override def getPort(): Int = isa.getPort;
override def sameHostAs(other: Address): Boolean = {
this.isa.equals(other.asSocket());
}
}
final case class THeader(src: Address, dst: Address, proto: Transport) extends Header[Address] {
override def getDestination(): Address = dst;
override def getProtocol(): Transport = proto;
override def getSource(): Address = src;
}
class TMessage(header: THeader) extends Msg[Address, THeader] {
override def getDestination(): Address = header.dst;
override def getHeader(): THeader = header;
override def getProtocol(): Transport = header.proto;
override def getSource(): Address = header.src;
}
package se.kth.edx.id2203.core
import se.sics.kompics.KompicsEvent
import se.sics.kompics.network.Address
import se.sics.kompics.sl._
object Ports {
case class PL_Deliver(src: Address, payload: KompicsEvent) extends KompicsEvent;
case class PL_Send(dest: Address, payload: KompicsEvent) extends KompicsEvent;
class PerfectLink extends Port {
indication[PL_Deliver];
request[PL_Send];
}
case class Suspect(src: Address) extends KompicsEvent;
case class Restore(src: Address) extends KompicsEvent;
class EventuallyPerfectFailureDetector extends Port {
indication[Suspect];
indication[Restore];
}
case class BEB_Deliver(src: Address, payload: KompicsEvent) extends KompicsEvent;
case class BEB_Broadcast(payload: KompicsEvent) extends KompicsEvent;
class BestEffortBroadcast extends Port {
indication[BEB_Deliver];
request[BEB_Broadcast];
}
case class RB_Deliver(src: Address, payload: KompicsEvent) extends KompicsEvent;
case class RB_Broadcast(payload: KompicsEvent) extends KompicsEvent;
class ReliableBroadcast extends Port {
indication[RB_Deliver];
request[RB_Broadcast];
}
case class CRB_Deliver(src: Address, payload: KompicsEvent) extends KompicsEvent;
case class CRB_Broadcast(payload: KompicsEvent) extends KompicsEvent;
class CausalOrderReliableBroadcast extends Port {
indication[CRB_Deliver];
request[CRB_Broadcast];
}
case class AR_Read_Request() extends KompicsEvent
case class AR_Read_Response(value: Option[Any]) extends KompicsEvent
case class AR_Write_Request(value: Any) extends KompicsEvent
case class AR_Write_Response() extends KompicsEvent
class AtomicRegister extends Port {
request[AR_Read_Request]
request[AR_Write_Request]
indication[AR_Read_Response]
indication[AR_Write_Response]
}
case class C_Decide(value: Any) extends KompicsEvent;
case class C_Propose(value: Any) extends KompicsEvent;
class Consensus extends Port{
request[C_Propose];
indication[C_Decide];
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment