Skip to content

Instantly share code, notes, and snippets.

@MichalCz
Forked from kharandziuk/scramjet.js
Last active April 8, 2017 11:56
Show Gist options
  • Save MichalCz/ae37100f433c3dbf87cd00ad06689a30 to your computer and use it in GitHub Desktop.
Save MichalCz/ae37100f433c3dbf87cd00ad06689a30 to your computer and use it in GitHub Desktop.
const scramjet = require("scramjet");
const {getAPI,insertDB} = require('./io-simulators');
// Page size: passed as the second argument to every getAPI call and used as
// the step by which the read offset advances.
const ROWS = 1000;
/**
 * Source stream that pages through the API exposed by `getAPI`.
 *
 * Every `_read()` call requests one page of `ROWS` entries and pushes the
 * returned array downstream as a single chunk. A missing or empty page ends
 * the stream.
 */
class APIReadable extends scramjet.DataStream {
    /**
     * @param {Object} [options] forwarded unchanged to `scramjet.DataStream`.
     */
    constructor(options) {
        super(options);
        // First argument handed to the next getAPI call.
        this.offset = 0;
    }

    /**
     * Requests the next page and pushes it. Pushes `null` (end of stream)
     * when the page is empty or missing. A rejected `getAPI` promise is
     * reported as an 'error' event instead of being left as an unhandled
     * rejection.
     */
    _read() {
        getAPI(this.offset, ROWS)
            .then((data) => {
                if (data && data.length) {
                    this.push(data);
                } else {
                    this.push(null);
                }
            })
            .catch((err) => this.emit("error", err));
        // Advanced synchronously, before the request above settles, so the
        // next _read() call asks for the following page.
        this.offset = this.offset + ROWS;
    }
}
// Items waiting to be written, and the time the buffer was last flushed.
let arr = [];
let ts = Date.now();

// Read pages from the API, flatten them into single items, regroup the items
// into batches and hand every batch to insertDB.
new APIReadable()
    .flatMap((page) => page.map((item) => {
        const idStr = String(item.id);
        if (idStr.includes("3")) {
            // Falsy marker: skipped by the `if (item)` check in remap below.
            return 0;
        }
        if (idStr.includes("9")) {
            return Object.assign(item, {
                timestamp: Date.now()
            });
        }
        return item;
    }))
    .remap((emit, item) => {
        if (item) arr.push(item);
        // Flush once the buffer holds more than 1000 items or more than one
        // second has passed since the previous flush.
        if (arr.length > 1000 || Date.now() - ts > 1000) {
            emit(arr);
            arr = [];
            ts = Date.now();
        }
    })
    // write to database
    .accumulate((insert, item) => insert(item), insertDB)
    // remap only flushes when another item arrives, so whatever is still
    // buffered once the stream has ended has to be written explicitly;
    // otherwise the final partial batch would be lost.
    .then(() => {
        if (arr.length) {
            const rest = arr;
            arr = [];
            return insertDB(rest);
        }
        return undefined;
    })
    .catch(e => console.log(e && e.stack));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment