Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active August 29, 2015 14:21
Show Gist options
  • Save smaldini/62df978ab0a2ead73e11 to your computer and use it in GitHub Desktop.
Save smaldini/62df978ab0a2ead73e11 to your computer and use it in GitHub Desktop.
final Broadcaster<String> closeCircuit = Broadcaster.create();
final Stream<String> openCircuit = Streams.just("Alternative Message");
final Action<Publisher<? extends String>, String> circuitSwitcher = Streams.switchOnNext();
final AtomicInteger successes = new AtomicInteger();
final AtomicInteger failures = new AtomicInteger();
final int maxErrors = 3;
Promise<List<String>> promise =
circuitSwitcher
.observe(d -> successes.incrementAndGet())
.when(Throwable.class, error -> failures.incrementAndGet())
.observeStart(s -> {
System.out.println("failures: " + failures + " successes:" + successes);
if (failures.compareAndSet(maxErrors, 0)) {
circuitSwitcher.onNext(openCircuit);
successes.set(0);
Streams.timer(1)
.consume(ignore -> circuitSwitcher.onNext(closeCircuit));
}
})
.retry()
.toList();
circuitSwitcher.onNext(closeCircuit);
closeCircuit.onNext("test1");
closeCircuit.onNext("test2");
closeCircuit.onNext("test3");
closeCircuit.onError(new Exception("test4"));
closeCircuit.onError(new Exception("test5"));
closeCircuit.onError(new Exception("test6"));
Thread.sleep(1500);
closeCircuit.onNext("test7");
closeCircuit.onNext("test8");
closeCircuit.onComplete();
circuitSwitcher.onComplete();
System.out.println(promise.await());
Assert.assertEquals(promise.get().get(0), "test1");
Assert.assertEquals(promise.get().get(1), "test2");
Assert.assertEquals(promise.get().get(2), "test3");
Assert.assertEquals(promise.get().get(3), "Alternative Message");
Assert.assertEquals(promise.get().get(4), "test7");
Assert.assertEquals(promise.get().get(5), "test8");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment