Skip to content

Instantly share code, notes, and snippets.

@tinnefeld
Created July 3, 2012 20:15
Show Gist options
  • Save tinnefeld/3042741 to your computer and use it in GitHub Desktop.
Save tinnefeld/3042741 to your computer and use it in GitHub Desktop.
Timer myTimer;
myTimer.start();
//check if we have one or two comparators
bool useSecondComparator = false;
if (secondComparator != NOTSET) {
useSecondComparator = true;
}
ColumnPositionList& result = *new ColumnPositionList;
//get tableId and tableDistribution
uint64_t tableId = cloud->getTableId(tableAttributeId.c_str());
uint64_t maxKey = this->size()/VALUESIZING;
if ((this->size() % VALUESIZING) == 0) --maxKey;
std::vector<RAMCloud::ObjectFinder::KeysAtServer> tableDist =
cloud->resolveTableDistribution(tableId,maxKey);
// Build safe result collector
std::vector<ColumnPositionList*> intermediateResults(tableDist.size());
RAMCloudThreadpool<analyticsdb::threadpool::AbstractWorkerFunctor*> pool;
//do the scan on each master that has a chunk from that column
//this for loop needs to be exectuted in a multi-threaded manner
for (uint32_t i = 0; i < tableDist.size(); i++) {
intermediateResults[i] = new ColumnPositionList;
auto f = new ScanFunctor(tableDist[i].serverConnectionString,
tableId, std::vector<uint64_t>(tableDist[i].keys),
(uint32_t)VALUESIZING,
(int32_t)MAXRPCSIZE,
(int32_t)comparator,
(int64_t)value,
useSecondComparator,
(int32_t)secondComparator,
(int64_t)secondValue,
logicalAnd,
intermediateResults[i]);
pool.add(f);
}
myTimer.stop();
cout << "" << endl;
cout << "Pool Prep: " << fixed << myTimer.elapsed_time() << endl;
myTimer.start();
pool.executeAll();
pool.waitAll();
myTimer.stop();
cout << "Pool Execute & Wait: " << fixed << myTimer.elapsed_time() << endl;
myTimer.start();
for (uint32_t i = 0; i < tableDist.size(); i++)
{
result.insert(result.begin(), intermediateResults[i]->begin(), intermediateResults[i]->end());
delete intermediateResults[i];
}
myTimer.stop();
cout << "Merge Results " << fixed << myTimer.elapsed_time() << endl;
//sorting the result
//sort (result.begin(), result.end());
return result;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment