@pkhuong
Last active January 11, 2021 17:09
Word-stuffed record stream

Backtrace's log record framing format

We use this self-synchronising format to store key metadata for our server-side embedded crash database. See this blog post for more details and explanations.

We released the source under the MIT license, but the scheme is simple enough that coding an interface tuned to one's specific needs probably makes sense.
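For a sense of the interface, here is a minimal usage sketch against the declarations in record_stream.h and crdb_error.h below. The caller, its error reporting, and the file descriptor setup (O_APPEND for writes, mmap-able for reads) are assumptions for illustration, not part of the released code.

#include <stdint.h>
#include <stdio.h>

#include "record_stream.h"

/* Append one record to `fd`, then scan back every record that still decodes. */
static bool
demo(int fd, const uint8_t *payload, size_t payload_len)
{
        struct crdb_record_stream_iterator it;
        uint8_t buf[CRDB_RECORD_STREAM_BUF_LEN];
        crdb_error_t err = CRDB_ERROR_INITIALIZER;
        uint32_t generation;
        size_t len;

        /* Make sure the tail of the file is safe to append to. */
        if (!crdb_record_stream_append_initial(fd, &err) ||
            !crdb_record_stream_append_buf(fd, 1, payload, payload_len, &err)) {
                fprintf(stderr, "append failed: %s\n", err.message);
                return false;
        }

        if (!crdb_record_stream_iterator_init_fd(&it, fd, &err)) {
                fprintf(stderr, "iterator init failed: %s\n", err.message);
                return false;
        }

        /* Corrupt or truncated records are skipped, not returned. */
        while (crdb_record_stream_iterator_next_buf(&it, &generation, buf, &len))
                printf("generation %u, %zu payload bytes\n", (unsigned)generation, len);

        crdb_record_stream_iterator_deinit(&it);
        return true;
}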

#pragma once
/*
* Copyright 2021 Backtrace I/O, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <stdbool.h>
#include <stddef.h>
/**
** # Threading an error "monad" in C
**
** Operations that can fail accept a `crdb_error_t *` argument
** (usually the last), and denote failure by returning
** `false`/`NULL`, or, more rarely, a function-specific sentinel
** value. The `crdb_error_t *` argument is *meant* to be nullable,
** but some code paths may erroneously assume the error struct is
** provided.
**
** A `crdb_error_t` value should always be initialised with
** `CRDB_ERROR_INITIALIZER`, or zero-filled.
**/
struct crdb_error {
const char *message;
unsigned long long error;
};
typedef struct crdb_error crdb_error_t;
#define CRDB_ERROR_INITIALIZER (crdb_error_t) { .error = 0 }
static inline void
_crdb_error_set(struct crdb_error *error, const char *message,
unsigned long long n)
{
if (error == NULL)
return;
error->message = message;
error->error = n;
return;
}
#define CRDB_ERROR_SET_(E, M, N, ...) ({ _crdb_error_set(E, M, (N)); false; })
#define crdb_error_set(E, M, ...) CRDB_ERROR_SET_((E), (M), ##__VA_ARGS__, 0)
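/**
** A minimal usage sketch for the convention above. `crdb_foo` and
** `log_warn` are hypothetical stand-ins for a fallible crdb function
** and the caller's logging:
**
**     crdb_error_t err = CRDB_ERROR_INITIALIZER;
**
**     if (crdb_foo(arg, &err) == false)
**             log_warn("crdb_foo failed: %s (%llu)", err.message, err.error);
**
** On the callee side, failure paths read
** `return crdb_error_set(ce, "description", errno);`: the macro stores
** the message and code when `ce` is non-NULL, and evaluates to `false`.
**/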
/*
* Copyright 2021 Backtrace I/O, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#define _GNU_SOURCE /* For SEEK_DATA */
#include "record_stream.h"
#include <assert.h>
#include <errno.h>
#include <smmintrin.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include "word_stuff.h"
/*
* Fill the record_header.crc field with CRC_INITIAL_VALUE when
* computing the checksum: crc32c is vulnerable to 0-prefixing (a zero
* accumulator stays zero over a run of zero bytes), so we make sure
* the initial bytes are non-zero.
*/
#define CRC_INITIAL_VALUE ((uint32_t)-1)
#define CRDB_ARRAY_SIZE(X) (sizeof(X) / sizeof(*(X)))
struct record_header {
uint32_t crc;
uint32_t generation;
};
struct write_record {
struct record_header header;
uint8_t data[CRDB_RECORD_STREAM_MAX_LEN];
};
struct read_record {
struct record_header header;
uint8_t data[CRDB_RECORD_STREAM_BUF_LEN];
};
/**
* This is our internal reference implementation. You probably want
* something faster for production use.
*/
static uint32_t
crdb_crc32c(const void *buf, size_t len)
{
uint32_t acc = 0;
size_t i;
for (i = 0; i + sizeof(uint32_t) <= len; i += sizeof(uint32_t)) {
uint32_t bytes;
memcpy(&bytes, (const uint8_t *)buf + i, sizeof(bytes));
acc = _mm_crc32_u32(acc, bytes);
}
for (; i < len; i++)
acc = _mm_crc32_u8(acc, ((const uint8_t *)buf)[i]);
return acc;
}
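/*
* A possible speedup, left as a hedged sketch rather than what we ship:
* consume 8 bytes per iteration with the 64-bit CRC32C instruction
* (x86-64 with SSE4.2 only; the CRC state still fits in 32 bits):
*
*     uint64_t acc64 = acc;
*
*     for (; i + sizeof(uint64_t) <= len; i += sizeof(uint64_t)) {
*             uint64_t bytes;
*
*             memcpy(&bytes, (const uint8_t *)buf + i, sizeof(bytes));
*             acc64 = _mm_crc32_u64(acc64, bytes);
*     }
*     acc = (uint32_t)acc64;
*
* Production users probably want a tuned implementation from an
* existing CRC library instead.
*/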
/**
* Encodes the write record to `encoded[0 ... *encoded_size - 1]`.
*/
static bool
encode_record(
uint8_t encoded[static CRDB_WORD_STUFFED_BOUND(sizeof(struct write_record))],
size_t *encoded_size, struct write_record *record, size_t data_len,
crdb_error_t *ce)
{
enum { MAX_ENCODED = CRDB_WORD_STUFFED_BOUND(sizeof(struct write_record)) };
size_t record_size;
uint8_t *write_ptr;
static_assert((size_t)MAX_ENCODED <= CRDB_RECORD_STREAM_BUF_LEN,
"The maximum encoded size must fit in the read record size limit.");
if (data_len > CRDB_RECORD_STREAM_MAX_LEN)
return crdb_error_set(ce, "crdb_record_stream data too long");
record->header.crc = CRC_INITIAL_VALUE;
record_size = sizeof(struct record_header) + data_len;
record->header.crc = crdb_crc32c(record, record_size);
assert(crdb_word_stuffed_size(record_size, true) <= MAX_ENCODED);
write_ptr = crdb_word_stuff_encode(encoded, record, record_size);
/*
* The beginning and end of file act as implicit headers, and
* we simply have to separate records with the 2-byte header;
* we are free to write a header before the current record, or
* after, in preparation for the next write.
*
* We prefer to write the header preemptively because we
* observe that corruption mostly happens at the tail of the
* file: having the header in place early makes it more likely
* that it will make it to persistent storage before a crash.
*/
write_ptr = crdb_word_stuff_header(write_ptr);
*encoded_size = write_ptr - encoded;
return true;
}
/**
* Repeatedly attempts to write `buf` to `fd`, which is expected to be
* in O_APPEND mode.
*
* The buffer is word-stuffed and ends with a header for the next record.
*/
static bool
append_to_fd(int fd, const void *buf, size_t count, crdb_error_t *ce)
{
static const size_t num_tries = 3;
uint8_t header[CRDB_WORD_STUFF_HEADER_SIZE];
struct iovec iov[] = {
{
/*
* The first write does not include a header:
* we assume the previous write inserted one
* for us.
*/
.iov_base = header,
.iov_len = 0
},
{
.iov_base = (void *)buf,
.iov_len = count,
}
};
size_t expected = count;
ssize_t written;
int err;
/* Flip to true when at least one write was short. */
bool partial_write = false;
for (size_t i = 0; i < num_tries; i++) {
const uint8_t *end;
written = writev(fd, iov, CRDB_ARRAY_SIZE(iov));
if ((size_t)written == expected)
break;
/* We failed without writing anything; just retry. */
if (written <= 0)
continue;
/*
* If the first write was short, we should definitely
* make sure there's a header before the new record:
* we can't assume the previous write left one for us.
*
* It is however safe to leave a partially written
* record behind: the read-side will correctly detect
* it as corrupt. If another writer succeeded just
* after the short write, we may also lose that
* record... but we expect write failures to be sticky
* (media failure or exhausted storage quotas).
*/
partial_write = true;
end = crdb_word_stuff_header(header);
assert(end == header + sizeof(header));
iov[0].iov_len = sizeof(header);
expected = count + sizeof(header);
}
/*
* If we failed and left some partial records behind, try to
* at least leave a header for the next writer.
*/
err = errno;
if (partial_write == true && (size_t)written != expected) {
ssize_t r;
/*
* This write is best-effort: if it fails, that's
* life, and there's not much we can do against what
* is probably a storage media or quota problem.
*/
r = write(fd, iov[0].iov_base, iov[0].iov_len);
(void)r;
}
if (written < 0)
return crdb_error_set(ce, "record_stream write(2) failed.", err);
if ((size_t)written != expected)
return crdb_error_set(ce, "Short write in record_stream.");
return true;
}
/**
* Dumps an encoded version of `record` to `fd`. The record's header
* will be updated in-place to contain the correct crc.
*
* We guarantee that the encoded record (header + data) fits in
* CRDB_RECORD_STREAM_BUF_LEN bytes.
*/
static bool
record_stream_append_record(int fd, struct write_record *record,
size_t data_len, crdb_error_t *ce)
{
uint8_t encoded[CRDB_WORD_STUFFED_BOUND(sizeof(*record))];
size_t encoded_size;
if (encode_record(encoded, &encoded_size, record, data_len, ce) == false)
return false;
return append_to_fd(fd, encoded, encoded_size, ce);
}
static bool
record_stream_write_record(FILE *stream, struct write_record *record,
size_t data_len, crdb_error_t *ce)
{
uint8_t encoded[CRDB_WORD_STUFFED_BOUND(sizeof(*record))];
size_t encoded_size;
size_t written;
if (encode_record(encoded, &encoded_size, record, data_len, ce) == false)
return false;
written = fwrite(encoded, encoded_size, 1, stream);
if (written != 1)
return crdb_error_set(ce, "crdb_record_stream fwrite(3) failed.",
errno);
return true;
}
static bool
fd_ends_with_header(int fd,
const uint8_t header[static CRDB_WORD_STUFF_HEADER_SIZE])
{
uint8_t buf[CRDB_WORD_STUFF_HEADER_SIZE];
ssize_t ret;
if (lseek(fd, -(off_t)sizeof(buf), SEEK_END) < 0)
return false;
do {
ret = read(fd, buf, sizeof(buf));
} while (ret == -1 && errno == EINTR);
if ((size_t)ret != sizeof(buf))
return false;
return memcmp(buf, header, sizeof(buf)) == 0;
}
bool
crdb_record_stream_append_initial(int fd, crdb_error_t *ce)
{
uint8_t header[CRDB_WORD_STUFF_HEADER_SIZE];
uint8_t *end;
end = crdb_word_stuff_header(header);
assert(end == header + sizeof(header));
/* Nothing to do if we definitely have a header at the end. */
if (fd_ends_with_header(fd, header))
return true;
/* Otherwise, it's always safe to append a header. */
return append_to_fd(fd, header, sizeof(header), ce);
}
bool
crdb_record_stream_write_initial(FILE *stream, crdb_error_t *ce)
{
uint8_t header[CRDB_WORD_STUFF_HEADER_SIZE];
uint8_t *end;
size_t written;
end = crdb_word_stuff_header(header);
assert(end == header + sizeof(header));
written = fwrite(header, sizeof(header), 1, stream);
if (written != 1)
return crdb_error_set(ce,
"crdb_record_stream initial fwrite(3) failed.",
errno);
return true;
}
bool
crdb_record_stream_append_buf(int fd, uint32_t generation,
const uint8_t *buf, size_t len, crdb_error_t *ce)
{
struct write_record record = {
.header.generation = generation,
};
if (len > CRDB_RECORD_STREAM_MAX_LEN)
return crdb_error_set(ce, "crdb_record_stream data too long");
memcpy(&record.data, buf, len);
return record_stream_append_record(fd, &record, len, ce);
}
bool
crdb_record_stream_write_buf(FILE *stream, uint32_t generation,
const uint8_t *buf, size_t len, crdb_error_t *ce)
{
struct write_record record = {
.header.generation = generation,
};
if (len > CRDB_RECORD_STREAM_MAX_LEN)
return crdb_error_set(ce, "crdb_record_stream data too long");
memcpy(&record.data, buf, len);
return record_stream_write_record(stream, &record, len, ce);
}
#ifdef HAS_PROTOBUF_C
bool
crdb_record_stream_append_msg(int fd, uint32_t generation,
const ProtobufCMessage *message, crdb_error_t *ce)
{
struct write_record record = {
.header.generation = generation,
};
size_t packed_size;
size_t serialized_size;
packed_size = protobuf_c_message_get_packed_size(message);
if (packed_size > CRDB_RECORD_STREAM_MAX_LEN)
return crdb_error_set(ce,
"crdb_record_stream message too large.");
serialized_size = protobuf_c_message_pack(message, record.data);
assert(serialized_size <= packed_size);
return record_stream_append_record(fd, &record, serialized_size, ce);
}
bool
crdb_record_stream_write_msg(FILE *stream, uint32_t generation,
const ProtobufCMessage *message, crdb_error_t *ce)
{
struct write_record record = {
.header.generation = generation,
};
size_t packed_size;
size_t serialized_size;
packed_size = protobuf_c_message_get_packed_size(message);
if (packed_size > CRDB_RECORD_STREAM_MAX_LEN)
return crdb_error_set(ce,
"crdb_record_stream message too large.");
serialized_size = protobuf_c_message_pack(message, record.data);
assert(serialized_size <= packed_size);
return record_stream_write_record(stream, &record, serialized_size, ce);
}
#endif /* HAS_PROTOBUF_C */
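/*
* A hedged usage sketch for the protobuf-c entry points. `ExampleRecord`,
* `EXAMPLE_RECORD__INIT`, `some_field`, and `handle_failure` are
* hypothetical; protoc-c generated structs embed a ProtobufCMessage as
* their first member, `base`:
*
*     ExampleRecord msg = EXAMPLE_RECORD__INIT;
*     crdb_error_t err = CRDB_ERROR_INITIALIZER;
*
*     msg.some_field = 42;
*     if (!crdb_record_stream_append_msg(fd, generation, &msg.base, &err))
*             handle_failure(&err);
*/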
void
crdb_record_stream_iterator_init_buf(struct crdb_record_stream_iterator *it,
const uint8_t *buf, size_t size)
{
*it = (struct crdb_record_stream_iterator) {
.cursor = buf,
.end = buf + size,
.stop_at = buf + size,
.begin = buf,
.first_nonzero = buf,
.first_record = true,
};
return;
}
static const uint8_t *
find_first_nonzero(const uint8_t *cursor, const uint8_t *end)
{
while (cursor < end && cursor[0] == 0)
cursor++;
return cursor;
}
bool
crdb_record_stream_iterator_init_fd(struct crdb_record_stream_iterator *it,
int fd, crdb_error_t *ce)
{
struct stat st;
void *mapped;
off_t first_data = -1;
if (fstat(fd, &st) == -1)
return crdb_error_set(ce, "failed to fstat record stream",
errno);
if (st.st_size <= 0) {
crdb_record_stream_iterator_init_buf(it, NULL, 0);
return true;
}
/* Skip any sparse hole at the head. */
first_data = lseek(fd, 0, SEEK_DATA);
mapped = mmap(NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
if (mapped == MAP_FAILED)
return crdb_error_set(ce, "failed to mmap record stream",
errno);
*it = (struct crdb_record_stream_iterator) {
.cursor = mapped,
.end = (const uint8_t *)mapped + st.st_size,
.stop_at = (const uint8_t *)mapped + st.st_size,
.begin = mapped,
.mapped = mapped,
.map_size = st.st_size,
.first_record = true,
};
/* If we found a hole, advance the cursor. */
if (first_data > 0) {
if (first_data >= st.st_size)
first_data = st.st_size;
it->cursor += first_data;
}
/*
* And now, skip zeros: we know any valid record starts with a
* (non-zero) two-byte header.
*/
it->cursor = it->first_nonzero = find_first_nonzero(it->cursor, it->end);
return true;
}
void
crdb_record_stream_iterator_deinit(struct crdb_record_stream_iterator *it)
{
if (it->mapped != NULL)
munmap(it->mapped, it->map_size);
return;
}
size_t
crdb_record_stream_iterator_size(const struct crdb_record_stream_iterator *it)
{
return it->end - it->begin;
}
bool
crdb_record_stream_iterator_locate_at(struct crdb_record_stream_iterator *it,
size_t start_offset)
{
/*
* We can't reposition the iterator before the first byte that
* might hold useful data, or after the byte at which we want to
* stop decoding.
*/
if (start_offset < (size_t)(it->first_nonzero - it->begin) ||
start_offset > (size_t)(it->stop_at - it->begin))
return false;
if (start_offset == (size_t)(it->first_nonzero - it->begin)) {
it->first_record = true;
it->cursor = it->first_nonzero;
return true;
}
it->first_record = false;
it->cursor = it->begin + start_offset;
return true;
}
void
crdb_record_stream_iterator_stop_at(struct crdb_record_stream_iterator *it,
size_t stop_offset)
{
if (stop_offset > (size_t)(it->end - it->begin))
return;
it->stop_at = it->begin + stop_offset;
return;
}
static bool
crc_matches(struct read_record *record, size_t total_len)
{
uint32_t expected = record->header.crc;
record->header.crc = CRC_INITIAL_VALUE;
return expected == crdb_crc32c(record, total_len);
}
/**
* Consumes and attempts to decode the next record.
*
* @param it a non-empty iterator.
*
* @return the size of the decoded record data on success, -1 on failure.
*/
static ssize_t
record_stream_iterator_next_record(struct crdb_record_stream_iterator *it,
struct read_record *dst)
{
const uint8_t *encoded_data;
size_t encoded_len;
size_t decoded_len;
/*
* Skip to the next header, except for the initial record,
* which may not have any prefixing header: we actually write
* *trailers* to protect against the most common form of
* corruption we observe, with garbage appended to valid data.
*/
if (it->first_record == true) {
it->first_record = false;
it->header = it->cursor;
encoded_data = it->cursor;
} else {
const uint8_t *first_header;
first_header = crdb_word_stuff_header_find(it->cursor,
it->end - it->cursor);
/* No header found -> consume everything and bail. */
if (first_header >= it->stop_at)
goto eof;
it->header = first_header;
encoded_data = first_header + CRDB_WORD_STUFF_HEADER_SIZE;
assert(encoded_data <= it->end);
}
/*
* If we found data, but it starts at or past `stop_at`, return
* eof.
*/
if (it->header >= it->stop_at)
goto eof;
{
const uint8_t *next_header;
next_header = crdb_word_stuff_header_find(encoded_data,
it->end - encoded_data);
/*
* We found where the next record starts; decode
* everything up to that byte.
*/
it->cursor = next_header;
encoded_len = next_header - encoded_data;
}
/*
* We moved the cursor to the next encoded record. We just
* have to decode and validate the data.
*/
/* This is clearly too much data. Reject early. */
if (encoded_len > CRDB_RECORD_STREAM_BUF_LEN)
return -1;
/* Unstuff the bytes. */
{
uint8_t *decoded_begin = (uint8_t *)dst;
uint8_t *decoded_end;
/*
* Decoding never expands the number of bytes, so we
* know this won't overflow dst.
*/
decoded_end = crdb_word_stuff_decode(decoded_begin,
encoded_data, encoded_len);
if (decoded_end == NULL)
return -1;
decoded_len = decoded_end - decoded_begin;
}
/*
* Make sure we decoded a full header, and that the header's
* checksum is correct.
*/
if (decoded_len < sizeof(dst->header) ||
crc_matches(dst, decoded_len) == false)
return -1;
return decoded_len - sizeof(dst->header);
eof:
it->cursor = it->end;
return -1;
}
/**
* Writes the next valid record to `dst`.
*
* @return the size of the decoded record data on success, -1 on failure.
*/
static ssize_t
record_stream_iterator_next(struct crdb_record_stream_iterator *it,
struct read_record *dst)
{
while (it->cursor < it->stop_at) {
ssize_t r;
r = record_stream_iterator_next_record(it, dst);
if (r >= 0)
return r;
}
it->cursor = NULL;
it->end = NULL;
return -1;
}
bool
crdb_record_stream_iterator_next_buf(struct crdb_record_stream_iterator *it,
uint32_t *generation, uint8_t dst[static CRDB_RECORD_STREAM_BUF_LEN],
size_t *len)
{
struct read_record buf;
ssize_t payload_size;
*generation = 0;
*len = 0;
payload_size = record_stream_iterator_next(it, &buf);
if (payload_size < 0)
return false;
assert(payload_size <= CRDB_RECORD_STREAM_BUF_LEN);
*generation = buf.header.generation;
memcpy(dst, buf.data, payload_size);
*len = (size_t)payload_size;
return true;
}
#ifdef HAS_PROTOBUF_C
void *
crdb_record_stream_iterator_next_msg(struct crdb_record_stream_iterator *it,
uint32_t *generation, const ProtobufCMessageDescriptor *descriptor,
ProtobufCAllocator *allocator)
{
struct read_record buf;
ProtobufCMessage *ret = NULL;
*generation = 0;
/* We may fail to parse a buffer; keep scanning if that happens. */
while (ret == NULL) {
ssize_t payload_size;
payload_size = record_stream_iterator_next(it, &buf);
if (payload_size < 0)
return NULL;
assert(payload_size <= CRDB_RECORD_STREAM_BUF_LEN);
ret = protobuf_c_message_unpack(descriptor, allocator,
payload_size, buf.data);
}
*generation = buf.header.generation;
return ret;
}
#endif /* HAS_PROTOBUF_C */
#pragma once
/**
* A record stream is a corruption-resilient stream of (protobuf)
* serialized messages. A stream supports two operations:
*
* 1. Append a serialized record to a file descriptor.
* 2. Iterate through all the valid records in a file descriptor.
*/
#ifdef HAS_PROTOBUF_C
#include <protobuf-c/protobuf-c.h>
#endif /* HAS_PROTOBUF_C */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include "crdb_error.h"
/**
* We only support up to 512 raw bytes of payload on writes, and allow
* up to 1024 encoded bytes on reads. The read limit
* CRDB_RECORD_STREAM_BUF_LEN must be at least as high as
* CRDB_WORD_STUFFED_BOUND(CRDB_RECORD_STREAM_MAX_LEN + [internal header]).
*
* The current use case for record stream only needs ~20-40 byte
* records. These limits are more than generous enough, while still
* being reasonable for stack allocation. The read limit has twice as
* much headroom as the write limit to help forward compatibility:
* future versions may be able to write backward compatible protos
* that include many more fields than the initial schema.
*/
enum {
CRDB_RECORD_STREAM_MAX_LEN = 512,
CRDB_RECORD_STREAM_BUF_LEN = 2 * CRDB_RECORD_STREAM_MAX_LEN,
};
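/*
* record_stream.c enforces that relationship at compile time, roughly
* as below (it pulls in word_stuff.h for CRDB_WORD_STUFFED_BOUND, and
* its internal write-side header is two uint32_t fields):
*
*     static_assert(
*         CRDB_WORD_STUFFED_BOUND(2 * sizeof(uint32_t) +
*             CRDB_RECORD_STREAM_MAX_LEN) <= CRDB_RECORD_STREAM_BUF_LEN,
*         "The maximum encoded size must fit in the read record size limit.");
*/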
struct crdb_record_stream_iterator {
const uint8_t *cursor;
const uint8_t *end;
/*
* Stop returning records as soon as their first byte
* (including the header) is at or after `stop_at`.
*
* This is initially equal to `end`, but can be shifted
* earlier in the record stream. Note that a record that
* starts before `stop_at` may end after that pointer; it is
* still decoded in full.
*/
const uint8_t *stop_at;
const uint8_t *begin;
/* The following is only populated when iterating over fds. */
const uint8_t *header;
void *mapped;
size_t map_size;
bool first_record;
/* Everything in `mapped` before first_nonzero is zero-filled bytes. */
const uint8_t *first_nonzero;
};
/**
* Ensures the contents of `fd` are ready to append more records.
*
* This function should be called before appending data to a `fd` that
* might contain corrupt data. A call is useless when writing to a
* fresh empty file, but never harmful.
*
* @param fd a file descriptor opened with O_APPEND; may be repositioned.
*/
bool crdb_record_stream_append_initial(int fd, crdb_error_t *);
/**
* Ensures the contents of `stream` are ready to append more records.
*
* This function should be called before appending data to a stream
* that might contain corrupt data. A call is useless when writing to
* a fresh empty file, but never harmful.
*/
bool crdb_record_stream_write_initial(FILE *stream, crdb_error_t *);
/**
* Appends a record containing `buf[0 ... len - 1]` to `fd`.
*
* @param fd a file descriptor opened with O_APPEND.
*/
bool crdb_record_stream_append_buf(int fd, uint32_t generation,
const uint8_t *buf, size_t len, crdb_error_t *);
/**
* Writes a record containing `buf[0 ... len - 1]` to `stream`.
*
* This function never attempts to handle errors internally and
* should only be used to write to private temporary files.
*/
bool crdb_record_stream_write_buf(FILE *stream, uint32_t generation,
const uint8_t *buf, size_t len, crdb_error_t *);
#ifdef HAS_PROTOBUF_C
/**
* Serializes `message` and appends that record to `fd`.
*
* @param fd a file descriptor opened with O_APPEND.
*/
bool crdb_record_stream_append_msg(int fd, uint32_t generation,
const ProtobufCMessage *message, crdb_error_t *);
/**
* Serializes `message` and writes that record to `stream`.
*
* This function never attempts to handle errors internally and
* should only be used to write to private temporary files.
*/
bool crdb_record_stream_write_msg(FILE *stream, uint32_t generation,
const ProtobufCMessage *message, crdb_error_t *);
#endif /* HAS_PROTOBUF_C */
/**
* Initializes an iterator to scan for records in `buf[0 ... size - 1]`.
*/
void crdb_record_stream_iterator_init_buf(struct crdb_record_stream_iterator *,
const uint8_t *buf, size_t size);
/**
* Initializes an iterator to scan for records in `fd`.
*
* @param fd a descriptor for a mmap-able file. May be repositioned (lseek'ed).
*/
bool crdb_record_stream_iterator_init_fd(struct crdb_record_stream_iterator *,
int fd, crdb_error_t *);
/**
* Deinitializes an iterator.
*/
void crdb_record_stream_iterator_deinit(struct crdb_record_stream_iterator *);
/**
* Returns the number of bytes in the record stream.
*/
size_t crdb_record_stream_iterator_size(const struct crdb_record_stream_iterator *);
/**
* Sets the record stream to start looking for valid records at `start_offset`.
*
* No-ops on error.
*
* @return false if `start_offset` points the iterator at a range of clearly invalid data.
*/
bool crdb_record_stream_iterator_locate_at(struct crdb_record_stream_iterator *,
size_t start_offset);
/**
* Sets the stop offset for a record stream iterator.
*
* Once set, the iterator reports a clean end of stream instead of
* yielding any record whose first byte is at or after `stop_offset`.
*
* Paired with `locate_at`, this lets us partition a record stream in
* non-overlapping half-open ranges, by the records' first bytes.
*/
void crdb_record_stream_iterator_stop_at(struct crdb_record_stream_iterator *,
size_t stop_offset);
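/*
* A partitioning sketch (hypothetical caller code): scan the first and
* second halves of the same stream with two independently initialized
* iterators, `lo` and `hi`:
*
*     size_t total = crdb_record_stream_iterator_size(&lo);
*     size_t mid = total / 2;
*
*     crdb_record_stream_iterator_stop_at(&lo, mid);
*     crdb_record_stream_iterator_locate_at(&hi, mid);
*
* `lo` yields the records whose first byte lies in [0, mid), `hi` those
* in [mid, total): every valid record is yielded by exactly one of the
* two iterators, even when a record straddles `mid`.
*/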
/**
* Decodes and consumes the next valid record in the iterator.
*
* @param generation populated with the record's generation on success, 0 on failure.
* @param dst overwritten with the record's contents.
* @param len populated with the payload size on success, 0 on failure.
*
* @return true if a valid record was found, false on EOF.
*/
bool crdb_record_stream_iterator_next_buf(struct crdb_record_stream_iterator *,
uint32_t *generation, uint8_t dst[static CRDB_RECORD_STREAM_BUF_LEN],
size_t *len);
#ifdef HAS_PROTOBUF_C
/**
* Deserializes and returns the next valid protobuf message.
*
* @param generation populated with the record's generation on success, 0 on failure.
* @param descriptor the protobuf-c descriptor for the message type to decode.
* @param allocator the allocator used to build the return value, or NULL for
* the default allocator.
*
* @return a valid ProtobufCMessage for `descriptor`, or NULL on EOF.
*/
void *crdb_record_stream_iterator_next_msg(
struct crdb_record_stream_iterator *, uint32_t *generation,
const ProtobufCMessageDescriptor *descriptor,
ProtobufCAllocator *allocator);
#endif /* HAS_PROTOBUF_C */
/*
* Copyright 2021 Backtrace I/O, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "word_stuff.h"
#include <assert.h>
#include <limits.h>
#include <string.h>
#define RADIX 0xFDUL
/*
* We encode the first run size with a single byte, in radix 0xFD, to
* guarantee that writing the size will never introduce a byte
* sequence that matches the 2-byte header.
*/
#define MAX_INITIAL_RUN (RADIX - 1)
/*
* We encode every run after the first one with two bytes, in radix 0xFD:
* a larger radix reduces the asymptotic overhead, and, again, we want to
* make sure we never introduce a byte sequence that matches the header.
*/
#define MAX_REMAINING_RUN ((RADIX * RADIX) - 1)
/* Hardcode a reasonable cache line size. */
#define CACHE_LINE_SIZE 64
#define CRDB_LIKELY(X) (__builtin_expect(!!(X), 1))
#define CRDB_UNLIKELY(X) (__builtin_expect(!!(X), 0))
#define min(a, b) \
({ \
__auto_type _a = (a); \
__auto_type _b = (b); \
_a < _b ? _a : _b; \
})
static const uint8_t header[] = { RADIX + 1, RADIX };
static_assert(sizeof(header) == CRDB_WORD_STUFF_HEADER_SIZE,
"Header byte sequence does not match the header size.");
inline const uint8_t *
crdb_word_stuff_header_find(const uint8_t *data, size_t num)
{
const union {
uint8_t bytes[CRDB_WORD_STUFF_HEADER_SIZE];
uint16_t word;
} needle = {
.bytes = { header[0], header[1] },
};
/* Number of bytes where we might find the initial header byte. */
size_t num_prefix;
static_assert(sizeof(needle.word) == sizeof(needle.bytes),
"Header must be exactly a uint16_t.");
if (num < CRDB_WORD_STUFF_HEADER_SIZE)
return data + num;
num_prefix = num - 1;
for (size_t i = 0; i < num_prefix; i++) {
uint16_t actual;
memcpy(&actual, &data[i], sizeof(actual));
if (actual == needle.word)
return data + i;
}
return data + num;
}
size_t
crdb_word_stuffed_size(size_t in_size, bool with_header)
{
size_t static_estimate = CRDB_WORD_STUFFED_BOUND(in_size);
size_t ret = in_size;
/*
* We can handle much larger inputs, but the input is already
* unrealistically large at this point.
*/
if (in_size > SSIZE_MAX)
return SIZE_MAX;
/* Count the overhead for the initial header + one-byte chunk header */
ret += (with_header) ? sizeof(header) + 1 : 1;
if (in_size < MAX_INITIAL_RUN) {
assert(ret <= static_estimate);
return ret;
}
in_size -= MAX_INITIAL_RUN;
/*
* Add one 2-byte header for each remaining chunk, including
* the last partial / empty one.
*/
ret += sizeof(uint16_t) * (1 + (in_size / MAX_REMAINING_RUN));
assert(ret <= static_estimate);
return ret;
}
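/*
* Worked example: crdb_word_stuffed_size(300, true) == 305, i.e.,
* 2 (header) + 1 (initial run size) + 252 (initial run) +
* 2 (second run size) + 48 (remaining literals).
*/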
inline uint8_t *
crdb_word_stuff_header(uint8_t dst[static CRDB_WORD_STUFF_HEADER_SIZE])
{
memcpy(dst, header, CRDB_WORD_STUFF_HEADER_SIZE);
return dst + CRDB_WORD_STUFF_HEADER_SIZE;
}
/*
* We expect a lot of short copies. Avoid the overhead of full memcpy
* for those...
*
* We simplify the code by relying on unaligned loads and stores being
* fast on x86. Instead of, e.g., implementing a 15-byte copy by
* copying 8 bytes into &dst[0], and another 7 into &dst[8], we
* instead copy 8 bytes into &dst[0] and 8 again into &dst[7].
* There's a bit of duplicated work, but that's much better than
* splitting the 7-byte copy into multiple smaller writes.
*
* On my puny Intel, this brings the decoding time for records <
* 64 bytes down to ~8 cycles, compared to ~20-30 cycles with a full
* call to memcpy.
*/
static void
short_memcpy(uint8_t *dst, const uint8_t *src, size_t n)
{
#define MEMCPY(DST, SRC, N) do { \
memcpy(DST, SRC, N); \
/* Hafta disable memcpy conversion. */ \
asm volatile("" ::: "memory"); \
} while (0)
/*
* If we could rely on short rep movsb being fast, we would
* just always use that. However, we can't, especially not on
* AMD, so defer to memcpy for large copies, and use the code
* below for short (< cache line) ones.
*/
if (CRDB_LIKELY(n >= 8)) {
size_t tail = n - 8;
if (CRDB_UNLIKELY(n >= CACHE_LINE_SIZE)) {
memcpy(dst, src, n);
return;
}
/* We'll handle the last 8 bytes with an overlapping write. */
for (size_t i = 0; i < tail; i += 8)
MEMCPY(dst + i, src + i, 8);
MEMCPY(dst + tail, src + tail, 8);
return;
}
/* 4 <= n <= 8: use two potentially overlapping 4-byte writes. */
if (CRDB_LIKELY(n >= 4)) {
size_t tail = n - 4;
MEMCPY(dst, src, 4);
MEMCPY(dst + tail, src + tail, 4);
return;
}
/* n <= 3. Decode the length in binary (2- and 1-byte copies). */
if (n & 2)
MEMCPY(dst, src, 2);
if (n & 1)
MEMCPY(dst + (n - 1), src + (n - 1), 1);
#undef MEMCPY
return;
}
static inline uint8_t *
encode_run_size(uint8_t *dst, size_t distance)
{
assert(distance <= MAX_REMAINING_RUN);
/* Encode the distance in little-endian with RADIX base. */
dst[0] = distance % RADIX;
dst[1] = distance / RADIX;
return dst + 2;
}
static inline size_t
decode_run_size(const uint8_t src[static 2])
{
/* The chunk size is encoded in 2 little-endian base RADIX bytes. */
return src[0] + RADIX * src[1];
}
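/*
* Example: a run of 300 literals encodes as { 300 % 253, 300 / 253 } =
* { 47, 1 }, and decode_run_size() reads back 47 + 253 * 1 = 300.
* Both bytes always stay below RADIX, so an encoded run size can never
* alias the 0xFE 0xFD header sequence.
*/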
#define CONSUME(X) do { \
size_t consumed_ = (X); \
\
src += consumed_; \
src_size -= consumed_; \
} while (0)
uint8_t *
crdb_word_stuff_encode(uint8_t *dst, const void *vsrc, size_t src_size)
{
const uint8_t *src = vsrc;
uint8_t *ret = dst;
bool first_header = true;
/*
* Encoding looks for the next forbidden (header) sequence
* within the length that can be encoded in the current chunk:
* the first chunk uses a single byte, so the maximum length
* is MAX_INITIAL_RUN (252), and all other chunks can use two
* bytes, so their maximum length is MAX_REMAINING_RUN (252**2).
*
* In both cases, a value less than the maximum run size denotes
* the length of a literal run, followed by an implicit forbidden
* header, which can thus be consumed from the source. Otherwise,
* the length denotes a literal run, with no trailing header, so
* we only consume the literals copied to the destination.
*
* We also pretend we appended a forbidden sequence at the end
* of the source: that's the only way to ensure we can
* represent a short message. Decoding will strip that final
* forbidden sequence.
*/
for (;;) {
const uint8_t *next_forbidden;
size_t max_run_size;
size_t run_size;
if (first_header) {
max_run_size = MAX_INITIAL_RUN;
next_forbidden = crdb_word_stuff_header_find(src,
min(max_run_size, src_size));
run_size = next_forbidden - src;
assert(run_size <= MAX_INITIAL_RUN);
*ret = run_size;
ret++;
first_header = false;
} else {
max_run_size = MAX_REMAINING_RUN;
next_forbidden = crdb_word_stuff_header_find(src,
min(max_run_size, src_size));
run_size = next_forbidden - src;
ret = encode_run_size(ret, run_size);
}
short_memcpy(ret, src, run_size);
ret += run_size;
CONSUME(run_size);
/*
* Values less than the chunk size limit are
* implicitly suffixed with the stuff byte sequence.
*/
if (run_size < max_run_size) {
/*
* We reached the end (with a virtual
* terminating forbidden byte sequence).
*/
if (src_size == 0)
break;
assert(src_size >= CRDB_WORD_STUFF_HEADER_SIZE &&
src[0] == header[0] && src[1] == header[1] &&
"If we stopped short, we must have found "
"a forbidden header word.");
CONSUME(CRDB_WORD_STUFF_HEADER_SIZE);
}
}
return ret;
}
uint8_t *
crdb_word_stuff_decode(uint8_t *dst, const void *vsrc, size_t src_size)
{
const uint8_t *src = vsrc;
uint8_t *ret = dst;
bool first_header = true;
/*
* This is the inverse of the encode loop, with additional
* logic to avoid out-of-bound reads and detect obviously bad
* data.
*
* When we read the first header, we know it fits in one byte,
* and must be at most MAX_INITIAL_RUN. After that, we read
* 2-byte headers, which must represent a length of at most
* MAX_REMAINING_RUN.
*
* When the length is less than the maximum value, it denotes
* a run of literals (of the encoded length) followed by the
* forbidden (header) 2-byte sequence. When the length is
* equal to the maximum value, it denotes a run of literals,
* without any implicit trailing sequence.
*
* Once we arrive at the last chunk, we should find a chunk
* that encodes a literal followed by a header. That last
* header was virtually appended to the actual message during
* encoding, and we do not want to actually write it to the
* destination: it simply tells us that we correctly reached
* the end of the message.
*/
for (;;) {
size_t max_run_size;
size_t run_size;
if (first_header) {
max_run_size = MAX_INITIAL_RUN;
if (CRDB_UNLIKELY(src_size < 1))
return NULL;
run_size = src[0];
CONSUME(1);
first_header = false;
} else {
max_run_size = MAX_REMAINING_RUN;
if (CRDB_UNLIKELY(
src_size < CRDB_WORD_STUFF_HEADER_SIZE))
return NULL;
run_size = decode_run_size(src);
CONSUME(CRDB_WORD_STUFF_HEADER_SIZE);
}
if (CRDB_UNLIKELY(src_size < run_size ||
run_size > max_run_size))
return NULL;
short_memcpy(ret, src, run_size);
ret += run_size;
CONSUME(run_size);
/* We have to add the implicit header. */
if (run_size < max_run_size) {
/* Unless it's the virtual terminating header. */
if (src_size == 0)
break;
/*
* If it's not the end, there must at least be
* a header remaining. We check here before
* writing to dst to preserve the invariant
* that we'll never write more bytes than the
* initial src_size - 1.
*/
if (CRDB_UNLIKELY(
src_size < CRDB_WORD_STUFF_HEADER_SIZE))
return NULL;
ret = crdb_word_stuff_header(ret);
}
}
return ret;
}
#undef CONSUME
#pragma once
/**
* The word_stuff component implements a variant of consistent
* overhead byte stuffing (https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing,
* https://doi.org/10.1109%2F90.769765) that replaces the forbidden
* byte with a forbidden sequence of 2 bytes. This is similar to
* "word" stuffing, except that the sequence does not have to be
* aligned; that's important to deal with the fact that short writes
* in POSIX can fail at byte granularity.
*
* This functionality is expected to be used to write self-delimiting
* records to a persistent file. Each record should begin with a
* 2-byte header consisting of the forbidden byte sequence (as written
* by crdb_word_stuff_header), followed by the word-stuffed data. One
* can simply write that to a file in O_APPEND, and catch failures
* after the fact: the encoding ensures that overwritten *and* lost or
* inserted bytes only affect records that overlap with the corruption.
*
* On the read-side, one should scan for header byte sequences with
* crdb_word_stuff_header_find (and assume the last record ends where
* the file ends), and decode the stuffed contents between each
* header.
*
* We use byte-sequence stuffing with a funny (0xFE 0xFD) byte
* sequence to try and maximise the length of runs between occurrences
* of the forbidden sequence: that makes for a workload that's more
* efficient on contemporary machines than traditional COBS. We chose
* this sequence because it does not appear in any small integer,
* unsigned or signed (two's complement), nor in any varint,
* regardless of endianness; it also doesn't appear in any float or
* double value with an exponent around small integers. A two-byte run
* length header is excessive for small records, so the first run
* length header only uses a single byte. The worst-case space
* overhead thus occurs for records slightly longer than 254 bytes.
*
* The reader always knows when it's decoding a 1-byte (when we start
* decoding a record) or a 2-byte run size (the rest of the time), so
* there's no ambiguity there. In both cases, the run size encodes the
* number of literal bytes before inserting a 2-byte forbidden
* sequence (the word_stuff header), with an escape hatch for long
* runs without the forbidden sequence: when the run size is the
* maximum encodable length, it encodes the number of literal bytes to
* copy without inserting a forbidden sequence (if a forbidden
* sequence immediately follows, we next encode a run of size 0).
*
* The current implementation is much more geared toward fast decoding
* than encoding. That's nothing inherent to the format, but simply
* reflects the fact that we expect writes to be I/O bound for
* durability, and reads to happen much more frequently than writes.
*/
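/*
* A small worked example: the payload { 0x01, 0xFE, 0xFD, 0x02 }
* word-stuffs to { 0x01, 0x01, 0x01, 0x00, 0x02 }:
*
*  - initial run size 0x01, one literal byte 0x01, then an implicit
*    forbidden sequence (the payload's 0xFE 0xFD is dropped);
*  - 2-byte run size { 0x01, 0x00 } = 1, one literal byte 0x02, and an
*    implicit terminating forbidden sequence marking the end of data.
*
* Prefixed with its 2-byte header, the full record on disk is
* { 0xFE, 0xFD, 0x01, 0x01, 0x01, 0x00, 0x02 }.
*/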
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
/**
* We use a two-byte header.
*/
enum { CRDB_WORD_STUFF_HEADER_SIZE = 2 };
/**
* Returns a pointer to the first byte of the first occurrence of the
* word stuffing header in `data[0 ... num - 1]`, or `data + num` if
* none.
*/
const uint8_t *crdb_word_stuff_header_find(const uint8_t *data, size_t num);
/**
* Returns the worst-case stuffed size for an input of `in_size` bytes.
*
* @param in_size the number of bytes in the raw input to stuff
* @param with_header whether to also include the 2-byte header in the
* return value.
*
* @return the stuffed size, or SIZE_MAX on overflow.
*/
size_t crdb_word_stuffed_size(size_t in_size, bool with_header);
/**
* CRDB_WORD_STUFFED_BOUND is a safe over-approximation of
* crdb_word_stuffed_size that can be used as an integer constant, as
* long as the computation does not overflow.
*
* We overestimate the number of run headers we might need by adding
* *2* to `IN_SIZE / MAX_REMAINING_RUN`: we must round up, and take into
* account the initial short run.
*/
#define CRDB_WORD_STUFFED_BOUND(IN_SIZE) \
((size_t)CRDB_WORD_STUFF_HEADER_SIZE + (IN_SIZE) + \
CRDB_WORD_STUFF_HEADER_SIZE * (2 + (IN_SIZE) / (253ULL * 253 - 1)))
/**
* Writes the 2-byte stuffing header to `dst`, and returns a pointer
* to `dst + CRDB_WORD_STUFF_HEADER_SIZE`.
*/
uint8_t *crdb_word_stuff_header(uint8_t dst[static CRDB_WORD_STUFF_HEADER_SIZE]);
/**
* Word stuffs the bytes in `src[0 ... src_size - 1]` into `dst`, which
* must have room for `crdb_word_stuffed_size(src_size, false)`.
*
* @return a pointer to one past the last byte written in `dst`.
*/
uint8_t *crdb_word_stuff_encode(uint8_t *dst, const void *src, size_t src_size);
/**
* Decodes the word-stuffed input in `src` into `dst`, which must have room
* for `src_size - 1` bytes.
*
* @return a pointer to one past the last byte written in `dst`, or NULL on
* decidedly invalid input.
*/
uint8_t *crdb_word_stuff_decode(uint8_t *dst, const void *src, size_t src_size);