Redis RCP11 specifies Redis Streams, including the consumer groups semantics and API. However, after further analysis I was not satisfied with the specification of this feature, so this document redefines it in different terms, so that: the consumer always knows that the order of messages is preserved; it is possible for a consumer to re-fetch a part of the stream with the same ordering, picking exactly the same messages that the server returned to it in the first instance; and finally, it is possible for any consumer to inspect dead (non acknowledged) messages in the stream with a special API, so that those messages can be processed at a later time by other consumers if needed.
Redis Streams consumer groups have a similar goal to Kafka consumer groups, but a completely different implementation and API. The gist however is the same: allow multiple consumers to cooperate in processing messages arriving in a stream, so that each consumer in a given group takes a subset of the messages. Compared to just letting the consumers shard by key, this feature puts the complexity on the server side: recovering after consumer failures, handling group membership so that consumers can join and leave, and providing observability of the stream processing in real time.
- Identify groups with names.
- Within groups, identify consumers with the name they claim to have.
- Return messages in order. Every successive call to `XREAD-GROUP` by any consumer in a group, with the special ID `>`, will return messages with greater IDs, in the same order as they were added to the stream.
- Consumers can also call `XREAD-GROUP` with an old offset, and will be served just the messages that were assigned to them in the past, and only if not already acknowledged.
- Serve messages to the consumers based on which consumer is available to process messages, without any pre-defined sharding, so that consumers processing messages at different rates do not create any processing hole.
- Ability to inspect a stream and retrieve older non processed messages, so that consumers can cooperate in order to explicitly receive older, out of order messages that were not acknowledged as processed in time. This allows obtaining at-least-once delivery (subject to the consistency limits of Redis itself in case of failures).
Normally Redis creates objects the first time they are seen. However in the
case of consumer groups, the associated state and lifespan are harder to
anticipate, and there are no obvious defaults. Nor does destroying groups after
a specific amount of inactivity look like a sensible choice. So groups
are created and destroyed explicitly using the `XGROUP` command. To create
a new group, we have to specify its name and the initial ID. The initial
ID is used as a starting point to return messages to the consumers participating
in the group. Only messages with IDs greater than the specified one will be
returned.
XGROUP CREATE mystream groupname $
Note that we are not forced to specify `$` as the last ID. Any valid ID will do.
If we want to force re-processing of the stream in the future, it is possible
to set a different ID later, using the `SETID` subcommand.
XGROUP SETID mystream 1507282666292-0
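The effect of the group's ID can be sketched with a tiny Python model (purely illustrative; `parse_id` and `eligible` are names made up for this sketch, not part of any API). Only entries with an ID strictly greater than the group's ID are ever delivered:

```python
# Illustrative model: a group positioned at a given ID will only ever
# deliver entries with a strictly greater ID. Stream IDs are "<ms>-<seq>"
# strings, compared as (ms, seq) integer pairs.
def parse_id(s):
    ms, seq = s.split("-")
    return (int(ms), int(seq))

def eligible(entries, group_id):
    """Stream entries a group positioned at group_id can still deliver."""
    start = parse_id(group_id)
    return [(i, body) for i, body in entries if parse_id(i) > start]

stream = [("1507282666292-0", "a"), ("1507282666292-1", "b"),
          ("1507282666300-0", "c")]
# A group created with the first entry's ID skips that entry:
print(eligible(stream, "1507282666292-0"))
```

Resetting the group to an older ID with `SETID` simply moves this starting point back, forcing re-delivery of everything after it.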
The feature is requested by the consumer by calling a variant of the `XREAD`
command called `XREAD-GROUP`. The command is very similar to `XREAD`, however
the `GROUP` and `NAME` options are mandatory in the `XREAD-GROUP` command, and
the command is considered a write command, while `XREAD` itself is a read-only
command.
The `GROUP` and `NAME` options are used to specify the name of the group and
the name of the consumer. It is very important that each consumer specifies a
unique consumer name, because this is the sole mechanism used in order to provide
consumers with the same messages they already saw in the past, in case consumers
attempt to re-fetch older messages that were already delivered. Example:
XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream >
The special ID `>` means that we only want messages that are new (never returned
to previous clients). Each group has a current ID tracking the highest ID
returned so far, so `>` basically means we want messages greater than the
current group ID. As a result of messages being delivered to our client, the group
ID will increment. For instance, `XREAD-GROUP` may return something like this:
Note: the format here is only used for the sake of explanation.
1500000000000-0 "Message" "1"
1500000000000-1 "Message" "2"
And now the group `mygroup` current ID is set to 1500000000000-1.
Note that as a side effect of returning certain messages to a given client
name, Redis will memorize inside the stream (just conceptually; a different
radix tree data structure is used) that the above messages belong to the
client called `c-1234`, and also the time they were delivered.
1500000000000-0 => "c-1234" 2 seconds ago
1500000000000-1 => "c-1234" 2 seconds ago
The above dictionary of messages delivered to specific clients is called the Pending Entries List, or PEL.
When a query for messages with an ID smaller or equal to the current group ID is performed by a client, the only messages returned are the ones in the PEL that also match the name of the client, so basically clients always see the same messages again and again.
For example, our client `c-1234` could be stopped and restarted later. Supposing
it persisted the last processed ID in permanent storage or a database, it
will read it back and continue. Such ID could be `1500000000000-0` (since it
only processed one message before being stopped). When it calls
`XREAD-GROUP` again specifying such an ID, only the message `1500000000000-1`
will be returned to the client, because successive messages may only be owned
by other clients at this point.
Once the client calls `XREAD-GROUP` with successive messages, eventually
no more messages will be returned, and the client may resume normal
operations, asking for the `>` special ID to get new messages.
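The delivery cycle described so far can be sketched with a small in-memory model (purely illustrative; the `Group` class and the simplified integer IDs are inventions of this sketch, and acknowledgment is omitted here):

```python
# Illustrative model of the group's current ID, the PEL, and the two query
# modes: ">" (new messages) versus an explicit old ID (re-fetch own pending).
class Group:
    def __init__(self):
        self.last_id = 0   # current group ID (simplified to an integer)
        self.pel = {}      # message ID -> owning consumer name

    def read(self, consumer, stream, id_arg):
        if id_arg == ">":
            new = [(i, b) for i, b in stream if i > self.last_id]
            for i, _ in new:
                self.pel[i] = consumer   # record the delivery in the PEL
                self.last_id = i         # the group ID advances
            return new
        # Old ID: only this consumer's own pending messages are returned.
        return [(i, b) for i, b in stream
                if i > id_arg and self.pel.get(i) == consumer]

g = Group()
stream = [(1, "one"), (2, "two"), (3, "three")]
print(g.read("c-1234", stream, ">"))  # all three delivered, PEL filled
print(g.read("c-1234", stream, 0))    # re-fetch: same messages, same order
print(g.read("other", stream, 0))     # other consumers see nothing
```

Note how re-fetching with an old ID is deterministic: a consumer always gets back exactly the messages it owns, in ID order.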
Note that clients also have a way to ask for the non acknowledged messages that are pending for them, so clients are free to rely on Redis after a restart if they do not want to persist message IDs. This mechanism is covered later in this document.
The way messages are removed from the PEL is by acknowledging them with the
`XACK` command. The command is designed to get multiple message IDs in a single
call, so the client may call:
XACK mystream mygroup c-1234 1500000000000-0 1500000000000-1 ...
The general form is:
XACK <key> <group-name> <consumer-name> <ID-1> <ID-2> ... <ID-N>
Now that the messages are acknowledged, nobody will be able to retrieve them again from the group. Similarly, the messages will no longer be in the pending list.
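Continuing the in-memory sketch (illustrative only; for this sketch I assume acknowledging reports how many entries were actually removed, which this spec does not state):

```python
# Illustrative model: acknowledging removes entries from the PEL, so they
# can no longer be re-fetched through the group.
def xack(pel, consumer, *ids):
    """Remove the acknowledged IDs owned by `consumer`; return the count."""
    acked = 0
    for i in ids:
        if pel.get(i) == consumer:   # only the owner's entries are removed
            del pel[i]
            acked += 1
    return acked

pel = {"1500000000000-0": "c-1234", "1500000000000-1": "c-1234"}
n = xack(pel, "c-1234", "1500000000000-0", "1500000000000-1")
print(n, pel)   # both entries acknowledged, the PEL is now empty
```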
Clients or system administrators may want to see what messages are currently in the pending entries list. There are two commands to deal with this task:
XGROUP PENDING mystream mygroup => (minid, maxid, count)
The `XGROUP` command with the `PENDING` subcommand will return the min
and max IDs of the pending messages, and the total number of pending messages.
It is also possible to obtain the PEL entries:
XGROUP GET-PENDING mystream mygroup min max count
This command will return the pending messages from `min` to `max`, no more than
`count` messages; for each message, the client name and the number of
seconds elapsed since the delivery (let's call this the message idle time) are also
reported.
Clients may get the list of pending messages, and if the
age is big enough, they may want to process those messages by claiming ownership
with the `XGROUP CLAIM` command, then fetching them via the `XRANGE` command,
and later acknowledging them using `XACK` in order to clean up the PEL.
In order for a client to claim a message that belonged to some other client, and process it safely, the following command is used:
XGROUP CLAIM <key> <group-name> <consumer-name> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
The command works as follows: it inspects all the messages in the PEL having the specified IDs, and for each message with an idle time equal to or greater than the specified one, the following happens:
- The message ID is returned in the reply array.
- The message idle time is reset to 0 (last delivery is set to now).
- The message deliveries counter is incremented.
- The message is assigned to the consumer claiming it.
Because of these semantics, the first client arriving is served, resets the idle time, and can attempt the delivery. If it fails, another client can eventually claim the message, and so forth.
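These claim semantics can be sketched in a few lines (illustrative only; the `claim` function and the dictionary layout are made up for this sketch):

```python
# Illustrative model of the claim semantics: a claim succeeds only for PEL
# entries idle for at least min_idle_ms; on success the idle time is reset,
# the delivery counter is incremented, and ownership moves to the claimer.
def claim(pel, consumer, min_idle_ms, ids, now_ms):
    claimed = []
    for i in ids:
        entry = pel.get(i)
        if entry and now_ms - entry["delivery_time"] >= min_idle_ms:
            entry["delivery_time"] = now_ms   # idle time reset to 0
            entry["delivery_count"] += 1      # one more delivery attempt
            entry["consumer"] = consumer      # ownership moves
            claimed.append(i)
    return claimed

pel = {"1500000000000-0": {"delivery_time": 0, "delivery_count": 1,
                           "consumer": "c-1234"}}
# 61 seconds later, another consumer claims the stuck message...
print(claim(pel, "c-5678", 60000, ["1500000000000-0"], now_ms=61000))
# ...and a second claimer arriving right after finds it no longer idle:
print(claim(pel, "c-9999", 60000, ["1500000000000-0"], now_ms=61000))
```

The second call returning nothing is exactly the "first client arriving is served" race resolution described above: the successful claim reset the idle time.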
Now we can zoom into the semantics of XREAD-GROUP to see what it does when delivering messages from the point of view of the pending entries list (PEL):
- When messages are delivered as a result of using the `>` ID, a PEL entry is created for each such message: the message last delivery time is set to now, and the message is assigned to the consumer that received it.
- When messages are re-delivered to clients because a specific ID was specified, the only change to the PEL is that the entry's last delivery time is updated and the deliveries count is incremented.
So if a client re-fetches messages after a restart using something like:
XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream 1500000000000-2
The messages' last delivery time will be updated, preventing an `XGROUP CLAIM`
called by some other client from getting the messages. Similarly, if instead
`XGROUP CLAIM` was able to claim a message, it was assigned to another client,
so it is no longer visible to `c-1234` and will not get delivered again to the
original owner.
Clients are not forced to keep the last processed ID in a durable way: if the consistency characteristics of Redis are enough for the problem at hand, instead of storing (for example writing to disk) the new ID every time a message is successfully acknowledged, clients can use the following command in order to get the pending messages that were not successfully acknowledged:
XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream 0
By specifying the ID 0 we obtain the pending messages not yet acknowledged. However note that the consumer can avoid certain duplicate processing of messages by handling the offset locally. For instance, if Redis is failing, the consumer can still update its local offset, while otherwise it needs to keep trying; and if it is restarted in the middle, that state is lost, so after a restart the message will be reprocessed.
Certain applications do not need any reliability, and messages in the stream
can be trivially lost by clients from time to time without any issue. In
this case the `XREAD-GROUP` command can be used with the `NOACK` option, so that
no entry is created inside the PEL for the returned messages. This is equivalent
to acknowledging the messages in the same moment they are returned.
Deleting a consumer group is as simple as:
XGROUP DEL mystream mygroup
All the clients blocked on this group are unblocked with an error when the
group is destroyed: `-ERR group deleted`.
While `XREAD` and `XRANGE` are read-only commands that can be scaled with
slaves, `XREAD-GROUP` alters the state of the stream, because groups are
part of the stream data structure itself, being persisted and replicated.
For this reason `XREAD-GROUP` can only be used on master instances.
The following pseudo code shows what a consumer should normally do in order to process part of the messages, recover after a failure, and process pending messages about failing consumers from time to time:
$myname = "consumer-9999" # Obtain a unique name in a smarter way...
$timeout = 60000          # After 60 seconds a message is considered lost

XGROUP CREATE ... # This call may fail if the group already exists.

# Process pending messages if any... This is useful to recover after
# a restart, immediately processing pending messages for this consumer.
$to_process = XREAD-GROUP GROUP mygroup NAME $myname STREAMS mystream 0
FOREACH $to_process AS $msg
    PROCESS($msg.body)
    XACK mystream mygroup $myname $msg.id
END

# The main loop for normal operations
WHILE TRUE
    # Fetch messages that nobody has yet fetched from the stream:
    $to_process = XREAD-GROUP GROUP mygroup NAME $myname STREAMS mystream >
    FOREACH $to_process AS $msg
        PROCESS($msg.body)
        XACK mystream mygroup $myname $msg.id
    END

    # This block in the main loop has the sole goal of handling "lost"
    # messages, that were assigned to some consumer that never
    # acknowledged them.
    FROM-TIME-TO-TIME
        $old_messages = XGROUP GET-PENDING mystream mygroup - + 10
        FOREACH $old_messages AS $old
            IF $old.age > $timeout
                $res = XGROUP CLAIM mystream mygroup $myname $timeout $old.id
                IF LEN($res) > 0
                    PROCESS($old.body)
                    XACK mystream mygroup $myname $old.id
                END
            END
        END
    END
END
Unlike groups, consumers inside a group are auto-created the first time a given consumer receives a message: when this happens we need to create a new pending list entry referencing the consumer.
However, clients may end up using different algorithms to create consumer names, so names of consumers that no longer exist may remain referenced. Such consumers can be reclaimed from memory once the list of pending messages associated with them drops to zero.
The implementation should periodically check the list of consumers in a
given group (for instance from time to time when the group is accessed, and
only if the number of consumers is already large, or always when the list
of consumers is accessed via `XGROUP CONSUMERS`), and if needed reclaim
consumers that have both zero associated pending messages and a last seen
time older than a few minutes.
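The reclamation policy above can be sketched as follows (illustrative only; the function name, the dictionary layout, and the five-minute default are assumptions of this sketch):

```python
# Illustrative sketch of the consumer GC policy: a consumer is reclaimable
# only if it owns no pending messages AND was last seen longer than
# max_idle_ms ago.
def gc_consumers(consumers, now_ms, max_idle_ms=5 * 60 * 1000):
    """consumers: name -> {"seen": mstime, "pel": dict}. Mutates in place."""
    for name in list(consumers):
        c = consumers[name]
        if not c["pel"] and now_ms - c["seen"] > max_idle_ms:
            del consumers[name]
    return consumers

consumers = {
    "alive": {"seen": 1000000, "pel": {}},          # recently seen: kept
    "busy":  {"seen": 0, "pel": {"1-0": True}},     # has pending: kept
    "stale": {"seen": 0, "pel": {}},                # idle and empty: reclaimed
}
gc_consumers(consumers, now_ms=1000000)
print(sorted(consumers))   # ['alive', 'busy']
```

Note that the consumer with pending messages is never reclaimed, no matter how old its last seen time is: its PEL entries would otherwise dangle.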
Moreover the following commands should be provided:
XGROUP DEL-CONSUMER stream group consumer-name
XGROUP CONSUMERS => Provides a list of all the consumers and associated
                    information such as the number of pending messages,
                    the idle time, and so forth.
When a consumer is deleted, all the pending messages associated with it will be deleted as well, so administrators should make sure to reassign its list of messages to a different consumer before deleting it.
Groups are just referenced inside the stream data type, as members of a hash table where the group name is the key.
Group names are case sensitive. The group representation itself is a `cgroup`
structure composed of the current group last-delivered ID, and a radix tree of known consumers, again indexed by consumer name, having as associated value a `cgroupConsumer` structure.
Non acknowledged messages are described in two ways: as a global list of non acknowledged messages, and as a consumer-specific list of non acknowledged messages.
The group structure has its list of message IDs represented in a radix tree, having as associated data the `notAckedMessage` structure, which references the `cgroupConsumer` data structure. The `notAckedMessage` also contains the message's last delivery attempt time and the number of attempts, which are updated by `XREAD-GROUP` if the option to update the delivery time is given. So we have a radix tree that represents all the pending messages for the group. However we also need to duplicate this information for each consumer, so the `cgroupConsumer` structure also has an associated radix tree that represents the pending messages for this client, referencing the `notAckedMessage` structure for each message. This way we can immediately reply to a client with the not yet acknowledged message having the smallest ID.
Main structures involved:
struct cgroup {
    streamID lastid;       /* Last delivered ID (not acknowledged, just
                              delivered). */
    radixtree *pel;        /* Pending Entries List, by message ID; aux data
                              is a notAckedMessage structure. */
    radixtree *consumers;  /* By name; aux data is cgroupConsumer. */
};

struct cgroupConsumer {
    mstime_t seen;         /* Last time the consumer was seen. */
    sds name;              /* Consumer name. */
    radixtree *pel;        /* Pending list for this client, by message ID;
                              aux data is a reference to the notAckedMessage
                              in cgroup->pel. */
};

struct notAckedMessage {
    mstime_t delivery_time;   /* Last time the message was delivered to
                                 some client. */
    uint64_t delivery_count;  /* Number of times this message was
                                 re-delivered. */
    cgroupConsumer *consumer; /* The consumer this message was delivered
                                 to. */
};
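The double-indexed PEL described above can be sketched with plain dictionaries standing in for the radix trees (illustrative only; `NotAcked`, `deliver`, and the dictionary names are inventions of this sketch):

```python
# Illustrative model: the group PEL and each consumer PEL reference the
# SAME pending-entry object, so an update through one view is visible
# from the other, as with the shared notAckedMessage structure.
class NotAcked:
    def __init__(self, consumer, delivery_time):
        self.consumer = consumer
        self.delivery_time = delivery_time
        self.delivery_count = 1

group_pel = {}        # message ID -> NotAcked (group-wide view)
consumer_pels = {}    # consumer name -> {message ID -> NotAcked}

def deliver(msg_id, consumer, now_ms):
    entry = NotAcked(consumer, now_ms)
    group_pel[msg_id] = entry
    consumer_pels.setdefault(consumer, {})[msg_id] = entry  # shared reference

deliver("1-0", "c-1234", 1000)
# Both indexes point at the very same object:
print(group_pel["1-0"] is consumer_pels["c-1234"]["1-0"])   # True
# The smallest pending ID for a consumer is immediate to answer:
print(min(consumer_pels["c-1234"]))                          # 1-0
```

The per-consumer index is what makes "give me my oldest pending message" cheap, which is exactly why the information is duplicated.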
The following set of changes must be replicated to slaves and propagated to the AOF:
- Consumer group creation. This is trivial to accomplish just by propagating the command verbatim in case of success.
- Change of the group ID. Again we can just propagate `XGROUP SETID` verbatim.
- Creation or update of an entry in the PEL when a message is delivered to some client. This happens in multiple cases: when `XREAD-GROUP` returns messages to the client, and when a message is claimed by some other client via `XGROUP CLAIM`. This event will be propagated as a special `XGROUP PEL <stream> <group> <message-id> <current-owner-name> <idle-time> <deliveries-count>` command. Such a command will create the message in the PEL if it does not already exist, or will just update its fields. As a side effect this command also creates the `cgroupConsumer` representation on the slave side, so that the slave can contain the full state.
- Deletion of an entry in the PEL: this only happens when `XACK` is received, so the command can be propagated verbatim.
- Deletion of a consumer. This is replicated just as `XGROUP DEL-CONSUMER` in both possibilities: because of natural GC of unused consumers, or when the command is actually executed by the user.
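As a sketch, the special PEL propagation event could be serialized like this (illustrative only; the function name is made up, and the argument order simply follows the command template given in the list above):

```python
# Illustrative sketch: turning a PEL change into the special XGROUP PEL
# command to be sent to slaves and written to the AOF.
def pel_propagation_cmd(stream, group, msg_id, owner, idle_ms, deliveries):
    return ["XGROUP", "PEL", stream, group, msg_id,
            owner, str(idle_ms), str(deliveries)]

cmd = pel_propagation_cmd("mystream", "mygroup", "1500000000000-0",
                          "c-1234", 0, 1)
print(" ".join(cmd))
```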
The current RDB persistence format will be modified in order to persist all the stream metadata on disk. This will include all the groups of a given stream, and for each, the full list of pending messages, clients owners, and so forth.
This specification benefited from input provided by Roey Berman and by an internal discussion about streams in the Tel Aviv Redis offices, where multiple people contributed ideas. Users over Twitter suggested ideas as well.
My stream workflow with Kafka involves a couple of steps: after creating the topic (which could be auto-created, but I don't do that because I want to set sensible partitioning defaults for my cluster and capacity planning), I just attach a producer and consumers, passing the consumer group from the client up.
From this design, it seems to be the same, except that consumer groups are not auto-created (correct me if I understand it wrong). What should be the standard behavior? I understand the logic, but it adds another schema setup step: if you want to rewind and replay the stream with another consumer group to recreate a database or something similar, you would have to run `redis-cli XGROUP CREATE`. In this case, what would happen if you start more than one consumer? Or would the recommendation be not to run any consumer before setting all this up?
I know it has little to do with Kafka/Kinesis, but this is how I've been using streams, and I got curious about what to implement on the driver and consumer side. Thanks!