Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Minimalist Java Actors
/*
Copyright 2012-2021 Viktor Klang
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package java.klang;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Actor { // Visibility is achieved by volatile-piggybacking of reads+writes to "on"
public static interface Fun<T, R> { public R apply(T t); } // Simple Function interface for Java
public static interface Effect extends Fun<Behavior, Behavior> { }; // An Effect returns a Behavior given a Behavior
public static interface Behavior extends Fun<Object, Effect> { }; // A Behavior is a message (Object) which returns the behavior for the next message
public static interface Address { Address tell(Object msg); }; // An Address is somewhere you can send messages
static abstract class AtomicRunnableAddress implements Runnable, Address { protected final AtomicInteger on = new AtomicInteger(); }; // Defining a composite of AtomcInteger, Runnable and Address
public final static Effect Become(final Behavior behavior) { return new Effect() { public Behavior apply(Behavior old) { return behavior; } }; } // Become is an Effect that returns a captured Behavior no matter what the old Behavior is
public final static Effect Stay = new Effect() { public Behavior apply(Behavior old) { return old; } }; // Stay is an Effect that returns the old Behavior when applied.
public final static Effect Die = Become(new Behavior() { public Effect apply(Object msg) { return Stay; } }); // Die is an Effect which replaces the old Behavior with a new one which does nothing, forever.
public static Address create(final Fun<Address, Behavior> initial, final Executor e) {
final Address a = new AtomicRunnableAddress() {
private final ConcurrentLinkedQueue<Object> mb = new ConcurrentLinkedQueue<Object>();
private Behavior behavior = new Behavior() { public Effect apply(Object msg) { return (msg instanceof Address) ? Become(initial.apply((Address)msg)) : Stay; } };
public final Address tell(Object msg) { if (mb.offer(msg)) async(); return this; }
public final void run() { if(on.get() == 1) { try { final Object m = mb.poll(); if(m != null) behavior = behavior.apply(m).apply(behavior); } finally { on.set(0); async(); } } }
private final void async() { if(!mb.isEmpty() && on.getAndSet(1) == 0) try { e.execute(this); } catch(RuntimeException re) { on.set(0); throw re; } }
};
return a.tell(a); // Make self-aware
}
}
@chirino

This comment has been minimized.

Copy link

@chirino chirino commented May 1, 2012

Don't you need to drain all polls /w a loop before you on.set(false) ?

@viktorklang

This comment has been minimized.

Copy link
Owner Author

@viktorklang viktorklang commented May 1, 2012

That's a performance optimization that you could apply, but unfortunately that'd make the model very unfair since one actor would hog a thread for as long as it has messages, effectively starving other actors.
trySchedule will reschedule the Actor to be executed if there's any more messages.

@viktorklang

This comment has been minimized.

Copy link
Owner Author

@viktorklang viktorklang commented May 1, 2012

The provided Executor could apply the following optimization:

def execute(r: Runnable): Unit = if (isCalledFromWithinMyOwnWorkerThread()) throw Trampoline(r) else super.execute(r) // Needs to be trampolined to avoid stack explosion

And then the worker threads execute Trampolines. (Of course your Trampoline would never fill in it's stack trace for performance)

@alextkachman

This comment has been minimized.

Copy link

@alextkachman alextkachman commented May 1, 2012

I would suggest to use AtomicBooleanFieldUpdater

@viktorklang

This comment has been minimized.

Copy link
Owner Author

@viktorklang viktorklang commented May 1, 2012

Doesn't exist: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/package-summary.html
So I assume you mean AtomicIntegerFieldUpdater, but if you want performance you also want cache line padding in that case.

@alextkachman

This comment has been minimized.

Copy link

@alextkachman alextkachman commented May 1, 2012

@viktorklang

This comment has been minimized.

Copy link
Owner Author

@viktorklang viktorklang commented May 1, 2012

The saving would be per actor instance and not per message, which is currently where the highest overhead lies.
But yes, switching to a AIFU would lower memory usage per actor.

@viktorklang

This comment has been minimized.

Copy link
Owner Author

@viktorklang viktorklang commented May 1, 2012

Alright, so this is how it looks with with an AIFU, unfortunately it adds more classfile size.

@mictadlo

This comment has been minimized.

Copy link

@mictadlo mictadlo commented Jul 16, 2012

Could you please provide some examples how to use it? Did it someone run successfully on Android?

@viktorklang

This comment has been minimized.

Copy link
Owner Author

@viktorklang viktorklang commented Jul 16, 2012

import static java.klang.Actor.*;
final Address myActor = create( new Fun<Address, Behavior>() {
  public Behavior apply(final Address address) {
    return new Behavior() {
        public Effect apply(final Object message) {
           if (message == "someMessage") {
               doFoo();
           }
           return Stay;
        }
    };
  }
}, yourExecutor);

myActor.tell("someMessage");

Something like that.

@viktorklang

This comment has been minimized.

Copy link
Owner Author

@viktorklang viktorklang commented Jul 16, 2012

Also, why wouldn't it work on Android?

@emolitor

This comment has been minimized.

Copy link

@emolitor emolitor commented Dec 30, 2012

Works fine on Android but how about a license?

@viktorklang

This comment has been minimized.

Copy link
Owner Author

@viktorklang viktorklang commented Jan 2, 2013

License added

@plokhotnyuk

This comment has been minimized.

Copy link

@plokhotnyuk plokhotnyuk commented Dec 28, 2014

On Scala version of this actor, I got NPE at checkNotNull call of mbox queue during ping-pong test: https://travis-ci.org/plokhotnyuk/actors/jobs/45208220#L2164
I think source of problem is that processing of last message can occurs between isEmpty and compareAndSet call.
Please see my solution that proposed here: plokhotnyuk/actors@ca46ad6

@plokhotnyuk

This comment has been minimized.

Copy link

@plokhotnyuk plokhotnyuk commented Dec 31, 2014

I have improved the Minimalist Scala Actor for better performance and smaller footprint in memory: https://github.com/plokhotnyuk/actors/blob/42f39f15edc84ce5b6960e6bf7f1b0e75563e5fc/src/test/scala/com/github/gist/viktorklang/Actor.scala
@viktorklang , please review it if it's worth a look.

@butjeffsays

This comment has been minimized.

Copy link

@butjeffsays butjeffsays commented May 1, 2016

@viktorklang I'm late to the party, but I love using this class as an alternative when I don't want the heavyweight akka/scala frameworks as dependencies.
@plokhotnyuk I encountered the same NPE, simply checking the mailbox's return from poll() as you did alleviated the issue. Took an embarrassing amount of time to discover and now i'm even more embarrassed to find that your solution was posted years ago...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment