-
-
Save evacchi/b601498c3d8d483e3350f00b4c8aabf5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Inspired by Viktor Klang's minscalaactors.scala | |
* https://gist.github.com/viktorklang/2362563 | |
* Copyright 2014 Viktor Klang | |
* | |
* Copyright 2021 Edoardo Vacchi | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
* | |
*/ | |
//JAVA 17 | |
//JAVAC_OPTIONS --enable-preview --release 17 | |
//JAVA_OPTIONS --enable-preview | |
package io.github.evacchi; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Function; | |
import static java.lang.System.out; | |
public interface Actor { | |
interface Effect<T> extends Function<Behavior<T>, Behavior<T>> {} | |
interface Behavior<T> extends Function<T, Effect<T>> {} | |
interface Address<T> { void tell(T msg); } | |
static <T> Effect<T> Become(Behavior<T> like) { return old -> like; } | |
static <T> Effect<T> Stay() { return old -> old; } | |
static <T> Effect<T> Die() { return Become(msg -> { out.println("Dropping msg [" + msg + "] due to severe case of death."); return Stay(); }); } | |
record System(Executor executor) { | |
public <T> Address<T> actorOf(Function<Address<T>, Behavior<T>> initial) { | |
abstract class AtomicRunnableAddress<T> implements Address<T>, Runnable | |
{ AtomicInteger on = new AtomicInteger(0); } | |
var addr = new AtomicRunnableAddress<T>() { | |
// Our awesome little mailbox, free of blocking and evil | |
final ConcurrentLinkedQueue<T> mbox = new ConcurrentLinkedQueue<>(); | |
Behavior<T> behavior = // Rebindable top of the mailbox, bootstrapped to identity | |
m -> (m instanceof Address self) ? Become(initial.apply((Address<T>) self)) : Stay(); | |
public void tell(T msg) { mbox.offer(msg); async(); } // Enqueue the message onto the mailbox and try to schedule for execution | |
// Switch ourselves off, and then see if we should be rescheduled for execution | |
public void run() { | |
try { if (on.get() == 1) behavior = behavior.apply((T) mbox.poll()).apply(behavior); } finally { on.set(0); async(); } | |
} | |
// If there's something to process, and we're not already scheduled | |
void async() { | |
if (!mbox.isEmpty() && on.compareAndSet(0, 1)) { | |
// Schedule to run on the Executor and back out on failure | |
try { executor.execute(this); } catch (Throwable t) { on.set(0); throw t; } | |
} | |
} | |
void init() { executor.execute(() -> { behavior = initial.apply(this); async();}); } | |
}; | |
addr.init(); // Make the actor self aware by seeding its address to the initial behavior | |
return addr; | |
} | |
} | |
static void main(String... args) { | |
String choice = args.length >= 1? args[0] : "1"; | |
switch (Integer.parseInt(choice)) { | |
case 1, default -> new Demo1().run(); | |
case 2 -> new Demo2().closure(); | |
case 3 -> new Demo2().stateful(); | |
case 4 -> new DemoVending().run(); | |
} | |
} | |
class Demo1 { | |
void run() { | |
var actorSystem = new Actor.System(Executors.newCachedThreadPool()); | |
var actor = actorSystem.actorOf(self -> msg -> { | |
out.println("self: " + self + " got msg " + msg); | |
return Actor.Die(); | |
}); | |
actor.tell("foo"); | |
actor.tell("foo"); | |
} | |
} | |
/**
 * Second demo: a pinger and a ponger exchanging messages. The ponger answers
 * ten pings with a {@code Pong} and the next one with a {@code DeadlyPong},
 * after which both actors die. The ponger exists in two flavours: one that
 * carries its counter in closures and one that keeps it in a field.
 */
class Demo2 {
    /** Replies the pinger can receive; each carries the address to answer to. */
    sealed interface TPong {
        Address<Ping> sender();
    }

    record Ping(Address<TPong> sender) {
    }

    record Pong(Address<Ping> sender) implements TPong {
    }

    record DeadlyPong(Address<Ping> sender) implements TPong {
    }

    /** Runs the exchange with the closure-based ponger. */
    void closure() {
        var system = new Actor.System(Executors.newCachedThreadPool());
        var ponger = system.actorOf((Address<Ping> self) -> (Ping msg) -> pongerBehavior(self, msg, 0));
        var pinger = spawnPinger(system);
        ponger.tell(new Ping(pinger));
    }

    /** Runs the exchange with the field-based {@link StatefulPonger}. */
    void stateful() {
        var system = new Actor.System(Executors.newCachedThreadPool());
        var ponger = system.actorOf(StatefulPonger::new);
        var pinger = spawnPinger(system);
        ponger.tell(new Ping(pinger));
    }

    // Creates the pinger actor used by both variants of the demo.
    private Address<TPong> spawnPinger(Actor.System system) {
        return system.actorOf((Address<TPong> self) -> (TPong msg) -> pingerBehavior(self, msg));
    }

    /**
     * Ponger step: while {@code counter} is below 10, answers with a
     * {@code Pong} and continues with the counter incremented; otherwise
     * answers with a {@code DeadlyPong} and dies.
     */
    Effect<Ping> pongerBehavior(Address<Ping> self, Ping msg, int counter) {
        return switch (msg) {
            case Ping ping && counter < 10 -> {
                out.println("ping! ➡️");
                ping.sender().tell(new Pong(self));
                final int next = counter + 1;
                yield Become(m -> pongerBehavior(self, m, next));
            }
            case Ping ping -> {
                out.println("ping! ☠️");
                ping.sender().tell(new DeadlyPong(self));
                yield Die();
            }
        };
    }

    /**
     * Pinger step: answers every reply with a new {@code Ping}; stays on a
     * {@code Pong} and dies on a {@code DeadlyPong}.
     */
    Effect<TPong> pingerBehavior(Address<TPong> self, TPong msg) {
        return switch (msg) {
            case Pong pong -> {
                out.println("pong! ⬅️");
                pong.sender().tell(new Ping(self));
                yield Stay();
            }
            case DeadlyPong lastPong -> {
                out.println("pong! 😵");
                lastPong.sender().tell(new Ping(self));
                yield Die();
            }
        };
    }

    /** Ponger that keeps its ping counter in a mutable field instead of a closure. */
    class StatefulPonger implements Behavior<Ping> {
        Address<Ping> self;
        int counter = 0;

        StatefulPonger(Address<Ping> self) {
            this.self = self;
        }

        public Effect<Ping> apply(Ping msg) {
            return switch (msg) {
                case Ping ping && counter < 10 -> {
                    out.println("ping! ➡️");
                    ping.sender().tell(new Pong(self));
                    counter += 1;
                    yield Stay();
                }
                case Ping ping -> {
                    out.println("ping! ☠️");
                    ping.sender().tell(new DeadlyPong(self));
                    yield Die();
                }
            };
        }
    }
}
/**
 * Vending-machine demo: a three-state actor that collects coins until the
 * price is reached, then waits for a product choice, vends it, returns the
 * change and goes back to the initial state.
 */
class DemoVending {
    // Amount that must be collected before a product can be chosen.
    private static final int PRICE = 100;

    /** Messages understood by the vending machine. */
    sealed interface Vend {}

    /**
     * A coin inserted into the machine.
     *
     * @param amount value of the coin, between 1 and 100 inclusive
     */
    record Coin(int amount) implements Vend {
        /**
         * @throws IllegalArgumentException if {@code amount} is outside 1..100
         */
        public Coin {
            // Either bound being violated makes the coin invalid (the two
            // conditions can never hold at once, so they must be OR-ed).
            if (amount < 1 || amount > 100) {
                throw new IllegalArgumentException("1 <= amount <= 100, got " + amount);
            }
        }
    }

    /** The product the customer asks for. */
    record Choice(String product) implements Vend {}

    /** Creates the vending-machine actor and feeds it three coins and a choice. */
    void run() {
        var actorSystem = new Actor.System(Executors.newCachedThreadPool());
        // The state methods keep no instance state, so this instance serves every message.
        var vendingMachine = actorSystem.actorOf((Address<Vend> self) -> (Vend msg) -> initial(msg));
        vendingMachine.tell(new Coin(50));
        vendingMachine.tell(new Coin(40));
        vendingMachine.tell(new Coin(30));
        vendingMachine.tell(new Choice("Chocolate"));
    }

    /**
     * Initial state: waits for the first coin. If that coin alone covers the
     * price the machine goes straight to {@link #vend}; otherwise it moves to
     * {@link #waitCoin}. Any other message is ignored.
     */
    Effect<Vend> initial(Vend message) {
        return switch (message) {
            case Coin c && c.amount() >= PRICE -> {
                out.println("Received first coin: " + c.amount());
                var change = c.amount() - PRICE;
                yield Become(m -> vend(m, change));
            }
            case Coin c -> {
                out.println("Received first coin: " + c.amount());
                yield Become(m -> waitCoin(m, c.amount()));
            }
            default -> Stay(); // ignore message, stay in this state
        };
    }

    /**
     * Collecting state: adds each coin to {@code counter}; once the total
     * reaches the price, moves to {@link #vend} with the surplus as change.
     * Any other message is ignored.
     *
     * @param counter amount collected so far
     */
    Effect<Vend> waitCoin(Object message, int counter) {
        return switch (message) {
            case Coin c && counter + c.amount() < PRICE -> {
                var count = counter + c.amount();
                out.println("Received coin: " + count + " of " + PRICE);
                yield Become(m -> waitCoin(m, count));
            }
            case Coin c -> {
                var count = counter + c.amount();
                out.println("Received last coin: " + count + " of " + PRICE);
                var change = count - PRICE;
                yield Become(m -> vend(m, change));
            }
            default -> Stay(); // ignore message, stay in this state
        };
    }

    /**
     * Paid state: on a {@link Choice}, vends the product, releases the change
     * and returns to {@link #initial}. Any other message is ignored.
     *
     * @param change amount to give back after vending
     */
    Effect<Vend> vend(Object message, int change) {
        return switch (message) {
            case Choice c -> {
                vendProduct(c.product());
                releaseChange(change);
                yield Become(this::initial);
            }
            default -> Stay(); // ignore message, stay in this state
        };
    }

    void vendProduct(String product) {
        out.println("VENDING: " + product);
    }

    void releaseChange(int change) {
        out.println("CHANGE: " + change);
    }
}
} | |
In the body of the blog draft I have revised it with the null check :-) -- thanks for the suggestion about getAndSet()!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You'll need to check the result of the poll for null as here: https://gist.github.com/viktorklang/2557678#file-actor-java-L34
Also, if I were to write this today, I'd go with
getAndSet(1) == 0
instead of compareAndSet(0, 1)