Skip to content

Instantly share code, notes, and snippets.

@richardhundt
Created February 24, 2013 11:01
Show Gist options
  • Save richardhundt/5023415 to your computer and use it in GitHub Desktop.
Save richardhundt/5023415 to your computer and use it in GitHub Desktop.
libray
#include "ray.h"
int ray_last_error(ray_ctx_t* self) {
uv_err_t err = uv_last_error(self->loop);
return err.code;
}
const char* ray_strerror(int code) {
uv_err_t err = { .code = code };
return uv_strerror(err);
}
const char* ray_err_name(int code) {
uv_err_t err = { .code = code };
return uv_err_name(err);
}
void ray_buf_need(ray_buf_t* buf, size_t len) {
size_t size = buf->size;
size_t need = buf->offs + len;
if (size == 0) {
ray_buf_init(buf);
size = buf->size;
}
if (need > buf->size) {
while (size < need) size *= 2;
buf->base = (char*)realloc(buf->base, size);
buf->size = size;
}
printf("buf->base: %p\n", buf->base);
}
void ray_buf_init(ray_buf_t* buf) {
buf->base = calloc(1, RAY_BUF_SIZE);
buf->size = RAY_BUF_SIZE;
buf->offs = 0;
}
void ray_buf_write(ray_buf_t* buf, const char* data, size_t len) {
ray_buf_need(buf, len);
memcpy(buf->base + buf->offs, data, len);
buf->offs += len;
}
const char* ray_buf_read(ray_buf_t* buf) {
buf->offs = 0;
return buf->base;
}
void ray_buf_clear(ray_buf_t* buf) {
memset(buf->base, 0, buf->size);
buf->offs = 0;
}
size_t ray_buf_get_offset(ray_buf_t* buf) {
return buf->offs;
}
ray_evt_t ray_evt_init(ray_agent_t* a, ray_type_t t, int i, void* d) {
ray_evt_t evt;
evt.self = a;
evt.type = t;
evt.info = i;
evt.data = d;
printf("ray_evt_init - agent: %p, data: %p\n", a, d);
return evt;
}
void ray_ctx_async_cb(uv_async_t* async, int status) {
(void)async;
(void)status;
}
void ray_ctx_timer_cb(uv_timer_t* timer, int status) {
ray_ctx_t* ctx = container_of(timer, ray_ctx_t, timer);
ray_interrupt(ctx);
}
ray_ctx_t* ray_ctx_new(size_t size) {
ray_ctx_t* self = (ray_ctx_t*)malloc(sizeof(ray_ctx_t));
ray_ctx_init(self, size + (size % 2));
return self;
}
int ray_ctx_init(ray_ctx_t* self, size_t size) {
uv_loop_t* loop = uv_loop_new();
self->nput = 0;
self->nget = 0;
self->size = size;
self->loop = loop;
loop->data = (void*)self;
self->evts = calloc(size, sizeof(ray_evt_t));
uv_async_init(loop, &self->async, ray_ctx_async_cb);
uv_unref((uv_handle_t*)&self->async);
uv_timer_init(loop, &self->timer);
uv_unref((uv_handle_t*)&self->timer);
self->sys = ray_agent_new(self);
return 0;
}
void ray_ctx_free(ray_ctx_t* self) {
ray_agent_free(self->sys);
free(self->evts);
free(self);
}
ray_agent_t* ray_agent_new(ray_ctx_t* ctx) {
ray_agent_t* self = (ray_agent_t*)calloc(1, sizeof(ray_agent_t));
self->ctx = ctx;
return self;
}
void ray_agent_free(ray_agent_t* self) {
printf("free agent: %p\n", self);
free(self);
}
int ray_evt_count(ray_ctx_t* self) {
if (self->nput > self->nget) {
return self->nput - self->nget;
}
else { // integer wrap
return self->nget - self->nput;
}
}
void ray_post(ray_ctx_t* self, ray_evt_t* evt) {
assert(ray_evt_count(self) != self->size);
self->evts[self->nput++ % self->size] = *evt;
}
ray_evt_t* ray_take(ray_ctx_t* self) {
if (ray_evt_count(self) == 0) return NULL;
return &self->evts[self->nget++ % self->size];
}
ray_evt_t* ray_peek(ray_ctx_t* self) {
if (ray_evt_count(self) == 0) return NULL;
return &self->evts[self->nget % self->size];
}
ray_evt_t* ray_next(ray_ctx_t* self) {
int uv_again = 0;
do {
printf("try UV_RUN_NOWAIT\n");
uv_again = uv_run(self->loop, UV_RUN_NOWAIT);
if (ray_evt_count(self) != 0) break;
printf("try UV_RUN_ONCE\n");
uv_again = uv_run(self->loop, UV_RUN_ONCE);
printf("trigger\n");
if (ray_evt_count(self) != 0) break;
} while (uv_again);
if (ray_evt_count(self) != 0) return ray_take(self);
return NULL;
}
void ray_done(ray_evt_t* evt) {
printf("ray_done: evt: %p, data: %p\n", evt, evt->data);
if (evt->data) free(evt->data);
evt->data = NULL;
printf("OK\n");
}
int ray_interrupt(ray_ctx_t* ctx) {
return uv_async_send(&ctx->async);
}
/* ========================================================================== */
/* streams */
/* ========================================================================== */
void ray_close_cb(uv_handle_t* handle) {
ray_agent_t* self = container_of(handle, ray_agent_t, h);
ray_evt_t evt = ray_evt_init(self, RAY_CLOSE, 0, NULL);
ray_post(self->ctx, &evt);
}
void ray_close(ray_agent_t* self) {
if (!uv_is_closing(&self->h.handle)) {
uv_close(&self->h.handle, ray_close_cb);
}
else {
ray_evt_t evt = ray_evt_init(self, RAY_CLOSE, 0, NULL);
ray_post(self->ctx, &evt);
}
}
uv_buf_t ray_alloc_cb(uv_handle_t* handle, size_t size) {
ray_agent_t* self = container_of(handle, ray_agent_t, h);
ray_buf_need(&self->buf, size);
return uv_buf_init(self->buf.base, size);
}
void ray_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
printf("read_cb: nread %i\n", (int)nread);
ray_agent_t* self = container_of(stream, ray_agent_t, h);
if (nread == 0) return;
ray_evt_t evt;
if (nread > 0) {
evt = ray_evt_init(self, RAY_READ, nread, strndup(self->buf.base, nread));
}
else {
uv_err_t err = uv_last_error(stream->loop);
evt = ray_evt_init(self, RAY_ERROR, err.code, NULL);
ray_close(self);
}
ray_post(self->ctx, &evt);
}
void ray_write_cb(uv_write_t* req, int status) {
printf("ray_write_cb\n");
ray_agent_t* self = container_of(req, ray_agent_t, r);
ray_evt_t evt = ray_evt_init(self, RAY_WRITE, status, NULL);
ray_post(self->ctx, &evt);
//ray_interrupt(self->ctx);
}
int ray_read_start(ray_agent_t* self, size_t len) {
printf("ray_read_start: %p\n", self);
if (!len) len = RAY_BUF_SIZE;
printf("allocating buffer\n");
ray_buf_need(&self->buf, len);
printf("got buffer\n");
int rc = uv_read_start(&self->h.stream, ray_alloc_cb, ray_read_cb);
printf("uv_read_start returned: %i\n", rc);
return rc;
}
int ray_read_stop(ray_agent_t* self) {
return uv_read_stop(&self->h.stream);
}
int ray_write(ray_agent_t* self, const char* str, size_t len) {
uv_buf_t buf = uv_buf_init((char*)str, (unsigned int)len);
int rc = uv_write(&self->r.write, &self->h.stream, &buf, 1, ray_write_cb);
printf("uv_write returned: %i\n", rc);
return rc;
}
void ray_connection_cb(uv_stream_t* stream, int status) {
ray_agent_t* self = container_of(stream, ray_agent_t, h);
ray_evt_t evt = ray_evt_init(self, RAY_CONNECTION, status, NULL);
printf("connection_cb on self %p\n", self);
ray_post(self->ctx, &evt);
}
int ray_listen(ray_agent_t* self, int backlog) {
return uv_listen(&self->h.stream, backlog, ray_connection_cb);
}
int ray_accept(ray_agent_t* server, ray_agent_t* client) {
return uv_accept(&server->h.stream, &client->h.stream);
}
void* ray_get_data(ray_agent_t* self) {
return self->data;
}
void ray_set_data(ray_agent_t* self, void* data) {
self->data = data;
}
int ray_get_id(ray_agent_t* self) {
return self->id;
}
void ray_set_id(ray_agent_t* self, int id) {
self->id = id;
}
/* ========================================================================== */
/* timers */
/* ========================================================================== */
void ray_timer_cb(uv_timer_t* timer, int status) {
ray_agent_t* self = container_of(timer, ray_agent_t, h);
ray_evt_t evt = ray_evt_init(self, RAY_TIMER, status, NULL);
ray_post(self->ctx, &evt);
}
ray_agent_t* ray_timer_new(ray_ctx_t* ctx) {
ray_agent_t* self = ray_agent_new(ctx);
if (uv_timer_init(ctx->loop, &self->h.timer)) return NULL;
return self;
}
int ray_timer_start(ray_agent_t* self, int64_t timeo, int64_t repeat) {
return uv_timer_start(&self->h.timer, ray_timer_cb, timeo, repeat);
}
int ray_timer_stop(ray_agent_t* self) {
return uv_timer_stop(&self->h.timer);
}
/* ========================================================================== */
/* TCP */
/* ========================================================================== */
int ray_tcp_init(ray_agent_t* self) {
ray_buf_init(&self->buf);
return uv_tcp_init(self->ctx->loop, &self->h.tcp);
}
ray_agent_t* ray_tcp_new(ray_ctx_t* ctx) {
ray_agent_t* self = ray_agent_new(ctx);
if (ray_tcp_init(self)) return NULL;
return self;
}
int ray_tcp_bind(ray_agent_t* self, const char* host, int port) {
struct sockaddr_in addr;
addr = uv_ip4_addr(host, port);
return uv_tcp_bind(&self->h.tcp, addr);
}
/* ========================================================================== */
/* idle */
/* ========================================================================== */
void ray_idle_cb(uv_idle_t* idle, int status) {
ray_agent_t* self = container_of(idle, ray_agent_t, h);
ray_evt_t evt = ray_evt_init(self, RAY_IDLE, status, NULL);
ray_post(self->ctx, &evt);
}
ray_agent_t* ray_idle_new(ray_ctx_t* ctx) {
ray_agent_t* self = ray_agent_new(ctx);
uv_idle_init(self->ctx->loop, &self->h.idle);
return self;
}
int ray_idle_start(ray_agent_t* self) {
return uv_idle_start(&self->h.idle, ray_idle_cb);
}
int ray_idle_stop(ray_agent_t* self) {
return uv_idle_stop(&self->h.idle);
}
/* ========================================================================== */
/* file system */
/* ========================================================================== */
void ray_stat_init(ray_stat_t* self, uv_statbuf_t* s) {
if (s) {
self->dev = s->st_dev;
self->ino = s->st_ino;
self->mode = s->st_mode;
self->nlink = s->st_nlink;
self->uid = s->st_uid;
self->gid = s->st_gid;
self->rdev = s->st_rdev;
self->size = s->st_size;
self->atime = s->st_atime;
self->mtime = s->st_mtime;
self->ctime = s->st_ctime;
}
}
void ray_fs_cb(uv_fs_t* req) {
ray_agent_t* sys = container_of(req, ray_agent_t, r);
ray_evt_t evt;
if (req->result == -1) {
evt = ray_evt_init(sys, RAY_ERROR, req->errorno, NULL);
}
else {
int type = -1;
void* data = NULL;
int info = 0;
switch (req->fs_type) {
case UV_FS_RENAME:
type = RAY_FS_RENAME;
info = req->result;
break;
case UV_FS_UNLINK:
type = RAY_FS_UNLINK;
info = req->result;
break;
case UV_FS_RMDIR:
type = RAY_FS_RMDIR;
info = req->result;
break;
case UV_FS_MKDIR:
type = RAY_FS_MKDIR;
info = req->result;
break;
case UV_FS_FSYNC:
type = RAY_FS_FSYNC;
info = req->result;
break;
case UV_FS_FTRUNCATE:
type = RAY_FS_FTRUNCATE;
info = req->result;
break;
case UV_FS_FDATASYNC:
type = RAY_FS_FDATASYNC;
info = req->result;
break;
case UV_FS_LINK:
type = RAY_FS_LINK;
info = req->result;
break;
case UV_FS_SYMLINK:
type = RAY_FS_SYMLINK;
info = req->result;
break;
case UV_FS_CHMOD:
type = RAY_FS_CHMOD;
info = req->result;
break;
case UV_FS_FCHMOD:
type = RAY_FS_FCHMOD;
info = req->result;
break;
case UV_FS_CHOWN:
type = RAY_FS_CHOWN;
info = req->result;
break;
case UV_FS_FCHOWN:
type = RAY_FS_FCHOWN;
info = req->result;
break;
case UV_FS_UTIME:
type = RAY_FS_UTIME;
info = req->result;
break;
case UV_FS_FUTIME:
type = RAY_FS_FUTIME;
info = req->result;
break;
case UV_FS_CLOSE:
type = RAY_FS_CLOSE;
info = req->result;
break;
case UV_FS_OPEN:
type = RAY_FS_OPEN;
info = req->result;
break;
case UV_FS_READ:
type = RAY_FS_READ;
data = req->data;
info = req->result;
break;
case UV_FS_WRITE:
type = RAY_FS_WRITE;
info = req->result;
break;
case UV_FS_READLINK:
type = RAY_FS_READLINK;
info = strlen(req->ptr);
data = strdup(req->ptr);
break;
case UV_FS_READDIR: {
int i;
ray_dir_t* dirs;
type = RAY_FS_READDIR;
info = req->result;
data = calloc(info, sizeof(ray_dir_t));
dirs = (ray_dir_t*)data;
char* ptr = (char*)req->ptr;
for (i = 0; i < info; i++) {
char* name = strdup(ptr);
size_t nlen = strlen(ptr);
dirs[i].name = name;
dirs[i].nlen = nlen;
ptr += nlen + 1;
}
break;
}
case UV_FS_STAT: {
type = RAY_FS_STAT;
data = malloc(sizeof(ray_stat_t));
ray_stat_init((ray_stat_t*)data, (uv_statbuf_t*)req->ptr);
break;
}
case UV_FS_LSTAT: {
type = RAY_FS_LSTAT;
data = malloc(sizeof(ray_stat_t));
ray_stat_init((ray_stat_t*)data, (uv_statbuf_t*)req->ptr);
break;
}
case UV_FS_FSTAT: {
type = RAY_FS_READDIR;
data = malloc(sizeof(ray_stat_t));
ray_stat_init((ray_stat_t*)data, (uv_statbuf_t*)req->ptr);
break;
}
default: {
printf("Unhandled fs_type");
abort();
}
}
evt = ray_evt_init(sys, type, info, data);
}
uv_fs_req_cleanup(req);
ray_post(sys->ctx, &evt);
}
int ray_str_flags(const char* str) {
if (strcmp(str, "r") == 0)
return O_RDONLY;
if (strcmp(str, "r+") == 0)
return O_RDWR;
if (strcmp(str, "w") == 0)
return O_CREAT | O_TRUNC | O_WRONLY;
if (strcmp(str, "w+") == 0)
return O_CREAT | O_TRUNC | O_RDWR;
if (strcmp(str, "a") == 0)
return O_APPEND | O_CREAT | O_WRONLY;
if (strcmp(str, "a+") == 0)
return O_APPEND | O_CREAT | O_RDWR;
assert(0 && "Unknown file open flag");
}
int ray_fs_open(ray_ctx_t* ctx, const char *path, const char* how, int mode) {
int flags = ray_str_flags(how);
if (!mode) mode = 8;
return uv_fs_open(ctx->loop, &ctx->sys->r.fs, path, flags, mode, ray_fs_cb);
}
int ray_fs_read(ray_ctx_t* ctx, ray_file_t fh, char* buf, size_t len, int64_t ofs) {
return uv_fs_read(ctx->loop, &ctx->sys->r.fs, fh, buf, len, ofs, ray_fs_cb);
}
#include <stdlib.h>
#include <stddef.h>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#ifndef _WIN32
#include <unistd.h>
#endif
#ifdef WIN32
# define UNUSED /* empty */
# define INLINE __inline
#else
# define UNUSED __attribute__((unused))
# define INLINE inline
#endif
#define RAY_DEBUG
#include "libuv/include/uv.h"
#ifdef RAY_DEBUG
# define TRACE(fmt, ...) do { \
fprintf(stderr, "%s: %d: %s: " fmt, \
__FILE__, __LINE__, __func__, ##__VA_ARGS__); \
} while (0)
#else
# define TRACE(fmt, ...) ((void)0)
#endif /* RAY_DEBUG */
/* default buffer size for read operations */
#define RAY_BUF_SIZE 4096
/* max path length */
#define RAY_MAX_PATH 1024
#define container_of(ptr, type, member) \
((type*) ((char*)(ptr) - offsetof(type, member)))
typedef enum {
RAY_UNKNOWN = -1,
RAY_CUSTOM,
RAY_ERROR,
RAY_READ,
RAY_WRITE,
RAY_CLOSE,
RAY_CONNECTION,
RAY_TIMER,
RAY_IDLE,
RAY_CONNECT,
RAY_SHUTDOWN,
RAY_WORK,
RAY_FS_CUSTOM,
RAY_FS_ERROR,
RAY_FS_OPEN,
RAY_FS_CLOSE,
RAY_FS_READ,
RAY_FS_WRITE,
RAY_FS_SENDFILE,
RAY_FS_STAT,
RAY_FS_LSTAT,
RAY_FS_FSTAT,
RAY_FS_FTRUNCATE,
RAY_FS_UTIME,
RAY_FS_FUTIME,
RAY_FS_CHMOD,
RAY_FS_FCHMOD,
RAY_FS_FSYNC,
RAY_FS_FDATASYNC,
RAY_FS_UNLINK,
RAY_FS_RMDIR,
RAY_FS_MKDIR,
RAY_FS_RENAME,
RAY_FS_READDIR,
RAY_FS_LINK,
RAY_FS_SYMLINK,
RAY_FS_READLINK,
RAY_FS_CHOWN,
RAY_FS_FCHOWN
} ray_type_t;
typedef union ray_handle_u {
uv_handle_t handle;
uv_stream_t stream;
uv_tcp_t tcp;
uv_pipe_t pipe;
uv_prepare_t prepare;
uv_check_t check;
uv_idle_t idle;
uv_async_t async;
uv_timer_t timer;
uv_fs_event_t fs_event;
uv_fs_poll_t fs_poll;
uv_poll_t poll;
uv_process_t process;
uv_tty_t tty;
uv_udp_t udp;
} ray_handle_t;
typedef union ray_req_u {
uv_req_t req;
uv_write_t write;
uv_connect_t connect;
uv_shutdown_t shutdown;
uv_fs_t fs;
uv_work_t work;
uv_udp_send_t udp_send;
uv_getaddrinfo_t getaddrinfo;
} ray_req_t;
typedef enum ray_err_e {
RAY_OK = 0,
RAY_EOF = 1,
RAY_EADDRINFO = 2,
RAY_EACCES = 3,
RAY_EAGAIN = 4,
RAY_EADDRINUSE = 5,
RAY_EADDRNOTAVAIL = 6,
RAY_EAFNOSUPPORT = 7,
RAY_EALREADY = 8,
RAY_EBADF = 9,
RAY_EBUSY = 10,
RAY_ECONNABORTED = 11,
RAY_ECONNREFUSED = 12,
RAY_ECONNRESET = 13,
RAY_EDESTADDRREQ = 14,
RAY_EFAULT = 15,
RAY_EHOSTUNREACH = 16,
RAY_EINTR = 17,
RAY_EINVAL = 18,
RAY_EISCONN = 19,
RAY_EMFILE = 20,
RAY_EMSGSIZE = 21,
RAY_ENETDOWN = 22,
RAY_ENETUNREACH = 23,
RAY_ENFILE = 24,
RAY_ENOBUFS = 25,
RAY_ENOMEM = 26,
RAY_ENOTDIR = 27,
RAY_EISDIR = 28,
RAY_ENONET = 29,
RAY_ENOTCONN = 31,
RAY_ENOTSOCK = 32,
RAY_ENOTSUP = 33,
RAY_ENOENT = 34,
RAY_ENOSYS = 35,
RAY_EPIPE = 36,
RAY_EPROTO = 37,
RAY_EPROTONOSUPPORT = 38,
RAY_EPROTOTYPE = 39,
RAY_ETIMEDOUT = 40,
RAY_ECHARSET = 41,
RAY_EAIFAMNOSUPPORT = 42,
RAY_EAISERVICE = 44,
RAY_EAISOCKTYPE = 45,
RAY_ESHUTDOWN = 46,
RAY_EEXIST = 47,
RAY_ESRCH = 48,
RAY_ENAMETOOLONG = 49,
RAY_EPERM = 50,
RAY_ELOOP = 51,
RAY_EXDEV = 52,
RAY_ENOTEMPTY = 53,
RAY_ENOSPC = 54,
RAY_EIO = 55,
RAY_EROFS = 56,
RAY_ENODEV = 57,
RAY_ESPIPE = 58,
RAY_ECANCELED = 59,
} ray_err_t;
typedef uv_file ray_file_t;
typedef struct ray_buf_s ray_buf_t;
typedef struct ray_evt_s ray_evt_t;
typedef struct ray_ctx_s ray_ctx_t;
typedef struct ray_agent_s ray_agent_t;
typedef struct ray_dir_s ray_dir_t;
typedef struct ray_stat_s ray_stat_t;
struct ray_buf_s {
size_t size;
size_t offs;
char* base;
};
struct ray_evt_s {
ray_type_t type;
ray_agent_t* self;
int info;
void* data;
};
struct ray_ctx_s {
size_t nput;
size_t nget;
size_t size;
ray_agent_t* sys;
uv_loop_t* loop;
ray_evt_t* evts;
uv_async_t async;
uv_timer_t timer;
};
struct ray_agent_s {
ray_handle_t h;
ray_req_t r;
ray_buf_t buf;
ray_ctx_t* ctx;
int id;
void* data;
};
struct ray_dir_s {
char* name;
size_t nlen;
};
struct ray_stat_s {
uint32_t mode;
uint32_t uid;
uint32_t gid;
uint64_t size;
uint64_t atime;
uint64_t mtime;
uint64_t ctime;
uint64_t dev;
uint64_t rdev;
uint64_t ino;
uint64_t nlink;
};
ray_buf_t* ray_buf_new(size_t size);
void ray_buf_init(ray_buf_t* buf);
void ray_buf_need(ray_buf_t* buf, size_t len);
void ray_buf_write(ray_buf_t* buf, const char* str, size_t len);
void ray_buf_clear(ray_buf_t* buf);
const char* ray_buf_read(ray_buf_t* buf);
void ray_buf_free(ray_buf_t* buf);
ray_ctx_t* ray_ctx_new(size_t size);
int ray_ctx_init(ray_ctx_t* self, size_t size);
void ray_ctx_free(ray_ctx_t* self);
int ray_evt_count(ray_ctx_t* self);
ray_evt_t ray_evt_init(ray_agent_t* o, ray_type_t t, int i, void* d);
ray_agent_t* ray_agent_new(ray_ctx_t* ctx);
void ray_agent_free(ray_agent_t* self);
void ray_post(ray_ctx_t* self, ray_evt_t* evt);
ray_evt_t* ray_take(ray_ctx_t* self);
ray_evt_t* ray_peek(ray_ctx_t* self);
ray_evt_t* ray_next(ray_ctx_t* self);
int ray_last_error(ray_ctx_t* self);
const char* ray_strerror(int code);
const char* ray_err_name(int code);
int ray_interrupt(ray_ctx_t* ctx);
ray_agent_t* ray_tcp_new(ray_ctx_t* ctx);
int ray_tcp_init(ray_agent_t* self);
int ray_tcp_bind(ray_agent_t* self, const char* host, int port);
int ray_read_start(ray_agent_t* self, size_t len);
int ray_read_stop(ray_agent_t* self);
int ray_write(ray_agent_t* self, const char* str, size_t len);
int ray_listen(ray_agent_t* self, int backlog);
int ray_accept(ray_agent_t* server, ray_agent_t* client);
void ray_close(ray_agent_t* self);
ray_agent_t* ray_timer_new(ray_ctx_t* ctx);
int ray_timer_start(ray_agent_t* self, int64_t timeo, int64_t repeat);
int ray_timer_stop(ray_agent_t* self);
ray_evt_t* ray_next(ray_ctx_t* self);
void ray_done(ray_evt_t* evt);
int ray_get_id(ray_agent_t* self);
void ray_set_id(ray_agent_t* self, int id);
local ffi = require('ffi')
ffi.cdef[[
typedef enum {
RAY_UNKNOWN = -1,
RAY_CUSTOM,
RAY_ERROR,
RAY_READ,
RAY_WRITE,
RAY_CLOSE,
RAY_CONNECTION,
RAY_TIMER,
RAY_IDLE,
RAY_CONNECT,
RAY_SHUTDOWN,
RAY_WORK,
RAY_FS_CUSTOM,
RAY_FS_ERROR,
RAY_FS_OPEN,
RAY_FS_CLOSE,
RAY_FS_READ,
RAY_FS_WRITE,
RAY_FS_SENDFILE,
RAY_FS_STAT,
RAY_FS_LSTAT,
RAY_FS_FSTAT,
RAY_FS_FTRUNCATE,
RAY_FS_UTIME,
RAY_FS_FUTIME,
RAY_FS_CHMOD,
RAY_FS_FCHMOD,
RAY_FS_FSYNC,
RAY_FS_FDATASYNC,
RAY_FS_UNLINK,
RAY_FS_RMDIR,
RAY_FS_MKDIR,
RAY_FS_RENAME,
RAY_FS_READDIR,
RAY_FS_LINK,
RAY_FS_SYMLINK,
RAY_FS_READLINK,
RAY_FS_CHOWN,
RAY_FS_FCHOWN
} ray_type_t;
typedef enum ray_err_e {
RAY_OK = 0,
RAY_EOF = 1,
RAY_EADDRINFO = 2,
RAY_EACCES = 3,
RAY_EAGAIN = 4,
RAY_EADDRINUSE = 5,
RAY_EADDRNOTAVAIL = 6,
RAY_EAFNOSUPPORT = 7,
RAY_EALREADY = 8,
RAY_EBADF = 9,
RAY_EBUSY = 10,
RAY_ECONNABORTED = 11,
RAY_ECONNREFUSED = 12,
RAY_ECONNRESET = 13,
RAY_EDESTADDRREQ = 14,
RAY_EFAULT = 15,
RAY_EHOSTUNREACH = 16,
RAY_EINTR = 17,
RAY_EINVAL = 18,
RAY_EISCONN = 19,
RAY_EMFILE = 20,
RAY_EMSGSIZE = 21,
RAY_ENETDOWN = 22,
RAY_ENETUNREACH = 23,
RAY_ENFILE = 24,
RAY_ENOBUFS = 25,
RAY_ENOMEM = 26,
RAY_ENOTDIR = 27,
RAY_EISDIR = 28,
RAY_ENONET = 29,
RAY_ENOTCONN = 31,
RAY_ENOTSOCK = 32,
RAY_ENOTSUP = 33,
RAY_ENOENT = 34,
RAY_ENOSYS = 35,
RAY_EPIPE = 36,
RAY_EPROTO = 37,
RAY_EPROTONOSUPPORT = 38,
RAY_EPROTOTYPE = 39,
RAY_ETIMEDOUT = 40,
RAY_ECHARSET = 41,
RAY_EAIFAMNOSUPPORT = 42,
RAY_EAISERVICE = 44,
RAY_EAISOCKTYPE = 45,
RAY_ESHUTDOWN = 46,
RAY_EEXIST = 47,
RAY_ESRCH = 48,
RAY_ENAMETOOLONG = 49,
RAY_EPERM = 50,
RAY_ELOOP = 51,
RAY_EXDEV = 52,
RAY_ENOTEMPTY = 53,
RAY_ENOSPC = 54,
RAY_EIO = 55,
RAY_EROFS = 56,
RAY_ENODEV = 57,
RAY_ESPIPE = 58,
RAY_ECANCELED = 59,
} ray_err_t;
typedef int ray_file_t;
typedef struct ray_buf_s ray_buf_t;
typedef struct ray_evt_s ray_evt_t;
typedef struct ray_ctx_s ray_ctx_t;
typedef struct ray_agent_s ray_agent_t;
typedef struct ray_dir_s ray_dir_t;
typedef struct ray_stat_s ray_stat_t;
struct ray_buf_s {
size_t size;
uint8_t* head;
uint8_t* base;
};
struct ray_evt_s {
ray_type_t type;
ray_agent_t* self;
int info;
void* data;
};
struct ray_dir_s {
char* name;
size_t nlen;
};
struct ray_stat_s {
uint32_t mode;
uint32_t uid;
uint32_t gid;
uint64_t size;
uint64_t atime;
uint64_t mtime;
uint64_t ctime;
uint64_t dev;
uint64_t rdev;
uint64_t ino;
uint64_t nlink;
};
ray_buf_t* ray_buf_new(size_t size);
void ray_buf_init(ray_buf_t* buf);
void ray_buf_need(ray_buf_t* buf, size_t len);
void ray_buf_write(ray_buf_t* buf, const char* str, size_t len);
void ray_buf_clear(ray_buf_t* buf);
const char* ray_buf_read(ray_buf_t* buf, size_t len);
void ray_buf_free(ray_buf_t* buf);
ray_ctx_t* ray_ctx_new(size_t size);
int ray_ctx_init(ray_ctx_t* self, size_t size);
void ray_ctx_free(ray_ctx_t* self);
int ray_evt_count(ray_ctx_t* self);
ray_evt_t ray_evt_init(ray_agent_t* o, ray_type_t t, int i, void* d);
ray_agent_t* ray_agent_new(ray_ctx_t* ctx);
void ray_agent_free(ray_agent_t* self);
void ray_post(ray_ctx_t* self, ray_evt_t* evt);
ray_evt_t* ray_take(ray_ctx_t* self);
ray_evt_t* ray_peek(ray_ctx_t* self);
ray_evt_t* ray_next(ray_ctx_t* self);
int ray_last_error(ray_ctx_t* self);
const char* ray_strerror(int code);
const char* ray_err_name(int code);
int ray_interrupt(ray_ctx_t* ctx);
ray_agent_t* ray_tcp_new(ray_ctx_t* ctx);
int ray_tcp_init(ray_agent_t* self);
int ray_tcp_bind(ray_agent_t* self, const char* host, int port);
int ray_read_start(ray_agent_t* self, size_t len);
int ray_read_stop(ray_agent_t* self);
int ray_write(ray_agent_t* self, const char* str, size_t len);
int ray_listen(ray_agent_t* self, int backlog);
int ray_accept(ray_agent_t* server, ray_agent_t* client);
void ray_close(ray_agent_t* self);
ray_agent_t* ray_timer_new(ray_ctx_t* ctx);
int ray_timer_start(ray_agent_t* self, int64_t timeo, int64_t repeat);
int ray_timer_stop(ray_agent_t* self);
ray_agent_t* ray_idle_new(ray_ctx_t* ctx);
int ray_idle_start(ray_agent_t* self);
int ray_idle_stop(ray_agent_t* self);
ray_evt_t* ray_next(ray_ctx_t* self);
void ray_done(ray_evt_t* evt);
int ray_get_id(ray_agent_t* self);
void ray_set_id(ray_agent_t* self, int id);
]]
return ffi.load('./libray.so')
local ffi = require('ffi')
local lib = require('ray')
--[[
local ctx = lib.ray_ctx_new(1024)
local timer = lib.ray_timer_new(ctx)
lib.ray_timer_start(timer, 1000, 1000)
--]]
--[[
local idle = lib.ray_idle_new(ctx)
lib.ray_idle_start(idle)
--]]
Sched = { }
Sched.IDGEN = 0
Sched.ALIVE = { }
Sched.QUEUE = lib.ray_ctx_new(1024)
Sched.COROS = { }
function Sched:run()
while true do
while #self.COROS > 0 do
local coro = table.remove(self.COROS)
coroutine.resume(coro)
end
local evt = lib.ray_next(self.QUEUE)
if evt == nil then
-- no more pending events
break
end
local oid = lib.ray_get_id(evt.self)
if oid > 0 then
local obj = self.ALIVE[oid]
obj:react(evt)
else
error("not found")
end
lib.ray_done(evt)
end
end
function Sched:genid()
self.IDGEN = self.IDGEN + 1
return self.IDGEN
end
function Sched:add(obj)
local oid = self:genid()
if type(obj) == 'thread' then
self.COROS[#self.COROS + 1] = obj
elseif obj.cdata then
lib.ray_set_id(obj.cdata, oid)
end
self.ALIVE[oid] = obj
return oid
end
TCPSocket = { }
TCPSocket.__index = TCPSocket
function TCPSocket.new(class)
local self = { }
self.cdata = lib.ray_tcp_new(Sched.QUEUE)
self.read_queue = { }
self.write_queue = { }
self.close_queue = { }
Sched:add(self)
return setmetatable(self, class)
end
function TCPSocket.new_from_cdata(class, cdata)
local self = setmetatable({
cdata = cdata;
read_queue = { };
write_queue = { };
close_queue = { };
}, class)
Sched:add(self)
return self
end
function TCPSocket:react(evt)
print("TCPSocket:react - evt:", evt)
if evt.type == 'RAY_ERROR' then
lib.ray_close(self.cdata)
elseif evt.type == 'RAY_READ' then
local data = evt.data
local coro = table.remove(self.read_queue)
lib.ray_read_stop(self.cdata)
coroutine.resume(coro, ffi.string(data))
elseif evt.type == 'RAY_WRITE' then
local coro = table.remove(self.write_queue)
coroutine.resume(coro)
elseif evt.type == 'RAY_CLOSE' then
if #self.close_queue > 0 then
local coro = table.remove(self.close_queue)
coroutine.resume(coro)
end
end
end
function TCPSocket:read()
print("TCPSocket:read - ", self.cdata)
local curr = coroutine.running()
self.read_queue[#self.read_queue + 1] = curr
lib.ray_read_start(self.cdata, 1024)
return coroutine.yield()
end
function TCPSocket:write(data)
local curr = coroutine.running()
self.write_queue[#self.write_queue + 1] = curr
lib.ray_write(self.cdata, data, #data)
return coroutine.yield()
end
function TCPSocket:close()
local curr = coroutine.running()
self.close_queue[#self.close_queue + 1] = curr
lib.ray_close(self.cdata)
return coroutine.yield()
end
TCPServer = { }
TCPServer.__index = TCPServer
function TCPServer.new(class)
local self = setmetatable({
cdata = lib.ray_tcp_new(Sched.QUEUE);
accept_queue = { },
close_queue = { },
}, class)
Sched:add(self)
return self
end
function TCPServer:react(evt)
if evt.type == 'RAY_ERROR' then
local mesg = evt.info
elseif evt.type == 'RAY_CONNECTION' then
local cdata = lib.ray_tcp_new(Sched.QUEUE)
lib.ray_accept(self.cdata, cdata)
local sock = TCPSocket:new_from_cdata(cdata)
local coro = table.remove(self.accept_queue, 1)
coroutine.resume(coro, sock)
elseif evt.type == 'RAY_CLOSE' then
local coro = table.remove(self.close_queue, 1)
coroutine.resume(coro)
end
end
function TCPServer:bind(host, port)
return lib.ray_tcp_bind(self.cdata, host, port)
end
function TCPServer:listen(backlog)
return lib.ray_listen(self.cdata, backlog)
end
function TCPServer:accept()
local curr = coroutine.running()
self.accept_queue[#self.accept_queue + 1] = curr
return coroutine.yield()
end
local main = coroutine.create(function()
local server = TCPServer:new()
server:bind('127.0.0.1', 8080)
server:listen(128)
while true do
local sock = server:accept()
local coro = coroutine.create(function()
while true do
local data = sock:read()
if data then
sock:write(data)
else
sock:close()
break
end
end
end)
Sched:add(coro)
end
end)
Sched:add(main)
Sched:run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment