Skip to content

Instantly share code, notes, and snippets.

@wangrenjun
Last active October 17, 2019 04:51
Show Gist options
  • Save wangrenjun/10708396 to your computer and use it in GitHub Desktop.
Save wangrenjun/10708396 to your computer and use it in GitHub Desktop.
another libev example
/*
* Licensed under the Apache License, Version 2.0.
*
* simple libev client example
* single process, many non-blocking connections, ring-buffered sends
* wangrj1981@gmail.com
*/
/*
gcc ./libev-client.c -c -o ./libev-client.o -I/usr/local/include/
gcc ./ringbuffer.c -c -o ./ringbuffer.o -I/usr/local/include/
gcc -o ./libev-client.out ./libev-client.o ./ringbuffer.o -L/usr/local/lib/ -lev
*/
/* Host and service (port) strings handed to create_connector() by main(). */
#define CONNECTADDR "127.0.0.1"
#define CONNECTPORT "65535"
/* Payload queued for every send. Callers queue sizeof(MSG) bytes, so the
 * string's terminating NUL byte is queued along with the text. */
#define MSG "knock\r\n"
/* Number of connections opened by main(). */
#define CONNS 1000
/* Limit compared against a connection's `count` before another MSG is queued. */
#define PERCONN_TASKS 1000
/* Initial size in bytes passed to ringbuffer_initialize(). */
#define RINGBUFFER_SIZE 4096
/* Size of the stack scratch buffers used with recv() and ringbuffer_peek(). */
#define BUFLEN 64
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <net/if.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <signal.h>
#include <ev.h>
#include "ringbuffer.h"
/* Per-connection state of the client; one entry per socket opened by main(). */
typedef struct {
/* Incremented each time a callback checks whether to queue another MSG;
 * compared against PERCONN_TASKS. */
int count;
/* Socket descriptor returned by create_connector(). */
int connfd;
/* libev watchers for read and write readiness on connfd. */
struct ev_io read_watcher;
struct ev_io write_watcher;
/* Outgoing data waiting to be sent; initialised lazily (see bufinited). */
ringbuffer_t rbuf;
/* Set to 1 by main() when the read watcher is started. */
unsigned rdwatching:1;
/* 1 while write_watcher is started on the loop. */
unsigned wrwatching:1;
/* 1 once rbuf has been passed to ringbuffer_initialize(). */
unsigned bufinited:1;
} connctx;
/* libev callbacks. */
static void on_send_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);
static void on_recv_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);
static void on_sigint_cb(EV_P_ ev_signal *w, int revents);
/* Connection helpers. */
static int create_connector(const char *host, const char *port);
static void freectx(connctx *ctx);
static void msgsend(struct ev_loop *loop, connctx *ctx);
static int setnonblock(int fd);
/* Incremented once for every MSG queued by on_send_cb()/on_recv_cb(). */
static volatile int senttotal = 0;
/* Incremented once for every recv() call that returned data. */
static volatile int rcvdtotal = 0;
int main()
{
int i = 0;
connctx cliconn[CONNS];
struct ev_signal sigint_watcher;
signal(SIGPIPE, SIG_IGN);
ev_signal_init(&sigint_watcher, on_sigint_cb, SIGINT);
ev_signal_start(EV_DEFAULT_ &sigint_watcher);
for(; i < CONNS; i++) {
int connfd = create_connector(CONNECTADDR, CONNECTPORT);
if (connfd < 0) {
fprintf(stderr, "Failed to connect");
return 0;
}
cliconn[i].count = 0;
cliconn[i].connfd = connfd;
ev_io_init(&cliconn[i].write_watcher, on_send_cb, connfd, EV_WRITE);
cliconn[i].write_watcher.data = (void *)&cliconn[i];
ev_io_start(EV_DEFAULT_ &cliconn[i].write_watcher);
ev_io_init(&cliconn[i].read_watcher, on_recv_cb, connfd, EV_READ);
cliconn[i].read_watcher.data = (void *)&cliconn[i];
ev_io_start(EV_DEFAULT_ &cliconn[i].read_watcher);
cliconn[i].wrwatching = 1;
cliconn[i].rdwatching = 1;
cliconn[i].bufinited = 0;
}
ev_loop(EV_DEFAULT_ 0);
printf("total of messages sent: %d\n", senttotal);
printf("total of messages received: %d\n", rcvdtotal);
exit(0);
return 0;
}
static void on_send_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
connctx *ctx = (connctx *) watcher->data;
ev_io_stop(EV_DEFAULT_ &ctx->write_watcher);
ctx->wrwatching = 0;
if (ctx->bufinited == 0) {
ringbuffer_initialize(&ctx->rbuf, RINGBUFFER_SIZE);
ctx->bufinited = 1;
}
if (ctx->count++ < PERCONN_TASKS) {
ringbuffer_ensure_capacity(&ctx->rbuf, sizeof(MSG));
ringbuffer_write(&ctx->rbuf, MSG, sizeof(MSG));
msgsend(loop, ctx);
senttotal++;
} else
freectx(ctx);
}
/*
 * Read-readiness callback for a client connection.
 *
 * Drains the socket, echoing what was received to stdout, and closes the
 * connection when the peer has shut down. Afterwards it either queues and
 * sends one more MSG (while `count` is below PERCONN_TASKS) or tears the
 * connection down.
 */
static void on_recv_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
    int rv;
    char buf[BUFLEN];
    connctx *ctx = (connctx *) watcher->data;

    (void)revents;

    if (ctx->bufinited == 0) {
        ringbuffer_initialize(&ctx->rbuf, RINGBUFFER_SIZE);
        ctx->bufinited = 1;
    }

    do {
        rv = recv(watcher->fd, buf, BUFLEN, 0);
        if (rv == 0) {
            /* Orderly shutdown by the peer. */
            freectx(ctx);
            return;
        } else if (rv < 0) {
            /* Nothing more to read right now (or an error); stop draining. */
            break;
        }
        /*
         * The data comes from the network and is not NUL-terminated, so it
         * must never be used as a format string; print at most rv bytes.
         */
        printf("%.*s", rv, buf);
        rcvdtotal++;
    } while (rv > 0);

    if (ctx->count++ < PERCONN_TASKS) {
        ringbuffer_ensure_capacity(&ctx->rbuf, sizeof(MSG));
        ringbuffer_write(&ctx->rbuf, MSG, sizeof(MSG));
        msgsend(loop, ctx);
        senttotal++;
    } else {
        freectx(ctx);
    }
}
/* SIGINT handler: logs the signal and asks every nested loop run to stop. */
static void on_sigint_cb(EV_P_ ev_signal *w, int revents)
{
    (void)w;
    (void)revents;

    fputs("SIGINT\n", stderr);
    ev_unloop(EV_A_ EVUNLOOP_ALL);
}
/*
 * Resolve host:port and start a non-blocking TCP connect.
 *
 * Tries each address returned by getaddrinfo() in order and keeps the first
 * socket whose connect() either succeeds or reports EINPROGRESS.
 *
 * Returns the socket descriptor, or -1 when resolution fails or no address
 * could be used.
 */
static int create_connector(const char *host, const char *port)
{
    struct addrinfo hints = {
        .ai_family = AF_UNSPEC,
        .ai_socktype = SOCK_STREAM
    };
    struct addrinfo *addrs = NULL;
    struct addrinfo *cur;
    int fd = -1;

    if (getaddrinfo(host, port, &hints, &addrs) != 0) {
        fprintf(stderr, "Could not getaddrinfo by %s:%s", host, port);
        return -1;
    }

    for (cur = addrs; cur != NULL; cur = cur->ai_next) {
        int rc;

        fd = socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
        if (fd == -1) {
            continue;
        }

        setnonblock(fd);
        rc = connect(fd, cur->ai_addr, cur->ai_addrlen);
        /* EINPROGRESS is the normal outcome for a non-blocking connect. */
        if (rc != -1 || errno == EINPROGRESS) {
            break;
        }
        close(fd);
    }

    freeaddrinfo(addrs);
    return (cur != NULL) ? fd : -1;
}
/*
 * Tear down one client connection: release the ring buffer, stop whichever
 * watchers are flagged as running, and close the socket.
 *
 * NOTE(review): rbuf is destructed regardless of bufinited, and the flags are
 * not cleared afterwards; callers in this file only reach this after the
 * buffer was initialised and do not call it twice - confirm before reusing.
 */
static void freectx(connctx *ctx)
{
ringbuffer_destruct(&ctx->rbuf);
if (ctx->rdwatching)
ev_io_stop(EV_DEFAULT_ &ctx->read_watcher);
if (ctx->wrwatching)
ev_io_stop(EV_DEFAULT_ &ctx->write_watcher);
/* The watchers are stopped before the descriptor they refer to is closed. */
close(ctx->connfd);
}
/*
 * Flush as much of the connection's ring buffer as the socket accepts.
 *
 * Data is peeked in BUFLEN-sized chunks and only removed from the buffer
 * once send() has accepted it. If the socket would block, the write watcher
 * is armed (unless it already is) so that on_send_cb() runs later. Any other
 * send() error tears the connection down.
 */
static void msgsend(struct ev_loop *loop, connctx *ctx)
{
    char chunk[BUFLEN];

    for (;;) {
        int n = ringbuffer_peek(&ctx->rbuf, chunk, BUFLEN);

        if (n <= 0) {
            return;             /* nothing left to send */
        }

        n = send(ctx->connfd, chunk, n, 0);
        if (n >= 0) {
            ringbuffer_consumed(&ctx->rbuf, n);
            continue;
        }

        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            freectx(ctx);
            return;
        }

        /* Socket buffer is full: wait for writability. */
        if (!ctx->wrwatching) {
            ev_io_init(&ctx->write_watcher, on_send_cb, ctx->connfd, EV_WRITE);
            ctx->write_watcher.data = (void *)ctx;
            ev_io_start(loop, &ctx->write_watcher);
            ctx->wrwatching = 1;
        }
        return;
    }
}
/*
 * Add O_NONBLOCK to the file status flags of fd.
 * Returns 0 on success and -1 if either fcntl() call fails.
 */
static int setnonblock(int fd)
{
    const int current = fcntl(fd, F_GETFL);

    if (current < 0) {
        return -1;
    }
    return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}
/*
* Licensed under the Apache License, Version 2.0.
*
* simple libev server example
* based on multi-process and ring buffer
* wangrj1981@gmail.com
*/
/*
gcc ./libev-server.c -c -o ./libev-server.o -I/usr/local/include/
gcc ./ringbuffer.c -c -o ./ringbuffer.o -I/usr/local/include/
gcc -o ./libev-server.out ./libev-server.o ./ringbuffer.o -L/usr/local/lib/ -lev
*/
/* Host and service (port) strings handed to make_listener() by main(). */
#define LISTENADDR "127.0.0.1"
#define LISTENPORT "65535"
/* Number of worker processes forked by main(); also the size of workers[]. */
#define WORKERNUM 8
/* Backlog argument passed to listen(). */
#define BACKLOG 99999
/* Repeat interval passed to ev_timer_init() for each worker's timer. */
#define TIMEOUT 600
/* Size of the stack scratch buffers used with recv() and ringbuffer_peek(). */
#define BUFLEN 64
/* Initial size in bytes passed to ringbuffer_initialize(). */
#define RINGBUFFER_SIZE 4096
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <net/if.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <signal.h>
#include <ev.h>
#include "ringbuffer.h"
/* Signal watchers; initialised by main() for the parent and by
 * spawn_worker() for each worker. */
static struct ev_signal sigint_watcher;
static struct ev_signal sigterm_watcher;
/* Per-worker timer and accept watchers, set up in spawn_worker(). */
static struct ev_timer time_watcher;
static struct ev_io accept_watcher;
/* Set to 1 by the terminate callbacks; stops on_child_exited_cb() from
 * respawning workers. */
static volatile int exit_mark = 0;
/* Listening socket created in main() and watched by every worker. */
static volatile int listenfd;
/* One slot per worker: its pid and the ev_child watcher tracking its exit. */
static struct {
pid_t pid;
struct ev_child child;
} workers[WORKERNUM];
/* Per-client state of the echo server; allocated in on_accept_cb() and
 * released by closeconn(). */
typedef struct {
/* Accepted client socket. */
int clifd;
/* libev watchers for read and write readiness on clifd. */
struct ev_io read_watcher;
struct ev_io write_watcher;
/* Bytes received and not yet echoed back; initialised lazily (see bufinited). */
ringbuffer_t rbuf;
/* 1 while read_watcher is started on the loop. */
unsigned rdwatching:1;
/* 1 while write_watcher is started on the loop. */
unsigned wrwatching:1;
/* 1 once rbuf has been passed to ringbuffer_initialize(). */
unsigned bufinited:1;
} connection;
/* Connection handling (runs in the workers). */
static void on_accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);
static void on_read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);
static void writeback(struct ev_loop *loop, connection *conn);
static void on_write_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);
static void on_timeup_cb(struct ev_loop *loop, struct ev_timer *w, int revents);
/* Process supervision: signal handling and worker respawn. */
static void on_parent_terminate_cb(EV_P_ ev_signal *w, int revents);
static void on_child_terminate_cb(EV_P_ ev_signal *w, int revents);
static void on_child_exited_cb(EV_P_ ev_child *w, int revents);
static int spawn_worker(int idx);
/* Socket helpers. */
static int make_listener(const char *host, const char *port);
static int setnonblock(int fd);
/*
 * Server entry point.
 *
 * Creates the listening socket, detaches from the terminal, forks WORKERNUM
 * workers that share the listener, and then runs the parent's event loop,
 * which only supervises the workers and waits for SIGINT/SIGTERM.
 *
 * Returns 0 after the loop ends, -1 if the listener cannot be created or the
 * process cannot be daemonized.
 */
int main(int args, char *argv[])
{
    int i;

    (void)args;
    (void)argv;

    listenfd = make_listener(LISTENADDR, LISTENPORT);
    if (listenfd < 0) {
        fprintf(stderr, "Failed to listen\n");
        return -1;
    }

    /* On failure we are still attached to the terminal, so report it. */
    if (daemon(0, 0) < 0) {
        perror("daemon");
        close(listenfd);
        return -1;
    }

    /* A client that goes away must show up as a send() error, not SIGPIPE. */
    signal(SIGPIPE, SIG_IGN);

    for (i = 0; i < WORKERNUM; i++) {
        spawn_worker(i);
    }

    ev_signal_init(&sigint_watcher, on_parent_terminate_cb, SIGINT);
    ev_signal_init(&sigterm_watcher, on_parent_terminate_cb, SIGTERM);
    ev_signal_start(EV_DEFAULT_ &sigint_watcher);
    ev_signal_start(EV_DEFAULT_ &sigterm_watcher);

    ev_loop(EV_DEFAULT_ 0);

    close(listenfd);
    return 0;
}
/*
 * Accept-readiness callback on the shared listening socket.
 *
 * Accepts one client, switches it to non-blocking mode, allocates its
 * connection state and starts the read watcher. On any failure the accepted
 * descriptor is closed, so nothing leaks.
 */
static void on_accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
    connection *conn;
    struct sockaddr_in cliaddr;
    socklen_t clilen = sizeof(cliaddr);
    int clifd;

    (void)revents;

    /*
     * libev is level-triggered: if more connections are pending, this
     * callback simply runs again, so one accept() per call is enough.
     */
    clifd = accept(watcher->fd, (struct sockaddr *)&cliaddr, &clilen);
    if (clifd < 0) {
        return;
    }

    /* on_read_cb() drains until recv() would block; a blocking socket
     * would stall the whole worker there. */
    if (setnonblock(clifd) < 0) {
        close(clifd);
        return;
    }

    /* Zeroed so that rbuf and both watchers start in a defined state. */
    conn = calloc(1, sizeof(*conn));
    if (conn == NULL) {
        close(clifd);
        return;
    }

    conn->clifd = clifd;
    conn->wrwatching = 0;
    conn->bufinited = 0;

    ev_io_init(&conn->read_watcher, on_read_cb, clifd, EV_READ);
    conn->read_watcher.data = (void *)conn;
    ev_io_start(loop, &conn->read_watcher);
    conn->rdwatching = 1;
}
/*
 * Release everything owned by a server connection: the ring buffer, any
 * running watchers, the socket and finally the connection object itself.
 * `conn` must not be used after this returns.
 *
 * NOTE(review): rbuf is destructed regardless of bufinited; callers in this
 * file only reach this after on_read_cb() initialised it - confirm before
 * calling from elsewhere.
 */
static void closeconn(connection *conn)
{
ringbuffer_destruct(&conn->rbuf);
if (conn->rdwatching)
ev_io_stop(EV_DEFAULT_ &conn->read_watcher);
if (conn->wrwatching)
ev_io_stop(EV_DEFAULT_ &conn->write_watcher);
/* The watchers are stopped before the descriptor they refer to is closed. */
close(conn->clifd);
free(conn);
}
/*
 * Read-readiness callback for a server connection.
 *
 * Drains the socket into the connection's ring buffer (created on first
 * use), closes the connection when the peer has shut down, and then echoes
 * the buffered bytes back through writeback().
 */
static void on_read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
    char chunk[BUFLEN];
    connection *conn = watcher->data;

    if (!conn->bufinited) {
        ringbuffer_initialize(&conn->rbuf, RINGBUFFER_SIZE);
        conn->bufinited = 1;
    }

    for (;;) {
        int n = recv(watcher->fd, chunk, BUFLEN, 0);

        if (n == 0) {
            /* Orderly shutdown by the peer. */
            closeconn(conn);
            return;
        }
        if (n < 0) {
            break;
        }
        ringbuffer_ensure_capacity(&conn->rbuf, n);
        ringbuffer_write(&conn->rbuf, chunk, n);
    }

    /* just simply echo */
    writeback(loop, conn);
}
/*
 * Send as much of the connection's ring buffer as the socket accepts.
 *
 * Data is peeked in BUFLEN-sized chunks and only removed from the buffer
 * once send() has accepted it. If the socket would block, the write watcher
 * is armed (unless it already is) so that on_write_cb() resumes later. Any
 * other send() error closes the connection, after which `conn` is gone.
 */
static void writeback(struct ev_loop *loop, connection *conn)
{
    char chunk[BUFLEN];

    for (;;) {
        int n = ringbuffer_peek(&conn->rbuf, chunk, BUFLEN);

        if (n <= 0) {
            return;             /* nothing left to echo */
        }

        n = send(conn->clifd, chunk, n, 0);
        if (n >= 0) {
            ringbuffer_consumed(&conn->rbuf, n);
            continue;
        }

        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            closeconn(conn);
            return;
        }

        /* Socket buffer is full: wait for writability. */
        if (!conn->wrwatching) {
            ev_io_init(&conn->write_watcher, on_write_cb, conn->clifd, EV_WRITE);
            conn->write_watcher.data = (void *)conn;
            ev_io_start(loop, &conn->write_watcher);
            conn->wrwatching = 1;
        }
        return;
    }
}
/*
 * Write-readiness callback for a server connection: stops the write watcher
 * and resumes flushing the ring buffer. writeback() starts the watcher again
 * if the socket still cannot take all the data.
 */
static void on_write_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
connection *conn = (connection *) watcher->data;
ev_io_stop(EV_DEFAULT_ &conn->write_watcher);
conn->wrwatching = 0;
/* May close and free conn; nothing touches it afterwards. */
writeback(loop, conn);
}
/* Callback of each worker's repeating timer; the body is currently empty. */
static void on_timeup_cb(struct ev_loop *loop, struct ev_timer *w, int revents)
{
}
/*
 * SIGINT/SIGTERM handler of the parent process.
 *
 * Marks the server as shutting down, stops tracking the workers, asks each
 * of them to terminate and then leaves the parent's event loop.
 */
static void on_parent_terminate_cb(EV_P_ ev_signal *w, int revents)
{
    int i;

    (void)w;
    (void)revents;

    exit_mark = 1;
    for (i = 0; i < WORKERNUM; ++i) {
        ev_child_stop(EV_A_ &workers[i].child);
        /*
         * A slot whose fork() failed still holds pid 0, and kill(0, ...)
         * would signal the whole process group instead of one worker.
         */
        if (workers[i].pid > 0) {
            kill(workers[i].pid, SIGTERM);
        }
    }
    ev_unloop(EV_A_ EVUNLOOP_ALL);
}
/* SIGINT/SIGTERM handler of a worker: sets exit_mark and stops the worker's
 * event loop, after which spawn_worker() calls exit(0). */
static void on_child_terminate_cb(EV_P_ ev_signal *w, int revents)
{
exit_mark = 1;
ev_unloop(EV_A_ EVUNLOOP_ALL);
}
/*
 * Child-exit callback of the parent: stops the watcher of the worker that
 * exited and, unless the server is shutting down, forks a replacement into
 * the same slot.
 */
static void on_child_exited_cb(EV_P_ ev_child *w, int revents)
{
    int slot;

    ev_child_stop(EV_A_ w);
    if (exit_mark) {
        return;
    }

    for (slot = 0; slot < WORKERNUM; slot++) {
        if (workers[slot].pid == w->rpid) {
            spawn_worker(slot);
        }
    }
}
static int spawn_worker(int idx)
{
int rv = fork();
if (rv < 0)
return -1;
else if (rv == 0) {
ev_signal_init(&sigint_watcher, on_child_terminate_cb, SIGINT);
ev_signal_init(&sigterm_watcher, on_child_terminate_cb, SIGTERM);
ev_signal_start(EV_DEFAULT_ &sigint_watcher);
ev_signal_start(EV_DEFAULT_ &sigterm_watcher);
ev_timer_init(&time_watcher, on_timeup_cb, 0., TIMEOUT);
ev_timer_start(EV_DEFAULT_ &time_watcher);
ev_io_init(&accept_watcher, on_accept_cb, listenfd, EV_READ);
ev_io_start(EV_DEFAULT_ &accept_watcher);
ev_loop(EV_DEFAULT_ 0);
exit(0);
} else {
ev_loop_destroy(EV_DEFAULT);
ev_default_loop(0);
ev_child_init(&workers[idx].child, on_child_exited_cb, rv, 0);
ev_child_start(EV_DEFAULT_ &workers[idx].child);
workers[idx].pid = rv;
}
return 0;
}
/*
 * Create a non-blocking listening TCP socket bound to host:port.
 *
 * Tries each address returned by getaddrinfo() in order and keeps the first
 * socket that can be made non-blocking and bound.
 *
 * Returns the listening descriptor, or -1 on any failure; no descriptor is
 * left open on the failure paths.
 */
static int make_listener(const char *host, const char *port)
{
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    int sfd = -1;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_canonname = NULL;
    hints.ai_addr = NULL;
    hints.ai_next = NULL;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = AF_UNSPEC;
    hints.ai_flags = AI_PASSIVE;

    if (getaddrinfo(host, port, &hints, &result) != 0) {
        return -1;
    }

    for (rp = result; rp != NULL; rp = rp->ai_next) {
        sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (sfd == -1) {
            continue;
        }
        /* The listener is shared by all workers, so accept() must never
         * block in any of them. */
        if (setnonblock(sfd) == 0 &&
            bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
            break;
        }
        close(sfd);
    }

    freeaddrinfo(result);

    if (rp == NULL) {
        return -1;
    }
    if (listen(sfd, BACKLOG) == -1) {
        close(sfd);
        return -1;
    }
    return sfd;
}
/*
 * Add O_NONBLOCK to the file status flags of fd.
 * Returns 0 on success and -1 if either fcntl() call fails.
 */
static int setnonblock(int fd)
{
    const int current = fcntl(fd, F_GETFL);

    if (current < 0) {
        return -1;
    }
    return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2011-2012 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ringbuffer.h"
/* Return the smaller of two sizes. */
static lcb_size_t minimum(lcb_size_t a, lcb_size_t b)
{
    if (a < b) {
        return a;
    }
    return b;
}
/*
 * Allocate `size` bytes of storage and initialise `buffer` as an empty ring
 * over it. Returns 1 on success; returns 0 and leaves `buffer` untouched if
 * the allocation fails.
 */
int ringbuffer_initialize(ringbuffer_t *buffer, lcb_size_t size)
{
    char *storage = (char *)malloc(size);

    if (storage != NULL) {
        ringbuffer_take_buffer(buffer, storage, size);
        return 1;
    }
    return 0;
}
/*
 * Initialise `buffer` as an empty ring over the caller-supplied storage
 * `buf` of `size` bytes; both heads start at the beginning of the storage.
 */
void ringbuffer_take_buffer(ringbuffer_t *buffer, char *buf, lcb_size_t size)
{
    memset(buffer, 0, sizeof(*buffer));
    buffer->size = size;
    buffer->root = buf;
    buffer->read_head = buf;
    buffer->write_head = buf;
}
/* Discard every byte currently stored in the buffer. */
void ringbuffer_reset(ringbuffer_t *buffer)
{
    lcb_size_t pending = ringbuffer_get_nbytes(buffer);

    ringbuffer_consumed(buffer, pending);
}
/*
 * Free the buffer's storage and reset every field, so that the structure no
 * longer refers to the released memory.
 */
void ringbuffer_destruct(ringbuffer_t *buffer)
{
    free(buffer->root);

    buffer->write_head = NULL;
    buffer->read_head = NULL;
    buffer->root = NULL;
    buffer->nbytes = 0;
    buffer->size = 0;
}
/*
 * Make sure at least `size` more bytes can be written.
 *
 * Nothing happens when strictly more than `size` bytes are free. Otherwise
 * the storage is replaced by a block whose size is the current size doubled
 * (128 for an empty buffer) and doubled again until the request fits; the
 * stored data is moved to the start of the new block.
 *
 * Returns 1 on success and 0 if the new block cannot be allocated, in which
 * case the buffer is left unchanged.
 */
int ringbuffer_ensure_capacity(ringbuffer_t *buffer, lcb_size_t size)
{
    lcb_size_t wanted;
    lcb_size_t used;
    lcb_size_t copied;
    char *grown;
    char *previous;

    if (size < (buffer->size - buffer->nbytes)) {
        /* we've got capacity! */
        return 1;
    }

    /* determine the new buffer size... */
    wanted = buffer->size << 1;
    if (wanted == 0) {
        wanted = 128;
    }
    while ((wanted - buffer->nbytes) < size) {
        wanted <<= 1;
    }

    /* go ahead and allocate a bigger block */
    grown = (char *)malloc(wanted);
    if (grown == NULL) {
        /* Allocation failed! */
        return 0;
    }

    /* Linearise the stored data at the start of the new block. */
    used = buffer->nbytes;
    copied = ringbuffer_read(buffer, grown, used);
    lcb_assert(copied == used);
    (void)copied;

    previous = buffer->root;
    buffer->root = grown;
    buffer->size = wanted;
    buffer->nbytes = used;
    buffer->read_head = grown;
    buffer->write_head = grown + used;
    free(previous);
    return 1;
}
/* Return the total size of the buffer's storage in bytes. */
lcb_size_t ringbuffer_get_size(ringbuffer_t *buffer)
{
return buffer->size;
}
/* Return the address of the first byte of the buffer's storage. */
void *ringbuffer_get_start(ringbuffer_t *buffer)
{
return buffer->root;
}
/* Return the position the next read starts from. */
void *ringbuffer_get_read_head(ringbuffer_t *buffer)
{
return buffer->read_head;
}
/* Return the position the next write starts at. */
void *ringbuffer_get_write_head(ringbuffer_t *buffer)
{
return buffer->write_head;
}
/*
 * Append up to `nb` bytes to the buffer and return how many were stored.
 *
 * When `src` is NULL nothing is copied; the write head and byte count are
 * advanced as if data had been written (ringbuffer_produced() relies on it).
 * The buffer is never grown here; callers that need room call
 * ringbuffer_ensure_capacity() first.
 */
lcb_size_t ringbuffer_write(ringbuffer_t *buffer,
const void *src,
lcb_size_t nb)
{
const char *s = (const char *)src;
lcb_size_t nw = 0;
lcb_size_t space;
lcb_size_t toWrite;
/* Phase 1: the write head is at or after the read head, so the region up
 * to the end of the storage is writable. */
if (buffer->write_head >= buffer->read_head) {
/* write up to the end with data.. */
space = buffer->size - (lcb_size_t)(buffer->write_head - buffer->root);
toWrite = minimum(space, nb);
if (src != NULL) {
memcpy(buffer->write_head, s, toWrite);
}
buffer->nbytes += toWrite;
buffer->write_head += toWrite;
nw = toWrite;
/* Wrap the write head when it reached the end of the storage. */
if (buffer->write_head == (buffer->root + buffer->size)) {
buffer->write_head = buffer->root;
}
if (nw == nb) {
/* everything is written to the buffer.. */
return nw;
}
nb -= toWrite;
s += toWrite;
}
/* Phase 2: the write head is now before the read head. */
/* Copy data up until we catch up with the read head */
space = (lcb_size_t)(buffer->read_head - buffer->write_head);
toWrite = minimum(space, nb);
if (src != NULL) {
memcpy(buffer->write_head, s, toWrite);
}
buffer->nbytes += toWrite;
buffer->write_head += toWrite;
nw += toWrite;
if (buffer->write_head == (buffer->root + buffer->size)) {
buffer->write_head = buffer->root;
}
return nw;
}
/*
 * Append the characters of `str` (without its terminating NUL), growing the
 * buffer if needed. Returns the number of bytes written, or 0 if the buffer
 * could not be grown.
 */
lcb_size_t ringbuffer_strcat(ringbuffer_t *buffer, const char *str)
{
    const lcb_size_t length = strlen(str);

    if (ringbuffer_ensure_capacity(buffer, length)) {
        return ringbuffer_write(buffer, str, length);
    }
    return 0;
}
/* Once the buffer is empty, move both heads back to the start of the storage. */
static void maybe_reset(ringbuffer_t *buffer)
{
    if (buffer->nbytes != 0) {
        return;
    }
    buffer->write_head = buffer->root;
    buffer->read_head = buffer->root;
}
/*
 * Remove up to `nb` bytes from the buffer and return how many were removed.
 *
 * The bytes are copied to `dest` unless `dest` is NULL, in which case they
 * are only discarded (ringbuffer_consumed() relies on it). When the buffer
 * becomes empty both heads are moved back to the start of the storage.
 */
lcb_size_t ringbuffer_read(ringbuffer_t *buffer, void *dest, lcb_size_t nb)
{
char *d = (char *)dest;
lcb_size_t nr = 0;
lcb_size_t space;
lcb_size_t toRead;
if (buffer->nbytes == 0) {
return 0;
}
/* Phase 1: the read head is at or after the write head, so the data runs
 * from the read head to the end of the storage. */
if (buffer->read_head >= buffer->write_head) {
/* read up to the wrap point */
space = buffer->size - (lcb_size_t)(buffer->read_head - buffer->root);
toRead = minimum(space, nb);
if (dest != NULL) {
memcpy(d, buffer->read_head, toRead);
}
buffer->nbytes -= toRead;
buffer->read_head += toRead;
nr = toRead;
/* Wrap the read head when it reached the end of the storage. */
if (buffer->read_head == (buffer->root + buffer->size)) {
buffer->read_head = buffer->root;
}
if (nr == nb) {
maybe_reset(buffer);
return nr;
}
nb -= toRead;
d += toRead;
}
/* Phase 2: the remaining data lies between the read head and the write head. */
space = (lcb_size_t)(buffer->write_head - buffer->read_head);
toRead = minimum(space, nb);
if (dest != NULL) {
memcpy(d, buffer->read_head, toRead);
}
buffer->nbytes -= toRead;
buffer->read_head += toRead;
nr += toRead;
if (buffer->read_head == (buffer->root + buffer->size)) {
buffer->read_head = buffer->root;
}
maybe_reset(buffer);
return nr;
}
/*
 * Copy up to `nb` bytes into `dest` without removing them: the read runs on
 * a copy of the bookkeeping structure, so `buffer` itself is not modified.
 * Returns the number of bytes copied.
 */
lcb_size_t ringbuffer_peek(ringbuffer_t *buffer, void *dest, lcb_size_t nb)
{
ringbuffer_t copy = *buffer;
return ringbuffer_read(&copy, dest, nb);
}
/*
 * Copy up to `nb` bytes starting `offset` bytes past the read head into
 * `dest`, without removing anything from `buffer`.
 *
 * Returns the number of bytes copied, or (lcb_size_t)-1 when fewer than
 * `offset` bytes are stored.
 */
lcb_size_t ringbuffer_peek_at(ringbuffer_t *buffer, lcb_size_t offset,
                              void *dest, lcb_size_t nb)
{
    ringbuffer_t snapshot = *buffer;

    /* Skip the leading bytes on the private copy only. */
    if (ringbuffer_read(&snapshot, NULL, offset) != offset) {
        return (lcb_size_t)-1;
    }
    return ringbuffer_read(&snapshot, dest, nb);
}
/*
 * Account for `nb` bytes that were placed into the buffer's storage
 * directly: advances the write head without copying. Asserts that all `nb`
 * bytes fit.
 */
void ringbuffer_produced(ringbuffer_t *buffer, lcb_size_t nb)
{
    lcb_size_t advanced = ringbuffer_write(buffer, NULL, nb);

    lcb_assert(advanced == nb);
    (void)advanced;
}
/*
 * Discard `nb` bytes from the front of the buffer without copying them.
 * Asserts that `nb` bytes were actually stored.
 */
void ringbuffer_consumed(ringbuffer_t *buffer, lcb_size_t nb)
{
    lcb_size_t dropped = ringbuffer_read(buffer, NULL, nb);

    lcb_assert(dropped == nb);
    (void)dropped;
}
/* Return the number of bytes currently stored in the buffer. */
lcb_size_t ringbuffer_get_nbytes(ringbuffer_t *buffer)
{
return buffer->nbytes;
}
/*
 * Overwrite bytes already stored in the buffer with data from `src`.
 *
 * With RINGBUFFER_READ the bytes at the read end (starting at the read head)
 * are replaced; with any other direction the bytes at the write end (ending
 * at the write head) are replaced. Neither head nor the byte count is
 * changed. Returns the number of bytes copied.
 */
lcb_size_t ringbuffer_update(ringbuffer_t *buffer,
ringbuffer_direction_t direction,
const void *src, lcb_size_t nb)
{
const char *s = (const char *)src;
lcb_size_t nw, ret = 0;
if (direction == RINGBUFFER_READ) {
if (buffer->read_head <= buffer->write_head) {
/* Stored data is one run starting at the read head. */
nw = minimum(nb, buffer->nbytes);
memcpy(buffer->read_head, s, nw);
ret += nw;
} else {
/* Stored data wraps: first the part up to the end of the storage... */
nw = minimum(nb, buffer->size - (lcb_size_t)(buffer->read_head - buffer->root));
memcpy(buffer->read_head, s, nw);
nb -= nw;
s += nw;
ret += nw;
if (nb) {
/* ...then the part from the start of the storage up to the write head. */
nw = minimum(nb, (lcb_size_t)(buffer->write_head - buffer->root));
memcpy(buffer->root, s, nw);
ret += nw;
}
}
} else {
if (buffer->write_head >= buffer->read_head) {
/* Stored data is one run ending at the write head. */
nw = minimum(nb, buffer->nbytes);
memcpy(buffer->write_head - nw, s, nw);
ret += nw;
} else {
/* Stored data wraps: the tail of `src` goes just before the write head... */
nb = minimum(nb, buffer->nbytes);
nw = minimum(nb, (lcb_size_t)(buffer->write_head - buffer->root));
memcpy(buffer->write_head - nw, s + nb - nw, nw);
nb -= nw;
ret += nw;
if (nb) {
/* ...and the rest of `src` goes at the end of the storage. */
nw = minimum(nb, buffer->size - (lcb_size_t)(buffer->read_head - buffer->root));
memcpy(buffer->root + buffer->size - nw, s, nw);
ret += nw;
}
}
}
return ret;
}
/*
 * Describe the readable or writable region of the buffer as at most two
 * segments in iov[0] and iov[1] (the caller provides two entries).
 *
 * RINGBUFFER_READ: segments cover the stored data, starting at the read head.
 * RINGBUFFER_WRITE: segments cover the free space, starting at the write head.
 * iov[1] always starts at the beginning of the storage and has length 0 when
 * no second segment is needed.
 *
 * NOTE(review): for RINGBUFFER_WRITE with write_head == read_head the code
 * reports the whole storage as free, which is only correct when the buffer
 * is empty - confirm callers never request a write iov on a full buffer.
 */
void ringbuffer_get_iov(ringbuffer_t *buffer,
ringbuffer_direction_t direction,
struct lcb_iovec_st *iov)
{
iov[1].iov_base = buffer->root;
iov[1].iov_len = 0;
if (direction == RINGBUFFER_READ) {
iov[0].iov_base = buffer->read_head;
iov[0].iov_len = buffer->nbytes;
if (buffer->read_head >= buffer->write_head) {
/* Bytes between the read head and the end of the storage. */
ptrdiff_t chunk = buffer->root + buffer->size - buffer->read_head;
if (buffer->nbytes > (lcb_size_t)chunk) {
/* The data wraps: split it at the end of the storage. */
iov[0].iov_len = (lcb_size_t)chunk;
iov[1].iov_len = buffer->nbytes - (lcb_size_t)chunk;
}
}
} else {
lcb_assert(direction == RINGBUFFER_WRITE);
iov[0].iov_base = buffer->write_head;
iov[0].iov_len = buffer->size - buffer->nbytes;
if (buffer->write_head >= buffer->read_head) {
/* I may write all the way to the end! */
iov[0].iov_len = (lcb_size_t)((buffer->root + buffer->size) - buffer->write_head);
/* And all the way up to the read head */
iov[1].iov_len = (lcb_size_t)(buffer->read_head - buffer->root);
}
}
}
/*
 * Tell whether `nb` bytes can be read (RINGBUFFER_READ) or written (any
 * other direction) as one contiguous run, i.e. the bytes are available and
 * do not cross the end of the storage. Returns 1 if so, 0 otherwise.
 */
int ringbuffer_is_continous(ringbuffer_t *buffer,
                            ringbuffer_direction_t direction,
                            lcb_size_t nb)
{
    const char *head;
    lcb_size_t available;
    int may_wrap;

    if (direction == RINGBUFFER_READ) {
        head = buffer->read_head;
        available = buffer->nbytes;
        may_wrap = (buffer->read_head >= buffer->write_head);
    } else {
        head = buffer->write_head;
        available = buffer->size - buffer->nbytes;
        may_wrap = (buffer->write_head >= buffer->read_head);
    }

    if (nb > available) {
        return 0;
    }
    if (may_wrap) {
        /* Bytes between the head and the end of the storage. */
        ptrdiff_t chunk = buffer->root + buffer->size - head;

        if (nb > (lcb_size_t)chunk) {
            return 0;
        }
    }
    return 1;
}
/*
 * Move all data from `src` to the end of `dest`, growing `dest` as needed.
 *
 * Returns 1 when everything was moved. Returns 0 if `dest` cannot be grown;
 * the bytes that were not moved are still in `src`.
 */
int ringbuffer_append(ringbuffer_t *src, ringbuffer_t *dest)
{
    char buffer[1024];
    lcb_size_t nr, nw;

    /* Peek first, so that a failed allocation does not lose the chunk. */
    while ((nr = ringbuffer_peek(src, buffer, sizeof(buffer))) != 0) {
        /*
         * This call must not live inside lcb_assert(): that macro is
         * assert(), which compiles to nothing under NDEBUG.
         */
        if (!ringbuffer_ensure_capacity(dest, nr)) {
            return 0;
        }
        nw = ringbuffer_write(dest, buffer, nr);
        lcb_assert(nw == nr);
        ringbuffer_consumed(src, nw);
    }
    return 1;
}
/*
 * Copy the first `nbytes` bytes stored in `src` to the end of `dst` without
 * removing them from `src`; `dst` is grown if needed.
 *
 * Returns 0 on success, -1 if `src` holds fewer than `nbytes` bytes or `dst`
 * cannot be grown.
 */
int ringbuffer_memcpy(ringbuffer_t *dst, ringbuffer_t *src,
                      lcb_size_t nbytes)
{
    ringbuffer_t copy = *src;   /* read through a copy: src stays untouched */
    struct lcb_iovec_st iov[2];
    int ii;
    lcb_size_t towrite = nbytes;

    if (nbytes > ringbuffer_get_nbytes(src)) {
        /* EINVAL */
        return -1;
    }
    if (!ringbuffer_ensure_capacity(dst, nbytes)) {
        /* Failed to allocate space */
        return -1;
    }

    /*
     * The free space of dst may consist of two segments (tail of the storage
     * plus its beginning). The amount to copy is recomputed for each segment
     * so that a copy that wraps continues into the second one.
     */
    ringbuffer_get_iov(dst, RINGBUFFER_WRITE, iov);
    for (ii = 0; ii < 2 && towrite > 0; ++ii) {
        lcb_size_t toread = minimum(iov[ii].iov_len, towrite);

        towrite -= ringbuffer_read(&copy, iov[ii].iov_base, toread);
    }
    lcb_assert(towrite == 0);

    /* The bytes are already in place; only advance dst's write head. */
    ringbuffer_produced(dst, nbytes);
    return 0;
}
/*
 * On HP-UX and SPARC builds, make sure the read head is 8-byte aligned by
 * moving the data into freshly allocated storage when it is not. On every
 * other platform this does nothing.
 *
 * Returns 0 on success, -1 if the replacement buffer cannot be set up; in
 * that case `c` is left unchanged and nothing is leaked.
 */
int ringbuffer_ensure_alignment(ringbuffer_t *c)
{
#if defined(__hpux__) || defined(__hpux) || defined(__sparc__) || defined(__sparc)
    intptr_t addr = (intptr_t)c->read_head;

    if (addr % 8 != 0) {
        ringbuffer_t copy;

        if (ringbuffer_initialize(&copy, c->size) == 0) {
            return -1;
        }
        if (ringbuffer_memcpy(&copy, c, ringbuffer_get_nbytes(c)) == -1) {
            /* Release the storage that was just allocated for the copy. */
            ringbuffer_destruct(&copy);
            return -1;
        }
        ringbuffer_destruct(c);
        *c = copy;
    }
#else
    (void)c;
#endif
    return 0;
}
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2011-2012 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef RINGBUFFER_H
#define RINGBUFFER_H 1
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stddef.h>
#include <assert.h>
/* Size type used throughout the ring buffer API. */
typedef size_t lcb_size_t;
/* Assertions map directly onto assert(), so they disappear under NDEBUG. */
#define lcb_assert assert
/* One contiguous memory segment, as filled in by ringbuffer_get_iov(). */
struct lcb_iovec_st {
char *iov_base;
lcb_size_t iov_len;
};
/* Bookkeeping for one ring buffer. */
typedef struct ringbuffer_st {
/* Start of the storage block. */
char *root;
/* Position the next read starts from. */
char *read_head;
/* Position the next write starts at. */
char *write_head;
/* Size of the storage block in bytes. */
lcb_size_t size;
/* Number of bytes currently stored. */
lcb_size_t nbytes;
} ringbuffer_t;
/* Selects which end of the buffer an operation refers to. */
typedef enum {
RINGBUFFER_READ = 0x01,
RINGBUFFER_WRITE = 0x02
} ringbuffer_direction_t;
/* Allocate `size` bytes and initialise an empty buffer over them.
 * Returns 1 on success, 0 if the allocation fails. */
int ringbuffer_initialize(ringbuffer_t *buffer,
lcb_size_t size);
/**
 * Initialize a ringbuffer, taking ownership of an allocated char buffer.
 * This function always succeeds.
 * @param buffer a ringbuffer_t to be initialized
 * @param buf the buffer to steal
 * @param size the allocated size of the buffer
 */
void ringbuffer_take_buffer(ringbuffer_t *buffer,
char *buf,
lcb_size_t size);
/* Discard all stored bytes. */
void ringbuffer_reset(ringbuffer_t *buffer);
/* Free the storage and clear every field of the structure. */
void ringbuffer_destruct(ringbuffer_t *buffer);
/* Make room for `size` more bytes, reallocating if necessary.
 * Returns 1 on success, 0 if the allocation fails. */
int ringbuffer_ensure_capacity(ringbuffer_t *buffer,
lcb_size_t size);
/* Accessors for the storage size, storage start and the two heads. */
lcb_size_t ringbuffer_get_size(ringbuffer_t *buffer);
void *ringbuffer_get_start(ringbuffer_t *buffer);
void *ringbuffer_get_read_head(ringbuffer_t *buffer);
void *ringbuffer_get_write_head(ringbuffer_t *buffer);
/* Append up to `nb` bytes (only advances the write head when `src` is NULL).
 * Never grows the buffer. Returns the number of bytes stored. */
lcb_size_t ringbuffer_write(ringbuffer_t *buffer,
const void *src,
lcb_size_t nb);
/* Append a C string without its terminating NUL, growing the buffer if
 * needed. Returns the bytes written, 0 if growing failed. */
lcb_size_t ringbuffer_strcat(ringbuffer_t *buffer,
const char *str);
/* Remove up to `nb` bytes, copying them to `dest` unless it is NULL.
 * Returns the number of bytes removed. */
lcb_size_t ringbuffer_read(ringbuffer_t *buffer,
void *dest,
lcb_size_t nb);
/* Like ringbuffer_read(), but leaves the buffer unchanged. */
lcb_size_t ringbuffer_peek(ringbuffer_t *buffer,
void *dest,
lcb_size_t nb);
/* Like ringbuffer_peek(), starting `offset` bytes past the read head.
 * Returns (lcb_size_t)-1 when fewer than `offset` bytes are stored. */
lcb_size_t ringbuffer_peek_at(ringbuffer_t *buffer,
lcb_size_t offset,
void *dest,
lcb_size_t nb);
/* replace +nb+ bytes on +direction+ end of the buffer with src */
lcb_size_t ringbuffer_update(ringbuffer_t *buffer,
ringbuffer_direction_t direction,
const void *src, lcb_size_t nb);
/* Describe the stored data (READ) or the free space (WRITE) as up to two
 * segments; `iov` must point to two entries. */
void ringbuffer_get_iov(ringbuffer_t *buffer,
ringbuffer_direction_t direction,
struct lcb_iovec_st *iov);
/* Advance the write head by `nb` bytes without copying; asserts they fit. */
void ringbuffer_produced(ringbuffer_t *buffer, lcb_size_t nb);
/* Discard `nb` bytes from the read end; asserts that many were stored. */
void ringbuffer_consumed(ringbuffer_t *buffer, lcb_size_t nb);
/* Return the number of bytes currently stored. */
lcb_size_t ringbuffer_get_nbytes(ringbuffer_t *buffer);
/* Return 1 when `nb` bytes can be read/written in `direction` without
 * crossing the end of the storage, 0 otherwise. */
int ringbuffer_is_continous(ringbuffer_t *buffer,
ringbuffer_direction_t direction,
lcb_size_t nb);
/* Move all data from `src` to the end of `dest`. */
int ringbuffer_append(ringbuffer_t *src, ringbuffer_t *dest);
/* Copy `nbytes` bytes from `src` to the end of `dst` without consuming them.
 * Returns 0 on success, -1 on error. */
int ringbuffer_memcpy(ringbuffer_t *dst, ringbuffer_t *src,
lcb_size_t nbytes);
/* Align the read head of the ringbuffer for platforms where it's needed */
int ringbuffer_ensure_alignment(ringbuffer_t *src);
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment