Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thlorenz/6031386 to your computer and use it in GitHub Desktop.
Save thlorenz/6031386 to your computer and use it in GitHub Desktop.
Outlines a problem that occurs when piping data from leveldb through a transform stream and back into leveldb
processing "/zanzibar/seeanddo" total 7 urls
processing "/zurich/hotels" total 9 urls
processing "/zurich/restaurants" total 8 urls
processing "/zurich/seeanddo" total 1 urls
transformed url db read value stream ended
raw html write db stream closed
database now contains raw html content for 15716 urls
transformed: /zurich/hotels/504749 to html with content length 6759
[Error: stream.push() after EOF]
Error: stream.push() after EOF
at readableAddChunk (_stream_readable.js:146:15)
at UrlsToHtmlTransform.Readable.push (_stream_readable.js:127:10)
at UrlsToHtmlTransform.Transform.push (_stream_transform.js:140:32)
at /Users/thlorenz/dev/cn/minecierge/lib/raw/html-transform.js:51:20
at Request._callback (/Users/thlorenz/dev/cn/minecierge/lib/fetch.js:9:5)
at Request.self.callback (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:148:22)
at Request.EventEmitter.emit (events.js:98:17)
at Request.<anonymous> (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:886:14)
at Request.EventEmitter.emit (events.js:117:20)
at IncomingMessage.<anonymous> (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:837:12)
transformed: /zurich/restaurants/15590 to html with content length 4367
[Error: stream.push() after EOF]
Error: stream.push() after EOF
at readableAddChunk (_stream_readable.js:146:15)
at UrlsToHtmlTransform.Readable.push (_stream_readable.js:127:10)
at UrlsToHtmlTransform.Transform.push (_stream_transform.js:140:32)
at /Users/thlorenz/dev/cn/minecierge/lib/raw/html-transform.js:51:20
at Request._callback (/Users/thlorenz/dev/cn/minecierge/lib/fetch.js:9:5)
at Request.self.callback (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:148:22)
at Request.EventEmitter.emit (events.js:98:17)
at Request.<anonymous> (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:886:14)
at Request.EventEmitter.emit (events.js:117:20)
at IncomingMessage.<anonymous> (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:837:12)
transformed: /zurich/hotels/2675 to html with content length 5515
[Error: stream.push() after EOF]
Error: stream.push() after EOF
[...] (more of this)
'use strict';
var config = require('../config');
var leveldb = require('../leveldb');
/**
 * Creates the stream through which the raw html is written to the db.
 *
 * The stream belongs to the raw data sublevel and is created with { type: 'put' }.
 * Errors are only logged; 'end' and 'close' are logged as well.
 *
 * @name getWriteStream
 * @function
 * @param db {Object} leveldb root that provides the raw data sublevel
 * @param cb {Function} called without arguments once the write stream emits 'close'
 * @return {Stream} the write stream; presumably fed with objects shaped like
 *  { key: '/location/venuetype/id', value: 'html of relevant content' } (confirm against the producer)
 */
var go = exports.getWriteStream = function (db, cb) {
  var rawHtmlStore = db.sublevel(config.sublevel.data.raw);
  var writeStream = rawHtmlStore.createWriteStream({ type: 'put' });

  function onEnded () {
    console.error('raw html write db stream ended');
  }

  // 'close' is the signal that reports completion back to the caller
  function onClosed () {
    console.error('raw html write db stream closed');
    cb();
  }

  return writeStream
    .on('error', console.error)
    .on('end', onEnded)
    .on('close', onClosed);
};
exports.retrieve = function (db, url, cb) {
var rawDataSub = db.sublevel(config.sublevel.data.raw);
rawDataSub.get(url + '\xff', cb);
};
'use strict';
var config = require('../config');
var leveldb = require('../leveldb');
var store = require('./html-store');
var transform = require('./html-transform');
/**
 * Streams every value of the venue urls sublevel through the html transform
 * and pipes the transform's output into the raw html write stream.
 *
 * @name exports
 * @function
 * @param db {Object} leveldb root
 * @param cb {Function} handed to the html store's write stream, which calls it when that stream closes
 */
var go = module.exports = function (db, cb) {
  var venuesUrls = db.sublevel(config.sublevel.url.venues, { valueEncoding: 'json' });
  var htmlWriteStream = store.getWriteStream(db, cb);
  var lookupStoredHtml = store.retrieve.bind(null, db);

  function logError (err) {
    console.error(err);
    console.error(err.stack);
  }

  // No manual ws.end() / cb() here: pipe() ends the write stream once the transform ends
  // and the write stream's 'close' handler is what invokes cb.
  function logEnded () {
    console.error('transformed url db read value stream ended');
  }

  function logClosed () {
    console.error('transformed url db read value stream closed');
  }

  var urlStream = venuesUrls.createValueStream();
  var htmlStream = urlStream.pipe(transform(lookupStoredHtml));

  htmlStream
    .on('error', logError)
    .on('end', logEnded)
    .on('close', logClosed)
    .pipe(htmlWriteStream);
};
'use strict';
var runnel = require('runnel');
var Transform = require('stream').Transform;
var getHtml = require('./html-get');
var content = require('./html-content');
/**
 * Wraps a function so that only its first invocation has any effect.
 * Later invocations are ignored and return undefined.
 *
 * @name callOnce
 * @function
 * @private
 * @param fn {Function} function to guard
 * @return {Function} guarded function
 */
function callOnce (fn) {
  var called = false;
  return function () {
    if (called) return;
    called = true;
    return fn.apply(this, arguments);
  };
}

/**
 * Object mode transform that consumes arrays of urls and pushes one
 * { key: url + '\xff', value: content } object for every url whose html had to be fetched.
 *
 * @name UrlsToHtmlTransform
 * @function
 * @param retrieve {Function} function (url, cb) used to look for html that is already stored
 */
function UrlsToHtmlTransform (retrieve) {
  Transform.call(this, { objectMode: true });
  this._retrieve = retrieve;
}

UrlsToHtmlTransform.prototype = Object.create(Transform.prototype, { constructor: { value: UrlsToHtmlTransform } });

/**
 * Processes one array of urls, one url at a time (sequenced via runnel).
 *
 * For each url the stored html is looked up first; only if that lookup fails or yields
 * nothing is the page fetched, reduced to its content and pushed downstream.
 * `done` is handed to runnel as the final function of the sequence.
 *
 * @name _transform
 * @function
 * @param urls {Array[String]} urls to process; an empty array is skipped
 * @param encoding {String} unused
 * @param done {Function} signals that this chunk has been fully processed
 */
UrlsToHtmlTransform.prototype._transform = function (urls, encoding, done) {
  var self = this;
  if (!urls.length) return done();

  // everything up to the last path segment, only used for the log line
  var parts = urls[0].split('/');
  parts.pop();
  var root = parts.join('/');
  console.error('processing "%s" total %s urls ', root, urls.length);

  var tasks = urls.map(function (url) {
    return function (cb) {
      // Both callbacks are guarded: if either one fires more than once, the task queue would
      // advance twice and the same record would be pushed twice. `done` could then run while
      // fetches are still in flight, and their late pushes hit a stream that has already ended.

      // Lets see if we already have it in the db
      self._retrieve(url, callOnce(function (err, html) {
        if (!err && html && html.length) {
          console.error('Already have html for [' + url + ']. Not fetching again');
          return cb();
        }

        // didn't have it, lets go and fetch it
        getHtml(url, callOnce(function (fetchErr, fetchedHtml) {
          // if we encounter an error keep going in order to get html for the remaining urls anyways
          if (fetchErr) {
            // NOTE(review): when this transform is the destination of a pipe, emitting 'error'
            // makes the source unpipe from it, so the remaining url arrays may never arrive.
            // Left as is to keep the existing event contract -- confirm whether a non 'error'
            // event would be the better way to report per url failures.
            self.emit('error', fetchErr);
            return cb();
          }

          try {
            var c = content(fetchedHtml);
            console.error(' transformed: %s to html with content length %s', url, c.length);
            try {
              self.push({ key: url + '\xff', value: c });
            } catch (pushErr) {
              // First push gets 'TypeError: invalid data' although data is valid, so we need to retry
              if (pushErr.message === 'invalid data')
                self.push({ key: url + '\xff', value: c });
              else
                self.emit('error', pushErr);
            }
          } catch (e) {
            self.emit('error', e);
          }

          // tell runnel to process next task
          cb();
        }));
      }));
    };
  });

  runnel(tasks.concat(done));
};
/**
 * Creates the transform that turns a stream of url arrays into a stream of the relevant
 * content for each of those urls (fetched from the website).
 *
 * @name exports
 * @function
 * @param retrieve {Function} function (url, cb) asked for the html before anything is fetched.
 *  If it calls back without an error and with non-empty html, the url is not fetched again.
 * @return {Transform} a new UrlsToHtmlTransform
 */
var go = module.exports = function (retrieve) {
  var urlsToHtml = new UrlsToHtmlTransform(retrieve);
  return urlsToHtml;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment