Skip to content

Instantly share code, notes, and snippets.

@errzey
Created May 2, 2012 18:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save errzey/2579114 to your computer and use it in GitHub Desktop.
Save errzey/2579114 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <evhtp.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
/*
 * Process-wide configuration. One instance is allocated in main() and
 * passed to evhtp_use_threads(), which hands it to app_init_thread() as
 * its `arg` for each worker thread.
 */
typedef struct app_parent {
/* HTTP server instance created in main() */
evhtp_t * evhtp;
/* event base the HTTP server was created on in main() */
evbase_t * evbase;
/* host passed to redisAsyncConnect() by each worker thread */
char * redis_host;
/* port passed to redisAsyncConnect() by each worker thread */
uint16_t redis_port;
} app_parent_t;
/*
 * Per-thread state. Allocated in app_init_thread() and stored as the
 * thread's aux pointer; app_onpath_cb() retrieves it with evthr_get_aux().
 */
typedef struct app {
/* shared configuration this thread was initialised from */
app_parent_t * parent;
/* this thread's event base, obtained from evthr_get_base() */
evbase_t * evbase;
/* async redis context attached to this thread's event base */
redisAsyncContext * redis;
} app_t;
/*
 * Per-request state. Allocated in app_onpath_cb() and passed as the
 * argument to the request callback, the request hooks and every redis
 * callback issued for the request.
 */
typedef struct app_req {
/* per-thread state of the thread handling this request */
app_t * app;
/* the HTTP request this state belongs to */
evhtp_request_t * http_req;
/*
 * set to 1 by app_process_request() once the redis commands are queued,
 * cleared by redis_get_srcport_cb(); while set, app_on_error_cb() and
 * app_on_fini() do not free this structure
 */
uint8_t reading;
/*
 * set to 1 by app_on_error_cb(); the redis callbacks check it and stop
 * touching the request when it is set
 */
uint8_t error;
} app_req_t;
/*
 * Return the worker thread recorded on the connection that `request`
 * arrived on.
 *
 * request: request whose connection is looked up via
 *          evhtp_request_get_connection().
 *
 * Returns the `thread` member of that connection.
 */
evthr_t *
get_request_thr(evhtp_request_t * request) {
    evhtp_connection_t * const owner = evhtp_request_get_connection(request);

    return owner->thread;
}
/*
 * Redis reply handler for the global "INCR requests:total" command.
 *
 * redis:       async context the reply arrived on (unused).
 * redis_reply: the redisReply for the command, or NULL.
 * arg:         the app_req_t of the request that issued the command.
 *
 * Does nothing when the request has been flagged as errored. Otherwise
 * appends either the counter value or a failure message to the request's
 * output buffer. The response itself is not sent from here.
 */
void
redis_global_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
    app_req_t        * state  = arg;
    const redisReply * result = redis_reply;
    evhtp_request_t  * req    = state->http_req;

    (void)redis;

    if (state->error) {
        return;
    }

    if (result != NULL && result->type == REDIS_REPLY_INTEGER) {
        evbuffer_add_printf(req->buffer_out,
                            "Total requests = %lld\n", result->integer);
    } else {
        evbuffer_add_printf(req->buffer_out,
                            "redis_global_incr_cb() failed\n");
    }
}
/*
 * Redis reply handler for the per-source-address INCR command.
 *
 * redis:       async context the reply arrived on (unused).
 * redis_reply: the redisReply for the command, or NULL.
 * arg:         the app_req_t of the request that issued the command.
 *
 * Does nothing when the request has been flagged as errored. Otherwise
 * appends either the per-address counter or a failure message to the
 * request's output buffer. The response itself is not sent from here.
 */
void
redis_srcaddr_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
    app_req_t        * state  = arg;
    const redisReply * result = redis_reply;
    evhtp_request_t  * req    = state->http_req;

    (void)redis;

    if (state->error) {
        return;
    }

    if (result != NULL && result->type == REDIS_REPLY_INTEGER) {
        evbuffer_add_printf(req->buffer_out,
                            "Requests from this source IP = %lld\n", result->integer);
    } else {
        evbuffer_add_printf(req->buffer_out,
                            "redis_srcaddr_incr_cb() failed\n");
    }
}
/*
 * Redis reply handler for the SADD that records the request's source port.
 *
 * redis:       async context the reply arrived on (unused).
 * redis_reply: the redisReply for the command, or NULL.
 * arg:         the app_req_t of the request that issued the command.
 *
 * Does nothing when the request has been flagged as errored. A missing or
 * non-integer reply appends a failure message; otherwise a zero reply is
 * reported as "seen already" and a non-zero reply as "never been seen".
 * The response itself is not sent from here.
 */
void
redis_set_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
    app_req_t        * state  = arg;
    const redisReply * result = redis_reply;
    evhtp_request_t  * req    = state->http_req;

    (void)redis;

    if (state->error) {
        return;
    }

    if (result == NULL || result->type != REDIS_REPLY_INTEGER) {
        evbuffer_add_printf(req->buffer_out,
                            "redis_set_srcport_cb() failed\n");
        return;
    }

    if (result->integer != 0) {
        evbuffer_add_printf(req->buffer_out,
                            "This source port has never been seen.\n");
    } else {
        evbuffer_add_printf(req->buffer_out,
                            "This source port has been seen already.\n");
    }
}
/*
 * Redis reply handler for the SMEMBERS command. This is the last redis
 * command app_process_request() queues for a request, so this callback is
 * the one that finishes the HTTP request.
 *
 * redis:       async context the reply arrived on (unused).
 * redis_reply: the redisReply for the command, or NULL.
 * arg:         the app_req_t of the request that issued the command.
 *
 * Outcomes:
 *  - request flagged as errored: the request is not touched and the
 *    per-request state is released here.
 *  - missing / non-array reply: a failure message is appended and the
 *    request is answered with a 500 so it does not stay paused forever.
 *  - array reply: every member is appended to the output buffer and the
 *    request is answered with a 200.
 */
void
redis_get_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
    redisReply      * reply   = redis_reply;
    app_req_t       * app_req = arg;
    evhtp_request_t * request = app_req->http_req;
    size_t            i;

    (void)redis;

    if (app_req->error) {
        /*
         * app_on_error_cb() only frees the state when `reading` is clear,
         * and it removes the hooks, so app_on_fini() will not run for this
         * request. As the final redis callback, this is the last place the
         * state can be released.
         */
        free(app_req);
        return;
    }

    if (reply == NULL || reply->type != REDIS_REPLY_ARRAY) {
        evbuffer_add_printf(request->buffer_out,
                            "redis_get_srcport_cb() failed.\n");

        /* no further redis callback will arrive: finish the request now */
        app_req->reading = 0;
        evhtp_send_reply(request, EVHTP_RES_SERVERR);
        return;
    }

    evbuffer_add_printf(request->buffer_out,
                        "source ports which have been seen for your ip:\n");

    for (i = 0; i < reply->elements; i++) {
        const redisReply * elem = reply->element[i];

        /* never hand a NULL pointer to "%s" */
        if (elem == NULL || elem->str == NULL) {
            continue;
        }

        evbuffer_add_printf(request->buffer_out, "%s ", elem->str);
    }

    evbuffer_add(request->buffer_out, "\n", 1);

    app_req->reading = 0;

    /* final callback for redis, so send the response */
    evhtp_send_reply(request, EVHTP_RES_OK);
}
/*
 * Request callback installed by app_onpath_cb().
 *
 * request: the HTTP request being processed.
 * arg:     the app_req_t allocated for this request.
 *
 * Formats the peer's IPv4 address, queues four redis commands whose
 * callbacks build the response body, marks the request state as waiting on
 * redis and pauses the request. redis_get_srcport_cb() sends the response.
 *
 * If the peer address cannot be formatted, or redis rejects the first
 * command, the request is answered with a 500 immediately instead of being
 * paused with nothing left to complete it.
 */
void
app_process_request(evhtp_request_t * request, void * arg) {
    app_req_t          * app_req = arg;
    evhtp_connection_t * conn    = evhtp_request_get_connection(request);
    struct sockaddr_in * sin     = (struct sockaddr_in *)conn->saddr;
    char                 addr[INET_ADDRSTRLEN];

    /* the redis keys below embed the address, so it must be a valid string */
    if (sin == NULL || sin->sin_family != AF_INET ||
        evutil_inet_ntop(AF_INET, &sin->sin_addr, addr, sizeof(addr)) == NULL) {
        evbuffer_add_printf(request->buffer_out,
                            "unable to determine the source address\n");
        evhtp_send_reply(request, EVHTP_RES_SERVERR);
        return;
    }

    /* increment a global counter of hits on redis */
    if (redisAsyncCommand(app_req->app->redis, redis_global_incr_cb,
                          app_req, "INCR requests:total") != REDIS_OK) {
        /* nothing was queued, so no callback will ever finish this request */
        evbuffer_add_printf(request->buffer_out,
                            "unable to queue redis commands\n");
        evhtp_send_reply(request, EVHTP_RES_SERVERR);
        return;
    }

    /*
     * NOTE(review): the remaining commands are queued unchecked. This
     * presumes hiredis rejects commands based on the context state, so the
     * first call is representative of the rest -- confirm.
     */

    /* increment a counter for hits from this source address on redis */
    redisAsyncCommand(app_req->app->redis, redis_srcaddr_incr_cb,
                      app_req, "INCR requests:ip:%s", addr);

    /* add the source port of this request to a source-specific set */
    redisAsyncCommand(app_req->app->redis, redis_set_srcport_cb, app_req,
                      "SADD requests:ip:%s:ports %d", addr, ntohs(sin->sin_port));

    /* get all of the ports this source address has used */
    redisAsyncCommand(app_req->app->redis, redis_get_srcport_cb, app_req,
                      "SMEMBERS requests:ip:%s:ports", addr);

    /* keep the per-request state alive until the final redis callback */
    app_req->reading = 1;

    evhtp_request_pause(request);
}
/*
 * Error hook installed on each request by app_onpath_cb().
 *
 * request: the request the error was reported for.
 * events:  event flags supplied by the caller (unused).
 * arg:     the app_req_t allocated for this request.
 *
 * Flags the request state as errored so the redis callbacks stop touching
 * the request, then calls evhtp_unset_all_hooks() on the request's hooks.
 * The state is freed here only when no redis reply is outstanding
 * (`reading` is clear).
 */
void
app_on_error_cb(evhtp_request_t * request, short events, void * arg) {
    app_req_t * state = arg;

    (void)events;

    state->error = 1;
    evhtp_unset_all_hooks(&request->hooks);

    if (state->reading != 0) {
        /* redis callbacks still reference the state; leave it alone */
        return;
    }

    free(state);
}
/*
 * Request-finished hook installed on each request by app_onpath_cb().
 *
 * request: the request that is finishing.
 * arg:     the app_req_t allocated for this request.
 *
 * While redis replies are still outstanding (`reading` is set) nothing is
 * done. Otherwise the per-request state is freed and the request resumed.
 *
 * Always returns EVHTP_RES_OK.
 */
evhtp_res
app_on_fini(evhtp_request_t * request, void * arg) {
    app_req_t * state = arg;

    if (!state->reading) {
        free(state);
        evhtp_request_resume(request);
    }

    return EVHTP_RES_OK;
}
/*
 * Path hook registered in main() for "/".
 *
 * request: the request whose path was just parsed.
 * path:    parsed path information (unused).
 * arg:     hook argument (unused; main() registers the hook with NULL).
 *
 * Allocates the per-request state, binds it to the per-thread state of the
 * thread handling the request, and installs the request callback together
 * with the error and request-finished hooks.
 *
 * Returns EVHTP_RES_OK on success, or EVHTP_RES_SERVERR when the
 * per-request state cannot be allocated.
 */
evhtp_res
app_onpath_cb(evhtp_request_t * request, evhtp_path_t * path, void * arg) {
    app_req_t * app_req;

    (void)path;
    (void)arg;

    /* calloc() zeroes the structure, so `reading` and `error` start at 0 */
    app_req = calloc(1, sizeof(*app_req));

    if (app_req == NULL) {
        return EVHTP_RES_SERVERR;
    }

    app_req->app      = evthr_get_aux(get_request_thr(request));
    app_req->http_req = request;

    request->cb       = app_process_request;
    request->cbarg    = app_req;

    evhtp_set_hook(&request->hooks, evhtp_hook_on_error, app_on_error_cb, app_req);
    evhtp_set_hook(&request->hooks, evhtp_hook_on_request_fini, app_on_fini, app_req);

    return EVHTP_RES_OK;
}
/*
 * Thread-initialisation callback passed to evhtp_use_threads() in main().
 *
 * htp:    the HTTP server instance (unused).
 * thread: the worker thread being initialised.
 * arg:    the app_parent_t built in main().
 *
 * Allocates the per-thread state, opens an async redis context to the
 * configured host/port, attaches it to the thread's event base and stores
 * the state as the thread's aux pointer.
 *
 * The request handlers dereference the per-thread state and its redis
 * context unconditionally, so a worker cannot run without them. Any
 * failure here is reported on stderr and terminates the process.
 */
void
app_init_thread(evhtp_t * htp, evthr_t * thread, void * arg) {
    app_parent_t * app_p = arg;
    app_t        * app;

    (void)htp;

    app = calloc(1, sizeof(*app));

    if (app == NULL) {
        fprintf(stderr, "app_init_thread(): out of memory\n");
        exit(EXIT_FAILURE);
    }

    app->parent = app_p;
    app->evbase = evthr_get_base(thread);
    app->redis  = redisAsyncConnect(app_p->redis_host, app_p->redis_port);

    if (app->redis == NULL) {
        fprintf(stderr, "app_init_thread(): redisAsyncConnect() failed\n");
        exit(EXIT_FAILURE);
    }

    if (app->redis->err) {
        fprintf(stderr, "app_init_thread(): redis error: %s\n",
                app->redis->errstr);
        exit(EXIT_FAILURE);
    }

    if (redisLibeventAttach(app->redis, app->evbase) != REDIS_OK) {
        fprintf(stderr, "app_init_thread(): redisLibeventAttach() failed\n");
        exit(EXIT_FAILURE);
    }

    evthr_set_aux(thread, app);
}
/*
 * Program entry point.
 *
 * Creates the event base and HTTP server, fills in the shared
 * configuration (redis at 127.0.0.1:6379), registers the "/" callback with
 * its path hook, starts four worker threads, binds 127.0.0.1:9090 and runs
 * the event loop.
 *
 * argc, argv: unused.
 *
 * Returns 0 when the event loop exits, or EXIT_FAILURE when any setup step
 * fails (reported on stderr).
 */
int
main(int argc, char ** argv) {
    static const char * const listen_addr = "127.0.0.1";
    static const uint16_t     listen_port = 9090;
    static const int          backlog     = 1024;
    static const int          num_threads = 4;

    evbase_t         * evbase;
    evhtp_t          * evhtp;
    evhtp_callback_t * cb;
    app_parent_t     * app_p;

    (void)argc;
    (void)argv;

    evbase = event_base_new();

    if (evbase == NULL) {
        fprintf(stderr, "event_base_new() failed\n");
        return EXIT_FAILURE;
    }

    evhtp = evhtp_new(evbase, NULL);

    if (evhtp == NULL) {
        fprintf(stderr, "evhtp_new() failed\n");
        return EXIT_FAILURE;
    }

    app_p = calloc(1, sizeof(*app_p));

    if (app_p == NULL) {
        fprintf(stderr, "out of memory\n");
        return EXIT_FAILURE;
    }

    app_p->evhtp      = evhtp;
    app_p->evbase     = evbase;
    app_p->redis_host = "127.0.0.1";
    app_p->redis_port = 6379;

    /* the per-request callback is installed later by the path hook */
    cb = evhtp_set_cb(evhtp, "/", NULL, NULL);

    if (cb == NULL) {
        fprintf(stderr, "evhtp_set_cb() failed\n");
        return EXIT_FAILURE;
    }

    evhtp_set_hook(&cb->hooks, evhtp_hook_on_path, app_onpath_cb, NULL);

    if (evhtp_use_threads(evhtp, app_init_thread, num_threads, app_p) < 0) {
        fprintf(stderr, "evhtp_use_threads() failed\n");
        return EXIT_FAILURE;
    }

    /* without a listening socket the loop below would serve nothing */
    if (evhtp_bind_socket(evhtp, listen_addr, listen_port, backlog) < 0) {
        fprintf(stderr, "unable to bind %s:%u\n",
                listen_addr, (unsigned)listen_port);
        return EXIT_FAILURE;
    }

    event_base_loop(evbase, 0);

    return 0;
}
@FDj
Copy link

FDj commented Jul 5, 2012

This is different from the thread_design.c version that's included in examples/evhtp. Is this one newer or older? Is there documentation about what motivated the change? I'm asking because I'm having a problem on Mac OS X, where it seems like evhtp_request_pause messes with the event base enough so that it doesn't actually finish sending its response out to the network. Only sometimes will it proceed to send the response, but that could be something like 20 seconds later.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment