Skip to content

Instantly share code, notes, and snippets.

@utdrmac
Last active February 21, 2017 19:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save utdrmac/75446e6d96517d0436c2 to your computer and use it in GitHub Desktop.
Save utdrmac/75446e6d96517d0436c2 to your computer and use it in GitHub Desktop.
Multi-Threaded MySQL Connections in C
/*
* guider 1.0
* Matthew Boehm <mboehm@paypal.com> <matthew@matthewboehm.com>
*
* GUIDer reads in a file of global identifiers (GUIDs) and retrieves
* obfuscated data out of databases. This is done using pipe() to create
* a producer/consumer set-up. We create a pool of MySQL connections
* and spawn a user-supplied number of pthreads per split to handle the data.
* Each thread waits for and reads one GUID off the front of the pipe(),
* processes it, and writes the results to the output file. Once done with
* that set of queries, the thread goes back to waiting for a new GUID on the pipe().
*
*
*/
#include <mysql.h> // for mysql stuff
#include <stdio.h> // for printf
#include <string.h> // for strncpy
#include <stdlib.h> // for atoul
#include <limits.h> // for INT_MAX
#include <unistd.h> // for getopt
#include <pthread.h> // for pthreads
#include <errno.h> // for errno/strerror
#include <assert.h> // for assert
// Debug switch. NOTE(review): not referenced anywhere in the visible source.
#define DBUG 1
// printf-style template for a split's server name; %d receives the split index
#define HOSTNAME "CORE_trinity_ro_sec_host_%d.pod02.geo-trinity.com"
// schema and TCP port passed to mysql_real_connect() for every pooled connection
#define DATABASE "trinity"
#define PORT 3306
// number of splits; a GUID is routed to split (guid % NUMSPLITS)
#define NUMSPLITS 12
// NOTE(review): not referenced anywhere in the visible source, so no
// connect timeout is actually applied.
#define CONNTIMEOUT 10
// Multi-statement query template (three SELECTs sent in one round trip).
// It uses positional printf arguments: %1$lu is the GUID (unsigned long)
// and %2$d is the split number that selects the table suffix.
#define QUERY " \
SELECT 'user_username : GUID %1$lu', plat_username from plat_user_username_%2$d WHERE identity_guid = %1$lu; \
SELECT 'username_cache : GUID %1$lu', username from username_cache_%2$d WHERE identity_guid = %1$lu; \
SELECT 'user_id_cache : GUID %1$lu', user_id from user_id_cache_%2$d WHERE identity_guid = %1$lu;"
// MySQL credentials (at most 19 characters plus the terminator). Filled from
// -u / -p in main(), or from the client option file when those are omitted.
char username[20] = "";
char password[20] = "";
// One pooled connection to a single split's MySQL server.
typedef struct
{
    MYSQL *mysql;           // client handle for this connection
    MYSQL_RES *res;         // result set currently being read on this connection
    MYSQL_ROW row;          // row most recently fetched from res
    char hostname[50];      // server name, built from the HOSTNAME template
    int splitNum;           // index of the split this connection belongs to
    pthread_mutex_t inUse;  // locked while a worker thread is using the connection
} MySQLConnection;
// global pool of MySQLConnections, indexed as mysqlPool[split][slot]
// (main() allocates NUMSPLITS rows of numThreadsPerSplit entries each)
MySQLConnection **mysqlPool;
// the pipe carrying GUID records: fifo[0] is the read end used by the
// worker threads, fifo[1] is the write end used by main()
int fifo[2];
// the mutex serialising writes to guidsOutFd (taken inside writeOut())
pthread_mutex_t writerMutex = PTHREAD_MUTEX_INITIALIZER;
// connections per split; defaults to 10 and is overridden by -t.
// global'd so each worker thread knows how many pool slots to scan
int numThreadsPerSplit = 10;
// global output stream for results, opened in main() from the -o path
// (it is a stdio FILE *, despite the "Fd" suffix)
FILE *guidsOutFd;
// Append one chunk of text to the shared output file.
// writerMutex is held around the fputs() so chunks coming from different
// worker threads never interleave. A failed write is fatal: the reason is
// reported on stderr and the whole process exits.
void writeOut(char *data)
{
    int rc;

    pthread_mutex_lock(&writerMutex);
    rc = fputs(data, guidsOutFd);
    if(rc >= 0)
    {
        pthread_mutex_unlock(&writerMutex);
        return;
    }

    // the write failed; report why and terminate
    fprintf(stderr, "Unable to write output: %s\n", strerror(errno));
    exit(-1);
}
// Worker Thread
static void *workerThread(void *arg)
{
char currGuid[20] = "";
char tempRowText[100] = "";
char resultText[1024] = "";
char query[1024] = "";
unsigned long int guid;
int split = 0, r = 0, status = 0;
MySQLConnection *conn = NULL;
// looping forever, read from pipe
while(1)
{
// read 20 bytes/characters from pipe
if((r = read(fifo[0], &currGuid, 20)) != 20)
{
fprintf(stderr, "Failed to read 20 bytes. Read %d bytes. Errno: %d. Exiting.\n", r, errno);
goto cleanup;
}
// if we get a -, time to exit
if(strncmp(currGuid, "-", 1) == 0)
{
goto cleanup;
}
// convert string to unsigned long and get split number
guid = strtoul(currGuid, NULL, 0);
split = (guid % NUMSPLITS);
// construct multi-query string
if(snprintf(query, (sizeof(query) + (sizeof(guid) * 6) + (sizeof(split) * 3)), QUERY, guid, split) <= 0)
{
fprintf(stderr, "Unable to sprintf query for '%lu'.\n", guid);
goto cleanup;
}
// find available MySQL connection for this split. we will wait here
// until one becomes available
while(1)
{
for(r = 0; r < numThreadsPerSplit; r++)
{
// returns 0 on success
if(!pthread_mutex_trylock(&mysqlPool[split][r].inUse))
{
conn = &mysqlPool[split][r];
goto gotconn;
}
}
// we've checked all threads for this split and
// none are available. pause a second before looking again.
sleep(1);
}
gotconn:
// we got an available connection. execute query
if((status = mysql_query(conn->mysql, query)))
{
fprintf(stderr, "Could not execute query for '%lu' on '%s': %s", guid, conn->hostname, mysql_error(conn->mysql));
continue;
}
// loop over the result sets
do
{
conn->res = mysql_use_result(conn->mysql);
if(conn->res)
{
while((conn->row = mysql_fetch_row(conn->res)))
{
// put contents of this row result into temp holder
snprintf(tempRowText, sizeof(tempRowText), "%s\t%s\r\n", conn->row[0], conn->row[1]);
// concatenate the temp holder into the temp result set holder
strncat(resultText, tempRowText, sizeof(tempRowText));
}
// we are done with this result set
mysql_free_result(conn->res);
}
else
{
// no result set or error
printf("Error occurred for GUID '%lu' on '%s' - %s\n", guid, conn->hostname, mysql_error(conn->mysql));
}
// since we are executing multiple queries in one push,
// we check if there are there more result sets to process
if((status = mysql_next_result(conn->mysql)) > 0)
{
fprintf(stderr, "Could not execute statement for '%lu' on '%s': %s", guid, conn->hostname, mysql_error(conn->mysql));
continue;
}
}
while(status == 0);
// unlock the mutex now that we are done with this run
pthread_mutex_unlock(&mysqlPool[split][r].inUse);
// add extra new-line to separate GUID results
strcat(resultText, "\r\n");
// we have gathered all results/rows. write out to file in one chunk.
writeOut(resultText);
// clear contents of temp string holders
memset(tempRowText, 0, sizeof(tempRowText));
memset(resultText, 0, sizeof(resultText));
}
cleanup:
pthread_exit(NULL);
}
// Main
int main(int argc, char **argv)
{
pthread_t *threads;
pthread_attr_t threadAttr;
char guidFilePath[500] = "";
char guidOutputPath[500] = "";
char tGuid[20] = "";
int c, j, i, totalNumThreads;
// file descriptor for reading in
FILE *guidsFd;
opterr = 0;
while((c = getopt(argc, argv, "u:p:t:g:o:")) != -1)
{
switch(c)
{
case 'u':
strncpy(username, optarg, sizeof(username));
break;
case 'p':
strncpy(password, optarg, sizeof(password));
break;
case 't':
numThreadsPerSplit = atoi(optarg);
break;
case 'g':
strncpy(guidFilePath, optarg, sizeof(guidFilePath));
break;
case 'o':
strncpy(guidOutputPath, optarg, sizeof(guidOutputPath));
break;
case '?':
if(optopt == 'u' || optopt == 'p' || optopt == 't' || optopt == 'g' || optopt == 'o')
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
else
fprintf(stderr, "Unknown option '-%c'.\n", optopt);
return 1;
default:
return 1;
}
}
totalNumThreads = (NUMSPLITS * numThreadsPerSplit);
// sanity
if(strcmp(guidFilePath, "") == 0)
{
fprintf(stderr, "A file of input GUIDs is required.\n");
exit(1);
}
// more sanity
if(strcmp(guidOutputPath, "") == 0)
{
fprintf(stderr, "An output file to save results is required.\n");
exit(1);
}
// check that we can open the output file or can create it if it doesn't exist
if((guidsOutFd = fopen(guidOutputPath, "w")) == NULL)
{
fprintf(stderr, "Unable to open '%s' for writing.\n", guidOutputPath);
exit(-1);
}
// check input file exists and we can open it for reading
if((guidsFd = fopen(guidFilePath, "r")) == NULL)
{
fprintf(stderr, "Unable to read '%s'. Does it exist? Have permissions?\n", guidFilePath);
exit(-1);
}
// initialize the pthreads array
if((threads = (pthread_t *) malloc(totalNumThreads * sizeof(pthread_t))) == NULL)
{
fprintf(stderr, "Unable to allocate memory for threads array.\n");
exit(-1);
}
// allocate memory for MySQLConnection array
if((mysqlPool = (MySQLConnection **) malloc(NUMSPLITS * sizeof(MySQLConnection *))) == NULL)
{
fprintf(stderr, "Unable to allocate memory for MySQLConnection array.\n");
exit(-1);
}
// malloc memory for each per-split connection pool
for(i = 0; i < NUMSPLITS; i++)
{
if((mysqlPool[i] = (MySQLConnection *) malloc(numThreadsPerSplit * sizeof(MySQLConnection))) == NULL)
{
fprintf(stderr, "Unable to allocate memory for MySQLConnection.\n");
free(mysqlPool);
exit(-1);
}
}
// Set up MySQL stuff
mysql_library_init(0, NULL, NULL);
MYSQL *mysql = mysql_init(NULL);
mysql_thread_init();
// This is fake connect to remember various options from my.cnf for other threads
// Do note, this connect may succeed if ran on system with mysqld running
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "guider");
mysql_real_connect(mysql, NULL, NULL, NULL, NULL, 0, NULL, CLIENT_REMEMBER_OPTIONS);
// did user provide username on cli? if not, use .my.cnf extracted one
if(strcmp(username, "") == 0)
strncpy(username, mysql->options.user, sizeof(username));
// did user provide password on cli? if not, use .my.cnf extracted one
if(strcmp(password, "") == 0)
strncpy(password, mysql->options.password, sizeof(password));
// open the pipe
if(pipe(fifo) < 0)
{
fprintf(stderr, "Unable to open fifo pipe.\n");
exit(-1);
}
// some pthread initialization. make each one joinable (ie: wait for it)
pthread_attr_init(&threadAttr);
pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
// launch pthreads to handle GUIDs
for(i = 0; i < totalNumThreads; i++)
{
pthread_create(&threads[i], &threadAttr, workerThread, NULL);
}
fprintf(stderr, "%d worker threads launched.\n", totalNumThreads);
// go through array of per-split connections and connect to MySQL servers
for(i = 0; i < NUMSPLITS; i++)
{
for(j = 0; j < numThreadsPerSplit; j++)
{
mysqlPool[i][j].splitNum = i;
sprintf(mysqlPool[i][j].hostname, HOSTNAME, i);
pthread_mutex_init(&mysqlPool[i][j].inUse, NULL);
// attempt connect to MySQL host
mysqlPool[i][j].mysql = mysql_init(NULL);
// if we can't connect to a server, shutdown and investigate
if(!mysql_real_connect(mysqlPool[i][j].mysql, mysqlPool[i][j].hostname, username, password, DATABASE, PORT, NULL, CLIENT_MULTI_STATEMENTS))
{
fprintf(stderr, "Unable to connect to MySQL on '%s': %s\n", mysqlPool[i][j].hostname, mysql_error(mysqlPool[i][j].mysql));
goto shutdown;
}
}
}
fprintf(stderr, "MySQLConnection pool created and threads connected.\n");
// read from file and push to pipe until there are no more.
while(fgets(tGuid, 20, guidsFd))
{
// strip off new-line character
char *nl = strstr(tGuid, "\n");
if (nl)
nl[0] = 0;
if(write(fifo[1], tGuid, 20) != 20)
{
fprintf(stderr, "Unable to write to pipe.\n");
goto shutdown;
}
}
fprintf(stderr, "Done writing GUIDs to pipe.\n");
shutdown:
// once we have written all GUIDs from file to pipe and processes
// them, we need to write numThreads -'s to the pipe
for(i = 0; i < (NUMSPLITS * numThreadsPerSplit); i++)
{
if(write(fifo[1], "--------------------", 20) != 20)
{
fprintf(stderr, "Unable to write dash to pipe.\n");
}
}
fprintf(stderr, "Waiting for threads to finish up.\n");
// to prevent pipe from closing too early, join all threads
// and wait for them to exit cleanly
for(i = 0; i < totalNumThreads; i++)
{
pthread_join(threads[i], NULL);
}
// close pipe
close(fifo[0]);
close(fifo[1]);
// free memory used for pthread array
free(threads);
// close connections and free memory used for MySQLConnection arrays
for(i = 0; i < NUMSPLITS; i++)
{
for(j = 0; j < numThreadsPerSplit; j++)
{
mysql_close(mysqlPool[i][j].mysql);
}
free(mysqlPool[i]);
}
free(mysqlPool);
pthread_attr_destroy(&threadAttr);
mysql_library_end();
// close input file
fclose(guidsFd);
// close output file
fclose(guidsOutFd);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment