Skip to content

Instantly share code, notes, and snippets.

@thisisjofrank
Forked from Ugbot/QuestLogger.js
Last active April 21, 2021 16:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thisisjofrank/2d0513a28425e64dcad3bfc8cefb5d72 to your computer and use it in GitHub Desktop.
Save thisisjofrank/2d0513a28425e64dcad3bfc8cefb5d72 to your computer and use it in GitHub Desktop.
Logging from Ably into QuestDB
<!DOCTYPE html>
<html lang="en">
<head>
<title>Ably + QuestDB Demo</title>
</head>
<body>
<svg></svg>
<script src="https://d3js.org/d3.v4.min.js"></script>
<script src="https://d3js.org/topojson.v1.min.js"></script>
<script src="https://cdn.ably.io/lib/ably.min-1.js"></script>
<script src="https://cdn.ably.io/lib/vcdiff-decoder.min-1.js"></script>
<script>
const width = 960;
const height = 500;
const config = {
speed: 0.005,
verticalTilt: -30,
horizontalTilt: 0
}
var ably = new Ably.Realtime({key:'HAVmIw.5TE_Cg:FgBaavXIOJCIElZY', plugins: {
vcdiffDecoder: vcdiffDecoder
}});
var channel = ably.channels.get('[?rewind=1]geo-ip', {
delta: 'vcdiff'
});
let locations = [];
channel.subscribe(function(msg) {
msg.data.forEach(function(number) {
try{
let locationArry = number[0].split(",");
locations.push({'latitude': parseFloat(locationArry[2]), 'longitude': parseFloat(locationArry[3])});
}
catch(err){
}
console.log(locations);
});
});
const svg = d3.select('svg')
.attr('width', width).attr('height', height).call(d3.zoom().on("zoom", function () {
svg.attr("transform", d3.event.transform)
}));
const markerGroup = svg.append('g');
const projection = d3.geoOrthographic();
const initialScale = projection.scale();
const path = d3.geoPath().projection(projection);
const center = [width/2, height/2];
drawGlobe();
drawGraticule();
enableRotation();
function drawGlobe() {
d3.queue()
.defer(d3.json, 'https://gist.githubusercontent.com/mbostock/4090846/raw/d534aba169207548a8a3d670c9c2cc719ff05c47/world-110m.json')
.defer(d3.json, 'https://gist.githubusercontent.com/Ugbot/6f8921f0d251d4f1b95597d066dbf777/raw/a282d3e0ee6650860fdea1ce6da2b5a4335bccb2/locations.json')
.await((error, worldData, locationData) => {
svg.selectAll(".segment")
.data(topojson.feature(worldData, worldData.objects.countries).features)
.enter().append("path")
.attr("class", "segment")
.attr("d", path)
.style("stroke", "#888")
.style("stroke-width", "1px")
.style("fill", (d, i) => '#e5e5e5')
.style("opacity", ".6");
//locations = locationData;
drawMarkers();
});
}
function drawGraticule() {
const graticule = d3.geoGraticule()
.step([10, 10]);
svg.append("path")
.datum(graticule)
.attr("class", "graticule")
.attr("d", path)
.style("fill", "#fff")
.style("stroke", "#ccc");
}
function enableRotation() {
d3.timer(function (elapsed) {
projection.rotate([config.speed * elapsed - 120, config.verticalTilt, config.horizontalTilt]);
svg.selectAll("path").attr("d", path);
drawMarkers();
});
}
function drawMarkers() {
const markers = markerGroup.selectAll('circle')
.data(locations);
markers
.enter()
.append('circle')
.merge(markers)
.attr('cx', d => projection([d.longitude, d.latitude])[0])
.attr('cy', d => projection([d.longitude, d.latitude])[1])
.attr('fill', d => {
const coordinate = [d.longitude, d.latitude];
gdistance = d3.geoDistance(coordinate, projection.invert(center));
return gdistance > 1.57 ? 'none' : 'steelblue';
})
.attr('r', 7);
markerGroup.each(function () {
this.parentNode.appendChild(this);
});
}
</script>
</body>
</html>
var net = require('net');
var path = require('path');
const express = require('express');
const bodyParser = require('body-parser');
const app = express();
const port = 3141;
const fetch = require("node-fetch");
const qs = require("querystring");
const QueryHOST = "http://localhost:9000";
const client = new net.Socket();
const HOST = "localhost";
const PORT = 9009;
var msgcount = 0;
var pastData;
const apiKey = "HAVmIw.5TE_Cg:FgBaavXIOJCIElZY";
var Ably = require('ably');
var ably = new Ably.Realtime({
key: apiKey
});
var channel = ably.channels.get('geo-ip');
function run() {
client.connect(PORT, HOST, () => {
console.log("Connected to DB")
})
client.on("data", function (data) {
console.log("Received: " + data)
})
client.on("close", function () {
console.log("Connection closed")
})
}
run()
JSON.flatten = (function (isArray, wrapped) {
return function (table) {
return reduce("", {}, table);
};
function reduce(path, accumulator, table) {
if (isArray(table)) {
var length = table.length;
if (length) {
var index = 0;
while (index < length) {
var property = path + "[" + index + "]", item = table[index++];
if (wrapped(item) !== item) accumulator[property] = item;
else reduce(property, accumulator, item);
}
} else accumulator[path] = table;
} else {
var empty = true;
if (path) {
for (var property in table) {
var item = table[property], property = path + "_" + property, empty = false;
if (wrapped(item) !== item) accumulator[property] = item;
else reduce(property, accumulator, item);
}
} else {
for (var property in table) {
var item = table[property], empty = false;
if (wrapped(item) !== item) accumulator[property] = item;
else reduce(property, accumulator, item);
}
}
if (empty) accumulator[path] = table;
}
return accumulator;
}
}(Array.isArray, Object));
app.use(express.static(path.resolve('./public')));
app.use(bodyParser.json());
app.use(
bodyParser.urlencoded({
extended: true,
})
);
app.post('/', (request, response) => {
// console.log(request.body);
// for ( var i in request.body['items']){
console.log( "Message number " + msgcount +":");
var string_to_send = "allMsgs1,mytag=\"apikey\" ";
string_to_send = Write_to_tcp_socket(request.body, string_to_send);
// };
})
app.get('/', function(req, res) {
res.sendFile(path.join(__dirname + '/index.html'));
});
function Write_to_tcp_socket(msg, string_to_send) {
var flat = JSON.flatten(msg);
for (var key in flat) {
if (!key.includes("toString") && !key.includes("toJSON") && !key.includes("timestamp") && !(typeof flat[key] === 'undefined')) {
string_to_send = string_to_send + key + "=\"" + flat[key] + "\",";
}
}
var editedText = string_to_send.slice(0, -1) + "\n"; //+ msg.timestamp
//console.log(editedText);
let result = editedText.replace(/-/g, '');;
client.write(result);
// console.log(result);
console.log("*************");
return string_to_send;
}
setInterval( async function () {
try {
const queryData = {
query: "SELECT DISTINCT data_connectionSource_xgeoip from metaConnectionLifecycleLog;",
}
const response = await fetch(`${QueryHOST}/exec?${qs.encode(queryData)}`);
const json = await response.json();
// console.log(pastData);
// console.log(json.dataset);
if(json.dataset[json.dataset.length - 1][0] !== pastData){
channel.publish('IPS', json.dataset);
pastData = json.dataset[json.dataset.length - 1][0];
}
} catch (error) {
console.log(error)
}
}, 2000);
app.listen(port, () => {
console.log(`App running on port ${port}.`)
})
let metaChannel = log_from_channel("[meta]connection.lifecycle", "my_app_name", "metaConnectionLifecycleLog");
let LogChannel = log_from_channel("[meta]log", "my_app_name","metaLog");
let PushLogChannel = log_from_channel("[meta]log:push", "my_app_name","metaPushLog");
let LifeCycleChannel = log_from_channel("[meta]channel.lifecycle", "my_app_name", "metaChannelLifecycleLog");
function log_from_channel(channel_name, App_Name, table_name) {
var this_channel = ably.channels.get(channel_name);
this_channel.subscribe(function (msg) {
var string_to_send = table_name + ",App_Name=\"" + App_Name + "\" ";
string_to_send = Write_to_tcp_socket(msg, string_to_send);
});
return this_channel;
}
function Write_to_tcp_socket(msg, string_to_send) {
var flat = JSON.flatten(msg);
for (var key in flat) {
if (!key.includes("toString") && !key.includes("toJSON") && !key.includes("timestamp") && !(typeof flat[key] === 'undefined')) {
string_to_send = string_to_send + key + "=\"" + flat[key] + "\",";
}
}
var editedText = string_to_send.slice(0, -1) + "\n"; //+ msg.timestamp
let result = editedText.replace(/-/g, '');;
client.write(result);
return string_to_send;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment