Skip to content

Instantly share code, notes, and snippets.

@joeyguerra
Created December 3, 2022 18:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joeyguerra/e09acc09bc9edf1f8960e3158ef6ae61 to your computer and use it in GitHub Desktop.
Listen to Redis streams
using StackExchange.Redis;
namespace Shipment;
public class StreamListener
{
    private readonly ConnectionMultiplexer _multiplexer;
    private readonly string _consumerGroupName;

    /// <summary>
    /// Creates a listener that consumes a Redis stream through a consumer group.
    /// </summary>
    /// <param name="multiplexer">Shared, already-connected Redis multiplexer.</param>
    /// <param name="consumerGroupName">Name of the consumer group to create/join.</param>
    public StreamListener(ConnectionMultiplexer multiplexer, string consumerGroupName)
    {
        _multiplexer = multiplexer;
        _consumerGroupName = consumerGroupName;
    }

    /// <summary>
    /// Starts a background loop that reads entries from <paramref name="streamKey"/>,
    /// deserializes each one with <paramref name="Deserialize"/>, hands the result to
    /// <paramref name="Handle"/>, and acknowledges the entry. Runs until Ctrl+C.
    /// Entries are acknowledged even when handling throws, so a poison message cannot
    /// block the stream (the failure is logged instead).
    /// </summary>
    /// <typeparam name="T">Deserialized event type.</typeparam>
    /// <param name="streamKey">Redis key of the stream to consume.</param>
    /// <param name="Deserialize">Converts a raw <see cref="StreamEntry"/> into a <typeparamref name="T"/>.</param>
    /// <param name="Handle">Asynchronous handler invoked for each deserialized event.</param>
    public void Listen<T>(string streamKey, Func<StreamEntry, T> Deserialize, Func<T, Task> Handle)
    {
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (sender, e) =>
        {
            e.Cancel = true; // keep the process alive long enough to shut down cleanly
            cts.Cancel();
        };
        Action<string> log = message => Console.WriteLine($"{DateTime.UtcNow} - {message}");
        Action<string> error = message => Console.Error.WriteLine($"{DateTime.UtcNow} - {message}");
        var consumerName = $"only-one-{_consumerGroupName}-consumer";
        /*
        README:
        Limiting the number of messages read per iteration makes it manageable to manually
        acknowledge messages out of the PEL so the program can move on.
        We don't limit reading from the PEL because HOW_MANY_MESSAGES_TO_READ_AT_A_TIME
        already limits the number of messages that can ever get into the PEL.
        */
        const int HOW_MANY_MESSAGES_TO_READ_AT_A_TIME = 10;
        try
        {
            _multiplexer.GetDatabase().StreamCreateConsumerGroup(streamKey, _consumerGroupName, StreamPosition.NewMessages);
        }
        catch (RedisException e)
        {
            // A BUSYGROUP error just means the group already exists, which is expected
            // on restart. Other Redis errors are logged too; persistent failures will
            // surface again from the reads inside the loop below.
            error($"From Redis: {e?.Message}");
        }
        Task.Run(async () =>
        {
            var loops = 0;
            var totalMessagesHandled = 0;
            while (!cts.IsCancellationRequested)
            {
                var entries = Array.Empty<StreamEntry>();
                try
                {
                    var db = _multiplexer.GetDatabase();
                    if (loops % 20 == 0) log($"Reached {loops / 20} iterations.");
                    // Drain previously-delivered-but-unacknowledged entries (the PEL) first.
                    entries = await db.StreamReadGroupAsync(streamKey, _consumerGroupName, consumerName, ConsumerPosition.PendingEntriesList);
                    if (entries.Any())
                    {
                        log($"Reading from PEL - {entries.Length} messages");
                    }
                    else
                    {
                        entries = await db.StreamReadGroupAsync(streamKey, _consumerGroupName, consumerName,
                            ConsumerPosition.UndeliveredMessages, HOW_MANY_MESSAGES_TO_READ_AT_A_TIME);
                    }
                    foreach (var entry in entries)
                    {
                        try
                        {
                            var @event = Deserialize(entry);
                            log($"{@event}");
                            await Handle(@event);
                        }
                        catch (Exception e)
                        {
                            error($"Error handling event: {entry}, {e}");
                        }
                        // Acknowledge unconditionally so a failing message does not
                        // stay in the PEL forever and stall the consumer.
                        await db.StreamAcknowledgeAsync(streamKey, _consumerGroupName, entry.Id);
                        totalMessagesHandled++;
                    }
                    loops++;
                }
                catch (Exception e)
                {
                    error($"{e.Message}");
                }
                finally
                {
                    if (!entries.Any())
                    {
                        try
                        {
                            // Idle back-off so an empty stream isn't polled in a hot loop.
                            await Task.Delay(1000, cts.Token);
                        }
                        catch (OperationCanceledException)
                        {
                            // Cancellation during the idle delay is the normal shutdown
                            // path. Without this catch, the exception escapes the finally
                            // block, faults the fire-and-forget task, and the shutdown
                            // log below never runs.
                        }
                    }
                }
            }
            log($"Shutting down. Sent {totalMessagesHandled} messages");
        });
    }
}
/// <summary>
/// Named aliases for the two positions a consumer-group read can start from,
/// making StreamReadGroupAsync call sites self-describing. The struct holds no
/// instance state, so it is declared readonly (backward-compatible).
/// </summary>
public readonly struct ConsumerPosition
{
    /// <summary>Re-read this consumer's pending entries list (PEL) from the start.</summary>
    public static RedisValue PendingEntriesList => StreamPosition.Beginning;
    /// <summary>Read only entries never delivered to any consumer in the group.</summary>
    public static RedisValue UndeliveredMessages => StreamPosition.NewMessages;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment