Skip to content

Instantly share code, notes, and snippets.

Last active April 21, 2021 16:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Ugbot/653cfe40493dfcbd058dba6ce2d06a6b to your computer and use it in GitHub Desktop.
Save Ugbot/653cfe40493dfcbd058dba6ce2d06a6b to your computer and use it in GitHub Desktop.
Logging from Ably into QuestDB
<!DOCTYPE html>
<script src=""></script>
<script src=""></script>
<script src="" type="text/javascript"></script>
<script src=""></script>
const width = 960;
const height = 500;
const config = {
speed: 0.005,
verticalTilt: -30,
horizontalTilt: 0
var ably = new Ably.Realtime({key:'HAVmIw.5TE_Cg:FgBaavXIOJCIElZY', plugins: {
vcdiffDecoder: vcdiffDecoder
var channel = ably.channels.get('[?rewind=1]geo-ip', {
delta: 'vcdiff'
let locations = [];
channel.subscribe(function(msg) { {
let locationArry = number[0].split(",");
locations.push({'latitude': parseFloat(locationArry[2]), 'longitude': parseFloat(locationArry[3])});
const svg ='svg')
.attr('width', width).attr('height', height).call(d3.zoom().on("zoom", function () {
svg.attr("transform", d3.event.transform)
const markerGroup = svg.append('g');
const projection = d3.geoOrthographic();
const initialScale = projection.scale();
const path = d3.geoPath().projection(projection);
const center = [width/2, height/2];
function drawGlobe() {
.defer(d3.json, '')
.defer(d3.json, '')
.await((error, worldData, locationData) => {
.data(topojson.feature(worldData, worldData.objects.countries).features)
.attr("class", "segment")
.attr("d", path)
.style("stroke", "#888")
.style("stroke-width", "1px")
.style("fill", (d, i) => '#e5e5e5')
.style("opacity", ".6");
//locations = locationData;
function drawGraticule() {
const graticule = d3.geoGraticule()
.step([10, 10]);
.attr("class", "graticule")
.attr("d", path)
.style("fill", "#fff")
.style("stroke", "#ccc");
function enableRotation() {
d3.timer(function (elapsed) {
projection.rotate([config.speed * elapsed - 120, config.verticalTilt, config.horizontalTilt]);
svg.selectAll("path").attr("d", path);
function drawMarkers() {
const markers = markerGroup.selectAll('circle')
.attr('cx', d => projection([d.longitude, d.latitude])[0])
.attr('cy', d => projection([d.longitude, d.latitude])[1])
.attr('fill', d => {
const coordinate = [d.longitude, d.latitude];
gdistance = d3.geoDistance(coordinate, projection.invert(center));
return gdistance > 1.57 ? 'none' : 'steelblue';
.attr('r', 7);
markerGroup.each(function () {
var net = require('net');
var path = require('path');
const express = require('express');
const bodyParser = require('body-parser');
const app = express();
const port = 3141;
const fetch = require("node-fetch");
const qs = require("querystring");
const QueryHOST = "http://localhost:9000";
const client = new net.Socket();
const HOST = "localhost";
const PORT = 9009;
var msgcount = 0;
var pastData;
const apiKey = "HAVmIw.5TE_Cg:FgBaavXIOJCIElZY";
var Ably = require('ably');
var ably = new Ably.Realtime({
key: apiKey
var channel = ably.channels.get('geo-ip');
function run() {
client.connect(PORT, HOST, () => {
console.log("Connected to DB")
client.on("data", function (data) {
console.log("Received: " + data)
client.on("close", function () {
console.log("Connection closed")
JSON.flatten = (function (isArray, wrapped) {
return function (table) {
return reduce("", {}, table);
function reduce(path, accumulator, table) {
if (isArray(table)) {
var length = table.length;
if (length) {
var index = 0;
while (index < length) {
var property = path + "[" + index + "]", item = table[index++];
if (wrapped(item) !== item) accumulator[property] = item;
else reduce(property, accumulator, item);
} else accumulator[path] = table;
} else {
var empty = true;
if (path) {
for (var property in table) {
var item = table[property], property = path + "_" + property, empty = false;
if (wrapped(item) !== item) accumulator[property] = item;
else reduce(property, accumulator, item);
} else {
for (var property in table) {
var item = table[property], empty = false;
if (wrapped(item) !== item) accumulator[property] = item;
else reduce(property, accumulator, item);
if (empty) accumulator[path] = table;
return accumulator;
}(Array.isArray, Object));
extended: true,
);'/', (request, response) => {
// console.log(request.body);
// for ( var i in request.body['items']){
console.log( "Message number " + msgcount +":");
var string_to_send = "allMsgs1,mytag=\"apikey\" ";
string_to_send = Write_to_tcp_socket(request.body, string_to_send);
// };
app.get('/', function(req, res) {
res.sendFile(path.join(__dirname + '/index.html'));
function Write_to_tcp_socket(msg, string_to_send) {
var flat = JSON.flatten(msg);
for (var key in flat) {
if (!key.includes("toString") && !key.includes("toJSON") && !key.includes("timestamp") && !(typeof flat[key] === 'undefined')) {
string_to_send = string_to_send + key + "=\"" + flat[key] + "\",";
var editedText = string_to_send.slice(0, -1) + "\n"; //+ msg.timestamp
let result = editedText.replace(/-/g, '');;
// console.log(result);
return string_to_send;
setInterval( async function () {
try {
const queryData = {
query: "SELECT DISTINCT data_connectionSource_xgeoip from metaConnectionLifecycleLog;",
const response = await fetch(`${QueryHOST}/exec?${qs.encode(queryData)}`);
const json = await response.json();
// console.log(pastData);
// console.log(json.dataset);
if(json.dataset[json.dataset.length - 1][0] !== pastData){
channel.publish('IPS', json.dataset);
pastData = json.dataset[json.dataset.length - 1][0];
} catch (error) {
}, 2000);
app.listen(port, () => {
console.log(`App running on port ${port}.`)
let metaChannel = log_from_channel("[meta]connection.lifecycle", "my_app_name", "metaConnectionLifecycleLog");
let LogChannel = log_from_channel("[meta]log", "my_app_name","metaLog");
let PushLogChannel = log_from_channel("[meta]log:push", "my_app_name","metaPushLog");
let LifeCycleChannel = log_from_channel("[meta]channel.lifecycle", "my_app_name", "metaChannelLifecycleLog");
function log_from_channel(channel_name, App_Name, table_name) {
var this_channel = ably.channels.get(channel_name);
this_channel.subscribe(function (msg) {
var string_to_send = table_name + ",App_Name=\"" + App_Name + "\" ";
string_to_send = Write_to_tcp_socket(msg, string_to_send);
return this_channel;
function Write_to_tcp_socket(msg, string_to_send) {
var flat = JSON.flatten(msg);
for (var key in flat) {
if (!key.includes("toString") && !key.includes("toJSON") && !key.includes("timestamp") && !(typeof flat[key] === 'undefined')) {
string_to_send = string_to_send + key + "=\"" + flat[key] + "\",";
var editedText = string_to_send.slice(0, -1) + "\n"; //+ msg.timestamp
let result = editedText.replace(/-/g, '');;
return string_to_send;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment