Skip to content

Instantly share code, notes, and snippets.

@RouNNdeL
Created August 20, 2018 17:21
Show Gist options
  • Save RouNNdeL/2d0e4ae94a6a204235551d4306642a6b to your computer and use it in GitHub Desktop.
Save RouNNdeL/2d0e4ae94a6a204235551d4306642a6b to your computer and use it in GitHub Desktop.
A netdata python.d plugin that pings Steam services. It uses NetworkDatagramConfig.json from SteamDatabase to obtain the server IPs.
#!/usr/bin/python
from bases.FrameworkServices.SimpleService import SimpleService
import os
import json
import ipaddress
import random
import subprocess
import re
# Matches one per-target result line of fping output: group 1 is the target
# address, group 2 is either a numeric value (parsed with float() by the
# collector) or "-", which float() rejects and the collector treats as a
# failed ping.
FPING_REGEX = r"(\S*)\s+: ([\d.]+|-)"
def get_all_ips(network_list):
    """Return every host address contained in the given networks.

    Args:
        network_list: iterable of network specifications accepted by
            ``ipaddress.ip_network`` (e.g. ``"192.168.0.0/30"``).

    Returns:
        A flat list of host addresses in exploded string form, in the order
        the networks were given. Entries that ``ipaddress.ip_network``
        rejects with ``ValueError`` are skipped.
    """
    ips = []
    for net in network_list:
        # Only parsing the network can raise ValueError; keep the try body
        # limited to that so unrelated errors are not swallowed.
        try:
            network = ipaddress.ip_network(net)
        except ValueError:
            continue
        # Stream the hosts straight into the result instead of first
        # materialising a throwaway list of address objects.
        ips.extend(host.exploded for host in network.hosts())
    return ips
class Service(SimpleService):
    """Collector that charts fping results for a set of Steam servers.

    The server address ranges are read from the JSON file named by the
    ``path`` configuration key (its ``pops`` mapping). For each selected
    server one address is chosen at random and pinged on every update; when
    an address does not answer, another address from the same server is
    chosen for the next update.
    """

    def __init__(self, configuration=None, name=None):
        """Build the chart definition and pick one address per server.

        Args:
            configuration: job configuration mapping. Reads ``path`` (JSON
                file containing a ``pops`` mapping) and the optional
                ``server_list`` (whitespace-separated server keys; when
                absent every key in ``pops`` is used, sorted).
            name: job name, passed through to ``SimpleService``.
        """
        SimpleService.__init__(self, configuration=configuration,
                               name=name)
        # The signature allows configuration=None; do not crash on .get().
        config = configuration or {}
        server_list = config.get('server_list')
        self.path = config.get('path')

        # A missing/invalid file must not raise from the constructor,
        # otherwise check() never gets the chance to report the problem.
        pops = self._load_pops(self.path)

        if server_list is None:
            used_servers = sorted(pops)
        else:
            # split() without an argument ignores repeated/leading spaces.
            used_servers = server_list.split()

        all_ips = {}
        server_by_ip = {}
        lines = []
        for server in used_servers:
            if server in all_ips:
                # Listed twice: a second entry would duplicate the dimension.
                continue
            pop = pops.get(server)
            if not isinstance(pop, dict):
                continue
            ranges = pop.get('service_address_ranges')
            if ranges is None:
                continue
            ips = get_all_ips(ranges)
            if not ips:
                continue
            server_by_ip[random.choice(ips)] = server
            all_ips[server] = ips
            lines.append([server, server, 'absolute'])

        self.all_ips = all_ips            # server -> every candidate address
        self.server_by_ip = server_by_ip  # currently pinged address -> server
        self.order = ['ping']
        self.definitions = {'ping': {'options': [
            None,
            'Ping measurement',
            'ms',
            'responses',
            'steam.ping',
            'line',
        ], 'lines': lines}}
        self.configuration = configuration

    @staticmethod
    def _load_pops(path):
        """Return the ``pops`` mapping from the JSON file, or ``{}`` on failure."""
        try:
            with open(path) as f:
                pops = json.load(f)['pops']
        except (IOError, OSError, ValueError, KeyError, TypeError):
            return {}
        return pops if isinstance(pops, dict) else {}

    def check(self):
        """Return True when the configured file is usable, logging why not otherwise."""
        if not self.path or not os.access(self.path, os.R_OK):
            self.error('{file} not readable or not exist'.format(file=self.path))
            return False
        if not os.path.getsize(self.path):
            self.error('{file} is empty'.format(file=self.path))
            return False
        if not self.server_by_ip:
            # File exists but could not be parsed, or none of the requested
            # servers has any usable address range.
            self.error('no usable servers found in {file}'.format(file=self.path))
            return False
        return True

    def _get_data(self):
        """Ping the current address of every server once.

        Returns:
            A dict mapping server key to the parsed fping value, or to
            ``None`` for a server whose address did not answer. Returns
            ``None`` when fping could not be started.
        """
        data = {}
        if not self.server_by_ip:
            # Nothing to ping; do not spawn fping without targets.
            return data

        command = ['fping', '-q', '-C', '1'] + list(self.server_by_ip)
        try:
            # fping writes the per-target summary to stderr, not stdout.
            (_, response) = subprocess.Popen(
                command, stderr=subprocess.PIPE).communicate()
        except OSError as err:
            self.error('failed to run fping: {error}'.format(error=err))
            return None

        # communicate() returns bytes on Python 3; the regex and the
        # address lookup below need text.
        if isinstance(response, bytes):
            response = response.decode('utf-8', 'replace')

        for line in response.splitlines():
            match = re.search(FPING_REGEX, line)
            if match is None:
                continue
            ip = match.group(1)
            server = self.server_by_ip.get(ip)
            if server is None:
                # Line about a target we are not tracking; ignore it.
                continue
            try:
                data[server] = float(match.group(2))
            except ValueError:
                # No usable reply ("-"): report a gap and try a different
                # address of the same server on the next update.
                data[server] = None
                self._rotate_ip(server, ip)
        return data

    def _rotate_ip(self, server, failed_ip):
        """Replace ``failed_ip`` with another address of ``server``, if one exists."""
        # Exclude addresses already in use so another server's entry is
        # never overwritten; an empty result (e.g. a single-address server)
        # keeps the current address instead of looping forever.
        candidates = [
            ip for ip in self.all_ips.get(server, ())
            if ip != failed_ip and ip not in self.server_by_ip
        ]
        if not candidates:
            return
        self.server_by_ip.pop(failed_ip, None)
        self.server_by_ip[random.choice(candidates)] = server
steam:
name: "main ping"
server_list: "waw mad lux sto vie"
path: "./NetworkDatagramConfig.json"
update_every: 5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment