Demonstration of fast Azure blob uploads using a producer-consumer pattern with TPL Dataflow and parallel chunked uploads.
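This is a LINQPad-style script (Util.GetPassword and .Dump() are LINQPad helpers). As a rough sketch of what it assumes, the equivalent NuGet packages and namespace imports for a standalone console app would be:

// NuGet: Azure.Storage.Blobs (v12), System.Threading.Tasks.Dataflow
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Threading.Tasks.Dataflow;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;       // CommitBlockListOptions
using Azure.Storage.Blobs.Specialized;  // BlockBlobClient, GetBlockBlobClient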
// Helper: format a byte count as a human-readable size (e.g. "1.5 GB")
string[] SizeSuffixes = { "bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB" };
string SizeSuffix(long value, int decimalPlaces = 0)
{
    if (value < 0)
    {
        throw new ArgumentException("Bytes should not be negative", nameof(value));
    }
    var mag = (int)Math.Max(0, Math.Log(value, 1024));
    var adjustedSize = Math.Round(value / Math.Pow(1024, mag), decimalPlaces);
    return $"{adjustedSize} {SizeSuffixes[mag]}";
}
// Helper: compute the hex-encoded hash of a stream, reading 4MB at a time
string HashStream(HashAlgorithm hasher, Stream stream)
{
    hasher.Initialize();
    var buffer = new byte[4 * 1024 * 1024];
    while (true)
    {
        int read = stream.Read(buffer, 0, buffer.Length);
        if (read == 0) break;
        hasher.TransformBlock(buffer, 0, read, null, 0);
    }
    hasher.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
    var hash = hasher.Hash;
    return BitConverter.ToString(hash).Replace("-", "");
}
// The producer: reads the file in 8MB chunks and posts each block to the dataflow target
async Task<IReadOnlyCollection<string>> Produce(ITargetBlock<Block> target, Stream file)
{
    var blockIds = new List<string>();
    var blockSize = 8 * 1024 * 1024;
    while (true)
    {
        // need to create a new buffer each time as they are processed in parallel
        var buffer = new byte[blockSize];
        var read = await file.ReadAsync(buffer, 0, buffer.Length);
        if (read == 0) break;
        string blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray());
        blockIds.Add(blockId);
        await target.SendAsync(new Block(blockId, buffer, read));
    }
    target.Complete();
    return blockIds;
}
// The consumer: stages one uncommitted block to the block blob
async Task StageBlock(Block block, BlockBlobClient blobClient)
{
    using var ms = new MemoryStream(block.Data, 0, block.Length);
    await blobClient.StageBlockAsync(block.Id, ms);
}
var connectionString = Util.GetPassword("mheath-storage");
var blobServiceClient = new BlobServiceClient(connectionString);
var containerName = "uploads";
var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
var uploadBlobClient = containerClient.GetBlockBlobClient("chunked-tpl.mp4");
await uploadBlobClient.DeleteIfExistsAsync();
// pick the largest file I have in my video folder
var fileName = Directory.EnumerateFiles(@"C:\Users\mark\Videos")
    .OrderByDescending(f => new FileInfo(f).Length)
    .First();
var maxParallelConsume = 8; // experiment with this value
using var file = File.OpenRead(fileName);
var expectedHash = HashStream(SHA512.Create(), file);
expectedHash.Dump();
file.Position = 0; // reset before upload
var sw = Stopwatch.StartNew();
var buffer = new BufferBlock<Block>(new DataflowBlockOptions() { BoundedCapacity = maxParallelConsume });
var consumerBlock = new ActionBlock<Block>(
    block => StageBlock(block, uploadBlobClient),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelConsume });
buffer.LinkTo(consumerBlock, new DataflowLinkOptions() { PropagateCompletion = true });
var producerTask = Produce(buffer, file);
await consumerBlock.Completion;
var blockIds = await producerTask; // producer has already finished once the consumer completes
var opts = new CommitBlockListOptions()
{
    // could set tags, metadata, mime type etc. here
};
var info = await uploadBlobClient.CommitBlockListAsync(blockIds, opts);
$"{SizeSuffix(file.Length, 1)} uploaded in {sw.Elapsed} {SizeSuffix((long)(file.Length / sw.Elapsed.TotalSeconds), 1)} per second with {maxParallelConsume} threads".Dump();
//info.Dump();
Console.WriteLine("Calculating hash");
using var readStream = uploadBlobClient.OpenRead();
var uploadedHash = HashStream(SHA512.Create(), readStream);
if (uploadedHash != expectedHash)
{
    Console.WriteLine($"Hashes don't match {uploadedHash}");
}
else
{
    Console.WriteLine("Hash is valid!");
}
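// For comparison (not part of the original gist): the v12 SDK can also parallelise a single
// upload itself via StorageTransferOptions (in the Azure.Storage namespace); a minimal sketch,
// assuming the same uploadBlobClient and file stream:
//   var transferOptions = new StorageTransferOptions { MaximumConcurrency = 8, MaximumTransferSize = 8 * 1024 * 1024 };
//   await uploadBlobClient.UploadAsync(file, new BlobUploadOptions { TransferOptions = transferOptions });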
// a chunk read from the file: its block id, the buffer, and the number of valid bytes in Data
record Block(string Id, byte[] Data, int Length);