Skip to content

Instantly share code, notes, and snippets.

@hltbra
Created May 24, 2019 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hltbra/1f2266b96c50700cc17a10115634e55f to your computer and use it in GitHub Desktop.
Save hltbra/1f2266b96c50700cc17a10115634e55f to your computer and use it in GitHub Desktop.
Experiments using ae.c/anet.c with Cython + dredis
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include "ae.h"
#include "anet.h"
#include "cdredis.h"
/* Event loop shared by the handlers in this file; assigned in startServer(). */
static struct aeEventLoop *eventLoop;
/* Error-message buffer passed to anetTcpAccept() and anetTcpServer(). */
static char neterr[100000];
/*
 * Writable-event handler: flushes the client's pending reply to its socket.
 *
 * el       - event loop the handler is registered on
 * fd       - client socket
 * privdata - the Client whose out_buffer holds a NUL-terminated reply
 * mask     - event mask (unused)
 *
 * The handler is removed once the whole reply has been written. On a short
 * write the unsent tail is moved to the front of out_buffer and the handler
 * stays registered, so the remainder goes out on the next writable event.
 * On an unrecoverable write error the client is closed.
 */
void sendReply(aeEventLoop *el, int fd, void *privdata, int mask) {
    Client *client = privdata;
    (void)mask;

    size_t pending = strlen(client->out_buffer);
    if (pending == 0) {
        /* Nothing queued: drop the handler so the loop does not keep firing. */
        aeDeleteFileEvent(el, fd, AE_WRITABLE);
        return;
    }

    ssize_t nwritten = write(fd, client->out_buffer, pending);
    if (nwritten == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            /* Transient: keep the reply and the handler, retry later. */
            return;
        }
        printf("Closing %d (write errno=%d)\n", fd, errno);
        closeClient(client);
        return;
    }

    if ((size_t)nwritten < pending) {
        /* Short write: keep only the unsent tail, still NUL-terminated. */
        size_t remaining = pending - (size_t)nwritten;
        memmove(client->out_buffer, client->out_buffer + nwritten, remaining);
        memset(client->out_buffer + remaining, 0, (size_t)nwritten);
        return;
    }

    memset(client->out_buffer, 0, sizeof(client->out_buffer));
    aeDeleteFileEvent(el, fd, AE_WRITABLE);
}
/*
 * Default message callback used by main(): logs the call and queues a fixed
 * "+1\r\n" reply in the client's out_buffer.
 */
void on_message(Client *client) {
    static const char reply[] = "+1\r\n";

    fputs("called C on_message\n", stdout);
    /* sizeof includes the terminating NUL, so the buffer stays a C string. */
    memcpy(client->out_buffer, reply, sizeof reply);
}
/*
 * Readable-event handler: reads one chunk from the client socket, hands it to
 * the client's on_message callback and, if the callback produced a reply,
 * registers sendReply() to flush it.
 *
 * el       - event loop the handler is registered on
 * fd       - client socket
 * privdata - the Client owning the buffers
 * mask     - event mask (unused)
 *
 * The client is closed on EOF, on an unrecoverable read error, or when the
 * write handler cannot be registered.
 */
void readQuery(aeEventLoop *el, int fd, void *privdata, int mask) {
    /* Largest chunk read per event; far smaller than Client.buffer. */
    enum { READ_CHUNK = 1024 * 16 };
    Client *client = privdata;
    (void)mask;

    ssize_t nread = read(fd, client->buffer, READ_CHUNK);
    if (nread == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            printf("try again\n");
            return;
        }
        printf("Closing %d (errno=%d)\n", fd, errno);
        closeClient(client);
        return;
    }
    if (nread == 0) {
        printf("Closing %d (nread=0)\n", fd);
        closeClient(client);
        return;
    }

    /* Terminate the payload so bytes left over from an earlier, longer
     * request are not seen by callbacks that treat buffer as a C string. */
    client->buffer[nread] = '\0';

    if (client->on_message != NULL) {
        client->on_message(client);
    }

    /* Register the write handler only when there is something to send, and
     * let sendReply() remove it again, so the loop never spins on an empty
     * buffer. */
    if (client->out_buffer[0] != '\0') {
        if (aeCreateFileEvent(el, fd, AE_WRITABLE, sendReply, client) == AE_ERR) {
            closeClient(client);
            return;
        }
    }
}
Client *createClient(int fd) {
Client *client;
client = malloc(sizeof(Client));
client->fd = fd;
memset(client->buffer, 0, sizeof(client->buffer));
memset(client->out_buffer, 0, sizeof(client->out_buffer));
return client;
}
/*
 * Unregisters the client's events, closes its socket and frees the client.
 *
 * client - client to destroy; NULL is accepted and ignored. The pointer must
 *          not be used after this call.
 */
void closeClient(Client *client) {
    if (client == NULL) {
        return;
    }
    if (client->fd != -1) {
        /* Only touch the event loop for a real descriptor, so the event
         * table is never consulted with fd -1. */
        aeDeleteFileEvent(eventLoop, client->fd, AE_READABLE);
        aeDeleteFileEvent(eventLoop, client->fd, AE_WRITABLE);
        close(client->fd);
    }
    free(client);
}
/*
 * Sets up a freshly accepted connection: makes the socket non-blocking,
 * enables TCP_NODELAY, allocates the Client and registers readQuery() for it.
 *
 * fd                  - accepted socket; -1 is ignored
 * flags               - unused
 * on_message_callback - callback stored on the client and invoked by
 *                       readQuery() for each chunk received
 *
 * On any failure the socket is closed and no client is left registered.
 */
void acceptHandler(int fd, int flags, void (*on_message_callback)(Client *)) {
    (void)flags;

    if (fd == -1) {
        return;
    }

    /* A blocking socket inside the event loop would stall every other
     * client, so treat this failure as fatal for the connection. */
    if (anetNonBlock(NULL, fd) == ANET_ERR) {
        printf("Error making client socket non-blocking: %s (fd=%d)\n",
               strerror(errno), fd);
        close(fd);
        return;
    }
    /* Best effort: the connection still works without TCP_NODELAY. */
    anetEnableTcpNoDelay(NULL, fd);

    Client *client = createClient(fd);
    if (client == NULL) {
        printf("Error allocating state for the new client: %s (fd=%d)\n",
               strerror(errno), fd);
        close(fd);
        return;
    }
    client->on_message = on_message_callback;

    if (aeCreateFileEvent(eventLoop, fd, AE_READABLE, readQuery, client) == AE_ERR) {
        printf("AE_ERR: READABLE\n");
        closeClient(client);
        return;
    }
}
/*
 * Readable-event handler for the listening socket: accepts up to 1000
 * pending connections per invocation and passes each one to acceptHandler().
 *
 * privdata carries the on_message callback that was given to
 * create_tcp_server(). The loop stops at the first accept failure.
 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    printf("accept\n");

    void (*notify)(Client *) = privdata;
    char peer_ip[10000];
    int peer_port;

    for (int budget = 999; budget >= 0; budget--) {
        printf("max=%d\n", budget);

        int conn = anetTcpAccept(neterr, fd, peer_ip, sizeof(peer_ip), &peer_port);
        if (conn != ANET_ERR) {
            printf("Accepted %s:%d\n", peer_ip, peer_port);
            acceptHandler(conn, 0, notify);
            continue;
        }

        /* Accept failed: report it and stop draining the backlog. */
        printf("ANET_ERR acceptTCP (%d), errno=%d (%s)\n", conn, errno,
               (errno == EWOULDBLOCK) ? "wouldblock" : "");
        if (errno != EWOULDBLOCK) {
            printf("Accepting client connection: %s\n", neterr);
        }
        return;
    }
}
/*
 * Creates the event loop and the listening socket, then runs the loop until
 * aeMain() returns.
 *
 * on_message_callback - invoked for every chunk read from a client
 *
 * Returns 0 after a clean shutdown, or 1 if the event loop or the server
 * socket could not be created.
 */
int startServer(void (*on_message_callback)(Client *)) {
    eventLoop = aeCreateEventLoop(100000);
    if (eventLoop == NULL) {
        printf("Error creating event loop\n");
        return 1;
    }

    int fd = create_tcp_server(on_message_callback);
    if (fd == -1) {
        /* Do not leak the loop when the server could not be set up. */
        aeDeleteEventLoop(eventLoop);
        eventLoop = NULL;
        return 1;
    }

    aeMain(eventLoop);

    /* Release the listening socket together with the loop. */
    aeDeleteFileEvent(eventLoop, fd, AE_READABLE);
    close(fd);
    aeDeleteEventLoop(eventLoop);
    eventLoop = NULL;
    return 0;
}
/*
 * Opens a non-blocking listening socket on 0.0.0.0:6377 and registers
 * acceptTcpHandler() for it on the global event loop.
 *
 * on_message_callback - passed to acceptTcpHandler() as its private data
 *
 * Returns the listening descriptor, or -1 on failure. On failure the socket,
 * if it was opened, is closed again.
 */
int create_tcp_server(void (*on_message_callback)(Client *)) {
    int backlog = 5000;
    char *address = "0.0.0.0";
    int port = 6377;

    int fd = anetTcpServer(neterr, port, address, backlog);
    if (fd == ANET_ERR) {
        printf("anet_err tcp server: %s\n", neterr);
        return -1;
    }
    printf("got server\n");

    if (anetNonBlock(NULL, fd) == ANET_ERR) {
        printf("ANET_ERR nonblock\n");
        close(fd);
        return -1;
    }

    if (aeCreateFileEvent(eventLoop, fd, AE_READABLE, acceptTcpHandler,
                          on_message_callback) == AE_ERR) {
        printf("Unrecoverable error creating server.ipfd file event.\n");
        close(fd);
        return -1;
    }
    printf("got file event\n");

    return fd;
}
/* Entry point: runs the server with the built-in C on_message callback and
 * uses startServer()'s result as the process exit status. */
int main() {
    int exit_status = startServer(on_message);
    return exit_status;
}
#ifndef CDREDIS_H
#define CDREDIS_H

/*
 * Per-connection state.
 *
 * buffer     - bytes most recently read from the socket
 * out_buffer - NUL-terminated reply waiting to be written; an empty string
 *              means nothing is pending
 * on_message - callback invoked after each read
 * fd         - the client's socket descriptor
 */
typedef struct _client {
    char buffer[1024 * 1024];
    char out_buffer[1024 * 1024];
    void (*on_message)(struct _client *);
    int fd;
} Client;

/* Unregisters the client's events, closes its socket and frees it. */
void closeClient(Client *client);

/* Allocates a Client for fd with both buffers cleared. */
Client *createClient(int fd);

/* Opens the listening socket; returns its descriptor, or -1 on failure. */
int create_tcp_server(void (*on_message_callback)(Client *));

/* Runs the server; returns 0 on clean exit, 1 if setup failed. */
extern int startServer(void (*on_message_callback)(Client *));

#endif /* CDREDIS_H */
# Cython declarations for the C API declared in cdredis.h.
cdef extern from "cdredis.h":
    # Per-connection state. cdredis.h defines buffer and out_buffer as
    # fixed-size char arrays (1024 * 1024 bytes each); they are declared as
    # char * here.
    # NOTE(review): confirm that sizeof() on these fields in Cython code
    # yields the array size and not the pointer size.
    ctypedef struct Client:
        char *buffer;
        char *out_buffer;
        void (*on_message)(Client *);
        int fd;
    # Starts the server with on_message as the per-read callback.
    int startServer(void (*on_message)(Client *))
    # Allocates a Client for the given descriptor.
    Client *createClient(int)
from libc.string cimport strcpy, memset
from dredis import db
from dredis.keyspace import Keyspace
from dredis.server import execute_cmd
from dredis.parser import Parser
cimport cythondredis
class Parser2(Parser):
    """Parser variant that works on data supplied up front.

    The constructor stores the given data in ``_buffer`` and does not call
    ``Parser.__init__``.
    """

    def __init__(self, data):
        # The whole request is already in memory, so keep it as the buffer.
        self._buffer = data

    def _read_into_buffer(self):
        """Do nothing: there is no further input to pull in.

        NOTE(review): presumably the base Parser calls this hook to fetch more
        bytes from its source -- confirm against dredis.parser.
        """
        pass
# Single module-level Keyspace instance, passed to execute_cmd() by on_message().
keyspace = Keyspace()
# Callback handed to startServer(): parses the bytes in c.buffer, runs every
# command found against the module-level keyspace, and leaves the reply in
# c.out_buffer.
cdef void on_message(cythondredis.Client *c):
    def send(data):
        # strcpy() writes from the start of out_buffer, so each call replaces
        # what an earlier call stored, and it does no bounds checking.
        # NOTE(review): if one read can carry several commands, only the last
        # reply survives; a reply larger than out_buffer would overflow it.
        strcpy(c.out_buffer, data)
    parser = Parser2(c.buffer)
    for cmd in parser.get_instructions():
        execute_cmd(keyspace, send, *cmd)
    # Clear the input buffer once the commands have been handled.
    # NOTE(review): indentation was lost in this copy; the memset is placed
    # after the loop -- confirm. buffer is declared as char * in the .pxd, so
    # also confirm that sizeof(c.buffer) is the full array size.
    memset(c.buffer, 0, sizeof(c.buffer))
# Module-level side effects: importing this module sets up the databases and
# then calls startServer(), which runs the event loop until it returns.
# NOTE(review): '/tmp/cython-db' looks like the storage path and 'memory' the
# backend name -- confirm against dredis.db.
db.DB_MANAGER.setup_dbs('/tmp/cython-db', 'memory', {})
cythondredis.startServer(on_message)
# Build steps, run from the Redis source directory so that ae.c, anet.c and
# zmalloc.c are available.
cd ~/projects/redis/src
# Point the compiler and linker at this directory for the extension build.
export CFLAGS="-I$PWD"
export LDFLAGS="-L$PWD"
# 1) build libcdredis.so from the C sources, 2) build the Cython extension in
# place, 3) import it (the module starts the server at import time).
gcc -O2 zmalloc.c ae.c anet.c cdredis.c -shared -o libcdredis.so && python setup.py build_ext -i && python -c 'import cythondredis'
from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize
# The "cythondredis" extension is compiled from cythondredis.pyx and linked
# against the cdredis library.
cdredis_extension = Extension(
    "cythondredis",
    ["cythondredis.pyx"],
    libraries=["cdredis"],
)

setup(ext_modules=cythonize([cdredis_extension]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment