Skip to content

Instantly share code, notes, and snippets.

@SAPikachu
Created April 26, 2016 10:31
Show Gist options
  • Save SAPikachu/c515af0596bcaa6ba60c4dc81ea2848a to your computer and use it in GitHub Desktop.
Save SAPikachu/c515af0596bcaa6ba60c4dc81ea2848a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os
import time
import math
from queue import Queue, Empty
from threading import Thread
import logging
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
from rg_etcd import RgRedis, RgEtcd
# Multiplier applied to whole-second timestamps before they are returned by
# TimestampSnapper.snap(); presumably nanoseconds per second (10**9).
NS = 1000000000


class TimestampSnapper(object):
    """Snap timestamps onto a whole-second grid anchored at the first one seen.

    The first timestamp passed to :meth:`snap` becomes the epoch.  Every
    timestamp is then mapped to ``int(epoch) + round(ts - epoch)`` seconds
    and scaled by ``NS``.
    """

    def __init__(self):
        # First timestamp handed to snap(); None until snap() is called.
        self.epoch = None

    def snap(self, ts):
        """Return *ts* aligned to the grid, multiplied by ``NS``.

        Args:
            ts: Timestamp in seconds (int or float).

        Returns:
            int: ``(int(epoch) + round(ts - epoch)) * NS``.
        """
        # Compare against None explicitly: an epoch of 0 / 0.0 is a valid
        # anchor, and a truthiness test would silently re-anchor the grid on
        # the next call.
        if self.epoch is None:
            self.epoch = ts
            diff = 0
        else:
            diff = int(round(ts - self.epoch))
        return (int(self.epoch) + diff) * NS
class VpnElectorInfluxDbWriter(object):
    """Queue metric groups and upload them to InfluxDB from a worker thread.

    Usage: call :meth:`run` (reads ``VM_INFLUXDB_*`` environment variables
    and starts the upload thread), feed groups through :meth:`write`, and
    finish with :meth:`shutdown`.
    """

    def __init__(self):
        # The defaults double as the schema: _handle_config() coerces every
        # incoming value to the type of the default stored here.
        self._config = {
            "host": "",
            "port": 8086,
            "username": "",
            "password": "",
            "database": "vpnelector",
            "prefix": "",
            "batchtime": 1,
            "timeout": 5,
            "retryinterval": 5,
        }
        self._queue = Queue()
        self._snappers = {}
        self._thread = None
        # Unique sentinel pushed onto the queue to ask the thread to exit.
        self._close_signal = object()
        self._running = False
        self._client = None
        # NOTE(review): not read by any code visible in this file.
        self._write_token = None
        self._create_loggers()

    def _upload_thread(self):
        """Worker loop: collect a batch, upload it, repeat until told to stop."""
        while True:
            points = self._collect_batch()
            if points is None:
                return
            if not self._upload_batch(points):
                return

    def _collect_batch(self):
        """Gather points from the queue for up to ``batchtime`` seconds.

        Blocks without a timeout until the first points arrive; the batch
        window starts counting from that moment.

        Returns:
            list | None: The non-empty batch of points, or ``None`` when the
            close signal was received with nothing left to upload.
        """
        points = []
        deadline = None
        while True:
            timeout = None
            if points and deadline is None:
                # time.monotonic() is a wall-clock-like timer that never
                # goes backwards.  (time.clock() measured CPU time on Unix
                # and was removed in Python 3.8.)
                deadline = time.monotonic() + self._config["batchtime"]
            if deadline is not None:
                timeout = deadline - time.monotonic()
                if timeout <= 0:
                    break
            try:
                next_object = self._queue.get(timeout=timeout)
            except Empty:
                # Only reachable with a timeout, i.e. when points is non-empty.
                break
            if next_object is self._close_signal:
                if points:
                    # Upload queued points before exiting: put the signal
                    # back so the next collection round sees it.
                    self._queue.put(next_object)
                    break
                return None
            points.extend(self._build_points(next_object))
        return points

    def _upload_batch(self, points):
        """Write *points* to InfluxDB, retrying on retryable failures.

        Returns:
            bool: ``True`` to keep the worker running (batch uploaded or
            dropped after a non-retryable error), ``False`` when an upload
            failed after shutdown was requested and the thread should exit.
        """
        while True:
            retry = False
            msg = None
            try:
                self._client.write_points(points)
            except InfluxDBClientError as e:
                # Only 5xx responses are retried.  e.code may be None when
                # the client raises without a status code, so guard int().
                retry = e.code is not None and 500 <= int(e.code) <= 599
                msg = (
                    "InfluxDBClientError: %s, %s" %
                    (e.code, e.content,)
                )
            except Exception as e:
                retry = True
                msg = (
                    "Error while uploading points: %s " %
                    (e,)
                )
            if msg is None:
                return True
            if retry and self._running:
                msg += " (will retry after %s seconds)" % (
                    self._config["retryinterval"],
                )
            self.error(msg)
            if not self._running:
                # Shutting down: give up instead of retrying.
                return False
            if not retry:
                return True
            time.sleep(self._config["retryinterval"])

    def _create_loggers(self):
        """Expose the "vpnmetrics" logger's level methods on this instance."""
        log = logging.getLogger("vpnmetrics")
        for x in ("error", "warning", "info", "debug"):
            setattr(self, x, getattr(log, x))

    def _handle_config(self, key, value):
        """Store one config entry, coerced to the type of its default.

        Unknown keys are ignored with a warning.  Values that cannot be
        converted are logged as errors and the previous value is kept.

        Args:
            key: Config name; matched case-insensitively.
            value: Raw value (a string when it comes from the environment).
        """
        sanitized_key = key.lower()
        if sanitized_key not in self._config:
            self.warning("Unrecognized config entry: %s", key)
            return
        value_class = self._config[sanitized_key].__class__
        try:
            value = value_class(value)
        except (TypeError, ValueError):
            self.error("Invalid config item: %s = %s", key, value)
            # Do not store the unconverted value: a string "port" would later
            # break the numeric comparison in _validate_config().
            return
        self._config[sanitized_key] = value
        self.debug("Config: %s = %s", sanitized_key, value)

    def _validate_config(self):
        """Return True when host, database and port are usable."""
        if not self._config["host"]:
            self.error("Config error: Host is not set")
            return False
        if not self._config["database"]:
            self.error("Config error: Database is empty")
            return False
        if self._config["port"] < 1 or self._config["port"] > 65535:
            self.error("Config error: Port is out of range")
            return False
        return True

    def _snap_timestamp(self, ts):
        """Snap *ts* using the single shared "default" TimestampSnapper."""
        key = "default"
        if key not in self._snappers:
            self._snappers[key] = TimestampSnapper()
        return self._snappers[key].snap(ts)

    def _build_points(self, group):
        """Convert one metrics group into a list of InfluxDB point dicts.

        Args:
            group: Mapping with a ``"vpns"`` dict (VPN name -> stats dict)
                and a ``"time_unix"`` timestamp.

        Returns:
            list: One point per metric whose value is present and finite.
        """
        points = []
        for name, stats in group["vpns"].items():
            # Names look like "<route>-<transport>"; without a dash the
            # transport is reported as "unknown".
            route, sep, transport = name.partition("-")
            if not sep:
                transport = "unknown"
            tags = {
                "name": name,
                "route": route,
                "transport": transport,
            }
            # Read packet loss defensively like the other metrics so that a
            # group without it does not kill the upload thread.
            packet_loss = stats.get("packet_loss") or {}
            measurements = (
                ("tx_speed", stats.get("tx_speed")),
                ("rx_speed", stats.get("rx_speed")),
                ("score", stats.get("score")),
                ("ping", stats.get("ping_latest")),
                ("packet_loss.10s", packet_loss.get("10s")),
                ("packet_loss.1min", packet_loss.get("1min")),
            )
            for measurement, value in measurements:
                # Skip missing values as well as NaN / infinity.
                if value is None or not math.isfinite(value):
                    continue
                points.append(self._build_one_point(
                    measurement, group["time_unix"], value, tags,
                ))
        return points

    def _build_one_point(self, measurement, time, value, tags):
        """Build a single point dict.

        The point's ``"time"`` is the snapped timestamp; the raw timestamp
        is kept in the ``"original_timestamp"`` field.  Note that the
        ``time`` parameter shadows the ``time`` module inside this method.
        """
        point = {
            "measurement": measurement,
            "time": self._snap_timestamp(time),
            "fields": {
                "value": value,
                "original_timestamp": time,
            },
            "tags": tags,
        }
        return point

    def config_from_env(self):
        """Load ``VM_INFLUXDB_*`` environment variables and validate them.

        Returns:
            bool: Result of :meth:`_validate_config`.
        """
        CONFIG_PREFIX = "VM_INFLUXDB_"
        for k, v in os.environ.items():
            if not k.startswith(CONFIG_PREFIX):
                continue
            self._handle_config(k[len(CONFIG_PREFIX):], v.strip())
        return self._validate_config()

    def run(self):
        """Configure the client from the environment and start the worker.

        Returns without starting anything when the configuration is invalid.
        """
        if not self.config_from_env():
            return
        self._client = InfluxDBClient(**{
            k: v for k, v in self._config.items()
            if k in (
                "host", "port", "username", "password", "database",
                "timeout",
            )
        })
        assert not self._thread
        self._running = True
        self._queue = Queue()
        self._snappers = {}
        self._thread = Thread(target=self._upload_thread)
        self._thread.start()

    def shutdown(self):
        """Stop the worker thread (if any) and drop the client."""
        if self._thread:
            self._running = False
            self._queue.put(self._close_signal)
            self._thread.join()
            # Clear the handle so run() can be called again afterwards.
            self._thread = None
        self._client = None

    def write(self, group):
        """Queue one metrics group for upload; requires a prior run()."""
        assert self._running
        self._queue.put(group)
def setup_signals():
    """Install a SIGTERM handler that raises KeyboardInterrupt.

    This makes SIGTERM follow the same ``except KeyboardInterrupt`` path
    that SIGINT takes by default.
    """
    import signal

    def _raise_interrupt(*_args, **_kwargs):
        # Arguments passed by the signal machinery are ignored.
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, _raise_interrupt)
def main():
    """Forward every group published on the vpnelector metrics node to InfluxDB.

    Logging verbosity comes from ``VM_LOG_LEVEL`` (default ``INFO``).  The
    writer is always shut down on exit, including on SIGINT / SIGTERM.
    """
    log_level = os.getenv("VM_LOG_LEVEL", "INFO")
    logging.basicConfig(level=log_level)
    setup_signals()

    metrics_node = RgRedis(RgEtcd()).root_node.metrics.vpnelector
    uploader = VpnElectorInfluxDbWriter()
    uploader.run()
    try:
        for group in metrics_node.subscribe():
            uploader.write(group)
    except KeyboardInterrupt:
        logging.info("Exiting normally")
    finally:
        uploader.shutdown()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment