Skip to content

Instantly share code, notes, and snippets.

@ismkdc
Created June 17, 2024 23:27
Show Gist options
  • Save ismkdc/1eef375d46995fa7a793a5584f2f3a89 to your computer and use it in GitHub Desktop.
Save ismkdc/1eef375d46995fa7a793a5584f2f3a89 to your computer and use it in GitHub Desktop.
/// <summary>
/// Base class for hosted services that consume a Redis stream through a consumer group.
/// Each instance registers under its own randomly generated consumer name, makes sure the
/// group exists, and then polls: first auto-claiming entries that have been pending for a
/// long time, then reading entries that were never delivered to any consumer.
/// </summary>
/// <remarks>
/// Delivery is at-least-once: an entry is acknowledged only after <see cref="OnMessage"/>
/// completed for every one of its fields, so a handler may see the same entry again after a
/// failure and should be idempotent.
/// </remarks>
public abstract class BaseConsumer(IRedisClient redisClient) : BackgroundService
{
    /// <summary>Pause between creating/verifying the group and the first poll.</summary>
    private static readonly TimeSpan GroupSetupDelay = TimeSpan.FromMilliseconds(500);

    /// <summary>Pause between two successful poll iterations.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(100);

    /// <summary>Pause after a failure before the group is re-verified and polling resumes.</summary>
    private static readonly TimeSpan FailureBackoff = TimeSpan.FromSeconds(5);

    /// <summary>How long an entry must have been pending before this consumer may claim it.</summary>
    private static readonly TimeSpan PendingMinIdleTime = TimeSpan.FromMinutes(10);

    /// <summary>Position passed to the group read to request never-delivered entries.</summary>
    private const string UndeliveredPosition = ">";

    // A fresh GUID per instance, so every service instance is a distinct consumer in the group.
    private readonly string _consumerName = Guid.NewGuid().ToString();
    private readonly IDatabase _database = redisClient.GetDefaultDatabase().Database;

    /// <summary>Maximum number of entries requested per claim or read call.</summary>
    protected abstract int ReadMaxCount { get; }

    /// <summary>Key of the Redis stream to consume.</summary>
    protected abstract string StreamName { get; }

    /// <summary>Name of the consumer group this consumer belongs to.</summary>
    protected abstract string GroupName { get; }

    /// <summary>
    /// Runs the consume loop until <paramref name="stoppingToken"/> is cancelled.
    /// Any failure is written to the console, followed by a back-off, after which the
    /// consumer group is verified again and polling restarts.
    /// </summary>
    /// <param name="stoppingToken">Signals that the service is shutting down.</param>
    /// <returns>A task that completes normally once cancellation has been requested.</returns>
    protected async Task Consume
    (
        CancellationToken stoppingToken
    )
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await EnsureConsumerGroupAsync();
                    await Task.Delay(GroupSetupDelay, stoppingToken);
                    await PollAsync(stoppingToken);
                }
                catch (Exception e) when (e is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                {
                    Console.WriteLine(e);

                    // Back off instead of retrying immediately: a persistent failure (Redis
                    // unreachable, group deleted, ...) would otherwise spin and flood the log.
                    // Looping back also re-runs the group check, so a removed group is recreated.
                    await Task.Delay(FailureBackoff, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown was requested while waiting; this is the normal way out, not an error.
        }
    }

    /// <summary>
    /// Creates the consumer group, positioned at the beginning of the stream, when either the
    /// stream key or a group called <see cref="GroupName"/> does not exist yet.
    /// </summary>
    private async Task EnsureConsumerGroupAsync()
    {
        var groupMissing =
            !await _database.KeyExistsAsync(StreamName) ||
            (await _database.StreamGroupInfoAsync(StreamName)).All(x => x.Name != GroupName);

        if (groupMissing)
        {
            await _database.StreamCreateConsumerGroupAsync(
                StreamName,
                GroupName,
                StreamPosition.Beginning
            );
        }
    }

    /// <summary>
    /// Polls until cancellation: claims long-pending entries, re-reads this consumer's pending
    /// entries when something was claimed, then reads new entries. Exceptions are deliberately
    /// not caught here so that <see cref="Consume"/> can log them and back off.
    /// </summary>
    private async Task PollAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var claimed = await ClaimPendingMessages();
            if (claimed.ClaimedEntries.Length > 0)
            {
                await ReadClaimedMessages();
            }

            await ReadNewMessages();
            await Task.Delay(PollInterval, stoppingToken);
        }
    }

    /// <summary>Reads this consumer's entries starting at the beginning position and handles them.</summary>
    private Task ReadClaimedMessages() => ReadAndHandleAsync(StreamPosition.Beginning);

    /// <summary>Reads entries not yet delivered to any consumer of the group and handles them.</summary>
    private Task ReadNewMessages() => ReadAndHandleAsync(UndeliveredPosition);

    /// <summary>
    /// Performs one group read of at most <see cref="ReadMaxCount"/> entries from
    /// <paramref name="position"/> and hands a non-empty result to <see cref="HandleMessages"/>.
    /// </summary>
    private async Task ReadAndHandleAsync(RedisValue position)
    {
        var streamResult =
            await _database.StreamReadGroupAsync
            (
                StreamName,
                GroupName,
                _consumerName,
                position,
                ReadMaxCount
            );

        if (streamResult.Length > 0)
        {
            await HandleMessages(streamResult);
        }
    }

    /// <summary>
    /// Invokes <see cref="OnMessage"/> for every field of every entry and acknowledges an entry
    /// once all of its fields were handled without an exception. A failing entry is logged and
    /// left unacknowledged; the remaining entries are still processed.
    /// </summary>
    /// <param name="streamResult">Entries returned by a group read.</param>
    private async Task HandleMessages(IEnumerable<StreamEntry> streamResult)
    {
        foreach (var entry in streamResult)
        {
            try
            {
                foreach (var field in entry.Values)
                {
                    await OnMessage(field, entry.Id);
                }

                // Acknowledge per entry, not per field: the acknowledgement is keyed by the
                // entry id, so sending it after the first field would mark the whole entry as
                // done even if a later field failed.
                // NOTE(review): an entry that carries no fields is acknowledged without
                // OnMessage being called - confirm that is acceptable.
                await _database.StreamAcknowledgeAsync(StreamName, GroupName, entry.Id);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
    }

    /// <summary>
    /// Transfers up to <see cref="ReadMaxCount"/> entries that have been pending for at least
    /// <see cref="PendingMinIdleTime"/> to this consumer.
    /// </summary>
    /// <returns>The auto-claim result, including the claimed entries.</returns>
    private Task<StreamAutoClaimResult> ClaimPendingMessages()
    {
        // TotalMilliseconds, not Milliseconds: the latter is only the millisecond component of
        // the time span (0 for exactly ten minutes), which would claim every pending entry
        // immediately, including ones another consumer is still working on.
        var minIdleTimeInMs = (long)PendingMinIdleTime.TotalMilliseconds;

        return _database.StreamAutoClaimAsync
        (
            StreamName,
            GroupName,
            _consumerName,
            minIdleTimeInMs,
            StreamPosition.Beginning,
            ReadMaxCount
        );
    }

    /// <summary>
    /// Handles a single field of a stream entry.
    /// </summary>
    /// <param name="message">One name/value pair of the entry.</param>
    /// <param name="messageId">Id of the entry the field belongs to.</param>
    /// <returns>A task that completes when the field has been processed; a faulted task
    /// prevents the entry from being acknowledged.</returns>
    protected abstract Task OnMessage(NameValueEntry message, RedisValue messageId);
}
/// <summary>
/// Stream consumer for the <c>StreamNames.AccountCreate</c> stream. It reads up to 100 entries
/// per call within the group named after the stream with a <c>_CONSUMER_GROUP</c> suffix.
/// </summary>
public class AccountCreateConsumer(IRedisClient redisClient) : BaseConsumer(redisClient)
{
    /// <inheritdoc />
    protected override int ReadMaxCount => 100;

    /// <inheritdoc />
    protected override string StreamName => StreamNames.AccountCreate;

    /// <inheritdoc />
    // Kept as an initialised auto-property so the group name is built once per instance.
    protected override string GroupName { get; } = StreamNames.AccountCreate + "_CONSUMER_GROUP";

    /// <summary>Entry point of the hosted service: runs the inherited consume loop.</summary>
    /// <param name="stoppingToken">Signals that the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken) =>
        await Consume(stoppingToken);

    /// <summary>
    /// Deserializes the field value as JSON into a <c>User</c>. The account creation itself
    /// is not implemented yet.
    /// </summary>
    /// <param name="message">Field of the stream entry whose value holds the JSON payload.</param>
    /// <param name="messageId">Id of the stream entry; currently unused.</param>
    protected override async Task OnMessage(NameValueEntry message, RedisValue messageId)
    {
        var newUser = JsonSerializer.Deserialize<User>(message.Value);
        // TODO: create the account here.
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment