Last active January 29, 2023 19:23
Azure Blob Storage parallel downloads of 1000s files
// Parallel download blobs from an Azure Blob Storage container, and report speed and bandwidth metrics
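// As written, this code finds blobs by index tag. To download by virtual folder instead,
// replace `var blobItems = containerClient.FindBlobsByTags(indexTagFilter);` with
// `var blobItems = containerClient.GetBlobs();` (the commented-out line below it) and
// read `blob.Name` in place of `blob.BlobName` when creating the blob client.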
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Dynamic;
using System.Globalization;
using System.Text.Json;
using System.Threading.Tasks.Sources;
using Azure.Storage.Blobs;
// Create the shared stopwatch/event log, hand it the console as its verbose writer,
// reset its log and record a "Start" event. Later sections read timeLog.Elapsed for all timings.
var timeLog = new RecordingStopwatch().SetVerbose(Console.Out).ClearAndStart().AddThenIndent("Start");
//--Parse Args---------------------------------------------------------------------------------------
// Two arguments are mandatory: args[0] must parse as an absolute URI (the storage URL, including its
// SAS token per the usage text) and args[1] is the container name. Otherwise the usage text is printed.
// NOTE(review): in this copy the usage string ends on a dangling `+` and neither the closing `);`
// nor an early exit is visible, so execution would fall through to args[1] — confirm against the original.
if (args.Length < 2 || !Uri.TryCreate(args[0],UriKind.Absolute,out var speedTestUrl))
Console.WriteLine("\nParallel download blobs from an Azure Blob Storage container, " +
"\nand report speed and bandwidth metrics." +
"\n\nUsage from project directory: " +
"\ndotnet run -- <absoluteStorageUrlWithSaS> <containerName>" +
" [<timeboxSeconds=5>]" +
" [<indexTagFilter=\\\"state\\\"='new']" +
var containerName = args[1];
// Optional args[2]: timebox in whole seconds; anything missing, unparsable or <= 0 falls back to 5.
var timeoutSecounds = args.Length >= 3 && int.TryParse(args[2], out var arg2) && arg2 > 0 ? arg2 : 5;
// Optional args[3]: blob index tag filter expression; defaults to "state"='new'.
var indexTagFilter = args.Length >= 4 ? args[3] : "\"state\"='new'";
// Echo the effective settings so that a run's output is self-describing.
Console.WriteLine($"Container={containerName}, Timeboxed to {timeoutSecounds} seconds, Filtered on Index Tags {indexTagFilter}");
//--Maybe set Connection and Thread limits ----------------------------------------------------------------
// Target concurrency: eight simultaneous downloads per logical processor. The same number sizes
// the download throttle semaphore created in the next section.
var parallelism = Environment.ProcessorCount * 8;
#pragma warning disable CS0162 //unreachable code
// NOTE(review): `if (true)` plus the CS0162 suppression looks like a hand-edited on/off switch whose
// else-branch prints the "defaults unchanged" message below; the braces and the `else` are not
// visible in this copy — confirm against the original.
if (true)
Console.WriteLine($"{Environment.ProcessorCount} processors found, use parallelism={parallelism}");
// Allow `parallelism` additional outbound connections on top of the current default limit.
System.Net.ServicePointManager.DefaultConnectionLimit += parallelism;
// Ask the thread pool to keep at least `parallelism` worker threads and ProcessorCount
// I/O completion threads available without its usual ramp-up delay.
ThreadPool.SetMinThreads(parallelism, Environment.ProcessorCount);
// Read back the worker-thread minimum now in effect; the completion-port figure is discarded.
ThreadPool.GetMinThreads(out var workerThreads, out _);
// NOTE(review): this message stops after a trailing `+`; the rest of the expression and the
// closing `);` are missing from this copy.
Console.WriteLine("Using DotNetCore defaults unchanged: " +
$"System.Net.ServicePointManager.DefaultConnectionLimit={System.Net.ServicePointManager.DefaultConnectionLimit}\n" +
//--Init workspace-----------------------------------------------------------------------------------
// State shared between the download phase and the counting phase.
// Queue of downloaded contents; the counting loop drains it with TryDequeue.
ConcurrentQueue<string> files = new();
// Throttle with `parallelism` slots; the download loop waits on it before starting each task.
SemaphoreSlim downloadTaskSemaphore = new(parallelism); //I think we get more Azure errors if we don't use the throttle
List<Task> downloadTasks = new();
// Counters reported at the end of the run.
int blobCount = 0;
int timedout = 0;
// Clients: service (built from the URL given as args[0]) -> container -> tag-filtered listing.
BlobServiceClient blobServiceClient = new(speedTestUrl);
BlobContainerClient containerClient = blobServiceClient.GetBlobContainerClient(containerName);
var blobItems = containerClient.FindBlobsByTags(indexTagFilter);
//var blobItems = containerClient.GetBlobs();
//--Download Files in a loop-------------------------------------------------------------------------
// One token for the whole run: it is cancelled automatically once the timebox elapses and is
// passed to every download as well as checked by the counting loop further down.
// NOTE(review): only the Token is kept, so the CancellationTokenSource is never disposed.
var timeoutCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSecounds)).Token;
// NOTE(review): the loop braces, the lambda braces, the `try` that pairs with the `catch` below and
// whatever enqueues `result` / counts blobs are not visible in this copy — confirm against the original
// before relying on the comments in this section.
foreach (var blob in blobItems)
// Throttle: wait until one of the `parallelism` slots is free before starting another download.
// NOTE(review): no matching downloadTaskSemaphore.Release() is visible; without one (ideally in a
// `finally`) the loop would stall after `parallelism` blobs. The wait also ignores the timebox token.
await downloadTaskSemaphore.WaitAsync();
// Run each download on the thread pool so the enumeration can move on to the next blob.
// NOTE(review): the returned Task is not visibly stored in downloadTasks or awaited here.
Task.Run(async () =>
var blobClient = containerClient.GetBlobClient(blob.BlobName /*blob.Name*/);
// Can do the stream reading manually, but it ain't any better
// using var stream = new MemoryStream();
// await blobClient.DownloadToAsync(stream, timeoutCancellation);
// stream.Position = 0;
// using var streamReader = new StreamReader(stream);
// var result = await streamReader.ReadToEndAsync();
// NOTE(review): the expression below is cut off after the closing parenthesis; presumably it goes on
// to extract the blob content as a string, since `files` is a queue of strings — confirm.
var result = (await blobClient.DownloadContentAsync(timeoutCancellation))
if (result is not null)
// A download cancelled by the timebox is counted as abandoned rather than reported as an error.
// NOTE(review): `timedout++` runs inside concurrently executing tasks and is not atomic;
// Interlocked.Increment(ref timedout) would avoid lost updates.
catch (TaskCanceledException) { timedout++; }
Console.WriteLine("Total Blobs Found="+blobCount);
//--Deserialize or at least Count downloaded files---------------------------------------------------
// Drain the `files` queue, printing one line per item and accumulating totals, until either every
// blob found has been seen or the timebox token is cancelled.
int filesSeen = 0;
// NOTE(review): `totalSize` is an int and sums string lengths (characters, not bytes); it is
// reported as KB/MB below and would overflow past 2^31-1.
int totalSize = 0;
double time99;
// Item count (rounded down by integer division) at which the 99% milestone is reported.
var _99thPercentile = blobCount * 99 / 100;
// NOTE(review): the loop braces, the `try` that pairs with the `catch` at the end of this section
// and the `Console.WriteLine(` that opens the per-item message are not visible in this copy.
while (filesSeen < blobCount && !timeoutCancellation.IsCancellationRequested)
if (files.TryDequeue(out var content))
//var obj = JsonSerializer.Deserialize<ExpandoObject>(file)??new ExpandoObject();
//var isJsonDeserializable = obj.Count()>0;
if (content is null)
Console.WriteLine("Assertion Failed: Dequeued null but shouldn't have been able to");
// Per-item progress: running index, size in KB and seconds since start.
$"{++filesSeen}.Length={1.0 / 1024 * content.Length:F1}KB," +
// $"{(isJsonDeserializable?(obj.Count() + " json properties"):"")}," +
$"{timeLog.Elapsed.TotalSeconds:F1}secs, ");
totalSize += content.Length;
// When the 99% milestone is reached, report throughput up to this point.
if (filesSeen == _99thPercentile)
time99 = timeLog.Elapsed.TotalSeconds;
// NOTE(review): `totalSize / 1024 / 1024` is integer division, so the MB figure is truncated to
// whole megabytes before it is divided by the elapsed time.
Console.WriteLine($"{filesSeen} in {time99:F2}secs. {filesSeen / time99:F0} Events per second," +
$" {totalSize / 1024 / 1024 / time99:F2} MBytes per second, " +
$"average filesize {(totalSize / filesSeen / 1024):F0}KB");
// Any failure while processing an item is logged with its stack trace flattened onto one line.
catch (Exception e)
Console.WriteLine($"{e.Message} at file {filesSeen} {e.StackTrace?.Replace("\n", " | ")}");
//--Report summary-----------------------------------------------------------------------------------
// Final report: overall elapsed time, throughput, and how much of the work fitted in the timebox.
var totalSeconds = timeLog.Elapsed.TotalSeconds;
if (timeoutCancellation.IsCancellationRequested)
{
    Console.WriteLine($"Timed out at {totalSeconds:F2} seconds");
}

if (filesSeen == 0)
{
    // Nothing was processed: skip the rate line, whose average would divide by zero.
    Console.WriteLine("Got nothing.");
}
else
{
    // Convert to double before scaling. Integer division would truncate the total to whole
    // megabytes first, so small runs would always report 0.00 MBytes per second.
    var totalMBytes = totalSize / 1024.0 / 1024.0;
    var averageKBytes = (double)totalSize / filesSeen / 1024.0;
    Console.WriteLine($"{filesSeen} in {totalSeconds:F2}secs. {filesSeen / totalSeconds:F0} files per second." +
                      $" {totalMBytes / totalSeconds:F2} MBytes per second, " +
                      $"average filesize {averageKBytes:F0}KB");
}

if (0 < filesSeen && filesSeen < blobCount)
{
    // Partial result: report the percentage completed and any downloads cancelled by the timebox.
    Console.WriteLine(
        $"Got {100 * filesSeen / blobCount}% of {blobCount} before timeboxing at {timeoutSecounds} seconds." +
        (timedout > 0 ? $" {timedout} downloads abandoned." : ""));
}

Environment.ExitCode = 0;
/// <summary>
/// A <see cref="Stopwatch"/> that also keeps a log of named events, each stamped with the elapsed
/// time at which it was added and an indent level for nesting. Every mutating method returns
/// <c>this</c> so that calls can be chained.
/// </summary>
public class RecordingStopwatch : Stopwatch
{
    List<(int Indent, string Event, TimeSpan SinceStart)> Lines = new();
    int _currentIndent = 0;
    TextWriter? _out;

    /// <summary>Echo every event to <paramref name="output"/> as it is recorded.</summary>
    public RecordingStopwatch SetVerbose(TextWriter output) { _out = output; return this; }

    /// <summary>Stop echoing events; they are still recorded.</summary>
    public RecordingStopwatch UnsetVerbose() { _out = null; return this; }

    /// <summary>Discard all recorded events and restart timing from zero.</summary>
    public RecordingStopwatch ClearAndStart()
    {
        Lines = new();
        Restart();
        return this;
    }

    /// <summary>Record <paramref name="event"/> at the current indent level.</summary>
    public RecordingStopwatch Add(string @event)
    {
        Record(_currentIndent, @event, Elapsed);
        return this;
    }

    /// <summary>Record two events at the current indent level with the same timestamp.</summary>
    public RecordingStopwatch Add(string event1, string event2)
    {
        var now = Elapsed;
        Record(_currentIndent, event1, now);
        Record(_currentIndent, event2, now);
        return this;
    }

    /// <summary>Increase the indent level, then record <paramref name="event"/> at the new level.</summary>
    public RecordingStopwatch IndentAdd(string @event)
    {
        Record(++_currentIndent, @event, Elapsed);
        return this;
    }

    /// <summary>Record <paramref name="event"/> at the current level, then indent what follows.</summary>
    public RecordingStopwatch AddThenIndent(string @event)
    {
        Record(_currentIndent, @event, Elapsed);
        _currentIndent++;
        return this;
    }

    /// <summary>Decrease the indent level (never below zero), then record <paramref name="event"/>.</summary>
    public RecordingStopwatch OutdentThenAdd(string @event)
    {
        Outdent();
        Record(_currentIndent, @event, Elapsed);
        return this;
    }

    /// <summary>
    /// Record <paramref name="event"/> at the current level and <paramref name="indentedEvent"/>
    /// one level deeper, both with the same timestamp.
    /// </summary>
    public RecordingStopwatch AddIndentAdd(string @event, string indentedEvent)
    {
        var now = Elapsed;
        Record(_currentIndent, @event, now);
        Record(++_currentIndent, indentedEvent, now);
        return this;
    }

    /// <summary>
    /// Record <paramref name="event"/> at the current level and <paramref name="outdentedEvent"/>
    /// one level shallower (never below zero), both with the same timestamp.
    /// </summary>
    public RecordingStopwatch AddOutdentAdd(string @event, string outdentedEvent)
    {
        var now = Elapsed;
        Record(_currentIndent, @event, now);
        Outdent();
        Record(_currentIndent, outdentedEvent, now);
        return this;
    }

    /// <summary>The base stopwatch text followed by one line per event, using the default TimeSpan format.</summary>
    public override string ToString()
    {
        return base.ToString() +
               Environment.NewLine +
               string.Join('\n', Lines.Select(r => OneRowToString(r)));
    }

    /// <summary>As <see cref="ToString()"/>, formatting each timestamp with <paramref name="timespanFormat"/>.</summary>
    public string ToString(string timespanFormat)
    {
        return base.ToString() +
               Environment.NewLine +
               string.Join('\n', Lines.Select(r => OneRowToString(r, timespanFormat)));
    }

    /// <summary>As <see cref="ToString()"/>, showing each timestamp as total milliseconds.</summary>
    public string ToStringMillis(string? doubleFormat = null)
    {
        return base.ToString() +
               Environment.NewLine +
               string.Join('\n', Lines.Select(r => OneRowToStringMillis(r, doubleFormat)));
    }

    // Appends one row to the log and, in verbose mode, echoes it to the configured writer.
    void Record(int indent, string @event, TimeSpan sinceStart)
    {
        (int Indent, string Event, TimeSpan SinceStart) row = (indent, @event, sinceStart);
        Lines.Add(row);
        _out?.WriteLine(OneRowToString(row));
    }

    // Steps the indent level down by one, clamping at zero so unbalanced outdents are harmless.
    void Outdent()
    {
        if (--_currentIndent < 0) _currentIndent = 0;
    }

    // One space per indent level, then the event name, a space and the timestamp.
    static string OneRowToString((int Indent, string Event, TimeSpan SinceStart) r, string? timespanFormat = null)
    {
        return (r.Indent > 0 ? new string(' ', r.Indent) : "") +
               r.Event + " " + r.SinceStart.ToString(timespanFormat);
    }

    // Same layout as OneRowToString, with the timestamp expressed in milliseconds.
    static string OneRowToStringMillis((int Indent, string Event, TimeSpan SinceStart) r, string? doubleFormat = null)
    {
        return (r.Indent > 0 ? new string(' ', r.Indent) : "") +
               r.Event + " " + r.SinceStart.TotalMilliseconds.ToString(doubleFormat);
    }
}
