using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Collections.Generic;
using System.Diagnostics;
// https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/tr-2004-136.pdf
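// Compresses a file in parallel: the input is read in 128 MB chunks, each
// chunk is split into 1 MB blocks, and the blocks are gzipped independently
// on one worker thread per processor core. Archive layout:
//
//   [compressed blocks][gzipped ArchiveMetaData][8-byte metadata offset]
//
// The trailing offset lets Decompress seek straight to the metadata table.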
namespace GZipTest
{
    class Program
    {
        const int KB = 1024;
        const int MB = KB * KB;
        const int defaultChunkSize = 128 * MB;
        const int blockSize = MB;

        [Serializable]
        public class BlockMetaData
        {
            public int originalSize;
            public int compressedSize;
        }

        [Serializable]
        public class ArchiveMetaData
        {
            public List<BlockMetaData> blocks;
        }
        // In-memory work item passed between the reader, the compression
        // workers and the writer; it is never serialized.
        class BlockTask
        {
            public int originalSize;
            public int compressedSize;
            public int inputChunkOffset;
            public MemoryStream workerBuffer;
            public int workerBufferOffset;
            public int outputChunkOffset;
        }
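        // Compresses `input` into `output` chunk by chunk: blocks are gzipped
        // in parallel into per-worker buffers, reassembled in input order and
        // written out; the metadata table and its offset go at the very end.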
        public static ArchiveMetaData Compress(Stream input, Stream output)
        {
            byte[] inputChunk = new byte[defaultChunkSize];
            var compressionTasks = new ThreadedQueue<BlockTask>();
            var writeTasks = new ThreadedQueue<BlockTask>();
            var workersNum = Environment.ProcessorCount;
            var workerOutputs = new MemoryStream[workersNum];
            for (var i = 0; i < workersNum; i++)
            {
                workerOutputs[i] = new MemoryStream();
            }
            var metaData = new ArchiveMetaData
            {
                blocks = new List<BlockMetaData>(),
            };
            using (var outputChunk = new MemoryStream())
            {
                while (input.Position < input.Length)
                {
                    // Enqueue compression tasks
                    var chunkSize = input.Read(inputChunk, 0, inputChunk.Length);
                    for (var offset = 0; offset < chunkSize; offset += blockSize)
                    {
                        compressionTasks.Add(new BlockTask
                        {
                            inputChunkOffset = offset,
                            originalSize = Math.Min(blockSize, chunkSize - offset)
                        });
                    }
                    // Compress!
                    foreach (var workerOutput in workerOutputs)
                    {
                        workerOutput.Position = 0;
                    }
                    Parallel.RunTasks(workersNum, compressionTasks, (index, task) =>
                    {
                        var workerOutput = workerOutputs[index];
                        var oldPosition = workerOutput.Position;
                        using (var gzip = new GZipStream(workerOutput, CompressionMode.Compress, true))
                        {
                            gzip.Write(inputChunk, task.inputChunkOffset, task.originalSize);
                        }
                        task.workerBuffer = workerOutput;
                        task.workerBufferOffset = (int)oldPosition;
                        task.compressedSize = (int)(workerOutput.Position - oldPosition);
                        writeTasks.Add(task);
                    });
                    // Sort the blocks back into input order and lay them out sequentially in the output buffer
                    var outputChunkOffset = 0;
                    foreach (var result in writeTasks.OrderBy(result => result.inputChunkOffset))
                    {
                        result.outputChunkOffset = outputChunkOffset;
                        metaData.blocks.Add(new BlockMetaData
                        {
                            originalSize = result.originalSize,
                            compressedSize = result.compressedSize
                        });
                        outputChunkOffset += result.compressedSize;
                    }
                    outputChunk.SetLength(outputChunkOffset);
                    Parallel.RunTasks(workersNum, writeTasks, (index, result) =>
                    {
                        Buffer.BlockCopy(result.workerBuffer.GetBuffer(), result.workerBufferOffset, outputChunk.GetBuffer(), result.outputChunkOffset, result.compressedSize);
                    });
                    output.Write(outputChunk.GetBuffer(), 0, outputChunkOffset);
                }
            }
            // Write meta data
            var metaPosition = output.Position;
            using (var gzip = new GZipStream(output, CompressionMode.Compress, true))
            {
                new BinaryFormatter().Serialize(gzip, metaData);
            }
            new BinaryWriter(output).Write(metaPosition);
            // Truncate output file
            output.SetLength(output.Position);
            for (var i = 0; i < workersNum; i++)
            {
                workerOutputs[i].Dispose();
            }
            return metaData;
        }
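        // Decompresses an archive produced by Compress: reads the metadata
        // table from the end of the file, then inflates the blocks chunk by
        // chunk in parallel, copying each result to its precomputed offset.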
        public static void Decompress(Stream input, Stream output)
        {
            var taskQueue = new ThreadedQueue<BlockTask>();
            var workerResults = new ThreadedQueue<BlockTask>();
            // Read archive meta data
            input.Seek(-sizeof(long), SeekOrigin.End);
            var metaDataPosition = new BinaryReader(input).ReadInt64();
            input.Seek(metaDataPosition, SeekOrigin.Begin);
            ArchiveMetaData meta;
            using (var gzip = new GZipStream(input, CompressionMode.Decompress, true))
            {
                meta = (ArchiveMetaData)new BinaryFormatter().Deserialize(gzip);
            }
            input.Seek(0, SeekOrigin.Begin);
            // Size the chunk buffers so every block fits, and preallocate the output file
            byte[] inputChunk;
            byte[] outputChunk;
            {
                var inputChunkSize = defaultChunkSize;
                var outputChunkSize = defaultChunkSize;
                long originalFileSize = 0;
                foreach (var block in meta.blocks)
                {
                    inputChunkSize = Math.Max(inputChunkSize, block.compressedSize);
                    outputChunkSize = Math.Max(outputChunkSize, block.originalSize);
                    originalFileSize += block.originalSize;
                }
                inputChunk = new byte[inputChunkSize];
                outputChunk = new byte[outputChunkSize];
                output.SetLength(originalFileSize);
            }
            var workersNum = Environment.ProcessorCount;
            var workerOutputs = new MemoryStream[workersNum];
            for (var i = 0; i < workersNum; i++)
            {
                workerOutputs[i] = new MemoryStream(outputChunk.Length);
            }
            for (var totalBlocksRead = 0; input.Position < metaDataPosition; /* nothing */)
            {
                // Take as many blocks as fit into the input and output chunks
                var inputChunkOffset = 0;
                var outputChunkOffset = 0;
                while (totalBlocksRead < meta.blocks.Count
                    && inputChunkOffset + meta.blocks[totalBlocksRead].compressedSize <= inputChunk.Length
                    && outputChunkOffset + meta.blocks[totalBlocksRead].originalSize <= outputChunk.Length)
                {
                    var blockMetaData = meta.blocks[totalBlocksRead++];
                    taskQueue.Add(new BlockTask
                    {
                        originalSize = blockMetaData.originalSize,
                        compressedSize = blockMetaData.compressedSize,
                        outputChunkOffset = outputChunkOffset,
                        inputChunkOffset = inputChunkOffset
                    });
                    inputChunkOffset += blockMetaData.compressedSize;
                    outputChunkOffset += blockMetaData.originalSize;
                }
                // Stream.Read may return fewer bytes than requested, so loop until the chunk is full
                for (var bytesRead = 0; bytesRead < inputChunkOffset; )
                {
                    var read = input.Read(inputChunk, bytesRead, inputChunkOffset - bytesRead);
                    if (read == 0)
                    {
                        throw new EndOfStreamException("Archive is truncated");
                    }
                    bytesRead += read;
                }
                // Decompress!
                foreach (var workerOutput in workerOutputs)
                {
                    workerOutput.Position = 0;
                }
                Parallel.RunTasks(workersNum, taskQueue, (index, task) =>
                {
                    var workerOutput = workerOutputs[index];
                    var oldPosition = workerOutput.Position;
                    using (var inStream = new MemoryStream(inputChunk, task.inputChunkOffset, task.compressedSize))
                    using (var gzip = new GZipStream(inStream, CompressionMode.Decompress))
                    {
                        CopyTo(gzip, workerOutput);
                    }
                    task.workerBuffer = workerOutput;
                    task.workerBufferOffset = (int)oldPosition;
                    task.originalSize = (int)(workerOutput.Position - oldPosition);
                    workerResults.Add(task);
                });
                Parallel.RunTasks(workersNum, workerResults, (index, result) =>
                {
                    Buffer.BlockCopy(result.workerBuffer.GetBuffer(), result.workerBufferOffset, outputChunk, result.outputChunkOffset, result.originalSize);
                });
                output.Write(outputChunk, 0, outputChunkOffset);
            }
            for (var i = 0; i < workersNum; i++)
            {
                workerOutputs[i].Dispose();
            }
        }
        // Manual substitute for Stream.CopyTo, which is only available from .NET 4 onward
        public static void CopyTo(Stream input, Stream output)
        {
            byte[] buffer = new byte[64 * KB]; // A modest buffer; this runs once per block, so a huge allocation here would be wasteful
            int bytesRead;
            while ((bytesRead = input.Read(buffer, 0, buffer.Length)) > 0)
            {
                output.Write(buffer, 0, bytesRead);
            }
        }
        static void Main(string[] args)
        {
            var inputFilename = "input.jpg";
            var outputFilename = $"{inputFilename}.compressed";
            var decompressedInputFilename = $"decompressed-{inputFilename}";
            Console.WriteLine($"Using {Environment.ProcessorCount} worker threads");
            var stopWatch = new Stopwatch();
            stopWatch.Start();
            using (var input = new FileStream(inputFilename, FileMode.Open, FileAccess.Read, FileShare.None, defaultChunkSize, FileOptions.SequentialScan))
            using (var output = new FileStream(outputFilename, FileMode.Create, FileAccess.Write))
            {
                Compress(input, output);
            }
            using (var input = new FileStream(outputFilename, FileMode.Open, FileAccess.Read))
            using (var output = new FileStream(decompressedInputFilename, FileMode.Create, FileAccess.Write))
            {
                Decompress(input, output);
            }
            stopWatch.Stop();
            // Format and display the elapsed time
            TimeSpan ts = stopWatch.Elapsed;
            string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}",
                ts.Hours, ts.Minutes, ts.Seconds,
                ts.Milliseconds / 10);
            Console.WriteLine("RunTime " + elapsedTime);
        }
    }
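
    // ThreadedQueue<T> and Parallel.RunTasks are referenced above but are not
    // included in this gist. The sketches below are one plausible
    // implementation, assuming the semantics the code above relies on:
    // Add/TryTake guarded by a lock, non-consuming snapshot enumeration (so
    // the OrderBy in Compress does not drain the queue), and RunTasks draining
    // the queue on workersNum dedicated threads before returning.
    class ThreadedQueue<T> : IEnumerable<T>
    {
        readonly Queue<T> queue = new Queue<T>();

        public void Add(T item)
        {
            lock (queue) queue.Enqueue(item);
        }

        public bool TryTake(out T item)
        {
            lock (queue)
            {
                if (queue.Count > 0)
                {
                    item = queue.Dequeue();
                    return true;
                }
                item = default(T);
                return false;
            }
        }

        // Snapshot enumeration: LINQ sees the queued items without consuming them
        public IEnumerator<T> GetEnumerator()
        {
            T[] snapshot;
            lock (queue) snapshot = queue.ToArray();
            return ((IEnumerable<T>)snapshot).GetEnumerator();
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

    static class Parallel
    {
        // Runs `action` on `workersNum` threads, each pulling tasks from the
        // queue until it is empty, and returns once every worker has finished
        public static void RunTasks<T>(int workersNum, ThreadedQueue<T> tasks, Action<int, T> action)
        {
            var threads = new System.Threading.Thread[workersNum];
            for (var i = 0; i < workersNum; i++)
            {
                var workerIndex = i; // capture a per-iteration copy for the closure
                threads[i] = new System.Threading.Thread(() =>
                {
                    T task;
                    while (tasks.TryTake(out task))
                    {
                        action(workerIndex, task);
                    }
                });
                threads[i].Start();
            }
            foreach (var thread in threads)
            {
                thread.Join();
            }
        }
    }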
}