Skip to content

Instantly share code, notes, and snippets.

@cliftonc
Last active May 30, 2016 19:31
Show Gist options
  • Save cliftonc/7447f260100dca40e66c92bcc2679f13 to your computer and use it in GitHub Desktop.
Save cliftonc/7447f260100dca40e66c92bcc2679f13 to your computer and use it in GitHub Desktop.
Cassandra version
// Loads download records from rd.csv into the Cassandra table
// downloads.downloads (see create() for the schema and load() for the
// streaming pipeline).
//
// Example lookup against the populated table:
//select * from downloads.downloads where user = 3626249 and resource IN (6109933,123,6326144);
// NOTE(review): `_` (lodash) and `fs` are not referenced elsewhere in this
// file as shown — candidates for removal once confirmed.
var _ = require('lodash');
var async = require('async');
var fs = require('fs');
var path = require('path');
// Module-wide row counter: reset by load(), advanced by CassandraStream as
// batches are sent, and printed by load()'s progress logger.
var written = 0;
var stream = require('stream');
var util = require('util');
const cassandra = require('cassandra-driver');
// Single driver client shared by create(), load() and CassandraStream;
// its only contact point is a node on localhost.
const client = new cassandra.Client({ contactPoints: ['127.0.0.1']});
// CQL used for every buffered row; bound values are [user, resource, when].
const INSERT_DOWNLOAD = 'INSERT INTO downloads.downloads (user, resource, when) VALUES (?, ?, ?)';
const DEFAULT_BATCH_SIZE = 1000;
const DEFAULT_MAX_ROWS = 2000000;

/**
 * Writable stream that buffers rows and inserts them into
 * downloads.downloads with Cassandra batches.
 *
 * Every chunk must be a JSON-encoded array `[user, resource, when]`
 * (what fast-csv emits with `objectMode: false`). Rows accumulate in
 * `this.buffer`; a batch is sent through the module-level `client` whenever
 * `batchSize` rows are buffered, and any remainder is sent when the stream
 * ends (via `_final`, which requires Node.js >= 8).
 *
 * Successfully applied rows are added to the module-level `written` counter.
 *
 * @param {Object} [options] - Passed through to stream.Writable, plus:
 * @param {number} [options.batchSize=1000] - Rows per Cassandra batch.
 * @param {number} [options.maxRows=2000000] - Hard cap kept from the original
 *   script: once more than this many rows are written the process exits with
 *   status 1.
 */
function CassandraStream(options) {
  const opts = options || {};
  stream.Writable.call(this, opts);
  this.buffer = [];
  this.batchSize = opts.batchSize > 0 ? opts.batchSize : DEFAULT_BATCH_SIZE;
  this.maxRows = opts.maxRows > 0 ? opts.maxRows : DEFAULT_MAX_ROWS;
}
// Must run before the prototype methods below are attached: older Node
// versions of util.inherits replace CassandraStream.prototype wholesale.
util.inherits(CassandraStream, stream.Writable);

/**
 * Sends every row currently buffered on `target` as one batch, then calls
 * `done`. The buffer is swapped out first so the next batch starts empty.
 * Batch failures are logged and skipped (best-effort load, as before).
 */
function flushBuffer(target, done) {
  const batch = target.buffer;
  target.buffer = [];
  client.batch(batch, { prepare: true }, function (err) {
    if (err) {
      console.dir(err);
    } else {
      written += batch.length;
    }
    if (written > target.maxRows) { process.exit(1); }
    done();
  });
}

/**
 * Writable hook: parses one row, buffers it, and flushes when the batch is
 * full. Unlike the old `write` override, the row that fills the batch is
 * buffered before the flush instead of being dropped.
 */
CassandraStream.prototype._write = function (chunk, encoding, callback) {
  let row;
  try {
    row = JSON.parse(chunk.toString());
  } catch (err) {
    // Surface malformed input as a stream 'error' instead of throwing out of pipe().
    return callback(err);
  }
  this.buffer.push({
    query: INSERT_DOWNLOAD,
    params: [row[0], row[1], new Date(row[2])],
  });
  if (this.buffer.length < this.batchSize) {
    return callback();
  }
  return flushBuffer(this, callback);
};

/**
 * Writable hook run after end(): flushes the final, partially filled batch
 * before 'finish' is emitted.
 */
CassandraStream.prototype._final = function (callback) {
  if (this.buffer.length === 0) {
    return callback();
  }
  return flushBuffer(this, callback);
};
/**
 * Ensures the `downloads` keyspace and the `downloads.downloads` table exist.
 *
 * The schema statements run one after another through async.series; `next`
 * receives async.series' (err, results) once both have run or the first one
 * has failed.
 *
 * @param {Function} next - Node-style completion callback.
 */
function create(next) {
  const schemaStatements = [
    "CREATE KEYSPACE IF NOT EXISTS downloads WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 1}",
    'CREATE TABLE IF NOT EXISTS downloads.downloads (user int, resource int, when timestamp, PRIMARY KEY (user, resource))',
  ];
  const steps = schemaStatements.map(function (cql) {
    return function (done) {
      client.execute(cql, done);
    };
  });
  async.series(steps, next);
}
/**
 * Streams every row of ./rd.csv into downloads.downloads through a
 * CassandraStream. It logs progress every 5 seconds and shuts the Cassandra
 * client down once the load finishes or fails.
 *
 * Used as create()'s completion callback, so the first argument is the
 * schema-creation error, if any. When that error is set, nothing is loaded.
 *
 * @param {Error} [err] - Error reported by create(), if schema setup failed.
 */
function load(err) {
  // cassandra-driver's Client has no quit(); shutdown() releases its connections.
  function shutdownClient() {
    client.shutdown(function (shutdownErr) {
      if (shutdownErr) { console.dir(shutdownErr); }
    });
  }

  if (err) {
    console.error('Schema creation failed, nothing loaded:', err);
    process.exitCode = 1;
    shutdownClient();
    return;
  }

  console.log('Loading ...');
  written = 0;
  const logger = setInterval(function () {
    console.log(`Loaded ${written} ...`);
  }, 5000);

  let settled = false;
  // Final bookkeeping; runs once whether the load completed or a stream errored.
  function done(failure) {
    if (settled) { return; }
    settled = true;
    clearInterval(logger);
    if (failure) {
      console.error('Load failed:', failure);
      process.exitCode = 1;
    } else {
      console.log(`Loaded ${written} rows.`);
    }
    shutdownClient();
  }

  // NOTE(review): fast-csv is not told about a header row. If rd.csv starts
  // with one (e.g. user_id,resource_id,download_date), that row is sent as
  // data and its batch will fail — confirm the file format.
  const csv = require('fast-csv');
  const cassandraStream = new CassandraStream();
  csv
    .fromPath(path.resolve('rd.csv'), { objectMode: false })
    // pipe() does not forward source errors, so the reader needs its own handler.
    .on('error', done)
    .pipe(cassandraStream)
    .on('error', done)
    // Writable streams emit 'finish', not 'end', once all writes are handled.
    .on('finish', function () { done(); });
}
// Entry point: make sure the schema exists, then start the load, forwarding
// create()'s (err, results) to load().
create(function (err, results) {
  return load(err, results);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment