Skip to content

Instantly share code, notes, and snippets.

@rangan337
Last active October 1, 2015 18:08
Show Gist options
  • Save rangan337/77da6f7069d0fb29d443 to your computer and use it in GitHub Desktop.
Save rangan337/77da6f7069d0fb29d443 to your computer and use it in GitHub Desktop.
Libuv Fix - Example for Scenario 2
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <unistd.h>
#include <hiredis.h>
#include <async.h>
#include <adapters/libuv.h>
#define REDIS_PORT 19008 // Port of the Redis server used by every connection in this test
#define REQ_THREADS 20 // Number of threads spawned per wave; each one requests one async command
/*********************
 * Shared global state *
 *********************/
// libuv event loop that drives the async Redis connection
uv_loop_t *loop;
// Async watcher whose callback stops the event loop
uv_async_t watcherStop;
// Async watcher whose callback issues a Redis command on the async connection
uv_async_t watcherRunCmd;
// The async Redis connection exercised by this test
redisAsyncContext *c;
// 1 while the async connection is connected, 0 otherwise.
// Set by connectCallback and cleared by disconnectCallback.
int isAsyncConnValid = 0;
/*************
* Callbacks *
*************/
// Reply callback for commands sent over the async connection.
//
//   c        - async context the reply belongs to (not used here)
//   r        - the reply, treated as a redisReply*; NULL means no reply
//              object was delivered
//   privdata - tag handed to redisAsyncCommand; printed as a C string
//
// Both the tag and the reply text are guarded before being handed to "%s":
// reply->str is not populated for every reply type (for instance a nil reply
// to GET on a missing key), and passing NULL to "%s" is undefined behaviour.
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
    (void)c;

    const redisReply *reply = r;
    if (reply == NULL) {
        printf("-=[ NULL Reply received ]=-\n");
        return;
    }

    const char *tag = (privdata != NULL) ? (const char *)privdata : "(null)";
    const char *text = (reply->str != NULL) ? reply->str : "(null)";
    printf("-=[ Valid Reply : argv[%s]: %s ]=-\n", tag, text);
}
// Connect callback for the async connection.
// On REDIS_OK the connection is flagged as usable; on any other status the
// context's error string is printed and the flag is left untouched.
void connectCallback(const redisAsyncContext *c, int status) {
    if (status == REDIS_OK) {
        printf("-=[ Connected ]=-\n");
        isAsyncConnValid = 1;
    } else {
        printf("Error: %s\n", c->errstr);
    }
}
// Disconnect callback for the async connection.
// The connection is flagged as unusable first, then the reason is reported:
// a clean disconnect (REDIS_OK) or the context's error string otherwise.
void disconnectCallback(const redisAsyncContext *c, int status) {
    isAsyncConnValid = 0;

    if (status == REDIS_OK) {
        printf("-=[ Gracefully disconnected ]=-\n");
    } else {
        printf("-=[ Disconnected with error: %s ]=-\n", c->errstr);
    }
}
/*****************************************************************
* Watcher functions - Triggered/called when associated watchers *
* are signalled *
*****************************************************************/
// Watcher callback: asks the event loop that owns `handle` to stop.
void breakEventLoop(uv_async_t *handle) {
    uv_loop_t *owner = handle->loop;
    uv_stop(owner);
}
// Watcher callback registered for watcherRunCmd: queues one Redis command
// on the async connection, with getCallback as its reply handler.
// Nothing is sent unless the connection is currently flagged as valid.
void runCmd(uv_async_t *handle) {
    (void)handle;

    if (isAsyncConnValid != 1) {
        return;
    }

    // Arbitrary, well-formed command; "end-1" is the tag echoed by getCallback
    redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
}
/*************************************
* Functions run on spawned pthreads *
*************************************/
// Thread entry point: runs the global event loop in UV_RUN_DEFAULT mode and
// logs when the run starts and when uv_run returns.
// `arg` is unused; the return value is always NULL.
void* runUVLoop(void *arg) {
    (void)arg;

    fputs("-=[ Starting event loop ]=-\n", stdout);
    uv_run(loop, UV_RUN_DEFAULT);
    fputs("-=[ Stopping event loop ]=-\n", stdout);

    return NULL;
}
// Thread entry point: signals watcherRunCmd so that its callback runs a
// Redis command on the async connection.
//
// The function is used as a pthread start routine, so it must have the
// `void *(void *)` shape: pthread calls it with one argument, and calling a
// function defined with an empty parameter list that way is undefined
// behaviour. `arg` is unused; the return value is always NULL.
void *executeRunCmd(void *arg) {
    (void)arg;
    uv_async_send(&watcherRunCmd);
    return NULL;
}
// This function will execute a SEGFAULT on a sync conn
// to the Redis server
void* executeSegfault() {
printf("-=[ Initiating SEGFAULT ]=-\n");
redisContext *c = redisConnect("127.0.0.1", REDIS_PORT);
redisCommand(c, "DEBUG SEGFAULT");
printf("-=[ Completed SEGFAULT ]=-\n");
return (void*) NULL;
}
/*********************************************
* Other functions called on the main thread *
*********************************************/
// Function to populate the Redis server with some data
void populateServerWithData() {
redisContext *c = redisConnect("127.0.0.1", REDIS_PORT);
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return;
}
char *str="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
for(int i = 0; i < 10000; i++) {
char val[2000];
sprintf(val,"%s_%d", str, i);
redisCommand(c, "ZADD test NX %d %s", i, val);
}
}
// This function will print out the current pending queued
// callbacks on the async connection
void printQueuedCallbacks() {
int queuedCallbacks = 0;
if(isAsyncConnValid) {
redisCallback *cb = c->replies.head;
while(cb != NULL) {
queuedCallbacks++;
cb = cb->next;
}
}
printf("-=[ Queued up callbacks : %d ]=-\n", queuedCallbacks);
}
int main (int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);
populateServerWithData();
loop = malloc(sizeof(uv_loop_t));
uv_loop_init(loop);
uv_async_init(loop, &watcherStop, breakEventLoop);
uv_async_init(loop, &watcherRunCmd, runCmd);
c = redisAsyncConnect("127.0.0.1", REDIS_PORT);
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return 1;
}
redisLibuvAttach(c,loop);
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
// Run the event loop once to connect on this thread
uv_run(loop, UV_RUN_ONCE);
// Start the event loop
pthread_t eventLoopThread;
pthread_create(&eventLoopThread, NULL, &runUVLoop, NULL);
// At this point, the async connection is established and data is successfully
// being sent over the connection
pthread_t segFaultThread;
pthread_create(&segFaultThread, NULL, &executeSegfault, NULL);
pthread_t runCmdThreads[REQ_THREADS];
for(int i = 0 ; i < REQ_THREADS; i++) {
pthread_create(&runCmdThreads[i], NULL, &executeRunCmd, NULL);
}
// Sleep a couple of seconds to allow created threads to finish
// up their processing
sleep(2);
printQueuedCallbacks();
printf("\n\nPress any key to continue...\n");
// Wait for user to press any keystroke before exiting
getchar();
printf("-=[ Second wave of requests ]=-\n");
// Second wave of requests
for(int i = 0; i < REQ_THREADS; i++) {
pthread_create(&runCmdThreads[i], NULL, &executeRunCmd, NULL);
}
// Sleep for a couple of seconds for above threads to create and queue
// up commands
sleep(2);
printQueuedCallbacks();
uv_async_send(&watcherStop);
return 0;
}
@rangan337
Copy link
Author

Sample Output

Before PR

-=[ Connected ]=-
-=[ Initiating SEGFAULT ]=-
-=[ Starting event loop ]=-
-=[ Completed SEGFAULT ]=-
-=[ Queued up callbacks : 15 ]=-


Press any key to continue...

-=[ Second wave of requests ]=-
-=[ Queued up callbacks : 25 ]=-
-=[ Stopping event loop ]=-

After PR

-=[ Connected ]=-
-=[ Starting event loop ]=-
-=[ Initiating SEGFAULT ]=-
-=[ Completed SEGFAULT ]=-
-=[ NULL Reply received ]=-
-=[ NULL Reply received ]=-
-=[ NULL Reply received ]=-
-=[ NULL Reply received ]=-
-=[ NULL Reply received ]=-
-=[ NULL Reply received ]=-
-=[ NULL Reply received ]=-
-=[ NULL Reply received ]=-
-=[ NULL Reply received ]=-
-=[ Disconnected with error: redisLibuvPoll error - bad file descriptor ]=-
-=[ Queued up callbacks : 0 ]=-


Press any key to continue...

-=[ Second wave of requests ]=-
-=[ Queued up callbacks : 0 ]=-
-=[ Stopping event loop ]=-

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment