Skip to content

Instantly share code, notes, and snippets.

@kasobol-msft
Created July 9, 2021 19:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kasobol-msft/dd88c6a86f06dc981e0de96ef1169c56 to your computer and use it in GitHub Desktop.
Save kasobol-msft/dd88c6a86f06dc981e0de96ef1169c56 to your computer and use it in GitHub Desktop.
NonFlushableStream
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
/// <summary>
/// A <see cref="Stream"/> that forwards every member it overrides to a wrapped inner stream.
/// Derive from it and override individual members to change selected behavior.
/// </summary>
public class DelegatingStream : Stream
{
    private readonly Stream _inner;

    /// <summary>
    /// Wraps <paramref name="inner"/>; the wrapper releases it when the wrapper is closed or disposed.
    /// </summary>
    /// <param name="inner">The stream that receives all forwarded calls.</param>
    /// <exception cref="ArgumentNullException"><paramref name="inner"/> is <c>null</c>.</exception>
    public DelegatingStream(Stream inner)
    {
        // Fail at construction time rather than with a NullReferenceException on first use.
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    /// <summary>Whether the inner stream reports that it can be read.</summary>
    public override bool CanRead => _inner.CanRead;

    /// <summary>Whether the inner stream reports that it can seek.</summary>
    public override bool CanSeek => _inner.CanSeek;

    /// <summary>Whether the inner stream reports that it can time out.</summary>
    public override bool CanTimeout => _inner.CanTimeout;

    /// <summary>Whether the inner stream reports that it can be written.</summary>
    public override bool CanWrite => _inner.CanWrite;

    /// <summary>The length reported by the inner stream.</summary>
    public override long Length => _inner.Length;

    /// <summary>Gets or sets the position of the inner stream.</summary>
    public override long Position
    {
        get => _inner.Position;
        set => _inner.Position = value;
    }

    /// <summary>Gets or sets the read timeout of the inner stream.</summary>
    public override int ReadTimeout
    {
        get => _inner.ReadTimeout;
        set => _inner.ReadTimeout = value;
    }

    /// <summary>Gets or sets the write timeout of the inner stream.</summary>
    public override int WriteTimeout
    {
        get => _inner.WriteTimeout;
        set => _inner.WriteTimeout = value;
    }

    /// <summary>Forwards to <c>BeginRead</c> on the inner stream.</summary>
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback,
        object state)
    {
        return _inner.BeginRead(buffer, offset, count, callback, state);
    }

    /// <summary>Forwards to <c>EndRead</c> on the inner stream.</summary>
    public override int EndRead(IAsyncResult asyncResult)
    {
        return _inner.EndRead(asyncResult);
    }

    /// <summary>Forwards to <c>BeginWrite</c> on the inner stream.</summary>
    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback,
        object state)
    {
        return _inner.BeginWrite(buffer, offset, count, callback, state);
    }

    /// <summary>Forwards to <c>EndWrite</c> on the inner stream.</summary>
    public override void EndWrite(IAsyncResult asyncResult)
    {
        _inner.EndWrite(asyncResult);
    }

    /// <summary>Forwards to <c>CopyToAsync</c> on the inner stream.</summary>
    public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
    {
        return _inner.CopyToAsync(destination, bufferSize, cancellationToken);
    }

    /// <summary>Flushes the inner stream.</summary>
    public override void Flush()
    {
        _inner.Flush();
    }

    /// <summary>Asynchronously flushes the inner stream.</summary>
    public override Task FlushAsync(CancellationToken cancellationToken)
    {
        return _inner.FlushAsync(cancellationToken);
    }

    /// <summary>Reads from the inner stream into <paramref name="buffer"/>.</summary>
    /// <returns>The value returned by the inner stream's <c>Read</c>.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        return _inner.Read(buffer, offset, count);
    }

    /// <summary>Asynchronously reads from the inner stream into <paramref name="buffer"/>.</summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return _inner.ReadAsync(buffer, offset, count, cancellationToken);
    }

    /// <summary>Reads a single byte from the inner stream.</summary>
    public override int ReadByte()
    {
        return _inner.ReadByte();
    }

    /// <summary>Seeks the inner stream.</summary>
    /// <returns>The value returned by the inner stream's <c>Seek</c>.</returns>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return _inner.Seek(offset, origin);
    }

    /// <summary>Sets the length of the inner stream.</summary>
    public override void SetLength(long value)
    {
        _inner.SetLength(value);
    }

    /// <summary>Writes <paramref name="count"/> bytes from <paramref name="buffer"/> to the inner stream.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        _inner.Write(buffer, offset, count);
    }

    /// <summary>Asynchronously writes <paramref name="count"/> bytes from <paramref name="buffer"/> to the inner stream.</summary>
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return _inner.WriteAsync(buffer, offset, count, cancellationToken);
    }

    /// <summary>Writes a single byte to the inner stream.</summary>
    public override void WriteByte(byte value)
    {
        _inner.WriteByte(value);
    }

    /// <summary>
    /// Releases the inner stream when the wrapper is closed or disposed.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <c>Dispose()</c> or <c>Close()</c>; only then is the inner stream disposed.
    /// </param>
    /// <remarks>
    /// Cleanup lives here rather than in an override of <c>Close()</c>, following the guidance for
    /// <see cref="Stream"/> inheritors; <c>Close()</c> and <c>Dispose()</c> both route to this method.
    /// </remarks>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _inner.Dispose();
        }

        base.Dispose(disposing);
    }
}
}
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO.Compression;
using System.Linq;
using System.Threading.Tasks;
namespace ConsoleApp1
{
/// <summary>
/// Console program that builds "media.zip" in the "zipcontainer" container from blobs listed in the
/// "unzipped" container, once with the Azure.Storage.Blobs client ("V12") and once with the
/// Microsoft.WindowsAzure.Storage client ("V9"), printing per-blob and total timings for each run.
/// </summary>
class Program
{
    const string Connection = "REDACTED";

    // Highest blob-list index copied into each archive (indices 1..MaxBlobIndex are used).
    const int MaxBlobIndex = 10;

    /// <summary>Runs the V12 scenario, prints a blank line, then runs the V9 scenario.</summary>
    static async Task Main(string[] args)
    {
        await RunV12();
        Console.WriteLine();
        await RunV9();
    }

    /// <summary>
    /// Builds the archive using the Microsoft.WindowsAzure.Storage client, writing directly to the
    /// stream returned by <c>OpenWriteAsync</c>.
    /// </summary>
    private static async Task RunV9()
    {
        Console.WriteLine("Running V9");
        var storageAccount = CloudStorageAccount.Parse(Connection);
        var blobService = storageAccount.CreateCloudBlobClient();
        var zipContainer = blobService.GetContainerReference("zipcontainer");
        var unzippedContainer = blobService.GetContainerReference("unzipped");
        var blobList = (await unzippedContainer.ListBlobsSegmentedAsync(null)).Results.ToList();

        // Create zip
        var zip = zipContainer.GetBlockBlobReference("media.zip");
        await zip.DeleteIfExistsAsync();

        // The total covers opening the destination, every transfer, and disposing the archive.
        var totalSw = Stopwatch.StartNew();
        using (var zipArchive = new ZipArchive(
            stream: await zip.OpenWriteAsync(),
            mode: ZipArchiveMode.Create,
            leaveOpen: false))
        {
            // Clamp to the listing size so a small container no longer throws ArgumentOutOfRangeException.
            // NOTE(review): index 0 is skipped, exactly as before — confirm whether that is intended.
            int lastIndex = Math.Min(MaxBlobIndex, blobList.Count - 1);
            for (int i = 1; i <= lastIndex; i++)
            {
                var blob = (CloudBlockBlob)blobList[i];
                await TransferBlobAsync(zipArchive, i, () => blob.OpenReadAsync());
            }
        }
        totalSw.Stop();
        Console.WriteLine($"\tTotal {totalSw.ElapsedMilliseconds} ms");
    }

    /// <summary>
    /// Builds the archive using the Azure.Storage.Blobs client. The destination stream is wrapped in a
    /// <c>NonFlushingStream</c>, so flush calls made on the wrapper are not passed to the blob stream
    /// (presumably to avoid a service round-trip per flush — confirm).
    /// </summary>
    private static async Task RunV12()
    {
        Console.WriteLine("Running V12");
        BlobServiceClient blobServiceClient = new BlobServiceClient(Connection);
        BlobContainerClient zipContainer = blobServiceClient.GetBlobContainerClient("zipcontainer");
        //await zipContainer.CreateIfNotExistsAsync();
        BlobContainerClient unzippedContainer = blobServiceClient.GetBlobContainerClient("unzipped");
        List<BlobClient> blobList = new List<BlobClient>();
        await foreach (var blobItem in unzippedContainer.GetBlobsAsync())
        {
            blobList.Add(unzippedContainer.GetBlobClient(blobItem.Name));
        }

        // Create zip
        var zip = zipContainer.GetBlockBlobClient("media.zip");
        await zip.DeleteIfExistsAsync();

        // The total covers opening the destination, every transfer, and disposing the archive.
        var totalSw = Stopwatch.StartNew();
        using (var zipArchive = new ZipArchive(
            stream: new NonFlushingStream(await zip.OpenWriteAsync(overwrite: true).ConfigureAwait(false)),
            mode: ZipArchiveMode.Create,
            leaveOpen: false))
        {
            // Clamp to the listing size so a small container no longer throws ArgumentOutOfRangeException.
            // NOTE(review): index 0 is skipped, exactly as before — confirm whether that is intended.
            int lastIndex = Math.Min(MaxBlobIndex, blobList.Count - 1);
            for (int i = 1; i <= lastIndex; i++)
            {
                var blob = blobList[i];
                await TransferBlobAsync(zipArchive, i, () => blob.OpenReadAsync());
            }
        }
        totalSw.Stop();
        Console.WriteLine($"\tTotal {totalSw.ElapsedMilliseconds} ms");
    }

    /// <summary>
    /// Copies one source blob into a new uncompressed entry of <paramref name="zipArchive"/> and prints
    /// how long the whole transfer, the source open, and the copy each took.
    /// </summary>
    /// <param name="zipArchive">Archive that receives the new entry.</param>
    /// <param name="number">Number used in the entry name and in the console output.</param>
    /// <param name="openReadAsync">Opens the source blob for reading; the returned stream is disposed here.</param>
    // System.IO.Stream is fully qualified because this file's using directives do not include System.IO.
    private static async Task TransferBlobAsync(
        ZipArchive zipArchive,
        int number,
        Func<Task<System.IO.Stream>> openReadAsync)
    {
        var sw = Stopwatch.StartNew();
        var fileName = string.Format(CultureInfo.InvariantCulture, "{0:D8}_{1}", number, "image.jpg");
        var zipEntry = zipArchive.CreateEntry(fileName, CompressionLevel.NoCompression);
        using var zipStream = zipEntry.Open();

        var swOpen = Stopwatch.StartNew();
        using var blobStream = await openReadAsync();
        swOpen.Stop();

        var swCopy = Stopwatch.StartNew();
        await blobStream.CopyToAsync(zipStream);
        swCopy.Stop();
        sw.Stop();

        Console.WriteLine($"\tBlob {number} transferred in {sw.ElapsedMilliseconds} ms");
        Console.WriteLine($"\t\tOpened in {swOpen.ElapsedMilliseconds} ms");
        Console.WriteLine($"\t\tCopied in {swCopy.ElapsedMilliseconds} ms");
    }
}
}
namespace ConsoleApp1
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// A <see cref="DelegatingStream"/> whose <see cref="Flush"/> and <see cref="FlushAsync"/> do nothing,
/// so flush requests made on this wrapper never reach the wrapped stream.
/// </summary>
public class NonFlushingStream : DelegatingStream
{
    /// <summary>Wraps <paramref name="target"/>, passing it to the base class as the inner stream.</summary>
    /// <param name="target">The stream to wrap.</param>
    public NonFlushingStream(Stream target) : base(target) { }

    /// <summary>Intentionally empty: the flush request is dropped.</summary>
    public override void Flush() { }

    /// <summary>Intentionally does no work: returns an already-completed task.</summary>
    /// <param name="cancellationToken">Not used.</param>
    /// <returns><see cref="Task.CompletedTask"/>.</returns>
    public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment