Created
May 2, 2012 18:44
-
-
Save errzey/2579114 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <stdint.h> | |
#include <errno.h> | |
#include <evhtp.h> | |
#include <hiredis/hiredis.h> | |
#include <hiredis/async.h> | |
#include <hiredis/adapters/libevent.h> | |
typedef struct app_parent { | |
evhtp_t * evhtp; | |
evbase_t * evbase; | |
char * redis_host; | |
uint16_t redis_port; | |
} app_parent_t; | |
typedef struct app { | |
app_parent_t * parent; | |
evbase_t * evbase; | |
redisAsyncContext * redis; | |
} app_t; | |
typedef struct app_req { | |
app_t * app; | |
evhtp_request_t * http_req; | |
uint8_t reading; | |
uint8_t error; | |
} app_req_t; | |
evthr_t * | |
get_request_thr(evhtp_request_t * request) { | |
evhtp_connection_t * htpconn; | |
evthr_t * thread; | |
htpconn = evhtp_request_get_connection(request); | |
thread = htpconn->thread; | |
return thread; | |
} | |
void | |
redis_global_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg) { | |
redisReply * reply = redis_reply; | |
app_req_t * app_req = arg; | |
evhtp_request_t * request = app_req->http_req; | |
if (app_req->error) { | |
return; | |
} | |
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) { | |
evbuffer_add_printf(request->buffer_out, | |
"redis_global_incr_cb() failed\n"); | |
return; | |
} | |
evbuffer_add_printf(request->buffer_out, | |
"Total requests = %lld\n", reply->integer); | |
} | |
void | |
redis_srcaddr_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg) { | |
redisReply * reply = redis_reply; | |
app_req_t * app_req = arg; | |
evhtp_request_t * request = app_req->http_req; | |
if (app_req->error) { | |
return; | |
} | |
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) { | |
evbuffer_add_printf(request->buffer_out, | |
"redis_srcaddr_incr_cb() failed\n"); | |
return; | |
} | |
evbuffer_add_printf(request->buffer_out, | |
"Requests from this source IP = %lld\n", reply->integer); | |
} | |
void | |
redis_set_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg) { | |
redisReply * reply = redis_reply; | |
app_req_t * app_req = arg; | |
evhtp_request_t * request = app_req->http_req; | |
if (app_req->error) { | |
return; | |
} | |
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) { | |
evbuffer_add_printf(request->buffer_out, | |
"redis_set_srcport_cb() failed\n"); | |
return; | |
} | |
if (!reply->integer) { | |
evbuffer_add_printf(request->buffer_out, | |
"This source port has been seen already.\n"); | |
} else { | |
evbuffer_add_printf(request->buffer_out, | |
"This source port has never been seen.\n"); | |
} | |
} | |
void | |
redis_get_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg) { | |
redisReply * reply = redis_reply; | |
app_req_t * app_req = arg; | |
evhtp_request_t * request = app_req->http_req; | |
int i; | |
if (app_req->error) { | |
app_req->reading = 0; | |
return; | |
} | |
if (reply == NULL || reply->type != REDIS_REPLY_ARRAY) { | |
evbuffer_add_printf(request->buffer_out, | |
"redis_get_srcport_cb() failed.\n"); | |
return; | |
} | |
evbuffer_add_printf(request->buffer_out, | |
"source ports which have been seen for your ip:\n"); | |
for (i = 0; i < reply->elements; i++) { | |
redisReply * elem = reply->element[i]; | |
evbuffer_add_printf(request->buffer_out, "%s ", elem->str); | |
} | |
evbuffer_add(request->buffer_out, "\n", 1); | |
app_req->reading = 0; | |
/* final callback for redis, so send the response */ | |
evhtp_send_reply(request, EVHTP_RES_OK); | |
} | |
void | |
app_process_request(evhtp_request_t * request, void * arg) { | |
app_req_t * app_req = arg; | |
evhtp_connection_t * conn = evhtp_request_get_connection(request); | |
struct sockaddr_in * sin; | |
char tmp[1024]; | |
sin = (struct sockaddr_in *)conn->saddr; | |
evutil_inet_ntop(AF_INET, &sin->sin_addr, tmp, sizeof(tmp)); | |
/* increment a global counter of hits on redis */ | |
redisAsyncCommand(app_req->app->redis, redis_global_incr_cb, | |
(void *)app_req, "INCR requests:total"); | |
/* increment a counter for hits from this source address on redis */ | |
redisAsyncCommand(app_req->app->redis, redis_srcaddr_incr_cb, | |
(void *)app_req, "INCR requests:ip:%s", tmp); | |
/* add the source port of this request to a source-specific set */ | |
redisAsyncCommand(app_req->app->redis, redis_set_srcport_cb, (void *)app_req, | |
"SADD requests:ip:%s:ports %d", tmp, ntohs(sin->sin_port)); | |
/* get all of the ports this source address has used */ | |
redisAsyncCommand(app_req->app->redis, redis_get_srcport_cb, (void *)app_req, | |
"SMEMBERS requests:ip:%s:ports", tmp); | |
app_req->reading = 1; | |
evhtp_request_pause(request); | |
} | |
void | |
app_on_error_cb(evhtp_request_t * request, short events, void * arg) { | |
app_req_t * app_req = arg; | |
app_req->error = 1; | |
evhtp_unset_all_hooks(&request->hooks); | |
if (app_req->reading == 0) { | |
free(app_req); | |
} | |
} | |
evhtp_res | |
app_on_fini(evhtp_request_t * request, void * arg) { | |
app_req_t * app_req = arg; | |
if (app_req->reading) { | |
return EVHTP_RES_OK; | |
} | |
free(app_req); | |
evhtp_request_resume(request); | |
return EVHTP_RES_OK; | |
} | |
evhtp_res | |
app_onpath_cb(evhtp_request_t * request, evhtp_path_t * path, void * arg) { | |
app_t * app; | |
app_req_t * app_req; | |
evthr_t * thread; | |
thread = get_request_thr(request); | |
app = evthr_get_aux(thread); | |
app_req = calloc(sizeof(app_req_t), 1); | |
app_req->app = app; | |
app_req->http_req = request; | |
app_req->reading = 0; | |
app_req->error = 0; | |
request->cb = app_process_request; | |
request->cbarg = app_req; | |
evhtp_set_hook(&request->hooks, evhtp_hook_on_error, app_on_error_cb, app_req); | |
evhtp_set_hook(&request->hooks, evhtp_hook_on_request_fini, app_on_fini, app_req); | |
return EVHTP_RES_OK; | |
} | |
void | |
app_init_thread(evhtp_t * htp, evthr_t * thread, void * arg) { | |
app_parent_t * app_p; | |
app_t * app; | |
app_p = arg; | |
app = calloc(sizeof(app_t), 1); | |
app->parent = app_p; | |
app->evbase = evthr_get_base(thread); | |
app->redis = redisAsyncConnect(app_p->redis_host, app_p->redis_port); | |
redisLibeventAttach(app->redis, app->evbase); | |
evthr_set_aux(thread, app); | |
} | |
int | |
main(int argc, char ** argv) { | |
evbase_t * evbase; | |
evhtp_t * evhtp; | |
evhtp_callback_t * cb; | |
app_parent_t * app_p; | |
evbase = event_base_new(); | |
evhtp = evhtp_new(evbase, NULL); | |
app_p = calloc(sizeof(app_parent_t), 1); | |
app_p->evhtp = evhtp; | |
app_p->evbase = evbase; | |
app_p->redis_host = "127.0.0.1"; | |
app_p->redis_port = 6379; | |
cb = evhtp_set_cb(evhtp, "/", NULL, NULL); | |
evhtp_set_hook(&cb->hooks, evhtp_hook_on_path, app_onpath_cb, NULL); | |
evhtp_use_threads(evhtp, app_init_thread, 4, app_p); | |
evhtp_bind_socket(evhtp, "127.0.0.1", 9090, 1024); | |
event_base_loop(evbase, 0); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is different from the thread_design.c version that's included in examples/evhtp. Is this one newer or older? Is there documentation about what motivated the change? I'm asking because I'm having a problem on Mac OS X, where it seems like evhtp_request_pause messes with the event base enough so that it doesn't actually finish sending its response out to the network. Only sometimes will it proceed to send the response, but that could be something like 20 seconds later.