Skip to content

Instantly share code, notes, and snippets.

@IgorBerman
Created January 26, 2017 10:22
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save IgorBerman/191e514d0245dc9ab2c2ccddb3e85cb0 to your computer and use it in GitHub Desktop.
Save IgorBerman/191e514d0245dc9ab2c2ccddb3e85cb0 to your computer and use it in GitHub Desktop.
akka streams in java with kill switch, shutdown hook and error handling
package com.example;
import java.util.concurrent.CompletionStage;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.japi.function.Function;
import akka.stream.ActorMaterializer;
import akka.stream.ActorMaterializerSettings;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.Supervision;
import akka.stream.UniqueKillSwitch;
import akka.stream.Supervision.Directive;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class ExceptionHandlingExample {
public static void main(String[] args) throws Exception {
Config config = ConfigFactory.parseString("akka { loglevel = \"DEBUG\" }").withFallback(ConfigFactory.load("QuickStart"));
final ActorSystem system = ActorSystem.create("QuickStart", config);
final Function<Throwable, Directive> decider = exc -> {
System.out.println(exc);
if (exc instanceof ArithmeticException) {
return Supervision.resume();
} else if (exc instanceof Exception) {
return Supervision.resume();
}
return Supervision.stop();
};
final Materializer mat = ActorMaterializer.create(ActorMaterializerSettings.create(system).withDebugLogging(true).withSupervisionStrategy(decider), system);
Graph<FlowShape<Integer, Integer>, UniqueKillSwitch> killSwitch = KillSwitches.single();
//take some source
Source<Integer,NotUsed> source = Source.range(1, 100);
//attach ability to stop it with kill switch
Source<Integer, UniqueKillSwitch> sourceWithKillSwitch = source.viaMat(killSwitch, Keep.right());
//this is custom logic that might throw different kinds of errors
Flow<Integer,String,NotUsed> map = Flow.<Integer>create().map(x->{
Thread.sleep(1000);//only for testing
if (x == 99) {
throw new java.lang.Error("Got:"+x);//error
}
if (x % 7 == 0) {
throw new java.lang.ArithmeticException("Got:"+x);//runtime
}
if (x % 11 == 0) {
throw new java.io.IOException("Got:"+x);//checked
}
return x;
})
//map only those that "ok" to string
.map(x -> (x * 2)+"\n");
Source<String, UniqueKillSwitch> sourceAfterMapping = sourceWithKillSwitch.via(map);
Sink<String, CompletionStage<Done>> consoleSink = Sink.foreach(System.out::print);
//create graph
RunnableGraph<Pair<UniqueKillSwitch,CompletionStage<Done>>> graph = sourceAfterMapping.toMat(consoleSink, Keep.both());
//and run it when we got materialization hooks for killing and for listening for completion
Pair<UniqueKillSwitch, CompletionStage<Done>> switchWithCompletion = graph.run(mat);
//"enrich" completion handler with propogated failures(see decider above !ArithmeticException and !Exception)
switchWithCompletion.second().exceptionally(new java.util.function.Function<Throwable, Done>() {
@Override
public Done apply(Throwable t) {
System.out.println("Encountered Exception that stops stream, we are terminating everything " + t + " " + t.getCause());
system.terminate();
return null;
}
});
//shutdown hook for CtrlC or service restarts(i.e. graceful shutdown)
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Got Cntrl-C, shutting down by kill switch");
switchWithCompletion.first().shutdown();
}
}));
//after all setup is done, let's wait for the system to be terminated
Await.ready(system.whenTerminated(), Duration.Inf());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment