Last active
May 9, 2019 17:22
-
-
Save azyobuzin/512950cdbae3f7eed0c537bcb7eb76e3 to your computer and use it in GitHub Desktop.
TPL Dataflow を使ってキューサービスに積まれたメッセージを処理するワーカーを書く例
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading; | |
using System.Threading.Tasks; | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var queueService = new QueueService(); | |
var cancellation = new CancellationTokenSource(); | |
var worker = new Worker(queueService, 2, cancellation.Token); | |
worker.StartReceiving(); | |
// 15 秒で終了 | |
cancellation.CancelAfter(15 * 1000); | |
try | |
{ | |
worker.Completion.GetAwaiter().GetResult(); | |
} | |
catch (TaskCanceledException) { /* 正常終了 */ } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class QueueMessage | |
{ | |
public int MessageId { get; } | |
public QueueMessage(int messageId) => this.MessageId = messageId; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
public class QueueService | |
{ | |
private static readonly Random s_rng = new Random(); | |
private int _messageId; | |
public async Task<IReadOnlyList<QueueMessage>> ReceiveMessagesAsync(CancellationToken cancellationToken) | |
{ | |
// 取得に 100~600ms かかるものとする | |
await Task.Delay(100 + s_rng.Next(500), cancellationToken).ConfigureAwait(false); | |
// 1~20 件取得できる | |
var results = new QueueMessage[s_rng.Next(20) + 1]; | |
for (var i = 0; i < results.Length; i++) | |
results[i] = new QueueMessage(++_messageId); | |
return results; | |
} | |
public Task DeleteMessagesAsync(IEnumerable<QueueMessage> messages) | |
{ | |
// 削除に 100ms かかるものとする | |
return Task.Delay(100); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
public class Worker | |
{ | |
private static readonly Random s_rng = new Random(); | |
private readonly QueueService _queueService; | |
private readonly CancellationToken _cancellationToken; | |
// メッセージを処理するブロック | |
private readonly ITargetBlock<QueueMessage> _workBlock; | |
// 完了したメッセージの削除を入力するブロック | |
private readonly ITargetBlock<QueueMessage> _deleteMessageTargetBlock; | |
// メッセージの削除を実際に処理するブロック | |
private readonly IDataflowBlock _deleteMessageWorkBlock; | |
// 5秒ごとに溜まった削除依頼を処理する | |
private static readonly TimeSpan s_deleteMessageTimerInterval = new TimeSpan(5 * TimeSpan.TicksPerSecond); | |
private readonly Timer _deleteMessageTimer; | |
// ログ用の時間計測 | |
private readonly Stopwatch _stopwatch = Stopwatch.StartNew(); | |
// ログ用の完了数カウンター | |
private int _completedCount = 0; | |
// 呼び出し元に公開する Task | |
public Task Completion { get; } | |
public Worker( | |
QueueService queueService, int parallelDegree, | |
CancellationToken cancellationToken) | |
{ | |
_queueService = queueService; | |
_cancellationToken = cancellationToken; | |
// 並列処理を行うブロックを作成 | |
_workBlock = new ActionBlock<QueueMessage>( | |
this.ProcessMessageAsync, | |
new ExecutionDataflowBlockOptions() | |
{ | |
CancellationToken = cancellationToken, | |
// バッファーが枯渇しないように、同時実行数分だけ余分な入力を許可する | |
BoundedCapacity = parallelDegree * 2 - 1, | |
// 実行順は気にしない(という設定をしているが、実際には Transform(Many)Block でのみ挙動が変わる) | |
EnsureOrdered = false, | |
MaxDegreeOfParallelism = parallelDegree, | |
// 入力は QueueService から取得するループだけなので | |
// プロデューサーが 1 つであることが保証されている | |
SingleProducerConstrained = true, | |
}); | |
// 処理が完了したメッセージの削除依頼を受け付けるブロックを作成 | |
var deleteMessageTargetBlock = new BufferBlock<QueueMessage>(); | |
_deleteMessageTargetBlock = deleteMessageTargetBlock; | |
// 最初の削除依頼を受け取ったとき、タイマーを開始する | |
var startDeleteMessageTimerBlock = new ActionBlock<QueueMessage>( | |
message => | |
{ | |
// タイマーを開始 | |
_deleteMessageTimer.Change(s_deleteMessageTimerInterval, s_deleteMessageTimerInterval); | |
// もう一度キューに積む | |
_deleteMessageTargetBlock.Post(message); | |
}, | |
new ExecutionDataflowBlockOptions() | |
{ | |
EnsureOrdered = false, | |
SingleProducerConstrained = true, | |
}); | |
deleteMessageTargetBlock.LinkTo( | |
startDeleteMessageTimerBlock, | |
new DataflowLinkOptions() { MaxMessages = 1 }); | |
// Amazon SQS の仕様に合わせて 20 件ごとに区切る | |
var deleteMessageBatchBlock = new BatchBlock<QueueMessage>(20); | |
deleteMessageTargetBlock.LinkTo( | |
deleteMessageBatchBlock, | |
new DataflowLinkOptions() { PropagateCompletion = true }); | |
// 実際に削除処理を行うブロックを作成 | |
var deleteMessageWorkBlock = new ActionBlock<QueueMessage[]>( | |
this.DeleteMessagesAsync, | |
new ExecutionDataflowBlockOptions() | |
{ | |
SingleProducerConstrained = true, | |
}); | |
_deleteMessageWorkBlock = deleteMessageWorkBlock; | |
deleteMessageBatchBlock.LinkTo( | |
deleteMessageWorkBlock, | |
new DataflowLinkOptions() { PropagateCompletion = true }); | |
// 一定時間が経ったら 20 件溜まっていなくても削除を実行するようにするタイマー | |
_deleteMessageTimer = new Timer( | |
state => | |
{ | |
var block = (BatchBlock<QueueMessage>)state; | |
block.TriggerBatch(); | |
}, | |
deleteMessageBatchBlock, | |
Timeout.Infinite, Timeout.Infinite); | |
// 後処理まで含めて Task 化 | |
this.Completion = this.CreateCompletionTask(); | |
} | |
private async Task ProcessMessageAsync(QueueMessage message) | |
{ | |
this.Log($"処理開始 ({message.MessageId})"); | |
// 処理に 1~2秒 かかるものとする | |
await Task.Delay(1000 + s_rng.Next(1001)).ConfigureAwait(false); | |
this.Log($"処理完了 ({message.MessageId}, {Interlocked.Increment(ref _completedCount)}件完了)"); | |
// 完了したのでキューから削除する | |
_deleteMessageTargetBlock.Post(message); | |
} | |
private async Task DeleteMessagesAsync(QueueMessage[] messages) | |
{ | |
var messagesInfo = $"{messages.Length}件 ({string.Join(",", messages.Select(x => x.MessageId))})"; | |
this.Log("削除開始 " + messagesInfo); | |
// サービスから削除を実行する | |
await _queueService.DeleteMessagesAsync(messages).ConfigureAwait(false); | |
this.Log("削除完了 " + messagesInfo); | |
// タイマーをリセット | |
_deleteMessageTimer.Change(s_deleteMessageTimerInterval, s_deleteMessageTimerInterval); | |
} | |
public void StartReceiving() | |
{ | |
// 呼び出し元にすぐに戻るため、 Task.Run | |
Task.Run(async () => | |
{ | |
try | |
{ | |
while (!_cancellationToken.IsCancellationRequested) | |
{ | |
this.Log("取得開始"); | |
// サービスからメッセージを取得する | |
var messages = await _queueService.ReceiveMessagesAsync(_cancellationToken).ConfigureAwait(false); | |
this.Log($"取得完了 {messages.Count}件 ({string.Join(",", messages.Select(x => x.MessageId))})"); | |
// 取得したメッセージを処理を行うブロックに投入していく | |
foreach (var message in messages) | |
await _workBlock.SendAsync(message, _cancellationToken).ConfigureAwait(false); | |
} | |
} | |
catch (Exception ex) when (!_cancellationToken.IsCancellationRequested) | |
{ | |
// 続行不可能なエラーが発生した場合は、ブロックのエラーとする | |
_workBlock.Fault(ex); | |
} | |
}, _cancellationToken); | |
} | |
private Task CreateCompletionTask() | |
{ | |
return _workBlock.Completion | |
.ContinueWith(async workBlockCompletion => | |
{ | |
this.Log("処理ブロック終了"); | |
var exceptions = new List<Exception>(); | |
if (workBlockCompletion.IsFaulted) | |
exceptions.AddRange(workBlockCompletion.Exception.InnerExceptions); | |
try | |
{ | |
// すべての削除が完了するのを待つ | |
_deleteMessageTargetBlock.Complete(); | |
await _deleteMessageWorkBlock.Completion.ConfigureAwait(false); | |
// 削除タイマーはもう不要 | |
_deleteMessageTimer.Dispose(); | |
} | |
catch (Exception ex) | |
{ | |
exceptions.Add(ex); | |
} | |
this.Log("削除ブロック終了"); | |
// 例外が発生した場合は、その例外を結果とする | |
if (exceptions.Count > 0) | |
{ | |
var tcs = new TaskCompletionSource<object>(); | |
tcs.SetException(exceptions); | |
return tcs.Task; | |
} | |
return workBlockCompletion; | |
}, TaskScheduler.Default) | |
.Unwrap().Unwrap(); | |
} | |
private void Log(string message) | |
{ | |
Console.WriteLine("[{0:F1}s] {1}", _stopwatch.Elapsed.TotalSeconds, message); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment