Skip to content

Instantly share code, notes, and snippets.

@rob-p
Created September 16, 2013 16:41
Show Gist options
  • Save rob-p/6583164 to your computer and use it in GitHub Desktop.
Save rob-p/6583164 to your computer and use it in GitHub Desktop.
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <thread>
#include <vector>

#include "fcntl.h"
#include "unistd.h"

extern "C" {
#include "kseq.h"
}

#include <boost/filesystem.hpp>
#include <boost/range/irange.hpp>
#include "tbb/concurrent_queue.h"
// Instantiate the kseq parser (kseq_t, kseq_init, kseq_read, kseq_destroy, ...)
// for a stream handle of type `int` read via `read`: the parser below passes
// it the descriptor returned by open(2).
KSEQ_INIT(int, read)
// Short alias for Boost.Filesystem, used for the list of input file paths.
namespace bfs = boost::filesystem;
/// One reusable container for a parsed read's sequence bytes.
///
/// `seq` is a heap buffer obtained with malloc/realloc (release it with
/// free), or nullptr while nothing has been stored yet. Only the first `len`
/// bytes are meaningful; do not rely on a NUL terminator, use `len`.
struct ReadSeq {
    char* seq{nullptr};  // owned sequence buffer, or nullptr
    size_t len{0};       // number of valid bytes in `seq`
};
class StreamingReadParser {
public:
StreamingReadParser( std::vector<bfs::path>& files ): inputStreams_(files),
parsing_(false), parsingThread_(nullptr)
{
readStructs_ = new ReadSeq[queueCapacity_];
readQueue_.set_capacity(queueCapacity_);
seqContainerQueue_.set_capacity(queueCapacity_);
for (size_t i = 0; i < queueCapacity_; ++i) {
seqContainerQueue_.push(&readStructs_[i]);
}
}
~StreamingReadParser() {
parsingThread_->join();
for (auto i : boost::irange(size_t{0}, queueCapacity_)) {
if (readStructs_[i].seq != nullptr) { free(readStructs_[i].seq); }
}
delete [] readStructs_;
delete parsingThread_;
}
bool start() {
if (!parsing_) {
parsing_ = true;
parsingThread_ = new std::thread([this](){
kseq_t* seq;
ReadSeq* s;
std::cerr << "reading from " << this->inputStreams_.size() << " streams\n";
for (auto file : this->inputStreams_) {
std::cerr << "reading from " << file.native() << "\n";
// open the file and init the parser
int fp = open(file.c_str(), O_RDONLY);
seq = kseq_init(fp);
while (kseq_read(seq) >= 0) {
//puts(seq->seq.s);
this->seqContainerQueue_.pop(s);
if (seq->seq.l > s->len) {
s->seq = static_cast<char*>(realloc(s->seq, seq->seq.l));
}
//std::cerr << "old len = " << s->len << ", ";
s->len = seq->seq.l;
//std::cerr << "new len = " << s->len << "\n";
memcpy(s->seq, seq->seq.s, s->len);
//puts(s->seq);
this->readQueue_.push(s);
}
// destroy the parser and close the file
kseq_destroy(seq);
close(fp);
}
this->parsing_ = false;
});
//std::swap(tmpThread, parsingThread_);
return true;
} else {
return false;
}
}
//tbb::concurrent_bounded_queue<ReadSeq*>& readQueue() { return readQueue_; }
inline bool nextRead(ReadSeq*& seq) {
while(parsing_) {
if (readQueue_.try_pop(seq)) { return true; }
}
return false;
}
inline void finishedWithRead(ReadSeq*& s) { seqContainerQueue_.push(s); }
private:
std::vector<bfs::path>& inputStreams_;
bool parsing_;
std::thread* parsingThread_;
tbb::concurrent_bounded_queue<ReadSeq*> readQueue_, seqContainerQueue_;
ReadSeq* readStructs_;
const size_t queueCapacity_ = 2000000;
};
int main(int argc, char* argv[]) {
/*
kseq_t *seq;
int fp = open(argv[1], O_RDONLY);
seq = kseq_init(fp);
tbb::concurrent_bounded_queue<ReadSeq*> seqContainerQueue;
tbb::concurrent_bounded_queue<ReadSeq*> readQueue;
size_t capacity = 2000000;
ReadSeq* readStructs = new ReadSeq[capacity];
seqContainerQueue.set_capacity(capacity);
readQueue.set_capacity(capacity);
for (size_t i = 0; i < capacity; ++i) {
seqContainerQueue.push(&readStructs[i]);
}
bool done{false};
auto readThread = std::thread([&seqContainerQueue, &readQueue, &seq, &done]() {
ReadSeq* s;
while (kseq_read(seq) >= 0) {
seqContainerQueue.pop(s);
if (seq->seq.l > s->len) {
s->seq = static_cast<char*>(realloc(s->seq, seq->seq.l));
}
s->len = seq->seq.l;
memcpy(s->seq, seq->seq.s, s->len);
//puts(seq->seq.s);
readQueue.push(s);
}
done = true;
});
*/
std::vector<bfs::path> paths;
for (size_t i = 1; i < argc; ++i) {
std::cerr << "stream " << i << " is " << argv[i];
paths.emplace_back(argv[i]);
}
StreamingReadParser parser(paths);
parser.start();
std::atomic<size_t> numProcessed{0};
size_t numThreads = 3;
std::vector<std::thread> threads;
for (size_t t=0; t < numThreads; ++t) {
threads.emplace_back([&parser, &numProcessed](){
ReadSeq* s;
while( parser.nextRead(s) ) {
size_t np = numProcessed++;
if (np % 100000 == 0) { std::cerr << "processed " << np << " reads\n";}
//printf("(%.90s)", s->seq);
//puts(s->seq);
parser.finishedWithRead(s);
}
});
}
for (auto& t : threads) { t.join(); }
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment