Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@azyobuzin
Last active May 9, 2019 17:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save azyobuzin/512950cdbae3f7eed0c537bcb7eb76e3 to your computer and use it in GitHub Desktop.
Save azyobuzin/512950cdbae3f7eed0c537bcb7eb76e3 to your computer and use it in GitHub Desktop.
TPL Dataflow を使ってキューサービスに積まれたメッセージを処理するワーカーを書く例
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var queueService = new QueueService();
var cancellation = new CancellationTokenSource();
var worker = new Worker(queueService, 2, cancellation.Token);
worker.StartReceiving();
// 15 秒で終了
cancellation.CancelAfter(15 * 1000);
try
{
worker.Completion.GetAwaiter().GetResult();
}
catch (TaskCanceledException) { /* 正常終了 */ }
}
}
public class QueueMessage
{
public int MessageId { get; }
public QueueMessage(int messageId) => this.MessageId = messageId;
}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class QueueService
{
private static readonly Random s_rng = new Random();
private int _messageId;
public async Task<IReadOnlyList<QueueMessage>> ReceiveMessagesAsync(CancellationToken cancellationToken)
{
// 取得に 100~600ms かかるものとする
await Task.Delay(100 + s_rng.Next(500), cancellationToken).ConfigureAwait(false);
// 1~20 件取得できる
var results = new QueueMessage[s_rng.Next(20) + 1];
for (var i = 0; i < results.Length; i++)
results[i] = new QueueMessage(++_messageId);
return results;
}
public Task DeleteMessagesAsync(IEnumerable<QueueMessage> messages)
{
// 削除に 100ms かかるものとする
return Task.Delay(100);
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class Worker
{
private static readonly Random s_rng = new Random();
private readonly QueueService _queueService;
private readonly CancellationToken _cancellationToken;
// メッセージを処理するブロック
private readonly ITargetBlock<QueueMessage> _workBlock;
// 完了したメッセージの削除を入力するブロック
private readonly ITargetBlock<QueueMessage> _deleteMessageTargetBlock;
// メッセージの削除を実際に処理するブロック
private readonly IDataflowBlock _deleteMessageWorkBlock;
// 5秒ごとに溜まった削除依頼を処理する
private static readonly TimeSpan s_deleteMessageTimerInterval = new TimeSpan(5 * TimeSpan.TicksPerSecond);
private readonly Timer _deleteMessageTimer;
// ログ用の時間計測
private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
// ログ用の完了数カウンター
private int _completedCount = 0;
// 呼び出し元に公開する Task
public Task Completion { get; }
public Worker(
QueueService queueService, int parallelDegree,
CancellationToken cancellationToken)
{
_queueService = queueService;
_cancellationToken = cancellationToken;
// 並列処理を行うブロックを作成
_workBlock = new ActionBlock<QueueMessage>(
this.ProcessMessageAsync,
new ExecutionDataflowBlockOptions()
{
CancellationToken = cancellationToken,
// バッファーが枯渇しないように、同時実行数分だけ余分な入力を許可する
BoundedCapacity = parallelDegree * 2 - 1,
// 実行順は気にしない(という設定をしているが、実際には Transform(Many)Block でのみ挙動が変わる)
EnsureOrdered = false,
MaxDegreeOfParallelism = parallelDegree,
// 入力は QueueService から取得するループだけなので
// プロデューサーが 1 つであることが保証されている
SingleProducerConstrained = true,
});
// 処理が完了したメッセージの削除依頼を受け付けるブロックを作成
var deleteMessageTargetBlock = new BufferBlock<QueueMessage>();
_deleteMessageTargetBlock = deleteMessageTargetBlock;
// 最初の削除依頼を受け取ったとき、タイマーを開始する
var startDeleteMessageTimerBlock = new ActionBlock<QueueMessage>(
message =>
{
// タイマーを開始
_deleteMessageTimer.Change(s_deleteMessageTimerInterval, s_deleteMessageTimerInterval);
// もう一度キューに積む
_deleteMessageTargetBlock.Post(message);
},
new ExecutionDataflowBlockOptions()
{
EnsureOrdered = false,
SingleProducerConstrained = true,
});
deleteMessageTargetBlock.LinkTo(
startDeleteMessageTimerBlock,
new DataflowLinkOptions() { MaxMessages = 1 });
// Amazon SQS の仕様に合わせて 20 件ごとに区切る
var deleteMessageBatchBlock = new BatchBlock<QueueMessage>(20);
deleteMessageTargetBlock.LinkTo(
deleteMessageBatchBlock,
new DataflowLinkOptions() { PropagateCompletion = true });
// 実際に削除処理を行うブロックを作成
var deleteMessageWorkBlock = new ActionBlock<QueueMessage[]>(
this.DeleteMessagesAsync,
new ExecutionDataflowBlockOptions()
{
SingleProducerConstrained = true,
});
_deleteMessageWorkBlock = deleteMessageWorkBlock;
deleteMessageBatchBlock.LinkTo(
deleteMessageWorkBlock,
new DataflowLinkOptions() { PropagateCompletion = true });
// 一定時間が経ったら 20 件溜まっていなくても削除を実行するようにするタイマー
_deleteMessageTimer = new Timer(
state =>
{
var block = (BatchBlock<QueueMessage>)state;
block.TriggerBatch();
},
deleteMessageBatchBlock,
Timeout.Infinite, Timeout.Infinite);
// 後処理まで含めて Task 化
this.Completion = this.CreateCompletionTask();
}
private async Task ProcessMessageAsync(QueueMessage message)
{
this.Log($"処理開始 ({message.MessageId})");
// 処理に 1~2秒 かかるものとする
await Task.Delay(1000 + s_rng.Next(1001)).ConfigureAwait(false);
this.Log($"処理完了 ({message.MessageId}, {Interlocked.Increment(ref _completedCount)}件完了)");
// 完了したのでキューから削除する
_deleteMessageTargetBlock.Post(message);
}
private async Task DeleteMessagesAsync(QueueMessage[] messages)
{
var messagesInfo = $"{messages.Length}件 ({string.Join(",", messages.Select(x => x.MessageId))})";
this.Log("削除開始 " + messagesInfo);
// サービスから削除を実行する
await _queueService.DeleteMessagesAsync(messages).ConfigureAwait(false);
this.Log("削除完了 " + messagesInfo);
// タイマーをリセット
_deleteMessageTimer.Change(s_deleteMessageTimerInterval, s_deleteMessageTimerInterval);
}
public void StartReceiving()
{
// 呼び出し元にすぐに戻るため、 Task.Run
Task.Run(async () =>
{
try
{
while (!_cancellationToken.IsCancellationRequested)
{
this.Log("取得開始");
// サービスからメッセージを取得する
var messages = await _queueService.ReceiveMessagesAsync(_cancellationToken).ConfigureAwait(false);
this.Log($"取得完了 {messages.Count}件 ({string.Join(",", messages.Select(x => x.MessageId))})");
// 取得したメッセージを処理を行うブロックに投入していく
foreach (var message in messages)
await _workBlock.SendAsync(message, _cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (!_cancellationToken.IsCancellationRequested)
{
// 続行不可能なエラーが発生した場合は、ブロックのエラーとする
_workBlock.Fault(ex);
}
}, _cancellationToken);
}
private Task CreateCompletionTask()
{
return _workBlock.Completion
.ContinueWith(async workBlockCompletion =>
{
this.Log("処理ブロック終了");
var exceptions = new List<Exception>();
if (workBlockCompletion.IsFaulted)
exceptions.AddRange(workBlockCompletion.Exception.InnerExceptions);
try
{
// すべての削除が完了するのを待つ
_deleteMessageTargetBlock.Complete();
await _deleteMessageWorkBlock.Completion.ConfigureAwait(false);
// 削除タイマーはもう不要
_deleteMessageTimer.Dispose();
}
catch (Exception ex)
{
exceptions.Add(ex);
}
this.Log("削除ブロック終了");
// 例外が発生した場合は、その例外を結果とする
if (exceptions.Count > 0)
{
var tcs = new TaskCompletionSource<object>();
tcs.SetException(exceptions);
return tcs.Task;
}
return workBlockCompletion;
}, TaskScheduler.Default)
.Unwrap().Unwrap();
}
private void Log(string message)
{
Console.WriteLine("[{0:F1}s] {1}", _stopwatch.Elapsed.TotalSeconds, message);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment