-
-
Save rangan337/77da6f7069d0fb29d443 to your computer and use it in GitHub Desktop.
Libuv Fix - Example for Scenario 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <signal.h> | |
#include <pthread.h> | |
#include <unistd.h> | |
#include <hiredis.h> | |
#include <async.h> | |
#include <adapters/libuv.h> | |
#define REDIS_PORT 19008 | |
#define REQ_THREADS 20 // No of parallel async reqs made to Redis server | |
/************* | |
* Variables * | |
*************/ | |
// LIBUV Event Loop | |
uv_loop_t* loop; | |
// Event loop watchers | |
uv_async_t watcherStop, watcherRunCmd; | |
// Async connection used in this test | |
redisAsyncContext *c; | |
// Variable to indicate if the async connection | |
// is valid or not (Connected = valid, disconnected = !valid) | |
int isAsyncConnValid = 0; | |
/*************
 * Callbacks *
 *************/
// Callback invoked when a reply is received for a Redis command
// sent over the async connection.
//   c        - async connection the reply belongs to (not used here)
//   r        - the redisReply for the command, or NULL if there is none
//   privdata - NUL-terminated tag that was passed when the command was queued
// A reply that carries no string payload (e.g. a nil reply for a missing
// key, or an integer reply) has reply->str == NULL; it is printed as
// "(nil)" because handing a NULL pointer to printf's %s is undefined.
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
    (void)c;
    redisReply *reply = r;
    if (reply == NULL) {
        printf("-=[ NULL Reply received ]=-\n");
        return;
    }
    const char *tag = (privdata != NULL) ? (const char *)privdata : "(none)";
    const char *text = (reply->str != NULL) ? reply->str : "(nil)";
    printf("-=[ Valid Reply : argv[%s]: %s ]=-\n", tag, text);
}
// Callback invoked when a connection is attempted and a response | |
// for the attempt is available | |
void connectCallback(const redisAsyncContext *c, int status) { | |
if (status != REDIS_OK) { | |
printf("Error: %s\n", c->errstr); | |
return; | |
} | |
printf("-=[ Connected ]=-\n"); | |
isAsyncConnValid = 1; | |
} | |
// Callback invoked when an async connection is disconnected | |
void disconnectCallback(const redisAsyncContext *c, int status) { | |
isAsyncConnValid = 0; | |
if (status != REDIS_OK) { | |
printf("-=[ Disconnected with error: %s ]=-\n", c->errstr); | |
return; | |
} | |
printf("-=[ Gracefully disconnected ]=-\n"); | |
} | |
/*****************************************************************
 * Watcher functions - Triggered/called when associated watchers *
 * are signalled                                                 *
 *****************************************************************/
// Watcher callback that stops the event loop owning the signalled handle.
void breakEventLoop(uv_async_t *handle) {
    uv_loop_t *owner = handle->loop;
    uv_stop(owner);
}
// Watcher callback for the runCmd watcher: queues one Redis command on
// the async connection. Does nothing unless the connection is currently
// marked valid. The handle argument is not used.
void runCmd(uv_async_t *handle) {
    if (isAsyncConnValid != 1) {
        return;
    }
    // Arbitrary (but well-formed) Redis command; "end-1" is the tag
    // handed back to getCallback as privdata
    redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
}
/*************************************
 * Functions run on spawned pthreads *
 *************************************/
// This function will run the event loop indefinitely until | |
// stopped explicitly by the watcher | |
void* runUVLoop(void *arg) { | |
printf("-=[ Starting event loop ]=-\n"); | |
uv_run(loop, UV_RUN_DEFAULT); | |
printf("-=[ Stopping event loop ]=-\n"); | |
return (void*) NULL; | |
} | |
// This function will fire an event on the event loop | |
// watcher to trigger running a Redis command on the async conn | |
void *executeRunCmd() { | |
uv_async_send(&watcherRunCmd); | |
return (void*) NULL; | |
} | |
// This function will execute a SEGFAULT on a sync conn | |
// to the Redis server | |
void* executeSegfault() { | |
printf("-=[ Initiating SEGFAULT ]=-\n"); | |
redisContext *c = redisConnect("127.0.0.1", REDIS_PORT); | |
redisCommand(c, "DEBUG SEGFAULT"); | |
printf("-=[ Completed SEGFAULT ]=-\n"); | |
return (void*) NULL; | |
} | |
/*********************************************
 * Other functions called on the main thread *
 *********************************************/
// Function to populate the Redis server with some data | |
void populateServerWithData() { | |
redisContext *c = redisConnect("127.0.0.1", REDIS_PORT); | |
if (c->err) { | |
/* Let *c leak for now... */ | |
printf("Error: %s\n", c->errstr); | |
return; | |
} | |
char *str="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; | |
for(int i = 0; i < 10000; i++) { | |
char val[2000]; | |
sprintf(val,"%s_%d", str, i); | |
redisCommand(c, "ZADD test NX %d %s", i, val); | |
} | |
} | |
// This function will print out the current pending queued | |
// callbacks on the async connection | |
void printQueuedCallbacks() { | |
int queuedCallbacks = 0; | |
if(isAsyncConnValid) { | |
redisCallback *cb = c->replies.head; | |
while(cb != NULL) { | |
queuedCallbacks++; | |
cb = cb->next; | |
} | |
} | |
printf("-=[ Queued up callbacks : %d ]=-\n", queuedCallbacks); | |
} | |
int main (int argc, char **argv) { | |
signal(SIGPIPE, SIG_IGN); | |
populateServerWithData(); | |
loop = malloc(sizeof(uv_loop_t)); | |
uv_loop_init(loop); | |
uv_async_init(loop, &watcherStop, breakEventLoop); | |
uv_async_init(loop, &watcherRunCmd, runCmd); | |
c = redisAsyncConnect("127.0.0.1", REDIS_PORT); | |
if (c->err) { | |
/* Let *c leak for now... */ | |
printf("Error: %s\n", c->errstr); | |
return 1; | |
} | |
redisLibuvAttach(c,loop); | |
redisAsyncSetConnectCallback(c,connectCallback); | |
redisAsyncSetDisconnectCallback(c,disconnectCallback); | |
// Run the event loop once to connect on this thread | |
uv_run(loop, UV_RUN_ONCE); | |
// Start the event loop | |
pthread_t eventLoopThread; | |
pthread_create(&eventLoopThread, NULL, &runUVLoop, NULL); | |
// At this point, the async connection is established and data is successfully | |
// being sent over the connection | |
pthread_t segFaultThread; | |
pthread_create(&segFaultThread, NULL, &executeSegfault, NULL); | |
pthread_t runCmdThreads[REQ_THREADS]; | |
for(int i = 0 ; i < REQ_THREADS; i++) { | |
pthread_create(&runCmdThreads[i], NULL, &executeRunCmd, NULL); | |
} | |
// Sleep a couple of seconds to allow created threads to finish | |
// up their processing | |
sleep(2); | |
printQueuedCallbacks(); | |
printf("\n\nPress any key to continue...\n"); | |
// Wait for user to press any keystroke before exiting | |
getchar(); | |
printf("-=[ Second wave of requests ]=-\n"); | |
// Second wave of requests | |
for(int i = 0; i < REQ_THREADS; i++) { | |
pthread_create(&runCmdThreads[i], NULL, &executeRunCmd, NULL); | |
} | |
// Sleep for a couple of seconds for above threads to create and queue | |
// up commands | |
sleep(2); | |
printQueuedCallbacks(); | |
uv_async_send(&watcherStop); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample Output
Before PR
After PR