
@antirez antirez/cgroupsv2.md Secret
Last active Oct 5, 2018

Redis Streams: consumer groups v2 specification document

Redis RCP11 specifies Redis Streams, and includes the specification of the consumer groups semantics and API. However, after further analysis I was not satisfied by the specification of this feature, so this document redefines it in different terms. The goals are that the consumer always knows the order of messages is preserved; that a consumer can re-fetch a part of the stream with the same ordering, picking exactly the same messages that the server returned to it in the first instance; and finally, that any consumer can inspect dead (non acknowledged) messages in the stream, with a special API, so that those messages can be processed at a later time by other consumers if needed.

Description of the problem

Redis Streams consumer groups have a similar goal to Kafka consumer groups, but a completely different implementation and API. The gist is the same: allow multiple consumers to cooperate in processing messages arriving in a stream, so that each consumer in a given group takes a subset of the messages. Compared to just letting the consumers shard by key, this feature puts the complexity on the server side: recovering after consumer failures, handling membership of the group so that consumers can join and leave, and providing observability of the stream processing in real time.

Requirements

  • Identify groups with names.
  • Within groups, identify consumers with the name they claim to have.
  • Return messages in order. Every successive call to XREAD-GROUP by any consumer in a group, with the special ID >, will return messages with greater IDs and with the same order as they were added into the stream.
  • Consumers can also call XREAD-GROUP with an old offset, and will be served with just the messages that were assigned to them in the past, and only if not already acknowledged.
  • Serve messages to the consumers based on what consumer is available to process messages, without any pre-defined sharding, so that consumers able to process messages at different rates do not create any processing hole.
  • Ability to inspect a stream and retrieve older non processed messages, so that consumers can cooperate in order to explicitly receive older, out-of-order messages that were not acknowledged as processed in time. This allows obtaining at-least-once delivery (but subject to the consistency limits of Redis itself in case of failures).

Creating a consumers group

Normally Redis creates objects the first time they are seen. However in the case of consumer groups, the associated state and the lifespan are harder to anticipate and there are no obvious defaults; nor does destroying groups after a specific amount of time of inactivity look like a sensible choice. So consumer groups are created and destroyed explicitly using the XGROUP command. To create a new group, we have to specify its name and the initial ID. The initial ID is used as a starting point to return messages to the consumers participating in the group: only messages with IDs greater than the specified one will be returned.

XGROUP CREATE mystream groupname $

Note that we are not forced to specify '$' as the last ID. Any valid ID will do. If we want to force re-processing of the stream in the future, it is possible to set a different ID later, using the SETID subcommand.

XGROUP SETID mystream 1507282666292-0

The GROUP option

The feature is requested by the consumer by calling a variant of the XREAD command called XREAD-GROUP. The command is very similar to XREAD, however the GROUP and NAME options are mandatory in the XREAD-GROUP command, and the command is considered a write command, while XREAD itself is a read-only command.

The GROUP and NAME options are used to specify the name of the group and the name of the consumer. It is very important that each consumer specifies a unique consumer name, because this is the sole mechanism used in order to provide consumers with the same messages they already saw in the past, in case consumers attempt to re-fetch older messages that were already delivered. Example:

XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream >

The special ID > means that we only want messages that are new (never returned to previous clients). Each group has a current ID tracking the highest ID returned so far, so > basically means we want messages greater than the current group ID. As a result of messages delivered to our client, the group ID will be incremented. For instance, XREAD-GROUP may return something like this:

Note: the format here is only used for the sake of explaining

1500000000000-0 "Message" "1"
1500000000000-1 "Message" "2"

And now the group mygroup current ID is set to 1500000000000-1. Note that as a side effect of returning certain messages to a given client name, Redis will memorize inside the stream (just conceptually, a different radix tree data structure is used) that the above messages belong to the client called c-1234, and also the time they were delivered.

1500000000000-0 => "c-1234" 2 seconds ago
1500000000000-1 => "c-1234" 2 seconds ago

The above dictionary of messages delivered to specific clients is called the Pending Entries List, or PEL.
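The bookkeeping just described can be sketched with a toy in-memory model. This is only an illustration of the semantics, not the Redis implementation: the class and method names are hypothetical, and a plain dictionary stands in for the radix tree.

```python
import time

# Toy model of a consumer group's Pending Entries List (PEL).
# All names here are hypothetical; Redis uses radix trees internally.
class Group:
    def __init__(self):
        self.last_id = 0   # highest ID delivered so far (the "group ID")
        self.pel = {}      # message ID -> (owner consumer, delivery time)

    def deliver_new(self, stream, consumer):
        """Serve the `>` special ID: return only messages with IDs greater
        than the group's current ID, recording each one in the PEL."""
        out = [(mid, body) for mid, body in stream if mid > self.last_id]
        for mid, _ in out:
            self.pel[mid] = (consumer, time.time())
            self.last_id = mid
        return out

stream = [(1, "Message 1"), (2, "Message 2")]
g = Group()
got = g.deliver_new(stream, "c-1234")
# Both messages are now pending and owned by c-1234, in stream order.
```

A second `deliver_new` call with the same stream returns nothing, since the group ID has already advanced past both messages.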

When a query for messages with an ID smaller or equal to the current group ID is performed by a client, the only messages returned are the ones in the PEL that also match the name of the client, so basically clients always see the same messages again and again.

For example our client c-1234 could be stopped and restarted later. Supposing it persisted the last processed ID in a permanent storage or database, it will read it back and continue. Such ID could be 1500000000000-0 (since it only processed one message before being stopped). When it calls XREAD-GROUP again specifying such an ID, only the message 1500000000000-1 will be returned to the client, because successive messages may only be owned by other clients at this point.

Once the client calls XREAD-GROUP with successive message IDs, eventually no more messages will be returned, and the client may resume normal operations asking for the > special ID to get new messages.

Note that clients have also a way to ask for not acknowledged messages that are pending for them, so clients are also free to rely on Redis after a restart if they do not want to persist message IDs. This mechanism is covered later in this document.

Acknowledging messages

The way messages are removed from the PEL is by acknowledging them with the XACK command. The command is designed to get multiple message IDs in a single call, so the client may call:

XACK mystream mygroup c-1234 1500000000000-0 1500000000000-1 ...

The general form is:

XACK <key> <group-name> <consumer-name> <ID-1> <ID-2> ... <ID-N>

Now that the messages are acknowledged, nobody will be able to retrieve them again from the group. Similarly, the messages will no longer be in the pending list.
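The effect of acknowledging can be sketched as follows (a hypothetical Python model where the PEL is a plain dictionary from message ID to owner name, not the real implementation):

```python
# Sketch of XACK semantics: acknowledged IDs are simply removed from the
# group's PEL, so they can no longer be re-fetched from the group.
def xack(pel, consumer, *ids):
    """Remove each ID from the PEL if it is pending for this consumer;
    return how many entries were actually acknowledged."""
    acked = 0
    for mid in ids:
        if pel.get(mid) == consumer:
            del pel[mid]
            acked += 1
    return acked

pel = {1: "c-1234", 2: "c-1234", 3: "c-5678"}
n = xack(pel, "c-1234", 1, 2)   # acknowledges two messages
```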

Inspecting the PEL

Clients or system administrators may want to see what messages are currently in the pending entries list. There are two commands to deal with this task:

XGROUP PENDING mystream mygroup => (minid, maxid, count)

The XGROUP command with the PENDING subcommand will return the min and max ID of the pending messages, and the total amount of pending messages. It is also possible to obtain the PEL entries:

XGROUP GET-PENDING mystream mygroup min max count

This command will return pending messages from min to max, no more than count messages, and for each message the client name and the number of seconds elapsed since the delivery (let's call this message idle time) are also reported.

Clients may get the list of pending messages, and if the age is big enough, they may want to process the messages by claiming ownership with the XGROUP CLAIM command, then fetching them via the XRANGE command, and later acknowledging them using XACK in order to clean up the PEL.
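The two inspection operations can be sketched like this (hypothetical helper names; a dictionary from message ID to (owner, delivery time) stands in for the PEL radix tree):

```python
# Sketch of PEL inspection: a (minid, maxid, count) summary, and a
# bounded range query that also reports owner and idle time.
def pending_summary(pel):
    ids = sorted(pel)
    return (ids[0], ids[-1], len(ids)) if ids else (None, None, 0)

def get_pending(pel, lo, hi, count, now):
    out = []
    for mid in sorted(pel):            # PEL entries returned in ID order
        if lo <= mid <= hi and len(out) < count:
            consumer, delivered_at = pel[mid]
            out.append((mid, consumer, now - delivered_at))  # idle seconds
    return out

pel = {1: ("c-1", 100.0), 2: ("c-2", 110.0), 5: ("c-1", 120.0)}
summary = pending_summary(pel)
oldest = get_pending(pel, 1, 5, 2, now=130.0)
```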

Claiming messages

In order for a client to claim a message that belonged to some other client, and process it safely, the following command is used:

XGROUP CLAIM <key> <group-name> <consumer-name> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

The command works as follows: it inspects all the messages in the PEL having the specified IDs, and for each message that has an idle time equal to or greater than the specified one, the following happens:

  1. The message ID is returned in the reply array.
  2. The message idle time is reset to 0 (last delivery is set to now).
  3. The message deliveries counter is incremented.
  4. The message is assigned to the consumer claiming it.

Because of these semantics the first client arriving is served and resets the idle time, and can try the delivery. If it fails, another one can eventually claim the message, and so forth.
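The four claim steps can be sketched in a few lines (a hypothetical model where each PEL entry is a mutable record with owner, last delivery time and a deliveries counter):

```python
# Sketch of XGROUP CLAIM: only sufficiently idle entries are claimed,
# and claiming resets the idle time so concurrent claimers lose the race.
def claim(pel, consumer, min_idle, ids, now):
    claimed = []
    for mid in ids:
        entry = pel.get(mid)
        if entry is None:
            continue
        if now - entry["delivery_time"] >= min_idle:
            entry["delivery_time"] = now      # 2. idle time reset to 0
            entry["delivery_count"] += 1      # 3. deliveries counter bumped
            entry["consumer"] = consumer      # 4. ownership moves to claimer
            claimed.append(mid)               # 1. ID goes in the reply array
    return claimed

pel = {7: {"consumer": "c-1", "delivery_time": 0.0, "delivery_count": 1}}
got = claim(pel, "c-2", min_idle=60.0, ids=[7], now=120.0)
# An immediate second claim fails: the idle time was just reset.
again = claim(pel, "c-3", min_idle=60.0, ids=[7], now=120.0)
```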

XREAD-GROUP and the PEL

Now we can zoom into the semantics of XREAD-GROUP to see what it does when delivering messages from the point of view of the pending entries list (PEL):

  1. When messages are delivered as a result of using the > ID, the PEL entry is created for such a message: the message last delivery time is set to now, and the message is assigned to the consumer that received it.
  2. When messages are re-delivered to clients because a specific ID was specified, the only change to the PEL is just that the PEL entry last delivery time is updated and that the deliveries count is incremented.

So if a client refetches messages after a restart using something like:

XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream 1500000000000-2

The messages' last delivery time will be updated, preventing an XGROUP CLAIM called by some other client from getting the messages. Similarly, if instead XGROUP CLAIM was able to claim a message, it was assigned to another client, so it is no longer visible to c-1234 and will not get delivered again to the original owner.

Getting the pending messages after a consumer restart

Clients are not forced to keep the last processed ID in a durable way: if the consistency characteristics of Redis are enough for the problem at hand, instead of storing (for example writing to disk) the new ID every time a message is successfully acknowledged, clients could use the following command in order to get the pending messages that were not successfully acknowledged:

XREAD-GROUP GROUP mygroup NAME c-1234 STREAMS mystream 0

By specifying the ID 0 we will be able to obtain the pending messages not yet acknowledged. However note that the consumer can avoid certain duplicate processing of messages by handling the offset locally. For instance if Redis is failing, the consumer can still update its local offset, while otherwise it needs to keep trying, and if restarted in the middle the state is lost, so after a restart the message will be reprocessed.

The NOACK option

Certain applications do not need any reliability, and messages in the stream can be trivially lost by clients from time to time without any issue. In this case the XREAD-GROUP command can be used with the NOACK option so that no entry is created inside the PEL for returned messages. This is equivalent to acknowledging the messages in the same moment they are returned.
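In terms of the toy model used above, NOACK simply means the delivery skips the PEL write (hypothetical helper, not the real implementation):

```python
# Sketch of the NOACK option: with NOACK the message is delivered but
# never tracked, as if it were acknowledged at the moment of delivery.
def deliver(pel, consumer, msg_id, now, noack=False):
    if not noack:
        pel[msg_id] = (consumer, now)   # tracked: must be XACK-ed later
    return msg_id

pel = {}
deliver(pel, "c-1", 1, 100.0)               # reliable delivery
deliver(pel, "c-1", 2, 101.0, noack=True)   # fire-and-forget delivery
```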

Deleting a consumers group

Deleting a consumer group is as simple as:

XGROUP DEL mystream mygroup

All the clients blocked for this group are unblocked with an error when the group is destroyed: -ERR group deleted.

Client group and read-only slaves

While XREAD and XRANGE are read only commands that can be scaled with slaves, XREAD-GROUP alters the state of the stream, because groups are part of the stream data structure itself, being persisted and replicated.

For this reason XREAD-GROUP can only be used in master instances.

Example consumer implementation

The following pseudo code shows what a consumer should normally do in order to process part of the messages, recover after a failure, and process pending messages about failing consumers from time to time:

$myname = "consumer-9999" # Obtain an unique name in a smarter way...
$timeout = 60000          # After 60 seconds a message is considered lost

XGROUP CREATE ... # This call may fail if the group already exists.

# Process pending messages if any... This is useful to recover after
# a restart immediately processing pending messages for this consumer.
$to_process = XREAD-GROUP GROUP mygroup NAME $myname STREAMS mystream 0
FOREACH $to_process AS $msg
    PROCESS($msg.body)
    XACK mystream mygroup $msg.id
END

# The main loop for normal operations
WHILE TRUE
    # Fetch messages that nobody has yet fetched from the stream:
    $to_process = XREAD-GROUP GROUP mygroup NAME $myname STREAMS mystream >
    FOREACH $to_process AS $msg
        PROCESS($msg.body)
        XACK mystream mygroup $msg.id
    END

    # This is a very large block in the main loop with the sole goal of
    # handling "lost" messages, that were assigned to some consumer that
    # never acknowledged them.
    FROM-TIME-TO-TIME
        $old_messages = XGROUP GET-PENDING mystream mygroup - + 10
        FOREACH $old_messages AS $old
            IF $old.age > $timeout
                $res = XGROUP CLAIM mystream mygroup $myname $timeout $old.id
                IF LEN($res) > 0
                    PROCESS($old.body)
                    XACK mystream mygroup $old.id
                END
            END
        END
    END
END

Handling of consumers

Unlike groups, consumers inside a group are auto-created the first time a given consumer receives a message: when this happens we need to create a new pending list entry referencing the consumer.

However clients may end up using different algorithms to create consumer names, and consumer names that no longer exist may still remain referenced. Consumers can be reclaimed from memory once the list of pending messages associated with them drops to zero.

The implementation should periodically check the list of consumers in a given group (for instance from time to time when the group is accessed, and only if the number of consumers is already large, or always when the list of consumers is accessed via XGROUP CONSUMERS), and if needed reclaim consumers that have both zero associated pending messages, and a last seen time older than a few minutes.

Moreover the following commands should be provided:

XGROUP DEL-CONSUMER stream group consumer-name

XGROUP CONSUMERS => Provides a list of all the consumers and associated
                    informations such as # of pending messages, idle time
                    and so forth.

When a consumer is deleted, all the pending messages associated with it will be deleted as well, so administrators should make sure to reassign the list of messages to a different consumer before deleting a given consumer.

Implementation strategies

Groups are just referenced inside the stream data type, as members of a hash table where the group name is the key. Group names will be case sensitive. The group representation itself is a cgroup structure composed of the current group last-delivered-ID, and a radix tree of known consumers, again indexed by consumer name, having as associated value a cgroupConsumer.

Non acknowledged messages are described in two ways: as a global list of not acknowledged messages, and as a consumer-specific list of non acknowledged messages.

The group structure will have its list of message IDs represented in a radix tree, having as associated data the notAckedMessage structure that references the cgroupConsumer data structure. The notAckedMessage also contains the message's last delivery attempt time, and the number of attempts, which is updated by XREAD-GROUP if the option to update the delivery time is given. So we have a radix tree that represents all the pending messages for the group. However we also need to duplicate this information for each consumer, so the cgroupConsumer structure also has an associated radix tree that represents the pending messages for this client, referencing the notAckedMessage structure for that message. This way we can immediately reply to a client with its not-yet-acknowledged message having the smallest ID.

Main structures involved:

struct cgroup {
    streamID lastid; /* Last delivered ID (not acknowledged, just delivered). */
    radixtree *pel;  /* Pending messages list, by message ID, aux data is
                        a notAckedMessage structure. */
    radixtree *consumers; /* By name, aux data is cgroupConsumer. */
};

struct cgroupConsumer {
    mstime_t seen;  /* Last time the consumer was seen. */
    sds name;       /* Consumer name. */
    radixtree *pel; /* Pending list for this client, by message ID, aux data is
                       a reference to notAckedMessage in cgroup->pel. */
};

struct notAckedMessage {
    mstime_t delivery_time; /* Last time the message was delivered to some
                               client. */
    uint64_t delivery_count; /* Number of times this message was re-delivered.*/
    cgroupConsumer *consumer; /* The consumer this message was delivered to. */
};
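The key property of this layout is that the group-level PEL and each consumer's PEL reference the same pending entry, so an update through one view is visible through the other. A minimal Python sketch of that sharing (all names hypothetical, dictionaries standing in for the radix trees):

```python
# Sketch of the dual indexing: group PEL and consumer PEL hold shared
# references to one pending-entry object, never copies of it.
class NotAcked:
    def __init__(self, consumer, delivery_time):
        self.consumer = consumer
        self.delivery_time = delivery_time
        self.delivery_count = 1

group_pel = {}                       # message ID -> NotAcked
consumer_pel = {"c-1234": {}}        # consumer name -> (message ID -> NotAcked)

entry = NotAcked("c-1234", 100.0)
group_pel[42] = entry
consumer_pel["c-1234"][42] = entry   # shared reference, not a copy

entry.delivery_count += 1            # a redelivery: both views see the update
```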

Replication of groups changes to slaves

There are the following set of changes that must be replicated to slaves and propagated to AOF:

  1. Consumer group creation. This is trivial to accomplish just by propagating the command verbatim in case of success.
  2. Change of group ID. Again we can just propagate the XGROUP SETID.
  3. Creation or updating of an entry in the PEL when a message is delivered to some client. This happens in multiple instances: when XREAD-GROUP returns messages to the client, and when a message is claimed by some other client via XGROUP CLAIM. This event will be propagated as a special XGROUP PEL <stream> <group> <message-id> <current-owner-name> <idle-time> <deliveries-count>. Such a command will create the message in the PEL if it does not already exist, or will just update its fields. As a side effect this message also creates the cgroupConsumer representation on the slave side, so that the slave can contain the full state.
  4. Deletion of an entry in the PEL: this only happens when XACK is received, so the command can be propagated verbatim.
  5. Deletion of a consumer. This is replicated just as XGROUP DEL-CONSUMER in both the possibilities: because of natural GC of unused consumers, or when the command is actually executed by the user.
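Point 3 above can be illustrated by formatting the replication command from a PEL entry. This is a hypothetical helper, only meant to show the argument order of the special XGROUP PEL form; the field names are illustrative:

```python
# Sketch: build the `XGROUP PEL` command propagated to slaves/AOF
# from a pending entry, per the replication rules described above.
def pel_repl_command(stream, group, msg_id, entry, now):
    idle = int(now - entry["delivery_time"])   # seconds since last delivery
    return ["XGROUP", "PEL", stream, group, msg_id,
            entry["consumer"], str(idle), str(entry["delivery_count"])]

cmd = pel_repl_command("mystream", "mygroup", "1500000000000-0",
                       {"consumer": "c-1234", "delivery_time": 95.0,
                        "delivery_count": 2}, now=100.0)
```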

RDB persistence

The current RDB persistence format will be modified in order to persist all the stream metadata on disk. This will include all the groups of a given stream, and for each, the full list of pending messages, clients owners, and so forth.

Credits

This specification benefited from input provided by Roey Berman and by an internal discussion about streams in the Tel Aviv Redis offices where multiple people contributed ideas. Users over Twitter also suggested ideas.

@gleicon commented Nov 3, 2017

My stream workflow with kafka involves a couple of steps: after creating the topic (which could be auto created but I don't because I want to set sensible partitioning defaults for my cluster and capacity planning), I just attach a producer and consumers passing the consumer groups from the client up.

From this design, it seems to be the same except that consumer groups are not auto-created (correct me if I understand it wrong). What should be the standard behavior? I understand the logic but it adds another schema setup step - if you want to rewind and replay the stream with another consumer group to recreate a database or something similar, you would have to redis-cli XGROUP CREATE. In this case what would happen if you start more than one consumer? Or would the recommendation be not to run any consumer before setting all this up?

I know it has little to do w/ kafka/kinesis but this is how I've been using streams and got curious about what to implement driver and consumer side. Thanks!

@itamarhaber commented Dec 19, 2017

I ❤️ the (pending problem of) server-side last known client ack record keeping, as it is somewhat comparable to the ease of using expiration in terms of developer experience. The price in resources for maintaining the relevant internal data structures is indeed a consideration, but IMO it can be mitigated by adding a knob in the form of a configuration directive that controls whether it is used or not.

@antirez (Owner, Author) commented Dec 20, 2017

@gleicon: It is possible to create as many consumer groups as you want, but yes, if you want a set of clients to re-consume the stream in an independent way, you need to create a new consumer group. At this point you can use the consumer group with as many consumers as you want, without any additional setup: the commands are needed only to create and destroy the consumer group. Of course you do not require a consumer group in order to casually consume the stream of messages, you can just use XREAD or XRANGE if you want. So basically consumers are auto-created inside a group as entities the first time they are seen, while groups must be created and destroyed explicitly. We may add in the future a new option to XREAD-GROUP to auto-create the group if it does not exist, however it's somewhat an odd use case because IMHO the time to live of a group is not clear: if groups cannot be reclaimed automatically then I have the feeling it's semantically more sound if groups are also created in an explicit way. Note that you can always try to re-create the group, and what happens is just that you get an error, so a client may always: 1) try to create the group; 2) enter the consumer main loop.

@itamarhaber yes that's definitely a good opportunity indeed. To start we could add the feature with an O(N) time complexity, because the list of not acknowledged messages will usually be short, and the number of times a client will try to refetch its last processed ID should be very small (only after a restart or so). However we can later understand how to make the command faster, for instance by keeping more state (and using more memory) only if some option is given in the Redis configuration, so that the feature would always be present in the Redis API but you can speed it up if you want, from O(N) with N = number of pending messages in the group, to O(1). Or perhaps we could invert the logic and only keep the client-specific list of messages that are not acknowledged, and when navigating the list of messages that are pending we could return them by client. I'm trying to finalize that part of the design right now actually...

@Hades32 commented Dec 28, 2017

I'm not a (very) active Kafka user, so I might be wrong, but AFAICR Kafka consumer groups guarantee that only one member of the group gets messages for one partition, which in turn is created based on some sharding of message keys.

This allows that e.g. in a "customer" stream one customer is only processed by one consumer and you don't get nasty locking problems (most of the time).

AFAICT your current design randomly(?) distributes messages and thus I'll either get messages for the same customer in two consumers OR I'll have to create streams per customer and then handle sharding again myself...

Maybe I also got it completely wrong though... But how would I realize such a usecase with the current design?

@mp911de commented Jan 12, 2018

It's a good approach to keep consumer groups as a separate data structure that allows modification/changes. Two things came to my mind:

  • XREAD-GROUP is not consistent regarding its name as it uses a dash (-) to separate parts of the command. Handling XREAD GROUP in a subcommand way on the server side would fit more smoothly. Or maybe XREADGROUP?.
  • I see the point in explicit consumer group and consumer creation. Creating consumer groups and consumers on the first read if not already present could help in jumpstarting stream adoption. Maybe controllable with a flag in redis.conf; however, on the first read for a consumer group that isn't created, Redis could create one using the head of the stream to read (would correspond to XGROUP CREATE mystream groupname $). For production use, you'd want total control over consumer groups. For development, explicit creation is rather a repetitive task, especially after FLUSHALL.

@antirez (Owner, Author) commented Jan 12, 2018

@Hades32 this design attempts to reach the same feature (every consumer sees only a subset of messages, even when trying to re-fetch the history later in case of a restart) without having to resort to partitioning. The way this is reached without using partitions is by storing, before receiving an acknowledgement of the message being processed, what pending messages were delivered to what client name, so there is no explicit partitioning. For messages which are reported as processed by clients via XACK this information is discarded and the message cannot be delivered again (if not by exploring the stream explicitly).

@mp911de The subcommand approach is not usable because XREAD-GROUP is writable, so for sure it must be a separate command name. It's true that it departs from the command names so far, so probably XREADGROUP is the way to go. Configurable API semantics are always wrong IMHO because basically what you do is create a configuration option that creates incompatible versions of Redis: the API should be the API, written in stone :-) However later it is possible to add an option to XREAD-GROUP for automatic creation.

Thanks both for the comments.

@Hades32 commented Jan 13, 2018

But what if I WANT to have (automatic) partitioning?
It's actually pretty nice that one Kafka consumer gets all events concerning one key (e.g. one customer). That way you're sure there is no race condition within one entity...

@antirez (Owner, Author) commented Jan 17, 2018

@Hades32 currently there is no way to have automatic partitioning. One could use Redis Cluster with streams of course in order to scale, but in that case, the application code will have to use multiple keys with some form of hashing.

@antirez (Owner, Author) commented Jan 17, 2018

To understand why this design was chosen, consider the fact that Redis is primarily a data structures server: the most important design criterion is that a Stream looks like a general tool, a data structure stored at a key that does not do anything too magic. Consumer groups are just an abstraction on top of the ADT (abstract data type) model. You can see Kafka as having logs as a side effect of what it wants to provide to clients; that is, Kafka is basically a vertical tool. Redis instead attempts to provide just a set of ADTs that can be used as tools for different use cases, but it is more important to make sense as a simple ADT than to specialize as a given tool.

@bsergean commented Aug 16, 2018

There's a typo in the first sentence: cosumer groups.

@krushita commented Oct 2, 2018

@antirez I am trying to evaluate if redis streams fits our requirement for ordered processing of messages and have a question about it.
Let's say we have multiple streams S1, S2, S3..Sn and a consumer group containing 4 consumers C1..C4.
I want to create a common consumer group to process messages from multiple streams. However, the restriction is that only 1 message from a single stream can be processed at a given time.
So if C1 is processing a message from S1, no other messages from S1 can be processed while the 1st request is in progress. Messages from S2, S3 etc can still be processed by other consumers in the group. Does xread_group support these semantics? In other words, will xread_group only deliver messages from streams which do not have any pending messages?

@krushita commented Oct 3, 2018

Reading https://groups.google.com/forum/#!topic/redis-db/td-aPJKycH0 I am still confused on what to conclude.

  • If you use N streams with N consumers, so only a given consumer hits a subset of the N streams, you can scale the above model of 1 stream -> 1 consumer.

If, in my use case, the producer is creating and posting to N different streams, and the consumers (say fewer than N) within the consumer group are guaranteed to receive messages from non-pending streams only, it looks like ordered processing per stream is possible. Can you clarify?
