Created
May 31, 2018 23:56
-
-
Save Butjok/144785e6ea75556b276b0c79bbee167e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.IO.Compression; | |
using System.Linq; | |
using System.Runtime.Serialization.Formatters.Binary; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
// https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/tr-2004-136.pdf | |
namespace GZipTest | |
{ | |
class Program | |
{ | |
// Byte-size helpers: 1 KB and 1 MB.
const int KB = 1024;
const int MB = KB * KB;
// Amount of input processed per pass (also used as the FileStream buffer size in Main).
const int defaultChunkSize = 128 * MB;
// Each chunk is cut into blocks of this size; blocks are gzip-compressed independently.
const int blockSize = MB;

// Per-block entry of the archive's trailing metadata table
// (serialized via BinaryFormatter — see Compress/Decompress).
[Serializable]
public class BlockMetaData
{
    public int originalSize;   // uncompressed byte count of the block
    public int compressedSize; // gzip'd byte count of the block
}

// Whole-archive metadata: one BlockMetaData per block, in file order.
[Serializable]
public class ArchiveMetaData
{
    public List<BlockMetaData> blocks;
}

// Unit of work passed between the chunk reader, the compression workers and the writer.
class BlockTask
{
    public int originalSize;
    public int compressedSize;
    // NOTE(review): the [NonSerialized] attributes below have no effect —
    // BlockTask itself is not marked [Serializable]; they look like leftovers.
    [NonSerialized]
    public int inputChunkOffset;  // where this block's bytes start inside the input chunk
    [NonSerialized]
    public MemoryStream workerBuffer; // per-worker scratch stream holding the (de)compressed bytes
    [NonSerialized]
    public int workerBufferOffset; // start of this block's bytes inside workerBuffer
    [NonSerialized]
    public int outputChunkOffset;  // destination offset of this block inside the output chunk
}
// Compresses `input` into `output` as independently-gzipped blocks of blockSize bytes.
// Archive layout (established by the writes below):
//   [compressed block 0][compressed block 1]...[gzip'd BinaryFormatter ArchiveMetaData][8-byte long: metadata position]
// Requires a seekable input (the outer loop compares Position with Length).
// Returns the per-block metadata that Decompress needs.
// NOTE(review): BinaryFormatter is insecure on untrusted data and removed in .NET 9 —
// consider a hand-written header format instead.
static public ArchiveMetaData Compress(Stream input, Stream output)
{
    byte[] inputChunk = new byte[defaultChunkSize];
    // ThreadedQueue / Parallel.RunTasks are project types not visible here;
    // presumably RunTasks drains the queue across `workersNum` threads — TODO confirm.
    var compressionTasks = new ThreadedQueue<BlockTask>();
    var writeTasks = new ThreadedQueue<BlockTask>();
    var workersNum = Environment.ProcessorCount;
    // One reusable scratch MemoryStream per worker; rewound before each chunk.
    var workerOutputs = new MemoryStream[workersNum];
    for (var i = 0; i < workersNum; i++)
    {
        workerOutputs[i] = new MemoryStream();
    }
    var metaData = new ArchiveMetaData
    {
        blocks = new List<BlockMetaData>(),
    };
    using (var outputChunk = new MemoryStream())
    {
        while (input.Position < input.Length)
        {
            // Enqueue compression tasks.
            // NOTE(review): Stream.Read may return fewer bytes than requested; a short
            // read simply yields a smaller chunk here, and the outer Position/Length
            // loop picks up the remainder — correct only for seekable streams.
            var chunkSize = input.Read(inputChunk, 0, inputChunk.Length);
            for (int offset = 0, blockIndex = 0; offset < chunkSize; offset += blockSize, blockIndex++)
            {
                var block = new BlockTask
                {
                    inputChunkOffset = offset,
                    // Last block of the chunk may be shorter than blockSize.
                    originalSize = Math.Min(blockSize, chunkSize - offset)
                };
                compressionTasks.Add(block);
            }
            // Compress!
            foreach (var workerOutput in workerOutputs)
            {
                workerOutput.Position = 0;
            }
            Parallel.RunTasks(workersNum, compressionTasks, (index, task) =>
            {
                var workerOutput = workerOutputs[index];
                var oldPosition = workerOutput.Position;
                // leaveOpen: true — the worker stream is reused across blocks/chunks.
                using (var gzip = new GZipStream(workerOutput, CompressionMode.Compress, true))
                {
                    gzip.Write(inputChunk, task.inputChunkOffset, task.originalSize);
                }
                task.workerBuffer = workerOutput;
                task.workerBufferOffset = (int)oldPosition;
                task.compressedSize = (int)(workerOutput.Position - oldPosition);
                writeTasks.Add(task);
            });
            // Sort blocks in correct order, write them sequentially into memory buffer.
            // (Assign each block its final offset and record metadata in file order.)
            var outputChunkOffset = 0;
            foreach (var result in writeTasks.OrderBy(result => result.inputChunkOffset))
            {
                result.outputChunkOffset = outputChunkOffset;
                metaData.blocks.Add(new BlockMetaData
                {
                    originalSize = result.originalSize,
                    compressedSize = result.compressedSize
                });
                outputChunkOffset += result.compressedSize;
            }
            outputChunk.SetLength(outputChunkOffset);
            // Gather: copy each worker's bytes into the chunk buffer in parallel.
            Parallel.RunTasks(workersNum, writeTasks, (index, result) =>
            {
                Buffer.BlockCopy(result.workerBuffer.GetBuffer(), result.workerBufferOffset, outputChunk.GetBuffer(), result.outputChunkOffset, result.compressedSize);
            });
            output.Write(outputChunk.GetBuffer(), 0, outputChunkOffset);
        }
    }
    // Write meta data: gzip'd serialized table, then its start position as the final 8 bytes.
    var metaPosition = output.Position;
    using (var gzip=new GZipStream(output, CompressionMode.Compress, true))
    {
        new BinaryFormatter().Serialize(gzip, metaData);
    }
    new BinaryWriter(output).Write(metaPosition);
    // Truncate output file (in case an existing file was longer than the archive).
    output.SetLength(output.Position);
    // NOTE(review): these disposals are skipped if anything above throws —
    // a try/finally (or using) would be safer.
    for (var i = 0; i < workersNum; i++)
    {
        workerOutputs[i].Dispose();
    }
    return metaData;
}
// Decompresses an archive produced by Compress into `output`.
// Reads the trailing 8-byte metadata position, deserializes the gzip'd block table,
// then walks the data area chunk by chunk, inflating blocks in parallel and
// scattering them to their recorded offsets.
// NOTE(review): BinaryFormatter is insecure on untrusted data and removed in .NET 9.
public static void Decompress(Stream input, Stream output)
{
    var taskQueue = new ThreadedQueue<BlockTask>();
    var workerResults = new ThreadedQueue<BlockTask>();
    // Read archive meta data (format written by Compress).
    input.Seek(-sizeof(long), SeekOrigin.End);
    var metaDataPosition = new BinaryReader(input).ReadInt64();
    input.Seek(metaDataPosition, SeekOrigin.Begin);
    ArchiveMetaData meta;
    using (var gzip = new GZipStream(input, CompressionMode.Decompress, true))
    {
        meta = (ArchiveMetaData)new BinaryFormatter().Deserialize(gzip);
    }
    input.Seek(0, SeekOrigin.Begin);
    // Size the chunk buffers so each can hold at least one whole block, and
    // pre-size the output file to the total uncompressed length.
    byte[] inputChunk;
    byte[] outputChunk;
    {
        var inputChunkSize = defaultChunkSize;
        var outputChunkSize = defaultChunkSize;
        long originalFileSize = 0;
        foreach (var block in meta.blocks)
        {
            inputChunkSize = Math.Max(inputChunkSize, block.compressedSize);
            // BUG FIX: was Math.Max(inputChunkSize, ...) — the output buffer was sized
            // from the *input* buffer instead of growing with block.originalSize.
            outputChunkSize = Math.Max(outputChunkSize, block.originalSize);
            originalFileSize += block.originalSize;
        }
        inputChunk = new byte[inputChunkSize];
        outputChunk = new byte[outputChunkSize];
        output.SetLength(originalFileSize);
    }
    var workersNum = Environment.ProcessorCount;
    // One reusable scratch MemoryStream per worker; rewound before each chunk.
    var workerOutputs = new MemoryStream[workersNum];
    for (var i = 0; i < workersNum; i++)
    {
        workerOutputs[i] = new MemoryStream(outputChunk.Length);
    }
    try
    {
        for (var totalBlocksRead = 0; input.Position < metaDataPosition; /* nothing */)
        {
            // Plan the next chunk: take as many consecutive blocks as fit in both buffers.
            var inputChunkOffset = 0;
            var outputChunkOffset = 0;
            while (totalBlocksRead < meta.blocks.Count
                && inputChunkOffset + meta.blocks[totalBlocksRead].compressedSize <= inputChunk.Length
                && outputChunkOffset + meta.blocks[totalBlocksRead].originalSize <= outputChunk.Length)
            {
                var blockMetaData = meta.blocks[totalBlocksRead++];
                taskQueue.Add(new BlockTask
                {
                    originalSize = blockMetaData.originalSize,
                    compressedSize = blockMetaData.compressedSize,
                    outputChunkOffset = outputChunkOffset,
                    inputChunkOffset = inputChunkOffset
                });
                inputChunkOffset += blockMetaData.compressedSize;
                outputChunkOffset += blockMetaData.originalSize;
            }
            // BUG FIX: Stream.Read may return fewer bytes than requested;
            // loop until the whole planned chunk is in memory.
            var read = 0;
            while (read < inputChunkOffset)
            {
                var n = input.Read(inputChunk, read, inputChunkOffset - read);
                if (n == 0)
                {
                    throw new EndOfStreamException("Archive truncated: expected more block data.");
                }
                read += n;
            }
            // Decompress blocks in parallel into per-worker scratch streams.
            foreach (var workerOutput in workerOutputs)
            {
                workerOutput.Position = 0;
            }
            Parallel.RunTasks(workersNum, taskQueue, (index, task) =>
            {
                var workerOutput = workerOutputs[index];
                var oldPosition = workerOutput.Position;
                using (var inStream = new MemoryStream(inputChunk, task.inputChunkOffset, task.compressedSize))
                using (var gzip = new GZipStream(inStream, CompressionMode.Decompress))
                {
                    CopyTo(gzip, workerOutput);
                }
                task.workerBuffer = workerOutput;
                task.workerBufferOffset = (int)oldPosition;
                task.originalSize = (int)(workerOutput.Position - oldPosition);
                workerResults.Add(task);
            });
            // Scatter the decompressed bytes to their final offsets, then flush the chunk.
            Parallel.RunTasks(workersNum, workerResults, (index, result) =>
            {
                Buffer.BlockCopy(result.workerBuffer.GetBuffer(), result.workerBufferOffset, outputChunk, result.outputChunkOffset, result.originalSize);
            });
            output.Write(outputChunk, 0, outputChunkOffset);
        }
    }
    finally
    {
        // Consistency with Compress: release the worker scratch streams
        // (now also on the exception path).
        for (var i = 0; i < workersNum; i++)
        {
            workerOutputs[i].Dispose();
        }
    }
}
// Copies all remaining bytes from `input` to `output`.
// BUG FIX: the buffer used to be defaultChunkSize (128 MB) — one such buffer was
// allocated on EVERY call, and Decompress calls this once per block inside the
// parallel workers. 80 KB (the framework's own Stream.CopyTo default) behaves
// identically and is cheap to allocate.
public static void CopyTo(Stream input, Stream output)
{
    const int bufferSize = 81920; // 80 KB, matches Stream.CopyTo's default
    byte[] buffer = new byte[bufferSize];
    int bytesRead;
    // Read returns 0 only at end of stream; short reads are handled naturally.
    while ((bytesRead = input.Read(buffer, 0, buffer.Length)) > 0)
    {
        output.Write(buffer, 0, bytesRead);
    }
}
// Demo driver: compresses input.jpg, decompresses the archive back,
// and prints the total elapsed time.
static void Main(string[] args)
{
    var inputFilename = "input.jpg";
    var outputFilename = $"{inputFilename}.compressed";
    var decompressedInputFilename = $"decompressed-{inputFilename}";
    Console.WriteLine($"----{Environment.ProcessorCount}");
    // Idiom: StartNew() replaces the separate new + Start().
    var stopWatch = Stopwatch.StartNew();
    using (var input = new FileStream(inputFilename, FileMode.Open, FileAccess.Read, FileShare.None, defaultChunkSize, FileOptions.SequentialScan))
    using (var output = new FileStream(outputFilename, FileMode.Create, FileAccess.Write))
    {
        Compress(input, output);
    }
    Console.WriteLine("-----");
    using (var input = new FileStream(outputFilename, FileMode.Open, FileAccess.Read))
    using (var output = new FileStream(decompressedInputFilename, FileMode.Create, FileAccess.Write))
    {
        Decompress(input, output);
    }
    stopWatch.Stop();
    // Same "hh:mm:ss.cc" text the old String.Format produced ("ff" = truncated
    // hundredths, matching the previous Milliseconds / 10), via a TimeSpan format string.
    string elapsedTime = stopWatch.Elapsed.ToString(@"hh\:mm\:ss\.ff");
    Console.WriteLine("RunTime " + elapsedTime);
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment