
"Akka in Action" Short Terms

Chapter 1. Introducing Akka

  1. Scale up - more resources (CPU).
  2. Scale out - more servers in a cluster.
  3. Traditional scaling (threads, locks, RPC, etc.) is complex and inflexible.
  4. Actors - programming model for scaling up and out.
  5. Akka is centered on actors. The Reactive Manifesto describes the ideas Akka is based on.
  6. Actors are executed asynchronously.
  7. Actors can receive and send messages. Messages are immutable.
  8. No type safety (yet). Any message can be sent to any actor.
  9. Actor operations:
    • send: sending messages;
    • create: creating child actors;
    • become: moving between states like state machines;
    • supervise: keeping track of child actors.
  10. Actors decoupled on three axes:
    • space/location: no expectation about where other components are located;
    • time: no expectation about when the work will be done;
    • interface: no expectation about what messages other components can understand.
  11. ActorSystem - container for all actors.
  12. A new actor is created with ActorSystem.actorOf, with a Props object passed as an argument (see the sketch at the end of this chapter).
  13. The Props object describes how the actor should be created; it eventually calls the actor's constructor.
  14. actorOf returns an ActorRef for the created actor (never the Actor instance itself).
  15. The ActorRef is used to send messages to the actor.
  16. Every actor has a name (unique per level in the hierarchy).
  17. ActorPath - path to the actor in the hierarchy (like URL). Path can be relative or absolute.
  18. Every actor has a mailbox - messages queue.
  19. The dispatcher drives the mailbox, making the actor process its next message.
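
A minimal sketch of these basics with the classic Scala API; the Greeter actor, its message, and the system name are invented for illustration:

```scala
import akka.actor.{ Actor, ActorSystem, Props }

// A minimal actor: reacts to each message it receives.
class Greeter extends Actor {
  def receive = {
    case name: String => println(s"Hello, $name!")
  }
}

object Main extends App {
  val system = ActorSystem("hello")                        // container for all actors
  val greeter = system.actorOf(Props[Greeter], "greeter")  // returns an ActorRef, not the Actor
  greeter ! "Akka"                                         // asynchronous send via the ActorRef
  system.terminate()
}
```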

Chapter 2. Up and running

  1. Example projects repo.
  2. You have to be familiar with sbt.
  3. httpie is a good tool for testing HTTP servers.
  4. Actor messages are usually bundled together in the actor's companion object.
  5. Actor.sender() is used to send a reply to the message sender.
  6. ActorRef.forward(message) is used to forward a message (the original sender of the message is preserved; see the sketch below).
  7. Actors use ActorContext.actorOf(props, name) instead of ActorSystem.actorOf(props, name) to create child actors.
  8. ActorContext.children and ActorContext.child(name) return the children of the actor.
  9. Heroku can be used to deploy actor applications.
  10. git push is used to deploy the project to Heroku.
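
A small sketch of points 5-7; the Parent and Child actors are invented:

```scala
import akka.actor.{ Actor, Props }

class Parent extends Actor {
  // Child actors are created from the actor's own context, not from the ActorSystem.
  val child = context.actorOf(Props[Child], "child")

  def receive = {
    case msg: String => child forward msg  // the child sees the original sender, not Parent
  }
}

class Child extends Actor {
  def receive = {
    case msg: String => sender() ! s"child received: $msg"  // reply goes to the original sender
  }
}
```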

Chapter 3. Test-driven development with actors

  1. Actors unit testing can be:
    • synchronous or asynchronous;
    • single-threaded, multithreaded or multi-JVM.
  2. Messages can be:
    • one-way: fire and forget;
    • two-way: request-response.
  3. Three variations of actor can be tested:
    • silent actor: changes its state on a new message but doesn't send any messages or produce any side effects; changes are not directly observable from the outside;
    • sending actor: sends message(s) to other actors; this includes mutating copy, forwarding, transforming, filtering and sequencing actors;
    • side-effecting actor: produces side effects on a new message (writes to a log, for example).
  4. Test classes should extend TestKit.
  5. TestKit.testActor is an internal actor that can be used as a receiver in the test environment.
  6. TestKit provides methods to assert messages received by testActor: expectMsg, expectMsgPF, expectNoMsg, receiveWhile and ignoreMsg.
  7. TestProbe class can be used to work with several test actors. TestProbe can be instantiated, no need to extend it.
  8. Silent actors in single thread are tested using TestActorRef[SilentActor] that provides direct access to the actor and its state with underlyingActor.
  9. Silent actors must support a GetState message to be testable in a multithreaded environment.
  10. Sending actors are tested using the TestKit methods (expectMsg, ignoreMsg, etc.).
  11. Side-effecting actors are hard to test; usually it's better to add extra functionality to the actor so it sends a message with the side-effect data to an optional receiver. Side-effecting actors can then be tested like sending ones.
  12. A test class can extend ImplicitSender to implicitly replace the sender() ref with testActor. Useful when testing two-way messages (see the sketch below).
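
A minimal sketch of a TestKit spec with ImplicitSender, assuming ScalaTest's WordSpecLike; the Echo actor is invented:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.testkit.{ ImplicitSender, TestKit }
import org.scalatest.{ BeforeAndAfterAll, WordSpecLike }

// A trivial two-way actor: replies to the sender with the same message.
class Echo extends Actor {
  def receive = { case msg => sender() ! msg }
}

class EchoSpec extends TestKit(ActorSystem("testsystem"))
    with WordSpecLike with BeforeAndAfterAll with ImplicitSender {

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "An Echo actor" must {
    "send back the message it receives" in {
      val echo = system.actorOf(Props[Echo], "echo")
      echo ! "ping"          // ImplicitSender makes testActor the sender
      expectMsg("ping")      // asserts that testActor received the reply
    }
  }
}
```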

Chapter 4. Fault tolerance

  1. Let-it-crash principle.
  2. Fault avoidance strategies:
    • isolation: isolate failed component;
    • redundancy: backup components existence;
    • replacement: failed component can be easily replaced with backup;
    • reboot: failed component can be restarted;
    • suspend: calls to the failed component should be suspended until the backup is ready to process them;
    • separation of concerns: fault-recovery code is separated from the normal processing code.
  3. Two separate flows of application:
    • normal logic: regular actors;
    • fault-recovery: supervisors.
  4. Two states of an actor:
    • started;
    • terminated.
  5. Three events:
    • start;
    • stop;
    • restart.
  6. An actor can be stopped using ActorSystem.stop(actor), ActorContext.stop(actor) or by sending PoisonPill message to it.
  7. Four hooks: preStart, postStop, preRestart (optionally calls postStop) and postRestart (optionally calls preStart).
  8. The failing message is passed to the preRestart hook as a parameter.
  9. Restarted actor replaces crashed one, ActorRef automatically switches to the new instance.
  10. Stopped actor doesn't process messages and its ActorRef switches to special deadLettersActorRef.
  11. An actor can monitor any other actor (not only its children) with ActorContext.watch(actor). The monitor then receives a Terminated message when the monitored actor is stopped.
  12. The monitor doesn't receive any message when the monitored actor is restarted.
  13. User space - supervisor hierarchy under the /user actor path.
  14. Most dangerous actors should be as low down the hierarchy as possible.
  15. Two ways to define a hierarchy of supervisors:
    • one supervisor creates all actors in the application and supervises them (only restarting of actors can be used);
    • each parent actor supervises its children and decides what to do with a crashed child.
  16. Two predefined supervisor strategies:
    • default: stops an actor when it failed to initialize or was killed, restarts it in other cases;
    • stopping: stops the actor on every exception.
  17. Predefined strategies catch Exception instances only (not Throwable).
  18. Unhandled exceptions automatically escalate to the parent of the supervisor.
  19. Four decisions are available for a supervisor:
    • restart: the child is recreated from its Props; the failing message is removed from the mailbox (it can be reprocessed in the restart hooks);
    • resume: the error is ignored and the same actor instance continues to process messages;
    • stop: the child is terminated and message processing stops;
    • escalate: the problem is escalated to the parent of the supervisor.
  20. Two strategies available for supervisor decisions:
    • OneForOneStrategy: applies the decision to the crashed child only;
    • AllForOneStrategy: applies the decision to all children of the supervisor.
  21. Both OneForOneStrategy and AllForOneStrategy have maxNrOfRetries and withinTimeRange parameters (see the sketch below).
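
A sketch of a parent actor defining its own OneForOneStrategy; the Worker actor and the mapping of exceptions to decisions are illustrative:

```scala
import akka.actor.{ Actor, OneForOneStrategy, Props }
import akka.actor.SupervisorStrategy.{ Restart, Stop }
import scala.concurrent.duration._

class Worker extends Actor {
  def receive = { case job => println(s"working on $job") } // may throw in real code
}

class Supervisor extends Actor {
  // Apply the decision to the crashed child only;
  // give up after 3 restarts within 10 seconds.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10.seconds) {
      case _: IllegalStateException => Stop     // treated here as unrecoverable
      case _: Exception             => Restart  // recreate the child from its Props
    }

  val worker = context.actorOf(Props[Worker], "worker")
  def receive = { case msg => worker forward msg }
}
```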

Chapter 5. Futures

  1. A future makes it possible to process the result of a function without ever waiting in the current thread for the result.
  2. A future is read-only placeholder for a function result that will be available at some point in the future.
  3. Futures can be combined with other futures in many ways.
  4. Futures and actors can be used together.
  5. Scala futures is not a wrapper around Plain Old Java Futures (java.util.concurrent.Future).
  6. Future.apply(block) executes block on another thread.
  7. Future.foreach asynchronously processes the future result when it becomes available.
  8. Future.map and Future.flatMap call the passed function when the future contains a successful result and return a new Future.
  9. An implicit ExecutionContext must be provided to use futures. scala.concurrent.ExecutionContext.Implicits.global is the global execution context.
  10. The dispatcher of an actor system can be used as an ExecutionContext; it's usually a better choice than the global one.
  11. Promise is write side of the Future/Promise model.
  12. A promise can only be completed once (an IllegalStateException is thrown otherwise).
  13. DefaultPromise[T] extends both Future[T] and Promise[T], thread-safe.
  14. Future.onComplete receives a Try that can be a Success or a Failure.
  15. Try supports pattern matching.
  16. Fatal exceptions (like OutOfMemoryError) are never handled by a future; they are thrown straight through to the JVM.
  17. Future.onFailure can be used instead of Future.onComplete when only exceptions have to be processed.
  18. Future.recover and Future.recoverWith methods are used to provide default future value in case the future is failed.
  19. The code block passed to Future.recover and Future.recoverWith is executed synchronously after the error has been returned, so keep it simple.
  20. Future.firstCompletedOf returns the first completed future (successful or failed).
  21. Future.find can be used to find the first successful future.
  22. Future.zip combines two futures and returns a future of tuple.
  23. A for comprehension can be used instead of chains of Future.map and Future.flatMap calls.
  24. Future.sequence converts sequence of futures to future of sequence (see also Future.traverse).
  25. Future.fold can be used to collect data from sequence of futures.
  26. akka.pattern.ask returns a future that wraps the actor's answer.
  27. akka.pattern.pipe sends a message to an actor when it becomes available in the future.
  28. Calling sender() from a future body is not safe due to the mutable nature of actors; capture it in a val or use pipe (see the sketch at the end of this chapter).
  29. The value contained in a future should be immutable to avoid sharing mutable state.
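
A few of these combinators in one sketch, using the global execution context; the computations are invented:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object FuturesDemo extends App {
  val f: Future[Int] = Future { 21 * 2 }   // runs the block on another thread
  val g: Future[Int] = f.map(_ + 1)        // transforms the successful result

  // Provide a fallback value when the future fails.
  val safe: Future[Int] = g.recover { case _: ArithmeticException => 0 }

  // Turn a sequence of futures into a future of a sequence.
  val all: Future[Seq[Int]] = Future.sequence(Seq(f, g, safe))

  all.foreach(println)                     // runs asynchronously when the result is ready

  // Inside an actor, capture the sender before closing over it in a future:
  //   val replyTo = sender()
  //   Future { compute() } pipeTo replyTo   // needs: import akka.pattern.pipe
}
```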

Chapter 6. Your first distributed Akka app

  1. Node - a running application that communicates across the network.
  2. A node has a specific role in the distributed system.
  3. A node uses a specific network transport protocol (TCP, UDP) to communicate with other nodes.
  4. Messages between nodes are encoded into network-specific protocol data units.
  5. Messages need to be translated to and from bytes, respectively known as serialization and deserialization.
  6. When nodes are part of the same distributed system, they share a group membership.
  7. Membership can be static or dynamic or a mix of both.
  8. Some kind of discovery mechanism is needed to support nodes in a dynamic membership.
  9. Common network topologies:
    • centralized/local;
    • client-server;
    • star;
    • ring;
    • peer-to-peer/mesh;
    • tree.
  10. Local programming differs from distributed one in four important areas:
    • latency;
    • partial failure;
    • memory access;
    • concurrency.
  11. Akka uses the same distributed programming model for both local and distributed systems.
  12. Two ways to get a reference to a remote actor: lookup and deploy.
  13. The akka-remote module should be configured in src/main/resources/application.conf (transports, host, port).
  14. ActorSystem.actorSelection is used to lookup remote actors.
  15. Java serialization should not be used. Akka will log a warning.
  16. ActorSystem.actorFor is deprecated.
  17. RemoteLookupProxy (an example actor from the book) is used to look up a remote actor and keep a correct ActorRef to it.
  18. The built-in Identify message and its ActorIdentity reply are used to get an ActorRef to a remote actor by path (see the sketch at the end of this chapter).
  19. Two ways to deploy an actor remotely:
    • through configuration: no code changes needed;
    • programmatically: Props.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(uri)))).
  20. RemoteBoxOfficeForwarder (another example from the book) is used to deploy a remote actor (through configuration), watch it, keep an up-to-date ActorRef, and forward (proxy) messages to it.
  21. Both RemoteLookupProxy and RemoteBoxOfficeForwarder lose messages while reconnecting to a crashed remote actor.
  22. To use multi-JVM testing:
    • sbt-multi-jvm plugin should be registered in the project/plugins.sbt file;
    • multi-JVM configuration file should be added into the project/ dir;
    • node roles should be defined in object that extends MultiNodeConfig class;
    • test class should extend MultiNodeSpec, MultiNodeSpecCallbacks and ImplicitSender.
  23. MultiNodeSpec.enterBarrier is used to synchronize nodes in test.
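
A sketch of the Identify/ActorIdentity handshake from point 18; the actor path and the correlation id are placeholders:

```scala
import akka.actor.{ Actor, ActorIdentity, ActorRef, Identify }

class RemoteLookup(path: String) extends Actor {
  // Ask whatever lives at `path` to identify itself; the correlation id
  // ("lookup" here) comes back unchanged in the ActorIdentity reply.
  context.actorSelection(path) ! Identify("lookup")

  def receive = {
    case ActorIdentity("lookup", Some(ref: ActorRef)) =>
      println(s"resolved remote actor: $ref")   // a real ActorRef from here on
    case ActorIdentity("lookup", None) =>
      println(s"no actor found at $path")
  }
}
```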

Chapter 7. Configuration, logging and deployment

  1. The ConfigFactory is used to get the configuration.
  2. The following configuration formats are supported:
    • application.properties: the Java property file format;
    • application.json: JSON style;
    • application.conf: the HOCON format.
  3. Config.getConfig(path) is used to get subtree of the config.
  4. Variable substitution is allowed in the config with the familiar syntax: connectstr = "jdbc:mysql://"${hostname}"/mydata" (see the sketch at the end of this chapter).
  5. System properties or environment variables can be referenced in the config: hostname = ${?HOST_NAME}.
  6. A later value for the same key overrides the previous one (if the new value is not empty).
  7. An exception is thrown when trying to get a value that hasn't been set.
  8. The reference.conf file contains default values for the configuration.
  9. The configuration library finds all the reference.conf files in all application components and integrates them into the configuration fallback structure.
  10. The following configurations are used by default (upper configurations override lower ones):
    • system properties;
    • application.conf;
    • application.json;
    • application.properties;
    • reference.conf.
  11. ConfigFactory.load("myapp") will load myapp.{conf,json,properties} instead of application.{...}.
  12. The config file name can be overridden with Java system properties (the name should include the extension):
    • config.resource: resource name;
    • config.file: file path;
    • config.url: file URL.
  13. Config object can be passed to ActorSystem as a second parameter: ActorSystem("front", config).
  14. ActorSystem uses ConfigFactory.load() by default (loads application.{conf,json,properties} or config that specified in Java system properties).
  15. Loaded config can be accessed through ActorSystem: actorSystem.settings.config.
  16. Configuration file can include other configs with include "baseConfig".
  17. Lifting a configuration can be implemented using withFallback method: child.withFallback(parent).
  18. The Akka logger is an actor that receives log messages and forwards them to the preferred logging framework.
  19. Two built-in loggers available by default:
    • akka.event.Logging$DefaultLogger: sends messages to STDOUT;
    • akka.event.slf4j.Slf4jLogger: uses SLF4j as logging framework.
  20. Custom logger can be used (just an actor with specialized messages interface).
  21. The ActorLogging trait should be mixed into an actor to use logging.
  22. Placeholders available in logs: log.debug("two args: {}, {}", "one", "two").
  23. Lots of config options are available for Akka system logs; check the docs.
  24. LoggingReceive can be used to wrap an actor's receive function and log the messages it receives.
  25. Register the sbt-native-packager plugin in project/plugins.sbt to build an application bundle with sbt.
  26. Distribution is created with sbt stage.
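
A sketch of the substitution and fallback mechanics with the Typesafe Config API; the keys are invented, and the HOCON is inlined with parseString instead of living in application.conf:

```scala
import com.typesafe.config.ConfigFactory

object ConfigDemo extends App {
  val config = ConfigFactory.parseString("""
    myapp {
      hostname   = "localhost"
      hostname   = ${?HOST_NAME}   # overrides the default only when the env var is set
      connectstr = "jdbc:mysql://"${myapp.hostname}"/mydata"
    }
  """).resolve()                   // performs the ${...} substitutions

  val sub = config.getConfig("myapp")     // subtree of the config
  println(sub.getString("connectstr"))    // jdbc:mysql://localhost/mydata (by default)

  // Fallback ("lifting"): values missing in `child` are looked up in `parent`.
  val child  = ConfigFactory.parseString("a = 1")
  val parent = ConfigFactory.parseString("a = 2, b = 3")
  val lifted = child.withFallback(parent)
  println(s"${lifted.getInt("a")} ${lifted.getInt("b")}")  // 1 3
}
```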

Chapter 8. Structural patterns for actors

  1. There are three enterprise integration patterns (EIPs) discussed:
    • pipes and filters;
    • scatter-gather;
    • routing slip;
  2. Pipes and filters introduce independent processing units with the same interface (filters), so a pipeline can be constructed from them. Units can be reordered or replaced in any way without changing the logic of the other units. In most cases the units are filters, but that's not mandatory.
  3. Scatter-gather can be applied in two different scenarios:
    • competing tasks: the processing tasks are all focused on one thing (but they may be doing it in different ways), the gather selects one result following specified criteria;
    • parallel cooperative processing: the tasks are performing a subtask, the gather combines results into a single message.
  4. Scatter-gather introduces three components:
    • scatter: receives one message and scatters the job to several processing tasks;
    • processing tasks: do the same job in parallel or do parts of the main job concurrently;
    • gather: filters or combines the task results.
  5. The recipient list pattern can be used as a simple implementation of the scatter component.
  6. A gather timeout can be implemented with the Akka scheduler: context.system.scheduler.scheduleOnce sends a message to the actor after the specified time.
  7. To keep its buffer of already received messages across a restart, the gather can use the preRestart hook to resend the messages to itself.
  8. Aggregator trait provided by Akka implements Aggregator pattern and can be used as gather component.
  9. Pipes and filters can be combined with scatter-gather components in two ways:
    • scatter-gather implementation presents one filter in the pipeline;
    • the pipeline is used in the processing tasks of scatter-gather pattern.
  10. Routing slip is a dynamic version of the pipes and filters pattern.
  11. In routing slip the units have the same interface and can be combined in many ways.
  12. The messages sent between units have a routeSlip field that contains the list of remaining recipients. Each actor takes the head of the list and sends its result there together with the tail of the routeSlip list.
  13. It's better to implement the route slip functions in a separate trait that sends the message to the next task; that way the task actors don't duplicate the message-transfer code (see the sketch below).
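
A sketch of such a trait, assuming an invented RouteSlipMessage envelope and helper name (the book's actual example may differ):

```scala
import akka.actor.{ Actor, ActorRef }

// The envelope carries the payload plus the list of remaining recipients.
case class RouteSlipMessage(routeSlip: Seq[ActorRef], message: Any)

// Mixing this trait in keeps the slip-handling code out of the task actors.
trait RouteSlip { this: Actor =>
  def sendMessageToNextTask(routeSlip: Seq[ActorRef], message: Any): Unit =
    routeSlip match {
      case Seq()        => // nothing left to do
      case Seq(last)    => last ! message                          // end of slip: deliver result
      case next +: rest => next ! RouteSlipMessage(rest, message)  // pass the tail along
    }
}
```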

Chapter 9. Routing messages

  1. Three reasons for using routing to control message flow:
    • performance;
    • message content;
    • state.
  2. In Akka a separation is made between the routing logic and the actor that represents the router.
  3. The built-in routers come in two varieties:
    • pool: manage the routees;
    • group: don't manage the routees (creation, adding, watching, removing and termination have to be done by the client code).
  4. Available routers within Akka:
    • RoundRobinRoutingLogic / RoundRobinPool / RoundRobinGroup;
    • RandomRoutingLogic / RandomPool / RandomGroup;
    • SmallestMailboxRoutingLogic / SmallestMailboxPool: group is not available because it can't check mailboxes of the routees;
    • BalancingPool: one mailbox for all the routees, distributes the messages to the idle routees;
    • BroadcastRoutingLogic / BroadcastPool / BroadcastGroup: recipient list;
    • ScatterGatherFirstCompletedRoutingLogic / ScatterGatherFirstCompletedPool / ScatterGatherFirstCompletedGroup: scatter-gather pattern implementation;
    • ConsistentHashingRoutingLogic / ConsistentHashingPool / ConsistentHashingGroup.
  5. Two different ways to configure the router:
    • through configuration file;
    • in source code.
  6. Some messages are processed by the router itself instead of being forwarded to the routees (see the sketch at the end of this chapter):
    • Kill: terminates the router (pool routees are terminated as well);
    • PoisonPill: terminates the router (pool routees are terminated as well);
    • Broadcast: sends the content of the message to all the routees (DO NOT use with BalancingPool).
  7. RemoteRouterConfig configures a router to use routees on remote servers.
  8. A Resizer can be configured to dynamically change the size of a pool:
    • should be enabled with enabled = on;
    • lower-bound, upper-bound: min and max number of routees;
    • pressure-threshold: mailbox size at which a routee is considered to be under pressure (the special value 0 means a routee is under pressure whenever it's processing a message);
    • rampup-rate: how fast routees should be added (0.25 means +25% of the current number of routees, rounded up);
    • backoff-threshold: when to decrease the number of routees (0.3 means decrease when less than 30% of the routees are busy);
    • backoff-rate: how fast routees should be removed (0.1 means -10% of the current number of routees);
    • messages-per-resize: number of messages received before another resize action is allowed.
  9. The default supervisor strategy of a pool always escalates failures to its own supervisor. That can lead to an unexpected restart of the pool with all its routees.
  10. Supervisor strategy can be passed as a parameter to the pool constructor (default strategy: SupervisorStrategy.defaultStrategy).
  11. If the resizer of the pool is not configured, the pool won't spawn a new routee to replace a terminated one (it will just remove it from the list). Use a resizer to keep the specified minimum number of routees.
  12. Groups are configured with routee paths instead of a routee count.
  13. Paths to remote routees can be set in the configuration to make group work with remote routees. No additional changes needed.
  14. A group doesn't watch its routees - it will keep sending messages to a terminated routee (in the hope that a routee becomes available at that path again).
  15. Special messages to the groups:
    • GetRoutees: returns routees sequence (Java collection);
    • AddRoutee: adds specified routee to the group;
    • RemoveRoutee: removes specified routee from the group.
  16. Three implementations of the Routee trait:
    • ActorRefRoutee: DO NOT use it to add a routee to a group, because the group will then watch the routee and terminate itself if the routee terminates;
    • ActorSelectionRoutee: uses ActorSelection;
    • SeveralRoutees: a list of Routees.
  17. To remove a routee, the same Routee instance that was sent in AddRoutee should be sent in the RemoveRoutee message, so use ActorSelectionRoutee.
  18. The consistent hashing routers use virtual nodes in front of the routees to increase the chance of spreading the messages evenly over the routees.
  19. The consistent hashing routers support three ways to translate the message into a message key (it's possible to use the three solutions in one router):
    • router-specific: a partial function is passed to the router;
    • message-specific: message should implement ConsistentHashable;
    • sender-specific: message should be wrapped into ConsistentHashableEnvelope by the sender.
  20. Content-based routing can be implemented with regular actors, no Akka routers required.
  21. State-based routers can use become and unbecome methods:
    • become replaces receive function with specified one;
    • unbecome restores original receive function of the router.
  22. If state-based router fails and restarts, original receive function is restored.
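
A minimal pool-router sketch; the Worker actor and the messages are invented:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.routing.{ Broadcast, RoundRobinPool }

class Worker extends Actor {
  def receive = { case job => println(s"${self.path.name} got: $job") }
}

object Routers extends App {
  val system = ActorSystem("routers")

  // A pool creates and supervises its routees itself.
  val router = system.actorOf(RoundRobinPool(5).props(Props[Worker]), "workers")

  router ! "job-1"            // delivered to a single routee, round-robin
  router ! Broadcast("ping")  // delivered to every routee
}
```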

Chapter 10. Message channels

  1. Two channel types:
    • point-to-point;
    • publish-subscribe.
  2. The point-to-point channel sends the message to one receiver.
  3. The point-to-point channel can have multiple receivers, but every message is delivered to just one receiver (example: round-robin router).
  4. When multiple messages are sent through point-to-point channel, the order of these messages isn't changed.
  5. The publish-subscribe channel has dynamic nature and decouples the receivers and the sender.
  6. The publisher actor shouldn't know anything about the subscribers of publish-subscribe channel.
  7. Every ActorSystem has an eventStream that can manage multiple publish-subscribe channels (classified by message type).
  8. The actor can subscribe to a specific message type in event stream.
  9. An EventBus can be implemented to create a custom publish-subscribe channel.
  10. EventBus is generalized, there are three entities:
    • Event: type of all events in the channel;
    • Subscriber: type of subscriber allowed to register on that event bus;
    • Classifier: the classifier to be used in selecting subscribers for dispatching events.
  11. Three auxiliary traits to keep track of the subscribers:
    • LookupClassification: keeps a set of subscribers for each possible classifier, using classify method, which should be implemented;
    • SubchannelClassification: used when classifiers form a hierarchy and it's possible to subscribe at the parent nodes as well (example: message types in EventStream implementation);
    • ScanningClassification: can be used when classifiers have an overlap (one Event can be part of more classifiers).
  12. ActorEventBus trait defines Subscriber entity as ActorRef, also implements compareSubscribers method needed by LookupClassification.
  13. The dead-letter channel (or dead-letter queue, or dead-message queue) is a special channel that contains all the messages that can't be processed or delivered.
  14. EventStream is used to implement dead-letter channel.
  15. Messages in dead-letter channel are wrapped into a DeadLetter object.
  16. An actor can subscribe to the dead-letter channel with system.eventStream.subscribe(actor, classOf[DeadLetter]) (see the sketch at the end of this chapter).
  17. A message can be sent to the system.deadLetters actor to be published on the dead-letter channel. In this case the original receiver becomes the deadLetters actor; to avoid this, the message can be wrapped in a DeadLetter object manually before sending it.
  18. Messages from the dead-letter channel can be reinserted into a mailbox to keep them from being dropped.
  19. Akka can't guarantee message delivery in all cases (no system can).
  20. For local actors, the delivery is guaranteed as long as there are no critical VM errors.
  21. For remote actors Akka guarantees that messages are delivered at most once (a message is delivered once or it's not delivered and it's lost).
  22. Three reasons why Akka doesn't implement fully guaranteed delivery:
    • fully guaranteed delivery results in a performance penalty even when the system doesn't need that level of guarantee;
    • systems need guaranteed processing not delivery only, but this is system dependent and Akka can't deduce this;
    • it's always possible to add stricter guarantees on top of basic ones, but inverse is not possible.
  23. ReliableProxy can be used to increase reliability of sending messages using remote actors.
  24. ReliableProxy establishes a tunnel between sender and receiver, tracks received messages and resends failed ones until they are delivered.
  25. The ReliableProxy tunnel is one-way and for one receiver only. When the receiver replies to the sender, the tunnel is not used (another tunnel has to be set up for that direction).

Chapter 11. Finite-state machines and agents

  1. FSM trait is used to build finite-state machines.
  2. FSM trait takes two type parameters:
    • State: the super type of all states;
    • StateData: the super type of all possible state data types that's tracked by the FSM.
  3. FSM useful methods:
    • startWith defines the initial state and the initial state data;
    • when(State) { PartialFunction } declares transitions for the state;
    • whenUnhandled { PartialFunction } declares default behavior for unhandled events;
    • goto(State) { PartialFunction } declares next state during event processing;
    • goto(State) using StateData declares next state and updates data;
    • stay keeps the current state during event processing;
    • stay using StateData keeps the current state and updates data;
    • onTransition { PartialFunction } declares entry and exit actions for states (stateData and nextStateData variables available);
    • initialize starts the FSM.
  4. An actor can subscribe to FSM transition events by sending SubscribeTransitionCallback(actor) message to the FSM. FSM will reply with CurrentState message and will send Transition message on each transition event.
  5. The state timeout can be set in two ways (see the sketch at the end of this chapter):
    • as the second parameter of when: when(State, stateTimeout = FiniteDuration);
    • by calling the forMax method: goto(State) using StateData forMax FiniteDuration.
  6. The state receives a StateTimeout message on timeout. The timer is cancelled upon receipt of any other message while in the current state.
  7. Akka has support for sending messages using timers within FSM. API calls:
    • setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean);
    • cancelTimer(name: String): cancels the timer immediately; even if the timer already fired and enqueued its message, the message won't be processed after this call;
    • isTimerActive(name: String): Boolean.
  8. The onTermination { PartialFunction } handler is used to handle FSM termination. It receives a StopEvent(Reason, State, Data) object. There are three possible reasons:
    • Normal: FSM stop method has been called;
    • Shutdown: the actor has been terminated;
    • Failure(cause: Any): an error occurred.
  9. An Akka Agent allows multiple actors to work with shared state:
    • Agent.apply or Agent.get returns the current state;
    • Agent.send(State) or Agent.send(State => State) updates the state (use second version to combine new state with current);
    • Agent.alter(State => State) updates the state and returns Future with new state;
    • Agent.future returns Future that finishes when the pending state changes are all processed.
  10. New agents can be created with the map or flatMap methods.
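
Most of the FSM API from points 1-7 in one sketch; the switch states, the Toggle message, the state data, and the timeout are invented:

```scala
import akka.actor.FSM
import scala.concurrent.duration._

sealed trait SwitchState
case object Off extends SwitchState
case object On extends SwitchState
case object Toggle

// State data: the number of toggles so far.
class Switch extends FSM[SwitchState, Int] {
  startWith(Off, 0)

  when(Off) {
    case Event(Toggle, count) => goto(On) using count + 1
  }

  when(On, stateTimeout = 5.seconds) {   // fall back to Off when idle too long
    case Event(Toggle, count)   => goto(Off) using count + 1
    case Event(StateTimeout, _) => goto(Off)
  }

  onTransition {
    case Off -> On => log.info("switched on ({} toggles so far)", nextStateData)
  }

  initialize()
}
```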

Chapter 12. System integration

  1. Endpoints are the glue between the external service and the rest of the system. The endpoint encapsulates the interface between two services.
  2. Consumer endpoint receives messages from the external service and translates them to internal format of the system.
  3. A producer endpoint converts messages from the internal format and sends them to the external service.
  4. Normalizer pattern translates different messages (from different services and endpoints) to common internal format to allow general processing.
  5. In complex cases the normalizer can be split into three sub-components:
    • a set of transport implementations (for example, EMail, REST and MQueue);
    • a set of format translators (from plain text, JSON and XML);
    • a router that selects translator for each received message (it should know how to distinguish between all the message types).
  6. There is a trade-off between flexibility and complexity: some transport implementations can be connected directly to translators if only one message type is transferred through that transport - this decreases complexity but also decreases flexibility.
  7. The canonical data model pattern can be used when many systems have to be connected together.
  8. In canonical data model each system provides endpoint that converts messages between internal system format and common canonical format that is used in communication bus.
  9. Apache Camel and the Akka Camel module provide support for a great variety of transport layers and make it possible to implement the standard EIPs in a few lines of code.
  10. Apache Camel allows selecting the transport layer implementation at runtime. This can be used in tests.
  11. Apache Camel uses URI to define the transport protocol and its properties.
  12. akka.camel.Consumer should be extended to implement a consumer:
    • endpointUri should be implemented to specify URI of the desired component;
    • receive should be implemented to process messages of CamelMessage type.
  13. To send a response in the consumer just reply to the sender actor as usual.
  14. CamelExtension.activationFutureFor(Consumer) is used to wait for a consumer to become ready.
  15. CamelContext.addComponent is used to add a parameterized component (for example, ActiveMQComponent).
  16. BrokerRegistry is used to stop ActiveMQ message brokers: BrokerRegistry.getInstance().getBrokers.foreach { case (_, b) => b.stop() }.
  17. akka.camel.Producer should be extended to implement a producer:
    • transformOutgoingMessage can be implemented to convert message before sending;
    • transformResponse can be implemented to convert received response (from CamelMessage);
    • routeResponse can be implemented to route the received response to another receiver (the default implementation routes the response to the original sender); note: transformResponse should be called manually from an overridden routeResponse.
  18. The Akka HTTP module allows defining routes with directives (see the sketch at the end of this chapter).
  19. The generic directive form is: name(arguments) { extractions => ... // inner route }.
  20. Directive examples:
    • get { ... }: matches GET requests;
    • post { ... }: matches POST requests;
    • path(PathMatcher): matches the request path;
    • pathPrefix(PathMatcher): matches a prefix of the path;
    • complete(value): completes the request with the value.
  21. The values can be extracted from the request path with path matchers: pathPrefix("orders" / IntNumber) { id => ... }.
  22. Akka HTTP module provides a test kit to test routes. ScalatestRouteTest should be extended in test class to use the kit.
  23. RequestTimeout trait can be extended to automatically read akka.http.server.request-timeout value from the configuration.
  24. Entity marshallers should be provided in implicit scope to enable custom type marshallers in akka-http (for example, to enable XML marshalling akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ should be imported).

Chapter 13. Streaming

  1. A stream of data is a sequence of elements that could have no end.
  2. Streams provided by akka-stream library.
  3. Using akka-stream involves two steps:
    • define a blueprint: how streams need to be processed;
    • execute the blueprint: the graph is turned into actors that actually stream the data.
  4. Source and Sink are stream endpoints.
  5. The Source and Sink form RunnableGraph when connected together.
  6. All inputs and outputs in graph should be connected to form RunnableGraph.
  7. RunnableGraph.run() materializes the graph: it creates the needed resources and does the actual work. It requires a Materializer in implicit scope; ActorMaterializer converts the RunnableGraph into actors (see the sketch at the end of this chapter).
  8. A stream can be cancelled using take, takeWhile and takeWithin.
  9. Streams use special protocol between data publisher and subscriber.
  10. The subscriber uses nonblocking back pressure to signal the publisher how much data it can process.
  11. Back pressure propagates from the end of the stream to the beginning, ensuring no publisher sends more messages than the slowest consumer can process.
  12. Reactive Streams Initiative is a standard for asynchronous stream processing with nonblocking back pressure.
  13. Akka-stream uses buffers internally, so it requests batches of elements instead of requesting every single element.
  14. Akka-stream performs operator fusion to remove unnecessary asynchronous boundaries in the graph. As many stages in a graph as possible are run on a single actor. This behavior is configurable.
  15. Sources and sinks can provide an auxiliary value in a Future. It's possible to configure which value should be kept in each transition (left, right or both).
  16. The Flow component is used to process stream data. A Flow has one input port and one output port.
  17. Akka-stream has a couple of predefined Flows for framing that can be used to identify frames of data in a stream.
  18. The Flow has many collection-like operations, such as map and filter.
  19. A stream is not a collection. Big difference: the size of stream is not known.
  20. By default, stream processing is stopped when an exception occurs. A supervisor strategy can be set for every graph component or for the complete graph to avoid this.
  21. Another option: catch exceptions and pass them through the stream as special messages.
  22. Bidirectional flow has two open inputs and two open outputs.
  23. BidiFlow can be stacked on top of a flow as an adapter.
  24. In other words: BidiFlow provides two flows that can be connected before and after the existing flow to adapt input and output of the flow.
  25. Akka-http internally uses akka-stream.
  26. HTTP request entity has a dataBytes field which is Source of data in the HTTP stream.
  27. A Source can be sent to the client as HttpEntity(ContentType, Source).
  28. Akka-http makes it possible to create custom marshallers and unmarshallers for entity data of different content types.
  29. Akka-stream provides graph DSL.
  30. Graph DSL provides GraphDSL.Builder to create the nodes and a ~> method to connect nodes.
  31. Flows can be merged with the Merge or MergePreferred graph stages.
  32. Source.combine can be used to merge Sources.
  33. Buffers can be used in streams. Flow.buffer method requires two arguments: buffer size and overflow strategy.
  34. Buffer overflow strategies:
    • dropHead;
    • dropTail;
    • dropBuffer;
    • dropNew;
    • backpressure;
    • fail.
  35. Flow.groupedWithin groups stream elements that arrive within the specified time window. It can be used to decrease the load on the consumer.
  36. Flow.expand registers an iterator that is used when no elements are available from the flow but a fast consumer is ready for more; elements are then pulled from the iterator.
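
A minimal blueprint-then-materialize sketch, using the ActorMaterializer API current at the time of the book; the source data and the doubling flow are invented:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }

object Streams extends App {
  implicit val system = ActorSystem("streams")
  implicit val mat = ActorMaterializer()     // turns blueprints into running actors

  // Blueprint only: nothing runs until the graph is materialized.
  val source = Source(1 to 100)
  val double = Flow[Int].map(_ * 2)
  val sink   = Sink.foreach[Int](println)

  val graph = source.via(double).to(sink)    // a RunnableGraph
  graph.run()                                // materialization happens here
}
```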

Chapter 14. Clustering

  1. A cluster is a dynamic group of nodes, it makes it possible to dynamically grow and shrink the number of nodes.
  2. Each node is a JVM with actor system started in it.
  3. All actor systems in the cluster should have the same name (this is the name of the cluster as well).
  4. Each cluster should contain one or more seed nodes. Seed nodes are the founders of the cluster.
  5. A new node needs the list of seed nodes to join the cluster. The list can be set in the configuration file.
  6. A new node sends a join message to each seed node; the first seed node to respond gets to handle the join command.
  7. One of the nodes in the cluster is the leader. The leader decides if a member node is up or down.
  8. The first node, in sort order, that is Up or Leaving automatically becomes the leader.
  9. If a node in the cluster goes down, it's flagged as unreachable.
  10. The leader can't execute any leader actions as long as any of the nodes are unreachable. No node can leave or join the cluster in this case.
  11. The unreachable node has to be taken down (use the down method or the akka.cluster.auto-down-unreachable-after setting).
  12. An actor can use the Cluster.subscribe method to subscribe to cluster events: MemberUp, MemberExited, MemberRemoved, UnreachableMember, ReachableMember and CurrentClusterState (see the sketch below).
  13. ActorRef.watch method can be used as usual to watch actors in the cluster.
  14. Routers can be used with ClusterRouterPool and ClusterRouterGroup wrappers.
  15. A cluster can be tested on a local machine or in a multi-JVM environment with the sbt-multi-jvm plugin in the usual way.
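
A sketch of point 12, following the standard cluster-subscription pattern; the chosen event types and log output are illustrative:

```scala
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ClusterWatcher extends Actor {
  val cluster = Cluster(context.system)

  // Subscribe to membership events; InitialStateAsEvents replays the current
  // state as events instead of sending one CurrentClusterState snapshot.
  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberUp], classOf[UnreachableMember])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member)          => println(s"member up: ${member.address}")
    case UnreachableMember(member) => println(s"unreachable: ${member.address}")
  }
}
```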

Chapter 15. Actor persistence

  1. Event sourcing captures a sequence of immutable events in a journal.
  2. A persistent actor (PersistentActor trait) works in two modes: it recovers from events (receiveRecover method) or it processes commands (receiveCommand method).
  3. Every persistent actor requires a persistenceId to identify the events in the journal for that actor.
  4. The persist or persistAsync methods are used in receiveCommand to store events in the journal: persist(Event)(Event => Unit) (see the sketch at the end of this chapter).
  5. Commands are messages that are sent to the actor to execute some logic.
  6. Events provide evidence that the actor has executed the logic correctly.
  7. Snapshots can reduce the required storage space and speed up recovery of the state.
  8. saveSnapshot method is used to save snapshots.
  9. Recovery process can be customized by overriding the recovery method.
  10. Persistence query is a module for querying a journal.
  11. PersistenceQuery.readJournalFor returns a specific read journal which is used to query the data.
  12. Two types of queries:
    • methods starting with current: return a Source with all currently stored events;
    • methods that don't start with current: return a Source with the current events and continuously provide "live" events as they arrive.
  13. Custom serializer is the best choice in most cases, but it's possible to use third-party libraries: akka-kryo-serialization and Stamina.
  14. Custom serializer has to extend Serializer trait.
  15. Serializers can be bound to classes in the configuration file using the akka.actor.serializers and akka.actor.serialization-bindings sections.
  16. Akka-persistence doesn't serialize the events and snapshots directly into the journal or snapshot store; the serialized objects are wrapped into an internal format. An EventAdapter can help when the backend database of a journal plugin needs to be queried.
  17. Sharding is the distribution of the actor state across servers.
  18. Cluster singleton extension guarantees that there's only one specified actor at any point in time in the cluster.
  19. ClusterSingletonManager actor ensures singleton guarantee and ClusterSingletonProxy actor always points to the current singleton in the cluster.
  20. Default LevelDB journal is not safe for production use - akka-persistence-cassandra should be used.
  21. The ClusterSharding extension distributes actors across the nodes of the cluster in shards.
  22. Shard is basically a group of sharded actors.
  23. A ShardRegion manages a number of Shards and forwards messages to the sharded actors.
  24. A ShardingCoordinator (which is a cluster singleton) determines which ShardRegion will own the Shard behind the scenes.
  25. Special functions (extractEntityId and extractShardId) should be implemented in the sharded actor's companion object to derive entity and shard IDs from commands. The implementation should ensure there are no duplicate sharded actors running in the cluster.
  26. ClusterSharding module will automatically start a sharded actor once it tries to forward a command. User code shouldn't start sharded actors.
  27. A sharded actor can ask its Shard to passivate it when it's not used. This can help control memory usage.
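
A minimal persistent-actor sketch for points 2-6; the counter, its command, and its event are invented:

```scala
import akka.persistence.PersistentActor

// Hypothetical command and event for a simple persistent counter.
case class Add(n: Int)     // command: a request to do something
case class Added(n: Int)   // event: evidence that it was done

class Counter extends PersistentActor {
  override def persistenceId = "counter-1"  // identifies this actor's events in the journal

  var total = 0

  // Recovery mode: replay journaled events to rebuild the state.
  override def receiveRecover: Receive = {
    case Added(n) => total += n
  }

  // Command mode: validate, persist the event, then apply it to the state.
  override def receiveCommand: Receive = {
    case Add(n) =>
      persist(Added(n)) { event =>
        total += event.n
        sender() ! total   // the persist callback runs safely on the actor's thread
      }
  }
}
```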

Chapter 16. Performance tips

  1. 80% of performance improvements can be made by addressing only 20% of the system (the Pareto principle):
    • it's possible to make minor changes that improve performance;
    • only changes to that 20% of the system have a noticeable effect on performance.
  2. These 20% are called bottlenecks.
  3. Solving the first bottleneck gives the biggest improvement. Solving the next bottleneck will result in a lesser improvement (the concept of diminishing returns).
  4. Two types of performance problems:
    • the throughput is too low: the number of requests that can be served is too low, usually solved by scaling;
    • the latency is too long: each request takes too long to be processed, generally require design changes.
  5. Performance terms:
    • arrival rate: number of messages arriving during a period;
    • throughput: number of completions during a period;
    • service time: the time needed to process a single job (or service rate - average number of jobs serviced during a period);
    • the latency: the time between the entry and the exit;
    • the utilization: the percentage of the time the node is busy processing messages.
  6. The message queue size is an important metric for detecting that there's a problem.
  7. Optimal performance: each time a task is completed, there's another one to do, but the wait time is vanishingly small.
  8. The queue size can be retrieved from the mailbox, and the utilization needs the statistics of the processing unit.
  9. The following data is needed from the Akka actor:
    • when a message is received and added to the mailbox;
    • when it was sent to be processed, removed from the mailbox and handed over to the processing unit;
    • when the message was done processing and left the processing unit.
  10. These metrics can be retrieved by using a custom mailbox and an actor trait with an overridden receive method (both should send statistics).
  11. To create a custom mailbox, the MailboxType and MessageQueue traits should be implemented.
  12. MailboxType that is used by a dispatcher should be set in the configuration file (akka.actor.default-mailbox for the default dispatcher).
  13. To improve the performance, three parameters can be changed:
    • number of services: effectively scaling up; increases the possible throughput of the node;
    • arrival rate: reducing the number of messages to be processed;
    • service time: making processing faster, which also improves the throughput.
  14. Available built-in dispatchers:
    • Dispatcher: default dispatcher, binds its actors to a thread pool, has fixed thread pool size;
    • PinnedDispatcher: binds an actor to a single and unique thread, thread isn't shared between actors;
    • BalancingDispatcher: redistributes the messages from busy actors to idle actors;
    • CallingThreadDispatcher: uses the current thread to process the messages of an actor, only used for testing.
  15. Props.withDispatcher is used to select the dispatcher defined in the configuration file.
  16. When the CPU utilization is 80% or higher, increasing the number of threads will probably not increase the performance.
  17. When the CPU utilization is low, message processing is mostly waiting; freezes and blocking calls should be removed. If that's impossible, the number of threads can be increased.
  18. Three parameters to configure the number of threads:
    • parallelism-factor: used to calculate number of threads from available processors;
    • parallelism-min: min number of threads;
    • parallelism-max: max number of threads.
  19. The executor used by the dispatcher can be changed to use a dynamic thread pool.
  20. Three possible values of the executor configuration item:
    • fork-join-executor: default executor;
    • thread-pool-executor: used when a dynamic thread pool is needed;
    • fully qualified class name (FQCN): custom executor (should extend Java ExecutorService).
  21. Two important parameters of thread-pool-executor:
    • task-queue-size: size of waiting thread requests before the pool is increased, -1 disables pool increase (how quickly the pool size will grow);
    • keep-alive-time: time a thread can be idle before it's cleaned up (how quickly the pool size will decrease).
  22. The throughput dispatcher parameter defines the maximum number of messages an actor may process before it has to release the thread back to the pool (default value: 5; see the sketch at the end of this chapter).
  23. Increasing the throughput will improve performance when there are a lot of messages but message processing is fast.
  24. The throughput setting can negatively impact performance if message processing takes long. The throughput-deadline-time parameter bounds the time an actor can keep a thread.
  25. Whether it's needed to increase or decrease the throughput configuration is completely dependent on the arrival rate and the function of the system.

Chapter 17. Looking ahead

  1. Akka-typed module provides typed Actor API.
  2. There is no sender() method in akka-typed.
  3. Actors are defined in terms of typed behaviors.
  4. Every message is passed to an immutable behavior.
  5. The behavior of an actor can be switched over time or stay the same.
  6. preStart, preRestart and other methods are replaced by special signal messages.
  7. The akka-typed API is highly likely to change and shouldn't be used in production.
  8. Akka Distributed Data provides conflict-free replicated data types (CRDTs) in an Akka cluster.
  9. CRDTs always have a merge function that merges data entries into one consistent view without any coordination between the nodes.
  10. The types of data structures that can be used as CRDT are limited.
  11. It's possible to build custom CRDT data structure (it should implement merge function according to the rules of CRDT).
  12. Akka Distributed Data provides Replicator actor to replicate a data structure throughout the Akka cluster.
  13. It's possible to subscribe to updates of the data structure.