Created
June 17, 2024 23:27
-
-
Save ismkdc/1eef375d46995fa7a793a5584f2f3a89 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Base class for background services that consume a Redis stream through a consumer group.
/// Each instance registers under a random GUID consumer name and repeatedly claims stale
/// pending entries, re-reads its own pending entries and reads new entries, passing every
/// field of every entry to <see cref="OnMessage"/>.
/// </summary>
/// <remarks>
/// This class does not override <c>ExecuteAsync</c>; a derived class starts the loop by
/// calling <see cref="Consume"/>.
/// </remarks>
public abstract class BaseConsumer(IRedisClient redisClient) : BackgroundService
{
    // Pause between ensuring the consumer group exists and the first poll.
    private static readonly TimeSpan GroupSetupDelay = TimeSpan.FromMilliseconds(500);

    // Pause between two successful polling passes.
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(100);

    // Pause after a failed pass before the group is re-checked and polling resumes.
    private static readonly TimeSpan ErrorBackoff = TimeSpan.FromSeconds(5);

    // How long an entry must have been idle in the pending list before this consumer claims it.
    private static readonly TimeSpan MinIdleBeforeClaim = TimeSpan.FromMinutes(10);

    // Unique per instance, so every running service is a distinct consumer within the group.
    private readonly string _consumerName = Guid.NewGuid().ToString();

    private readonly IDatabase _database = redisClient.GetDefaultDatabase().Database;

    /// <summary>Maximum number of entries requested per claim or read call.</summary>
    protected abstract int ReadMaxCount { get; }

    /// <summary>Key of the Redis stream to consume.</summary>
    protected abstract string StreamName { get; }

    /// <summary>Name of the consumer group this consumer joins (created if missing).</summary>
    protected abstract string GroupName { get; }

    /// <summary>
    /// Runs the consume loop until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <remarks>
    /// The loop ensures the consumer group exists and then polls every 100 ms. Any failure
    /// while setting up the group or polling is written to the console, followed by a
    /// 5 second back-off, after which the group is checked again and polling resumes.
    /// Exceptions thrown by <see cref="OnMessage"/> are handled per entry and do not
    /// interrupt polling. Cancellation ends the loop quietly; the returned task completes
    /// normally.
    /// </remarks>
    /// <param name="stoppingToken">Token that signals the service is shutting down.</param>
    /// <returns>A task that completes once cancellation has been observed.</returns>
    protected async Task Consume(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await EnsureConsumerGroupExists();
                    await Task.Delay(GroupSetupDelay, stoppingToken);

                    while (!stoppingToken.IsCancellationRequested)
                    {
                        await PollOnce();
                        await Task.Delay(PollInterval, stoppingToken);
                    }
                }
                // Everything except "we are shutting down" is a real failure: log it and
                // back off instead of retrying in a tight loop.
                catch (Exception e) when (e is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                {
                    Console.WriteLine(e);
                    await Task.Delay(ErrorBackoff, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown was requested while waiting; this is the normal way out, not an error.
        }
    }

    /// <summary>
    /// Creates the consumer group, positioned at the beginning of the stream, when either
    /// the stream key or the group does not exist yet.
    /// </summary>
    private async Task EnsureConsumerGroupExists()
    {
        // The group list is only queried when the key exists (short-circuit), because
        // asking for the groups of a missing stream is an error.
        var groupExists =
            await _database.KeyExistsAsync(StreamName) &&
            (await _database.StreamGroupInfoAsync(StreamName)).Any(group => group.Name == GroupName);

        if (!groupExists)
        {
            await _database.StreamCreateConsumerGroupAsync(
                StreamName,
                GroupName,
                StreamPosition.Beginning);
        }
    }

    /// <summary>
    /// One polling pass: claim stale pending entries, process this consumer's pending
    /// entries if anything was claimed, then process new entries.
    /// </summary>
    private async Task PollOnce()
    {
        var claimed = await ClaimPendingMessages();
        if (claimed.ClaimedEntries.Length > 0)
        {
            await ReadClaimedMessages();
        }

        await ReadNewMessages();
    }

    /// <summary>
    /// Reads this consumer's pending entries (position <c>0-0</c>) and processes them.
    /// </summary>
    private Task ReadClaimedMessages() => ReadAndHandle(StreamPosition.Beginning);

    /// <summary>
    /// Reads entries not yet delivered to any consumer of the group (position <c>&gt;</c>)
    /// and processes them.
    /// </summary>
    private Task ReadNewMessages() => ReadAndHandle(">");

    /// <summary>
    /// Reads up to <see cref="ReadMaxCount"/> entries for this consumer from
    /// <paramref name="position"/> and hands them to <see cref="HandleMessages"/>.
    /// </summary>
    /// <param name="position">Read position passed to the group read.</param>
    private async Task ReadAndHandle(RedisValue position)
    {
        var entries = await _database.StreamReadGroupAsync(
            StreamName,
            GroupName,
            _consumerName,
            position,
            ReadMaxCount);

        if (entries.Length > 0)
        {
            await HandleMessages(entries);
        }
    }

    /// <summary>
    /// Passes every field of every entry to <see cref="OnMessage"/> and acknowledges an
    /// entry once all of its fields were handled without an exception.
    /// </summary>
    /// <remarks>
    /// A handler exception is logged and the remaining fields are still attempted, but the
    /// entry is then left unacknowledged so it stays pending and can be retried. An entry
    /// with no fields is acknowledged as well, since there is nothing to process.
    /// </remarks>
    /// <param name="streamResult">Entries returned by a group read.</param>
    private async Task HandleMessages(IEnumerable<StreamEntry> streamResult)
    {
        foreach (var entry in streamResult)
        {
            var allFieldsHandled = true;

            foreach (var field in entry.Values)
            {
                try
                {
                    await OnMessage(field, entry.Id);
                }
                catch (Exception e)
                {
                    allFieldsHandled = false;
                    Console.WriteLine(e);
                }
            }

            if (!allFieldsHandled)
            {
                continue;
            }

            try
            {
                await _database.StreamAcknowledgeAsync(StreamName, GroupName, entry.Id);
            }
            catch (Exception e)
            {
                // Best effort: the entry stays pending and is picked up again later.
                Console.WriteLine(e);
            }
        }
    }

    /// <summary>
    /// Transfers up to <see cref="ReadMaxCount"/> pending entries that have been idle for
    /// at least ten minutes to this consumer.
    /// </summary>
    /// <returns>The auto-claim result, including the entries that were claimed.</returns>
    private Task<StreamAutoClaimResult> ClaimPendingMessages()
    {
        return _database.StreamAutoClaimAsync(
            StreamName,
            GroupName,
            _consumerName,
            // TotalMilliseconds (600000), not Milliseconds: the latter is only the
            // millisecond component of the interval, which is 0 for ten minutes.
            (long)MinIdleBeforeClaim.TotalMilliseconds,
            StreamPosition.Beginning,
            ReadMaxCount);
    }

    /// <summary>
    /// Handles a single field of a stream entry. An exception thrown here is logged by the
    /// base class and prevents the entry from being acknowledged.
    /// </summary>
    /// <param name="message">One name/value field of the stream entry.</param>
    /// <param name="messageId">Id of the stream entry the field belongs to.</param>
    protected abstract Task OnMessage(NameValueEntry message, RedisValue messageId);
}
/// <summary>
/// Consumer for the <c>StreamNames.AccountCreate</c> stream. Each received field value is
/// deserialized from JSON into a <c>User</c>; the actual account creation is still a
/// placeholder.
/// </summary>
public class AccountCreateConsumer(IRedisClient redisClient) : BaseConsumer(redisClient)
{
    /// <summary>At most 100 entries are requested per read.</summary>
    protected override int ReadMaxCount => 100;

    /// <summary>The stream this consumer reads from.</summary>
    protected override string StreamName => StreamNames.AccountCreate;

    /// <summary>Consumer group name: the stream name followed by <c>_CONSUMER_GROUP</c>.</summary>
    protected override string GroupName { get; } = $"{StreamNames.AccountCreate}_CONSUMER_GROUP";

    /// <summary>
    /// Deserializes the field value as a JSON <c>User</c>. The entry id is not used.
    /// </summary>
    /// <param name="message">Stream field whose value holds the JSON payload.</param>
    /// <param name="messageId">Id of the stream entry (currently unused).</param>
    protected override async Task OnMessage(NameValueEntry message, RedisValue messageId)
    {
        var user = JsonSerializer.Deserialize<User>(message.Value);

        // TODO: create the account for `user` here.
    }

    /// <summary>Hosted-service entry point: runs the base class consume loop.</summary>
    /// <param name="stoppingToken">Token that signals the service is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        => await Consume(stoppingToken);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment