Skip to content

Instantly share code, notes, and snippets.

@oktal
Last active July 29, 2020 19:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oktal/8d56812dccd434e8a0c5bb49b05340bc to your computer and use it in GitHub Desktop.
Save oktal/8d56812dccd434e8a0c5bb49b05340bc to your computer and use it in GitHub Desktop.
Low Latency Logging
#include <algorithm>
#include <array>
#include <atomic>
#include <condition_variable>
#include <cstring>
#include <deque>
#include <functional>
#include <iostream>
#include <random>
#include <sstream>
#include <thread>
class LogBufferBase;
class LogBufferView
{
public:
LogBufferView(const LogBufferBase& buffer)
: buffer{ buffer }
{}
const char* read(size_t index) const;
template<typename T>
T readAs(size_t index) const
{
return *reinterpret_cast<const T*>(read(index));
}
template<typename T>
const T * overlayAs(size_t index) const
{
return reinterpret_cast<const T*>(read(index));
}
private:
const LogBufferBase& buffer;
};
template<typename T>
struct Offset
{
Offset()
: offset{ 0 }
{}
explicit Offset(uint16_t offset)
: offset{ offset }
{}
T get(LogBufferView buffer) const
{
return buffer.readAs<T>(offset);
}
private:
uint16_t offset;
};
namespace tag
{
struct String {};
struct Function {};
}
template<>
struct Offset<tag::String>
{
Offset()
: offset{ 0 }
{}
explicit Offset(uint16_t offset)
: offset{ offset }
{}
std::string get(LogBufferView buffer) const
{
auto size = buffer.readAs<uint16_t>(offset);
const char* data = buffer.read(offset + sizeof(uint16_t));
return std::string(data, size);
}
const char* data(LogBufferView buffer) const
{
return buffer.read(offset + sizeof(uint16_t));
}
size_t size(LogBufferView buffer) const
{
return buffer.readAs<uint16_t>(offset);
}
private:
uint16_t offset;
};
template<>
struct Offset<tag::Function>
{
Offset()
: offset{ 0 }
{}
explicit Offset(uint16_t offset)
: offset{ offset }
{}
template<typename FuncType, typename... Args>
auto invoke(LogBufferView buffer, Args&& ...args) const
{
auto* func = buffer.overlayAs<FuncType>(offset);
std::invoke(*func, std::forward<Args>(args)...);
}
private:
uint16_t offset;
};
using StringOffset = Offset<tag::String>;
using FunctionOffset = Offset<tag::Function>;
class LogBufferBase
{
public:
friend class LogBufferView;
virtual ~LogBufferBase() = default;
template<typename T>
Offset<T> write(T value)
{
return offsetAt<T>(encode(value));
}
template<size_t N>
StringOffset write(const char(&str)[N])
{
return writeString(str, N - 1);
}
StringOffset write(const std::string& str)
{
return writeString(str.data(), str.size());
}
StringOffset write(const char* str)
{
return writeString(str, std::strlen(str));
}
template<typename Return, typename... Args>
FunctionOffset write(Return (*func)(Args...))
{
return offsetAt<tag::Function>(encodeFunction(func));
}
size_t size() const
{
return m_cursor;
}
private:
size_t m_cursor{ 0 };
StringOffset writeString(const char* str, size_t size)
{
return offsetAt<tag::String>(encodeString(str, size));
}
virtual void reserve(size_t capacity) = 0;
protected:
virtual char* dataAt(size_t index) = 0;
const char* dataAt(size_t index) const
{
return const_cast<LogBufferBase*>(this)->dataAt(index);
}
template<typename T>
T* overlayAt(size_t offset)
{
return reinterpret_cast<T*>(dataAt(offset));
}
template<typename T>
const T* overlayAt(size_t offset) const
{
return reinterpret_cast<const T*>(dataAt(offset));
}
template<typename T>
size_t encode(T value)
{
reserve(size() + sizeof(value));
auto index = m_cursor;
*overlayAt<T>(index) = value;
m_cursor += sizeof(T);
return index;
}
size_t encodeRaw(const char* bytes, size_t size)
{
reserve(this->size() + size);
auto index = m_cursor;
std::memcpy(dataAt(index), bytes, size);
m_cursor += size;
return index;
}
size_t encodeString(const char* str, size_t size)
{
auto index = encode(static_cast<uint16_t>(size));
encodeRaw(str, size);
return index;
}
template<typename Function>
size_t encodeFunction(Function func)
{
reserve(size() + sizeof(Function));
auto index = m_cursor;
//*overlayAt<Function>(index) = func;
m_cursor += sizeof(Function);
return index;
}
void advance(size_t size)
{
m_cursor += size;
}
size_t cursor() const
{
return m_cursor;
}
template<typename T>
Offset<T> offsetAt(size_t index) const
{
return Offset<T> { static_cast<uint16_t>(index) };
}
};
template<size_t N>
class LogBuffer : public LogBufferBase
{
public:
LogBuffer()
: m_data(&m_inlineData[0])
, m_capacity { N }
{}
~LogBuffer()
{
if (!isSmall())
{
std::free(m_data);
m_data = nullptr;
}
}
LogBuffer(const LogBuffer& other)
{
*this = other;
}
LogBuffer(LogBuffer&& other) noexcept
{
*this = std::move(other);
}
LogBuffer& operator=(const LogBuffer& other)
{
reserve(other.m_capacity);
std::memcpy(m_data, other.m_data, other.size());
m_capacity = other.m_capacity;
advance(other.cursor());
return *this;
}
LogBuffer& operator=(LogBuffer&& other) noexcept
{
// If the other is not small, let's steal its buffer
if (!other.isSmall())
{
// Cleanup our buffer if we were not small
if (!isSmall())
std::free(m_data);
m_data = other.m_data;
other.m_data = nullptr;
}
// The other is small, which means that we will also be small, so memcpy the bytes
// over
else
{
// Cleanup our buffer if we were not small
if (!isSmall())
std::free(m_data);
std::memcpy(m_data, other.m_data, cursor());
m_data = m_inlineData.data();
}
advance(other.cursor());
m_capacity = other.m_capacity;
return* this;
}
private:
using InlineStorage = std::array<char, N>;
InlineStorage m_inlineData {};
char* m_data { nullptr };
size_t m_capacity { 0 };
bool isSmall() const
{
return m_data == &m_inlineData[0];
}
virtual void reserve(size_t capacity) override
{
if (capacity <= m_capacity)
return;
auto newCapacity = std::max(m_capacity * 2, capacity);
auto* data = static_cast<char*>(::malloc(newCapacity * sizeof(char)));
if (data == nullptr)
throw std::bad_alloc();
if (isSmall())
{
std::memcpy(data, m_inlineData.data(), m_inlineData.size());
}
else
{
std::memcpy(data, m_data, this->cursor());
::free(m_data);
}
m_data = data;
m_capacity = newCapacity;
}
protected:
char* dataAt(size_t index) override
{
return m_data + index;
}
};
inline const char* LogBufferView::read(size_t index) const
{
return buffer.dataAt(index);
}
class FormatCallback
{
public:
void operator()(const char* data, size_t size) const
{
call(data, size);
}
private:
virtual void call(const char* data, size_t size) const = 0;
};
template<typename Stream>
class StreamCallback : public FormatCallback
{
public:
StreamCallback(Stream& stream)
: stream { stream }
{}
private:
Stream& stream;
void call(const char* data, size_t size) const override
{
stream.write(data, size);
}
};
/*
+---------+--------------------------------+---------+
| LogFunc | OffsetsIndex | Data | Offsets |
+---------+--------------+-----------------+---------+
|
^-------------------
*/
class EventLogBuffer : public LogBuffer<255>
{
public:
using LogFunc = void (*)(const LogBufferBase& buffer, uint16_t offsetsIndex, FormatCallback& callback);
static constexpr size_t HeaderOffset = 0;
struct Header
{
LogFunc logFunc;
uint16_t offsetsIndex;
};
EventLogBuffer()
{
LogBufferBase::advance(HeaderOffset + sizeof(Header));
}
template<typename T>
void writeEvent(const T& value)
{
auto offsetsIndex = encode(value);
encodeHeader<T>(offsetsIndex);
}
void format(FormatCallback& callback) const
{
const auto* header = decodeHeader();
std::invoke(header->logFunc, *this, header->offsetsIndex, callback);
}
private:
template<typename T>
void encodeHeader(size_t offsetsIndex)
{
auto* header = LogBufferBase::template overlayAt<Header>(HeaderOffset);
header->offsetsIndex = static_cast<uint16_t>(offsetsIndex);
header->logFunc = [](const LogBufferBase& buffer, uint16_t offsetsIndex, FormatCallback& callback) {
LogBufferView view{ buffer };
const T* obj = view.overlayAs<T>(offsetsIndex);
obj->format(view, callback);
};
}
const Header* decodeHeader() const
{
return LogBufferBase::template overlayAt<Header>(HeaderOffset);
}
};
struct MyEvent
{
Offset<uint32_t> providerId;
Offset<char> key;
StringOffset reason;
template<typename FormatCallback>
void format(LogBufferView view, FormatCallback& callback) const
{
std::ostringstream os;
os << "MyEvent { ProviderId: " << providerId.get(view) << ", Key: " << key.get(view) << ", Reason: " << reason.get(view) << " }";
const auto& str = os.str();
callback(str.data(), str.size());
}
};
struct AsyncChannel
{
~AsyncChannel()
{
if (isStarted())
stop();
if (thread.joinable())
thread.join();
}
void log(const EventLogBuffer& buffer)
{
LockGuard guard(queueLock);
queue.push_back(Entry::create(buffer));
cv.notify_one();
}
void start()
{
bool expected = false;
if (!started.compare_exchange_strong(expected, true))
return;
thread = std::thread([=] {
this->threadMain();
});
}
void stop()
{
bool expected = true;
if (!started.compare_exchange_strong(expected, false))
return;
LockGuard guard(queueLock);
queue.push_back(Entry::stop());
cv.notify_one();
}
bool isStarted() const {
return started.load(std::memory_order_acquire);
}
private:
struct Entry
{
static Entry create(const EventLogBuffer& buffer)
{
return Entry{false, buffer};
}
static Entry stop()
{
return Entry{true, EventLogBuffer{}};
}
bool stopFlag;
EventLogBuffer buffer;
};
using Lock = std::mutex;
using LockGuard = std::unique_lock<Lock>;
Lock queueLock;
std::condition_variable cv;
std::deque<Entry> queue;
std::thread thread;
std::atomic<bool> started { false };
void threadMain()
{
std::ostringstream oss;
for (;;)
{
bool stopFlag = false;
LockGuard guard(queueLock);
cv.wait(guard, [&] { return !queue.empty(); });
for (const auto& entry: queue)
{
if (entry.stopFlag)
{
stopFlag = true;
break;
}
StreamCallback<std::ostringstream> callback { oss };
entry.buffer.format(callback);
std::cout << oss.str() << std::endl;
oss.str("");
}
queue.clear();
if (stopFlag)
break;
}
}
};
template<typename T, T... Args>
T pickRandom()
{
static constexpr T Values[] = { Args... };
static std::random_device rd;
static std::default_random_engine engine(rd());
static std::uniform_int_distribution<int> dist(0, sizeof...(Args) - 1);
return Values[dist(engine)];
}
namespace details
{
template<typename T> using OffsetT = decltype(static_cast<LogBufferBase *>(0)->write(std::declval<T>()));
template<typename... Args> using OffsetTuple = std::tuple<OffsetT<std::decay_t<Args>>...>;
template<typename... Args>
auto writeAll(LogBufferBase& buffer, Args&& ...args)
{
return std::make_tuple(buffer.write(args)...);
}
template<typename Func, typename... Args>
struct EventWrapper
{
using Offsets = OffsetTuple<Args...>;
EventWrapper(Offsets offsets, FunctionOffset funcOffset)
: offsets { offsets }
, funcOffset { funcOffset }
{}
void format(LogBufferView view, FormatCallback& callback) const
{
formatImpl(view, callback, std::index_sequence_for<Args...>());
}
private:
Offsets offsets;
FunctionOffset funcOffset;
template<size_t... Indexes>
void formatImpl(LogBufferView view, FormatCallback& callback, std::index_sequence<Indexes...>) const
{
funcOffset.invoke<Func>(view, view, callback, std::get<Indexes>(offsets)...);
}
};
}
template<typename Func, typename... Args>
void logInfo(AsyncChannel& channel, Func func, Args&& ...args)
{
using FunctionPtr = void (*)(LogBufferView, FormatCallback&, details::OffsetT<std::decay_t<Args>>...);
EventLogBuffer buffer;
auto offsets = details::writeAll(buffer, std::forward<Args>(args)...);
auto funcOffset = buffer.write(static_cast<FunctionPtr>(func));
details::EventWrapper<Func, Args...> event(offsets, funcOffset);
buffer.writeEvent(event);
channel.log(buffer);
}
int main()
{
AsyncChannel channel;
channel.start();
static constexpr size_t Count = 100;
for (size_t i = 0; i < Count; ++i)
{
/*
EventLogBuffer buffer;
MyEvent myEvent{
buffer.write(pickRandom<uint32_t, 10, 32, 45>()),
buffer.write(pickRandom<char, 'A', 'F', '!'>()),
buffer.write(reason),
};
buffer.writeEvent(myEvent);
channel.log(buffer);
*/
std::string reason("My specific reason");
auto providerId = pickRandom<uint32_t, 10, 32, 45>();
auto key = pickRandom<char, 'A', 'F', '!'>();
logInfo(channel, [](LogBufferView buffer, FormatCallback& callback, auto providerId, auto key, auto reason) {
std::ostringstream oss;
oss << "Something happened. ProviderId: " << providerId.get(buffer) << " Key: " << key.get(buffer) << " Reason: " << reason.get(buffer);
const auto& str = oss.str();
callback(str.data(), str.size());
}, providerId, key, reason);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment