Skip to content

Instantly share code, notes, and snippets.

@tgrall
Last active January 29, 2020 05:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tgrall/bcb45a6820bb4525dd6fc00dcc3a97fe to your computer and use it in GitHub Desktop.
Save tgrall/bcb45a6820bb4525dd6fc00dcc3a97fe to your computer and use it in GitHub Desktop.
blog-redis-steam-001
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
// Minimal connection lifecycle: create a client, open a connection, get the
// synchronous command API, then release everything.
// Create the client from a Redis URI.
RedisClient redisClient = RedisClient.create("redis://password@host:port"); // change to reflect your environment
// Open a connection whose keys and values are both Strings.
StatefulRedisConnection<String, String> connection = redisClient.connect();
// Obtain the synchronous command interface backed by that connection.
RedisCommands<String, String> syncCommands = connection.sync();
// When finished: close the connection first, then shut the client down.
connection.close();
redisClient.shutdown();
/**
 * Posts one wind-sensor reading to the {@code weather_sensor:wind} stream and
 * prints the message ID returned by {@code xadd}.
 *
 * <p>The connection and the client are released in {@code finally} blocks so
 * they are cleaned up even when connecting or posting fails.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    RedisClient redisClient = RedisClient.create("redis://localhost:6379"); // change to reflect your environment
    try {
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        try {
            RedisCommands<String, String> syncCommands = connection.sync();

            // Body of the stream entry: field name -> value, all as Strings.
            Map<String, String> messageBody = new HashMap<>();
            messageBody.put( "speed", "15" );
            messageBody.put( "direction", "270" );
            messageBody.put( "sensor_ts", String.valueOf(System.currentTimeMillis()) );

            String messageId = syncCommands.xadd(
                    "weather_sensor:wind",
                    messageBody);

            System.out.println( String.format("Message %s : %s posted", messageId, messageBody) );
        } finally {
            // Always close the connection, even if xadd threw.
            connection.close();
        }
    } finally {
        // Always shut the client down, even if connect() threw.
        redisClient.shutdown();
    }
}
...
// Create the consumer group "application_1" on the stream, starting at ID "0-0".
try {
    // WARNING: the stream must exist before the group is created.
    // This will not be necessary in Lettuce 5.2, see https://github.com/lettuce-io/lettuce-core/issues/898
    syncCommands.xgroupCreate( XReadArgs.StreamOffset.from("weather_sensor:wind", "0-0"), "application_1" );
}
catch (RedisBusyException redisBusyException) {
    // Treated as "group already exists": report it and carry on consuming.
    System.out.println( String.format("\t Group '%s' already exists","application_1"));
}
System.out.println("Waiting for new messages");
// Consume forever as "consumer_1" of group "application_1".
// NOTE(review): no BLOCK option is passed to xreadgroup, so each call presumably
// returns immediately and this loop polls Redis continuously — consider passing
// XReadArgs with a block timeout; confirm against the Lettuce/Redis docs.
while (true) {
    List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
            Consumer.from("application_1", "consumer_1"),
            XReadArgs.StreamOffset.lastConsumed("weather_sensor:wind")
    );
    // An empty result simply skips the loop body.
    for (StreamMessage<String, String> message : messages) {
        System.out.println(message);
        // Confirm that the message has been processed using XACK.
        // Acknowledge on the same stream key the message was read from.
        syncCommands.xack("weather_sensor:wind", "application_1", message.getId());
    }
}
...
> git clone https://github.com/tgrall/redis-streams-101-java.git
> cd redis-streams-101-java
> mvn clean verify
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer"
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Consumer"
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer" -Dexec.args="100"
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer" -Dexec.args="5"
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Consumer"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment