Skip to content

Instantly share code, notes, and snippets.

@tizzo
Last active March 11, 2016 22:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tizzo/8372d6a1fef9c52563ee to your computer and use it in GitHub Desktop.
Save tizzo/8372d6a1fef9c52563ee to your computer and use it in GitHub Desktop.
migrate_build_size_and_status
node_modules
'use strict';
var through2 = require('through2');
var eventbus = require('probo-eventbus');
var split2 = require('split2');
var os = require('os');
var level = require('level');
var bunyan = require('bunyan');
var async = require('async');
var log = bunyan.createLogger({name: 'migration'});
if (!process.argv[2]) {
console.error('A valid path to a leveldb instance is required.');
process.exit(1);
}
else {
var databaseLocation = process.argv[2];
}
function hasKey(object, path) {
var pieces = path.split('.');
var subObject = object;
for (let piece of pieces) {
if (!subObject.hasOwnProperty(piece)) {
return false;
}
subObject = subObject[piece];
}
return true;
}
var example = {
container: {
disk: {
containerSize: -1,
imageSize: -1,
},
},
};
var mapping = require('./mapping');
var messageProcessedCount = 0;
var Producer = eventbus.plugins.Kafka.Producer;
var db = level(databaseLocation);
var setup = function(done) {
var counter = 0;
db.createValueStream({gt: 'builds!!', lt: 'builds!~', valueEncoding: 'json'})
.pipe(through2.obj(function(build, enc, cb) {
counter++;
db.put(`build_date!!${build.createdAt}!!${build.id}`, JSON.stringify(build), cb);
}, function(cb) {
log.info(`wrote ${counter} keys indexed by date`);
cb();
done();
}));
};
var cleanup = function(done) {
var counter = 0;
db.createKeyStream({gt: 'build_date!!!', lt: 'build_date!!~'})
.pipe(through2(function(key, enc, cb) {
counter++;
db.del(key, cb);
}), function(cb) {
log.info(`deleted ${count} builds inexed by date`);
cb();
done();
});
};
var migration = function(done) {
new Producer({topic: 'build_events', version: 1}, function(error, producer) {
db
.createValueStream({gt: 'build_date!!!', lt: 'build_date!!~'})
.pipe(through2.obj(function(data, enc, cb) {
try {
data = JSON.parse(data);
this.push(data);
}
catch (e) {
log.error('parse failure', data);
}
cb();
}))
.pipe(through2.obj(function(build, enc, cb) {
if (hasKey(build, 'container.disk.containerSize') && !hasKey(build, 'container.diskSpace.realBytes')) {
build.diskSpace = {
realBytes: build.container.disk.containerSize,
virtualBytes: build.container.disk.imageSize,
};
}
else {
build.diskSpace = {
realBytes: 0,
virtualBytes: 0,
};
}
if (hasKey(build, 'project.id') && mapping[build.project.id]) {
build.project.organization = mapping[build.project.id];
}
else {
log.error({buildId: build.id, projectId: build.project.id}, 'No mapping was found.');
build.project = {
id: 'none',
organization: {
id: 'none',
subscription: {
rules: {
concurrentBuilds: 1,
diskSpace: 1,
diskspace: 1,
providerOrgs: 1,
},
},
},
};
build.reaped = true;
}
if (hasKey(build, 'container.state')) {
build.reaped = (build.container.state == 'deleted');
}
else {
log.error({'issue': 'unknown status'}, 'unknown status for ', build.id);
build.reaped = false;
}
delete build.payload;
db.put(`build!${build.id}`, JSON.stringify(build), function(error) {
cb(error, build);
});
}))
.pipe(through2.obj(function(build, enc, cb) {
messageProcessedCount++;
this.push({
event: 'ready',
build,
});
if (build.reaped) {
messageProcessedCount++;
this.push({
event: 'reaped',
build,
});
}
cb(null);
}))
.pipe(producer.stream)
.on('finish', function(cb) {
log.info({count: messageProcessedCount}, `Processed ${messageProcessedCount} messages`);
producer.destroy(cb);
});
});
}
async.series([setup, migration, cleanup], function() {
log.info('Migration complete');
});
{
"name": "8372d6a1fef9c52563ee",
"version": "1.0.0",
"description": "",
"main": "main.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "git+ssh://git@gist.github.com/8372d6a1fef9c52563ee.git"
},
"author": "",
"license": "ISC",
"bugs": {
"url": "https://gist.github.com/8372d6a1fef9c52563ee"
},
"homepage": "https://gist.github.com/8372d6a1fef9c52563ee",
"dependencies": {
"async": "^1.5.2",
"bunyan": "^1.7.1",
"level": "^1.4.0",
"probo-eventbus": "0.1.1",
"request": "^2.69.0",
"split2": "^2.0.1",
"through2": "^2.0.1"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment