Skip to content

Instantly share code, notes, and snippets.

@Pchelolo
Created October 3, 2018 23:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Pchelolo/0203c271fe97e270e40195bc415cd22d to your computer and use it in GitHub Desktop.
Save Pchelolo/0203c271fe97e270e40195bc415cd22d to your computer and use it in GitHub Desktop.
"use strict";
const { EventEmitter } = require('events');
const P = require('bluebird');
/**
 * Probability of committing the offset in case the particular message
 * did not match the rule. For some rules, like null edits in change-prop,
 * a match is very improbable because they reuse a common topic
 * with many other rules that fire far more frequently.
 *
 * If we only committed messages when they matched, that could create a large
 * false backlog - after a restart a lot of messages would be reread and
 * continuously rejected.
 *
 * To avoid that, commit offsets of the rejected messages from time to time.
 *
 * @const
 * @type {number}
 */
const NO_MATCH_COMMIT_PROBABILITY = 0.01;
/**
 * Wraps a single consumed message together with the stream that produced
 * it, exposing the three outcomes a rule handler can report:
 * ack (matched, now in flight), commit (finished processing) and
 * reject (did not match any rule).
 */
class Event {
    /**
     * @param {Object} message parsed message payload
     * @param {EventStream} eventStream the stream this message came from
     */
    constructor(message, eventStream) {
        // Bug fix: the message was previously dropped here while the
        // methods below referenced an undefined `msg` variable, making
        // every method throw a ReferenceError. Store it on the instance.
        this._message = message;
        this._eventStream = eventStream;
    }

    /**
     * Marks the message as matched and in flight. Only matched messages
     * are added to the pending set so that non-matching messages do not
     * count against the concurrency limit.
     */
    ack() {
        this._eventStream._pendingMsgs.add(this._message);
    }

    /**
     * Marks the message as fully processed, and resumes consumption if
     * the stream had paused because the concurrency limit was reached.
     */
    commit() {
        this._eventStream._notifyFinished(this._message);
        if (this._eventStream._pendingMsgs.size < this._eventStream.concurrency
                && !this._eventStream._consuming) {
            this._eventStream._consume();
        }
    }

    /**
     * Marks the message as not matching any rule. Occasionally commits
     * the offset anyway (see NO_MATCH_COMMIT_PROBABILITY) to keep a
     * false backlog from building up.
     */
    reject() {
        if (Math.random() < NO_MATCH_COMMIT_PROBABILITY) {
            // Do not return the promise - the commit can be done async.
            this._eventStream._notifyFinished(this._message);
        }
    }
}
/**
 * Consumes messages from a Kafka consumer and emits them as `event`
 * events, throttling consumption so that no more than `this.concurrency`
 * matched messages are in flight at a time. Offsets are committed in
 * batches on a timer, and an offset is never committed while an earlier
 * message on the same topic is still being processed.
 *
 * NOTE(review): this class reads several properties that are never set in
 * this file (`this.concurrency`, `this.consumerBatchSize`, `this._logger`,
 * `this.options`, `this._safeParse`) and references `kafka` and
 * `DEFAULT_COMMIT_INTERVAL`, which are not required/defined here -
 * presumably supplied elsewhere; confirm against the full source.
 */
class EventStream extends EventEmitter {
    /**
     * @param {Object} consumer a KafkaConsumer-like object exposing
     *        consumeAsync and commitMessageAsync
     */
    constructor(consumer) {
        super();
        this._consumer = consumer;
        // Messages that matched a rule and are currently being processed.
        // (The original comment suggested an offset->msg map; it is a Set.)
        this._pendingMsgs = new Set();
        // topic -> array of finished messages whose offsets await commit.
        this._pendingCommits = new Map();
        this._consuming = false;
        this._connected = false;
    }

    // NOTE(review): start/stop are stubs in this snippet.
    start() {
    }

    stop() {
    }

    /**
     * Fetches one batch of messages, emits an `event` per message, and
     * keeps looping until the concurrency limit is reached or the
     * consumer is no longer connected.
     */
    _consume() {
        if (!this._connected) {
            return;
        }
        this._consuming = true;
        this._consumer.consumeAsync(this.consumerBatchSize)
            .then((messages) => {
                if (!messages.length) {
                    // No new messages, delay a bit and try again.
                    return P.delay(100);
                }
                messages.forEach((msg) => {
                    const message = this._safeParse(msg.value.toString('utf8'));
                    this.emit('event', new Event(message, this));
                });
            })
            .catch((e) => {
                // These errors must come from the KafkaConsumer
                // since the actual handler must never throw errors.
                /* eslint-disable indent */
                switch (e.code) {
                    case kafka.CODES.ERRORS.ERR__PARTITION_EOF:
                    case kafka.CODES.ERRORS.ERR__TIMED_OUT:
                        // We're reading too fast, nothing is there;
                        // slow down a little bit.
                        return P.delay(100);
                    default:
                        if (e.code === kafka.CODES.ERRORS.ERR__STATE) {
                            if (this._connected) {
                                // KafkaConsumer is disconnected or entered an
                                // error state, but not because we stopped it.
                                // Give it some time to reconnect before the
                                // next fetch attempt to avoid a tight loop.
                                this._logger.log('error/consumer', e);
                                return P.delay(1000);
                            }
                            return;
                        }
                        // Something else is terribly wrong.
                        this._logger.log('error/consumer', e);
                }
                /* eslint-enable indent */
            })
            .finally(() => {
                if (this._pendingMsgs.size < this.concurrency) {
                    this._consume();
                } else {
                    this._consuming = false;
                }
            });
    }

    /**
     * Removes a finished message from the pending set and queues its
     * offset for a batched commit. Commits are deferred on a timer so
     * several finished messages per topic are committed together; an
     * offset is only committed once no still-pending message on the same
     * topic has a smaller-or-equal offset.
     *
     * @param {Object} finishedMsg the message whose processing finished
     */
    _notifyFinished(finishedMsg) {
        this._pendingMsgs.delete(finishedMsg);
        if (this._pendingCommits.has(finishedMsg.topic)) {
            this._pendingCommits.get(finishedMsg.topic).push(finishedMsg);
        } else {
            this._pendingCommits.set(finishedMsg.topic, [ finishedMsg ]);
        }
        if (this.options.test_mode) {
            this._logger.log('trace/commit', 'Running in TEST MODE; Offset commits disabled');
            return;
        }
        if (!this._commitTimeout) {
            this._commitTimeout = setTimeout(() => {
                const stillHasPending = (proposedToCommit) => {
                    // Bug fix: the original iterated
                    // `this._pendingMsgs.entries()`, which on a Set yields
                    // [value, value] pairs, so `pendingMsg.topic` was always
                    // undefined and this guard never fired - offsets of
                    // still-pending messages could be committed. Iterate the
                    // Set's values directly instead.
                    for (const pendingMsg of this._pendingMsgs) {
                        if (pendingMsg.topic === proposedToCommit.topic
                                && pendingMsg.offset <= proposedToCommit.offset) {
                            return true;
                        }
                    }
                    return false;
                };
                const toCommit = [];
                this._commitTimeout = null;
                if (!this._connected) {
                    return;
                }
                for (const [topic, commitQueue] of this._pendingCommits.entries()) {
                    if (commitQueue.length) {
                        let sortedCommitQueue = commitQueue.sort((msg1, msg2) =>
                            msg1.offset - msg2.offset);
                        // Walk forward to the highest offset that is safe to
                        // commit; committing it implicitly commits the lower
                        // ones, so they are dropped from the queue.
                        let msgToCommit;
                        while (sortedCommitQueue.length
                                && !stillHasPending(sortedCommitQueue[0])) {
                            msgToCommit = sortedCommitQueue[0];
                            sortedCommitQueue = sortedCommitQueue.slice(1);
                        }
                        if (msgToCommit) {
                            toCommit.push(msgToCommit);
                            this._pendingCommits.set(topic, sortedCommitQueue);
                        }
                    }
                }
                return P.all(toCommit.map(message => this._consumer.commitMessageAsync(message)
                    .catch((e) => {
                        // Commit failures are logged, not rethrown - the
                        // offset will be retried on a later timer tick.
                        this._logger.log('error/commit', () => ({
                            msg: 'Commit failed',
                            offset: message.offset,
                            raw_event: message.value.toString(),
                            description: e.toString()
                        }));
                    })
                ));
            }, DEFAULT_COMMIT_INTERVAL);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment