Skip to content

Instantly share code, notes, and snippets.

@bbc4468
Last active March 10, 2021 21:26
Show Gist options
  • Save bbc4468/2cc749d1afc67d3872828c0f925122a1 to your computer and use it in GitHub Desktop.
Save bbc4468/2cc749d1afc67d3872828c0f925122a1 to your computer and use it in GitHub Desktop.
Kafka to InfluxDB ingestion
const Influx = require('influx');
const kafka = require('kafka-node'),
ConsumerGroup = kafka.ConsumerGroup;
const consumer = new ConsumerGroup({
kafkaHost: `${process.env.KAFKA_HOST||"localhost"}:9092`,
groupId: 'price-history-trade-consumer'
}, ['trade-stream']);
const influx = new Influx.InfluxDB({
host: process.env.INFLUX_HOST || 'localhost',
database: 'price_history_db',
schema: [
{
measurement: 'trade',
fields: {
price: Influx.FieldType.FLOAT,
size: Influx.FieldType.INTEGER
},
tags: [
'symbol',
'side'
]
}
]
});
consumer.on('message', function (message) {
const trade = JSON.parse(message.value.toString('utf8'));
console.log('DEBUG: Got new trade', trade);
influx.writePoints([
{
measurement: 'trade',
tags: {
symbol: trade.symbol,
side: trade.side
},
fields: {
price: trade.price, size: trade.size
},
timestamp: trade.timestamp || (new Date()).getTime() * 1000
}
], {
precision: 'u'
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment