Skip to content

Instantly share code, notes, and snippets.

@d00rman
Created December 21, 2017 16:34
Show Gist options
  • Save d00rman/f2002afd2d94f3afb9585d2221253eba to your computer and use it in GitHub Desktop.
Save d00rman/f2002afd2d94f3afb9585d2221253eba to your computer and use it in GitHub Desktop.
Move Math data script
#!/usr/bin/env nodejs
'use strict';
const cassandra = require('cassandra-driver');
const P = require('bluebird');
const preq = require('preq');
const yargs = require('yargs');
// Command-line interface: one positional argument (the RESTBase URL) plus
// the Cassandra contact point and credentials as options.
const argv = yargs
    .usage('Usage: $0 <restbase-url>')
    .options('h', { alias: 'help' })
    .options('H', {
        alias: 'hostname',
        default: 'localhost',
        describe: 'Contact hostname',
        type: 'string'
    })
    .options('P', {
        alias: 'port',
        default: 9042,
        describe: 'Contact port number',
        type: 'number'
    })
    .options('u', {
        alias: 'username',
        default: 'cassandra',
        describe: 'Cassandra username',
        type: 'string'
    })
    .options('p', {
        alias: 'password',
        default: 'cassandra',
        describe: 'Cassandra password',
        type: 'string'
    })
    .argv;

// Print the usage text and stop early when help was requested.
if (argv.help) {
    yargs.showHelp();
    process.exit(0);
}

// Unpack the parsed options into the module-level names used further down.
const { hostname: host, port, username: user, password: pass } = argv;
// First positional argument, i.e. the <restbase-url> from the usage line.
const [rbHost] = argv._;
// Cassandra contact point in "host:port" form.
const contact = `${host}:${port}`;
/**
 * Builds a Cassandra client for the configured contact point and connects it.
 *
 * The client authenticates with the plain-text username/password taken from
 * the command line, issues queries at consistency level ONE and produces
 * promises through `P.fromCallback`.
 *
 * @return {Promise} resolves with the connected client instance
 */
function connect() {
    const clientOptions = {
        contactPoints: [contact],
        authProvider: new cassandra.auth.PlainTextAuthProvider(user, pass),
        // NOTE(review): presumably enables TLS without a usable CA bundle - confirm.
        sslOptions: { ca: '/dev/null' },
        promiseFactory: P.fromCallback,
        queryOptions: { consistency: cassandra.types.consistencies.one },
    };
    const client = new cassandra.Client(clientOptions);
    // Hand the client itself to the caller once the connection is up.
    return client.connect().then(() => client);
}
/**
 * Fetches a single result page, retrying on failure.
 *
 * The query is executed as a prepared statement. Each failed attempt
 * consumes one unit of `options.retries` (decremented in place on the object
 * passed in); once it reaches zero (or was never set) the error is re-thrown.
 *
 * @param {object} client cassandra-driver Client instance
 * @param {string} query CQL query string
 * @param {array|object} params CQL query params
 * @param {?string} pageState paging state to resume from (null for the first page)
 * @param {object} options `fetchSize` (page size, defaults to 5) and `retries`
 * @return {Promise} resolves with the result of `client.execute`
 */
function _nextPage(client, query, params, pageState, options) {
    const attempt = () => {
        const execOptions = {
            prepare: true,
            fetchSize: options.fetchSize || 5,
            pageState,
        };
        return client.execute(query, params, execOptions);
    };

    return P.try(attempt).catch((error) => {
        if (options.retries) {
            // Spend one retry and ask for the very same page again.
            options.retries--;
            return _nextPage(client, query, params, pageState, options);
        }
        throw error;
    });
}
/**
 * Async-safe Cassandra query execution
 *
 * Client#eachRow in the Cassandra driver relies upon a synchronous callback
 * to provide back-pressure during paging; this function can safely execute
 * async callback handlers: the next page is only requested once every
 * handler invocation for the current page has settled.
 *
 * The caller's `options` object is never modified.
 *
 * @param {object} client cassandra-driver Client instance
 * @param {string} query CQL query string
 * @param {array|object} params CQL query params
 * @param {object} [options] options map: `fetchSize` (page size), `retries`
 *   (retry budget, applied to every page separately) and `concurrency`
 *   (maximum number of handlers in flight per page, defaults to 16)
 * @param {function} handler function to invoke for each row result; may
 *   return a promise
 * @return {Promise} resolves (with undefined) after the last page has been
 *   processed; rejects if a page fetch runs out of retries or a handler fails
 */
function eachRow(client, query, params, options, handler) {
    // Private snapshot of the caller's options (tolerates a missing map).
    const baseOptions = Object.assign({}, options);
    const concurrency = baseOptions.concurrency || 16;

    function processPage(pageState) {
        // _nextPage decrements `retries` on the object it is given, so each
        // page gets its own copy and therefore the full retry budget.
        const pageOptions = Object.assign({}, baseOptions);
        return _nextPage(client, query, params, pageState, pageOptions)
        .then((res) => {
            if (!res) {
                // Nothing came back: there is nothing to process or to page.
                return undefined;
            }
            const rows = res.rows || [];
            return P.try(() => P.map(rows, (row) => handler(row), { concurrency }))
            .then(() => {
                // The last page carries no paging state; treat null and
                // undefined alike so a missing state can never restart the
                // scan from the first page.
                if (res.pageState == null) {
                    return undefined;
                }
                // NOTE: pages are chained recursively, so the promise chain
                // grows with the number of pages fetched.
                return processPage(res.pageState);
            });
        });
    }

    return processPage(null);
}
// Number of rows visited so far (rows with repeated keys included).
let count = 0;
// Connected client, kept outside the promise chain so that the clean-up step
// can reach it; stays undefined when connecting fails.
let cc;
// Distinct row keys seen so far. Keys are stringified, which mirrors how they
// would be coerced when used as object property names.
const seenKeys = new Set();

connect().then((client) => {
    cc = client;
    return eachRow(
        client,
        'SELECT key, value FROM "local_group_globaldomain_T_mathoid_input".data',
        {},
        {
            retries: 10,
            fetchSize: 100,
            log: console.log
        },
        (row) => {
            seenKeys.add(String(row.key));
            count++;
            if (count % 1000 !== 0) {
                return P.resolve();
            }
            // Progress line every 1000 rows: rows visited, distinct keys, and
            // their difference (rows whose key had already been seen).
            const distinct = seenKeys.size;
            const drift = count - distinct;
            console.log(`- ${count}\t${distinct}\t${drift}`);
            // Let other queued callbacks run before continuing.
            return new P((resolve) => process.nextTick(() => resolve()));
        }
    );
})
.then(() => console.log(`Total count: ${count}`))
.catch((err) => {
    // Report the failure and signal it through the exit status instead of
    // leaving the rejection unhandled.
    console.error(err);
    process.exitCode = 1;
})
// Only shut down a client that was actually created.
.finally(() => (cc ? cc.shutdown() : undefined));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment