Skip to content

Instantly share code, notes, and snippets.

@doowb
Created August 25, 2014 18:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save doowb/81eb96c9107dff19a271 to your computer and use it in GitHub Desktop.
Save doowb/81eb96c9107dff19a271 to your computer and use it in GitHub Desktop.
Put transformations not running.
'use strict';
var _ = require('lodash');
var dat = require('dat');
var through = require('through2');
var products = [
'PROD-001',
// 'PROD-002',
'PROD-003',
// 'PROD-004',
// 'PROD-005',
// 'PROD-006',
'PROD-007',
'PROD-008',
];
var transforms = [
function () {
return through.obj(function features (product, enc, next) {
console.log('features', product);
product.features = (product.features || []).concat([
{ color: 'red', sizes: ['S', 'M', 'L', 'XL', 'XXL'] },
{ color: 'blue', sizes: ['S', 'M', 'L', 'XL'] },
{ color: 'green', sizes: ['M', 'L'] }
]);
product.transformed = (product.transformed || []).concat(['doowb-features']);
// this.push(product);
next(null, product);
});
},
function () {
return through.obj(function PROD1 (product, enc, next) {
console.log('PROD1', product);
if (product.id === 'PROD-001') {
product.cost *= 0.80;
product.transformed = (product.transformed || []).concat(['doowb-PROD1']);
}
// this.push(product);
next(null, product);
});
},
function () {
return through.obj(function PROD8 (product, enc, next) {
console.log('PROD8', product);
if (product.id === 'PROD-008') {
product.cost *= 0.50;
product.transformed = (product.transformed || []).concat(['doowb-PROD8']);
}
// this.push(product);
next(null, product);
});
}
];
var userDB = dat('./data/products/doowb', function (err) {
if (err) {
throw new Error(err);
}
var relatedDB = dat('./data/products/jon', { transformations: { put: transforms } }, function (err) {
if (err) {
throw new Error(err);
}
// get the products from userDB and pass them through the transforms
var readOpts = {
start: 'PROD-001',
end: 'PROD-008'
};
var userStream = userDB.createReadStream(readOpts);
userStream.pipe(through.obj(function (product, enc, callback) {
var stream = this;
if (!product || _.contains(products, product.id) === false) {
this.push(product);
return callback();
}
console.log('transforming product', product);
relatedDB.put(product.id, product, {force: true}, function (err, updated) {
if (err) {
throw new Error(err);
}
console.log('Product ', updated.id, 'updated in related database');
console.log(updated);
stream.push(product);
callback();
});
}));
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment