Created
July 18, 2013 17:48
-
-
Save thlorenz/6031386 to your computer and use it in GitHub Desktop.
Outlines a problem that occurs when piping data from LevelDB through a transform stream and back into LevelDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
processing "/zanzibar/seeanddo" total 7 urls | |
processing "/zurich/hotels" total 9 urls | |
processing "/zurich/restaurants" total 8 urls | |
processing "/zurich/seeanddo" total 1 urls | |
transformed url db read value stream ended | |
raw html write db stream closed | |
database now contains raw html content for 15716 urls | |
transformed: /zurich/hotels/504749 to html with content length 6759 | |
[Error: stream.push() after EOF] | |
Error: stream.push() after EOF | |
at readableAddChunk (_stream_readable.js:146:15) | |
at UrlsToHtmlTransform.Readable.push (_stream_readable.js:127:10) | |
at UrlsToHtmlTransform.Transform.push (_stream_transform.js:140:32) | |
at /Users/thlorenz/dev/cn/minecierge/lib/raw/html-transform.js:51:20 | |
at Request._callback (/Users/thlorenz/dev/cn/minecierge/lib/fetch.js:9:5) | |
at Request.self.callback (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:148:22) | |
at Request.EventEmitter.emit (events.js:98:17) | |
at Request.<anonymous> (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:886:14) | |
at Request.EventEmitter.emit (events.js:117:20) | |
at IncomingMessage.<anonymous> (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:837:12) | |
transformed: /zurich/restaurants/15590 to html with content length 4367 | |
[Error: stream.push() after EOF] | |
Error: stream.push() after EOF | |
at readableAddChunk (_stream_readable.js:146:15) | |
at UrlsToHtmlTransform.Readable.push (_stream_readable.js:127:10) | |
at UrlsToHtmlTransform.Transform.push (_stream_transform.js:140:32) | |
at /Users/thlorenz/dev/cn/minecierge/lib/raw/html-transform.js:51:20 | |
at Request._callback (/Users/thlorenz/dev/cn/minecierge/lib/fetch.js:9:5) | |
at Request.self.callback (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:148:22) | |
at Request.EventEmitter.emit (events.js:98:17) | |
at Request.<anonymous> (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:886:14) | |
at Request.EventEmitter.emit (events.js:117:20) | |
at IncomingMessage.<anonymous> (/Users/thlorenz/dev/cn/minecierge/node_modules/request/index.js:837:12) | |
transformed: /zurich/hotels/2675 to html with content length 5515 | |
[Error: stream.push() after EOF] | |
Error: stream.push() after EOF | |
[...] (more of this) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
var config = require('../config'); | |
var leveldb = require('../leveldb'); | |
/**
 * Creates the write stream through which html data, keyed by url, is stored
 * in the raw data sublevel.
 *
 * Entries written to the returned stream are expected to look like
 * { key: '/location/venuetype/id', value: 'html of relevant content' }.
 *
 * @name getWriteStream
 * @function
 * @param db {Object} leveldb root, needs to provide `sublevel`
 * @param cb {Function} called back without arguments once the write stream emitted 'close'
 * @return {Stream} write stream of the raw data sublevel, created with `{ type: 'put' }`
 */
var go = exports.getWriteStream = function (db, cb) {
  var rawDataSub = db.sublevel(config.sublevel.data.raw);

  var ws = rawDataSub.createWriteStream({ type: 'put' })
    // write errors are only logged here, they are not passed on to cb
    .on('error', console.error)
    .on('end', function () {
      console.error('raw html write db stream ended');
    })
    // 'close' is what signals the caller that storing has finished
    .on('close', function () {
      console.error('raw html write db stream closed');
      cb();
    });

  return ws;
};
exports.retrieve = function (db, url, cb) { | |
var rawDataSub = db.sublevel(config.sublevel.data.raw); | |
rawDataSub.get(url + '\xff', cb); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
var config = require('../config'); | |
var leveldb = require('../leveldb'); | |
var store = require('./html-store'); | |
var transform = require('./html-transform'); | |
/**
 * Reads all venue url values contained in the db, pipes them through the html transform
 * (which is handed a lookup function for html that is already stored) and pipes
 * the resulting entries into the raw html store.
 *
 * @name exports
 * @function
 * @param db {Object} leveldb root
 * @param cb {Function} handed to the store's write stream, which calls it back once that stream closed
 */
var go = module.exports = function (db, cb) {
  var venuesUrls = db.sublevel(config.sublevel.url.venues, { valueEncoding: 'json' });
  var ws = store.getWriteStream(db, cb);
  var retrieve = store.retrieve.bind(null, db);

  venuesUrls.createValueStream()
    .pipe(transform(retrieve))
    // errors emitted by the transform are logged only, cb never sees them
    .on('error', function (err) {
      console.error(err);
      console.error(err.stack);
    })
    .on('end', function () {
      // Neither ws.end() nor cb() is called here: pipe() ends ws once the
      // transform ended and the store invokes cb when ws emitted 'close'.
      console.error('transformed url db read value stream ended');
    })
    .on('close', function () {
      console.error('transformed url db read value stream closed');
    })
    .pipe(ws);
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
var runnel = require('runnel'); | |
var Transform = require('stream').Transform; | |
var getHtml = require('./html-get'); | |
var content = require('./html-content'); | |
/**
 * Wraps a callback so that only its first invocation is forwarded.
 * Any further invocation is logged and otherwise ignored.
 *
 * @param fn {Function} the callback to guard
 * @param label {String} included in the log message when a repeated invocation is dropped
 * @return {Function} guarded version of fn
 */
function callOnce (fn, label) {
  var called = false;
  return function () {
    if (called) {
      console.error('Ignoring repeated callback of %s', label);
      return;
    }
    called = true;
    return fn.apply(this, arguments);
  };
}

/**
 * Object mode transform that takes arrays of urls and pushes one
 * { key: url + '\xff', value: content } entry for each url that had to be fetched.
 *
 * @param retrieve {Function} function (url, cb) used to check if html for a url is stored already
 */
function UrlsToHtmlTransform (retrieve) {
  Transform.call(this, { objectMode: true });
  this._retrieve = retrieve;
}

UrlsToHtmlTransform.prototype = Object.create(Transform.prototype, { constructor: { value: UrlsToHtmlTransform } });

/**
 * Processes one array of urls, one url after the other, and calls done once all of them were handled.
 *
 * For each url the html is first looked up via retrieve. Only if that lookup fails or yields nothing
 * is the page fetched, reduced to its relevant content and pushed downstream.
 *
 * @param urls {Array[String]} urls to process, an empty array is skipped
 * @param encoding {String} not used
 * @param done {Function} called when the entire array has been processed
 */
UrlsToHtmlTransform.prototype._transform = function (urls, encoding, done) {
  var self = this;
  if (!urls.length) return done();

  // everything before the last path segment of the first url is used to label the batch in the log
  var parts = urls[0].split('/');
  parts.pop();
  var root = parts.join('/');

  console.error('processing "%s" total %s urls ', root, urls.length);

  var tasks = urls.map(function (url) {
    return function (cb) {

      // NOTE(review): 'stream.push() after EOF' has been observed with this pipeline, which means
      // a fetch callback ran after done() was called. A lookup or fetch callback that fires more
      // than once would advance the task chain early and cause exactly that, so both callbacks
      // are guarded - confirm against the retrieve and getHtml implementations.

      // Lets see if we already have it in the db
      self._retrieve(url, callOnce(function (err, html) {
        if (!err && html && html.length) {
          console.error('Already have html for [' + url + ']. Not fetching again');
          return cb();
        }

        // didn't have it, lets go and fetch it
        getHtml(url, callOnce(function (err, html) {

          // if we encounter an error keep going in order to get html for the remaining urls anyways
          // NOTE(review): a source piped into this stream unpipes when this stream emits 'error',
          // which may prevent later url arrays from arriving - confirm this is acceptable
          if (err) {
            self.emit('error', err);
            return cb();
          }

          try {
            var c = content(html);
            console.error(' transformed: %s to html with content length', url, c.length);

            try {
              self.push({ key: url + '\xff', value: c });
            } catch (pushErr) {
              // First push gets 'TypeError: invalid data' although data is valid, so we need to retry
              if (pushErr.message === 'invalid data')
                self.push({ key: url + '\xff', value: c });
              else
                self.emit('error', pushErr);
            }
          } catch (e) {
            self.emit('error', e);
          }

          // tell runnel to process next task
          cb();
        }, 'getHtml for [' + url + ']'));
      }, 'retrieve for [' + url + ']'));
    };
  });

  // done runs last, after every task called back
  runnel(tasks.concat(done));
};
/**
 * Creates a transform that turns a stream of url arrays into a stream of
 * { key, value } entries holding the relevant content for each of those urls (fetched from the website).
 *
 * @name exports
 * @function
 * @param retrieve {Function} function (url, cb) called to try to retrieve the html from the database.
 *                            If it calls back without an error and with non-empty html,
 *                            the url will not be fetched from the website.
 * @return {Transform} a stream transform
 */
var go = module.exports = function (retrieve) {
  return new UrlsToHtmlTransform(retrieve);
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment