Skip to content

Instantly share code, notes, and snippets.

@dbrownidau
Last active September 8, 2021 05:14
Show Gist options
  • Save dbrownidau/69c94ce070334cdb22277b38ff676e1e to your computer and use it in GitHub Desktop.
Save dbrownidau/69c94ce070334cdb22277b38ff676e1e to your computer and use it in GitHub Desktop.
Cute script to feed RIPE RIS Live messages into Elasticsearch.
#!/usr/bin/env python3
from datetime import datetime
import json
import websocket
import time
from elasticsearch import Elasticsearch
from queue import Queue
from threading import Thread
import threading
# Name of the Elasticsearch index that every message is written to.
index = "rislive"

# Shared Elasticsearch client used by all queue workers.
es = Elasticsearch(hosts=["192.168.1.24"])
def monq(q):
    """Watch *q* forever and start ``do_es`` worker threads to drain it.

    Every 300 ms the queue size is sampled and the growth since the previous
    sample is printed. A new daemon worker is started when either:

    * the queue grew by more than 200 items during the sample window, or
    * items are waiting and no worker started by this monitor is still alive
      (otherwise a slowly filling queue would never be drained, because
      workers exit as soon as they find the queue empty).

    Args:
        q: the ``queue.Queue`` shared with the producer and the workers.

    This function never returns; run it in a daemon thread.
    """
    print('Monitoring queue..')
    workers = []
    while True:
        state = q.qsize()
        time.sleep(0.300)
        size = q.qsize()
        delta = size - state
        print('delta:', delta)

        # Forget workers that have already drained the queue and exited.
        workers = [w for w in workers if w.is_alive()]

        burst = delta > 200
        if burst:
            print('queue delta > 200 - start additional workers?')
            print('qsize', q.qsize())

        if burst or (size > 0 and not workers):
            # daemon=True replaces the deprecated Thread.setDaemon(True).
            worker = Thread(target=do_es, args=(q,), daemon=True)
            worker.start()
            workers.append(worker)
def do_es(q):
    """Drain *q*, indexing each queued message's ``data`` into Elasticsearch.

    Each queue item is a raw JSON string. Its ``data`` object has its numeric
    ``timestamp`` replaced by a naive UTC ISO-8601 string and is then indexed
    with the module-level ``es`` client into ``index``. Items whose ``data``
    is missing, not an object, or has no ``timestamp`` are skipped.

    Args:
        q: the ``queue.Queue`` of raw JSON message strings.

    Returns when the queue is found empty. ``q.task_done()`` is called for
    every item taken, including skipped ones and ones whose indexing raised.
    """
    # Local imports: the file only imports `Queue` and `datetime` itself.
    from datetime import timezone
    from queue import Empty

    print('queue worker starting..')
    while True:
        try:
            # Non-blocking get: checking q.empty() and then calling a blocking
            # q.get() can hang forever if another worker takes the last item
            # in between.
            raw = q.get_nowait()
        except Empty:
            break
        try:
            message = json.loads(raw)
            d = message.get('data') if isinstance(message, dict) else None
            if not isinstance(d, dict) or 'timestamp' not in d:
                # Nothing to index without a timestamped data object.
                continue
            # Same naive-UTC ISO string as the deprecated utcfromtimestamp().
            stamp = datetime.fromtimestamp(d['timestamp'], timezone.utc)
            d['timestamp'] = stamp.replace(tzinfo=None).isoformat()
            es.index(index=index, body=d)
        finally:
            # Always balance the get(), even on skip or failure, so that
            # q.join() cannot deadlock.
            q.task_done()
    print('queue empty, exiting')
# Unbounded hand-off queue between the websocket reader below and the
# Elasticsearch workers started by monq().
q = Queue(maxsize=0)

# The monitor runs as a daemon thread so it does not keep the process alive
# once the reader loop ends. daemon=True replaces the deprecated setDaemon().
worker = Thread(target=monq, args=(q,), daemon=True)
worker.start()

ws = websocket.WebSocket()
ws.connect("wss://ris-live.ripe.net/v1/ws/?client=damo")
try:
    params = {
        "moreSpecific": True,
        "host": "rrc00",
        "socketOptions": {
            "includeRaw": False,
        },
    }
    ws.send(json.dumps({
        "type": "ris_subscribe",
        "data": params,
    }))

    for data in ws:
        parsed = json.loads(data)
        msg_type = parsed.get("type", "")
        if 'ris_message' not in msg_type:
            # Anything that is not a ris_message is reported on stdout only;
            # the workers expect a timestamped "data" object, so do not
            # queue it.
            print(msg_type, parsed.get("data"))
            continue
        # Hand the raw JSON string to the workers, which parse it themselves.
        q.put(data)
finally:
    # Close the socket on normal end of stream and on any error or Ctrl-C.
    ws.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment