Created March 10, 2023 14:31
Demonstration of fast Azure blob uploads using a producer consumer pattern with TPL dataflow and using parallel chunked uploads
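The script below reads as a LINQPad query (Util.GetPassword and .Dump() are LINQPad helpers). For reference, a sketch of the dependencies it appears to rely on; the package and namespace list is an assumption based on the types used, not part of the gist itself:

// Assumed dependencies (not in the original gist):
// NuGet packages Azure.Storage.Blobs and System.Threading.Tasks.Dataflow.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;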
string[] SizeSuffixes = { "bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB" };

string SizeSuffix(long value, int decimalPlaces = 0)
{
    if (value < 0)
    {
        throw new ArgumentException("Bytes should not be negative", nameof(value));
    }
    var mag = (int)Math.Max(0, Math.Log(value, 1024));
    var adjustedSize = Math.Round(value / Math.Pow(1024, mag), decimalPlaces);
    return $"{adjustedSize} {SizeSuffixes[mag]}";
}
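// usage example (illustrative): SizeSuffix(1536, 1) => "1.5 KB"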
string HashStream(HashAlgorithm hasher, Stream stream)
{
    hasher.Initialize();
    // hash the stream in 4MB chunks so the whole file never has to fit in memory
    var buffer = new byte[4 * 1024 * 1024];
    while (true)
    {
        int read = stream.Read(buffer, 0, buffer.Length);
        if (read == 0) break;
        hasher.TransformBlock(buffer, 0, read, null, 0);
    }
    hasher.TransformFinalBlock(new byte[0], 0, 0);
    var hash = hasher.Hash;
    return BitConverter.ToString(hash).Replace("-", "");
}
// The producer: reads the file in 8MB blocks and posts each one into the pipeline
async Task<IReadOnlyCollection<string>> Produce(ITargetBlock<Block> target, Stream file)
{
    var blockIds = new List<string>();
    var blockSize = 8 * 1024 * 1024;
    while (true)
    {
        // need to create a new buffer each time as they are processed in parallel
        var buffer = new byte[blockSize];
        var read = await file.ReadAsync(buffer, 0, buffer.Length);
        if (read == 0) break;
        string blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray());
        blockIds.Add(blockId);
        await target.SendAsync(new Block(blockId, buffer, read));
    }
    target.Complete();
    return blockIds;
}
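// SendAsync only completes once the bounded pipeline has room for the block,
// which stops the producer from reading the whole file into memory ahead of the uploads.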
// The consumer
async Task StageBlock(Block block, BlockBlobClient blobClient)
{
    using var ms = new MemoryStream(block.Data, 0, block.Length);
    await blobClient.StageBlockAsync(block.Id, ms);
}
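// StageBlockAsync uploads an uncommitted block; nothing becomes visible on the blob
// until the block list is committed with CommitBlockListAsync below.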
var connectionString = Util.GetPassword("mheath-storage");
var blobServiceClient = new BlobServiceClient(connectionString);
var containerName = "uploads";
var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
var uploadBlobClient = containerClient.GetBlockBlobClient("chunked-tpl.mp4");
await uploadBlobClient.DeleteIfExistsAsync();

// pick the largest file I have in my video folder
var fileName = Directory.EnumerateFiles(@"C:\Users\mark\Videos")
    .OrderByDescending(f => new FileInfo(f).Length)
    .First();

var maxParallelConsume = 8; // experiment with this value
using var file = File.OpenRead(fileName);
// hash the source file up front so the uploaded blob can be verified afterwards
var expectedHash = HashStream(SHA512.Create(), file);
expectedHash.Dump();
file.Position = 0; // reset before upload
var sw = Stopwatch.StartNew();
var buffer = new BufferBlock<Block>(new DataflowBlockOptions() { BoundedCapacity = maxParallelConsume });
var consumerBlock = new ActionBlock<Block>(
    block => StageBlock(block, uploadBlobClient),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxParallelConsume,
        // bound the consumer's input queue as well; by default an ActionBlock accepts
        // messages greedily, which would let the producer buffer the whole file in memory
        BoundedCapacity = maxParallelConsume
    });
buffer.LinkTo(consumerBlock, new DataflowLinkOptions() { PropagateCompletion = true });
var producerTask = Produce(buffer, file);
await consumerBlock.Completion;
var blockIds = await producerTask;
var opts = new CommitBlockListOptions()
{
    // could set tags, metadata, mime type etc here
};
var info = await uploadBlobClient.CommitBlockListAsync(blockIds, opts);
$"{SizeSuffix(file.Length, 1)} uploaded in {sw.Elapsed} {SizeSuffix((long)(file.Length / sw.Elapsed.TotalSeconds), 1)} per second with {maxParallelConsume} threads".Dump(); | |
//info.Dump(); | |
Console.WriteLine("Calculating hash"); | |
using var readStream = uploadBlobClient.OpenRead(); | |
var uploadedHash = HashStream(SHA512.Create(),readStream); | |
if (uploadedHash != expectedHash) | |
{ | |
Console.WriteLine($"Hashes don't match {uploadedHash}"); | |
} | |
else | |
{ | |
Console.WriteLine("Hash is valid!"); | |
} | |
record Block(string Id, byte[] Data, int Length); |
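For comparison, and not part of the gist: the Blob Storage SDK can also parallelise a chunked upload itself via StorageTransferOptions. A minimal sketch, assuming the same uploadBlobClient and fileName as above; the concurrency and chunk sizes are illustrative, not taken from the gist:

// Sketch of the SDK's built-in parallel chunked upload (illustrative values).
// StorageTransferOptions lives in the Azure.Storage namespace.
var uploadOptions = new BlobUploadOptions
{
    TransferOptions = new StorageTransferOptions
    {
        MaximumConcurrency = 8,
        InitialTransferSize = 8 * 1024 * 1024,
        MaximumTransferSize = 8 * 1024 * 1024
    }
};
using (var source = File.OpenRead(fileName))
{
    await uploadBlobClient.UploadAsync(source, uploadOptions);
}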