Last active
July 21, 2020 04:31
-
-
Save xopr/a58bb9c83ad458a995c382f638b47fd5 to your computer and use it in GitHub Desktop.
Spacephone.org NAPTR to CSV/JSON/VCF phonebook generator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import optparse
import re
import sys

#pip-3 install dnspython
import dns.e164
import dns.exception
import dns.name
import dns.resolver
# E.164 prefixes to scan; currently only the NL number prefix is added.
prefixes = ["+319799"]
# Extension numbers to try for the prefix at the same index: 0 to 99 inclusive.
ranges = [range(100)]
# Written into the PRODID field of generated vCards.
productname = "spacephonebook v0.1"

# Python 3 has no `basestring`; alias it to `str` when the name is missing.
try:
    basestring
except NameError:
    basestring = str
# e164 lookup | |
def lookup(number, domains, resolver=None):
    """Look for NAPTR RRs for the specified number in the specified domains.

    e.g. lookup('16505551212', ['e164.dnspython.org.', 'e164.arpa.'])

    number   -- E.164 number as text; it is mapped to a DNS name with
                dns.e164.from_e164().
    domains  -- iterable of domains (text or dns.name.Name) tried in order.
    resolver -- resolver to use; defaults to dnspython's default resolver.

    Returns the answer of the first domain that resolves, or None when no
    domain yields NAPTR records. DNS-level failures (NXDOMAIN, no answer,
    timeout, ...) are treated as "not found"; any other exception is a
    programming error and propagates to the caller.
    """
    if resolver is None:
        resolver = dns.resolver.get_default_resolver()
    # dnspython 2.x deprecates Resolver.query() in favour of resolve();
    # fall back to query() so dnspython 1.x installs keep working.
    resolve = getattr(resolver, "resolve", None) or resolver.query
    for domain in domains:
        if isinstance(domain, str):
            domain = dns.name.from_text(domain)
        qname = dns.e164.from_e164(number, domain)
        try:
            return resolve(qname, "NAPTR")
        except dns.exception.DNSException:
            # This domain has nothing for the number; try the next one.
            continue
    return None
def apply_naptr(regexp, number):
    """Apply a NAPTR substitution expression to *number*.

    *regexp* has the form <delim>pattern<delim>replacement<delim>flags, e.g.
    "!^.*$!data:,ACKspace!". The first character is taken as the delimiter
    and a trailing "i" flag makes the match case-insensitive.

    Returns the rewritten string, or None when the expression is empty,
    does not consist of exactly three delimited fields, or is not a valid
    regular expression.
    """
    if not regexp:
        return None
    parts = regexp.split(regexp[0])
    if len(parts) != 4:
        return None
    _, pattern, replacement, flags = parts
    try:
        # count=1: the expression is a single substitution, not a global
        # one; without it patterns such as ".*" substitute twice.
        return re.sub(
            pattern,
            replacement,
            number,
            count=1,
            flags=re.IGNORECASE if "i" in flags else 0,
        )
    except re.error:
        return None


def build_entry(number, answers):
    """Build one phonebook entry for *number* from its NAPTR records.

    Each record must expose `flags`, `service` and `regexp` as bytes. Only
    records flagged "u" are used. The entry always contains
    "spacephonenumber" and gains "sip", "name" and/or "uri" depending on
    the services found; records that cannot be parsed are skipped.
    """
    entry = {"spacephonenumber": number}
    for rdata in answers:
        # e.g. 10 10 "u" "E2U+cnam" "!^.*$!data:,ACKspace!" ackspace.nl.
        # u: SIP URN or s: SRV record
        if rdata.flags.decode().lower() != "u":
            continue
        # TODO: keep order/preference in mind (failover mechanism)
        # TODO: Future considerations:
        #   service E2U+pstn:tel
        #   service E2U+pstn:cnam:data
        #     data:application/cnam,Francois%20Marie20%Arouet!
        result = apply_naptr(rdata.regexp.decode(), number)
        if result is None:
            continue
        service = rdata.service.decode()
        if service in ("E2U+sip", "sip+E2U"):  # Latter is deprecated
            entry["sip"] = result
        elif service == "E2U+cnam":
            # The name follows the comma of the data URI ("data:,ACKspace");
            # keep the whole value when there is no comma.
            _, comma, name = result.partition(",")
            entry["name"] = name if comma else result
        elif service == "E2U+web:http":
            entry["uri"] = result
    return entry


def collect_phonebook(quiet=False):
    """Query every number in `prefixes`/`ranges` and return the entries found.

    Numbers are zero-padded to the width of the largest value in their
    range. One dot per queried number is printed to stderr unless *quiet*.
    """
    phonebook = []
    for prefix, numbers in zip(prefixes, ranges):
        digits = len(str(abs(numbers.stop - 1)))
        for extension in numbers:
            number = f"{prefix}{extension:0>{digits}d}"
            # Cheesy progress indicator
            if not quiet:
                print("", file=sys.stderr, end=".", flush=True)
            answers = lookup(number, ["e164.spacephone.org"])
            if answers is None:
                continue
            phonebook.append(build_entry(number, answers))
    return phonebook


def write_csv(phonebook, path):
    """Write *phonebook* to *path* as CSV with a header row.

    Columns are the union of all entry keys in first-seen order, so entries
    may carry different keys. An empty phonebook yields an empty file.
    """
    import csv  # lazy load: only needed for CSV output
    fieldnames = list(dict.fromkeys(key for entry in phonebook for key in entry))
    # newline="" lets the csv module control line endings itself.
    with open(path, "w", newline="", encoding="utf-8") as fp:
        if not fieldnames:
            return
        writer = csv.DictWriter(fp, fieldnames)
        writer.writeheader()
        writer.writerows(phonebook)


def write_json(phonebook, path):
    """Write *phonebook* to *path* as indented, key-sorted UTF-8 JSON."""
    import json  # lazy load: only needed for JSON output
    # ensure_ascii=False emits raw non-ASCII characters, so fix the encoding.
    with open(path, "w", encoding="utf-8") as fp:
        json.dump(phonebook, fp, sort_keys=True, ensure_ascii=False, indent=4)


def write_vcard(phonebook, path):
    """Write *phonebook* to *path* as a sequence of vCards.

    Entries without a "name" use their number as display name; the SIP
    telephone and URL fields are only added when the entry has them.
    """
    #pip-3 install vobject
    import vobject  # lazy load to prevent ModuleNotFoundError
    with open(path, "w", newline="", encoding="utf-8") as fp:
        for entry in phonebook:
            number = entry["spacephonenumber"]
            name = entry.get("name", number)
            vcard = vobject.vCard()
            vcard.add("n").value = vobject.vcard.Name(family="", given=name)
            vcard.add("fn").value = name
            if "sip" in entry:
                sip = vcard.add("tel")
                sip.value = entry["sip"]
                sip.type_param = ["PREF", "VOICE"]
            tel = vcard.add("tel")
            tel.value = number
            tel.type_param = "VOICE"
            if "uri" in entry:
                vcard.add("url").value = entry["uri"]
            # Add tag to indicate this is generated
            vcard.add("prodid").value = productname
            fp.write(vcard.serialize())


def main(argv=None):
    """Command line entry point: query the phonebook and write the outputs.

    *argv* is the argument list without the program name; None means
    sys.argv[1:]. Results are printed to stdout and progress/status to
    stderr unless --quiet is given.
    """
    parser = optparse.OptionParser()
    parser.add_option("-c", "--csv", dest="csv", default=None, help="Comma Seperated Values output")
    parser.add_option("-j", "--json", dest="json", default=None, help="JavaScript Object Notation output")
    parser.add_option("-v", "--vcf", dest="vcard", default=None, help="VCARD output")
    parser.add_option("-q", "--quiet", dest="quiet", default=False, action="store_true", help="don't output to screen")
    opts, _args = parser.parse_args(argv)

    if not opts.quiet:
        print("Querying, please wait (up to two minutes)", file=sys.stderr, end=".", flush=True)

    phonebook = collect_phonebook(quiet=opts.quiet)

    # Output to screen first
    if not opts.quiet:
        print("\nResults:", file=sys.stderr)
        for entry in phonebook:
            for key, value in entry.items():
                print(f"{key:<16} {value}")
            print("")

    outputs = (
        (opts.csv, write_csv),
        (opts.json, write_json),
        (opts.vcard, write_vcard),
    )
    for path, write in outputs:
        if not path:
            continue
        write(phonebook, path)
        if not opts.quiet:
            print("Written to " + path, file=sys.stderr)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment