Skip to content

Instantly share code, notes, and snippets.

@jasonmimick
Last active September 26, 2016 18:59
Show Gist options
  • Save jasonmimick/128ecdc93247ea884421f1f59753c307 to your computer and use it in GitHub Desktop.
Save jasonmimick/128ecdc93247ea884421f1f59753c307 to your computer and use it in GitHub Desktop.
// fts_command.h
/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include <string>
#include <vector>
#include <set>
#include <list>
#include "mongo/base/string_data.h"
#include "mongo/db/commands.h"
#include "mongo/db/fts/fts_util.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/timer.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/index/fts_access_method.h"
#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/util/log.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace fts {
using namespace mongoutils;
using std::string;
using std::stringstream;
using std::vector;
using std::set;
/* select textTerms(*) */
class CmdTextTermBuild2 : public Command {
public:
OperationContext* _txn;
virtual bool isWriteCommandForConfigServer() const { return false; }
CmdTextTermBuild2() : Command("textTermBuild2") { }
virtual bool logTheOp() { return true; }
virtual bool slaveOk() const { return false; }
virtual bool slaveOverrideOk() const { return true; }
virtual bool maintenanceOk() const { return false; }
virtual bool adminOnly() const { return false; }
virtual void help(stringstream& help) const { help << "text terms in collection"; }
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
ActionSet actions;
actions.addAction(ActionType::find);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
bool InsertTerms(const std::string& dbName, Collection* collTerm, NamespaceString nsTerms, const FTSAccessMethod* fam, bool isForward)
{
log() << "InsertTerms called on textTermBuild2";
std::unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(_txn, MODE_IX));
Lock::CollectionLock colLock(_txn->lockState(), nsTerms.ns(), MODE_IX);
std::unique_ptr<SortedDataInterface::Cursor> cursor = fam->newCursor(_txn, isForward);
boost::optional<IndexKeyEntry> kv = cursor->seek(BSONObj(), true, SortedDataInterface::Cursor::kWantKey);
bool bReturn = false;
//std::unique_ptr<Lock::GlobalWrite> globalWriteLock(new Lock::GlobalWrite(_txn->lockState()));
int nCount = 0;
int nIndex = 0;
time_t lastLog(0);
set<string > setTerms;
while (kv)
{
if (nCount % 128 == 127) {
time_t now = time(0);
if (now - lastLog >= 60) {
// report progress
if (lastLog)
log() << "textTermBuild2 InsertTerms " << dbName << ' ' << nCount << std::endl;
lastLog = now;
}
_txn->checkForInterrupt();
scopedXact.reset();
//colLock.reset();
CurOp::get(_txn)->yielded();
scopedXact.reset(new ScopedTransaction(_txn, MODE_IX));
//colLock.reset(new Lock::GlobalWrite(_txn->lockState()));
// Check if everything is still all right.
/*
if (_txn->writesAreReplicated()) {
uassert(
100001,
str::stream() << "Cannot write to ns: " << collTerm
<< " after yielding",
repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(collTerm));
}
*/
// TODO: SERVER-16598 abort if original db or collection is gone.
/*
Database* db = dbHolder().get(_txn, dbName);
uassert(100002,
str::stream() << "Database " << dbName << " dropped while cloning",
db != NULL);
Collection* collection = NULL;
collection = db->getCollection(collTerm->toString());
uassert(100003,
str::stream() << "Collection " << collTerm
<< " dropped while cloning",
collection != NULL);
*/
}
BSONObj obj = kv->key;
BSONObjIterator keyDataIt(obj);
if (keyDataIt.more()) {
bReturn = true;
BSONElement keyDataElt = keyDataIt.next();
string strTerm = keyDataElt.String();
if (setTerms.find(strTerm) == setTerms.end())
{
setTerms.insert(strTerm);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
_txn->checkForInterrupt();
WriteUnitOfWork wunit(_txn);
Status status = collTerm->insertDocument(_txn, BSON("_id" << nIndex << "term" << strTerm), true);
//log() << "textTermBuild2:" << status.toString();
nCount++;
nIndex++;
if (!status.isOK()) {
error() << "error: exception textTermBuild2 InsertTerms object in " << "PUT COLLECTION NAME HERE" << ' '
<< status << " term:" << strTerm;
}
uassertStatusOK(status);
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "textTermBuild2 InsertTerms", "PUT COLLECTION NAME HERE");
}
}
kv = cursor->next(SortedDataInterface::Cursor::kWantKey);
}
return bReturn;
}
virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) {
_txn = txn;
string targetColl = cmdObj.firstElement().String();
string ns = dbname + "." + cmdObj.firstElement().String(); // test.Foo
string nsTerm = ns + "_Terms"; // test.Foo_Terms
string cTermNew = cmdObj.firstElement().String() + "_Terms.new"; //Foo_Terms.new
string nsTermNew = dbname + "." + cTermNew; // test.Foo_Terms.new
log() << "textTermBuild2 neTermNew=" << nsTermNew << std::endl;
NamespaceString nsTargetColl(dbname, targetColl);
NamespaceString nsTermsColl(dbname,targetColl+"_Terms");
NamespaceString nsTermsCollNew(dbname,targetColl+"_Terms.new");
Status status = userAllowedWriteNS(nsTermsColl);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
// 1. drop <coll>_Terms.new
// 2. create <coll>_Terms.new
// 3. createIndex <coll>_Terms.new { "term" : 1 }
// now we know we have to create index(es)
// Note: createIndexes command does not currently respect shard versioning.
ScopedTransaction transaction(_txn, MODE_IX);
Lock::DBLock dbLock(_txn->lockState(), nsTermsCollNew.db(), MODE_X);
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsTermsCollNew)) {
return appendCommandStatus(
result,
Status(ErrorCodes::NotMaster,
str::stream() << "Not primary while creating indexes in " << nsTermsCollNew.ns()));
}
Database* db = dbHolder().get(_txn, nsTermsCollNew.db());
if (!db) {
db = dbHolder().openDb(txn, nsTermsCollNew.db());
}
Collection* collTermsCollNew = db->getCollection(nsTermsCollNew.ns());
// the <coll>_Terms.new should never be there! it's always renamed at
// the end of this command
if (!collTermsCollNew) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(_txn);
collTermsCollNew = db->createCollection(txn, nsTermsCollNew.ns(), CollectionOptions());
invariant(collTermsCollNew);
collTermsCollNew->getIndexCatalog()->createIndexOnEmptyCollection(_txn,BSON("terms"<<1));
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "create:"+nsTermNew, nsTermsCollNew.ns());
}
auto client = _txn->getClient();
ScopeGuard lastOpSetterGuard =
MakeObjGuard(repl::ReplClientInfo::forClient(client),
&repl::ReplClientInfo::setLastOpToSystemLastOpTime,
_txn);
// 4. find text index in <coll>
// 5. get FTSAccessMethod
vector<IndexDescriptor*> idxMatches;
Collection* collTarget = db->getCollection(nsTargetColl.ns());
collTarget->getIndexCatalog()->findIndexByType(_txn, IndexNames::TEXT, idxMatches, false);
if (idxMatches.empty()) {
errmsg = "text index required for textTerms command";
return false;
}
if (idxMatches.size() > 1) {
errmsg = "more than one text index found for textTerms command";
return false;
}
invariant(idxMatches.size() == 1);
IndexDescriptor* index = idxMatches[0];
const FTSAccessMethod* fam =
static_cast<FTSAccessMethod*>(collTarget->getIndexCatalog()->getIndex(index));
invariant(fam);
_txn->recoveryUnit()->abandonSnapshot();
dbLock.relockWithMode(MODE_IX);
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsTargetColl)) {
return appendCommandStatus(
result,
Status(ErrorCodes::NotMaster,
str::stream() << "Not primary while creating background indexes in "
<< nsTargetColl.ns()));
}
// 6. call InsertTerms into <coll>_Terms.new
InsertTerms(dbname, collTermsCollNew, nsTermsCollNew,fam, true);
Lock::DBLock dbLock2(_txn->lockState(), nsTermsColl.db(), MODE_X);
// 7. Drop <coll>_Terms
// 8. Rename <coll>_Terms -> <coll>_Terms.new
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(_txn);
log() << "textTermBuild2 rename start from:" << nsTermNew << " to:" << nsTerm << std::endl;
log() << "textTermBuild2 rename try drop:" << nsTerm << std::endl;
Status ds = db->dropCollection(_txn, nsTerm);
if (ds.isOK()) {
log() << "textTermBuild2 rename dropped " << nsTerm << std::endl;
} else {
result.append("error",ds.reason());
errmsg = ds.reason();
error() << "textTermBuild2 drop error:" << ds.reason() << std::endl;
return false;
}
Status status = db->renameCollection(_txn, nsTermNew, nsTerm, true /* ?stayTemp? this will drop if exists?*/);
log() << "textTermBuild2 rename done" << std::endl;
if (!status.isOK()) {
result.append("error",status.reason());
errmsg = status.reason();
error() << "textTermBuild2 rename error:" << status.reason() << std::endl;
return false;
}
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "textTermBuild", nsTermsColl.ns());
lastOpSetterGuard.Dismiss();
result.append("ok", 1);
return true;
}
}CmdTextTermBuild2;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment