Created
December 3, 2022 18:09
-
-
Save joeyguerra/e09acc09bc9edf1f8960e3158ef6ae61 to your computer and use it in GitHub Desktop.
Listen to Redis streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using StackExchange.Redis; | |
namespace Shipment; | |
public class StreamListener | |
{ | |
private readonly ConnectionMultiplexer _multiplexer; | |
private string _consumerGroupName; | |
public StreamListener(ConnectionMultiplexer multiplexer, string consumerGroupName) | |
{ | |
_multiplexer = multiplexer; | |
_consumerGroupName = consumerGroupName; | |
} | |
public void Listen<T>(string streamKey, Func<StreamEntry, T> Deserialize, Func<T, Task> Handle) | |
{ | |
var cts = new CancellationTokenSource(); | |
Console.CancelKeyPress += (sender, e) => | |
{ | |
e.Cancel = true; | |
cts.Cancel(); | |
}; | |
Action<string> log = (message) => Console.WriteLine($"{DateTime.UtcNow} - {message}"); | |
Action<string> error = (message) => Console.Error.WriteLine($"{DateTime.UtcNow} - {message}"); | |
var consumerName = $"only-one-{_consumerGroupName}-consumer"; | |
/* | |
README: | |
By limiting the number of messages that are read, makes it more manageable to manually acknowledge messages out of the PEL so the program can move on. | |
We don't limit reading from the PEL because HOW_MANY_MESSAGES_TO_READ_AT_A_TIME limits the number of messages ever getting into the PEL. | |
*/ | |
const int HOW_MANY_MESSAGES_TO_READ_AT_A_TIME = 10; | |
try | |
{ | |
_multiplexer.GetDatabase().StreamCreateConsumerGroup(streamKey, _consumerGroupName, StreamPosition.NewMessages); | |
} | |
catch (RedisException e) | |
{ | |
error($"From Redis: {e?.Message}"); | |
} | |
Task.Run(async () => | |
{ | |
var loops = 0; | |
var totalMessagesHandled = 0; | |
while (!cts.IsCancellationRequested) | |
{ | |
var entries = new StreamEntry[] { }; | |
try | |
{ | |
var db = _multiplexer.GetDatabase(); | |
if (loops % 20 == 0) log($"Reached {loops / 20} iterations."); | |
entries = await db.StreamReadGroupAsync(streamKey, _consumerGroupName, consumerName, ConsumerPosition.PendingEntriesList); | |
if (entries.Any()) | |
{ | |
log($"Reading from PEL - {entries.Length} messages"); | |
} | |
else | |
{ | |
entries = await db.StreamReadGroupAsync(streamKey, _consumerGroupName, consumerName, | |
ConsumerPosition.UndeliveredMessages, HOW_MANY_MESSAGES_TO_READ_AT_A_TIME); | |
} | |
foreach (var entry in entries) | |
{ | |
try | |
{ | |
var @event = Deserialize(entry); | |
log($"{@event}"); | |
await Handle(@event); | |
} | |
catch (Exception e) | |
{ | |
error($"Error handling event: {entry}, {e}"); | |
} | |
await db.StreamAcknowledgeAsync(streamKey, _consumerGroupName, entry.Id); | |
totalMessagesHandled++; | |
} | |
loops++; | |
} | |
catch (Exception e) | |
{ | |
error($"{e.Message}"); | |
} | |
finally | |
{ | |
if (!entries.Any()) | |
{ | |
await Task.Delay(1000, cts.Token); | |
} | |
} | |
} | |
log($"Shutting down. Sent {totalMessagesHandled} messages"); | |
}); | |
} | |
} | |
public struct ConsumerPosition | |
{ | |
public static RedisValue PendingEntriesList => StreamPosition.Beginning; | |
public static RedisValue UndeliveredMessages => StreamPosition.NewMessages; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment