Created
December 23, 2013 06:14
-
-
Save ayende/8092415 to your computer and use it in GitHub Desktop.
A single file that implements an external sorter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.CodeDom; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.IO.Compression; | |
using System.Linq; | |
using System.Reflection; | |
using System.Runtime.InteropServices; | |
using System.Threading.Tasks; | |
namespace Rhino.ExternalSorting | |
{ | |
public class ExternalSorter : IDisposable | |
{ | |
// Settings captured by the constructor.
private readonly string _basePath;
private readonly bool _compress;
private readonly Comparison<ArraySegment<byte>> _comparison;

// Streams of the temporary files written by FlushToFile, in creation order.
private readonly List<Stream> _files = new List<Stream>();

// The two halves of the working memory and the entry tables that go with them.
// One half receives new keys while the other one may be in the middle of a flush.
private readonly byte[] _buffer1;
private readonly List<EntryOffset> _offsets1 = new List<EntryOffset>();
private readonly byte[] _buffer2;
private readonly List<EntryOffset> _offsets2 = new List<EntryOffset>();

// The half that Add is currently writing to, and the next free position inside it.
private byte[] _currentBuffer;
private List<EntryOffset> _currentOffsets;
private int _bufferPos;

// The most recently started flush; starts out as an already completed task so that
// the first Wait() on it returns immediately.
private Task _flushing = Task.FromResult(1);

/// <summary>
/// Describes one entry stored in a buffer: a 7-bit encoded length prefix
/// immediately followed by the key bytes.
/// </summary>
private struct EntryOffset
{
    /// <summary>Position of the first byte of the length prefix.</summary>
    public int Offset;
    /// <summary>Total size of the entry: length prefix plus key bytes.</summary>
    public int BufferSize;
    /// <summary>Number of bytes taken by the length prefix.</summary>
    public byte KeyCountSize;
}
/// <summary>
/// Creates a sorter that keeps keys in memory and spills sorted runs to temporary
/// files under <paramref name="basePath"/> when the memory is full.
/// </summary>
/// <param name="basePath">Directory for the temporary files; created if it does not exist.</param>
/// <param name="bufferSize">Total working memory in bytes; split into two halves of equal size.</param>
/// <param name="comparison">Ordering applied to the raw key bytes.</param>
/// <param name="compress">When true, temporary files are written and read through GZip.</param>
/// <exception cref="ArgumentException"><paramref name="bufferSize"/> is smaller than 1MB.</exception>
/// <exception cref="ArgumentNullException"><paramref name="basePath"/> or <paramref name="comparison"/> is null.</exception>
public ExternalSorter(string basePath, int bufferSize, Comparison<ArraySegment<byte>> comparison, bool compress)
{
    if (bufferSize < 1024 * 1024)
        throw new ArgumentException("Buffer size cannot be less than 1MB", "bufferSize");
    if (basePath == null)
        throw new ArgumentNullException("basePath");
    // Fail here rather than with a NullReferenceException deep inside the sort.
    if (comparison == null)
        throw new ArgumentNullException("comparison");

    _basePath = basePath;
    // CreateDirectory does nothing when the directory already exists, so no
    // separate (and racy) existence check is needed.
    Directory.CreateDirectory(_basePath);

    _comparison = comparison;
    _compress = compress;

    _buffer1 = new byte[bufferSize / 2];
    _buffer2 = new byte[bufferSize / 2];
    _currentBuffer = _buffer1;
    _currentOffsets = _offsets1;
}
/// <summary>
/// Adds the bytes described by <paramref name="key"/> to the sorter.
/// </summary>
public void Add(ArraySegment<byte> key)
{
    Add(key.Array, key.Offset, key.Count);
}

/// <summary>
/// Copies <paramref name="keyCount"/> bytes of <paramref name="key"/>, starting at
/// <paramref name="keyOffset"/>, into the current buffer, prefixed with their 7-bit
/// encoded length. When the current buffer cannot hold the entry, a flush of that
/// buffer is started first and the entry goes into the other buffer.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
/// <exception cref="ArgumentException">
/// The offset/count pair does not describe a range inside <paramref name="key"/>, or
/// the key can never fit into one buffer half.
/// </exception>
public void Add(byte[] key, int keyOffset, int keyCount)
{
    // Largest number of bytes a 7-bit encoded Int32 can occupy.
    const int maxPrefixSize = 5;

    // Validate everything up front: once the length prefix has been written and
    // _bufferPos advanced, a failing copy would leave a half-written entry behind.
    if (key == null)
        throw new ArgumentNullException("key");
    if (keyOffset < 0)
        throw new ArgumentOutOfRangeException("keyOffset");
    if (keyCount < 0)
        throw new ArgumentOutOfRangeException("keyCount");
    if (keyCount > key.Length - keyOffset)
        throw new ArgumentException("keyOffset and keyCount do not describe a range inside key", "keyCount");
    // Both halves have the same length, so a key that does not fit into an empty
    // buffer will not fit after a flush either.
    if (keyCount > _currentBuffer.Length - maxPrefixSize)
        throw new ArgumentException("Key is too large for the sort buffer", "keyCount");

    if (keyCount + maxPrefixSize > _currentBuffer.Length - _bufferPos)
    {
        StartFlushing();
    }

    var offset = _bufferPos;
    var keyCountSize = Set7BitsBuffer(keyCount, _currentBuffer, ref _bufferPos);
    Buffer.BlockCopy(key, keyOffset, _currentBuffer, _bufferPos, keyCount);
    _bufferPos += keyCount;
    _currentOffsets.Add(new EntryOffset
    {
        Offset = offset,
        BufferSize = _bufferPos - offset,
        KeyCountSize = keyCountSize
    });
}
/// <summary>
/// Returns every key added so far in the order defined by the comparison. Keys still
/// in memory are sorted in place; when temporary files exist, they are merged with the
/// in-memory keys through a heap.
/// </summary>
/// <returns>
/// A lazily evaluated sequence. The in-memory segments point into the sorter's own
/// buffer rather than into copies.
/// </returns>
/// <exception cref="AggregateException">The pending flush to disk failed.</exception>
public IEnumerable<ArraySegment<byte>> ReadAllSorted()
{
    // The last flush runs on another thread and both appends to _files and writes the
    // file contents; it has to be finished before _files is inspected or read.
    _flushing.Wait();

    SortOffsets(_currentBuffer, _currentOffsets);
    var inMemBuffer = _currentOffsets
        .Select(entryOffset => new ArraySegment<byte>(
            _currentBuffer,
            entryOffset.Offset + entryOffset.KeyCountSize,
            entryOffset.BufferSize - entryOffset.KeyCountSize));

    if (_files.Count == 0)
    {
        return inMemBuffer;
    }

    // One heap slot per temporary file plus one for the in-memory keys.
    var heap = new Heap<ArraySegment<byte>>(_files.Count + 1, _comparison);
    heap.Enqueue(inMemBuffer);
    foreach (var stream in _files)
    {
        heap.Enqueue(ReadFrom(stream));
    }
    return ReadAllFrom(heap);
}
/// <summary>
/// Lazily drains <paramref name="heap"/>, yielding whatever Dequeue returns until the
/// heap reports that it is empty.
/// </summary>
private IEnumerable<ArraySegment<byte>> ReadAllFrom(Heap<ArraySegment<byte>> heap)
{
    while (true)
    {
        if (heap.Count <= 0)
            yield break;

        yield return heap.Dequeue();
    }
}
/// <summary>
/// Reads back all entries of a temporary file, in the order they were written.
/// Each entry is a 7-bit encoded length followed by that many key bytes.
/// </summary>
/// <param name="file">The temporary file stream; it is rewound but not closed.</param>
/// <returns>One segment per entry; every segment has its own backing array.</returns>
/// <exception cref="EndOfStreamException">The file ends in the middle of an entry.</exception>
/// <exception cref="FormatException">A length prefix is malformed.</exception>
private IEnumerable<ArraySegment<byte>> ReadFrom(Stream file)
{
    file.Position = 0;
    var source = _compress
        ? (Stream)new GZipStream(file, CompressionMode.Decompress, leaveOpen: true)
        : file;
    try
    {
        int len;
        // Keep reading until the stream ends cleanly on an entry boundary.
        while (TryReadEntryLength(source, out len))
        {
            // A fresh array per entry: a segment that was already handed out must
            // stay intact after this iterator moves on to the next entry.
            var entry = new byte[len];
            var pos = 0;
            while (pos < len)
            {
                var read = source.Read(entry, pos, len - pos);
                if (read == 0)
                    throw new EndOfStreamException();
                pos += read;
            }
            yield return new ArraySegment<byte>(entry, 0, len);
        }
    }
    finally
    {
        // Only the decompression wrapper is ours to dispose; the file itself stays
        // open so that it is closed together with the other temporary files.
        if (_compress)
            source.Dispose();
    }
}

/// <summary>
/// Reads a 7-bit encoded Int32 from <paramref name="source"/>.
/// </summary>
/// <returns>False when the stream is already at its end; true when a length was read.</returns>
private static bool TryReadEntryLength(Stream source, out int length)
{
    length = 0;
    // An Int32 takes at most five groups of seven bits.
    for (var shift = 0; shift < 35; shift += 7)
    {
        var b = source.ReadByte();
        if (b < 0)
        {
            if (shift == 0)
                return false;
            // The stream ended in the middle of a length prefix.
            throw new EndOfStreamException();
        }
        length |= (b & 0x7f) << shift;
        if ((b & 0x80) == 0)
        {
            if (length < 0)
                throw new FormatException("Negative length in what should have been a 7 bit encoded Int32.");
            return true;
        }
    }
    throw new FormatException("Too many bytes in what should have been a 7 bit encoded Int32.");
}
/// <summary>
/// A BinaryReader whose only addition is a public way to call the base class'
/// Read7BitEncodedInt.
/// </summary>
private class MyBinaryReader : BinaryReader
{
    public MyBinaryReader(Stream input)
        : base(input)
    {
    }

    /// <summary>Forwards to <c>BinaryReader.Read7BitEncodedInt</c>.</summary>
    public new int Read7BitEncodedInt()
    {
        var decoded = base.Read7BitEncodedInt();
        return decoded;
    }
}
/// <summary>
/// Hands the buffer that is currently being filled to a background flush and makes
/// the other buffer the current one. Blocks until the previously started flush has
/// completed, because that flush was working on the buffer about to be reused.
/// </summary>
private void StartFlushing()
{
    _flushing.Wait();

    var fullBuffer = _currentBuffer;
    var fullOffsets = _currentOffsets;

    var firstHalfWasActive = fullBuffer == _buffer1;
    _currentBuffer = firstHalfWasActive ? _buffer2 : _buffer1;
    _currentOffsets = firstHalfWasActive ? _offsets2 : _offsets1;

    // The new current half starts out empty.
    _currentOffsets.Clear();
    _bufferPos = 0;

    _flushing = Task.Run(() => FlushToFile(fullBuffer, fullOffsets));
}
// Running total of the entries handed to FlushToFile; used for progress output only.
private long count;

/// <summary>
/// Sorts the entries of <paramref name="buffer"/> and writes them, in sorted order, to
/// a new temporary file under the base path. The file is registered in _files and left
/// rewound to position 0. Progress is reported on the console.
/// </summary>
/// <param name="buffer">Buffer holding the raw entries.</param>
/// <param name="offsets">Location of every entry inside <paramref name="buffer"/>.</param>
private void FlushToFile(byte[] buffer, List<EntryOffset> offsets)
{
    count += offsets.Count;
    Console.Write("\rStarting to sort {0:#,#} with {1:#,#} total ", offsets.Count, count);
    var sp = Stopwatch.StartNew();
    SortOffsets(buffer, offsets);
    Console.Write("\rSorted {0:#,#} with {1:#,#} total in {2} ", offsets.Count, count, sp.Elapsed);

    // Files are numbered by their position in _files and removed by the OS on close.
    var path = Path.Combine(_basePath, _files.Count + ".tmp");
    var file = new FileStream(
        path,
        FileMode.Create,
        FileAccess.ReadWrite,
        FileShare.ReadWrite,
        4096,
        FileOptions.DeleteOnClose | FileOptions.SequentialScan);
    _files.Add(file);

    Stream target = file;
    if (_compress)
        target = new GZipStream(file, CompressionLevel.Optimal, leaveOpen: true);

    try
    {
        for (var i = 0; i < offsets.Count; i++)
        {
            var entry = offsets[i];
            // Length prefix and key bytes are contiguous, so one write covers both.
            target.Write(buffer, entry.Offset, entry.BufferSize);
        }
    }
    finally
    {
        // Disposing the compression wrapper finishes the GZip data; the file stays open.
        if (_compress)
            target.Dispose();
    }

    file.Position = 0;
}
/// <summary>
/// Sorts <paramref name="offsets"/> in place by the keys they point to inside
/// <paramref name="buffer"/>, using the sorter's comparison.
/// </summary>
private unsafe void SortOffsets(byte[] buffer, List<EntryOffset> offsets)
{
    new ParallelSort(buffer, _comparison).QuicksortParallel(offsets);
}
/// <summary>
/// Writes <paramref name="value"/> into <paramref name="buffer"/> as a 7-bit encoded
/// integer: seven bits per byte, least significant group first, with the high bit set
/// on every byte except the last one.
/// </summary>
/// <param name="value">The value to encode; treated as unsigned.</param>
/// <param name="buffer">Destination buffer.</param>
/// <param name="offset">Position to start writing at; advanced past the written bytes.</param>
/// <returns>The number of bytes written.</returns>
private static byte Set7BitsBuffer(int value, byte[] buffer, ref int offset)
{
    var start = offset;
    var remaining = (uint)value;
    while (true)
    {
        if (remaining < 128U)
        {
            // Final group: high bit clear marks the end of the number.
            buffer[offset++] = (byte)remaining;
            return (byte)(offset - start);
        }

        buffer[offset++] = ((byte)(remaining | 128U));
        remaining >>= 7;
    }
}
/// <summary>
/// Decodes a 7-bit encoded Int32 from <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Buffer holding the encoded value.</param>
/// <param name="pos">Position of the first encoded byte; advanced past every byte read.</param>
/// <returns>The decoded value.</returns>
/// <exception cref="FormatException">Five bytes were read and all of them had the continuation bit set.</exception>
public static int Get7BitEncodedInt(byte[] buffer, ref int pos)
{
    var result = 0;
    // Five groups of seven bits: shifts 0, 7, 14, 21 and 28.
    for (var shift = 0; shift < 35; shift += 7)
    {
        var current = buffer[pos++];
        result |= (current & 0x7f) << shift;
        if ((current & 0x80) == 0)
            return result;
    }
    throw new FormatException("Too many bytes in what should have been a 7 bit encoded Int32.");
}
/// <summary>
/// Closes every temporary file created by the sorter. The files are opened with
/// DeleteOnClose, so closing them also removes them from disk.
/// </summary>
public void Dispose()
{
    // A flush may still be running on another thread and about to add a stream to
    // _files; wait for it so that its file is closed too and the list is not
    // modified while it is being enumerated.
    try
    {
        _flushing.Wait();
    }
    catch (AggregateException)
    {
        // Dispose should not throw; a failed flush has nothing left to clean up
        // beyond the stream it may already have registered in _files.
    }

    foreach (var stream in _files)
    {
        stream.Dispose();
    }
    _files.Clear();
}
/// <summary>
/// Fixed size min-heap of enumerators, ordered by each enumerator's current value.
/// Supports duplicates. Dequeue returns the smallest current value and advances the
/// enumerator it came from, which makes the heap a k-way merge over sorted sequences.
/// </summary>
private class Heap<T>
{
    private readonly Comparison<T> _comparer;
    private readonly IEnumerator<T>[] _values;

    /// <summary>Number of enumerators that still have a value to offer.</summary>
    public int Count { get; private set; }

    /// <param name="size">Maximum number of sequences the heap can hold at once.</param>
    /// <param name="comparer">Ordering of the values.</param>
    public Heap(int size, Comparison<T> comparer)
    {
        _comparer = comparer;
        _values = new IEnumerator<T>[size];
    }

    /// <summary>
    /// Adds a sequence to the merge. An empty sequence is disposed right away and
    /// does not take up a slot.
    /// </summary>
    /// <exception cref="InvalidOperationException">The heap is full.</exception>
    public void Enqueue(IEnumerable<T> value)
    {
        var enumerator = value.GetEnumerator();
        if (enumerator.MoveNext())
            Enqueue(enumerator);
        else
            enumerator.Dispose();
    }

    // Inserts an enumerator that is already positioned on a valid element.
    private void Enqueue(IEnumerator<T> value)
    {
        if (Count >= _values.Length)
            throw new InvalidOperationException("Heap full");
        var index = Count;
        Count++;
        _values[index] = value;
        BubbleUp(index);
    }

    // Moves the item at index towards the root while it is not larger than its parent.
    private void BubbleUp(int index)
    {
        while (index > 0)
        {
            var parentIndex = (index - 1) / 2;
            if (_comparer(_values[index].Current, _values[parentIndex].Current) > 0)
                break;
            var parent = _values[parentIndex];
            _values[parentIndex] = _values[index];
            _values[index] = parent;
            index = parentIndex;
        }
    }

    // Sinks the root to a leaf, always following the smaller child, and then lets it
    // bubble back up to the position where it belongs.
    private void TrickleDown()
    {
        int index = 0;
        var childIndex = (index * 2) + 1;
        while (childIndex < Count)
        {
            if (childIndex + 1 < Count &&
                _comparer(_values[childIndex].Current, _values[childIndex + 1].Current) > 0)
            {
                childIndex++;
            }
            var tmp = _values[index];
            _values[index] = _values[childIndex];
            _values[childIndex] = tmp;
            index = childIndex;
            childIndex = (index * 2) + 1;
        }
        BubbleUp(index);
    }

    /// <summary>
    /// Removes and returns the smallest current value. The enumerator that supplied it
    /// is advanced; it re-enters the heap if it has another element and is disposed
    /// otherwise.
    /// </summary>
    /// <remarks>
    /// The returned value is captured before the enumerator is advanced, so a sequence
    /// must not overwrite storage that a value it already produced still refers to.
    /// </remarks>
    /// <exception cref="InvalidOperationException">The heap is empty.</exception>
    public T Dequeue()
    {
        if (Count == 0)
            throw new InvalidOperationException("Heap empty");
        var enumerator = _values[0];
        var val = enumerator.Current;

        // Replace the root with the last item and restore the heap order.
        Count--;
        _values[0] = _values[Count];
        TrickleDown();
        _values[Count] = null;

        // Without advancing, the same value would be returned on every call and
        // Count would never reach zero.
        if (enumerator.MoveNext())
            Enqueue(enumerator);
        else
            enumerator.Dispose();

        return val;
    }
}
/// <summary>
/// Rounds <paramref name="v"/> up to a power of two: returns the smallest power of two
/// that is greater than or equal to <paramref name="v"/>. A value that already is a
/// power of two is returned unchanged.
/// </summary>
/// <param name="v">The value to round up.</param>
/// <returns>The rounded value; 0 when <paramref name="v"/> is 0.</returns>
public static long NearestPowerOfTwo(long v)
{
    v--;
    // Copy the highest set bit into every lower position...
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    // ...including the lower half of a 64-bit value, which the shifts above do not
    // reach on their own.
    v |= v >> 32;
    // All bits below the highest one are set now, so adding one gives a power of two.
    v++;
    return v;
}
/// <summary>
/// Parallel quicksort over the entries of a buffer. Entries are ordered by applying
/// the comparison to the key bytes each entry points to.
/// </summary>
private unsafe class ParallelSort
{
    private readonly byte[] _buffer;
    private readonly Comparison<ArraySegment<byte>> _comparison;
    // Backing array field of List<EntryOffset>; null when the runtime's List<T> has no
    // field with this name.
    private readonly FieldInfo _fieldInfo;

    /// <param name="buffer">Buffer that the entries point into.</param>
    /// <param name="comparison">Ordering applied to the key bytes.</param>
    public ParallelSort(byte[] buffer, Comparison<ArraySegment<byte>> comparison)
    {
        _buffer = buffer;
        _comparison = comparison;
        _fieldInfo = typeof(List<EntryOffset>).GetField("_items", BindingFlags.Instance | BindingFlags.NonPublic);
    }

    /// <summary>
    /// Sorts <paramref name="list"/> in place.
    /// </summary>
    public void QuicksortParallel(List<EntryOffset> list)
    {
        var items = _fieldInfo == null ? null : _fieldInfo.GetValue(list) as EntryOffset[];
        if (items != null)
        {
            // Sort the list's own storage directly and avoid copying the entries.
            QuicksortParallel(items, 0, list.Count - 1);
            return;
        }

        // The private field is an implementation detail of List<T>; when it cannot be
        // reached, sort a copy and write the result back.
        var copy = list.ToArray();
        QuicksortParallel(copy, 0, copy.Length - 1);
        for (var i = 0; i < copy.Length; i++)
        {
            list[i] = copy[i];
        }
    }

    private void QuicksortSequential(EntryOffset[] arr, int left, int right)
    {
        if (right > left)
        {
            int pivot = Partition(arr, left, right);
            QuicksortSequential(arr, left, pivot - 1);
            QuicksortSequential(arr, pivot + 1, right);
        }
    }

    private void QuicksortParallel(EntryOffset[] arr, int left, int right)
    {
        // Ranges smaller than this are not worth the overhead of scheduling.
        const int sequentialThreshold = 512;
        if (right > left)
        {
            if (right - left < sequentialThreshold)
            {
                QuicksortSequential(arr, left, right);
            }
            else
            {
                int pivot = Partition(arr, left, right);
                // The two halves are disjoint and can be sorted independently.
                // Parallel.Invoke can run part of the work on the calling thread,
                // unlike starting two tasks and blocking on them at every level.
                Parallel.Invoke(
                    () => QuicksortParallel(arr, left, pivot - 1),
                    () => QuicksortParallel(arr, pivot + 1, right));
            }
        }
    }

    private void Swap(EntryOffset[] arr, int i, int j)
    {
        var tmp = arr[i];
        arr[i] = arr[j];
        arr[j] = tmp;
    }

    // Not called anywhere in this class; kept because it is part of its public surface.
    [DllImport("msvcrt.dll", CallingConvention = CallingConvention.Cdecl, SetLastError = false)]
    public static extern int memcmp(byte* b1, byte* b2, int count);

    // The key bytes of an entry, without the length prefix in front of them.
    private ArraySegment<byte> KeyOf(EntryOffset entry)
    {
        return new ArraySegment<byte>(
            _buffer,
            entry.Offset + entry.KeyCountSize,
            entry.BufferSize - entry.KeyCountSize);
    }

    /// <summary>
    /// Partitions arr[low..high] around the element in the middle of the range.
    /// </summary>
    /// <returns>
    /// The final index of the pivot: entries that compare less than the pivot end up
    /// before it, all others after it.
    /// </returns>
    private int Partition(EntryOffset[] arr, int low, int high)
    {
        // Written this way so that the sum of two large indexes cannot overflow.
        int pivotPos = low + (high - low) / 2;
        var pivot = arr[pivotPos];
        // Park the pivot at the start of the range while the rest is partitioned.
        Swap(arr, low, pivotPos);
        var pivotKey = KeyOf(pivot);

        int left = low;
        for (int i = low + 1; i <= high; i++)
        {
            if (_comparison(KeyOf(arr[i]), pivotKey) < 0)
            {
                left++;
                Swap(arr, i, left);
            }
        }

        // Move the pivot between the two partitions.
        Swap(arr, low, left);
        return left;
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment