Skip to content

Instantly share code, notes, and snippets.

@ayende
Created December 23, 2013 06:14
Show Gist options
  • Save ayende/8092415 to your computer and use it in GitHub Desktop.
Save ayende/8092415 to your computer and use it in GitHub Desktop.
A single file that implements an external sorter
using System;
using System.CodeDom;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
namespace Rhino.ExternalSorting
{
    /// <summary>
    /// Sorts an unbounded number of byte keys using a bounded amount of memory.
    /// Keys are appended to one of two in-memory buffers. When the active buffer is full,
    /// it is sorted and written to a temporary run file on a background task while new keys
    /// go into the other buffer. <see cref="ReadAllSorted"/> k-way merges the run files with
    /// whatever is still in memory.
    /// </summary>
    public class ExternalSorter : IDisposable
    {
        /// <summary>Worst-case number of bytes needed to 7-bit encode an Int32 length prefix.</summary>
        private const int MaxLengthPrefixSize = 5;

        private readonly string _basePath;
        private readonly Comparison<ArraySegment<byte>> _comparison;
        private readonly bool _compress;
        // Temporary run files. Appended to by the flush task; only read after _flushing has completed.
        private readonly List<Stream> _files = new List<Stream>();
        private int _bufferPos;
        private readonly byte[] _buffer1;
        private readonly byte[] _buffer2;
        private readonly List<EntryOffset> _offsets1 = new List<EntryOffset>();
        private readonly List<EntryOffset> _offsets2 = new List<EntryOffset>();
        private byte[] _currentBuffer;
        private List<EntryOffset> _currentOffsets;
        // The in-flight (or most recently completed) background flush; starts as an already-completed task.
        private Task _flushing = Task.FromResult(1);
        // Running total of entries handed to FlushToFile; used only for the progress output.
        private long _flushedCount;

        /// <summary>Location of one entry (length prefix + key bytes) inside a sort buffer.</summary>
        private struct EntryOffset
        {
            /// <summary>Start of the entry, i.e. of its length prefix, within the buffer.</summary>
            public int Offset;
            /// <summary>Total entry size in bytes: length prefix plus key.</summary>
            public int BufferSize;
            /// <summary>Number of bytes used by the 7-bit encoded length prefix.</summary>
            public byte KeyCountSize;
        }

        /// <summary>
        /// Creates a sorter that spills sorted runs to temporary files under <paramref name="basePath"/>.
        /// </summary>
        /// <param name="basePath">Directory for temporary run files; created if it does not exist.</param>
        /// <param name="bufferSize">Total in-memory budget in bytes, split evenly between two buffers. Must be at least 1MB.</param>
        /// <param name="comparison">Ordering applied to keys.</param>
        /// <param name="compress">Whether run files are GZip-compressed.</param>
        /// <exception cref="ArgumentException"><paramref name="bufferSize"/> is less than 1MB.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="basePath"/> or <paramref name="comparison"/> is null.</exception>
        public ExternalSorter(string basePath, int bufferSize, Comparison<ArraySegment<byte>> comparison, bool compress)
        {
            if (bufferSize < 1024 * 1024)
                throw new ArgumentException("Buffer size cannot be less than 1MB", "bufferSize");
            if (basePath == null)
                throw new ArgumentNullException("basePath");
            if (comparison == null)
                throw new ArgumentNullException("comparison");

            _basePath = basePath;
            if (Directory.Exists(_basePath) == false)
                Directory.CreateDirectory(_basePath);
            _comparison = comparison;
            _compress = compress;
            _buffer1 = new byte[bufferSize / 2];
            _buffer2 = new byte[bufferSize / 2];
            _currentBuffer = _buffer1;
            _currentOffsets = _offsets1;
        }

        /// <summary>Adds a copy of <paramref name="key"/> to the sorter.</summary>
        public void Add(ArraySegment<byte> key)
        {
            Add(key.Array, key.Offset, key.Count);
        }

        /// <summary>
        /// Copies <paramref name="keyCount"/> bytes of <paramref name="key"/>, starting at
        /// <paramref name="keyOffset"/>, into the active buffer. If the buffer is full, it is
        /// first handed to a background flush. This blocks until the previous flush has finished.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">The offset/count do not describe a range inside <paramref name="key"/>.</exception>
        /// <exception cref="ArgumentException">The key is too large to fit in a single sort buffer.</exception>
        public void Add(byte[] key, int keyOffset, int keyCount)
        {
            // Validate everything before touching _bufferPos: a failure halfway through the
            // write would otherwise leave a dangling length prefix in the buffer.
            if (key == null)
                throw new ArgumentNullException("key");
            if (keyOffset < 0 || keyOffset > key.Length)
                throw new ArgumentOutOfRangeException("keyOffset");
            if (keyCount < 0 || keyCount > key.Length - keyOffset)
                throw new ArgumentOutOfRangeException("keyCount");
            if (keyCount > _currentBuffer.Length - MaxLengthPrefixSize)
                throw new ArgumentException("Key is too large to fit in the sort buffer", "keyCount");

            if (keyCount + MaxLengthPrefixSize > _currentBuffer.Length - _bufferPos)
            {
                StartFlushing();
            }

            var offset = _bufferPos;
            var keyCountSize = Set7BitsBuffer(keyCount, _currentBuffer, ref _bufferPos);
            Buffer.BlockCopy(key, keyOffset, _currentBuffer, _bufferPos, keyCount);
            _bufferPos += keyCount;
            _currentOffsets.Add(new EntryOffset
            {
                Offset = offset,
                BufferSize = _bufferPos - offset,
                KeyCountSize = keyCountSize
            });
        }

        /// <summary>
        /// Returns every added key in the order defined by the comparison.
        /// Segments read back from spilled run files share a reusable buffer. Each one is only
        /// valid until the next element is requested, so copy it if it must outlive that.
        /// </summary>
        /// <exception cref="AggregateException">A background flush failed.</exception>
        public IEnumerable<ArraySegment<byte>> ReadAllSorted()
        {
            // The last spilled run may still be being sorted/written (and appended to _files)
            // on the background task; it must be complete before we can merge it.
            _flushing.Wait();

            var buffer = _currentBuffer;
            SortOffsets(buffer, _currentOffsets);
            var inMemBuffer = _currentOffsets.Select(entryOffset => KeyOf(buffer, entryOffset));

            if (_files.Count == 0)
            {
                return inMemBuffer;
            }

            var heap = new Heap<ArraySegment<byte>>(_files.Count + 1, _comparison);
            try
            {
                heap.Enqueue(inMemBuffer);
                foreach (var stream in _files)
                {
                    heap.Enqueue(ReadFrom(stream));
                }
            }
            catch
            {
                // Don't leak decompression streams for the runs that were already opened.
                heap.DisposeAll();
                throw;
            }
            return ReadAllFrom(heap);
        }

        /// <summary>K-way merges the sources held by <paramref name="heap"/>, smallest first.</summary>
        private static IEnumerable<ArraySegment<byte>> ReadAllFrom(Heap<ArraySegment<byte>> heap)
        {
            IEnumerator<ArraySegment<byte>> source = null;
            try
            {
                while (heap.Count > 0)
                {
                    source = heap.Pop();
                    // Yield before advancing: file sources reuse their buffer, so MoveNext would
                    // overwrite the bytes of the segment we are handing out.
                    yield return source.Current;
                    if (source.MoveNext())
                        heap.Push(source);
                    else
                        source.Dispose();
                    source = null;
                }
            }
            finally
            {
                // Covers early termination by the consumer and exceptions from MoveNext.
                if (source != null)
                    source.Dispose();
                heap.DisposeAll();
            }
        }

        /// <summary>
        /// Streams every entry of one run file, in file order. The returned segments share a
        /// buffer that is overwritten on the next MoveNext.
        /// </summary>
        private IEnumerable<ArraySegment<byte>> ReadFrom(Stream file)
        {
            file.Position = 0;
            // GZip input is buffered so the byte-at-a-time varint reads stay cheap. The run file
            // itself must stay open (it is DeleteOnClose), so only the wrappers are disposed.
            Stream input = _compress
                ? new BufferedStream(new GZipStream(file, CompressionMode.Decompress, leaveOpen: true))
                : file;
            try
            {
                var buffer = new byte[0];
                int len;
                while (TryRead7BitEncodedInt(input, out len))
                {
                    if (len > buffer.Length)
                        buffer = new byte[NearestPowerOfTwo(len)];
                    ReadExactly(input, buffer, len);
                    yield return new ArraySegment<byte>(buffer, 0, len);
                }
            }
            finally
            {
                if (_compress)
                    input.Dispose();
            }
        }

        /// <summary>
        /// Reads a 7-bit encoded Int32 from <paramref name="stream"/>. Returns false if the stream
        /// is already at its end. Throws <see cref="EndOfStreamException"/> if it ends mid-value.
        /// </summary>
        private static bool TryRead7BitEncodedInt(Stream stream, out int value)
        {
            value = 0;
            for (int shift = 0; shift < 35; shift += 7)
            {
                int b = stream.ReadByte();
                if (b == -1)
                {
                    if (shift == 0)
                        return false;
                    throw new EndOfStreamException();
                }
                value |= (b & 0x7F) << shift;
                if ((b & 0x80) == 0)
                    return true;
            }
            throw new FormatException("Too many bytes in what should have been a 7 bit encoded Int32.");
        }

        /// <summary>Fills buffer[0..count) from the stream or throws <see cref="EndOfStreamException"/>.</summary>
        private static void ReadExactly(Stream stream, byte[] buffer, int count)
        {
            int pos = 0;
            while (pos < count)
            {
                var read = stream.Read(buffer, pos, count - pos);
                if (read == 0)
                    throw new EndOfStreamException();
                pos += read;
            }
        }

        /// <summary>
        /// Waits for the previous flush, swaps to the other buffer, and starts flushing the full one
        /// on a background task.
        /// </summary>
        private void StartFlushing()
        {
            // Only one flush at a time: the buffer we are about to switch to may still be being written.
            _flushing.Wait();
            var current = _currentBuffer;
            var offsets = _currentOffsets;
            if (_currentBuffer == _buffer1)
            {
                _currentBuffer = _buffer2;
                _currentOffsets = _offsets2;
            }
            else
            {
                _currentBuffer = _buffer1;
                _currentOffsets = _offsets1;
            }
            _bufferPos = 0;
            _currentOffsets.Clear();
            _flushing = Task.Run(() => FlushToFile(current, offsets));
        }

        /// <summary>Sorts one buffer's entries and writes them, length-prefixed, to a new run file.</summary>
        private void FlushToFile(byte[] buffer, List<EntryOffset> offsets)
        {
            _flushedCount += offsets.Count;
            Console.Write("\rStarting to sort {0:#,#} with {1:#,#} total ", offsets.Count, _flushedCount);
            var sp = Stopwatch.StartNew();
            SortOffsets(buffer, offsets);
            Console.Write("\rSorted {0:#,#} with {1:#,#} total in {2} ", offsets.Count, _flushedCount, sp.Elapsed);

            // A unique name keeps two sorters that share basePath from truncating each other's runs.
            var file = new FileStream(Path.Combine(_basePath, Guid.NewGuid().ToString("N") + ".tmp"),
                FileMode.CreateNew,
                FileAccess.ReadWrite,
                FileShare.ReadWrite,
                4096,
                FileOptions.DeleteOnClose | FileOptions.SequentialScan
                );
            _files.Add(file);
            var output = _compress ? (Stream)new GZipStream(file, CompressionLevel.Optimal, leaveOpen: true) : file;
            try
            {
                foreach (var entryOffset in offsets)
                {
                    // Each entry is written as stored: 7-bit length prefix followed by the key bytes.
                    output.Write(buffer, entryOffset.Offset, entryOffset.BufferSize);
                }
            }
            finally
            {
                if (_compress)
                    output.Dispose();
            }
            file.Position = 0;
        }

        /// <summary>Sorts <paramref name="offsets"/> in place by the keys they point to in <paramref name="buffer"/>.</summary>
        private void SortOffsets(byte[] buffer, List<EntryOffset> offsets)
        {
            new ParallelSort(buffer, _comparison).QuicksortParallel(offsets);
        }

        /// <summary>The key bytes of an entry, i.e. the entry without its length prefix.</summary>
        private static ArraySegment<byte> KeyOf(byte[] buffer, EntryOffset entry)
        {
            return new ArraySegment<byte>(buffer, entry.Offset + entry.KeyCountSize, entry.BufferSize - entry.KeyCountSize);
        }

        /// <summary>
        /// Writes <paramref name="value"/> as a 7-bit encoded integer at <paramref name="offset"/>
        /// and advances the offset. Returns the number of bytes written.
        /// </summary>
        private static byte Set7BitsBuffer(int value, byte[] buffer, ref int offset)
        {
            byte s = 1;
            var num = (uint)value;
            while (num >= 128U)
            {
                buffer[offset++] = ((byte)(num | 128U));
                num >>= 7;
                s++;
            }
            buffer[offset++] = (byte)num;
            return s;
        }

        /// <summary>
        /// Decodes a 7-bit encoded Int32 from <paramref name="buffer"/> at <paramref name="pos"/>
        /// and advances <paramref name="pos"/> past it.
        /// </summary>
        /// <exception cref="FormatException">The value uses more than 5 bytes.</exception>
        public static int Get7BitEncodedInt(byte[] buffer, ref int pos)
        {
            int ret = 0;
            int shift = 0;
            int len;
            for (len = 0; len < 5; ++len)
            {
                byte b = buffer[pos++];
                ret = ret | ((b & 0x7f) << shift);
                shift += 7;
                if ((b & 0x80) == 0)
                    break;
            }
            if (len < 5)
                return ret;
            throw new FormatException("Too many bytes in what should have been a 7 bit encoded Int32.");
        }

        /// <summary>
        /// Waits for any in-flight flush, then closes (and thereby deletes) all temporary run files.
        /// </summary>
        public void Dispose()
        {
            try
            {
                // The flush task may still be writing to, or appending to, _files.
                _flushing.Wait();
            }
            catch (AggregateException)
            {
                // Dispose must not throw. A failed flush leaves nothing to preserve; we only need
                // to release the files that were created.
            }
            foreach (var stream in _files)
            {
                stream.Dispose();
            }
            _files.Clear();
        }

        /// <summary>
        /// Fixed-capacity min-heap of enumerators, ordered by their Current value. Duplicates are allowed.
        /// </summary>
        private sealed class Heap<T>
        {
            private readonly Comparison<T> _comparer;
            private readonly IEnumerator<T>[] _values;

            public int Count { get; private set; }

            public Heap(int size, Comparison<T> comparer)
            {
                _comparer = comparer;
                _values = new IEnumerator<T>[size];
            }

            /// <summary>Adds a source sequence. Empty sequences are disposed and not added.</summary>
            public void Enqueue(IEnumerable<T> value)
            {
                var enumerator = value.GetEnumerator();
                var added = false;
                try
                {
                    if (enumerator.MoveNext())
                    {
                        Push(enumerator);
                        added = true;
                    }
                }
                finally
                {
                    if (!added)
                        enumerator.Dispose();
                }
            }

            /// <summary>Inserts an enumerator that is positioned on a valid element.</summary>
            public void Push(IEnumerator<T> source)
            {
                if (Count >= _values.Length)
                    throw new InvalidOperationException("Heap full");
                var index = Count;
                Count++;
                _values[index] = source;
                BubbleUp(index);
            }

            /// <summary>
            /// Removes and returns the enumerator with the smallest Current value. The caller must
            /// either advance it and <see cref="Push"/> it back, or dispose it.
            /// </summary>
            public IEnumerator<T> Pop()
            {
                if (Count == 0)
                    throw new InvalidOperationException("Heap empty");
                var top = _values[0];
                Count--;
                _values[0] = _values[Count];
                _values[Count] = null;
                if (Count > 0)
                    TrickleDown();
                return top;
            }

            /// <summary>Disposes and removes every enumerator still held by the heap.</summary>
            public void DisposeAll()
            {
                for (int i = 0; i < Count; i++)
                {
                    _values[i].Dispose();
                    _values[i] = null;
                }
                Count = 0;
            }

            private void BubbleUp(int index)
            {
                while (index > 0)
                {
                    var parentIndex = (index - 1) / 2;
                    if (_comparer(_values[index].Current, _values[parentIndex].Current) > 0)
                        break;
                    var parent = _values[parentIndex];
                    _values[parentIndex] = _values[index];
                    _values[index] = parent;
                    index = parentIndex;
                }
            }

            // Bottom-up sift: move the root all the way down along the smaller-child path, then bubble it back up.
            private void TrickleDown()
            {
                int index = 0;
                var childIndex = (index * 2) + 1;
                while (childIndex < Count)
                {
                    if (childIndex + 1 < Count &&
                        _comparer(_values[childIndex].Current, _values[childIndex + 1].Current) > 0)
                    {
                        childIndex++;
                    }
                    var tmp = _values[index];
                    _values[index] = _values[childIndex];
                    _values[childIndex] = tmp;
                    index = childIndex;
                    childIndex = (index * 2) + 1;
                }
                BubbleUp(index);
            }
        }

        /// <summary>
        /// Returns the smallest power of two that is greater than or equal to <paramref name="v"/>,
        /// for 1 &lt;= v &lt;= 2^62. Returns 0 for v == 0.
        /// </summary>
        public static long NearestPowerOfTwo(long v)
        {
            v--;
            v |= v >> 1;
            v |= v >> 2;
            v |= v >> 4;
            v |= v >> 8;
            v |= v >> 16;
            v |= v >> 32; // required for 64-bit inputs; without it values above 2^32 come out wrong
            v++;
            return v;
        }

        /// <summary>
        /// In-place parallel quicksort of buffer entry offsets, ordered by the keys they reference.
        /// Uses a 3-way partition so runs of equal keys do not degrade it to quadratic time.
        /// </summary>
        private sealed class ParallelSort
        {
            private const int SequentialThreshold = 512;

            // List<T> stores its elements in a private "_items" array. Sorting that array in place
            // avoids a copy. Null if the runtime's List<T> uses a different field name.
            private static readonly FieldInfo ItemsField =
                typeof(List<EntryOffset>).GetField("_items", BindingFlags.Instance | BindingFlags.NonPublic);

            private readonly byte[] _buffer;
            private readonly Comparison<ArraySegment<byte>> _comparison;

            public ParallelSort(byte[] buffer, Comparison<ArraySegment<byte>> comparison)
            {
                _buffer = buffer;
                _comparison = comparison;
            }

            /// <summary>Sorts the first list.Count entries of <paramref name="list"/> in place.</summary>
            public void QuicksortParallel(List<EntryOffset> list)
            {
                if (list.Count < 2)
                    return;

                // Fork for roughly log2(cores) + 2 levels (about 4x more leaves than cores), then
                // go sequential so we never block a thread-pool thread per tiny partition.
                int maxParallelDepth = 2;
                for (int cores = Environment.ProcessorCount; cores > 1; cores >>= 1)
                    maxParallelDepth++;

                var items = ItemsField == null ? null : ItemsField.GetValue(list) as EntryOffset[];
                if (items != null)
                {
                    Sort(items, 0, list.Count - 1, maxParallelDepth);
                    return;
                }

                // Fallback when the backing array is not reachable: sort a copy and write it back.
                var copy = list.ToArray();
                Sort(copy, 0, copy.Length - 1, maxParallelDepth);
                list.Clear();
                list.AddRange(copy);
            }

            private void Sort(EntryOffset[] arr, int left, int right, int parallelDepth)
            {
                if (parallelDepth <= 0 || right - left < SequentialThreshold)
                {
                    SortSequential(arr, left, right);
                    return;
                }
                int lt, gt;
                Partition(arr, left, right, out lt, out gt);
                Parallel.Invoke(
                    () => Sort(arr, left, lt - 1, parallelDepth - 1),
                    () => Sort(arr, gt + 1, right, parallelDepth - 1));
            }

            private void SortSequential(EntryOffset[] arr, int left, int right)
            {
                while (right > left)
                {
                    int lt, gt;
                    Partition(arr, left, right, out lt, out gt);
                    // Recurse into the smaller side and loop on the larger one, so stack depth
                    // stays O(log n) whatever the pivot quality.
                    if (lt - left < right - gt)
                    {
                        SortSequential(arr, left, lt - 1);
                        left = gt + 1;
                    }
                    else
                    {
                        SortSequential(arr, gt + 1, right);
                        right = lt - 1;
                    }
                }
            }

            /// <summary>
            /// Dijkstra 3-way partition around the middle element. On return, arr[low..lt-1] &lt; pivot,
            /// arr[lt..gt] == pivot, and arr[gt+1..high] &gt; pivot.
            /// </summary>
            private void Partition(EntryOffset[] arr, int low, int high, out int lt, out int gt)
            {
                Swap(arr, low, low + (high - low) / 2);
                // The pivot segment points into _buffer, not into arr, so later swaps don't move it.
                var pivot = KeyOf(_buffer, arr[low]);
                lt = low;
                gt = high;
                int i = low + 1;
                while (i <= gt)
                {
                    int cmp = _comparison(KeyOf(_buffer, arr[i]), pivot);
                    if (cmp < 0)
                        Swap(arr, lt++, i++);
                    else if (cmp > 0)
                        Swap(arr, i, gt--);
                    else
                        i++;
                }
            }

            private static void Swap(EntryOffset[] arr, int i, int j)
            {
                var tmp = arr[i];
                arr[i] = arr[j];
                arr[j] = tmp;
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment