-
-
Save thisisjofrank/2d0513a28425e64dcad3bfc8cefb5d72 to your computer and use it in GitHub Desktop.
Logging from Ably into QuestDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<title>Ably + QuestDB Demo</title> | |
</head> | |
<body> | |
<svg></svg> | |
<script src="https://d3js.org/d3.v4.min.js"></script> | |
<script src="https://d3js.org/topojson.v1.min.js"></script> | |
<script src="https://cdn.ably.io/lib/ably.min-1.js"></script> | |
<script src="https://cdn.ably.io/lib/vcdiff-decoder.min-1.js"></script> | |
<script> | |
const width = 960; | |
const height = 500; | |
const config = { | |
speed: 0.005, | |
verticalTilt: -30, | |
horizontalTilt: 0 | |
} | |
var ably = new Ably.Realtime({key:'HAVmIw.5TE_Cg:FgBaavXIOJCIElZY', plugins: { | |
vcdiffDecoder: vcdiffDecoder | |
}}); | |
var channel = ably.channels.get('[?rewind=1]geo-ip', { | |
delta: 'vcdiff' | |
}); | |
let locations = []; | |
channel.subscribe(function(msg) { | |
msg.data.forEach(function(number) { | |
try{ | |
let locationArry = number[0].split(","); | |
locations.push({'latitude': parseFloat(locationArry[2]), 'longitude': parseFloat(locationArry[3])}); | |
} | |
catch(err){ | |
} | |
console.log(locations); | |
}); | |
}); | |
const svg = d3.select('svg') | |
.attr('width', width).attr('height', height).call(d3.zoom().on("zoom", function () { | |
svg.attr("transform", d3.event.transform) | |
})); | |
const markerGroup = svg.append('g'); | |
const projection = d3.geoOrthographic(); | |
const initialScale = projection.scale(); | |
const path = d3.geoPath().projection(projection); | |
const center = [width/2, height/2]; | |
drawGlobe(); | |
drawGraticule(); | |
enableRotation(); | |
function drawGlobe() { | |
d3.queue() | |
.defer(d3.json, 'https://gist.githubusercontent.com/mbostock/4090846/raw/d534aba169207548a8a3d670c9c2cc719ff05c47/world-110m.json') | |
.defer(d3.json, 'https://gist.githubusercontent.com/Ugbot/6f8921f0d251d4f1b95597d066dbf777/raw/a282d3e0ee6650860fdea1ce6da2b5a4335bccb2/locations.json') | |
.await((error, worldData, locationData) => { | |
svg.selectAll(".segment") | |
.data(topojson.feature(worldData, worldData.objects.countries).features) | |
.enter().append("path") | |
.attr("class", "segment") | |
.attr("d", path) | |
.style("stroke", "#888") | |
.style("stroke-width", "1px") | |
.style("fill", (d, i) => '#e5e5e5') | |
.style("opacity", ".6"); | |
//locations = locationData; | |
drawMarkers(); | |
}); | |
} | |
function drawGraticule() { | |
const graticule = d3.geoGraticule() | |
.step([10, 10]); | |
svg.append("path") | |
.datum(graticule) | |
.attr("class", "graticule") | |
.attr("d", path) | |
.style("fill", "#fff") | |
.style("stroke", "#ccc"); | |
} | |
function enableRotation() { | |
d3.timer(function (elapsed) { | |
projection.rotate([config.speed * elapsed - 120, config.verticalTilt, config.horizontalTilt]); | |
svg.selectAll("path").attr("d", path); | |
drawMarkers(); | |
}); | |
} | |
function drawMarkers() { | |
const markers = markerGroup.selectAll('circle') | |
.data(locations); | |
markers | |
.enter() | |
.append('circle') | |
.merge(markers) | |
.attr('cx', d => projection([d.longitude, d.latitude])[0]) | |
.attr('cy', d => projection([d.longitude, d.latitude])[1]) | |
.attr('fill', d => { | |
const coordinate = [d.longitude, d.latitude]; | |
gdistance = d3.geoDistance(coordinate, projection.invert(center)); | |
return gdistance > 1.57 ? 'none' : 'steelblue'; | |
}) | |
.attr('r', 7); | |
markerGroup.each(function () { | |
this.parentNode.appendChild(this); | |
}); | |
} | |
</script> | |
</body> | |
</html> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var net = require('net'); | |
var path = require('path'); | |
const express = require('express'); | |
const bodyParser = require('body-parser'); | |
const app = express(); | |
const port = 3141; | |
const fetch = require("node-fetch"); | |
const qs = require("querystring"); | |
const QueryHOST = "http://localhost:9000"; | |
const client = new net.Socket(); | |
const HOST = "localhost"; | |
const PORT = 9009; | |
var msgcount = 0; | |
var pastData; | |
const apiKey = "HAVmIw.5TE_Cg:FgBaavXIOJCIElZY"; | |
var Ably = require('ably'); | |
var ably = new Ably.Realtime({ | |
key: apiKey | |
}); | |
var channel = ably.channels.get('geo-ip'); | |
function run() { | |
client.connect(PORT, HOST, () => { | |
console.log("Connected to DB") | |
}) | |
client.on("data", function (data) { | |
console.log("Received: " + data) | |
}) | |
client.on("close", function () { | |
console.log("Connection closed") | |
}) | |
} | |
run() | |
JSON.flatten = (function (isArray, wrapped) { | |
return function (table) { | |
return reduce("", {}, table); | |
}; | |
function reduce(path, accumulator, table) { | |
if (isArray(table)) { | |
var length = table.length; | |
if (length) { | |
var index = 0; | |
while (index < length) { | |
var property = path + "[" + index + "]", item = table[index++]; | |
if (wrapped(item) !== item) accumulator[property] = item; | |
else reduce(property, accumulator, item); | |
} | |
} else accumulator[path] = table; | |
} else { | |
var empty = true; | |
if (path) { | |
for (var property in table) { | |
var item = table[property], property = path + "_" + property, empty = false; | |
if (wrapped(item) !== item) accumulator[property] = item; | |
else reduce(property, accumulator, item); | |
} | |
} else { | |
for (var property in table) { | |
var item = table[property], empty = false; | |
if (wrapped(item) !== item) accumulator[property] = item; | |
else reduce(property, accumulator, item); | |
} | |
} | |
if (empty) accumulator[path] = table; | |
} | |
return accumulator; | |
} | |
}(Array.isArray, Object)); | |
app.use(express.static(path.resolve('./public'))); | |
app.use(bodyParser.json()); | |
app.use( | |
bodyParser.urlencoded({ | |
extended: true, | |
}) | |
); | |
app.post('/', (request, response) => { | |
// console.log(request.body); | |
// for ( var i in request.body['items']){ | |
console.log( "Message number " + msgcount +":"); | |
var string_to_send = "allMsgs1,mytag=\"apikey\" "; | |
string_to_send = Write_to_tcp_socket(request.body, string_to_send); | |
// }; | |
}) | |
app.get('/', function(req, res) { | |
res.sendFile(path.join(__dirname + '/index.html')); | |
}); | |
function Write_to_tcp_socket(msg, string_to_send) { | |
var flat = JSON.flatten(msg); | |
for (var key in flat) { | |
if (!key.includes("toString") && !key.includes("toJSON") && !key.includes("timestamp") && !(typeof flat[key] === 'undefined')) { | |
string_to_send = string_to_send + key + "=\"" + flat[key] + "\","; | |
} | |
} | |
var editedText = string_to_send.slice(0, -1) + "\n"; //+ msg.timestamp | |
//console.log(editedText); | |
let result = editedText.replace(/-/g, '');; | |
client.write(result); | |
// console.log(result); | |
console.log("*************"); | |
return string_to_send; | |
} | |
setInterval( async function () { | |
try { | |
const queryData = { | |
query: "SELECT DISTINCT data_connectionSource_xgeoip from metaConnectionLifecycleLog;", | |
} | |
const response = await fetch(`${QueryHOST}/exec?${qs.encode(queryData)}`); | |
const json = await response.json(); | |
// console.log(pastData); | |
// console.log(json.dataset); | |
if(json.dataset[json.dataset.length - 1][0] !== pastData){ | |
channel.publish('IPS', json.dataset); | |
pastData = json.dataset[json.dataset.length - 1][0]; | |
} | |
} catch (error) { | |
console.log(error) | |
} | |
}, 2000); | |
app.listen(port, () => { | |
console.log(`App running on port ${port}.`) | |
}) | |
let metaChannel = log_from_channel("[meta]connection.lifecycle", "my_app_name", "metaConnectionLifecycleLog"); | |
let LogChannel = log_from_channel("[meta]log", "my_app_name","metaLog"); | |
let PushLogChannel = log_from_channel("[meta]log:push", "my_app_name","metaPushLog"); | |
let LifeCycleChannel = log_from_channel("[meta]channel.lifecycle", "my_app_name", "metaChannelLifecycleLog"); | |
function log_from_channel(channel_name, App_Name, table_name) { | |
var this_channel = ably.channels.get(channel_name); | |
this_channel.subscribe(function (msg) { | |
var string_to_send = table_name + ",App_Name=\"" + App_Name + "\" "; | |
string_to_send = Write_to_tcp_socket(msg, string_to_send); | |
}); | |
return this_channel; | |
} | |
function Write_to_tcp_socket(msg, string_to_send) { | |
var flat = JSON.flatten(msg); | |
for (var key in flat) { | |
if (!key.includes("toString") && !key.includes("toJSON") && !key.includes("timestamp") && !(typeof flat[key] === 'undefined')) { | |
string_to_send = string_to_send + key + "=\"" + flat[key] + "\","; | |
} | |
} | |
var editedText = string_to_send.slice(0, -1) + "\n"; //+ msg.timestamp | |
let result = editedText.replace(/-/g, '');; | |
client.write(result); | |
return string_to_send; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment