Skip to content

Instantly share code, notes, and snippets.

@drio
Created January 22, 2012 14:53
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drio/1657289 to your computer and use it in GitHub Desktop.
Save drio/1657289 to your computer and use it in GitHub Desktop.
Playing with pthreads

What's this?

I was/am in the process of adding multithreading capabilities to a tool I am working on (more on that soon). The tool is written in C/C++. My first option was using POSIX threads (pthreads) before going into higher level options like boost.

I first read this fantastic tutorial. (Make sure you read the links also). After that, I started to look into how pthreads is used by the masters. I read parts of bwa's source code. Concretely the aln step. In [bwtaln.c:bwa_aln_core()] (https://github.com/lh3/bwa/blob/master/bwtaln.c#L172-215) is where the threads are setup and [bwtaln.c:bwa_cal_sa_reg_gap()] (https://github.com/lh3/bwa/blob/master/bwtaln.c#L78-120) is the main routine for implementing the aln step.

Then I implemented a little toy project to make sure I understood the concepts. The idea is to look for a kmer in nextgen reads. This is what the code on this gist is about.

I have followed the same approach than bwa. It basically loads reads (concretely 0x40000 or 262144 in decimal) to memory. Then each thread processes reads based on the position of the read in memory and the thread id. The gem is here.

Let me know if you find it useful.

#!/bin/bash
#
READS=$HOME/Dropbox/git_repo/rGenotype/data/soldata.txt
[ ".$1" == "." ] && TIMES=1 || TIMES=$1
loop()
{
i=1
while [ $i -le $TIMES ]; do
grep -v ">" $READS
i=$[$i+1]
done
}
SRC=thread_version_find_kmer.c
echo "TIMES=$TIMES"
echo "+ NO pthreads"
gcc -Wall -pendatic -lpthread $SRC && \
time (loop) | ./a.out 2>/dev/null
echo "+ WITH pthreads"
gcc -Wall -pendatic -lpthread -DHAVE_PTHREAD $SRC && \
time (loop) | ./a.out 2>/dev/null
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define BUF_SIZE 1024
int main(int argc, char *argv[])
{
char buf[BUF_SIZE];
char *kmer = "CGTCCCTGT"; // 286 hits in soldata.txt
int ksize;
int count;
count = 0;
ksize = strlen(kmer);
while (fgets(buf, BUF_SIZE, stdin))
if (strstr(buf, kmer)) count += 1;
printf("%d\n", count);
return 0;
}
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define KMER "CGTCCCTGT" // 286 hits in soldata.txt
#define KMER_SIZE 9
#define NUM_THREADS 6
#define N_READS_TO_LOAD 1000000
#define READ_SIZE 36 // 35 + \n
// This is the code for the worker
// In this case, just count how many times you see a kmer
void *look(int n_reads, int tid, char **reads, int *t_count)
{
char save;
int i,j;
for (i=0; i!=n_reads; ++i) {
#ifdef HAVE_PTHREAD
if (i % NUM_THREADS != tid) continue;
#endif
for (j=0; j<READ_SIZE-KMER_SIZE; ++j) {
save = reads[i][j+KMER_SIZE];
reads[i][j+KMER_SIZE] = '\0';
if (strcmp(KMER, &reads[i][j]) == 0) *t_count += 1;
reads[i][j+KMER_SIZE] = save;
}
}
return 0;
}
/* Load data into buffer for thread t (if there is more data in file) */
int load_to_buffer(char **b)
{
int nrr = 0; // number of reads read
size_t linecap = 0;
int bytes_read = 0;
while (nrr < N_READS_TO_LOAD &&
(bytes_read = getline(&b[nrr], &linecap, stdin)) > 0) {
if (bytes_read == -1) {
fprintf(stderr, "ERROR: reading data from input stream. (nrr=%d)\n", nrr);
exit(1);
}
else
nrr++;
}
return nrr;
}
#ifdef HAVE_PTHREAD
typedef struct {
int tid;
char **reads;
int count;
int n_reads;
} thread_data;
static void *worker(void *data)
{
thread_data *d = (thread_data*) data;
look(d->n_reads, d->tid, d->reads, &d->count);
return 0;
}
#endif
int main(int argc, char *argv[])
{
char **buffer;
{ // request mem for buffer of reads
int j;
buffer = (char **) calloc(sizeof(char *), N_READS_TO_LOAD);
for (j=0; j<N_READS_TO_LOAD; ++j)
buffer[j] = (char *) calloc(sizeof(char), READ_SIZE);
}
int total = 0; // total number of hits seen for kmer
int n_rr; // n_rr: number of reads read
int total_n_reads = 0; // total number of reads processed
while((n_rr = load_to_buffer(buffer)) != 0) {
total_n_reads += n_rr;
#ifdef HAVE_PTHREAD
if (NUM_THREADS == 1) {
int i_total = 0;
look(n_rr, 0, buffer, &i_total);
total += i_total;
}
else {
pthread_t *tid;
pthread_attr_t attr;
thread_data *data;
int j;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
data = (thread_data*) calloc(NUM_THREADS, sizeof(thread_data));
tid = (pthread_t*) calloc(NUM_THREADS , sizeof(pthread_t));
for (j=0; j<NUM_THREADS; ++j) {
data[j].n_reads = n_rr;
data[j].tid = j;
data[j].reads = buffer;
data[j].count = 0;
pthread_create(&tid[j], &attr, worker, data+j);
}
// Now run them
for (j=0; j<NUM_THREADS; ++j) pthread_join(tid[j], 0);
for (j=0; j<NUM_THREADS; ++j) total += data[j].count; // save hits
free(data); free(tid);
}
#else
look(n_rr, 0, buffer, &total);
#endif
fprintf(stderr, "> # of lines processed: %d\n", total_n_reads);
}
printf("# lines processed: %d, hits: %d\n", total_n_reads, total); // Dump # of times we have seen the kmer in input stream.
{ // Release mem
int j;
for (j=0; j<N_READS_TO_LOAD; ++j)
free(buffer[j]);
free(buffer);
}
return 0;
}
@amb690
Copy link

amb690 commented Oct 13, 2017

Hello drio, I'm testing your code but it's throwing the following error, /root/Dropbox/git_repo/rGenotype/data/soldata.txt: file or directory not found, could you upload soldata.txt?. Regards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment