Skip to content

Instantly share code, notes, and snippets.

@oskar456
Created September 28, 2017 10:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save oskar456/fc02e4e13c076a2dfca00633fe02c397 to your computer and use it in GitHub Desktop.
Save oskar456/fc02e4e13c076a2dfca00633fe02c397 to your computer and use it in GitHub Desktop.
RIPE Atlas tools custom DNS renderer with compact output (1 result per line)
# Copyright (c) 2017 Ondrej Caletka <ondrej@caletka.cz>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from tzlocal import get_localzone
from ripe.atlas.tools.helpers.colours import colourise
from ripe.atlas.tools.helpers.sanitisers import sanitise
from ripe.atlas.tools.renderers.base import Renderer as BaseRenderer
class Renderer(BaseRenderer):
RENDERS = [BaseRenderer.TYPE_DNS]
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
ANSWER_COLORS = ['cyan', 'blue']
def on_result(self, result):
created = result.created.astimezone(get_localzone())
probe_id = result.probe_id
r = []
if result.responses:
for response in result.responses:
r.append(self.get_formatted_response(probe_id,
created, response))
else:
r.append("{}{}\n".format(self.get_header(probe_id),
colourise("No response found", "red"),
))
return "\n".join(r)
@staticmethod
def get_header(probe_id):
return "Probe {0:>6}: ".format("#{}".format(probe_id))
@classmethod
def get_formatted_response(cls, probe_id, created, response):
s = []
answers = ""
if response.abuf:
header_flags = []
for flag in ("aa", "ad", "cd", "qr", "ra", "rd",):
if getattr(response.abuf.header, flag):
header_flags.append(flag)
s.append(created.strftime(cls.TIME_FORMAT))
s.append(response.abuf.header.return_code)
s.append(" ".join(header_flags))
answers = " {}".format(cls.print_answers(response.abuf.answers))
else:
s.append("No abuf found")
if response.is_error or not response.abuf:
color = "red"
elif len(response.abuf.answers) == 0:
color = "yellow"
else:
color = "green"
status = colourise(" ".join(s), color)
return "". join([cls.get_header(probe_id), status, answers, "\n"])
@staticmethod
def get_rrdata(data):
"""
Return RRdata in condensed text form.
"""
if not data:
return ""
# It's too complicated to override __str__ method of all Answer
# classes of Sagan so let's compress it text-wise instead
r = str(data).split()
r.pop(2) # get rid of the class
return sanitise(" ".join(r))
@classmethod
def print_answers(cls, data):
"""
Return list of colourised condensed textual RRdata.
"""
r = []
for record, color in zip(data, cls.ANSWER_COLORS*len(data)):
r.append(colourise(cls.get_rrdata(record), color))
return "; ".join(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment