Last active
February 21, 2017 19:40
-
-
Save utdrmac/75446e6d96517d0436c2 to your computer and use it in GitHub Desktop.
Multi-Threaded MySQL Connections in C
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * guider 1.0
 * Matthew Boehm <mboehm@paypal.com> <matthew@matthewboehm.com>
 *
 * GUIDer reads in a file of global identifiers (GUIDs) and retrieves
 * obfuscated data out of databases. This is done using pipe() to create
 * a producer/consumer set-up. We create a pool of MySQL connections
 * and spawn a user-supplied number of pthreads per split to handle the data.
 * Each thread waits for and reads one GUID off the front of the pipe(),
 * processes it, and writes the results to the output file. Once done with
 * that set of queries, the thread goes back to waiting for a new GUID on
 * the pipe().
 */
#include <mysql.h> // for mysql stuff
#include <stdio.h> // for printf
#include <string.h> // for string/memory helpers
#include <stdlib.h> // for strtoul, strtol, malloc
#include <limits.h> // for INT_MAX
#include <unistd.h> // for getopt, pipe, read, write
#include <pthread.h> // for pthreads
#include <errno.h> // for errno/strerror
#include <assert.h> // for assert
// Build-time configuration.
#define DBUG 1
// printf-style template for a split's host name; %d is the split number.
#define HOSTNAME "CORE_trinity_ro_sec_host_%d.pod02.geo-trinity.com"
#define DATABASE "trinity"
#define PORT 3306
// number of database splits; a GUID lives on split (guid % NUMSPLITS)
#define NUMSPLITS 12
#define CONNTIMEOUT 10
// Multi-statement query template. Uses POSIX positional printf arguments:
//   %1$lu = the GUID (unsigned long), %2$d = the split number (int).
// Written as adjacent string literals so no line continuation falls inside
// a string literal.
#define QUERY \
    "SELECT 'user_username : GUID %1$lu', plat_username from plat_user_username_%2$d WHERE identity_guid = %1$lu; " \
    "SELECT 'username_cache : GUID %1$lu', username from username_cache_%2$d WHERE identity_guid = %1$lu; " \
    "SELECT 'user_id_cache : GUID %1$lu', user_id from user_id_cache_%2$d WHERE identity_guid = %1$lu;"
// MySQL credentials. Filled from -u / -p on the command line, or from the
// options read out of my.cnf when those flags are absent (see main()).
char username[20] = "";
char password[20] = "";
typedef struct | |
{ | |
MYSQL *mysql; | |
MYSQL_RES *res; | |
MYSQL_ROW row; | |
char hostname[50]; | |
// which split are we connected | |
int splitNum; | |
// is this connection currently executing a query | |
pthread_mutex_t inUse; | |
} MySQLConnection; | |
// global array of MySQLConnections, indexed [split][slot]:
// NUMSPLITS rows of numThreadsPerSplit connections each
MySQLConnection **mysqlPool;
// the pipe for threads writing/reading GUIDs
// (fifo[0] is the read end used by workers, fifo[1] the write end used by main)
int fifo[2];
// the mutex for writing output
pthread_mutex_t writerMutex = PTHREAD_MUTEX_INITIALIZER;
// default. global'd so each thread knows max num
int numThreadsPerSplit = 10;
// global file descriptor for output
FILE *guidsOutFd;
// function for writing to output file | |
// protected by mutex | |
void writeOut(char *data) | |
{ | |
pthread_mutex_lock(&writerMutex); | |
if(fputs(data, guidsOutFd) < 0) | |
{ | |
fprintf(stderr, "Unable to write output: %s\n", strerror(errno)); | |
exit(-1); | |
} | |
pthread_mutex_unlock(&writerMutex); | |
} | |
// Worker Thread | |
static void *workerThread(void *arg) | |
{ | |
char currGuid[20] = ""; | |
char tempRowText[100] = ""; | |
char resultText[1024] = ""; | |
char query[1024] = ""; | |
unsigned long int guid; | |
int split = 0, r = 0, status = 0; | |
MySQLConnection *conn = NULL; | |
// looping forever, read from pipe | |
while(1) | |
{ | |
// read 20 bytes/characters from pipe | |
if((r = read(fifo[0], &currGuid, 20)) != 20) | |
{ | |
fprintf(stderr, "Failed to read 20 bytes. Read %d bytes. Errno: %d. Exiting.\n", r, errno); | |
goto cleanup; | |
} | |
// if we get a -, time to exit | |
if(strncmp(currGuid, "-", 1) == 0) | |
{ | |
goto cleanup; | |
} | |
// convert string to unsigned long and get split number | |
guid = strtoul(currGuid, NULL, 0); | |
split = (guid % NUMSPLITS); | |
// construct multi-query string | |
if(snprintf(query, (sizeof(query) + (sizeof(guid) * 6) + (sizeof(split) * 3)), QUERY, guid, split) <= 0) | |
{ | |
fprintf(stderr, "Unable to sprintf query for '%lu'.\n", guid); | |
goto cleanup; | |
} | |
// find available MySQL connection for this split. we will wait here | |
// until one becomes available | |
while(1) | |
{ | |
for(r = 0; r < numThreadsPerSplit; r++) | |
{ | |
// returns 0 on success | |
if(!pthread_mutex_trylock(&mysqlPool[split][r].inUse)) | |
{ | |
conn = &mysqlPool[split][r]; | |
goto gotconn; | |
} | |
} | |
// we've checked all threads for this split and | |
// none are available. pause a second before looking again. | |
sleep(1); | |
} | |
gotconn: | |
// we got an available connection. execute query | |
if((status = mysql_query(conn->mysql, query))) | |
{ | |
fprintf(stderr, "Could not execute query for '%lu' on '%s': %s", guid, conn->hostname, mysql_error(conn->mysql)); | |
continue; | |
} | |
// loop over the result sets | |
do | |
{ | |
conn->res = mysql_use_result(conn->mysql); | |
if(conn->res) | |
{ | |
while((conn->row = mysql_fetch_row(conn->res))) | |
{ | |
// put contents of this row result into temp holder | |
snprintf(tempRowText, sizeof(tempRowText), "%s\t%s\r\n", conn->row[0], conn->row[1]); | |
// concatenate the temp holder into the temp result set holder | |
strncat(resultText, tempRowText, sizeof(tempRowText)); | |
} | |
// we are done with this result set | |
mysql_free_result(conn->res); | |
} | |
else | |
{ | |
// no result set or error | |
printf("Error occurred for GUID '%lu' on '%s' - %s\n", guid, conn->hostname, mysql_error(conn->mysql)); | |
} | |
// since we are executing multiple queries in one push, | |
// we check if there are there more result sets to process | |
if((status = mysql_next_result(conn->mysql)) > 0) | |
{ | |
fprintf(stderr, "Could not execute statement for '%lu' on '%s': %s", guid, conn->hostname, mysql_error(conn->mysql)); | |
continue; | |
} | |
} | |
while(status == 0); | |
// unlock the mutex now that we are done with this run | |
pthread_mutex_unlock(&mysqlPool[split][r].inUse); | |
// add extra new-line to separate GUID results | |
strcat(resultText, "\r\n"); | |
// we have gathered all results/rows. write out to file in one chunk. | |
writeOut(resultText); | |
// clear contents of temp string holders | |
memset(tempRowText, 0, sizeof(tempRowText)); | |
memset(resultText, 0, sizeof(resultText)); | |
} | |
cleanup: | |
pthread_exit(NULL); | |
} | |
// Main | |
int main(int argc, char **argv) | |
{ | |
pthread_t *threads; | |
pthread_attr_t threadAttr; | |
char guidFilePath[500] = ""; | |
char guidOutputPath[500] = ""; | |
char tGuid[20] = ""; | |
int c, j, i, totalNumThreads; | |
// file descriptor for reading in | |
FILE *guidsFd; | |
opterr = 0; | |
while((c = getopt(argc, argv, "u:p:t:g:o:")) != -1) | |
{ | |
switch(c) | |
{ | |
case 'u': | |
strncpy(username, optarg, sizeof(username)); | |
break; | |
case 'p': | |
strncpy(password, optarg, sizeof(password)); | |
break; | |
case 't': | |
numThreadsPerSplit = atoi(optarg); | |
break; | |
case 'g': | |
strncpy(guidFilePath, optarg, sizeof(guidFilePath)); | |
break; | |
case 'o': | |
strncpy(guidOutputPath, optarg, sizeof(guidOutputPath)); | |
break; | |
case '?': | |
if(optopt == 'u' || optopt == 'p' || optopt == 't' || optopt == 'g' || optopt == 'o') | |
fprintf(stderr, "Option -%c requires an argument.\n", optopt); | |
else | |
fprintf(stderr, "Unknown option '-%c'.\n", optopt); | |
return 1; | |
default: | |
return 1; | |
} | |
} | |
totalNumThreads = (NUMSPLITS * numThreadsPerSplit); | |
// sanity | |
if(strcmp(guidFilePath, "") == 0) | |
{ | |
fprintf(stderr, "A file of input GUIDs is required.\n"); | |
exit(1); | |
} | |
// more sanity | |
if(strcmp(guidOutputPath, "") == 0) | |
{ | |
fprintf(stderr, "An output file to save results is required.\n"); | |
exit(1); | |
} | |
// check that we can open the output file or can create it if it doesn't exist | |
if((guidsOutFd = fopen(guidOutputPath, "w")) == NULL) | |
{ | |
fprintf(stderr, "Unable to open '%s' for writing.\n", guidOutputPath); | |
exit(-1); | |
} | |
// check input file exists and we can open it for reading | |
if((guidsFd = fopen(guidFilePath, "r")) == NULL) | |
{ | |
fprintf(stderr, "Unable to read '%s'. Does it exist? Have permissions?\n", guidFilePath); | |
exit(-1); | |
} | |
// initialize the pthreads array | |
if((threads = (pthread_t *) malloc(totalNumThreads * sizeof(pthread_t))) == NULL) | |
{ | |
fprintf(stderr, "Unable to allocate memory for threads array.\n"); | |
exit(-1); | |
} | |
// allocate memory for MySQLConnection array | |
if((mysqlPool = (MySQLConnection **) malloc(NUMSPLITS * sizeof(MySQLConnection *))) == NULL) | |
{ | |
fprintf(stderr, "Unable to allocate memory for MySQLConnection array.\n"); | |
exit(-1); | |
} | |
// malloc memory for each per-split connection pool | |
for(i = 0; i < NUMSPLITS; i++) | |
{ | |
if((mysqlPool[i] = (MySQLConnection *) malloc(numThreadsPerSplit * sizeof(MySQLConnection))) == NULL) | |
{ | |
fprintf(stderr, "Unable to allocate memory for MySQLConnection.\n"); | |
free(mysqlPool); | |
exit(-1); | |
} | |
} | |
// Set up MySQL stuff | |
mysql_library_init(0, NULL, NULL); | |
MYSQL *mysql = mysql_init(NULL); | |
mysql_thread_init(); | |
// This is fake connect to remember various options from my.cnf for other threads | |
// Do note, this connect may succeed if ran on system with mysqld running | |
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "guider"); | |
mysql_real_connect(mysql, NULL, NULL, NULL, NULL, 0, NULL, CLIENT_REMEMBER_OPTIONS); | |
// did user provide username on cli? if not, use .my.cnf extracted one | |
if(strcmp(username, "") == 0) | |
strncpy(username, mysql->options.user, sizeof(username)); | |
// did user provide password on cli? if not, use .my.cnf extracted one | |
if(strcmp(password, "") == 0) | |
strncpy(password, mysql->options.password, sizeof(password)); | |
// open the pipe | |
if(pipe(fifo) < 0) | |
{ | |
fprintf(stderr, "Unable to open fifo pipe.\n"); | |
exit(-1); | |
} | |
// some pthread initialization. make each one joinable (ie: wait for it) | |
pthread_attr_init(&threadAttr); | |
pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE); | |
// launch pthreads to handle GUIDs | |
for(i = 0; i < totalNumThreads; i++) | |
{ | |
pthread_create(&threads[i], &threadAttr, workerThread, NULL); | |
} | |
fprintf(stderr, "%d worker threads launched.\n", totalNumThreads); | |
// go through array of per-split connections and connect to MySQL servers | |
for(i = 0; i < NUMSPLITS; i++) | |
{ | |
for(j = 0; j < numThreadsPerSplit; j++) | |
{ | |
mysqlPool[i][j].splitNum = i; | |
sprintf(mysqlPool[i][j].hostname, HOSTNAME, i); | |
pthread_mutex_init(&mysqlPool[i][j].inUse, NULL); | |
// attempt connect to MySQL host | |
mysqlPool[i][j].mysql = mysql_init(NULL); | |
// if we can't connect to a server, shutdown and investigate | |
if(!mysql_real_connect(mysqlPool[i][j].mysql, mysqlPool[i][j].hostname, username, password, DATABASE, PORT, NULL, CLIENT_MULTI_STATEMENTS)) | |
{ | |
fprintf(stderr, "Unable to connect to MySQL on '%s': %s\n", mysqlPool[i][j].hostname, mysql_error(mysqlPool[i][j].mysql)); | |
goto shutdown; | |
} | |
} | |
} | |
fprintf(stderr, "MySQLConnection pool created and threads connected.\n"); | |
// read from file and push to pipe until there are no more. | |
while(fgets(tGuid, 20, guidsFd)) | |
{ | |
// strip off new-line character | |
char *nl = strstr(tGuid, "\n"); | |
if (nl) | |
nl[0] = 0; | |
if(write(fifo[1], tGuid, 20) != 20) | |
{ | |
fprintf(stderr, "Unable to write to pipe.\n"); | |
goto shutdown; | |
} | |
} | |
fprintf(stderr, "Done writing GUIDs to pipe.\n"); | |
shutdown: | |
// once we have written all GUIDs from file to pipe and processes | |
// them, we need to write numThreads -'s to the pipe | |
for(i = 0; i < (NUMSPLITS * numThreadsPerSplit); i++) | |
{ | |
if(write(fifo[1], "--------------------", 20) != 20) | |
{ | |
fprintf(stderr, "Unable to write dash to pipe.\n"); | |
} | |
} | |
fprintf(stderr, "Waiting for threads to finish up.\n"); | |
// to prevent pipe from closing too early, join all threads | |
// and wait for them to exit cleanly | |
for(i = 0; i < totalNumThreads; i++) | |
{ | |
pthread_join(threads[i], NULL); | |
} | |
// close pipe | |
close(fifo[0]); | |
close(fifo[1]); | |
// free memory used for pthread array | |
free(threads); | |
// close connections and free memory used for MySQLConnection arrays | |
for(i = 0; i < NUMSPLITS; i++) | |
{ | |
for(j = 0; j < numThreadsPerSplit; j++) | |
{ | |
mysql_close(mysqlPool[i][j].mysql); | |
} | |
free(mysqlPool[i]); | |
} | |
free(mysqlPool); | |
pthread_attr_destroy(&threadAttr); | |
mysql_library_end(); | |
// close input file | |
fclose(guidsFd); | |
// close output file | |
fclose(guidsOutFd); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment