Skip to content

Instantly share code, notes, and snippets.

@jpe42
Last active June 7, 2016 04:05
Show Gist options
  • Save jpe42/5d2d77da970854fb40af707cc44dc3cd to your computer and use it in GitHub Desktop.
Save jpe42/5d2d77da970854fb40af707cc44dc3cd to your computer and use it in GitHub Desktop.
Jedipus Redis Pub/Sub
final Node node = Node.create("127.0.0.1", 6379);
final String channel = "jedipus";
final RedisClientFactory.Builder clientFactory = RedisClientFactory.startBuilding();
final ElementRetryDelay<Node> nodeRetryDelay =
ElementRetryDelay.startBuilding().withMaxDelay(Duration.ofSeconds(30)).create();
final RedisClientExecutor clientExecutor = RedisClientExecutor.startBuilding()
.withClientFactory(clientFactory)
.withRetryDelay(nodeRetryDelay)
.create(() -> node);
final RedisSubscriber subscriber = RedisSubscriber.startBuilding()
// Sends a ping every second to test socket connection.
.withSoTimeoutMillis(500).withOnSocketTimeout(RedisSubscriber::ping)
.withPongConsumer(pong -> System.out.println("I'm Alive!"))
.create(redisExecutor, (ch, payload) -> System.out.format("Received '%s' from channel '%s'.%n", payload, ch));
subscriber.subscribe(channel);
// RedisSubscriber is a Runnable.
// run() processes messages until the subscription count drops back to 0.
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Future<?> subscriberFuture = executor.submit(subscriber);
try (final RedisClient publisher = clientFactory.create(node)) {
publisher.publish(channel, "Hello fellow thread.");
}
// Sleep here to demonstrate socket ping/pong health test.
Thread.sleep(2000);
// Unsubscribe from all channels.
subscriber.unsubscribe();
// Should return immediately as we are no longer subscribed to any channels.
subscriberFuture.get();
subscriber.close();
executor.shutdown();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment