Skip to content

Instantly share code, notes, and snippets.

@ReubenBond
Created April 13, 2024 23:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ReubenBond/24c008ba5596ff306d688b5992fc7ec3 to your computer and use it in GitHub Desktop.
Save ReubenBond/24c008ba5596ff306d688b5992fc7ec3 to your computer and use it in GitHub Desktop.
Filtered Space-Saving
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
namespace Orleans.Runtime.Placement.Rebalancing;
// Implementation of "Filtered Space-Saving" from "Finding top-k elements in data streams"
// by Nuno Homem & Joao Paulo Carvalho (https://www.hlt.inesc-id.pt/~fmmb/references/misnis.ref0a.pdf).
internal abstract class FrequencyFilter<TKey>(int capacity) where TKey : notnull
{
private int _capacity = capacity;
// A lookup from keys to positions in the counter map.
private readonly Dictionary<ulong, int> _counterSlotMap = [];
// The list of counters, ordered by count.
private readonly List<Counter> _counterList = [];
// An error estimate for a given hash value.
private readonly List<uint> _alphaMap = new(GetAlphaCapacity(capacity));
public int Size => _counterList.Count;
public int Capacity { get => _capacity; set => Resize(value); }
// Gets the top K keys and their count estimates in descending order.
public IEnumerable<(TKey Key, uint CountEstimate)> Counters
{
get
{
foreach (var counter in _counterList)
{
yield return (counter.Key, counter.Count);
}
}
}
public abstract ulong GetHashCode64(in TKey key);
public void Clear()
{
_counterSlotMap.Clear();
_counterList.Clear();
_alphaMap.Clear();
}
private void Resize(int newCapacity)
{
_capacity = newCapacity;
_counterList.Capacity = newCapacity;
_alphaMap.Capacity = GetAlphaCapacity(newCapacity);
_alphaMap.Clear();
}
public void AddOrUpdate(in TKey key)
{
const int Increment = 1;
var longHash = GetHashCode64(key);
// Increase count of a key that is already being tracked.
// There is a minute chance of a hash collision, which is deemed acceptable.
if (_counterSlotMap.TryGetValue(longHash, out var slot))
{
ref var existing = ref CollectionsMarshal.AsSpan(_counterList)[slot];
existing.Count += Increment;
Percolate(ref existing, slot);
return;
}
// Key is not being tracked, but can fit in the top K, so add it.
if (Size < Capacity)
{
AddCounter(new Counter(key, Increment, error: 0), longHash);
return;
}
var min = _counterList[^1];
// Filter out values which are estimated to have appeared less frequently than the minimum.
var alphaMask = _alphaMap.Count - 1;
var hash = longHash.GetHashCode();
var alpha = _alphaMap[hash & alphaMask];
if (alpha + Increment < min.Count)
{
// Increase the count estimate.
_alphaMap[hash & alphaMask] += Increment;
return;
}
// Remove the least frequent element.
var longMinHash = GetHashCode64(min.Key);
_counterSlotMap.Remove(longMinHash);
CollectionsMarshal.SetCount(_counterList, _counterList.Count - 1);
// While evicting minimum element, update its location in the filter sketch with its count to improve
// the chance of it passing the filter in the future.
var minHash = longMinHash.GetHashCode();
_alphaMap[minHash & alphaMask] = min.Count;
// Push the new element in place of the last.
AddCounter(new Counter(key, alpha + Increment, error: alpha), longHash);
}
private void AddCounter(Counter counter, ulong longHash)
{
var slot = _counterList.Count;
_counterList.Add(counter);
_counterSlotMap[longHash] = slot;
Percolate(ref counter, slot);
}
// Bubble the updated counter up until it is at its correct position.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void Percolate(ref Counter counter, int slot)
{
var counterList = CollectionsMarshal.AsSpan(_counterList);
while (slot > 0)
{
var nextSlot = slot - 1;
var next = counterList[nextSlot];
if (!counter.IsEstimateGreater(next))
{
break;
}
// Swap the elements in the list.
counterList[slot] = next;
counterList[nextSlot] = counter;
// Update the map.
_counterSlotMap[GetHashCode64(next.Key)] = slot;
_counterSlotMap[GetHashCode64(counter.Key)] = nextSlot;
slot = nextSlot;
}
}
private static int GetAlphaCapacity(int capacity)
{
// Suggested constants in the paper "Finding top-k elements in data streams", chap 6. equation (24)
// Round to nearest power of 2 for cheaper binning without modulo
const int AlphaMapElementsPerCounter = 6;
return 1 << (64 - int.LeadingZeroCount(capacity * AlphaMapElementsPerCounter));
}
private struct Counter(TKey key, uint count, uint error)
{
public readonly TKey Key = key;
public uint Count = count;
public uint Error = error;
// Returns true if this counter is estimated to be greater than the other.
public readonly bool IsEstimateGreater(Counter other) => Count > other.Count || Count == other.Count && Error < other.Error;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment