Skip to content

Instantly share code, notes, and snippets.

@mnunberg
Last active January 1, 2016 20:59
Show Gist options
  • Save mnunberg/8200291 to your computer and use it in GitHub Desktop.
Save mnunberg/8200291 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include "netbufs.h"
#include "slist-inl.h"
/*
 * Generic helpers.
 *
 * MINIMUM/MAXIMUM are fully parenthesized so they can be embedded in larger
 * expressions; note that each argument is still evaluated more than once,
 * so do not pass expressions with side effects.
 */
#define MINIMUM(a, b) (((a) < (b)) ? (a) : (b))
#define MAXIMUM(a, b) (((a) > (b)) ? (a) : (b))

/* Default size, in bytes, of the buffer backing a block. */
#define BASEALLOC 32768

/* True when the flush cursor has caught up with the write cursor. */
#define BLOCK_IS_FLUSHED(block) ((block)->flushcur == (block)->cursor)

/* True when the block holds no data (read and write positions coincide). */
#define BLOCK_IS_EMPTY(block) ((block)->start == (block)->cursor)

/* True when at least one out-of-order release is queued on the block. */
#define BLOCK_HAS_DEALLOCS(block) ((block)->min_dealloc_offset != (nb_SIZE)-1)

/* Accessors translating list nodes of a manager/block into nb_BLOCK pointers. */
#define FIRST_BLOCK(mgr) (SLIST_ITEM((mgr)->active_blocks.first, nb_BLOCK, slnode))
#define LAST_BLOCK(mgr) (SLIST_ITEM((mgr)->active_blocks.last, nb_BLOCK, slnode))
#define NEXT_BLOCK(block) (SLIST_ITEM((block)->slnode.next, nb_BLOCK, slnode))
/**
 * Compute the number of bytes currently occupied in a block.
 *
 * The first segment always spans [start, wrap). When the write cursor sits
 * below 'start' the block has wrapped around and the bytes in [0, cursor)
 * are added on top.
 *
 * @param block the block to measure
 * @return the number of used bytes
 */
static nb_SIZE get_block_size(nb_BLOCK *block)
{
    const nb_SIZE head_len = block->wrap - block->start;

    if (block->cursor >= block->start) {
        /* Not wrapped: only the first segment counts */
        return head_len;
    }
    return head_len + block->cursor;
}
/**
 * Obtain a block whose buffer can hold at least 'capacity' bytes.
 *
 * An unused slot of the manager's embedded block array (one whose 'nalloc'
 * is zero) is preferred; if none is left a standalone block is allocated on
 * the heap. The buffer size starts at mgr->basealloc and is doubled until it
 * is large enough.
 *
 * @param mgr the manager which will own the block
 * @param capacity minimum number of bytes the buffer must hold
 * @return the block (not yet linked into any list), or NULL if memory could
 *         not be obtained or the required size does not fit in nb_SIZE.
 *         mgr->total_allocs counts the allocations that succeeded.
 */
static nb_BLOCK* alloc_new_block(nb_MGR *mgr, nb_SIZE capacity)
{
    int ii;
    nb_BLOCK *ret = NULL;
    nb_SIZE nalloc = mgr->basealloc;

    /* Work out the size first so that a failure leaves no slot half set up */
    if (nalloc == 0) {
        /* Doubling zero would never terminate */
        nalloc = 1;
    }
    while (nalloc < capacity) {
        nb_SIZE doubled = nalloc * 2;
        if (doubled <= nalloc) {
            /* nb_SIZE overflow: the request cannot be satisfied */
            return NULL;
        }
        nalloc = doubled;
    }

    for (ii = 0; ii < MIN_BLOCK_COUNT; ii++) {
        if (!mgr->_blocks[ii].nalloc) {
            ret = mgr->_blocks + ii;
            break;
        }
    }

    if (ret == NULL) {
        ret = calloc(1, sizeof(*ret));
        if (ret == NULL) {
            return NULL;
        }
        mgr->total_allocs++;
        ret->type = NETBUF_BLOCK_STANDALONE;
    }

    ret->root = malloc(nalloc);
    if (ret->root == NULL) {
        if (ret->type == NETBUF_BLOCK_STANDALONE) {
            free(ret);
        }
        /* A managed slot keeps nalloc == 0 and therefore stays available */
        return NULL;
    }
    mgr->total_allocs++;

    ret->nalloc = nalloc;
    ret->wrap = 0;
    ret->cursor = 0;
    ret->flushcur = 0;
    return ret;
}
/**
 * Look through the manager's list of available (cached) blocks for one whose
 * buffer is at least 'capacity' bytes.
 *
 * The first match is unlinked from the available list and handed back. For
 * blocks which are not NETBUF_BLOCK_MANAGED the manager's block counter is
 * decremented as well.
 *
 * @param mgr the manager to search
 * @param capacity minimum buffer size required
 * @return the detached block, or NULL if no cached block is large enough
 */
static nb_BLOCK* find_free_block(nb_MGR *mgr, nb_SIZE capacity)
{
    slist_iterator it;

    SLIST_ITERFOR(&mgr->avail_blocks, &it) {
        nb_BLOCK *candidate = SLIST_ITEM(it.cur, nb_BLOCK, slnode);
        const int fits = (candidate->nalloc >= capacity);

        if (fits) {
            slist_iter_remove(&mgr->avail_blocks, &it);
            if (candidate->type != NETBUF_BLOCK_MANAGED) {
                mgr->blockcount -= 1;
            }
            return candidate;
        }
    }

    return NULL;
}
/**
 * Reserve a span at the very beginning of a fresh block and append that
 * block to the manager's active list.
 *
 * The block is taken from the cache of available blocks when one is large
 * enough, otherwise a new one is allocated.
 *
 * @param mgr the manager
 * @param span the span to reserve; span->size must already be set. On
 *        success its parent and offset are filled in.
 * @return 0 on success, -1 if no block could be obtained
 */
static int reserve_empty_block(nb_MGR *mgr, nb_SPAN *span)
{
    nb_BLOCK *block = find_free_block(mgr, span->size);

    if (block == NULL) {
        block = alloc_new_block(mgr, span->size);
    }
    if (block == NULL) {
        return -1;
    }

    span->parent = block;
    span->offset = 0;

    block->start = 0;
    block->wrap = span->size;
    block->cursor = span->size;
    /*
     * A recycled block still carries the flush cursor of its previous life;
     * rewind it so the next flush starts at the beginning of the new data.
     */
    block->flushcur = 0;
    block->use_count++;
    /* All-ones is the "no out-of-order releases queued" marker */
    block->min_dealloc_offset = (nb_SIZE)-1;

    slist_append(&mgr->active_blocks, &block->slnode);
    return 0;
}
/**
 * Try to carve a span out of a block which already contains data.
 *
 * Only span->offset and the block's cursor/wrap are updated; the caller is
 * responsible for setting span->parent and bumping the use count.
 *
 * The block is treated as a ring: new data is appended after the cursor,
 * and once the tail of the buffer is too small the allocation wraps around
 * to offset 0, provided the space in front of 'start' suffices. The cursor
 * is never allowed to land exactly on 'start', because start == cursor is
 * how an empty block is recognised and a completely full ring would be
 * indistinguishable from an empty one.
 *
 * @param block the block to allocate from
 * @param span the span to place; span->size must be set
 * @return 0 on success, -1 if the block cannot accommodate the span (also
 *         when out-of-order releases are pending on the block)
 */
static int reserve_active_block(nb_BLOCK *block, nb_SPAN *span)
{
    if (BLOCK_HAS_DEALLOCS(block)) {
        return -1;
    }

    if (block->cursor > block->start) {
        /* Single segment: [start, cursor) */
        if (block->nalloc - block->cursor >= span->size) {
            /* Enough room after the cursor: extend the first segment */
            span->offset = block->cursor;
            block->cursor += span->size;
            block->wrap = block->cursor;
            return 0;
        }
        if (block->start > span->size) {
            /* Wrap around; 'wrap' keeps marking the end of the first segment */
            span->offset = 0;
            block->cursor = span->size;
            return 0;
        }
        return -1;
    }

    /* Already wrapped: free space is the gap between cursor and start */
    if (block->start - block->cursor > span->size) {
        span->offset = block->cursor;
        block->cursor += span->size;
        return 0;
    }
    return -1;
}
/**
 * Reserve span->size contiguous bytes for the given span.
 *
 * The last active block is tried first; when there is no active block, or
 * it cannot hold the span, a fresh block is used instead.
 *
 * @param mgr the manager
 * @param span the span to reserve, with its size already set
 * @return 0 on success, -1 on failure
 */
int netbuf_reserve_span(nb_MGR *mgr, nb_SPAN *span)
{
    nb_BLOCK *tail;

    if (SLIST_IS_EMPTY(&mgr->active_blocks)) {
        return reserve_empty_block(mgr, span);
    }

    tail = SLIST_ITEM(mgr->active_blocks.last, nb_BLOCK, slnode);
    if (reserve_active_block(tail, span) != 0) {
        /* The tail block has no room; fall back to a new block */
        return reserve_empty_block(mgr, span);
    }

    span->parent = tail;
    tail->use_count++;
    return 0;
}
/**
 * Sum up the number of used bytes over all active blocks.
 *
 * @param mgr the manager
 * @return total number of bytes held by the active blocks
 */
nb_SIZE netbuf_get_size(const nb_MGR *mgr)
{
    slist_node *node;
    nb_SIZE total = 0;

    SLIST_FOREACH(&mgr->active_blocks, node) {
        nb_BLOCK *blk = SLIST_ITEM(node, nb_BLOCK, slnode);
        total += get_block_size(blk);
    }

    return total;
}
/**
 * Count the IOV entries needed to describe the contents of all active
 * blocks.
 *
 * An empty block contributes nothing, a block with a single segment
 * contributes one entry and a wrapped block (cursor below start)
 * contributes two.
 *
 * @param mgr the manager
 * @return the number of IOV entries required
 */
unsigned int netbuf_get_niov(nb_MGR *mgr)
{
    slist_node *ll;
    /* Must start at zero: the loop below only ever increments it */
    unsigned int ret = 0;

    SLIST_FOREACH(&mgr->active_blocks, ll) {
        nb_BLOCK *cur = SLIST_ITEM(ll, nb_BLOCK, slnode);
        if (BLOCK_IS_EMPTY(cur)) {
            continue;
        }
        ret++;
        if (cur->cursor < cur->start) {
            /* Wrapped: the second segment needs its own entry */
            ret++;
        }
    }
    return ret;
}
/**
* Flush semantics.
*
* The idea behind the netbuf system is that a buffer can be flushed while it
* is being appended to. However data which is currently being flushed
* should not be removed - i.e. via release_span.
*
* A flush operation consists of three steps:
*
* 1. Fill the IOV structures with the data offsets to flush
* 2. Flush the data to the network passing it the IOVs
* 3. Report how much data was actually flushed.
*
* In order to handle partial block flushes (i.e. only part of a block
* has been flushed), the manager shall retain the following data:
*
* I. The first block to flush
* II. The SIZE offset at which to start flushing.
*
* In the case of a partial flush where the flush ends in middle of a block,
* these two variables are set to the partially flushed block, and the
* number of bytes of this block that were partially flushed.
*
* This works because we make the assumption that a BLOCK SHALL NEVER DECREASE
* IN SIZE WHILE IT IS BEING FLUSHED. As such, we can assume that the size
* offset will _always_ refer to at least a subset of the same buffers which
* were present during the initial flush request.
*
*
* Once a block has been flushed,
*/
/**
 * Fill an IOV array with the not-yet-flushed regions of the active blocks.
 *
 * Blocks are visited in list order. Each block contributes the bytes
 * between its flush cursor and its write cursor: one entry for a single
 * segment, two entries when the unflushed data wraps around the end of the
 * buffer. Filling stops as soon as the array is full. Entries which end up
 * unused are set to { NULL, 0 } so the caller can tell where the data ends.
 * Block state is not modified.
 *
 * @param mgr the manager
 * @param iovs array of IOV structures to populate
 * @param niov number of elements in 'iovs'
 * @return the total number of bytes described by the populated entries;
 *         0 if there is nothing to flush or no usable array was given
 */
nb_SIZE netbuf_start_flush(nb_MGR *mgr, nb_IOV *iovs, int niov)
{
    nb_SIZE ret = 0;
    nb_IOV *iov = iovs;
    nb_IOV *iov_end;
    slist_node *ll;

    if (iovs == NULL || niov <= 0) {
        return 0;
    }
    /* One past the last usable entry */
    iov_end = iovs + niov;

    SLIST_FOREACH(&mgr->active_blocks, ll) {
        nb_BLOCK *block = SLIST_ITEM(ll, nb_BLOCK, slnode);
        nb_SIZE len;

        if (iov == iov_end) {
            break;
        }
        if (block->flushcur == block->cursor || BLOCK_IS_EMPTY(block)) {
            continue;
        }

        if (block->cursor != block->wrap && block->flushcur > block->cursor) {
            /*
             * Two segments and the flush cursor is still inside the first
             * one: emit [flushcur, wrap) followed by [0, cursor).
             */
            len = block->wrap - block->flushcur;
            if (len) {
                iov->iov_base = block->root + block->flushcur;
                iov->iov_len = len;
                ret += len;
                iov++;
            }
            if (!block->cursor) {
                continue;
            }
            if (iov == iov_end) {
                break;
            }
            iov->iov_base = block->root;
            iov->iov_len = block->cursor;
            ret += block->cursor;
            iov++;
        } else {
            /*
             * Either a single segment, or the first segment has been
             * flushed already: the pending data is [flushcur, cursor).
             */
            len = block->cursor - block->flushcur;
            iov->iov_base = block->root + block->flushcur;
            iov->iov_len = len;
            ret += len;
            iov++;
        }
    }

    /* Clear whatever was not used */
    for (; iov != iov_end; iov++) {
        iov->iov_base = NULL;
        iov->iov_len = 0;
    }
    return ret;
}
/**
* Here we modify the flush offsets, which should always be a subset of the
* usage offsets.
*/
void netbuf_end_flush(nb_MGR *mgr, unsigned int nflushed)
{
slist_node *ll;
SLIST_FOREACH(&mgr->active_blocks, ll) {
nb_SIZE to_chop;
nb_BLOCK *block = SLIST_ITEM(ll, nb_BLOCK, slnode);
if (block->flushcur >= block->start) {
/** [xxxxxSxxxxxFxxxxxCW] */
to_chop = MINIMUM(nflushed, block->wrap - block->flushcur);
block->flushcur += to_chop;
nflushed -= to_chop;
if (block->flushcur == block->wrap && block->cursor != block->wrap) {
/** [xxxxCoooooSxxxxxFW] */
if (!nflushed) {
block->flushcur = 0;
return;
}
to_chop = MINIMUM(nflushed, block->cursor);
nflushed -= to_chop;
block->flushcur += to_chop;
}
} else {
/** [xxxxxFxxxCoooooSxxxxxW] */
/** Flush cursor is less than start. Second segment */
to_chop = MINIMUM(nflushed, block->cursor - block->flushcur);
block->flushcur += to_chop;
nflushed -= to_chop;
}
if (!nflushed) {
break;
}
}
}
/**
 * Report the flush status of a span.
 *
 * Placeholder: neither argument is examined and the result is always -1.
 */
int netbuf_get_flush_status(const nb_MGR *mgr, const nb_SPAN *span)
{
    (void)mgr;
    (void)span;
    return -1;
}
/**
 * Remember an out-of-order release, i.e. a span located neither at the
 * beginning nor at the end of the block's data.
 *
 * A record with the span's offset and size is appended to the block's
 * dealloc list and the block's smallest pending offset is lowered if
 * necessary.
 *
 * If the record cannot be allocated the function returns without queuing
 * anything, so the region stays accounted as used.
 *
 * @param block the block owning the span
 * @param span the span being released
 */
static void ooo_queue_dealoc(nb_BLOCK *block, nb_SPAN *span)
{
    nb_DEALLOC *dea = calloc(1, sizeof(*dea));

    if (dea == NULL) {
        /* Out of memory: nothing can be recorded */
        return;
    }

    dea->offset = span->offset;
    dea->size = span->size;
    if (block->min_dealloc_offset > dea->offset) {
        block->min_dealloc_offset = dea->offset;
    }
    slist_append(&block->deallocs, &dea->slnode);
}
/**
 * Apply the queued out-of-order releases located at 'offset'.
 *
 * Every record whose offset equals 'offset' is unlinked, advances the
 * block's start by its size and is freed. While scanning, the smallest
 * offset among the records that do not match is collected and stored as the
 * block's new minimum; when nothing else is queued this is the all-ones
 * sentinel.
 *
 * @param block the block to update
 * @param offset the offset to match against the queued records
 */
static void ooo_apply_dealloc(nb_BLOCK *block, nb_SIZE offset)
{
    slist_iterator it;
    nb_SIZE lowest_left = -1;

    SLIST_ITERFOR(&block->deallocs, &it) {
        nb_DEALLOC *entry = SLIST_ITEM(it.cur, nb_DEALLOC, slnode);

        if (entry->offset != offset) {
            /* Stays queued; it may become the new minimum */
            if (entry->offset < lowest_left) {
                lowest_left = entry->offset;
            }
        } else {
            slist_iter_remove(&block->deallocs, &it);
            block->start += entry->size;
            free(entry);
        }
    }

    block->min_dealloc_offset = lowest_left;
}
void netbuf_release_span(nb_MGR *mgr, nb_SPAN *span)
{
nb_BLOCK *block = span->parent;
if (span->offset == block->start) {
/** Removing from the beginning */
block->start += span->size;
if (block->min_dealloc_offset == block->start) {
ooo_apply_dealloc(block, block->start);
}
if (!BLOCK_IS_EMPTY(block) && block->start == block->wrap) {
block->wrap = block->cursor;
block->start = 0;
}
} else if (span->offset + span->size == block->cursor) {
/** Removing from the end */
if (block->cursor == block->wrap) {
/** Single region, no wrap */
block->cursor -= span->size;
block->wrap -= span->size;
} else {
block->cursor -= span->size;
if (!block->cursor) {
/** End has reached around */
block->cursor = block->wrap;
}
}
} else {
ooo_queue_dealoc(block, span);
return;
}
if (--block->use_count) {
return;
}
lcb_assert(BLOCK_IS_EMPTY(block));
{
slist_iterator iter;
SLIST_ITERFOR(&mgr->active_blocks, &iter) {
if (&block->slnode == iter.cur) {
slist_iter_remove(&mgr->active_blocks, &iter);
break;
}
}
}
if (mgr->blockcount < mgr->maxblocks) {
slist_append(&mgr->avail_blocks, &block->slnode);
mgr->blockcount++;
} else {
free(block->root);
block->root = NULL;
if (block->type == NETBUF_BLOCK_STANDALONE) {
free(block);
}
}
}
/**
 * Initialise a manager structure.
 *
 * The whole structure is zeroed, after which the base allocation size and
 * the block limits are set to their defaults.
 *
 * @param mgr the manager to initialise
 */
void netbuf_init(nb_MGR *mgr)
{
    memset(mgr, 0, sizeof(*mgr));

    mgr->blockcount = MIN_BLOCK_COUNT;
    mgr->maxblocks = MIN_BLOCK_COUNT * 2;
    mgr->basealloc = BASEALLOC;
}
/**
 * Tear down every block on a list.
 *
 * Each block is unlinked, its queued out-of-order release records and its
 * buffer are freed, and the block structure itself is freed when it was
 * allocated standalone. Blocks embedded in the manager are left in place
 * with their buffer pointer cleared.
 *
 * @param list the block list to empty
 */
static void free_blocklist(slist_root *list)
{
    slist_iterator iter;

    SLIST_ITERFOR(list, &iter) {
        nb_BLOCK *block = SLIST_ITEM(iter.cur, nb_BLOCK, slnode);
        slist_iterator dea_iter;

        slist_iter_remove(list, &iter);

        /* Records of releases which were never applied would leak otherwise */
        SLIST_ITERFOR(&block->deallocs, &dea_iter) {
            nb_DEALLOC *dea = SLIST_ITEM(dea_iter.cur, nb_DEALLOC, slnode);
            slist_iter_remove(&block->deallocs, &dea_iter);
            free(dea);
        }

        /* free(NULL) is a no-op, so no guard is needed */
        free(block->root);
        block->root = NULL;

        if (block->type == NETBUF_BLOCK_STANDALONE) {
            free(block);
        }
    }
}
/**
 * Release all blocks held by a manager: first the active ones, then the
 * cached available ones.
 *
 * @param mgr the manager to clean up
 */
void netbuf_cleanup(nb_MGR *mgr)
{
    slist_root *lists[2];
    unsigned int ii;

    lists[0] = &mgr->active_blocks;
    lists[1] = &mgr->avail_blocks;

    for (ii = 0; ii < 2; ii++) {
        free_blocklist(lists[ii]);
    }
}
/**
 * Print a human readable summary of the manager to stdout: the allocation
 * counter followed by, for every active block, its address, buffer and
 * size, and its start / cursor / wrap offsets.
 *
 * @param mgr the manager to describe
 */
void netbuf_dump_status(nb_MGR *mgr)
{
    slist_node *ll;
    const char *indent = "";

    printf("Status for MGR=%p [nallocs=%u]\n", (void *)mgr, mgr->total_allocs);
    printf("ACTIVE:\n");

    SLIST_FOREACH(&mgr->active_blocks, ll) {
        nb_BLOCK *block = SLIST_ITEM(ll, nb_BLOCK, slnode);

        indent = " ";
        /* %p expects a void pointer, so the char buffer is cast explicitly */
        printf("%sBLOCK=%p; BUF=%p, %uB\n", indent,
               (void *)block, (void *)block->root, block->nalloc);

        indent = " ";
        printf("%sUSAGE:\n", indent);

        if (block->cursor > block->start) {
            /* Single segment: start comes before the cursor */
            printf("%s |-- [ OFFSET=%u ] -- [ POS=%u ] -- [ LIMIT=%u ]\n",
                   indent, block->start, block->cursor, block->wrap);
        } else {
            /* Wrapped (or empty): the cursor is printed first */
            printf("%s |-- [ POS=%u ] -- [ OFFSET=%u ] -- [ LIMIT=%u ]\n",
                   indent, block->cursor, block->start, block->wrap);
        }
    }
}
#ifndef LCB_PACKET_H
#define LCB_PACKET_H
#include "config.h"
#include "slist.h"
#include <libcouchbase/couchbase.h>
#include <memcached/protocol_binary.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Opaque handles and scalar types used throughout the netbuf API */
typedef struct netbufs_st nb_MGR;
typedef struct netbuf_block_st nb_BLOCK;
typedef struct netbuf_span_st nb_SPAN;
typedef unsigned int nb_SIZE;
typedef struct lcb_iovec_st nb_IOV;

/** A contiguous region reserved inside a block */
struct netbuf_span_st {
    /** PRIVATE: Parent block */
    nb_BLOCK *parent;
    /** PRIVATE: Offset from root at which this buffer begins */
    nb_SIZE offset;
    /** PUBLIC, write-once: Allocation size */
    nb_SIZE size;
};

/**
 * Set the number of bytes a span should cover, prior to reserving it.
 *
 * The macro parameter must not be called 'size': the preprocessor would
 * substitute it for the member name as well and produce invalid code.
 */
#define NETBUF_SPAN_INIT(span, nbytes) ((span)->size = (nbytes))
/**
*
* NETBUF - Efficient write buffers
* ================================
* GOALS
* =====
*
* (1) provide a simple buffer allocation API
* From a logic perspective it's simplest to deal with a straight
* contiguous buffer per packet.
*
* (2) provide an efficient way of sending multiple contiguous packets. This
* will reduce IOV fragmentation and reduce the number of trips to the
* I/O plugin for multiple writes. Currently this is done very efficiently
* with the ringbuffer - however this comes at the cost of copying all
* request data to the ringbuffer itself. Our aim is to reduce the
* number of copies while still maintaining a packed buffer.
*
* (3) Allow a pluggable method by which user-provided data can be plugged
* into the span/cursor/flush architecture.
*
*
* Basic terminology and API
* =========================
*
* ~~~ SPAN ~~~
*
* a SPAN is a region of contiguous memory; a span is user allocated.
* A span is initialized via NETBUF_SPAN_INIT which sets the size the span
* should cover.
*
* Once the span has been set, it must be _reserved_. Once a span has been
* reserved, it will guarantee access to a buffer which may be obtained
* via SPAN_BUFFER. This buffer is guaranteed to contain exactly size bytes
* and may be written to or read from using memcpy. Note that the span's buffer
* is not aligned.
*
* Additionally, spans are effectively ordered in sequential memory. This means
* that it can be effectively relied upon that if span_A is reserved and then
* span_B is reserved, that span_A will be ordered before span_B. This will
* make more sense later on when reading about FLUSH.
*
* ~~~ BLOCK ~~~
*
* A block contains a chunk of memory and offset variables. The chunk of
* memory belonging to a block is fixed (by default to 32k). A block maintains
* a sequence of one or more _effectively contiguous_ spans. The spans are
* ordered in such a manner that, at most, two buffer pointers
* (e.g. char * pointers) will be required to obtain a sequential representation
* of all spans contained therein. This allows for optimization of grouping
* many spans into larger blocks of packed spans.
*
* When a block does not have space for additional spans, a new block is obtained
* (either allocated, or retrieved from a cache). Blocks are ordered as a
* super-sequence of spans; thus:
*
* [ BLOCK 1 ] [ BLOCK 2 ]
* { S1, S2, S3 } { S4, S5, S6, S7 }
*
*
* Note that blocks are not aware of the spans they contain. Blocks only contain
* bound offsets which effectively represent the first and last span contained
* in them. This makes the block structures smaller and easier to maintain.
*
* ~~~ MANAGER ~~~~
*
* The manager controls the assignment of spans to blocks. Thus it is aware
* of the block order.
*
*
* ~~~ FLUSH ~~~~
* Flush is the act of consuming data from the manager. Flush represents an
* internal cursor located within the blocks. This cursor is non-repeatable
* (it cannot be rewound) and represents a position within a specific block.
* All data before this position is considered to be "flushed" or "consumed"
* (typically via a send() call), and all data after the cursor is considered
* to be "unflushed" - i.e. it has not been sent over the network yet.
*
* API-wise, flush is performed by populating a set of IOV structures which
* may be sent (this does not modify internals) via netbuf_start_flush(). Once
* the IOV has been sent, netbuf_end_flush() is called indicating how
* many bytes have been flushed. The internal cursor is incremented by this
* amount of bytes.
*
* Flush begins at the first block and ends at the last active block.
* In this use pattern, it is assumed that under normal circumstances a span
* will not be released until it has been flushed - and releasing a span
* before it has been flushed will corrupt the internal offsets as well as
* resulting in having garbled data placed within the TCP stream.
*
* It is safe to release spans which have been flushed; once a block has been
* flushed and all its spans have been released, the block is considered
* available (or freed to libc, depending on allocation constraints).
*
* Memcached Packet Construction
* =============================
*
* From libcouchbase, the intended architecture is to maintain a manager
* object per server structure. Packets sent to the server will be allocated
* in packed order and will be shipped off to the socket via an IOV structure.
*
* It is assumed that there will be a metadata packet structure containing
* the header, user cookie, start time, etc which will also contain an
* embedded SPAN structure containing the offsets for the specific packet.
*
* As the SPAN is contiguous, the key will also be embedded within the span
* as well.
*
* User Allocated Packets
* ======================
*
* With this scheme it will also be possible to employ user-allocated data
* packets.
*
* This will require a specialized packet structure (for the metadata
* book-keeping).
*
* Specifically, in order to support user-allocated data, each separate region
* in memory must be encapsulated into a customized block structure which has
* non-trivial overhead (see below for the memory layout of the block
* structure).
*
* An example command request may look like the following:
*/
/* Illustration only: this sample is excluded from compilation by '#if 0'. */
#if 0
struct packet_request_sample {
/* Incoming header. 24+extras. Aligned */
const char *header;
/** Key/Value payload */
struct lcb_iovec_st *iov;
/** Number of entries in 'iov' (presumably; the sample does not say) */
unsigned char niov;
};
#endif
/**
* The corresponding internal structure would look like this
*/
/* Illustration only: this sample is excluded from compilation by '#if 0'. */
#if 0
struct internal_userpacket_sample {
/** The user-facing request this structure wraps */
struct packet_request_sample *user;
/** Spans associated with the request, 'nspans' of them */
nb_SPAN *spans;
unsigned char nspans;
/* ... */
};
#endif
/**
* Internally, each IOV region would receive its own block structure which
* must be allocated (or retrieved from a cache). This block structure
* currently tallies at 40 bytes, and will grow if out-of-order deallocation
* is to be supported.
*/
/**
* A block contains a single allocated buffer. The buffer itself may be
* divided among multiple spans. We divide our buffers like so:
*
* U=Used, F=Free, X=Limited
*
* Initially:
* [ UUUUUUUFFFFFFFFFFFFFFFFFFFF ]
*
* After flushing some data:
*
* [ FFFFFUUUFFFFFFFFFFFFFFFFFFF ]
*
* Wrap-Around:
*
* [ UUUUFFFFFFFFUUUUUUUUU ]
*
*
* Wrap-Around (With space waste at the end)
* [ UUUUFFFFFFFFFFUUUUUXXX ]
*
* The used size of the buffer is done by:
*
* (1) Taking the base 'pos' variable.
* (2) Checking if it is greater than 'offset'. If it is, then the total
* size is pos - offset. As such, the buffer looks like this:
* [ FFFFFUUUUUUUUFFFFFF ]
* or just:
* [ UUUUUUUUFFFFFFFFFFF ]
*
*
* (3) If it isn't, the total size is offset - pos; this means the buffer
* looks like this:
* [ UUUUUUUUFFFFFFFUUUU ]
*
*
* To get the two IOV structures from the buffer, we calculate the two
* regions. Again:
*
* (1) If pos > offset then we have a single IOV structure.
* (2) Otherwise, we have two IOV structures. The first is:
* OFFSET..LIMIT
*
* And the second is:
* ROOT..POS
*/
/**
 * Ownership kinds of a block, stored in the 'type' member of a block.
 * MANAGED is zero, so blocks inside a zero-filled manager are of that kind
 * without further initialisation.
 */
enum {
/** Block is part of the manager structure */
NETBUF_BLOCK_MANAGED = 0,
/** Block has been allocated by the manager, but is not part of its structure */
NETBUF_BLOCK_STANDALONE,
/** Block is user provided */
NETBUF_BLOCK_USER
};
/**
 * Record of a span which was released out of order, kept on the owning
 * block's 'deallocs' list until it can be applied.
 */
typedef struct {
/** Link within the block's dealloc list */
slist_node slnode;
/** Offset of the released span within the block's buffer */
nb_SIZE offset;
/** Size of the released span in bytes */
nb_SIZE size;
} nb_DEALLOC;
/**
 * A block: one allocated buffer plus the offsets describing which parts of
 * it are in use. All offsets are relative to 'root'.
 */
struct netbuf_block_st {
/** Link for next block in list */
slist_node slnode;
/** The position at which data starts */
nb_SIZE start;
/** The position at which the first segment ends. */
nb_SIZE wrap;
/**
* The position at which the current segment ends. If this block only
* has a single segment, then this will be equal to 'wrap', otherwise
* it will be smaller than 'start'
*/
nb_SIZE cursor;
/** Flush cursor: position up to which data has been reported as flushed */
nb_SIZE flushcur;
/**
* Size in bytes of the buffer at 'root'. A value of zero marks a block
* embedded in the manager as not yet in use.
*/
nb_SIZE nalloc;
/** Pointer to allocated buffer */
char *root;
/** One of NETBUF_BLOCK_* values */
char type;
/** Whether this block is set for being flushed */
char flush_status;
/** Number of spans reserved from this block and not yet released */
short use_count;
/** Queue of nb_DEALLOC records for spans released out of order */
slist_root deallocs;
/**
* Smallest offset among the queued deallocs, or (nb_SIZE)-1 when nothing
* is queued
*/
nb_SIZE min_dealloc_offset;
};
/** Number of block structures embedded directly in the manager */
#define MIN_BLOCK_COUNT 32
/* NOTE(review): ALLOC_HIST_BUCKETS is not referenced by the code shown here */
#define ALLOC_HIST_BUCKETS 24
/**
 * A block together with a position inside it.
 * NOTE(review): not referenced by the code shown here; presumably intended
 * for tracking where a flush should resume - confirm before relying on it.
 */
typedef struct {
nb_BLOCK *first;
nb_SIZE pos;
} nb_FLUSHINFO;
/**
 * The manager: owns the blocks and decides which block a span is placed in.
 */
struct netbufs_st {
/** Blocks which are enqueued in the network */
slist_root active_blocks;
/** Fully free blocks */
slist_root avail_blocks;
/** Initial buffer size for a new block; doubled until a request fits */
unsigned int basealloc;
/** Upper bound for 'blockcount' above which emptied blocks are not cached */
unsigned int maxblocks;
/** Running block counter compared against 'maxblocks' */
unsigned int blockcount;
/** Allocation counter, reported by netbuf_dump_status() */
unsigned int total_allocs;
/** Contiguous block heads for cache locality */
nb_BLOCK _blocks[MIN_BLOCK_COUNT];
};
/**
* Retrieves a pointer to the buffer related to this span, i.e. the parent
* block's buffer advanced by the span's offset. The span must have been
* reserved, since its parent is dereferenced. The argument is evaluated
* twice, so it should not have side effects.
*/
#define SPAN_BUFFER(span) ((span)->parent->root + (span)->offset)
/**
* Reserve a contiguous region of memory, in-order for a given span. The
* span will be reserved from the last block to be flushed to the network.
*
* The contents of the span are guaranteed to be contiguous (though not aligned)
* and are available via the SPAN_BUFFER macro.
*
* The 'size' property of the span parameter should be set prior to calling
* this function
*
* @return 0 if successful, -1 on error
*/
int netbuf_reserve_span(nb_MGR *mgr, nb_SPAN *span);
/* Result codes of netbuf_get_flush_status() */
#define NETBUF_FLUSHED_PARTIAL -1
#define NETBUF_FLUSHED_FULL 1
#define NETBUF_FLUSHED_NONE 0
/**
* Indicate whether the specified span has been flushed to the network.
* @return one of
* NETBUF_FLUSHED_PARTIAL: Part of the span has been written
* NETBUF_FLUSHED_FULL: The entire span has been written
* NETBUF_FLUSHED_NONE: None of the span has been written.
*/
int netbuf_get_flush_status(const nb_MGR *mgr, const nb_SPAN *span);
/**
* Release a span previously allocated via reserve_span. It is assumed that the
* contents of the span have either:
*
* (1) been successfully sent to the network
* (2) have just been scheduled (and are being removed due to error handling)
* (3) have been partially sent to a connection which is being closed.
*
* Additionally, the span must currently be located either at the very beginning
* or the very end of the buffer. This should never be a problem in normal
* situations, where packets are enqueued in order.
*
* TODO: This is a bit weird. Any ideas about this?
*/
void netbuf_release_span(nb_MGR *mgr, nb_SPAN *span);
/**
* Gets the number of IOV structures required to flush the entire contents of
* all buffers.
*/
unsigned int netbuf_get_niov(nb_MGR *mgr);
/**
* Populates an iovec structure for flushing a set of bytes from the various
* blocks.
*
* @param mgr the manager object
* @param iov an array of iovec structures
* @param niov the number of iovec structures allocated.
*
* @return the number of bytes which can be flushed in this IOV. If the
* return value is 0 then there are no more bytes to flush.
*
* Note that the return value is limited by the number of IOV structures
* provided and should not be taken as an indicator of how many bytes are
* used overall.
*/
nb_SIZE netbuf_start_flush(nb_MGR *mgr, nb_IOV *iov, int niov);
/**
* Indicate that a number of bytes have been flushed. This should be called after
* the data retrieved by netbuf_start_flush has been flushed to the TCP buffers.
*
* @param mgr the manager object
* @param nflushed how much data in bytes was flushed to the network.
*/
void netbuf_end_flush(nb_MGR *mgr, nb_SIZE nflushed);
/**
* Resets any flushing state.
*/
/*
 * The macro argument is parenthesized and the do/while carries no trailing
 * semicolon, so the macro behaves like a single statement (for instance in
 * an unbraced if/else).
 * NOTE(review): 'flushing_block' and 'flushing_pos' are not members of the
 * manager structure declared in this header; the macro only expands
 * successfully for a type which provides them.
 */
#define netbuf_reset_flush(mgr) \
    do { \
        (mgr)->flushing_block = NULL; \
        (mgr)->flushing_pos = 0; \
    } while (0)
/**
* Informational function to get the total size of all data in the
* buffers. This traverses all blocks, so call this for debugging only.
*/
nb_SIZE netbuf_get_size(const nb_MGR *mgr);
void netbuf_init(nb_MGR *mgr);
void netbuf_cleanup(nb_MGR *mgr);
void netbuf_dump_status(nb_MGR *mgr);
#ifdef __cplusplus
}
#endif
#endif /* LCB_PACKET_H */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment