Skip to content

Instantly share code, notes, and snippets.

View bbc4468's full-sized avatar

Saurabh Goyal bbc4468

View GitHub Profile
@bbc4468
bbc4468 / ingest.js
Last active March 10, 2021 21:26
Kafka to InfluxDB ingestion
const Influx = require('influx');
const kafka = require('kafka-node'),
ConsumerGroup = kafka.ConsumerGroup;
// Kafka consumer-group client for the 'trade-stream' topic.
// The broker host is taken from the KAFKA_HOST environment variable and
// falls back to "localhost" when it is unset or empty; the port is fixed at 9092.
const kafkaBrokerHost = process.env.KAFKA_HOST || "localhost";
const consumerGroupOptions = {
  kafkaHost: `${kafkaBrokerHost}:9092`,
  groupId: 'price-history-trade-consumer',
};
const subscribedTopics = ['trade-stream'];
const consumer = new ConsumerGroup(consumerGroupOptions, subscribedTopics);
@bbc4468
bbc4468 / influx_schema.sql
Last active December 30, 2021 20:49
InfluxDB schema for OHLC (open/high/low/close) data
-- Database that holds the raw "trade" measurement and every roll-up below.
create database price_history_db
-- cq_1m: builds 1-minute candles per symbol from "trade":
-- low/high = MIN/MAX(price), open/close = FIRST/LAST(price), volume = SUM(size).
-- Results are written to the "price_1m" measurement.
CREATE CONTINUOUS QUERY "cq_1m" ON "price_history_db" BEGIN SELECT MIN(price) as low, MAX(price) as high, FIRST(price) as open, LAST(price) as close, SUM(size) as volume INTO "price_1m" FROM "trade" GROUP BY time(1m), symbol END
-- cq_5m .. cq_1h: each coarser candle is derived from the next-finer measurement
-- (price_1m -> price_5m -> price_15m -> price_1h) by taking MIN(low), MAX(high),
-- FIRST(open), LAST(close) and SUM(volume), again grouped by symbol.
-- NOTE(review): every roll-up reads the output of the previous continuous query;
-- confirm the finer buckets are already written when the coarser query runs.
CREATE CONTINUOUS QUERY "cq_5m" ON "price_history_db" BEGIN SELECT MIN(low) as low, MAX(high) as high, FIRST(open) as open, LAST(close) as close, SUM(volume) as volume INTO "price_5m" FROM "price_1m" GROUP BY time(5m), symbol END
CREATE CONTINUOUS QUERY "cq_15m" ON "price_history_db" BEGIN SELECT MIN(low) as low, MAX(high) as high, FIRST(open) as open, LAST(close) as close, SUM(volume) as volume INTO "price_15m" FROM "price_5m" GROUP BY time(15m), symbol END
CREATE CONTINUOUS QUERY "cq_1h" ON "price_history_db" BEGIN SELECT MIN(low) as low, MAX(high) as high, FIRST(open) as open, LAST(close) as close, SUM(volume) as volume INTO "price_1h" FROM "price_15m" GROUP BY time(1h), symbol END
CREATE CONTINUOUS QUERY "cq_6h" ON "price_history_db" BEGIN SELECT