Skip to content

Instantly share code, notes, and snippets.

@sdstrowes
Last active May 17, 2019 17:31
Show Gist options
  • Save sdstrowes/199630247d9b631f96970f5e8bc5c95d to your computer and use it in GitHub Desktop.
Save sdstrowes/199630247d9b631f96970f5e8bc5c95d to your computer and use it in GitHub Desktop.
from ripe.atlas.cousteau import AtlasStream
import dns.message
import base64
import json
import sys
def decode_qbuf_abuf(result):
if "qbuf" in result:
try:
dnsmsg = dns.message.from_wire(base64.b64decode(result["qbuf"]))
print ""
print " -- Query Buffer -- "
print dnsmsg
except:
print "Error decoding qbuf from "+str(result)
return
try:
dnsmsg = dns.message.from_wire(base64.b64decode(result["result"]["abuf"]))
print ""
print " -- Answer Buffer --"
print dnsmsg
except:
print "Error decoding abuf from "+str(result)
return
def on_result_response(*args):
print ""
print "---------------------------"
print json.dumps(args[0], indent=2)
data = args[0]
if data["type"] != "dns":
return
if "resultset" in data:
for result in data["resultset"]:
if "result" not in result:
return
decode_qbuf_abuf(result)
if "result" in data:
decode_qbuf_abuf(data["result"])
def main():
atlas_stream = AtlasStream()
atlas_stream.connect()
channel = "atlas_result"
atlas_stream.bind_channel(channel, on_result_response)
stream_parameters = {"type": "dns", "destinationAddress": "8.8.8.8"}
atlas_stream.start_stream(stream_type="result", **stream_parameters)
atlas_stream.timeout()
atlas_stream.disconnect()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment