Skip to content

Instantly share code, notes, and snippets.

@Kiechlus
Created January 20, 2019 04:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Kiechlus/1acfa39aab662221510d6633f08f8c62 to your computer and use it in GitHub Desktop.
Save Kiechlus/1acfa39aab662221510d6633f08f8c62 to your computer and use it in GitHub Desktop.
Stream parallel requests into zip
/**
* Streaming: https://blog.stephencleary.com/2016/11/streaming-zip-on-aspnet-core.html
* Queue: https://robertwray.co.uk/blog/wrapping-concurrentqueue-t-to-make-it-eventful
*
*/
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Json;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Net.Http.Headers;
using RW.EventfulConcurrentQueue;
using stream;
namespace WebApplication.Controllers
{
    /// <summary>
    /// Streams every instance of a study from Orthanc into a single zip archive that is
    /// written directly to the HTTP response while the instances are downloaded in parallel.
    /// </summary>
    [Route("api/[controller]")]
    public class FileController : Controller
    {
        // One shared client for the whole application; creating one per request exhausts sockets.
        private static HttpClient Client { get; } = new HttpClient();

        // Hand-off between the download tasks (producers) and the zip writer (consumer).
        // Each item is a single-entry dictionary: zip entry name -> file content.
        private readonly EventfulConcurrentQueue<Dictionary<string, byte[]>> _imageQueue =
            new EventfulConcurrentQueue<Dictionary<string, byte[]>>();

        // Upper bound on simultaneous requests sent to Orthanc.
        private const int ConcurrentRequests = 20;

        /// <summary>
        /// Get http://localhost:&lt;port&gt;/api/file to start the download.
        /// </summary>
        /// <returns>
        /// A file result whose body is a zip archive, written while the instances are being fetched.
        /// </returns>
        [HttpGet]
        public IActionResult Get()
        {
            string orthancBase = "xxxxxxxxxxxx";
            string study = "xxxxxxxxxxxxxxxxx"; // This is a big study with 800MB zipped.

            // Instance metadata is read from a local file because fetching it from Orthanc
            // ({orthancBase}/studies/{study}/instances) takes too long.
            // Path.Combine (instead of a hard-coded "\\") keeps this working on non-Windows hosts.
            string metadataPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "instances.json");
            JsonValue instancesMetadata = JsonValue.Parse(System.IO.File.ReadAllText(metadataPath));

            return new FileCallbackResult(new MediaTypeHeaderValue("application/octet-stream"), async (outputStream, _) =>
            {
                var totalTimer = Stopwatch.StartNew();
                var tasks = new List<Task>();

                // NOTE(review): ZipArchive writes to outputStream synchronously. ASP.NET Core 3.0+
                // rejects synchronous response writes unless AllowSynchronousIO is enabled —
                // confirm for the target framework.
                using (var zipArchive = new ZipArchive(outputStream, ZipArchiveMode.Create))
                {
                    // ZipArchive can only write one entry at a time, so all writes are serialized on this gate.
                    var zipGate = new object();

                    // Runs whenever an image is enqueued and moves one image from the queue into the zip.
                    void OnImageEnqueued(object sender, EventArgs args)
                    {
                        Debug.WriteLine("New image in the queue. Trying to lock the zip stream and to stream the image into it.");
                        var writeTimer = Stopwatch.StartNew();
                        lock (zipGate)
                        {
                            if (!_imageQueue.TryDequeue(out Dictionary<string, byte[]> nameAndInstance) || nameAndInstance.Count != 1)
                            {
                                throw new InvalidOperationException("Error, cannot dequeue or wrong content.");
                            }

                            foreach (KeyValuePair<string, byte[]> item in nameAndInstance)
                            {
                                // The key is already the complete entry name (e.g. "<id>.dcm"), so no extension is appended here.
                                var zipEntry = zipArchive.CreateEntry(item.Key);
                                using (var zipStream = zipEntry.Open())
                                {
                                    zipStream.Write(item.Value, 0, item.Value.Length);
                                }
                            }

                            Debug.WriteLine($"Finished flushing image to the zip stream in {writeTimer.Elapsed.TotalMilliseconds} ms.");
                        }
                    }

                    // NOTE(review): this assumes EventfulConcurrentQueue raises ItemEnqueued synchronously
                    // inside Enqueue, so every image is in the zip once all MakeRequest tasks have completed
                    // and before the archive is disposed — TODO confirm against the queue implementation.
                    _imageQueue.ItemEnqueued += OnImageEnqueued;
                    try
                    {
                        using (var semaphore = new SemaphoreSlim(ConcurrentRequests))
                        {
                            foreach (JsonValue instance in instancesMetadata)
                            {
                                string id = instance["ID"];
                                string instanceUrl = $"{orthancBase}/instances/{id}/file";

                                // Wait for a free slot; MakeRequest releases it when its download finishes.
                                await semaphore.WaitAsync();
                                tasks.Add(MakeRequest(semaphore, instanceUrl, $"{id}.dcm"));
                                Debug.WriteLine($"Adding new task, length: {tasks.Count}");
                            }

                            // Wait for the remaining downloads before the semaphore and archive are disposed.
                            await Task.WhenAll(tasks);
                        }
                    }
                    finally
                    {
                        // Detach so the queue does not keep this request's archive and closure alive.
                        _imageQueue.ItemEnqueued -= OnImageEnqueued;
                    }

                    Debug.WriteLine($"Fetched all data. Total duration: {totalTimer.Elapsed.TotalMilliseconds} ms..");
                }
            })
            {
                FileDownloadName = $"{study}.zip"
            };
        }

        /// <summary>
        /// Downloads one instance from Orthanc and enqueues it for the zip writer.
        /// Always releases one slot of <paramref name="semaphore"/>, even when the download fails.
        /// </summary>
        /// <param name="semaphore">Throttle acquired by the caller; released here when the request finishes.</param>
        /// <param name="instanceUrl">The instance URL.</param>
        /// <param name="fileName">Zip entry name for the instance, including its extension.</param>
        /// <returns>A task that completes once the instance has been enqueued.</returns>
        /// <exception cref="HttpRequestException">The request failed or returned a non-success status code.</exception>
        private async Task MakeRequest(SemaphoreSlim semaphore, string instanceUrl, string fileName)
        {
            try
            {
                using (var responseMessage = await Client.GetAsync(instanceUrl).ConfigureAwait(false))
                {
                    // Without this check, an HTTP error body would be written into the zip as if it were an image.
                    responseMessage.EnsureSuccessStatusCode();

                    var readTimer = Stopwatch.StartNew();
                    byte[] content = await responseMessage.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
                    _imageQueue.Enqueue(new Dictionary<string, byte[]> { { fileName, content } });
                    Debug.WriteLine($"Enqueuing result. New queue length: {_imageQueue.Count()}");
                    Debug.WriteLine($"Finished fetching image after {readTimer.Elapsed.TotalMilliseconds} ms...");
                }
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment