Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
// include necessary modules
var cluster = require('cluster');
var os = require('os');
var rbush = require('rbush');
var assert = require('assert');
var http = require("http");
var url = require("url");
var toobusy = require('toobusy-js');
var byline = require('byline');
var touch = require("touch");
var Canvas = require('canvas');
var events = require('events');
var log4js = require('log4js');
var fs = require('graceful-fs');
var pgPass = require('pgpass');
var Pool = require('pg-pool');
var mkdirp = require('mkdirp');
var pg = require('pg');
VectorTilerequest = require('./vectortilerequest.js');
BitmapTilerequest = require('./bitmaptilerequest.js');
var mkdirp = require('mkdirp');
var pg = require('pg');
var toobusy = require('toobusy-js');
var byline = require('byline');
logger = log4js.getLogger();
logger.setLevel('TRACE');
configuration = require('./config.json');
Tile = require('./tile.js');
Tilerequest = require('./tilerequest.js');
Tilequeue = require('./queue.js');
// Set up the Bull job queue. The same queue object is used by the master
// process (which adds jobs) and by the worker processes (which process them).
var Queue = require('bull');
var cluster = require('cluster');

// Number of worker processes the master forks.
var numWorkers = 12;

// Number of logical CPUs; used to split the database pool size.
var cpus = os.cpus().length;

var queue = new Queue("test concurrent queue");
// Entry point for both process roles:
//  - master: forks `numWorkers` workers and enqueues one job per tile
//    ([z, x, y]) for every configured zoom-level range;
//  - worker: opens a PostgreSQL connection pool and renders the tiles it
//    takes from the queue.
if (cluster.isMaster) {
    for (var i = 0; i < numWorkers; i++) {
        cluster.fork();
    }

    cluster.on('online', function (worker) {
        console.log("Worker came online")
    });

    cluster.on('exit', function (worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    });

    // Tile ranges to pre-render. The x/y bounds are inclusive.
    var lvls = [
        {
            'x_from': 8820,
            'x_to': 9216,
            'y_from': 3883,
            'y_to': 5216,
            'z': 14
        },
        {
            'x_from': 17643,
            'x_to': 17644,
            'y_from': 7768,
            'y_to': 10432,
            'z': 15
        }
    ];

    // queue.add() returns a promise; log rejections instead of leaving
    // them unhandled.
    var onEnqueueError = function (err) {
        console.log('Tile could not be queued: ' + err);
    };

    lvls.forEach(function (lvl) {
        for (var x = lvl['x_from']; x <= lvl['x_to']; x++) {
            for (var y = lvl['y_from']; y <= lvl['y_to']; y++) {
                queue.add([lvl['z'], x, y]).catch(onEnqueueError);
            }
        }
    });

    // Logged once, after add() has been issued for every level (the adds
    // themselves complete asynchronously).
    console.log("done queuein")
} else {
    // Read the password from pgpass once per worker process at startup,
    // not for each request.
    // NOTE(review): the pool size is divided by the CPU count although
    // `numWorkers` processes are forked - confirm which divisor is intended.
    var connectionDetails = {
        'host': 'localhost',
        'port': configuration.port,
        'database': configuration.database,
        'user': configuration.username,
        'max': Math.floor(configuration.maxPoolSize / cpus),
        'idleTimeoutMillis': 10000
    };

    console.log("...")

    pgPass(connectionDetails, function (password) {
        if (typeof password === 'undefined') {
            console.log('PGPASS file cannot be read or no matching line for given connection info found');
            process.exit(1);
        }

        console.log('Successfully read password using PGPASS');
        configuration.password = password;

        // NOTE(review): the password is stored on `configuration` only and is
        // not part of `connectionDetails` - confirm the pool can authenticate.
        // The pool stays a process-wide global, as it was before (assignment
        // without declaration); presumably other modules read it - verify.
        global.pool = new Pool(connectionDetails);
        global.pool.on('error', function (err, client) {
            console.log('Idle database client error: ' + err.message)
        });

        queue.process(function (job, jobDone) {
            // Per-job locals: a shared global tile could be overwritten by
            // the next job while callbacks of this one are still pending.
            var zyx = job.data;
            var tile = new Tile(zyx[0], zyx[1], zyx[2]);
            var finished = false;

            // Destroys the tile and reports the outcome to Bull exactly once.
            // Any truthy `err` marks the job as failed.
            function finish(err) {
                if (finished) {
                    return;
                }
                finished = true;
                tile.destroy();
                if (err) {
                    jobDone(err instanceof Error ? err : new Error(String(err)));
                } else {
                    jobDone();
                }
            }

            console.log("Job started by worker", cluster.worker.id, job.id);
            console.log('Rendering tile from the queue: ' + zyx[0] + '/' + zyx[1] + '/' + zyx[2]);

            tile.getVectorData(function (data) {
                console.log("done 1")
                tile.saveVectorData(function (err) {
                    if (err) {
                        console.log('Vector tile could not be saved. Returning.' + err);
                    } else {
                        // NOTE(review): the tile is destroyed right after this
                        // call, as before - confirm rerenderBitmap() does not
                        // need the tile afterwards.
                        tile.rerenderBitmap();
                        console.log("done 50")
                    }
                    console.log("done 100")
                    finish(err);
                });
            }, function (err) {
                logger.info('Vector tile could not be created. Aborting.' + err);
                console.log("done")
                finish(err || new Error('Vector tile could not be created'));
            });
        });
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment