Skip to content

Instantly share code, notes, and snippets.

@glampert
Created May 29, 2017 02:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save glampert/2c462bcc77d326526787708c0f2cceff to your computer and use it in GitHub Desktop.
Save glampert/2c462bcc77d326526787708c0f2cceff to your computer and use it in GitHub Desktop.
Fixed-size concurrent/lockless hash table for integer types using C++11 atomics.
// License:
// Public Domain. Feel free to copy, distribute, and modify this file as you see fit.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <array>
#include <atomic>
#include <thread> // for the tests
// ----------------------------------------------------------------------------
// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
struct MurmurHash3FMix32
{
std::size_t operator()(std::uint32_t k) const
{
k ^= k >> 16;
k *= 0x85ebca6b;
k ^= k >> 13;
k *= 0xc2b2ae35;
k ^= k >> 16;
return std::size_t(k);
}
};
struct MurmurHash3FMix64
{
std::size_t operator()(std::uint64_t k) const
{
k ^= k >> 33;
k *= UINT64_C(0xff51afd7ed558ccd);
k ^= k >> 33;
k *= UINT64_C(0xc4ceb9fe1a85ec53);
k ^= k >> 33;
return std::size_t(k);
}
};
// ----------------------------------------------------------------------------
// http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table/
// Hash table optimized for integer types. Uses Standard C++ atomics
// to ensure concurrent insertion, removal and lookup are possible
// without the need to acquire locks. Uses Open Addressing with
// Linear Probing to resolve key collisions. A special sentinel key
// must be reserved to denote unused keys. If using an integer
// type for the key it defaults to zero but any constant can
// be specified as the last template argument. Table size is
// fixed at compile time and does not grow.
//
// Based on "The World's Simplest Lock-Free Hash Table", by Jeff Preshing.
template
<
typename Key,
typename Value,
typename HashFunc,
const std::size_t TableSize,
const Key InvalidKeyConstant = Key{}
>
class LocklessHashTable final
{
public:
static_assert((TableSize & (TableSize - 1)) == 0, "Table size must be a power of two!");
struct Pair
{
std::atomic<Key> key;
std::atomic<Value> value;
Pair() : key{ InvalidKeyConstant }, value{}
{ }
bool isValid() const
{
return key.load(std::memory_order_relaxed) != InvalidKeyConstant;
}
};
using Buckets = std::array<Pair, TableSize>;
public:
LocklessHashTable() : m_usedCount{ 0 }
{ }
bool insert(const Key key, Value value)
{
assert(key != InvalidKeyConstant);
int bucketsProbed = 0;
for (std::size_t bucket = HashFunc{}(key);; ++bucket)
{
bucket &= (TableSize - 1);
if (bucketsProbed++ == TableSize)
{
return false; // Table is full!
}
// Load the key that was there.
const Key probedKey = m_buckets[bucket].key.load(std::memory_order_relaxed);
if (probedKey != key) // The entry was either free, or contains another key.
{
if (probedKey != InvalidKeyConstant)
{
continue; // Usually, it contains another key. Keep probing.
}
// The bucket was free. Now let's try to take it using a CAS.
Key expectedKey = InvalidKeyConstant;
const bool success = m_buckets[bucket].key.compare_exchange_weak(expectedKey, key,
std::memory_order_release, std::memory_order_relaxed);
if (!success && (expectedKey != InvalidKeyConstant) && (expectedKey != key))
{
continue; // Another thread just stole it.
}
// Either we just added the key, or another thread did.
// In which case we can overwrite the value but assume the
// other thread has bumped the count if we observed the same key.
if (expectedKey != key)
{
++m_usedCount;
assert(m_usedCount <= TableSize);
}
}
m_buckets[bucket].value.store(value, std::memory_order_relaxed);
return true; // Value inserted or overwritten.
}
}
bool remove(const Key key)
{
assert(key != InvalidKeyConstant);
int bucketsProbed = 0;
for (std::size_t bucket = HashFunc{}(key);; ++bucket)
{
bucket &= (TableSize - 1);
if (bucketsProbed++ == TableSize)
{
return false; // Probed the whole table without finding the key.
}
Key probedKey = m_buckets[bucket].key.load(std::memory_order_relaxed);
if (probedKey == key)
{
if (!m_buckets[bucket].key.compare_exchange_strong(probedKey, InvalidKeyConstant,
std::memory_order_release, std::memory_order_relaxed))
{
return false; // Another thread has already removed the key and reused the bucket.
}
// Try to clear the value of the removed key:
Value oldValue = m_buckets[bucket].value.load(std::memory_order_relaxed);
if (oldValue != Value{})
{
m_buckets[bucket].value.compare_exchange_weak(oldValue, Value{},
std::memory_order_release, std::memory_order_relaxed);
}
--m_usedCount;
assert(m_usedCount >= 0);
return true;
}
}
}
bool access(const Key key, Value * outValue = nullptr) const
{
assert(key != InvalidKeyConstant);
int bucketsProbed = 0;
for (std::size_t bucket = HashFunc{}(key);; ++bucket)
{
bucket &= (TableSize - 1);
if (bucketsProbed++ == TableSize)
{
return false; // Probed the whole table without finding the key.
}
const Key probedKey = m_buckets[bucket].key.load(std::memory_order_relaxed);
if (probedKey == key)
{
if (outValue != nullptr)
{
*outValue = m_buckets[bucket].value.load(std::memory_order_relaxed);
}
return true;
}
if (probedKey == InvalidKeyConstant)
{
return false;
}
}
}
void clear() // This operation is not thread safe!
{
m_usedCount.store(0, std::memory_order_relaxed);
for (Pair & p : m_buckets)
{
p.key.store(InvalidKeyConstant, std::memory_order_relaxed);
p.value.store(Value{}, std::memory_order_relaxed);
}
}
int size() const
{
return m_usedCount.load(std::memory_order_relaxed);
}
bool isEmpty() const
{
return size() == 0;
}
const Buckets & buckets() const
{
return m_buckets;
}
private:
std::atomic<int> m_usedCount;
Buckets m_buckets;
};
// ----------------------------------------------------------------------------
static void tests()
{
// Single threaded usage:
{
LocklessHashTable<std::uint32_t, std::uint32_t, MurmurHash3FMix32, 16> ht;
ht.insert(1, 42);
ht.insert(2, 1337);
std::uint32_t one, two;
ht.access(1, &one);
ht.access(2, &two);
assert(one == 42);
assert(two == 1337);
assert(ht.size() == 2);
assert(ht.remove(1) == true);
assert(ht.remove(2) == true);
assert(ht.remove(76) == false);
assert(ht.size() == 0);
}
// Concurrent insertions:
{
LocklessHashTable<std::uint64_t, std::uint64_t, MurmurHash3FMix64, 1024> ht;
unsigned N1 = 0;
std::thread inserterThread1{
[&ht, &N1]() {
for(unsigned i = 1; i < 512; ++i)
{
ht.insert(i, i);
N1 += i;
if (i & 1) std::this_thread::yield();
}
}
};
unsigned N2 = 0;
std::thread inserterThread2{
[&ht, &N2]() {
for(unsigned i = 512; i <= 1024; ++i)
{
ht.insert(i, i);
N2 += i;
if (i & 1) std::this_thread::yield();
}
}
};
inserterThread1.join();
inserterThread2.join();
assert(ht.size() == 1024);
std::uint64_t accK = 0, accV = 0;
for (const auto & b : ht.buckets())
{
auto k = b.key.load(std::memory_order_relaxed);
auto v = b.value.load(std::memory_order_relaxed);
assert(k >= 1 && k <= 1024);
assert(v >= 1 && v <= 1024);
accK += k;
accV += v;
}
assert(accK == (N1 + N2));
assert(accV == (N1 + N2));
}
// One thread inserting the other accessing:
{
LocklessHashTable<std::uint32_t, std::uint64_t, MurmurHash3FMix32, 1024> ht;
std::thread inserterThread{
[&ht]() {
for (unsigned i = 1; i <= 512; ++i)
{
ht.insert(i, i);
if (i & 1) std::this_thread::yield();
}
}
};
std::thread accessingThread{
[&ht]() {
constexpr unsigned expectedCount = 131328; // sum(1,512)
std::uint64_t count = 0;
std::array<bool, 512> keysCounted = {};
while (count < expectedCount)
{
for (unsigned i = 1; i <= 512; ++i)
{
if (!keysCounted[i - 1])
{
std::uint64_t v = 0;
const bool gotKey = ht.access(i, &v);
assert(v == 0 || (v >= 1 && v <= 512));
if (gotKey && v != 0)
{
keysCounted[i - 1] = true;
count += v;
}
}
}
}
assert(count == expectedCount);
}
};
inserterThread.join();
accessingThread.join();
assert(ht.size() == 512);
}
// Concurrent inserts + removes:
{
const std::array<int, 10> items = {{ 42, 33, 66, 123, 678, 255, 1024, 1, 9, 1337 }}; // to be inserted/removed
LocklessHashTable<int, int, MurmurHash3FMix32, 32> ht;
std::thread inserterThread{
[&ht, &items]() {
for (int i : items)
{
ht.insert(i, i);
if (i & 1) std::this_thread::yield();
}
}
};
std::thread removerThread{
[&ht, &items]() {
unsigned numRemoved = 0;
while (numRemoved < items.size())
{
for (int i : items)
{
if (ht.remove(i)) numRemoved++;
}
}
assert(numRemoved == items.size());
assert(ht.size() == 0);
}
};
inserterThread.join();
removerThread.join();
assert(ht.isEmpty());
}
}
// ----------------------------------------------------------------------------
int main()
{
// Run several test iterations to exercise thread scheduling differences.
for (int iterations = 0; iterations < 512; ++iterations)
{
tests();
}
std::printf("LocklessHashTable tests completed successfully.\n");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment