@antirez
Last active July 11, 2020 12:26
Redis Streams: consumer groups v2 specification document

Redis RCP11 specifies Redis Streams, and includes the specification of the consumer groups semantics and API. However, after further analysis I was not satisfied with the specification of this feature, so this document redefines it in different terms, so that: the consumer always knows that the order of messages is preserved; a consumer can re-fetch a part of the stream, still with the same ordering and picking exactly the same messages that the server returned to it in the first instance; and finally, any consumer can inspect dead (non acknowledged) messages in the stream with a special API, so that those messages can be processed at a later time by other consumers if needed.

Description of the problem

Redis Streams consumer groups have a similar goal to Kafka consumer groups, but a completely different implementation and API. The gist is the same, however: allow multiple consumers to cooperate in processing messages arriving in a stream, so that each consumer in a given group takes a subset of the messages. Compared to just letting the consumers shard by key, this feature puts the complexity on the server side: recovering after consumer failures, handling membership of the group so that consumers can join and leave, and providing observability of the stream processing in real time.

Requirements

  • Identify groups with names.
  • Within groups, identify consumers with the name they claim to have.
  • Return messages in order. Every successive call to XREAD-GROUP by any consumer in a group, with the special ID >, will return messages with greater IDs, in the same order as they were added to the stream.
  • Consumers can also call XREAD-GROUP with an old offset, and will be served only the messages that were assigned to them in the past, and only if not already acknowledged.
  • Serve messages to the consumers based on which consumer is available to process messages, without any pre-defined sharding, so that consumers processing messages at different rates do not create any processing hole.
  • Ability to inspect a stream and retrieve older unprocessed messages, so that consumers can cooperate to explicitly receive older, out-of-order messages that were not acknowledged as processed in time. This makes at-least-once delivery possible (subject to the consistency limits of Redis itself in case of failures).

Creating a consumers group

Normally Redis creates objects the first time they are referenced. However, in the case of consumer groups, the associated state and lifespan are harder to anticipate and there are no obvious defaults. Destroying groups after a specific amount of inactivity does not look like a sensible choice either. So consumer groups are created and destroyed explicitly using the XGROUP command. To create a new group, we have to specify the stream name, the group name, and the initial ID. The initial ID is used as the starting point for returning messages to the consumers participating in the group: only messages with IDs greater than the specified one will be returned.

XGROUP CREATE mystream groupname $

Note that we are not forced to specify '$' as the last ID: any valid ID will do. If we want to force re-processing of the stream in the future, it is possible to set a different ID later using the SETID subcommand:

XGROUP SETID mystream groupname 1507282666292-0
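As a rough illustration of the rule above, here is a C sketch of how a group's starting ID could gate delivery. It assumes a simplified <milliseconds>-<sequence> ID model; streamID, stream_id_parse, stream_id_cmp, group and group_should_deliver are hypothetical names, not Redis internals, and the special '$' ID is not handled.

```c
#include <assert.h>
#include <ctype.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical simplified stream ID: <milliseconds>-<sequence>. */
typedef struct {
    uint64_t ms;
    uint64_t seq;
} streamID;

/* Parse "1507282666292-0" style IDs. Returns 1 on success, 0 otherwise.
 * Special IDs such as '$' are out of scope for this sketch. */
static int stream_id_parse(const char *s, streamID *id) {
    char *end;
    assert(s != NULL && id != NULL);
    if (!isdigit((unsigned char)s[0])) return 0;
    id->ms = strtoull(s, &end, 10);
    if (*end != '-' || !isdigit((unsigned char)end[1])) return 0;
    id->seq = strtoull(end + 1, &end, 10);
    return *end == '\0';
}

/* IDs order by milliseconds first, then by sequence number. */
static int stream_id_cmp(streamID a, streamID b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

/* Hypothetical group state: just the ID set by XGROUP CREATE or SETID. */
typedef struct {
    streamID last_id;
} group;

/* A message is served to the group only if its ID is strictly greater. */
static int group_should_deliver(const group *g, streamID msg) {
    return stream_id_cmp(msg, g->last_id) > 0;
}
```

With the group ID set to 1507282666292-0, that exact message is not served again, while 1507282666292-1 is.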

The GROUP option

The feature is requested by the consumer by calling a variant of the XREAD command called XREAD-GROUP. The command is very similar to XREAD, however the GROUP and NAME options are mandatory in the XREAD-GROUP command, and the command is considered a write command, while XREAD itself is a read-only command.

The GROUP and NAME options are used to specify the name of the group and the name of the consumer. It is very important that each consumer specifies a unique consumer name, because this is the sole mechanism used to serve consumers the same messages they already saw in the past, when they attempt to re-fetch older messages that were already delivered. Example:

XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream >

The special ID > means that we only want messages that are new (never returned to previous clients). Each group has a current ID tracking the highest ID returned so far, so > basically means we want messages greater than the current group ID. As messages are delivered to our client, the group ID increases. For instance, XREAD-GROUP may return something like this:

Note: the format here is only illustrative.

1500000000000-0 "Message" "1"
1500000000000-1 "Message" "2"

And now the group mygroup current ID is set to 1500000000000-1. Note that, as a side effect of returning certain messages to a given client name, Redis will remember inside the stream (just conceptually: a separate radix tree data structure is used) that the above messages belong to the client called c-1234, along with the time they were delivered.

1500000000000-0 => "c-1234" 2 seconds ago
1500000000000-1 => "c-1234" 2 seconds ago

The above dictionary of messages delivered to specific clients is called the Pending Entries List, or PEL.
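The effect of > on the group ID and on the PEL could be sketched as follows, assuming the stream is modeled as an ascending array of IDs and the PEL as a fixed-size array rather than a radix tree. All names (group, pelEntry, read_new, MAX_PEL, MAX_NAME) are hypothetical, and counting the first delivery as 1 is an assumption the spec does not fix.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct { uint64_t ms, seq; } streamID;

static int id_cmp(streamID a, streamID b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

#define MAX_PEL 64
#define MAX_NAME 32

/* One PEL entry: the message ID, its owner, and delivery bookkeeping. */
typedef struct {
    streamID id;
    char owner[MAX_NAME];
    int64_t delivery_time;   /* ms timestamp of the last delivery */
    uint64_t delivery_count; /* this sketch counts the first delivery as 1 */
} pelEntry;

typedef struct {
    streamID last_id;        /* highest ID delivered so far via '>' */
    pelEntry pel[MAX_PEL];   /* stays sorted: new IDs are always larger */
    size_t pel_len;
} group;

/* Sketch of "XREAD-GROUP ... STREAMS key >" over a stream modeled as an
 * ascending array of IDs: serve up to `count` messages above the group
 * ID, create a PEL entry owned by `consumer` for each, advance the group ID. */
static size_t read_new(group *g, const streamID *stream, size_t n,
                       const char *consumer, int64_t now, size_t count,
                       streamID *out) {
    size_t delivered = 0;
    for (size_t i = 0; i < n && delivered < count; i++) {
        if (id_cmp(stream[i], g->last_id) <= 0) continue; /* already served */
        assert(g->pel_len < MAX_PEL);
        pelEntry *e = &g->pel[g->pel_len++];
        e->id = stream[i];
        strncpy(e->owner, consumer, MAX_NAME - 1);
        e->owner[MAX_NAME - 1] = '\0';
        e->delivery_time = now;
        e->delivery_count = 1;
        g->last_id = stream[i];
        out[delivered++] = stream[i];
    }
    return delivered;
}
```

Calling read_new a second time over the same two messages returns nothing, because the group ID already points at 1500000000000-1.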

When a client queries for messages with an ID smaller than or equal to the current group ID, the only messages returned are the ones in the PEL that are owned by that client, so clients always see the same messages again and again.

For example, our client c-1234 could be stopped and restarted later. Supposing it persisted the last processed ID in permanent storage or a database, it will read it back and continue. That ID could be 1500000000000-0 (since it only processed one message before being stopped). When it calls XREAD-GROUP again specifying that ID, only the message 1500000000000-1 will be returned to the client, because later messages may by now be owned by other clients.

As the client keeps calling XREAD-GROUP with the IDs of the messages it processes, eventually no more messages will be returned, and the client may resume normal operations, asking for the > special ID to get new messages.
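A minimal C sketch of the re-fetch rule, assuming a PEL kept as an ID-sorted array of (ID, owner) pairs; pelEntry and read_history are hypothetical names. With the PEL from the example above, asking for IDs after 1500000000000-0 on behalf of c-1234 yields only 1500000000000-1.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct { uint64_t ms, seq; } streamID;

static int id_cmp(streamID a, streamID b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

/* Minimal PEL entry for this sketch: ID and owning consumer name. */
typedef struct {
    streamID id;
    const char *owner;
} pelEntry;

/* Sketch of XREAD-GROUP with an explicit ID instead of '>': walk the PEL
 * (sorted by ID) and return only entries above `after` that are owned by
 * `consumer`. Messages owned by other consumers are never returned. */
static size_t read_history(const pelEntry *pel, size_t pel_len,
                           const char *consumer, streamID after,
                           size_t count, streamID *out) {
    size_t n = 0;
    assert(out != NULL || count == 0);
    for (size_t i = 0; i < pel_len && n < count; i++) {
        if (id_cmp(pel[i].id, after) <= 0) continue;
        if (strcmp(pel[i].owner, consumer) != 0) continue;
        out[n++] = pel[i].id;
    }
    return n;
}
```

Once the client passes the ID of the last entry it owns, read_history returns zero entries, which is the signal to switch back to >.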

Note that clients also have a way to ask for the non acknowledged messages pending for them, so clients are also free to rely on Redis after a restart if they do not want to persist message IDs. This mechanism is covered later in this document.

Acknowledging messages

The way messages are removed from the PEL is by acknowledging them with the XACK command. The command is designed to accept multiple message IDs in a single call, so the client may call:

XACK mystream mygroup c-1234 1500000000000-0 1500000000000-1 ...

The general form is:

XACK <key> <group-name> <consumer-name> <ID-1> <ID-2> ... <ID-N>

Now that the messages are acknowledged, nobody will be able to retrieve them again through the group, and they will no longer be in the pending list.
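The acknowledgment step could look like this in a simplified model where the PEL is an ID-sorted array; pelEntry and pel_ack are hypothetical names, and the sketch ignores the consumer-name argument of XACK because the text above does not say how it is used.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct { uint64_t ms, seq; } streamID;

static int id_cmp(streamID a, streamID b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

typedef struct {
    streamID id;
    const char *owner;
} pelEntry;

/* Sketch of XACK: remove every listed ID from the PEL and return how many
 * entries were actually removed. IDs that are not pending (never delivered,
 * or already acknowledged) are ignored, so acknowledging twice is harmless. */
static size_t pel_ack(pelEntry *pel, size_t *pel_len,
                      const streamID *ids, size_t nids) {
    size_t acked = 0;
    assert(pel_len != NULL);
    for (size_t k = 0; k < nids; k++) {
        for (size_t i = 0; i < *pel_len; i++) {
            if (id_cmp(pel[i].id, ids[k]) != 0) continue;
            /* Shift the tail left so the PEL stays sorted and contiguous. */
            memmove(&pel[i], &pel[i + 1], (*pel_len - i - 1) * sizeof *pel);
            (*pel_len)--;
            acked++;
            break;
        }
    }
    return acked;
}
```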

Inspecting the PEL

Clients or system administrators may want to see what messages are currently in the pending entries list. There are two commands to deal with this task:

XGROUP PENDING mystream mygroup => (minid, maxid, count)

The XGROUP PENDING command will return the min and max ID of the pending messages, and the total number of pending messages. It is also possible to obtain the PEL entries themselves:

XGROUP GET-PENDING mystream mygroup min max count

This command will return pending messages from min to max, no more than count messages, and for each message the client name and the number of seconds elapsed since the delivery (let's call this message idle time) are also reported.

Clients may get the list of pending messages and, if the idle time is big enough, process those messages by claiming ownership with the XGROUP CLAIM command, then fetching them via the XRANGE command, and finally acknowledging them with XACK in order to clean up the PEL.
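Both inspection forms can be sketched over the same kind of simplified, ID-sorted PEL array. The names pelSummary, pendingInfo, pel_summary and pel_range are hypothetical, and idle times are reported in milliseconds in this sketch (an assumption; the text above speaks of seconds).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct { uint64_t ms, seq; } streamID;

static int id_cmp(streamID a, streamID b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

/* PEL entry as seen by the inspection commands. */
typedef struct {
    streamID id;
    const char *owner;
    int64_t delivery_time;   /* ms timestamp of the last delivery */
} pelEntry;

typedef struct { streamID min, max; size_t count; } pelSummary;

/* Summary form (min ID, max ID, count). The PEL is sorted by ID, so min and
 * max are its first and last entries. Returns 0 when nothing is pending. */
static int pel_summary(const pelEntry *pel, size_t len, pelSummary *s) {
    assert(s != NULL);
    if (len == 0) return 0;
    s->min = pel[0].id;
    s->max = pel[len - 1].id;
    s->count = len;
    return 1;
}

typedef struct { streamID id; const char *owner; int64_t idle_ms; } pendingInfo;

/* GET-PENDING min max count: entries with min <= ID <= max, at most `count`
 * of them, each with its owner and idle time (now - last delivery). */
static size_t pel_range(const pelEntry *pel, size_t len, streamID min,
                        streamID max, size_t count, int64_t now,
                        pendingInfo *out) {
    size_t n = 0;
    for (size_t i = 0; i < len && n < count; i++) {
        if (id_cmp(pel[i].id, min) < 0) continue;
        if (id_cmp(pel[i].id, max) > 0) break; /* sorted: nothing later fits */
        out[n].id = pel[i].id;
        out[n].owner = pel[i].owner;
        out[n].idle_ms = now - pel[i].delivery_time;
        n++;
    }
    return n;
}
```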

Claiming messages

In order for a client to claim a message that belongs to some other client, and process it safely, the following command is used:

XGROUP CLAIM <key> <group-name> <consumer-name> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

The command works as follows: it inspects all the messages in the PEL having the specified IDs, and for each message whose idle time is equal to or greater than the specified minimum, the following happens:

  1. The message ID is returned in the reply array.
  2. The message idle time is reset to 0 (last delivery is set to now).
  3. The message deliveries counter is incremented.
  4. The message is assigned to the consumer claiming it.

Because of these semantics, the first client arriving is served, resets the idle time, and can try the delivery. If it fails, another client can eventually claim the message, and so forth.
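A C sketch of the claim rules listed above, assuming a fixed-size PEL array with millisecond timestamps; pelEntry, pel_claim and MAX_NAME are hypothetical names. The numbered comments map to the four steps of the list.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct { uint64_t ms, seq; } streamID;

static int id_cmp(streamID a, streamID b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

#define MAX_NAME 32

typedef struct {
    streamID id;
    char owner[MAX_NAME];
    int64_t delivery_time;   /* ms timestamp of the last delivery */
    uint64_t delivery_count;
} pelEntry;

/* Sketch of XGROUP CLAIM key group consumer min-idle-time ID...: every
 * listed ID that is pending and idle for at least `min_idle` ms is returned
 * in `out`, gets its idle time reset, has its delivery counter incremented,
 * and is reassigned to `consumer`. Other IDs are skipped. */
static size_t pel_claim(pelEntry *pel, size_t len, const char *consumer,
                        int64_t min_idle, int64_t now,
                        const streamID *ids, size_t nids, streamID *out) {
    size_t claimed = 0;
    assert(consumer != NULL);
    for (size_t k = 0; k < nids; k++) {
        for (size_t i = 0; i < len; i++) {
            if (id_cmp(pel[i].id, ids[k]) != 0) continue;
            if (now - pel[i].delivery_time >= min_idle) {
                out[claimed++] = pel[i].id;   /* 1. reported back */
                pel[i].delivery_time = now;   /* 2. idle time back to 0 */
                pel[i].delivery_count++;      /* 3. one more delivery */
                strncpy(pel[i].owner, consumer, MAX_NAME - 1); /* 4. new owner */
                pel[i].owner[MAX_NAME - 1] = '\0';
            }
            break; /* IDs are unique in the PEL */
        }
    }
    return claimed;
}
```

Because a successful claim resets the delivery time, a second claimer calling pel_claim right afterwards with the same min_idle gets nothing back.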

XREAD-GROUP and the PEL

Now we can zoom into the semantics of XREAD-GROUP to see what it does when delivering messages from the point of view of the pending entries list (PEL):

  1. When messages are delivered as a result of using the > ID, a PEL entry is created for each message: its last delivery time is set to now, and it is assigned to the consumer that received it.
  2. When messages are re-delivered to clients because a specific ID was specified, the only changes to the PEL are that the entry's last delivery time is updated and its deliveries count is incremented.

So if a client refetches messages after a restart using something like:

XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream 1500000000000-2

The messages' last delivery time will be updated, preventing other clients from claiming them with XGROUP CLAIM. Conversely, if XGROUP CLAIM was able to claim a message, it was assigned to another client, so it is no longer visible to c-1234 and will not be delivered again to the original owner.
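Assuming the same kind of simplified PEL array, the re-delivery path and its interaction with claiming could be sketched like this; read_history_touch and claimable are hypothetical names, and times are in milliseconds.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct { uint64_t ms, seq; } streamID;

static int id_cmp(streamID a, streamID b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

typedef struct {
    streamID id;
    const char *owner;
    int64_t delivery_time;   /* ms timestamp of the last delivery */
    uint64_t delivery_count;
} pelEntry;

/* Sketch of a re-delivery via an explicit ID: entries owned by `consumer`
 * with an ID above `after` are returned again; each one's last delivery
 * time becomes `now` and its counter is incremented. No PEL entry is
 * created and no ownership changes. */
static size_t read_history_touch(pelEntry *pel, size_t len,
                                 const char *consumer, streamID after,
                                 int64_t now, streamID *out) {
    size_t n = 0;
    assert(out != NULL);
    for (size_t i = 0; i < len; i++) {
        if (id_cmp(pel[i].id, after) <= 0) continue;
        if (strcmp(pel[i].owner, consumer) != 0) continue;
        pel[i].delivery_time = now;
        pel[i].delivery_count++;
        out[n++] = pel[i].id;
    }
    return n;
}

/* The idle-time test a CLAIM applies: a freshly re-delivered entry fails it. */
static int claimable(const pelEntry *e, int64_t min_idle, int64_t now) {
    return now - e->delivery_time >= min_idle;
}
```

In this model, once c-1234 re-reads its own entry, another client's CLAIM with a 60000 ms minimum idle time is refused, while entries owned by others keep aging and remain claimable.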

Getting the pending messages after a consumer restart

Clients are not forced to keep the last processed ID in a durable way: if the consistency characteristics of Redis are enough for the problem at hand, instead of storing the new ID (for example writing it to disk) every time a message is successfully acknowledged, clients can use the following command to get the pending messages that were not successfully acknowledged:

XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream 0

By specifying the ID 0 we obtain the pending messages not yet acknowledged by this consumer. Note, however, that a consumer can avoid some duplicate processing by tracking the offset locally. For instance, if Redis is failing when the consumer tries to acknowledge a message it already processed, a consumer with a local offset can still advance it, while a consumer relying only on the PEL needs to keep retrying the XACK; if it is restarted in the meantime that state is lost, so the message will be reprocessed after the restart.

The NOACK option

Certain applications do not need any reliability, and messages in the stream can be trivially lost by clients from time to time without any issue. In this case the XREAD-GROUP command can be used with the NOACK option so that no entry is created inside the PEL for returned messages. This is equivalent to acknowledging the messages in the same moment they are returned.

Deleting a consumers group

Deleting a consumer group is as simple as:

XGROUP DEL mystream mygroup

All the clients blocked for this group are unblocked with an error when the group is destroyed: -ERR group deleted.

Consumer groups and read-only slaves

While XREAD and XRANGE are read-only commands that can be scaled with slaves, XREAD-GROUP alters the state of the stream, because groups are part of the stream data structure itself, and are persisted and replicated.

For this reason XREAD-GROUP can only be used in master instances.

Example consumer implementation

The following pseudo code shows what a consumer should normally do in order to process its share of the messages, recover after a failure, and from time to time process pending messages left behind by failing consumers:

$myname = "consumer-9999" # Obtain a unique name in a smarter way...
$timeout = 60000          # After 60 seconds a message is considered lost

XGROUP CREATE ... # This call may fail if the group already exists.

# Process pending messages if any... This is useful to recover after
# a restart immediately processing pending messages for this consumer.
$to_process = XREAD-GROUP GROUP mygroup NAME $myname STREAMS mystream 0
FOREACH $to_process AS $msg
    PROCESS($msg.body)
    XACK mystream mygroup $myname $msg.id
END

# The main loop for normal operations
WHILE TRUE
    # Fetch messages that nobody has yet fetched from the stream:
    $to_process = XREAD-GROUP GROUP mygroup NAME $myname STREAMS mystream >
    FOREACH $to_process AS $msg
        PROCESS($msg.body)
        XACK mystream mygroup $myname $msg.id
    END

    # This block in the main loop has the sole goal of handling "lost"
    # messages, that were assigned to some consumer that never
    # acknowledged them.
    FROM-TIME-TO-TIME
        $old_messages = XGROUP GET-PENDING mystream mygroup - + 10
        FOREACH $old_messages AS $old
            IF $old.age > $timeout
                $res = XGROUP CLAIM mystream mygroup $myname $timeout $old.id
                IF LEN($res) > 0
                    # GET-PENDING returns no bodies: fetch the claimed message.
                    $claimed = XRANGE mystream $old.id $old.id
                    PROCESS($claimed.body)
                    XACK mystream mygroup $myname $old.id
                END
            END
        END
    END
END

Handling of consumers

Unlike groups, consumers inside a group are auto-created the first time a given consumer receives a message: when this happens we need to create a new pending list entry referencing the consumer.

However, clients may end up using different algorithms to create consumer names, so names of consumers that no longer exist may remain referenced. Consumers can be reclaimed from memory once the list of pending messages associated with them drops to zero.

The implementation should periodically check the list of consumers in a given group (for instance from time to time when the group is accessed, and only if the number of consumers is already large, or always when the list of consumers is accessed via XGROUP CONSUMERS), and if needed reclaim consumers that have both zero associated messages and a last seen time older than a few minutes.
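The reclamation rule could be sketched as below, assuming each consumer record carries a pending count and a last-seen timestamp in milliseconds; consumerInfo and reclaim_consumers are hypothetical names, and what counts as "a few minutes" is left to the caller.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-consumer bookkeeping used by this sketch. */
typedef struct {
    const char *name;
    size_t pending;   /* PEL entries currently assigned to this consumer */
    int64_t seen;     /* ms timestamp of the consumer's last activity */
} consumerInfo;

/* Sketch of the periodic consumer GC: a consumer is reclaimed only if it
 * owns no pending messages AND has been inactive for longer than
 * `max_inactive` ms. Survivors are compacted in place, preserving order;
 * the new number of consumers is returned. */
static size_t reclaim_consumers(consumerInfo *c, size_t len, int64_t now,
                                int64_t max_inactive) {
    size_t kept = 0;
    assert(c != NULL || len == 0);
    for (size_t i = 0; i < len; i++) {
        int inactive = now - c[i].seen > max_inactive;
        if (c[i].pending == 0 && inactive) continue; /* reclaim this one */
        c[kept++] = c[i];
    }
    return kept;
}
```

Requiring both conditions keeps a busy but silent consumer around, since dropping it would orphan its pending messages.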

Moreover the following commands should be provided:

XGROUP DEL-CONSUMER stream group consumer-name

XGROUP CONSUMERS => Provides a list of all the consumers and associated
                    information such as # of pending messages, idle time
                    and so forth.

When a consumer is deleted, all the pending messages associated with it will be deleted as well, so administrators should make sure to reassign those messages to a different consumer before deleting a given consumer.

Implementation strategies

Groups are just referenced inside the stream data type, as members of a hash table where the group name is the key. Group names will be case sensitive. The group representation itself is a cgroup structure composed of the current group last-delivered-ID and a radix tree of known consumers, again indexed by consumer name, having a cgroupConsumer as associated value.

Non acknowledged messages are described in two ways: as a global list of non acknowledged messages, and as a consumer-specific list of non acknowledged messages.

The group structure represents its pending message IDs in a radix tree, whose associated data is a notAckedMessage structure referencing the cgroupConsumer data structure. The notAckedMessage also contains the time of the last delivery attempt and the number of attempts, which are updated by XREAD-GROUP if the option to update the delivery time is given. So we have a radix tree that represents all the pending messages for the group. However, we also need to duplicate this information for each consumer, so the cgroupConsumer structure also has an associated radix tree that represents the pending messages for that client, referencing the notAckedMessage structure for each message. This way we can immediately reply to the client with the message, not acknowledged by it, having the smallest ID.

Main structures involved:

typedef struct cgroup {
    streamID lastid;      /* Last delivered ID (not acknowledged, just delivered). */
    radixtree *pel;       /* Pending entries list, by message ID, aux data is
                             a notAckedMessage structure. */
    radixtree *consumers; /* By name, aux data is cgroupConsumer. */
} cgroup;

typedef struct cgroupConsumer {
    mstime_t seen;  /* Last time the consumer was seen. */
    sds name;       /* Consumer name. */
    radixtree *pel; /* Pending list for this client, by message ID, aux data is
                       a reference to notAckedMessage in cgroup->pel. */
} cgroupConsumer;

typedef struct notAckedMessage {
    mstime_t delivery_time;   /* Last time the message was delivered to some
                                 client. */
    uint64_t delivery_count;  /* Number of times this message was re-delivered. */
    cgroupConsumer *consumer; /* The consumer this message was delivered to. */
} notAckedMessage;
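To show why the PEL is kept twice, here is a compilable sketch of the same layout in which fixed-size arrays stand in for the radix trees (a deliberate simplification). Field names follow the structures above where possible; pel_add, consumer_first_pending and MAX_PENDING are hypothetical. The group PEL owns each notAckedMessage, while each consumer's index only stores pointers to the same records, so a consumer's smallest pending ID is simply the head of its own index.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct { uint64_t ms, seq; } streamID;

static int id_cmp(streamID a, streamID b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

#define MAX_PENDING 16

typedef struct cgroupConsumer cgroupConsumer;

typedef struct notAckedMessage {
    streamID id;
    int64_t delivery_time;       /* ms */
    uint64_t delivery_count;
    cgroupConsumer *consumer;    /* owner, as in the structures above */
} notAckedMessage;

struct cgroupConsumer {
    const char *name;
    notAckedMessage *pel[MAX_PENDING]; /* sorted by ID, points into group PEL */
    size_t pel_len;
};

typedef struct {
    streamID lastid;
    notAckedMessage pel[MAX_PENDING];  /* group-wide PEL, sorted by ID */
    size_t pel_len;
} cgroup;

/* Record a '>' delivery: one notAckedMessage in the group PEL plus a
 * pointer to the same record in the consumer's own index. New IDs are
 * always larger, so appending keeps both indexes sorted. */
static notAckedMessage *pel_add(cgroup *g, cgroupConsumer *c, streamID id,
                                int64_t now) {
    assert(g->pel_len < MAX_PENDING && c->pel_len < MAX_PENDING);
    assert(g->pel_len == 0 || id_cmp(g->pel[g->pel_len - 1].id, id) < 0);
    notAckedMessage *m = &g->pel[g->pel_len++];
    m->id = id;
    m->delivery_time = now;
    m->delivery_count = 1;
    m->consumer = c;
    c->pel[c->pel_len++] = m;
    g->lastid = id;
    return m;
}

/* Smallest pending ID for this consumer: the head of its own index, with
 * no scan over other consumers' entries. NULL if nothing is pending. */
static const notAckedMessage *consumer_first_pending(const cgroupConsumer *c) {
    return c->pel_len ? c->pel[0] : NULL;
}
```

This sketch never removes entries, so the pointers stay valid; a real implementation would have to unlink the record from both indexes on XACK.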

Replication of groups changes to slaves

The following set of changes must be replicated to slaves and propagated to the AOF:

  1. Consumer group creation. This is trivial to accomplish just by propagating the command verbatim in case of success.
  2. Change of group ID. Again we can just propagate the XGROUP SETID.
  3. Creation or updating of an entry in the PEL when a message is delivered to some client. This happens in multiple cases: when XREAD-GROUP returns messages to the client, and when a message is claimed by some other client via XGROUP CLAIM. This event will be propagated as a special XGROUP PEL <stream> <group> <message-id> <current-owner-name> <idle-time> <deliveries-count> command. Such a command will create the message in the PEL if it does not already exist, or will just update the fields. As a side effect this command also creates the cgroupConsumer representations on the slave side, so that the slave can contain the full state.
  4. Deletion of an entry in the PEL: this only happens when XACK is received, so the command can be propagated verbatim.
  5. Deletion of a consumer. This is replicated as XGROUP DEL-CONSUMER in both cases: when the consumer is removed by the natural GC of unused consumers, or when the command is actually executed by the user.
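For item 3, a sketch of serializing the proposed XGROUP PEL command as a RESP array, the wire format Redis uses for the replication stream and for commands stored in the AOF. encode_xgroup_pel and resp_bulk are hypothetical helpers, and the idle time is assumed to be in milliseconds.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Append one RESP bulk string ("$<len>\r\n<data>\r\n") at buf + *pos.
 * Returns 0 if it does not fit in the remaining capacity. */
static int resp_bulk(char *buf, size_t cap, size_t *pos, const char *s) {
    int w = snprintf(buf + *pos, cap - *pos, "$%zu\r\n%s\r\n", strlen(s), s);
    if (w < 0 || (size_t)w >= cap - *pos) return 0;
    *pos += (size_t)w;
    return 1;
}

/* Sketch: encode the XGROUP PEL command proposed by this spec as a RESP
 * array. Returns the encoded length, or 0 if `cap` is too small. */
static size_t encode_xgroup_pel(char *buf, size_t cap, const char *stream,
                                const char *group, uint64_t id_ms,
                                uint64_t id_seq, const char *owner,
                                int64_t idle_ms, uint64_t deliveries) {
    char id[48], idle[24], count[24];
    assert(buf != NULL);
    snprintf(id, sizeof id, "%" PRIu64 "-%" PRIu64, id_ms, id_seq);
    snprintf(idle, sizeof idle, "%" PRId64, idle_ms);
    snprintf(count, sizeof count, "%" PRIu64, deliveries);
    const char *argv[] = {"XGROUP", "PEL", stream, group, id, owner, idle, count};
    size_t argc = sizeof argv / sizeof argv[0];
    int w = snprintf(buf, cap, "*%zu\r\n", argc);
    if (w < 0 || (size_t)w >= cap) return 0;
    size_t pos = (size_t)w;
    for (size_t i = 0; i < argc; i++)
        if (!resp_bulk(buf, cap, &pos, argv[i])) return 0;
    return pos;
}
```

Because the same command both creates and updates an entry, a slave can apply it idempotently whether or not it already knows the message.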

RDB persistence

The current RDB persistence format will be modified in order to persist all the stream metadata on disk. This will include all the groups of a given stream, and for each group the full list of pending messages, their client owners, and so forth.

Credits

This specification benefited from inputs provided by Roey Berman and by an internal discussion about streams in the Tel Aviv Redis offices, where multiple people contributed ideas. Users on Twitter also suggested ideas.


krushita commented Oct 2, 2018

@antirez I am trying to evaluate if redis streams fits our requirement for ordered processing of messages and have a question about it.
Let's say we have multiple streams S1, S2, S3..Sn and a consumer group containing 4 consumers C1..C4.
I want to create a common consumer group to process messages from multiple streams. However, the restriction is that only 1 message from a single stream can be processed at a given time.
So if C1 is processing a message from S1, no other messages from S1 can be processed while the 1st request is in progress. Messages from S2, S3 etc can still be processed by other consumers in the group. Does xread_group support these semantics? In other words, will xread_group only deliver messages from streams which do not have any pending messages?


krushita commented Oct 3, 2018

Reading https://groups.google.com/forum/#!topic/redis-db/td-aPJKycH0 I am still confused on what to conclude.

  • If you use N streams with N consumers, so only a given consumer hits a subset of the N streams, you can scale the above model of 1 stream -> 1 consumer.

If in my use case, the producer is creating and posting to N different streams, and the consumers (say less than N) within the consumer group are guaranteed to receive a message from non-pending stream only, it looks like ordered processing per stream is possible. Can you clarify?
