Skip to content

Instantly share code, notes, and snippets.

@Pchelolo
Created July 8, 2016 14:56
Show Gist options
  • Save Pchelolo/7f3cf0c2e553eca1f909aa6b9ab99a09 to your computer and use it in GitHub Desktop.
Save Pchelolo/7f3cf0c2e553eca1f909aa6b9ab99a09 to your computer and use it in GitHub Desktop.
// This will do the 'while(true)' trick for you.
// The callback should return a promise, that's fulfilled when this batch processing is done
// However, I'm not 100% sure this is the API we want. I would better see the Consumer inherit
// from an EventEmitter, and emit `message` message for each consumed message, and `error` for each error,
// same as kafka-node does. It feels more natural to me, but it's a bigger discussion.
class Consumer {
constructor(rawConsumer) {
this.rawConsumer = rawConsumer;
this.isStarted = false;
}
pollAsync(timeout) {
return new Promise((resolve) => {
resolve(rawConsumer.poll(timeout));
});
}
start(callback) {
this.isStarted = true;
const fetch = () => {
return this.pollAsync(1000)
.then(callback)
.catch((e) => console.log(e))
.finally(() => {
if (this.isStarted) {
return fetch();
}
});
};
fetch();
}
stop() {
this.isStarted = false;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment