@patriknw
Created August 18, 2017 14:34
Akka Documentation - Getting Started Guide
------------------------------------------
Version 2.5.4
Also in HTML format:
http://doc.akka.io/docs/akka/current/scala/guide/index.html
http://doc.akka.io/docs/akka/current/java/guide/index.html
© 2011-2017 Lightbend
Akka is Open Source and available under the Apache 2 License.
INTRODUCTION TO AKKA
Welcome to Akka, a set of open-source libraries for designing scalable,
resilient systems that span processor cores and networks. Akka allows
you to focus on meeting business needs instead of writing low-level code
to provide reliable behavior, fault tolerance, and high performance.
Many common practices and accepted programming models do not address
important challenges inherent in designing systems for modern computer
architectures. To be successful, distributed systems must cope in an
environment where components crash without responding, messages get lost
without a trace on the wire, and network latency fluctuates. These
problems occur regularly in carefully managed intra-datacenter
environments - even more so in virtualized architectures.
To help you deal with these realities, Akka provides:
- Multi-threaded behavior without the use of low-level concurrency
constructs like atomics or locks — relieving you from even thinking
about memory visibility issues.
- Transparent remote communication between systems and their
components — relieving you from writing and maintaining difficult
networking code.
- A clustered, high-availability architecture that is elastic, scales
in or out, on demand — enabling you to deliver a truly reactive
system.
Akka’s use of the actor model provides a level of abstraction that makes
it easier to write correct concurrent, parallel and distributed systems.
The actor model spans the full set of Akka libraries, providing you with
a consistent way of understanding and using them. Thus, Akka offers a
depth of integration that you cannot achieve by picking libraries to
solve individual problems and trying to piece them together.
By learning Akka and how to use the actor model, you will gain access to
a vast and deep set of tools that solve difficult distributed/parallel
systems problems in a uniform programming model where everything fits
together tightly and efficiently.
How to get started
If this is your first experience with Akka, we recommend that you start
by running a simple Hello World project. See the Quickstart Guide for
instructions on downloading and running the Hello World example. The
_Quickstart_ guide walks you through example code
that introduces how to define actor systems, actors, and messages as
well as how to use the test module and logging. Within 30 minutes, you
should be able to run the Hello World example and learn how it is
constructed.
This _Getting Started_ guide provides the next level of information. It
covers why the actor model fits the needs of modern distributed systems
and includes a tutorial that will help further your knowledge of Akka.
Topics include:
- Why modern systems need a new programming model
- How the actor model meets the needs of concurrent, distributed
systems
- Overview of Akka libraries and modules
- A more complex example that builds on the Hello World example to
illustrate common Akka patterns.
WHY MODERN SYSTEMS NEED A NEW PROGRAMMING MODEL
The actor model was proposed decades ago by Carl Hewitt as a way to
handle parallel processing in a high performance network — an
environment that was not available at the time. Today, hardware and
infrastructure capabilities have caught up with and exceeded Hewitt’s
vision. Consequently, organizations building distributed systems with
demanding requirements encounter challenges that cannot fully be solved
with a traditional object-oriented programming (OOP) model, but that can
benefit from the actor model.
Today, the actor model is not only recognized as a highly effective
solution — it has been proven in production for some of the world’s most
demanding applications. To highlight issues that the actor model
addresses, this topic discusses the following mismatches between
traditional programming assumptions and the reality of modern
multi-threaded, multi-CPU architectures:
- The challenge of encapsulation
- The illusion of shared memory on modern computer architectures
- The illusion of a call stack
The challenge of encapsulation
A core pillar of OOP is _encapsulation_. Encapsulation dictates that the
internal data of an object is not accessible directly from the outside;
it can only be modified by invoking a set of curated methods. The object
is responsible for exposing safe operations that protect the invariant
nature of its encapsulated data.
For example, operations on an ordered binary tree implementation must
not allow violation of the tree ordering invariant. Callers expect the
ordering to be intact and when querying the tree for a certain piece of
data, they need to be able to rely on this constraint.
When we analyze OOP runtime behavior, we sometimes draw a message
sequence chart showing the interactions of method calls. For example:
[sequence chart]
Unfortunately, the above diagram does not accurately represent the
_lifelines_ of the instances during execution. In reality, a _thread_
executes all these calls, and the enforcement of invariants occurs on
the same thread from which the method was called. Updating the diagram
with the thread of execution, it looks like this:
[sequence chart with thread]
The significance of this clarification becomes clear when you try to
model what happens with _multiple threads_. Suddenly, our neatly drawn
diagram becomes inadequate. We can try to illustrate multiple threads
accessing the same instance:
[sequence chart with threads interacting]
There is a section of execution where two threads enter the same method.
Unfortunately, the encapsulation model of objects does not guarantee
anything about what happens in that section. Instructions of the two
invocations can be interleaved in arbitrary ways, eliminating any hope
of keeping the invariants intact without some type of coordination
between the two threads. Now, imagine this issue compounded by the
existence of many threads.
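A minimal sketch of this hazard in plain Java (not Akka code): two
threads entering the same unsynchronized method lose updates, because
the read-modify-write sequence of one thread interleaves with the
other's.

```java
// Two threads call the same unsynchronized method; updates are lost
// because reading, incrementing, and writing back is not atomic.
class UnsafeCounter {
    private int value = 0;

    // Looks perfectly safe in a single-threaded world.
    void increment() { value = value + 1; }

    int get() { return value; }

    static int demo() {
        UnsafeCounter c = new UnsafeCounter();
        Runnable task = () -> { for (int i = 0; i < 100_000; i++) c.increment(); };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        try { t1.join(); t2.join(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        // We might expect 200_000, but the observed value is typically lower.
        return c.get();
    }
}
```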
The common approach to solving this problem is to add a lock around
these methods. While this ensures that at most one thread will enter the
method at any given time, this is a very costly strategy:
- Locks _seriously limit_ concurrency. They are very costly on modern
CPU architectures, requiring heavy lifting from the operating system
to suspend the thread and restore it later.
- The caller thread is now blocked, so it cannot do any other
meaningful work. Even in desktop applications this is unacceptable:
we want to keep the user-facing parts of applications (their UI)
responsive even when a long background job is running. In the
backend, blocking is outright wasteful. One might think that this
can be compensated for by launching new threads, but threads are
also a costly abstraction.
- Locks introduce a new menace: deadlocks.
These realities result in a no-win situation:
- Without sufficient locks, the state gets corrupted.
- With many locks in place, performance suffers and very easily leads
to deadlocks.
Additionally, locks only really work well locally. When it comes to
coordinating across multiple machines, the only alternative is
distributed locks. Unfortunately, distributed locks are several
magnitudes less efficient than local locks and usually impose a hard
limit on scaling out. Distributed lock protocols require several
communication round-trips over the network across multiple machines, so
latency goes through the roof.
In object-oriented languages we rarely think about threads or linear
execution paths in general. We often envision a system as a network of
object instances that react to method calls, modify their internal
state, then communicate with each other via method calls driving the
whole application state forward:
[network of interacting objects]
However, in a multi-threaded distributed environment, what actually
happens is that threads “traverse” this network of object instances by
following method calls. As a result, threads are what really drive
execution:
[network of interactive objects traversed by threads]
IN SUMMARY:
- OBJECTS CAN ONLY GUARANTEE ENCAPSULATION (PROTECTION OF INVARIANTS)
IN THE FACE OF SINGLE-THREADED ACCESS; MULTI-THREADED EXECUTION ALMOST
ALWAYS LEADS TO CORRUPTED INTERNAL STATE. EVERY INVARIANT CAN BE
VIOLATED BY HAVING TWO CONTENDING THREADS IN THE SAME CODE SEGMENT.
- WHILE LOCKS SEEM TO BE THE NATURAL REMEDY TO UPHOLD ENCAPSULATION
WITH MULTIPLE THREADS, IN PRACTICE THEY ARE INEFFICIENT AND EASILY
LEAD TO DEADLOCKS IN ANY APPLICATION OF REAL-WORLD SCALE.
- LOCKS WORK LOCALLY; ATTEMPTS TO MAKE THEM DISTRIBUTED EXIST, BUT
OFFER LIMITED POTENTIAL FOR SCALING OUT.
The illusion of shared memory on modern computer architectures
Programming models of the ’80s and ’90s conceptualize writing to a
variable as writing directly to a memory location (which somewhat
muddies the water, since local variables might exist only in registers).
On modern architectures - if we simplify things a bit - CPUs write to
cache lines instead of writing to memory directly. Most of these
caches are local to the CPU core; that is, writes by one core are not
visible to another. In order to make local changes visible to another
core, and hence to another thread, the cache line needs to be shipped to
the other core’s cache.
On the JVM, we have to explicitly denote memory locations to be shared
across threads by using _volatile_ markers or Atomic wrappers.
Otherwise, we can access them only in a locked section. Why don’t we
just mark all variables as volatile? Because shipping cache lines across
cores is a very costly operation! Doing so would implicitly stall the
cores involved from doing additional work, and result in bottlenecks on
the cache coherence protocol (the protocol CPUs use to transfer cache
lines between main memory and other CPUs). The result is magnitudes of
slowdown.
Even for developers aware of this situation, figuring out which memory
locations should be marked as volatile, or which atomic structures to
use is a dark art.
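This trade-off is visible directly on the JVM: a plain field gives no
cross-thread visibility guarantee, while volatile and the Atomic
wrappers buy correctness at the price of cache-line traffic. A minimal
sketch:

```java
import java.util.concurrent.atomic.AtomicLong;

class VisibilityDemo {
    // 'volatile' forces writes to be published to other cores' view of
    // memory; without it, another thread may never see the update.
    static volatile boolean running = true;

    // AtomicLong makes the read-modify-write a single atomic operation,
    // so two contending threads cannot lose updates.
    static long atomicCount() {
        AtomicLong counter = new AtomicLong();
        Runnable task = () -> { for (int i = 0; i < 100_000; i++) counter.incrementAndGet(); };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        try { t1.join(); t2.join(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return counter.get(); // exactly 200_000, paid for in cache-line transfers
    }
}
```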
IN SUMMARY:
- THERE IS NO REAL SHARED MEMORY ANYMORE; CPU CORES PASS CHUNKS OF
DATA (CACHE LINES) EXPLICITLY TO EACH OTHER, JUST AS COMPUTERS ON A
NETWORK DO. INTER-CPU COMMUNICATION AND NETWORK COMMUNICATION HAVE
MORE IN COMMON THAN MANY REALIZE. PASSING MESSAGES IS THE NORM NOW,
BE IT ACROSS CPUS OR NETWORKED COMPUTERS.
- INSTEAD OF HIDING THE MESSAGE PASSING ASPECT THROUGH VARIABLES
MARKED AS SHARED OR USING ATOMIC DATA STRUCTURES, A MORE DISCIPLINED
AND PRINCIPLED APPROACH IS TO KEEP STATE LOCAL TO A CONCURRENT
ENTITY AND PROPAGATE DATA OR EVENTS BETWEEN CONCURRENT ENTITIES
EXPLICITLY VIA MESSAGES.
The illusion of a call stack
Today, we often take call stacks for granted. But, they were invented in
an era where concurrent programming was not as important because
multi-CPU systems were not common. Call stacks do not cross threads and
hence, do not model asynchronous call chains.
The problem arises when a thread intends to delegate a task to the
“background”. In practice, this really means delegating to another
thread. This cannot be a simple method/function call because calls are
strictly local to the thread. What usually happens, is that the “caller”
puts an object into a memory location shared by a worker thread
(“callee”), which in turn, picks it up in some event loop. This allows
the “caller” thread to move on and do other tasks.
The first issue is, how can the “caller” be notified of the completion
of the task? But a more serious issue arises when a task fails with an
exception. Where does the exception propagate to? It will propagate to
the exception handler of the worker thread completely ignoring who the
actual “caller” was:
[exceptions cannot propagate between different threads]
This is a serious problem. How does the worker thread deal with the
situation? It likely cannot fix the issue as it is usually oblivious of
the purpose of the failed task. The “caller” thread needs to be notified
somehow, but there is no call stack to unwind with an exception. Failure
notification can only be done via a side-channel, for example putting an
error code where the “caller” thread otherwise expects the result once
ready. If this notification is not in place, the “caller” never gets
notified of a failure and the task is lost! THIS IS SURPRISINGLY SIMILAR
TO HOW NETWORKED SYSTEMS WORK WHERE MESSAGES/REQUESTS CAN GET LOST/FAIL
WITHOUT ANY NOTIFICATION.
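A minimal plain-Java sketch of this side-channel: the worker's
exception never unwinds the caller's stack; it only surfaces if the
caller explicitly inspects the result handle.

```java
import java.util.concurrent.*;

class LostFailureDemo {
    // The "caller" hands a task to a worker thread. The worker's failure
    // does not propagate to the caller's call stack; it travels through
    // the Future side-channel, and only if the caller asks for it.
    static String demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<String> result = worker.submit((Callable<String>) () -> {
            throw new IllegalStateException("task failed");
        });
        try {
            return result.get(); // failure arrives here, not via the stack
        } catch (ExecutionException e) {
            return "caught: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            return "interrupted";
        } finally {
            worker.shutdown();
        }
    }
}
```

Had the caller never called `get()`, the failure would have been lost
silently, which is exactly the situation described above.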
This bad situation gets worse when things go really wrong and a worker
backed by a thread encounters a bug and ends up in an unrecoverable
situation. For example, an internal exception caused by a bug bubbles up
to the root of the thread and makes the thread shut down. This
immediately raises the question, who should restart the normal operation
of the service hosted by the thread, and how should it be restored to a
known-good state? At first glance, this might seem manageable, but we
are suddenly faced with a new, unexpected phenomenon: the actual task
the thread was working on is no longer in the shared memory location
where tasks are taken from (usually a queue). In fact, due to the
exception reaching the top and unwinding all of the call stack, the
task state is fully lost! WE HAVE LOST A MESSAGE EVEN THOUGH THIS IS
LOCAL COMMUNICATION WITH NO NETWORKING INVOLVED (WHERE MESSAGE LOSSES
ARE TO BE EXPECTED).
IN SUMMARY:
- TO ACHIEVE ANY MEANINGFUL CONCURRENCY AND PERFORMANCE ON CURRENT
SYSTEMS, THREADS MUST DELEGATE TASKS AMONG EACH OTHER IN AN
EFFICIENT WAY WITHOUT BLOCKING. WITH THIS STYLE OF TASK-DELEGATING
CONCURRENCY (AND EVEN MORE SO WITH NETWORKED/DISTRIBUTED COMPUTING)
CALL STACK-BASED ERROR HANDLING BREAKS DOWN AND NEW, EXPLICIT ERROR
SIGNALING MECHANISMS NEED TO BE INTRODUCED. FAILURES BECOME PART OF
THE DOMAIN MODEL.
- CONCURRENT SYSTEMS WITH WORK DELEGATION NEED TO HANDLE SERVICE
FAULTS AND HAVE PRINCIPLED MEANS TO RECOVER FROM THEM. CLIENTS OF
SUCH SERVICES NEED TO BE AWARE THAT TASKS/MESSAGES MIGHT GET LOST
DURING RESTARTS. EVEN IF LOSS DOES NOT HAPPEN, A RESPONSE MIGHT BE
DELAYED ARBITRARILY DUE TO PREVIOUSLY ENQUEUED TASKS (A LONG QUEUE),
DELAYS CAUSED BY GARBAGE COLLECTION, ETC. IN THE FACE OF THESE,
CONCURRENT SYSTEMS SHOULD HANDLE RESPONSE DEADLINES IN THE FORM OF
TIMEOUTS, JUST LIKE NETWORKED/DISTRIBUTED SYSTEMS.
Next, let’s see how use of the actor model can overcome these
challenges.
HOW THE ACTOR MODEL MEETS THE NEEDS OF MODERN, DISTRIBUTED SYSTEMS
As described in the previous topic, common programming practices do not
properly address the needs of demanding modern systems. Thankfully, we
don’t need to scrap everything we know. Instead, the actor model
addresses these shortcomings in a principled way, allowing systems to
behave in a way that better matches our mental model. The actor model
abstraction allows you to think about your code in terms of
communication, not unlike the exchanges that occur between people in a
large organization.
Use of actors allows us to:
- Enforce encapsulation without resorting to locks.
- Use the model of cooperative entities reacting to signals, changing
state, and sending signals to each other to drive the whole
application forward.
- Stop worrying about an execution mechanism that is a mismatch to
our world view.
Usage of message passing avoids locking and blocking
Instead of calling methods, actors send messages to each other. Sending
a message does not transfer the thread of execution from the sender to
the destination. An actor can send a message and continue without
blocking. Therefore, it can accomplish more in the same amount of time.
With objects, when a method returns, it releases control of its
executing thread. In this respect, actors behave much like objects, they
react to messages and return execution when they finish processing the
current message. In this way, actors actually achieve the execution we
imagined for objects:
[actors interact with each other by sending messages]
An important difference between passing messages and calling methods is
that messages have no return value. By sending a message, an actor
delegates work to another actor. As we saw in The illusion of a call
stack, if it expected a return value, the sending actor would either
need to block or to execute the other actor’s work on the same thread.
Instead, the receiving actor delivers the results in a reply message.
The second key change we need in our model is to reinstate
encapsulation. Actors react to messages just like objects “react” to
methods invoked on them. The difference is that instead of multiple
threads “protruding” into our actor and wreaking havoc on internal state
and invariants, actors execute independently from the senders of a
message, and they react to incoming messages sequentially, one at a
time. While each actor processes messages sent to it sequentially,
different actors work concurrently with each other so that an actor
system can process as many messages simultaneously as the hardware will
support.
Since there is always at most one message being processed per actor, the
invariants of an actor can be kept without synchronization. This happens
automatically without using locks:
[messages don’t invalidate invariants as they are processed
sequentially]
In summary, this is what happens when an actor receives a message:
1. The actor adds the message to the end of a queue.
2. If the actor was not scheduled for execution, it is marked as ready
to execute.
3. A (hidden) scheduler entity takes the actor and starts executing it.
4. The actor picks the message from the front of the queue.
5. The actor modifies internal state and sends messages to other actors.
6. The actor is unscheduled.
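The steps above can be sketched in plain Java (a deliberately
simplified toy, not Akka's implementation): a queue plays the mailbox
and a single-threaded executor plays the scheduler, so the internal
state is only ever touched by one message at a time.

```java
import java.util.concurrent.*;

class ToyActor {
    private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
    private int processed = 0; // internal state, never shared or locked

    void tell(String msg) {
        mailbox.add(msg);                    // step 1: enqueue the message
        scheduler.execute(this::processOne); // steps 2-3: mark ready, schedule
    }

    private void processOne() {
        String msg = mailbox.poll();         // step 4: take from the front
        if (msg != null) processed++;        // step 5: modify internal state
    }                                        // step 6: unscheduled until next message

    int processedCount() {
        scheduler.shutdown();
        try { scheduler.awaitTermination(5, TimeUnit.SECONDS); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return processed;
    }
}
```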
To accomplish this behavior, actors have:
- A mailbox (the queue where messages end up).
- A behavior (the state of the actor, internal variables etc.).
- Messages (pieces of data representing a signal, similar to method
calls and their parameters).
- An execution environment (the machinery that takes actors that have
messages to react to and invokes their message handling code).
- An address (more on this later).
Messages go into actor mailboxes. The behavior of the actor describes
how the actor responds to messages (like sending more messages and/or
changing state). An execution environment orchestrates a pool of threads
to drive all these actions completely transparently.
This is a very simple model and it solves the issues enumerated
previously:
- Encapsulation is preserved by decoupling execution from signaling
(method calls transfer execution, message passing does not).
- There is no need for locks. Modifying the internal state of an actor
is only possible via messages, which are processed one at a time
eliminating races when trying to keep invariants.
- There are no locks used anywhere, and senders are not blocked.
Millions of actors can be efficiently scheduled on a dozen threads,
reaching the full potential of modern CPUs. Task delegation is the
natural mode of operation for actors.
- The state of actors is local and not shared; changes and data are
propagated via messages, which maps to how the modern memory hierarchy
actually works. In many cases, this means transferring only the
cache lines that contain the data in the message while keeping local
state and data cached at the original core. The same model maps
exactly to remote communication, where the state is kept in the RAM
of machines and changes/data are propagated over the network as
packets.
Actors handle error situations gracefully
Since we no longer have a shared call stack between actors that send
messages to each other, we need to handle error situations differently.
There are two kinds of errors we need to consider:
- The first case is when the delegated task on the target actor failed
due to an error in the task (typically some validation issue, like a
non-existent user ID). In this case, the service encapsulated by the
target actor is intact; it is only the task itself that is
erroneous. The service actor should reply to the sender with a
message describing the error case. There is nothing special here:
errors are part of the domain and hence become ordinary messages.
- The second case is when a service itself encounters an internal
fault. Akka enforces that all actors are organized into a tree-like
hierarchy, i.e. an actor that creates another actor becomes the
parent of that new actor. This is very similar to how operating
systems organize processes into a tree. Just like with processes,
when an actor fails, its parent actor is notified and it can react
to the failure. Also, if the parent actor is stopped, all of its
children are recursively stopped, too. This service is called
supervision and it is central to Akka.
[actors supervise and handle the failures of child actors]
A supervisor (parent) can decide to restart its child actors on certain
types of failures or stop them completely on others. Children never die
silently (with the notable exception of entering an infinite loop);
instead, they either fail, letting their parent react to the fault, or
they are stopped (in which case interested parties are automatically
notified). There is always an entity responsible for managing an actor:
its parent. Restarts are not visible from the outside: collaborating
actors can keep sending messages while the target actor restarts.
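As a toy illustration of this decision in plain Java (not Akka's
declarative supervision API), a parent can apply a
restart-up-to-N-times policy around a failing child task:

```java
// Hypothetical supervisor sketch: restart the child on recoverable
// failures, stop it once a restart budget is exhausted.
class Supervisor {
    int restarts = 0;

    String supervise(Runnable child, int maxRestarts) {
        for (int attempt = 0; ; attempt++) {
            try {
                child.run();
                return "completed";
            } catch (RuntimeException failure) {
                if (attempt >= maxRestarts) return "stopped";
                restarts++; // restart: the child's state is reset, while
                            // collaborators keep sending messages
            }
        }
    }
}
```

In Akka the same choice is expressed declaratively per failure type,
rather than hand-coded as a loop.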
Now, let’s take a short tour of the functionality Akka provides.
OVERVIEW OF AKKA LIBRARIES AND MODULES
Before delving into some best practices for writing actors, it will be
helpful to preview the most commonly used Akka libraries. This will help
you start thinking about the functionality you want to use in your
system. All core Akka functionality is available as Open Source Software
(OSS). Lightbend sponsors Akka development but can also help you with
commercial offerings such as training, consulting, support, and
Enterprise Suite — a comprehensive set of tools for managing Akka
systems.
The following capabilities are included with Akka OSS and are introduced
later on this page:
- Actor library
- Remoting
- Cluster
- Cluster Sharding
- Cluster Singleton
- Cluster Publish-Subscribe
- Persistence
- Distributed Data
- HTTP
With a Lightbend subscription, you can use Enterprise Suite in
production. Enterprise Suite includes the following extensions to Akka
core functionality:
- Split Brain Resolver — Detects and recovers from network partitions,
eliminating data inconsistencies and possible downtime.
- Configuration Checker — Checks for potential configuration issues
and logs suggestions.
- Diagnostics Recorder — Captures configuration and system information
in a format that makes it easy to troubleshoot issues during
development and production.
- Thread Starvation Detector — Monitors an Akka system dispatcher and
logs warnings if it becomes unresponsive.
This page does not list all available modules, but overviews the main
functionality and gives you an idea of the level of sophistication you
can reach when you start building systems on top of Akka.
Actor library
The core Akka library is akka-actor. But, actors are used across Akka
libraries, providing a consistent, integrated model that relieves you
from individually solving the challenges that arise in concurrent or
distributed system design. From a bird’s-eye view, actors are a
programming paradigm that takes encapsulation, one of the pillars of
OOP, to its extreme. Unlike objects, actors encapsulate not only their
state but their execution. Communication with actors is not via method
calls but by passing messages. While this difference may seem minor, it
is actually what allows us to break clean from the limitations of OOP
when it comes to concurrency and remote communication. Don’t worry if
this description feels too high level to fully grasp yet, in the next
chapter we will explain actors in detail. For now, the important point
is that this is a model that handles concurrency and distribution at the
fundamental level instead of ad hoc patched attempts to bring these
features to OOP.
Challenges that actors solve include the following:
- How to build and design high-performance, concurrent applications.
- How to handle errors in a multi-threaded environment.
- How to protect my project from the pitfalls of concurrency.
Remoting
Remoting enables actors that live on different computers to seamlessly
exchange messages. While distributed as a JAR artifact, Remoting
resembles a module more than it does a library. You enable it mostly
with configuration and it has only a few APIs. Thanks to the actor
model, a remote and local message send looks exactly the same. The
patterns that you use on local systems translate directly to remote
systems. You will rarely need to use Remoting directly, but it provides
the foundation on which the Cluster subsystem is built.
Challenges Remoting solves include the following:
- How to address actor systems living on remote hosts.
- How to address individual actors on remote actor systems.
- How to turn messages to bytes on the wire.
- How to manage low-level, network connections (and reconnections)
between hosts, detect crashed actor systems and hosts, all
transparently.
- How to multiplex communications from an unrelated set of actors on
the same network connection, all transparently.
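As a sketch of how little code Remoting needs, enabling it in Akka 2.5
is mostly a matter of configuration; the hostname and port below are
example values:

```hocon
akka {
  actor {
    # Use the remoting-aware actor ref provider instead of the local one
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"  # externally reachable address of this system
      port = 2552             # example port; 0 picks a random free port
    }
  }
}
```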
Cluster
If you have a set of actor systems that cooperate to solve some business
problem, then you likely want to manage this set of systems in a
disciplined way. While Remoting solves the problem of addressing and
communicating with components of remote systems, Clustering gives you
the ability to organize these into a “meta-system” tied together by a
membership protocol. IN MOST CASES, YOU WANT TO USE THE CLUSTER MODULE
INSTEAD OF USING REMOTING DIRECTLY. Clustering provides an additional
set of services on top of Remoting that most real world applications
need.
Challenges the Cluster module solves include the following:
- How to maintain a set of actor systems (a cluster) that can
communicate with each other and consider each other as part of the
cluster.
- How to introduce a new system safely to the set of already existing
members.
- How to reliably detect systems that are temporarily unreachable.
- How to remove failed hosts/systems (or scale down the system) so
that all remaining members agree on the remaining subset of the
cluster.
- How to distribute computations among the current set of members.
- How to designate members of the cluster for a certain role, in
other words, to provide certain services and not others.
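Joining a cluster is likewise configuration-driven. The sketch below
assumes a system named ClusterSystem and example seed-node addresses:

```hocon
akka {
  actor.provider = cluster
  remote.netty.tcp {
    hostname = "127.0.0.1"
    port = 2551
  }
  cluster {
    # New members contact the seed nodes to join the membership protocol
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"
    ]
  }
}
```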
Cluster Sharding
Sharding helps to solve the problem of distributing a set of actors
among members of an Akka cluster. Sharding is a pattern that is mostly
used together with Persistence to balance a large set of persistent
entities (backed by actors) across members of a cluster and to migrate
them to other nodes when members crash or leave.
Challenges that Sharding solves include the following:
- How to model and scale out a large set of stateful entities on a set
of systems.
- How to ensure that entities in the cluster are distributed properly
so that load is properly balanced across the machines.
- How to migrate entities from a crashed system without losing their
state.
- How to ensure that an entity does not exist on multiple systems at
the same time and is hence kept consistent.
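The core idea can be sketched in a few lines of plain Java (a
hypothetical helper, not Akka's API): every node computes the same
shard for a given entity ID, so the whole cluster agrees on where an
entity lives without central coordination.

```java
// Hypothetical shard routing sketch: a deterministic hash of the
// entity ID maps it to one of a fixed number of shards, and shards
// are then distributed over the cluster members.
class ShardRouter {
    static int shardFor(String entityId, int numberOfShards) {
        // floorMod avoids the negative results a raw % of hashCode can give
        return Math.floorMod(entityId.hashCode(), numberOfShards);
    }
}
```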
Cluster Singleton
A common (in fact, a bit too common) use case in distributed systems is
to have a single entity responsible for a given task which is shared
among other members of the cluster and migrated if the host system
fails. While this undeniably introduces a common bottleneck for the
whole cluster that limits scaling, there are scenarios where the use of
this pattern is unavoidable. Cluster Singleton allows a cluster to
select an actor system which will host a particular actor, while other
systems can always access said service independently of where it is.
The Singleton module can be used to solve these challenges:
- How to ensure that only one instance of a service is running in the
whole cluster.
- How to ensure that the service is up even if the system currently
hosting it crashes or is shut down during the process of scaling down.
- How to reach this instance from any member of the cluster assuming
that it can migrate to other systems over time.
Cluster Publish-Subscribe
For coordination among systems, it is often necessary to distribute
messages to all, or one, of a set of interested systems in a
cluster. This pattern is usually called publish-subscribe and this
module solves this exact problem. It is possible to broadcast messages
to all subscribers of a topic or send a message to an arbitrary actor
that has expressed interest.
Cluster Publish-Subscribe is intended to solve the following challenges:
- How to broadcast messages to an interested set of parties in a
cluster.
- How to send a message to a member from an interested set of parties
in a cluster.
- How to subscribe and unsubscribe for events of a certain topic in
the cluster.
Persistence
Just like objects in OOP, actors keep their state in volatile memory.
Once the system is shut down, gracefully or because of a crash, all data
that was in memory is lost. Persistence provides patterns to enable
actors to persist events that lead to their current state. Upon startup,
events can be replayed to restore the state of the entity hosted by the
actor. The event stream can be queried and fed into additional
processing pipelines (an external Big Data cluster for example) or
alternate views (like reports).
Persistence tackles the following challenges:
- How to restore the state of an entity/actor when the system restarts
or crashes.
- How to implement a CQRS system.
- How to ensure reliable delivery of messages in face of network
errors and system crashes.
- How to introspect domain events that have led an entity to its
current state.
- How to leverage Event Sourcing in an application to support
long-running processes while the project continues to evolve.
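The event-sourcing idea behind Persistence can be sketched in plain
Java (the event strings and CartState are hypothetical, not Akka's
API): instead of storing the state directly, we persist the events
that produced it and replay them on recovery.

```java
import java.util.*;

// Hypothetical event-sourced shopping cart: state is a fold over events.
class CartState {
    final List<String> items = new ArrayList<>();

    // Apply one domain event to the in-memory state.
    void apply(String event) {
        if (event.startsWith("added:")) items.add(event.substring(6));
        else if (event.startsWith("removed:")) items.remove(event.substring(8));
    }

    // Recovery after a restart: replay the journal over an empty state.
    static CartState recover(List<String> journal) {
        CartState state = new CartState();
        journal.forEach(state::apply);
        return state;
    }
}
```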
Distributed Data
In situations where eventual consistency is acceptable, it is possible
to share data between nodes in an Akka Cluster and accept both reads and
writes even in the face of cluster partitions. This can be achieved
using Conflict Free Replicated Data Types (CRDTs), where writes on
different nodes can happen concurrently and are merged in a predictable
way afterward. The Distributed Data module provides infrastructure to
share data and a number of useful data types.
Distributed Data is intended to solve the following challenges:
- How to accept writes even in the face of cluster partitions.
- How to share data while at the same time ensuring low-latency local
read and write access.
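A minimal sketch of one such data type, a grow-only counter (a
simplified form of the counters Distributed Data ships, not its actual
API): each node increments its own slot, and concurrent replicas merge
by taking the per-node maximum, so they converge regardless of
delivery order.

```java
import java.util.*;

class GCounter {
    final Map<String, Long> perNode = new HashMap<>();

    void increment(String nodeId) {
        perNode.merge(nodeId, 1L, Long::sum);
    }

    long value() {
        return perNode.values().stream().mapToLong(Long::longValue).sum();
    }

    // Merge is commutative, associative, and idempotent: the defining
    // properties that make CRDT replicas converge.
    GCounter merge(GCounter other) {
        GCounter result = new GCounter();
        result.perNode.putAll(perNode);
        other.perNode.forEach((node, count) ->
            result.perNode.merge(node, count, Math::max));
        return result;
    }
}
```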
Streams
Actors are a fundamental model for concurrency, but there are common
patterns whose implementation requires the user to solve the same
problem over and over. Very common is the scenario where a chain, or
graph, of actors needs to process a potentially large, or infinite,
stream of sequential events and properly coordinate resource usage so
that faster processing stages do not overwhelm slower ones in the chain
or graph.
Streams provide a higher-level abstraction on top of actors that
simplifies writing such processing networks, handling all the fine
details in the background and providing a safe, typed, composable
programming model. Streams is also an implementation of the Reactive
Streams standard which enables integration with all third party
implementations of that standard.
Streams solve the following challenges:
- How to handle streams of events or large datasets with high
performance, exploiting concurrency and keeping resource usage tight.
- How to assemble reusable pieces of event/data processing into
flexible pipelines.
- How to connect asynchronous services in a flexible way to each
other, and have good performance.
- How to provide or consume Reactive Streams compliant interfaces to
interface with a third party library.
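The idea of assembling reusable stages into a pipeline can be hinted at
with plain Scala collections. This sketch shows only lazy, composable
stages; the real Akka Streams API adds asynchrony and backpressure on
top of this idea:

```scala
// Reusable processing stages as functions over iterators; composing
// them yields a pipeline that is evaluated lazily, element by element.
object PipelineSketch {
  type Stage[A, B] = Iterator[A] => Iterator[B]

  val parse: Stage[String, Int] = _.flatMap(s => s.toIntOption)
  val positive: Stage[Int, Int] = _.filter(_ > 0)
  val double: Stage[Int, Int]   = _.map(_ * 2)

  // Stages compose with ordinary function composition.
  val pipeline: Stage[String, Int] = parse.andThen(positive).andThen(double)
}
```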
HTTP
The de facto standard for providing APIs remotely, internal or external,
is HTTP. Akka provides a library to construct or consume such HTTP
services by giving a set of tools to create HTTP services (and serve
them) and a client that can be used to consume other services. These
tools are particularly suited to streaming in and out a large set of
data or real-time events by leveraging the underlying model of Akka
Streams.
Some of the challenges that HTTP tackles:
- How to expose services of a system or cluster to the external world
via an HTTP API in a performant way.
- How to stream large datasets in and out of a system using HTTP.
- How to stream live events in and out of a system using HTTP.
Example of module use
Akka modules integrate together seamlessly. For example, think of a
large set of stateful business objects, such as documents or shopping
carts, that website users access. If you model these as sharded
entities, using Sharding and Persistence, they will be balanced across a
cluster that you can scale out on-demand. Spikes in load, such as those
caused by advertising campaigns or holidays, will be handled even if
some systems crash. You can also easily take the
real-time stream of domain events with Persistence Query and use Streams
to pipe them into a streaming Fast Data engine. Then, take the output of
that engine as a Stream, manipulate it using Akka Streams operators and
expose it as web socket connections served by a load balanced set of
HTTP servers hosted by your cluster to power your real-time business
analytics tool.
We hope this preview caught your interest! The next topic introduces the
example application we will build in the tutorial portion of this guide.
INTRODUCTION TO THE EXAMPLE
When writing prose, the hardest part is often composing the first few
sentences. There is a similar “blank canvas” feeling when starting to
build an Akka system. You might wonder: Which should be the first actor?
Where should it live? What should it do? Fortunately — unlike with prose
— established best practices can guide us through these initial steps.
In the remainder of this guide, we examine the core logic of a simple
Akka application to introduce you to actors and show you how to
formulate solutions with them. The example demonstrates common patterns
that will help you kickstart your Akka projects.
Prerequisites
You should have already followed the instructions in the Akka Quickstart
with Scala guide or the Akka Quickstart with Java guide to download and
run the
Hello World example. You will use this as a seed project and add the
functionality described in this tutorial.
IoT example use case
In this tutorial, we’ll use Akka to build out part of an Internet of
Things (IoT) system that reports data from sensor devices installed in
customers’ homes. The example focuses on temperature readings. The
target use case simply allows customers to log in and view the last
reported temperature from different areas of their homes. You can
imagine that such sensors could also collect relative humidity or other
interesting data and an application would likely support reading and
changing device configuration, maybe even alerting home owners when
sensor state falls outside of a particular range.
In a real system, the application would be exposed to customers through
a mobile app or browser. This guide concentrates only on the core logic
for storing temperatures that would be called over a network protocol,
such as HTTP. It also includes writing tests to help you get comfortable
and proficient with testing actors.
The tutorial application consists of two main components:
- DEVICE DATA COLLECTION: — maintains a local representation of the
remote devices. Multiple sensor devices for a home are organized
into one device group.
- USER DASHBOARD: — periodically collects data from the devices for a
logged in user’s home and presents the results as a report.
The following diagram illustrates the example application architecture.
Since we are interested in the state of each sensor device, we will
model devices as actors. The running application will create as many
instances of device actors and device groups as necessary.
[box diagram of the architecture]
What you will learn in this tutorial
This tutorial introduces and illustrates:
- The actor hierarchy and how it influences actor behavior
- How to choose the right granularity for actors
- How to define protocols as messages
- Typical conversational styles
Let’s get started by learning more about actors.
PART 1: ACTOR ARCHITECTURE
Use of Akka relieves you from creating the infrastructure for an actor
system and from writing the low-level code necessary to control basic
behavior. To appreciate this, let’s look at the relationships between
actors you create in your code and those that Akka creates and manages
for you internally, the actor lifecycle, and failure handling.
The Akka actor hierarchy
An actor in Akka always belongs to a parent. Typically, you create an
actor by calling context.actorOf() (Scala) or getContext().actorOf()
(Java). Rather than
creating a “freestanding” actor, this injects the new actor as a child
into an already existing tree: the creator actor becomes the _parent_ of
the newly created _child_ actor. You might ask then, who is the parent
of the _first_ actor you create?
As illustrated below, all actors have a common parent, the user
guardian. New actor instances can be created under this actor using
system.actorOf(). As we covered in the Quickstart Guide,
creation of an actor returns a reference that is a valid URL. So, for
example, if we create an actor named someActor with
system.actorOf(…, "someActor"), its reference will include the path
/user/someActor.
[box diagram of the architecture]
In fact, before you create an actor in your code, Akka has already
created three actors in the system. The names of these built-in actors
contain _guardian_ because they _supervise_ every child actor in their
path. The guardian actors include:
- / the so-called _root guardian_. This is the parent of all actors in
the system, and the last one to stop when the system itself is
terminated.
- /user the _guardian_. THIS IS THE PARENT ACTOR FOR ALL USER CREATED
ACTORS. Don’t let the name user confuse you, it has nothing to do
with end users, nor with user handling. Every actor you create using
the Akka library will have the constant path /user/ prepended to it.
- /system the _system guardian_.
In the Hello World example, we have already seen how system.actorOf()
creates an actor directly under /user. We call this a _top level_ actor,
even though, in practice, it is only at the top of the _user defined_
hierarchy. You typically have only one (or very few) top level actors in
your ActorSystem. We create child, or non-top-level, actors by invoking
context.actorOf() from an existing actor. The context.actorOf() method
has a signature identical to system.actorOf(), its top-level
counterpart.
The easiest way to see the actor hierarchy in action is to simply print
ActorRef instances. In this small experiment, we create an actor, print
its reference, create a child of this actor, and print the child’s
reference. We start with the Hello World project; if you have not
downloaded it yet, download the Quickstart project from Lightbend Tech
Hub.
In your Hello World project, navigate to the com.lightbend.akka.sample
package and create a new Scala file called
ActorHierarchyExperiments.scala or a Java file called
ActorHierarchyExperiments.java there. Copy and paste the code from the
snippet below into this new source file. Save your file and run
sbt "runMain com.lightbend.akka.sample.ActorHierarchyExperiments" to
observe the output.
Scala
package com.lightbend.akka.sample
import akka.actor.{ Actor, Props, ActorSystem }
import scala.io.StdIn
class PrintMyActorRefActor extends Actor {
override def receive: Receive = {
case "printit" =>
val secondRef = context.actorOf(Props.empty, "second-actor")
println(s"Second: $secondRef")
}
}
object ActorHierarchyExperiments extends App {
val system = ActorSystem("testSystem")
val firstRef = system.actorOf(Props[PrintMyActorRefActor], "first-actor")
println(s"First: $firstRef")
firstRef ! "printit"
println(">>> Press ENTER to exit <<<")
try StdIn.readLine()
finally system.terminate()
}
Java
package com.lightbend.akka.sample;
import akka.actor.AbstractActor;
import akka.actor.AbstractActor.Receive;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
class PrintMyActorRefActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("printit", p -> {
ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
System.out.println("Second: " + secondRef);
})
.build();
}
}
public class ActorHierarchyExperiments {
public static void main(String[] args) throws java.io.IOException {
ActorSystem system = ActorSystem.create("testSystem");
ActorRef firstRef = system.actorOf(Props.create(PrintMyActorRefActor.class), "first-actor");
System.out.println("First: " + firstRef);
firstRef.tell("printit", ActorRef.noSender());
System.out.println(">>> Press ENTER to exit <<<");
try {
System.in.read();
} finally {
system.terminate();
}
}
}
Note how we asked the first actor to do its work by sending it a message
via its reference: firstRef ! "printit" (Scala) or
firstRef.tell("printit", ActorRef.noSender()) (Java). When
the code executes, the output includes the references for the first
actor and the child it created as part of the printit case. Your output
should look similar to the following:
First: Actor[akka://testSystem/user/first-actor#1053618476]
Second: Actor[akka://testSystem/user/first-actor/second-actor#-1544706041]
Notice the structure of the references:
- Both paths start with akka://testSystem/. Since all actor references
are valid URLs, akka:// is the value of the protocol field.
- Next, just like on the World Wide Web, the URL identifies the
system. In this example, the system is named testSystem, but it
could be any other name. If remote communication between multiple
systems is enabled, this part of the URL includes the hostname so
other systems can find it on the network.
- Because the second actor’s reference includes the path
/first-actor/, it identifies it as a child of the first.
- The last part of the actor reference, #1053618476 or #-1544706041,
is a unique identifier that you can ignore in most cases.
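These pieces can be pulled apart mechanically. The following plain-Scala
helper is our own illustration of the reference structure, not an Akka
utility:

```scala
// Split an actor reference string of the form
// akka://<system>/<path>#<uid> into its three parts.
object ActorPathSketch {
  def parts(ref: String): (String, String, String) = {
    val noProto = ref.stripPrefix("akka://")
    // Everything after '#' is the unique identifier.
    val (beforeUid, uid) = noProto.span(_ != '#')
    // The first path segment names the actor system.
    val system = beforeUid.takeWhile(_ != '/')
    val path = beforeUid.drop(system.length)
    (system, path, uid.stripPrefix("#"))
  }
}
```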
Now that you understand what the actor hierarchy looks like, you might
be wondering: _Why do we need this hierarchy? What is it used for?_
An important role of the hierarchy is to safely manage actor lifecycles.
Let’s consider this next and see how that knowledge can help us write
better code.
The actor lifecycle
Actors pop into existence when created and, later, at user request, they
are stopped. Whenever an actor is stopped, all of its children are
_recursively stopped_ too. This behavior greatly simplifies resource
cleanup and helps avoid resource leaks such as those caused by open
sockets and files. In fact, a commonly overlooked difficulty when
dealing with low-level multi-threaded code is the lifecycle management
of various concurrent resources.
To stop an actor, the recommended pattern is to call context.stop(self)
(Scala) or getContext().stop(getSelf()) (Java) inside the actor to stop
itself, usually as a response to some user defined stop message or when
the actor is done with its job. Stopping another actor is technically
possible by calling context.stop(actorRef) (Scala) or
getContext().stop(actorRef) (Java),
but IT IS CONSIDERED A BAD PRACTICE TO STOP ARBITRARY ACTORS THIS WAY:
try sending them a PoisonPill or custom stop message instead.
The Akka actor API exposes many lifecycle hooks that you can override in
an actor implementation. The most commonly used are preStart() and
postStop().
- preStart() is invoked after the actor has started but before it
processes its first message.
- postStop() is invoked just before the actor stops. No messages are
processed after this point.
Let’s use the preStart() and postStop() lifecycle hooks in a simple
experiment to observe the behavior when we stop an actor. First, add the
following two actor classes to your project:
Scala
class StartStopActor1 extends Actor {
override def preStart(): Unit = {
println("first started")
context.actorOf(Props[StartStopActor2], "second")
}
override def postStop(): Unit = println("first stopped")
override def receive: Receive = {
case "stop" => context.stop(self)
}
}
class StartStopActor2 extends Actor {
override def preStart(): Unit = println("second started")
override def postStop(): Unit = println("second stopped")
// Actor.emptyBehavior is a useful placeholder when we don't
// want to handle any messages in the actor.
override def receive: Receive = Actor.emptyBehavior
}
Java
class StartStopActor1 extends AbstractActor {
@Override
public void preStart() {
System.out.println("first started");
getContext().actorOf(Props.create(StartStopActor2.class), "second");
}
@Override
public void postStop() {
System.out.println("first stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("stop", s -> {
getContext().stop(getSelf());
})
.build();
}
}
class StartStopActor2 extends AbstractActor {
@Override
public void preStart() {
System.out.println("second started");
}
@Override
public void postStop() {
System.out.println("second stopped");
}
// Actor.emptyBehavior is a useful placeholder when we don't
// want to handle any messages in the actor.
@Override
public Receive createReceive() {
return receiveBuilder()
.build();
}
}
And create a ‘main’ class like above to start the actors and then send
them a "stop" message:
Scala
val first = system.actorOf(Props[StartStopActor1], "first")
first ! "stop"
Java
ActorRef first = system.actorOf(Props.create(StartStopActor1.class), "first");
first.tell("stop", ActorRef.noSender());
You can again use sbt to start this program. The output should look like
this:
first started
second started
second stopped
first stopped
When we stopped actor first, it stopped its child actor, second, before
stopping itself. This ordering is strict: _all_ postStop() hooks of the
children are called before the postStop() hook of the parent is called.
The Actor Lifecycle section of the Akka reference manual provides
details on the full set of lifecycle hooks.
Failure handling
Parents and children are connected throughout their lifecycles. Whenever
an actor fails (throws an exception or an unhandled exception bubbles
out from receive) it is temporarily suspended. As mentioned earlier, the
failure information is propagated to the parent, which then decides how
to handle the exception caused by the child actor. In this way, parents
act as supervisors for their children. The default _supervisor strategy_
is to stop and restart the child. If you don’t change the default
strategy, all failures result in a restart.
Let’s observe the default strategy in a simple experiment. Add the
following classes to your project, just as you did with the previous
ones:
Scala
class SupervisingActor extends Actor {
val child = context.actorOf(Props[SupervisedActor], "supervised-actor")
override def receive: Receive = {
case "failChild" => child ! "fail"
}
}
class SupervisedActor extends Actor {
override def preStart(): Unit = println("supervised actor started")
override def postStop(): Unit = println("supervised actor stopped")
override def receive: Receive = {
case "fail" =>
println("supervised actor fails now")
throw new Exception("I failed!")
}
}
Java
class SupervisingActor extends AbstractActor {
ActorRef child = getContext().actorOf(Props.create(SupervisedActor.class), "supervised-actor");
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("failChild", f -> {
child.tell("fail", getSelf());
})
.build();
}
}
class SupervisedActor extends AbstractActor {
@Override
public void preStart() {
System.out.println("supervised actor started");
}
@Override
public void postStop() {
System.out.println("supervised actor stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("fail", f -> {
System.out.println("supervised actor fails now");
throw new Exception("I failed!");
})
.build();
}
}
And run with:
Scala
val supervisingActor = system.actorOf(Props[SupervisingActor], "supervising-actor")
supervisingActor ! "failChild"
Java
ActorRef supervisingActor = system.actorOf(Props.create(SupervisingActor.class), "supervising-actor");
supervisingActor.tell("failChild", ActorRef.noSender());
You should see output similar to the following:
supervised actor started
supervised actor fails now
supervised actor stopped
supervised actor started
[ERROR] [03/29/2017 10:47:14.150] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/supervising-actor/supervised-actor] I failed!
java.lang.Exception: I failed!
at tutorial_1.SupervisedActor$$anonfun$receive$4.applyOrElse(ActorHierarchyExperiments.scala:57)
at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
at tutorial_1.SupervisedActor.aroundReceive(ActorHierarchyExperiments.scala:47)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519)
at akka.actor.ActorCell.invoke(ActorCell.scala:488)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
We see that after failure the supervised actor is stopped and
immediately restarted. We also see a log entry reporting the exception
that was handled, in this case, our test exception. In this example we
used the preStart() and postStop() hooks, which by default are also
called after and before restarts, so from inside the actor we cannot
distinguish whether it was started for the first time or restarted. This
is usually the right thing to do: the purpose of the restart is to set
the actor into a known-good state, which usually means a clean starting
state.
WHAT ACTUALLY HAPPENS THOUGH IS THAT THE preRestart() AND postRestart()
METHODS ARE CALLED WHICH, IF NOT OVERRIDDEN, BY DEFAULT DELEGATE TO
postStop() AND preStart() RESPECTIVELY. You can experiment with
overriding these additional methods and see how the output changes.
For the impatient, we also recommend looking into the supervision
reference page for more in-depth details.
SUMMARY
We’ve learned about how Akka manages actors in hierarchies where parents
supervise their children and handle exceptions. We saw how to create a
very simple actor and child. Next, we’ll apply this knowledge to our
example use case by modeling the communication necessary to get
information from device actors. Later, we’ll deal with how to manage the
actors in groups.
PART 2: CREATING THE FIRST ACTOR
With an understanding of actor hierarchy and behavior, the remaining
question is how to map the top-level components of our IoT system to
actors. It might be tempting to make the actors that represent devices
and dashboards at the top level. Instead, we recommend creating an
explicit component that represents the whole application. In other
words, we will have a single top-level actor in our IoT system. The
components that create and manage devices and dashboards will be
children of this actor. This allows us to refactor the example use case
architecture diagram into a tree of actors:
[actor tree diagram of the architecture]
We can define the first actor, the IotSupervisor, with a few simple
lines of code. To start your tutorial application:
1. Create a new IotSupervisor source file in the
com.lightbend.akka.sample package.
2. Paste the following code into the new file to define the
IotSupervisor.
Scala
package com.lightbend.akka.sample
import akka.actor.{ Actor, ActorLogging, Props }
object IotSupervisor {
def props(): Props = Props(new IotSupervisor)
}
class IotSupervisor extends Actor with ActorLogging {
override def preStart(): Unit = log.info("IoT Application started")
override def postStop(): Unit = log.info("IoT Application stopped")
// No need to handle any messages
override def receive = Actor.emptyBehavior
}
Java
package com.lightbend.akka.sample;
import akka.actor.AbstractActor;
import akka.actor.ActorLogging;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class IotSupervisor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
public static Props props() {
return Props.create(IotSupervisor.class);
}
@Override
public void preStart() {
log.info("IoT Application started");
}
@Override
public void postStop() {
log.info("IoT Application stopped");
}
// No need to handle any messages
@Override
public Receive createReceive() {
return receiveBuilder()
.build();
}
}
The code is similar to the actor examples we used in the previous
experiments, but notice:
- Instead of println() we use the ActorLogging helper trait (Scala)
or akka.event.Logging (Java), which directly invokes Akka’s built-in
logging facility.
- We use the recommended pattern for creating actors by defining a
props() method in the companion object (Scala) or as a static method
(Java) on the actor.
To provide the main entry point that creates the actor system, add the
following code to a new IotApp object (Scala) or IotMain class (Java).
Scala
package com.lightbend.akka.sample
import akka.actor.ActorSystem
import scala.io.StdIn
object IotApp {
def main(args: Array[String]): Unit = {
val system = ActorSystem("iot-system")
try {
// Create top level supervisor
val supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor")
// Exit the system after ENTER is pressed
StdIn.readLine()
} finally {
system.terminate()
}
}
}
Java
package com.lightbend.akka.sample;
import java.io.IOException;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
public class IotMain {
public static void main(String[] args) throws IOException {
ActorSystem system = ActorSystem.create("iot-system");
try {
// Create top level supervisor
ActorRef supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor");
System.out.println("Press ENTER to exit the system");
System.in.read();
} finally {
system.terminate();
}
}
}
The application does little, other than print out that it is started.
But, we have the first actor in place and we are ready to add other
actors.
What’s next?
In the following chapters we will grow the application gradually, by:
1. Creating the representation for a device.
2. Creating the device management component.
3. Adding query capabilities to device groups.
PART 3: WORKING WITH DEVICE ACTORS
In the previous topics we explained how to view actor systems _in the
large_, that is, how components should be represented, how actors should
be arranged in the hierarchy. In this part, we will look at actors _in
the small_ by implementing the device actor.
If we were working with objects, we would typically design the API as
_interfaces_, a collection of abstract methods to be filled out by the
actual implementation. In the world of actors, protocols take the place
of interfaces. While it is not possible to formalize general protocols
in the programming language, we can compose their most basic element,
messages. So, we will start by identifying the messages we will want to
send to device actors.
Typically, messages fall into categories, or patterns. By identifying
these patterns, you will find that it becomes easier to choose between
them and to implement them. The first example demonstrates the
_request-respond_ message pattern.
Identifying messages for devices
The tasks of a device actor will be simple:
- Collect temperature measurements
- When asked, report the last measured temperature
However, a device might start without immediately having a temperature
measurement. Hence, we need to account for the case where a temperature
is not present. This also allows us to test the query part of the actor
without the write part present, as the device actor can simply report an
empty result.
The protocol for obtaining the current temperature from the device actor
is simple. The actor:
1. Waits for a request for the current temperature.
2. Responds to the request with a reply that either:
- contains the current temperature or,
- indicates that a temperature is not yet available.
We need two messages, one for the request, and one for the reply. Our
first attempt might look like the following:
Scala
final case object ReadTemperature
final case class RespondTemperature(value: Option[Double])
Java
public static final class ReadTemperature {
}
public static final class RespondTemperature {
final Optional<Double> value;
public RespondTemperature(Optional<Double> value) {
this.value = value;
}
}
These two messages seem to cover the required functionality. However,
the approach we choose must take into account the distributed nature of
the application. While the basic mechanism is the same for communicating
with an actor on the local JVM as with a remote actor, we need to keep
the following in mind:
- There will be observable differences in the latency of delivery
between local and remote messages, because factors like network link
bandwidth and the message size also come into play.
- Reliability is a concern because a remote message send involves more
steps, which means that more can go wrong.
- A local send will just pass a reference to the message inside the
same JVM, without any restrictions on the underlying object which is
sent, whereas a remote transport will place a limit on the message
size.
In addition, while sending inside the same JVM is significantly more
reliable, if an actor fails due to a programmer error while processing
the message, the effect is basically the same as if a remote network
request fails due to the remote host crashing while processing the
message. Even though in both cases, the service recovers after a while
(the actor is restarted by its supervisor, the host is restarted by an
operator or by a monitoring system) individual requests are lost during
the crash. THEREFORE, WRITING YOUR ACTORS SUCH THAT EVERY MESSAGE COULD
POSSIBLY BE LOST IS THE SAFE, PESSIMISTIC BET.
But to further understand the need for flexibility in the protocol, it
will help to consider Akka message ordering and message delivery
guarantees. Akka provides the following behavior for message sends:
- At-most-once delivery, that is, no guaranteed delivery.
- Message ordering is maintained per sender–receiver pair.
The following sections discuss this behavior in more detail:
- Message delivery
- Message ordering
Message delivery
The delivery semantics provided by messaging subsystems typically fall
into the following categories:
- AT-MOST-ONCE DELIVERY — each message is delivered zero or one time;
in more casual terms it means that messages can be lost, but are
never duplicated.
- AT-LEAST-ONCE DELIVERY — potentially multiple attempts are made to
deliver each message, until at least one succeeds; again, in more
casual terms this means that messages can be duplicated but are
never lost.
- EXACTLY-ONCE DELIVERY — each message is delivered exactly once to
the recipient; the message can neither be lost nor be duplicated.
The first behavior, the one used by Akka, is the cheapest and results in
the highest performance. It has the least implementation overhead
because it can be done in a fire-and-forget fashion without keeping the
state at the sending end or in the transport mechanism. The second,
at-least-once, requires retries to counter transport losses. This adds
the overhead of keeping the state at the sending end and having an
acknowledgment mechanism at the receiving end. Exactly-once delivery is
most expensive, and results in the worst performance: in addition to the
overhead added by at-least-once delivery, it requires the state to be
kept at the receiving end in order to filter out duplicate deliveries.
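The receiver-side state needed to filter out duplicates can be sketched
in plain Scala. The message IDs and bookkeeping here are our own
illustration of the idea, not an Akka API:

```scala
// Receiver-side deduplication: turning at-least-once delivery into
// effectively exactly-once processing by remembering seen message IDs.
final class DedupReceiver[A] {
  private var seen = Set.empty[Long]
  private var log = Vector.empty[A]

  // Returns true if the message was processed, false if it was a
  // duplicate redelivery and was filtered out.
  def receive(id: Long, payload: A): Boolean =
    if (seen(id)) false
    else { seen += id; log :+= payload; true }

  def processed: Vector[A] = log
}
```

Note that this state grows with every message; real systems bound it,
for example by only remembering recent IDs per sender.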
In an actor system, we need to determine the exact meaning of a
guarantee — at which point does the system consider the delivery to be
accomplished:
1. When the message is sent out on the network?
2. When the message is received by the target actor’s host?
3. When the message is put into the target actor’s mailbox?
4. When the target actor starts to process the message?
5. When the target actor has successfully processed the message?
Most frameworks and protocols that claim guaranteed delivery actually
provide something similar to points 4 and 5. While this sounds
reasonable, IS IT ACTUALLY USEFUL? To understand the implications,
consider a simple, practical example: a user attempts to place an order
and we only want to claim that it has been successfully processed once
it is actually on disk in the orders database.
If we rely on the successful processing of the message, the actor will
report success as soon as the order has been submitted to the internal
API that has the responsibility to validate it, process it and put it
into the database. Unfortunately, immediately after the API has been
invoked, any of the following can happen:
- The host can crash.
- Deserialization can fail.
- Validation can fail.
- The database might be unavailable.
- A programming error might occur.
This illustrates that the GUARANTEE OF DELIVERY does not translate to
the DOMAIN LEVEL GUARANTEE. We only want to report success once the
order has been actually fully processed and persisted. THE ONLY ENTITY
THAT CAN REPORT SUCCESS IS THE APPLICATION ITSELF, SINCE ONLY IT HAS ANY
UNDERSTANDING OF THE DOMAIN GUARANTEES REQUIRED. NO GENERALIZED
FRAMEWORK CAN FIGURE OUT THE SPECIFICS OF A PARTICULAR DOMAIN AND WHAT
IS CONSIDERED A SUCCESS IN THAT DOMAIN.
In this particular example, we only want to signal success after a
successful database write, where the database acknowledged that the
order is now safely stored. FOR THESE REASONS AKKA LIFTS THE
RESPONSIBILITIES OF GUARANTEES TO THE APPLICATION ITSELF, I.E. YOU HAVE
TO IMPLEMENT THEM YOURSELF. THIS GIVES YOU FULL CONTROL OF THE
GUARANTEES THAT YOU WANT TO PROVIDE. Now, let’s consider the message
ordering that Akka provides to make it easy to reason about application
logic.
Message Ordering
In Akka, for a given pair of actors, messages sent directly from the
first to the second will not be received out-of-order. The word
“directly” emphasizes that this guarantee only applies when sending with
the tell operator directly to the final destination, but not when
employing mediators.
If:
- Actor A1 sends messages M1, M2, M3 to A2.
- Actor A3 sends messages M4, M5, M6 to A2.
This means that, for Akka messages:
- If M1 is delivered it must be delivered before M2 and M3.
- If M2 is delivered it must be delivered before M3.
- If M4 is delivered it must be delivered before M5 and M6.
- If M5 is delivered it must be delivered before M6.
- A2 can see messages from A1 interleaved with messages from A3.
- Since there is no guaranteed delivery, any of the messages may be
dropped, i.e. not arrive at A2.
These guarantees strike a good balance: having messages from one actor
arrive in-order is convenient for building systems that can be easily
reasoned about, while on the other hand allowing messages from different
actors to arrive interleaved provides sufficient freedom for an
efficient implementation of the actor system.
For the full details on delivery guarantees please refer to the
reference page.
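To make the ordering rules above concrete, here is a small, self-contained
Java sketch. It is a toy model of mailbox delivery, not Akka's internals:
each sender keeps its own FIFO queue, delivery interleaves the queues, and
so per-sender order is preserved while messages from different senders mix.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Akka's per-sender ordering guarantee (not Akka code):
// each sender's messages sit in their own FIFO queue, and delivery
// interleaves the queues in round-robin fashion.
public class OrderingDemo {
    // Drains the senders' queues in some interleaving (here: round robin).
    // Within each queue the oldest message is always taken first.
    public static List<String> deliver(Map<String, Deque<String>> mailboxes) {
        List<String> received = new ArrayList<>();
        boolean progress = true;
        while (progress) {
            progress = false;
            for (Deque<String> q : mailboxes.values()) {
                if (!q.isEmpty()) {
                    received.add(q.pollFirst()); // per-sender FIFO preserved
                    progress = true;
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> mailboxes = new LinkedHashMap<>();
        mailboxes.put("A1", new ArrayDeque<>(List.of("M1", "M2", "M3")));
        mailboxes.put("A3", new ArrayDeque<>(List.of("M4", "M5", "M6")));
        // M1 arrives before M2 before M3, and M4 before M5 before M6,
        // but the two senders' messages are interleaved.
        System.out.println(deliver(mailboxes)); // prints [M1, M4, M2, M5, M3, M6]
    }
}
```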
Adding flexibility to device messages
Our first query protocol was correct, but it did not take the realities
of distributed application execution into account. If we want to
implement resends in the actor that queries a device actor (because of
timed-out requests), or if we want to query multiple actors, we need to
be able to correlate requests and responses. Hence, we add one more
field to our messages, so that an ID can be provided by the requester
(we will add this code to our app in a later step):
Scala
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
Java
public static final class ReadTemperature {
final long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
Defining the device actor and its read protocol
As we learned in the Hello World example, each actor defines the type of
messages it will accept. Our device actor has the responsibility to use
the same ID parameter in the response to a given query, which makes it
look like the following.
Scala
import akka.actor.{ Actor, ActorLogging, Props }
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case ReadTemperature(id) =>
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}
Java
import java.util.Optional;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class Device extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final String deviceId;
public Device(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
public static Props props(String groupId, String deviceId) {
return Props.create(Device.class, groupId, deviceId);
}
public static final class ReadTemperature {
final long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
Optional<Double> lastTemperatureReading = Optional.empty();
@Override
public void preStart() {
log.info("Device actor {}-{} started", groupId, deviceId);
}
@Override
public void postStop() {
log.info("Device actor {}-{} stopped", groupId, deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ReadTemperature.class, r -> {
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
})
.build();
}
}
Note in the code that:
- The companion object (Scala) / static method (Java) defines how to
construct a Device actor. The props parameters include an ID for the
device and the group to which it belongs, which we will use later.
- The companion object (Scala) / the Device class itself (Java)
includes the definitions of the messages we reasoned about
previously.
- In the Device class, the value of lastTemperatureReading is
initially set to None (Scala) / Optional.empty() (Java), and the
actor will simply report it back if queried.
Testing the actor
Based on the actor above, we can write a simple test. In the
com.lightbend.akka.sample package in the test tree of your project, add
the following code to a DeviceSpec.scala (Scala) or DeviceTest.java
(Java) file. (We use ScalaTest, but any other test framework can be used
with the Akka Testkit.)
You can run this test by running mvn test (Java) or by running test at
the sbt prompt (Scala).
Scala
"reply with empty reading if no temperature is known" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
val response = probe.expectMsgType[Device.RespondTemperature]
response.requestId should ===(42)
response.value should ===(None)
}
Java
@Test
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
TestKit probe = new TestKit(system);
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
assertEquals(42L, response.requestId);
assertEquals(Optional.empty(), response.value);
}
Now, the actor needs a way to change the state of the temperature when
it receives a message from the sensor.
Adding a write protocol
The purpose of the write protocol is to update the
lastTemperatureReading field when the actor receives a message that
contains the temperature.
Again, it is tempting to define the write protocol as a very simple
message, something like this:
Scala
final case class RecordTemperature(value: Double)
Java
public static final class RecordTemperature {
final double value;
public RecordTemperature(double value) {
this.value = value;
}
}
However, this approach does not take into account that the sender of the
record temperature message can never be sure if the message was
processed or not. We have seen that Akka does not guarantee delivery of
these messages and leaves it to the application to provide success
notifications. In our case, we would like to send an acknowledgment to
the sender once we have updated our last temperature recording, e.g.
final case class TemperatureRecorded(requestId: Long) in Scala, or a
corresponding TemperatureRecorded class in Java.
Just like in the case of temperature queries and responses, it is a good
idea to include an ID field to provide maximum flexibility.
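As a sketch of why the ID field helps, consider this small, Akka-free
Java class (the Correlator name and its API are our own illustration,
not part of the tutorial's code): the requester remembers pending
request IDs, so it can match each response to its request and ignore
late or duplicate responses caused by resends.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of request/response correlation by ID.
public class Correlator {
    private long nextRequestId = 0;
    private final Set<Long> pending = new HashSet<>();

    // Issue a new request: pick a fresh ID and remember it as pending.
    public long sendRequest() {
        long id = nextRequestId++;
        pending.add(id);
        return id;
    }

    // Accept only the first response to a still-pending request;
    // late or duplicate responses (e.g. after a resend) return false.
    public boolean acceptResponse(long requestId) {
        return pending.remove(requestId);
    }
}
```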
Actor with read and write messages
Putting the read and write protocol together, the device actor looks
like the following example:
Scala
import akka.actor.{ Actor, ActorLogging, Props }
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class RecordTemperature(requestId: Long, value: Double)
final case class TemperatureRecorded(requestId: Long)
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case RecordTemperature(id, value) =>
log.info("Recorded temperature reading {} with {}", value, id)
lastTemperatureReading = Some(value)
sender() ! TemperatureRecorded(id)
case ReadTemperature(id) =>
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}
Java
import java.util.Optional;
import akka.actor.AbstractActor;
import akka.actor.AbstractActor.Receive;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class Device extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final String deviceId;
public Device(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
public static Props props(String groupId, String deviceId) {
return Props.create(Device.class, groupId, deviceId);
}
public static final class RecordTemperature {
final long requestId;
final double value;
public RecordTemperature(long requestId, double value) {
this.requestId = requestId;
this.value = value;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
public static final class ReadTemperature {
final long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
Optional<Double> lastTemperatureReading = Optional.empty();
@Override
public void preStart() {
log.info("Device actor {}-{} started", groupId, deviceId);
}
@Override
public void postStop() {
log.info("Device actor {}-{} stopped", groupId, deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RecordTemperature.class, r -> {
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
})
.match(ReadTemperature.class, r -> {
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
})
.build();
}
}
We should also write a new test case now, exercising both the read/query
and write/record functionality together:
Scala
"reply with latest temperature reading" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
val response1 = probe.expectMsgType[Device.RespondTemperature]
response1.requestId should ===(2)
response1.value should ===(Some(24.0))
deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 3))
deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
val response2 = probe.expectMsgType[Device.RespondTemperature]
response2.requestId should ===(4)
response2.value should ===(Some(55.0))
}
Java
@Test
public void testReplyWithLatestTemperatureReading() {
TestKit probe = new TestKit(system);
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
assertEquals(2L, response1.requestId);
assertEquals(Optional.of(24.0), response1.value);
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
assertEquals(4L, response2.requestId);
assertEquals(Optional.of(55.0), response2.value);
}
What’s Next?
So far, we have started designing our overall architecture, and we wrote
the first actor that directly corresponds to the domain. We now have to
create the component that is responsible for maintaining groups of
devices and the device actors themselves.
PART 4: WORKING WITH DEVICE GROUPS
Let’s take a closer look at the main functionality required by our use
case. In a complete IoT system for monitoring home temperatures, the
steps for connecting a device sensor to our system might look like this:
1. A sensor device in the home connects through some protocol.
2. The component managing network connections accepts the connection.
3. The sensor provides its group and device ID to register with the
device manager component of our system.
4. The device manager component handles registration by looking up or
creating the actor responsible for keeping sensor state.
5. The actor responds with an acknowledgement, exposing its ActorRef.
6. The networking component now uses the ActorRef for communication
between the sensor and device actor without going through the device
manager.
Steps 1 and 2 take place outside the boundaries of our tutorial system.
In this chapter, we will start addressing steps 3-6 and create a way for
sensors to register with our system and to communicate with actors. But
first, we have another architectural decision — how many levels of
actors should we use to represent device groups and device sensors?
One of the main design challenges for Akka programmers is choosing the
best granularity for actors. In practice, depending on the
characteristics of the interactions between actors, there are usually
several valid ways to organize a system. In our use case, for example,
it would be possible to have a single actor maintain all the groups and
devices — perhaps using hash maps. It would also be reasonable to have
an actor for each group that tracks the state of all devices in the same
home.
The following guidelines help us choose the most appropriate actor
hierarchy:
- In general, prefer larger granularity. Introducing more fine-grained
actors than needed causes more problems than it solves.
- Add finer granularity when the system requires:
- Higher concurrency.
- Complex conversations between actors that have many states. We
will see a very good example for this in the next chapter.
- Sufficient state that it makes sense to divide into smaller
actors.
- Multiple unrelated responsibilities. Using separate actors
allows individuals to fail and be restored with little impact on
others.
Device manager hierarchy
Considering the principles outlined in the previous section, we will
model the device manager component as an actor tree with three levels:
- The top level supervisor actor represents the system component for
devices. It is also the entry point to look up and create device
group and device actors.
- At the next level, group actors each supervise the device actors for
one group id (e.g. one home). They also provide services, such as
querying temperature readings from all of the available devices in
their group.
- Device actors manage all the interactions with the actual device
sensors, such as storing temperature readings.
[device manager tree]
We chose this three-layered architecture for these reasons:
- Having groups of individual actors:
- Isolates failures that occur in a group. If a single actor
managed all device groups, an error in one group that causes a
restart would wipe out the state of groups that are otherwise
non-faulty.
- Simplifies the problem of querying all the devices belonging to
a group. Each group actor only contains state related to its
group.
- Increases parallelism in the system. Since each group has a
dedicated actor, they run concurrently and we can query multiple
groups concurrently.
- Having sensors modeled as individual device actors:
- Isolates failures of one device actor from the rest of the
devices in the group.
- Increases the parallelism of collecting temperature readings.
Network connections from different sensors communicate with
their individual device actors directly, reducing contention
points.
With the architecture defined, we can start working on the protocol for
registering sensors.
The Registration Protocol
As the first step, we need to design the protocol both for registering a
device and for creating the group and device actors that will be
responsible for it. This protocol will be provided by the DeviceManager
component itself because that is the only actor that is known and
available up front: device groups and device actors are created
on-demand.
Looking at registration in more detail, we can outline the necessary
functionality:
1. When a DeviceManager receives a request with a group and device id:
- If the manager already has an actor for the device group, it
forwards the request to it.
- Otherwise, it creates a new device group actor and then forwards
the request.
2. The DeviceGroup actor receives the request to register an actor for
the given device:
- If the group already has an actor for the device, the group
actor forwards the request to the device actor.
- Otherwise, the DeviceGroup actor first creates a device actor
and then forwards the request.
3. The device actor receives the request and sends an acknowledgement
to the original sender. Since the device actor acknowledges receipt
(instead of the group actor), the sensor will now have the ActorRef
to send messages directly to its actor.
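The lookup-or-create step that both the manager and the group perform
can be sketched without Akka as follows (ChildRegistry is a hypothetical
helper with string stand-ins for actors, not part of the tutorial's
code): the child responsible for an ID is reused if it exists, and
created on the first request otherwise.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the "look up or create, then forward" pattern
// used by the DeviceManager (per group ID) and DeviceGroup (per device ID).
public class ChildRegistry<C> {
    private final Map<String, C> children = new HashMap<>();
    private final Function<String, C> factory;

    public ChildRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    // Returns the existing child for this ID, creating one if needed.
    public C lookupOrCreate(String id) {
        return children.computeIfAbsent(id, factory);
    }
}
```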
The messages that we will use to communicate registration requests and
their acknowledgement have a simple definition:
Scala
final case class RequestTrackDevice(groupId: String, deviceId: String)
case object DeviceRegistered
Java
public static final class RequestTrackDevice {
public final String groupId;
public final String deviceId;
public RequestTrackDevice(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
}
public static final class DeviceRegistered {
}
In this case we have not included a request ID field in the messages.
Since registration happens once, when the component connects the system
to some network protocol, the ID is not important. However, it is
usually a best practice to include a request ID.
Now, we’ll start implementing the protocol from the bottom up. In
practice, both a top-down and bottom-up approach can work, but in our
case, we benefit from the bottom-up approach as it allows us to
immediately write tests for the new features without mocking out parts
that we will need to build later.
Adding registration support to device actors
At the bottom of our hierarchy are the Device actors. Their job in the
registration process is simple: reply to the registration request with
an acknowledgment to the sender. It is also prudent to add a safeguard
against requests that come with a mismatched group or device ID.
_We will assume that the ID of the sender of the registration message is
preserved in the upper layers._ We will show you in the next section how
this can be achieved.
The device actor registration code looks like the following. Modify your
example to match.
Scala
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class RecordTemperature(requestId: Long, value: Double)
final case class TemperatureRecorded(requestId: Long)
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`) =>
sender() ! DeviceManager.DeviceRegistered
case DeviceManager.RequestTrackDevice(groupId, deviceId) =>
log.warning(
"Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
groupId, deviceId, this.groupId, this.deviceId
)
case RecordTemperature(id, value) =>
log.info("Recorded temperature reading {} with {}", value, id)
lastTemperatureReading = Some(value)
sender() ! TemperatureRecorded(id)
case ReadTemperature(id) =>
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}
Java
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import jdocs.tutorial_4.DeviceManager.DeviceRegistered;
import jdocs.tutorial_4.DeviceManager.RequestTrackDevice;
import java.util.Optional;
public class Device extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final String deviceId;
public Device(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
public static Props props(String groupId, String deviceId) {
return Props.create(Device.class, groupId, deviceId);
}
public static final class RecordTemperature {
final long requestId;
final double value;
public RecordTemperature(long requestId, double value) {
this.requestId = requestId;
this.value = value;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
public static final class ReadTemperature {
final long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
Optional<Double> lastTemperatureReading = Optional.empty();
@Override
public void preStart() {
log.info("Device actor {}-{} started", groupId, deviceId);
}
@Override
public void postStop() {
log.info("Device actor {}-{} stopped", groupId, deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RequestTrackDevice.class, r -> {
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
getSender().tell(new DeviceRegistered(), getSelf());
} else {
log.warning(
"Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
r.groupId, r.deviceId, this.groupId, this.deviceId
);
}
})
.match(RecordTemperature.class, r -> {
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
})
.match(ReadTemperature.class, r -> {
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
})
.build();
}
}
Note
We used a feature of Scala pattern matching where we can check whether
a certain field equals an expected value. By wrapping a variable in
backticks, like `variable`, the pattern only matches if it contains the
value of variable in that position.
We can now write two new test cases, one exercising successful
registration, the other testing the case when IDs don’t match:
Scala
"reply to registration requests" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
probe.lastSender should ===(deviceActor)
}
"ignore wrong registration requests" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref)
probe.expectNoMsg(500.milliseconds)
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref)
probe.expectNoMsg(500.milliseconds)
}
Java
@Test
public void testReplyToRegistrationRequests() {
TestKit probe = new TestKit(system);
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
assertEquals(deviceActor, probe.getLastSender());
}
@Test
public void testIgnoreWrongRegistrationRequests() {
TestKit probe = new TestKit(system);
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
probe.expectNoMsg();
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
probe.expectNoMsg();
}
Note
We used the expectNoMsg() helper method from TestProbe (Scala) /
TestKit (Java). This assertion waits for the defined time limit and
fails if it receives any message during that period; if no message
arrives, the assertion passes. It is usually a good idea to keep these
timeouts low (but not too low), because long waits add significant test
execution time.
Adding registration support to device group actors
We are done with registration support at the device level, now we have
to implement it at the group level. A group actor has more work to do
when it comes to registrations, including:
- Handling the registration request by either forwarding it to an
existing device actor or by creating a new actor and forwarding the
message.
- Tracking which device actors exist in the group and removing them
from the group when they are stopped.
Handling the registration request
A device group actor must either forward the request to an existing
child, or it should create one. To look up child actors by their device
IDs we will use a Map[String, ActorRef]Map<String, ActorRef>.
We also want to keep the ID of the original sender of the request so
that our device actor can reply to it directly. This is possible by
using forward instead of the ! (tell) operator. The only difference
between the two is that forward keeps the original sender, while !
sets the sender to be the current actor. Just like with our device
actor, we ensure that we don't respond to wrong group IDs. Add the
following to your source file:
Scala
object DeviceGroup {
def props(groupId: String): Props = Props(new DeviceGroup(groupId))
}
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
var deviceIdToActor = Map.empty[String, ActorRef]
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
override def receive: Receive = {
case trackMsg @ RequestTrackDevice(`groupId`, _) =>
deviceIdToActor.get(trackMsg.deviceId) match {
case Some(deviceActor) =>
deviceActor forward trackMsg
case None =>
log.info("Creating device actor for {}", trackMsg.deviceId)
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
deviceIdToActor += trackMsg.deviceId -> deviceActor
deviceActor forward trackMsg
}
case RequestTrackDevice(groupId, deviceId) =>
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
)
}
}
Java
public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
public static Props props(String groupId) {
return Props.create(DeviceGroup.class, groupId);
}
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
@Override
public void preStart() {
log.info("DeviceGroup {} started", groupId);
}
@Override
public void postStop() {
log.info("DeviceGroup {} stopped", groupId);
}
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
if (deviceActor != null) {
deviceActor.forward(trackMsg, getContext());
} else {
log.info("Creating device actor for {}", trackMsg.deviceId);
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
deviceActor.forward(trackMsg, getContext());
}
} else {
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
);
}
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
.build();
}
}
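The difference between tell and forward described above can be
illustrated with a small, Akka-free Java model (Envelope and the method
names are our own stand-ins; Akka's real API differs): tell stamps the
mediator as the sender, while forward preserves the original sender so
the final recipient can reply directly.

```java
// Toy model of tell vs. forward semantics (not Akka's API).
public class ForwardDemo {
    public static final class Envelope {
        public final String sender;
        public final String payload;
        public Envelope(String sender, String payload) {
            this.sender = sender;
            this.payload = payload;
        }
    }

    // tell: the mediator becomes the sender seen by the recipient.
    public static Envelope tell(Envelope incoming, String mediator) {
        return new Envelope(mediator, incoming.payload);
    }

    // forward: the original sender is preserved, so a reply from the
    // recipient goes straight back to the requester.
    public static Envelope forward(Envelope incoming, String mediator) {
        return new Envelope(incoming.sender, incoming.payload);
    }
}
```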
Just as we did with the device, we test this new functionality. We
verify that the actors returned for two different device IDs are
actually different, and we attempt to record a temperature reading for
each of the devices to see if the actors are responding.
Scala
"be able to register a device actor" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
deviceActor1 should !==(deviceActor2)
// Check that the device actors are working
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
}
"ignore requests for wrong groupId" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
probe.expectNoMsg(500.milliseconds)
}
Java
@Test
public void testRegisterDeviceActor() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor1 = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor2 = probe.getLastSender();
assertNotEquals(deviceActor1, deviceActor2);
// Check that the device actors are working
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
}
@Test
public void testIgnoreRequestsForWrongGroupId() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
probe.expectNoMsg();
}
If a device actor already exists for the registration request, we would
like to use the existing actor instead of creating a new one. We have
not tested this yet, so we need to add a test case:
Scala
"return same actor for same deviceId" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
deviceActor1 should ===(deviceActor2)
}
Java
@Test
public void testReturnSameActorForSameDeviceId() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor1 = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor2 = probe.getLastSender();
assertEquals(deviceActor1, deviceActor2);
}
Keeping track of the device actors in the group
So far, we have implemented logic for registering device actors in the
group. Devices come and go, however, so we will need a way to remove
device actors from the Map[String, ActorRef] Map<String, ActorRef>. We
will assume that when a device is removed, its corresponding device
actor is simply stopped. Supervision, as we discussed earlier, only
handles error scenarios — not graceful stopping. So we need to notify
the parent when one of the device actors is stopped.
Akka provides a _Death Watch_ feature that allows an actor to _watch_
another actor and be notified if the other actor is stopped. Unlike
supervision, watching is not limited to parent-child relationships: any
actor can watch any other actor as long as it knows the ActorRef. After
a watched actor stops, the watcher receives a Terminated(actorRef)
message which also contains the reference to the watched actor. The
watcher can either handle this message explicitly or fail with a
DeathPactException. The latter is useful if the actor can no longer
perform its own duties after the watched actor has been stopped. In our
case, the group should still function after one device has been
stopped, so we need to handle the Terminated(actorRef) message.
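The semantics of Death Watch can be sketched without any Akka machinery.
The following is a hypothetical illustration in plain Java (WatchSketch
and the string IDs are made up for this sketch; they are not part of the
guide's code): any party may watch any other, and stopping a watched
party notifies every watcher with a reference to the terminated one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of Death Watch semantics (no Akka): watching is not
// limited to parents; whoever registered as a watcher gets notified.
class WatchSketch {
    final Map<String, List<String>> watchers = new HashMap<>();
    final List<String> notifications = new ArrayList<>();

    // Any actor can watch any other actor it has a reference to.
    void watch(String watcher, String watched) {
        watchers.computeIfAbsent(watched, k -> new ArrayList<>()).add(watcher);
    }

    // Stopping delivers a Terminated-style notification to each watcher.
    void stop(String actor) {
        for (String w : watchers.getOrDefault(actor, new ArrayList<>())) {
            notifications.add(w + " received Terminated(" + actor + ")");
        }
    }
}
```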
Our device group actor needs to include functionality that:
1. Starts watching new device actors when they are created.
2. Removes a device actor from the Map[String, ActorRef]
Map<String, ActorRef> — which maps devices to device actors — when
the notification indicates it has stopped.
Unfortunately, the Terminated message only contains the ActorRef of the
child actor. We need the actor’s ID to remove it from the map of
existing device-to-device-actor mappings. To be able to do this removal,
we need to introduce another placeholder, Map[ActorRef, String]
Map<ActorRef, String>, that allows us to look up the device ID
corresponding to a given ActorRef.
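The bookkeeping for the two maps can be sketched independently of Akka.
This is a hypothetical illustration in plain Java (DeviceRegistry is
made up for this sketch, and strings stand in for ActorRefs), not code
from the guide:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keeping the forward map (device ID to actor) and the
// reverse map (actor to device ID) in sync, as the group actor does.
class DeviceRegistry {
    final Map<String, String> deviceIdToActor = new HashMap<>();
    final Map<String, String> actorToDeviceId = new HashMap<>();

    // On registration, both directions are recorded together.
    void register(String deviceId, String actorRef) {
        deviceIdToActor.put(deviceId, actorRef);
        actorToDeviceId.put(actorRef, deviceId);
    }

    // A termination notification only carries the actor reference; the
    // reverse map recovers the device ID so both entries can be removed.
    void onTerminated(String actorRef) {
        String deviceId = actorToDeviceId.remove(actorRef);
        if (deviceId != null) {
            deviceIdToActor.remove(deviceId);
        }
    }
}
```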
Adding the functionality to identify the actor results in this:
Scala
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
var deviceIdToActor = Map.empty[String, ActorRef]
var actorToDeviceId = Map.empty[ActorRef, String]
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
override def receive: Receive = {
case trackMsg @ RequestTrackDevice(`groupId`, _) =>
deviceIdToActor.get(trackMsg.deviceId) match {
case Some(deviceActor) =>
deviceActor forward trackMsg
case None =>
log.info("Creating device actor for {}", trackMsg.deviceId)
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
context.watch(deviceActor)
actorToDeviceId += deviceActor -> trackMsg.deviceId
deviceIdToActor += trackMsg.deviceId -> deviceActor
deviceActor forward trackMsg
}
case RequestTrackDevice(groupId, deviceId) =>
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
)
case Terminated(deviceActor) =>
val deviceId = actorToDeviceId(deviceActor)
log.info("Device actor for {} has been terminated", deviceId)
actorToDeviceId -= deviceActor
deviceIdToActor -= deviceId
}
}
Java
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
@Override
public void preStart() {
log.info("DeviceGroup {} started", groupId);
}
@Override
public void postStop() {
log.info("DeviceGroup {} stopped", groupId);
}
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
if (deviceActor != null) {
deviceActor.forward(trackMsg, getContext());
} else {
log.info("Creating device actor for {}", trackMsg.deviceId);
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
getContext().watch(deviceActor);
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
deviceActor.forward(trackMsg, getContext());
}
} else {
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
);
}
}
private void onTerminated(Terminated t) {
ActorRef deviceActor = t.getActor();
String deviceId = actorToDeviceId.get(deviceActor);
log.info("Device actor for {} has been terminated", deviceId);
actorToDeviceId.remove(deviceActor);
deviceIdToActor.remove(deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
.match(Terminated.class, this::onTerminated)
.build();
}
}
So far we have no way of knowing which devices the group actor keeps
track of and, therefore, we cannot test our new functionality yet. To
make it testable, we add a new query capability (message
RequestDeviceList(requestId: Long) RequestDeviceList) that simply lists
the currently active device IDs:
Scala
object DeviceGroup {
def props(groupId: String): Props = Props(new DeviceGroup(groupId))
final case class RequestDeviceList(requestId: Long)
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
}
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
var deviceIdToActor = Map.empty[String, ActorRef]
var actorToDeviceId = Map.empty[ActorRef, String]
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
override def receive: Receive = {
case trackMsg @ RequestTrackDevice(`groupId`, _) =>
deviceIdToActor.get(trackMsg.deviceId) match {
case Some(deviceActor) =>
deviceActor forward trackMsg
case None =>
log.info("Creating device actor for {}", trackMsg.deviceId)
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
context.watch(deviceActor)
actorToDeviceId += deviceActor -> trackMsg.deviceId
deviceIdToActor += trackMsg.deviceId -> deviceActor
deviceActor forward trackMsg
}
case RequestTrackDevice(groupId, deviceId) =>
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
)
case RequestDeviceList(requestId) =>
sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
case Terminated(deviceActor) =>
val deviceId = actorToDeviceId(deviceActor)
log.info("Device actor for {} has been terminated", deviceId)
actorToDeviceId -= deviceActor
deviceIdToActor -= deviceId
}
}
Java
public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
public static Props props(String groupId) {
return Props.create(DeviceGroup.class, groupId);
}
public static final class RequestDeviceList {
final long requestId;
public RequestDeviceList(long requestId) {
this.requestId = requestId;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
@Override
public void preStart() {
log.info("DeviceGroup {} started", groupId);
}
@Override
public void postStop() {
log.info("DeviceGroup {} stopped", groupId);
}
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
if (deviceActor != null) {
deviceActor.forward(trackMsg, getContext());
} else {
log.info("Creating device actor for {}", trackMsg.deviceId);
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
getContext().watch(deviceActor);
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
deviceActor.forward(trackMsg, getContext());
}
} else {
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
);
}
}
private void onDeviceList(RequestDeviceList r) {
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
}
private void onTerminated(Terminated t) {
ActorRef deviceActor = t.getActor();
String deviceId = actorToDeviceId.get(deviceActor);
log.info("Device actor for {} has been terminated", deviceId);
actorToDeviceId.remove(deviceActor);
deviceIdToActor.remove(deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
.match(RequestDeviceList.class, this::onDeviceList)
.match(Terminated.class, this::onTerminated)
.build();
}
}
We are almost ready to test the removal of devices. However, we still
need the following capabilities:
- To stop a device actor from our test case. From the outside, any
actor can be stopped by simply sending the special built-in message
PoisonPill, which instructs the actor to stop.
- To be notified once the device actor is stopped. We can use the
_Death Watch_ facility for this purpose, too. The TestProbe TestKit
provides two helpers that we can easily use: watch() to watch a
specific actor, and expectTerminated to assert that the watched actor
has been terminated.
We add two more test cases now. In the first, we just test that we get
back the list of proper IDs once we have added a few devices. The second
test case makes sure that the device ID is properly removed after the
device actor has been stopped:
Scala
"be able to list active devices" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
}
"be able to list active devices after one shuts down" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val toShutDown = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
probe.watch(toShutDown)
toShutDown ! PoisonPill
probe.expectTerminated(toShutDown)
// using awaitAssert to retry because it might take longer for the groupActor
// to see the Terminated, that order is undefined
probe.awaitAssert {
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
}
}
Java
@Test
public void testListActiveDevices() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
}
@Test
public void testListActiveDevicesAfterOneShutsDown() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef toShutDown = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
probe.watch(toShutDown);
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
probe.expectTerminated(toShutDown);
// using awaitAssert to retry because it might take longer for the groupActor
// to see the Terminated, that order is undefined
probe.awaitAssert(() -> {
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
DeviceGroup.ReplyDeviceList r =
probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(1L, r.requestId);
assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
return null;
});
}
Creating device manager actors
Going up to the next level in our hierarchy, we need to create the entry
point for our device manager component in the DeviceManager source file.
This actor is very similar to the device group actor, but creates device
group actors instead of device actors:
Scala
object DeviceManager {
def props(): Props = Props(new DeviceManager)
final case class RequestTrackDevice(groupId: String, deviceId: String)
case object DeviceRegistered
}
class DeviceManager extends Actor with ActorLogging {
var groupIdToActor = Map.empty[String, ActorRef]
var actorToGroupId = Map.empty[ActorRef, String]
override def preStart(): Unit = log.info("DeviceManager started")
override def postStop(): Unit = log.info("DeviceManager stopped")
override def receive = {
case trackMsg @ RequestTrackDevice(groupId, _) =>
groupIdToActor.get(groupId) match {
case Some(ref) =>
ref forward trackMsg
case None =>
log.info("Creating device group actor for {}", groupId)
val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
context.watch(groupActor)
groupActor forward trackMsg
groupIdToActor += groupId -> groupActor
actorToGroupId += groupActor -> groupId
}
case Terminated(groupActor) =>
val groupId = actorToGroupId(groupActor)
log.info("Device group actor for {} has been terminated", groupId)
actorToGroupId -= groupActor
groupIdToActor -= groupId
}
}
Java
public class DeviceManager extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
public static Props props() {
return Props.create(DeviceManager.class);
}
public static final class RequestTrackDevice {
public final String groupId;
public final String deviceId;
public RequestTrackDevice(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
}
public static final class DeviceRegistered {
}
final Map<String, ActorRef> groupIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToGroupId = new HashMap<>();
@Override
public void preStart() {
log.info("DeviceManager started");
}
@Override
public void postStop() {
log.info("DeviceManager stopped");
}
private void onTrackDevice(RequestTrackDevice trackMsg) {
String groupId = trackMsg.groupId;
ActorRef ref = groupIdToActor.get(groupId);
if (ref != null) {
ref.forward(trackMsg, getContext());
} else {
log.info("Creating device group actor for {}", groupId);
ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
getContext().watch(groupActor);
groupActor.forward(trackMsg, getContext());
groupIdToActor.put(groupId, groupActor);
actorToGroupId.put(groupActor, groupId);
}
}
private void onTerminated(Terminated t) {
ActorRef groupActor = t.getActor();
String groupId = actorToGroupId.get(groupActor);
log.info("Device group actor for {} has been terminated", groupId);
actorToGroupId.remove(groupActor);
groupIdToActor.remove(groupId);
}
public Receive createReceive() {
return receiveBuilder()
.match(RequestTrackDevice.class, this::onTrackDevice)
.match(Terminated.class, this::onTerminated)
.build();
}
}
We leave tests of the device manager as an exercise for you, since they
are very similar to the tests we have already written for the group
actor.
What’s next?
We now have a hierarchical component for registering and tracking
devices and recording measurements. We have seen how to implement
different types of conversation patterns, such as:
- Request-respond (for temperature recordings)
- Delegate-respond (for registration of devices)
- Create-watch-terminate (for creating the group and device actor as
children)
In the next chapter, we will introduce group query capabilities, which
will establish a new conversation pattern of scatter-gather. In
particular, we will implement the functionality that allows users to
query the status of all the devices belonging to a group.
PART 5: QUERYING DEVICE GROUPS
The conversational patterns that we have seen so far are simple in the
sense that they require the actor to keep little or no state.
Specifically:
- Device actors return a reading, which requires no state change
- Device actors record a temperature, which updates a single field
- Device Group actors maintain group membership by simply adding or
removing entries from a map
In this part, we will use a more complex example. Since homeowners will
be interested in the temperatures throughout their home, our goal is to
be able to query all of the device actors in a group. Let us start by
investigating how such a query API should behave.
Dealing with possible scenarios
The very first issue we face is that the membership of a group is
dynamic. Each sensor device is represented by an actor that can stop at
any time. At the beginning of the query, we can ask all of the existing
device actors for the current temperature. However, during the lifecycle
of the query:
- A device actor might stop and not be able to respond back with a
temperature reading.
- A new device actor might start up and not be included in the query
because we weren’t aware of it.
These issues can be addressed in many different ways, but the important
point is to settle on the desired behavior. The following works well for
our use case:
- When a query arrives, the group actor takes a _snapshot_ of the
existing device actors and will only ask those actors for the
temperature.
- Actors that start up _after_ the query arrives are simply ignored.
- If an actor in the snapshot stops during the query without
answering, we will simply report the fact that it stopped to the
sender of the query message.
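The snapshot decision can be illustrated without any Akka machinery. The
sketch below is hypothetical (SnapshotSketch and the string IDs are
invented for illustration): the query copies the group membership once,
at start, so devices registered afterwards are simply not part of it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a query operates on a copy of the group's membership
// taken when the query starts, so later registrations are ignored by it.
class SnapshotSketch {
    static Set<String> startQuery(Map<String, String> actorToDeviceId) {
        // Copy the current key set; the query only ever asks these actors.
        return new HashSet<>(actorToDeviceId.keySet());
    }
}
```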
Apart from device actors coming and going dynamically, some actors might
take a long time to answer. For example, they could be stuck in an
accidental infinite loop, or fail due to a bug and drop our request. We
don’t want the query to continue indefinitely, so we will consider it
complete in either of the following cases:
- All actors in the snapshot have either responded or have confirmed
being stopped.
- We reach a pre-defined deadline.
Given these decisions, along with the fact that a device in the snapshot
might have just started and not yet received a temperature to record, we
can define four states for each device actor, with respect to a
temperature query:
- It has a temperature available: Temperature(value) Temperature.
- It has responded, but has no temperature available yet:
TemperatureNotAvailable.
- It has stopped before answering: DeviceNotAvailable.
- It did not respond before the deadline: DeviceTimedOut.
Summarizing these in message types, we can add the following to
DeviceGroup:
Scala
final case class RequestAllTemperatures(requestId: Long)
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])
sealed trait TemperatureReading
final case class Temperature(value: Double) extends TemperatureReading
case object TemperatureNotAvailable extends TemperatureReading
case object DeviceNotAvailable extends TemperatureReading
case object DeviceTimedOut extends TemperatureReading
Java
public static final class RequestAllTemperatures {
final long requestId;
public RequestAllTemperatures(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondAllTemperatures {
final long requestId;
final Map<String, TemperatureReading> temperatures;
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
this.requestId = requestId;
this.temperatures = temperatures;
}
}
public static interface TemperatureReading {
}
public static final class Temperature implements TemperatureReading {
public final double value;
public Temperature(double value) {
this.value = value;
}
}
public static final class TemperatureNotAvailable implements TemperatureReading {
}
public static final class DeviceNotAvailable implements TemperatureReading {
}
public static final class DeviceTimedOut implements TemperatureReading {
}
Implementing the query
One approach for implementing the query involves adding code to the
group device actor. However, in practice this can be very cumbersome and
error prone. Remember that when we start a query, we need to take a
snapshot of the devices present and start a timer so that we can enforce
the deadline. In the meantime, _another query_ can arrive. For the
second query, of course, we need to keep track of the exact same
information but in isolation from the previous query. This would require
us to maintain separate mappings between queries and device actors.
Instead, we will implement a simpler and superior approach. We will
create an actor that represents a _single query_ and that performs the
tasks needed to complete the query on behalf of the group actor. So far
we have created actors that corresponded to classical domain objects, but
now, we will create an actor that represents a process or a task rather
than an entity. We benefit by keeping our group device actor simple and
being able to better test query capability in isolation.
Defining the query actor
First, we need to design the lifecycle of our query actor. This consists
of identifying its initial state, the first action it will take, and the
cleanup — if necessary. The query actor will need the following
information:
- The snapshot and IDs of active device actors to query.
- The ID of the request that started the query (so that we can include
it in the reply).
- The reference of the actor who sent the query. We will send the
reply to this actor directly.
- A deadline that indicates how long the query should wait for
replies. Making this a parameter will simplify testing.
Scheduling the query timeout
Since we need a way to indicate how long we are willing to wait for
responses, it is time to introduce a new Akka feature that we have not
used yet, the built-in scheduler facility. Using the scheduler is
simple:
- We get the scheduler from the ActorSystem, which, in turn, is
accessible from the actor’s context:
context.system.scheduler getContext().getSystem().scheduler(). This
needs an implicit ExecutionContext, which is basically the
thread-pool that will execute the timer task itself. In our case, we
use the same dispatcher as the actor, by importing
context.dispatcher by passing in getContext().dispatcher().
- The scheduler.scheduleOnce(time, actorRef, message)
scheduler.scheduleOnce(time, actorRef, message, executor, sender)
method will schedule the message to be sent to the actor actorRef
after the specified delay.
We need to create a message that represents the query timeout. We create
a simple message CollectionTimeout without any parameters for this
purpose. The return value from scheduleOnce is a Cancellable which can
be used to cancel the timer if the query finishes successfully in time.
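As an analogy outside Akka, the JDK's ScheduledExecutorService offers
the same pattern: a one-shot task plus a handle that can cancel it. The
sketch below is hypothetical and only mirrors the idea of scheduleOnce
and Cancellable; it is not Akka's API, and the names are invented:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy to scheduleOnce/Cancellable using plain JDK tools.
class QueryTimeoutSketch {
    static String runQuery(boolean finishesInTime) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> result = new CompletableFuture<>();
        // Arm the deadline: if nothing completes the query first, time out.
        ScheduledFuture<?> timeout = scheduler.schedule(
            () -> result.complete("CollectionTimeout"), 200, TimeUnit.MILLISECONDS);
        if (finishesInTime) {
            result.complete("AllRepliesReceived");
            timeout.cancel(false); // like Cancellable.cancel() on success
        }
        try {
            return result.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            return "error";
        } finally {
            scheduler.shutdown();
        }
    }
}
```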
At the start of the query, we need to ask each of the device actors for
the current temperature. To be able to quickly detect devices that
stopped before they got the ReadTemperature message, we will also watch
each of the actors. This way, we get Terminated messages for those that
stop during the lifetime of the query, so we don’t need to wait until
the timeout to mark these as not available.
Putting this together, the outline of our DeviceGroupQuery actor looks
like this:
Scala
object DeviceGroupQuery {
case object CollectionTimeout
def props(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
): Props = {
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
}
}
class DeviceGroupQuery(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
) extends Actor with ActorLogging {
import DeviceGroupQuery._
import context.dispatcher
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)
override def preStart(): Unit = {
actorToDeviceId.keysIterator.foreach { deviceActor =>
context.watch(deviceActor)
deviceActor ! Device.ReadTemperature(0)
}
}
override def postStop(): Unit = {
queryTimeoutTimer.cancel()
}
}
Java
public class DeviceGroupQuery extends AbstractActor {
public static final class CollectionTimeout {
}
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final Map<ActorRef, String> actorToDeviceId;
final long requestId;
final ActorRef requester;
Cancellable queryTimeoutTimer;
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
this.actorToDeviceId = actorToDeviceId;
this.requestId = requestId;
this.requester = requester;
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
);
}
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout);
}
@Override
public void preStart() {
for (ActorRef deviceActor : actorToDeviceId.keySet()) {
getContext().watch(deviceActor);
deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
}
}
@Override
public void postStop() {
queryTimeoutTimer.cancel();
}
}
Tracking actor state
The query actor, apart from the pending timer, has one stateful aspect:
tracking the set of actors that have replied, have stopped, or have not
replied yet. One way to track this state is to create a mutable field in the
actor (a var). A different approach takes advantage of the ability to
change how an actor responds to messages. A Receive is just a function
(or an object, if you like) that can be returned from another function.
By default, the receive block defines the behavior of the actor, but it
is possible to change it multiple times during the life of the actor. We
simply call context.become(newBehavior) where newBehavior is anything
with type Receive (which is just a shorthand for
PartialFunction[Any, Unit]). We will leverage this feature to track the
state of our actor.
For our use case:
1. Instead of defining receive directly, we delegate to a
waitingForReplies function to create the Receive.
2. The waitingForReplies function will keep track of two changing
values:
- a Map of already received replies
- a Set of actors that we still wait on
3. We have three events to act on:
- We can receive a RespondTemperature message from one of the devices.
- We can receive a Terminated message for a device actor that has been
stopped in the meantime.
- We can reach the deadline and receive a CollectionTimeout.
In the first two cases, we need to keep track of the replies, which we
now simply delegate to a method receivedResponse, which we will discuss
later. In the case of timeout, we need to simply take all the actors
that have not replied yet (the members of the set stillWaiting) and
put a DeviceTimedOut as the status in the final reply. Then we reply to
the submitter of the query with the collected results and stop the query
actor.
To accomplish this, add the following to your DeviceGroupQuery source
file:
Scala
override def receive: Receive =
waitingForReplies(
Map.empty,
actorToDeviceId.keySet
)
def waitingForReplies(
repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
stillWaiting: Set[ActorRef]
): Receive = {
case Device.RespondTemperature(0, valueOption) =>
val deviceActor = sender()
val reading = valueOption match {
case Some(value) => DeviceGroup.Temperature(value)
case None => DeviceGroup.TemperatureNotAvailable
}
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)
case Terminated(deviceActor) =>
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)
case CollectionTimeout =>
val timedOutReplies =
stillWaiting.map { deviceActor =>
val deviceId = actorToDeviceId(deviceActor)
deviceId -> DeviceGroup.DeviceTimedOut
}
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
context.stop(self)
}
Java
@Override
public Receive createReceive() {
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
}
public Receive waitingForReplies(
Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
Set<ActorRef> stillWaiting) {
return receiveBuilder()
.match(Device.RespondTemperature.class, r -> {
ActorRef deviceActor = getSender();
DeviceGroup.TemperatureReading reading = r.value
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
.orElse(new DeviceGroup.TemperatureNotAvailable());
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
})
.match(Terminated.class, t -> {
receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar);
})
.match(CollectionTimeout.class, t -> {
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
for (ActorRef deviceActor : stillWaiting) {
String deviceId = actorToDeviceId.get(deviceActor);
replies.put(deviceId, new DeviceGroup.DeviceTimedOut());
}
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
getContext().stop(getSelf());
})
.build();
}
It is not yet clear how we will “mutate” the repliesSoFar and
stillWaiting data structures. One important thing to note is that the
function waitingForReplies DOES NOT HANDLE THE MESSAGES DIRECTLY. IT
RETURNS A Receive FUNCTION THAT WILL HANDLE THE MESSAGES. This means
that if we call waitingForReplies again, with different parameters, then
it returns a brand new Receive that will use those new parameters.
We have seen how we can install the initial Receive by simply returning
it from receive. In order to install a new one, to record a new reply,
for example, we need some mechanism. This mechanism is the method
context.become(newReceive) which will _change_ the actor’s message
handling function to the provided newReceive function. You can imagine
that before starting, your actor automatically calls
context.become(receive), i.e. installing the Receive function that is
returned from receive. This is another important observation: IT IS NOT
receive THAT HANDLES THE MESSAGES, IT JUST RETURNS A Receive FUNCTION
THAT WILL ACTUALLY HANDLE THE MESSAGES.
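The same pattern can be sketched without Akka: a behavior is an ordinary
function value, and "becoming" a new behavior just means replacing it
with a new function that closes over the new state. Everything below
(CountingActor, the message strings, the reply count of 2) is
hypothetical and invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of context.become: the current behavior is a field
// holding a handler function; recording new state means installing a new
// handler that closes over the new values.
class CountingActor {
    List<String> result = null; // set once all expected replies arrived
    Consumer<String> behavior = waitingFor(new ArrayList<>(), 2);

    // Like waitingForReplies: this does not handle messages itself, it
    // returns a handler parameterized by the state gathered so far.
    Consumer<String> waitingFor(List<String> repliesSoFar, int remaining) {
        return msg -> {
            List<String> newReplies = new ArrayList<>(repliesSoFar);
            newReplies.add(msg);
            if (remaining == 1) {
                result = newReplies;      // all replies collected
                behavior = m -> { };      // further messages are ignored
            } else {
                behavior = waitingFor(newReplies, remaining - 1); // "become"
            }
        };
    }

    void receive(String msg) {
        behavior.accept(msg);
    }
}
```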
We now have to figure out what to do in receivedResponse. First, we need
to record the new result in the map repliesSoFar and remove the actor
from stillWaiting. The next step is to check if there are any remaining
actors we are waiting for. If there are none, we send the result of the
query to the original requester and stop the query actor. Otherwise, we
need to update the repliesSoFar and stillWaiting structures and wait for
more messages.
In the code above, we already treated Terminated as the implicit
response DeviceNotAvailable, so receivedResponse does not need to do
anything special. However, there is one small task left. It is possible
that we receive a proper response from a device actor, but that the
device then stops during the lifetime of the query. We don’t want this
second event to overwrite the already received reply; in other words, we
don’t want to receive Terminated after we have recorded the response.
This is simple to achieve by calling context.unwatch(ref). This method
also ensures that we don’t receive Terminated events that are already in
the mailbox of the actor. It is also safe to call it multiple times;
only the first call has any effect, the rest are simply ignored.
With all this knowledge, we can create the receivedResponse method:
Scala
def receivedResponse(
deviceActor: ActorRef,
reading: DeviceGroup.TemperatureReading,
stillWaiting: Set[ActorRef],
repliesSoFar: Map[String, DeviceGroup.TemperatureReading]
): Unit = {
context.unwatch(deviceActor)
val deviceId = actorToDeviceId(deviceActor)
val newStillWaiting = stillWaiting - deviceActor
val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
if (newStillWaiting.isEmpty) {
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
context.stop(self)
} else {
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
}
}
Java
public void receivedResponse(ActorRef deviceActor,
DeviceGroup.TemperatureReading reading,
Set<ActorRef> stillWaiting,
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
getContext().unwatch(deviceActor);
String deviceId = actorToDeviceId.get(deviceActor);
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
newStillWaiting.remove(deviceActor);
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
newRepliesSoFar.put(deviceId, reading);
if (newStillWaiting.isEmpty()) {
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
getContext().stop(getSelf());
} else {
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
}
}
It is quite natural to ask at this point, what have we gained by using
the context.become() trick instead of just making the repliesSoFar and
stillWaiting structures mutable fields of the actor (i.e. vars)? In this
simple example, not that much. The value of this style of state keeping
becomes more evident when you suddenly have _more kinds_ of states.
Since each state might have temporary data that is relevant only to that
state, keeping all of it as fields would pollute the global state of the
actor, i.e. it would be unclear which fields are used in which state.
Using parameterized Receive “factory” methods we can keep data private
that is only relevant to one state. It is still a good exercise to
rewrite the query using mutable fields (vars) instead of
context.become(). However, it is recommended to get comfortable with the
solution we have used here as it helps structure more complex actor code
in a cleaner and more maintainable way.
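To make the “more kinds of states” point concrete, here is another toy
sketch outside of Akka (plain Java; the protocol and all names are
invented for illustration): each state is a handler factory, and the
data each factory takes as parameters exists only while that state is
active.

```java
import java.util.function.Consumer;

// Toy sketch, not Akka: two states, each defined by a handler factory whose
// parameters are the data relevant only to that state. With mutable fields
// instead, sessionId and msgCount would sit on the object even while
// disconnected, obscuring which state actually uses them.
class ProtocolSketch {
    private Consumer<String> behavior = disconnected();
    String log = "";

    private Consumer<String> disconnected() {   // no state-specific data
        return msg -> {
            if (msg.startsWith("connect:")) {
                behavior = connected(msg.substring(8), 0);
            }
        };
    }

    private Consumer<String> connected(String sessionId, int msgCount) {
        return msg -> {
            if (msg.equals("disconnect")) {
                log += sessionId + " handled " + msgCount + " messages\n";
                behavior = disconnected();
            } else {
                behavior = connected(sessionId, msgCount + 1);
            }
        };
    }

    void tell(String msg) { behavior.accept(msg); }
}
```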
Our query actor is now done:
Scala
object DeviceGroupQuery {
case object CollectionTimeout
def props(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
): Props = {
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
}
}
class DeviceGroupQuery(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
) extends Actor with ActorLogging {
import DeviceGroupQuery._
import context.dispatcher
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)
override def preStart(): Unit = {
actorToDeviceId.keysIterator.foreach { deviceActor =>
context.watch(deviceActor)
deviceActor ! Device.ReadTemperature(0)
}
}
override def postStop(): Unit = {
queryTimeoutTimer.cancel()
}
override def receive: Receive =
waitingForReplies(
Map.empty,
actorToDeviceId.keySet
)
def waitingForReplies(
repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
stillWaiting: Set[ActorRef]
): Receive = {
case Device.RespondTemperature(0, valueOption) =>
val deviceActor = sender()
val reading = valueOption match {
case Some(value) => DeviceGroup.Temperature(value)
case None => DeviceGroup.TemperatureNotAvailable
}
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)
case Terminated(deviceActor) =>
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)
case CollectionTimeout =>
val timedOutReplies =
stillWaiting.map { deviceActor =>
val deviceId = actorToDeviceId(deviceActor)
deviceId -> DeviceGroup.DeviceTimedOut
}
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
context.stop(self)
}
def receivedResponse(
deviceActor: ActorRef,
reading: DeviceGroup.TemperatureReading,
stillWaiting: Set[ActorRef],
repliesSoFar: Map[String, DeviceGroup.TemperatureReading]
): Unit = {
context.unwatch(deviceActor)
val deviceId = actorToDeviceId(deviceActor)
val newStillWaiting = stillWaiting - deviceActor
val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
if (newStillWaiting.isEmpty) {
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
context.stop(self)
} else {
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
}
}
}
Java
public class DeviceGroupQuery extends AbstractActor {
public static final class CollectionTimeout {
}
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final Map<ActorRef, String> actorToDeviceId;
final long requestId;
final ActorRef requester;
Cancellable queryTimeoutTimer;
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
this.actorToDeviceId = actorToDeviceId;
this.requestId = requestId;
this.requester = requester;
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
);
}
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout);
}
@Override
public void preStart() {
for (ActorRef deviceActor : actorToDeviceId.keySet()) {
getContext().watch(deviceActor);
deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
}
}
@Override
public void postStop() {
queryTimeoutTimer.cancel();
}
@Override
public Receive createReceive() {
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
}
public Receive waitingForReplies(
Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
Set<ActorRef> stillWaiting) {
return receiveBuilder()
.match(Device.RespondTemperature.class, r -> {
ActorRef deviceActor = getSender();
DeviceGroup.TemperatureReading reading = r.value
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
.orElse(new DeviceGroup.TemperatureNotAvailable());
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
})
.match(Terminated.class, t -> {
receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar);
})
.match(CollectionTimeout.class, t -> {
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
for (ActorRef deviceActor : stillWaiting) {
String deviceId = actorToDeviceId.get(deviceActor);
replies.put(deviceId, new DeviceGroup.DeviceTimedOut());
}
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
getContext().stop(getSelf());
})
.build();
}
public void receivedResponse(ActorRef deviceActor,
DeviceGroup.TemperatureReading reading,
Set<ActorRef> stillWaiting,
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
getContext().unwatch(deviceActor);
String deviceId = actorToDeviceId.get(deviceActor);
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
newStillWaiting.remove(deviceActor);
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
newRepliesSoFar.put(deviceId, reading);
if (newStillWaiting.isEmpty()) {
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
getContext().stop(getSelf());
} else {
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
}
}
}
Testing the query actor
Now let’s verify the correctness of the query actor implementation.
There are various scenarios we need to test individually to make sure
everything works as expected. To be able to do this, we need to simulate
the device actors somehow to exercise various normal or failure
scenarios. Thankfully we took the list of collaborators (actually a Map)
as a parameter to the query actor, so we can easily pass in TestProbe
(Scala) / TestKit (Java) references. In our first test, we try out the
case when there are two devices and both report a temperature:
Scala
"return temperature value for working devices" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
Java
@Test
public void testReturnTemperatureValueForWorkingDevices() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(3, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
That was the happy case, but we know that sometimes devices cannot
provide a temperature measurement. This scenario is just slightly
different from the previous one:
Scala
"return TemperatureNotAvailable for devices with no readings" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.TemperatureNotAvailable,
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
Java
@Test
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(3, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef());
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable());
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
We also know that sometimes device actors stop before answering:
Scala
"return DeviceNotAvailable if device stops before answering" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
device2.ref ! PoisonPill
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.DeviceNotAvailable
)
))
}
Java
@Test
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(3, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable());
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
If you remember, there is another case related to device actors
stopping. It is possible that we get a normal reply from a device actor,
but then receive a Terminated for the same actor later. In this case, we
would like to keep the first reply and not mark the device as
DeviceNotAvailable. We should test this, too:
Scala
"return temperature reading even if device stops after answering" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
device2.ref ! PoisonPill
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
Java
@Test
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(3, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
The final case is when not all devices respond in time. To keep our test
relatively fast, we will construct the DeviceGroupQuery actor with a
smaller timeout:
Scala
"return DeviceTimedOut if device does not answer in time" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 1.second
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.DeviceTimedOut
)
))
}
Java
@Test
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
TestKit requester = new TestKit(system);
TestKit device1 = new TestKit(system);
TestKit device2 = new TestKit(system);
Map<ActorRef, String> actorToDeviceId = new HashMap<>();
actorToDeviceId.put(device1.getRef(), "device1");
actorToDeviceId.put(device2.getRef(), "device2");
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId,
1L,
requester.getRef(),
new FiniteDuration(1, TimeUnit.SECONDS)));
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(
FiniteDuration.create(5, TimeUnit.SECONDS),
DeviceGroup.RespondAllTemperatures.class);
assertEquals(1L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut());
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
Our query now works as expected; it is time to include this new
functionality in the DeviceGroup actor.
Adding query capability to the group
Including the query feature in the group actor is fairly simple now. We
did all the heavy lifting in the query actor itself; the group actor
only needs to create it with the right initial parameters and nothing
else.
Scala
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
var deviceIdToActor = Map.empty[String, ActorRef]
var actorToDeviceId = Map.empty[ActorRef, String]
var nextCollectionId = 0L
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
override def receive: Receive = {
// ... other cases omitted
case RequestAllTemperatures(requestId) =>
context.actorOf(DeviceGroupQuery.props(
actorToDeviceId = actorToDeviceId,
requestId = requestId,
requester = sender(),
timeout = 3.seconds
))
}
}
Java
public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
public static Props props(String groupId) {
return Props.create(DeviceGroup.class, groupId);
}
public static final class RequestDeviceList {
final long requestId;
public RequestDeviceList(long requestId) {
this.requestId = requestId;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
public static final class RequestAllTemperatures {
final long requestId;
public RequestAllTemperatures(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondAllTemperatures {
final long requestId;
final Map<String, TemperatureReading> temperatures;
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
this.requestId = requestId;
this.temperatures = temperatures;
}
}
public static interface TemperatureReading {
}
public static final class Temperature implements TemperatureReading {
public final double value;
public Temperature(double value) {
this.value = value;
}
}
public static final class TemperatureNotAvailable implements TemperatureReading {
}
public static final class DeviceNotAvailable implements TemperatureReading {
}
public static final class DeviceTimedOut implements TemperatureReading {
}
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
long nextCollectionId = 0L;
@Override
public void preStart() {
log.info("DeviceGroup {} started", groupId);
}
@Override
public void postStop() {
log.info("DeviceGroup {} stopped", groupId);
}
private void onAllTemperatures(RequestAllTemperatures r) {
getContext().actorOf(DeviceGroupQuery.props(
actorToDeviceId, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS)));
}
@Override
public Receive createReceive() {
return receiveBuilder()
// ... other cases omitted
.match(RequestAllTemperatures.class, this::onAllTemperatures)
.build();
}
}
It is probably worth restating what we said at the beginning of the
chapter. By keeping the temporary state that is only relevant to the
query itself in a separate actor we keep the group actor implementation
very simple. It delegates everything to child actors and therefore does
not have to keep state that is not relevant to its core business. Also,
multiple queries can now run in parallel with each other; in fact, as
many as needed. In our case querying an individual device actor is a fast
operation, but if this were not the case, for example, because the
remote sensors need to be contacted over the network, this design would
significantly improve throughput.
We close this chapter by testing that everything works together. This
test is just a variant of the previous ones, now exercising the group
query feature:
Scala
"be able to collect temperatures from all active devices" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor3 = probe.lastSender
// Check that the device actors are working
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
// No temperature for device3
groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
probe.expectMsg(
DeviceGroup.RespondAllTemperatures(
requestId = 0,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0),
"device3" -> DeviceGroup.TemperatureNotAvailable)))
}
Java
@Test
public void testCollectTemperaturesFromAllActiveDevices() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor1 = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor2 = probe.getLastSender();
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef());
probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
ActorRef deviceActor3 = probe.getLastSender();
// Check that the device actors are working
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
// No temperature for device 3
groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef());
DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
assertEquals(0L, response.requestId);
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable());
assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
Summary
In the context of the IoT system, this guide introduced the following
concepts, among others. You can follow the links to review them if
necessary:
- The hierarchy of actors and their lifecycle
- The importance of designing messages for flexibility
- How to watch and stop actors, if necessary
What’s Next?
To continue your journey with Akka, we recommend:
- Start building your own applications with Akka, make sure you get
involved in our amazing community for help if you get stuck.
- If you’d like some additional background, read the rest of the
reference documentation and check out some of the books and videos
on Akka.
LINKS TO MORE AKKA DOCUMENTATION
* Akka Documentation: http://akka.io/docs/
* Try Akka: http://akka.io/try-akka/
* Akka Commercial Addons: http://developer.lightbend.com/docs/akka-commercial-addons/current/
* Alpakka - Akka Streams Connectors: http://developer.lightbend.com/docs/alpakka/current/
* Akka Streams connector for Apache Kafka: http://doc.akka.io/docs/akka-stream-kafka/current/home.html
* Akka (Cluster) Management: http://developer.lightbend.com/docs/akka-management/current/
* Cassandra Plugins for Akka Persistence: https://github.com/akka/akka-persistence-cassandra
* DynamoDBJournal for Akka Persistence: https://github.com/akka/akka-persistence-dynamodb
* Samples and guides: http://developer.lightbend.com/start/?group=akka