Skip to content

Instantly share code, notes, and snippets.

@jimdigriz
Last active November 2, 2023 09:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jimdigriz/6ded4c013c277d0d3e1931980165a5cf to your computer and use it in GitHub Desktop.
Save jimdigriz/6ded4c013c277d0d3e1931980165a5cf to your computer and use it in GitHub Desktop.
dnspython implementation of draft-ietf-dnssd-srp
#!/usr/bin/env python3
# DNS-SD SRP Client implementation (draft 23)
# https://datatracker.ietf.org/doc/draft-ietf-dnssd-srp/
#
# Copyright (C) 2023, coreMem Limited <info@coremem.com>
# SPDX-License-Identifier: ISC
# dependency: pip install dnspython~=2.4.0 cryptography~=41.0.0
from datetime import datetime, timedelta, timezone
import os
import struct
import time
from typing import Any, Dict, Optional

import dns.dnssec
from dns.dnssecalgs.ecdsa import PrivateECDSAP256SHA256
import dns.edns
import dns.exception
import dns.message
from dns.rdtypes.dnskeybase import DNSKEYBase
# SIG has the same format as RRSIG
from dns.rdtypes.ANY.RRSIG import RRSIG as SIG
import dns.query
import dns.update
# RFC2931, section 3.3
# Number of seconds either side of "now" used to derive the inception and
# expiration timestamps of the SIG(0) record.
FUDGE = 300
# draft-ietf-dnssd-update-lease-08, section 9
# EDNS(0) option code under which the LEASE option class is registered.
LEASE_OPTION_CODE = 2
# draft-ietf-dnssd-update-lease-08, section 8
# NOTE(review): not referenced in the visible source; presumably the default
# ratio of key lease to lease -- confirm against the draft.
LEASE_TO_KEYLEASE = 7
# Shortest lease duration, in seconds, enforced by LEASE validation.
LEASE_MIN = 30
# Shortest key lease duration, in seconds, enforced by LEASE validation.
LEASE_KEY_MIN = LEASE_MIN
##################################################################
# dnspython has no support for SIG(0), and appending the SIG record to
# update.additional does not work with the stock to_wire() because it places
# the EDNS OPT record after the additional section RRsets
# https://github.com/rthalley/dnspython/issues/314
#
# clone of the original function, except that the call to add_opt is moved
# to before the queued additional section RRsets are added
def _message_to_wire(
    self,
    origin: Optional[dns.name.Name] = None,
    max_size: int = 0,
    multi: bool = False,
    tsig_ctx: Optional[Any] = None,
    **kw: Dict[str, Any],
) -> bytes:
    """Render the message into DNS compressed wire format.

    Replacement for ``dns.message.Message.to_wire()`` that writes the EDNS
    OPT record *before* the RRsets queued in the additional section, so a
    record appended to ``additional`` ends up after the OPT record.

    Additional keyword arguments are passed to the RRset ``to_wire()``
    method.

    *origin*, a ``dns.name.Name`` or ``None``, the origin to be appended
    to any relative names.  If ``None``, the message's own origin
    attribute is used (which may itself be ``None``).

    *max_size*, an ``int``, the maximum size of the wire format output;
    0 (the default) means "the message's request payload, if nonzero,
    or 65535".  The effective value is clamped to the range 512..65535.

    *multi*, a ``bool``, should be ``True`` if this message is part of a
    multiple message sequence.

    *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object,
    the ongoing TSIG context, used when signing zone transfers.

    Raises ``dns.exception.TooBig`` if *max_size* was exceeded.

    Returns a ``bytes``.
    """
    if origin is None:
        origin = self.origin

    if max_size == 0:
        max_size = self.request_payload if self.request_payload != 0 else 65535
    max_size = min(max(max_size, 512), 65535)

    renderer = dns.renderer.Renderer(self.id, self.flags, max_size, origin)

    # Hold back room for the OPT and TSIG records while the body is rendered.
    opt_reserve = self._compute_opt_reserve()
    renderer.reserve(opt_reserve)
    tsig_reserve = self._compute_tsig_reserve()
    renderer.reserve(tsig_reserve)

    for question in self.question:
        renderer.add_question(question.name, question.rdtype, question.rdclass)
    for section_id, section in (
        (dns.renderer.ANSWER, self.answer),
        (dns.renderer.AUTHORITY, self.authority),
    ):
        for rrset in section:
            renderer.add_rrset(section_id, rrset, **kw)

    # >>> the one deviation from the stock method: OPT goes in first <<<
    if self.opt is not None:
        renderer.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)

    for rrset in self.additional:
        renderer.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)

    renderer.release_reserved()
    renderer.write_header()

    if self.tsig is None:
        return renderer.get_wire()

    # Sign what has been rendered so far, then append the TSIG record and
    # rewrite the header so the section counts include it.
    new_tsig, ctx = dns.tsig.sign(
        renderer.get_wire(),
        self.keyring,
        self.tsig[0],
        int(time.time()),
        self.request_mac,
        tsig_ctx,
        multi,
    )
    self.tsig.clear()
    self.tsig.add(new_tsig)
    renderer.add_rrset(dns.renderer.ADDITIONAL, self.tsig)
    renderer.write_header()
    if multi:
        self.tsig_ctx = ctx
    return renderer.get_wire()


# Monkey-patch dnspython so every message is rendered with the variant above.
dns.message.Message.to_wire = _message_to_wire
##################################################################
# KEY has the same format as DNSKEY
class KEY(DNSKEYBase):
    """KEY resource record; reuses the DNSKEY base class unchanged."""
class LEASE(dns.edns.Option):
    """EDNS(0) Update Lease option (draft-ietf-dnssd-update-lease).

    The option carries a 32-bit lease time in seconds, optionally followed
    by a 32-bit key lease time in seconds.

    *otype* is the EDNS option code the instance is created for.

    *lease*, an ``int``, is mandatory.  It must be either 0 (the value an
    SRP requester sends to ask for removal) or at least ``LEASE_MIN``.

    *keylease*, an ``int`` or ``None``.  When given it must be either 0 or
    at least ``LEASE_KEY_MIN``; ``None`` omits the field from the wire form.

    Raises ``ValueError`` when a value is missing, too short, does not fit
    in an unsigned 32-bit field, or when the key is asked to expire
    (``keylease == 0``) while the lease is not (``lease > 0``).
    """

    # Both fields are encoded as unsigned 32-bit integers.
    _UINT32_MAX = 0xFFFFFFFF

    def __init__(self, otype, lease=None, keylease=None):
        super().__init__(otype)
        if lease is None:
            raise ValueError('lease needs a value')
        # Zero is exempt from the minimum: it is the "remove" request, and
        # exempting it is what makes the key-without-lease check below
        # reachable at all.
        if lease != 0 and lease < LEASE_MIN:
            raise ValueError(f'lease time too short, minimum allowed is {LEASE_MIN} seconds')
        if keylease is not None and keylease != 0 and keylease < LEASE_KEY_MIN:
            raise ValueError(f'keylease time too short, minimum allowed is {LEASE_KEY_MIN} seconds')
        # Fail here rather than with struct.error when rendering to wire.
        if lease > self._UINT32_MAX:
            raise ValueError('lease time does not fit in 32 bits')
        if keylease is not None and keylease > self._UINT32_MAX:
            raise ValueError('keylease time does not fit in 32 bits')
        if lease > 0 and keylease == 0:
            raise ValueError('expiring the key but not the lease makes no sense')
        self.lease = lease
        self.keylease = keylease

    def to_text(self):
        """Return a human readable description of the option."""
        return f'LEASE lease={self.lease} keylease={self.keylease}'

    def to_wire(self, file=None):
        """Encode the option payload.

        Writes the payload to *file* and returns ``None`` when *file* is
        given, otherwise returns the payload as ``bytes``.
        """
        value = struct.pack('!I', self.lease)
        if self.keylease is not None:
            value += struct.pack('!I', self.keylease)
        if file is not None:
            file.write(value)
            return None
        return value

    @classmethod
    def from_wire_parser(cls, otype, parser):
        """Decode the option from *parser*.

        Accepts a payload of exactly 4 bytes (lease only) or 8 bytes (lease
        followed by key lease); raises ``dns.exception.FormError`` for any
        other length.
        """
        if parser.remaining() < 4:
            raise dns.exception.FormError
        lease = parser.get_uint32()
        keylease = None
        if parser.remaining() == 4:
            keylease = parser.get_uint32()
        if parser.remaining() != 0:
            raise dns.exception.FormError
        return cls(otype, lease, keylease)


# Make dnspython build LEASE instances for this EDNS option code.
dns.edns.register_type(LEASE, LEASE_OPTION_CODE)
# Example SRP registration: build the update, sign it with SIG(0), send it.
update = dns.update.Update('example.com.')
update.delete('server')
update.add('server', 300, 'A', '192.0.2.1')

# Load the signing key, generating and saving a new one on first run.
try:
    with open('key.pem', 'rb') as keyfile:
        private_key = PrivateECDSAP256SHA256.from_pem(keyfile.read())
except FileNotFoundError:
    private_key = PrivateECDSAP256SHA256.generate()
    # The PEM holds unencrypted private key material, so create the file
    # readable and writable by its owner only instead of relying on the
    # process umask.
    with open(
        'key.pem',
        'wb',
        opener=lambda path, flags: os.open(path, flags, 0o600),
    ) as keyfile:
        keyfile.write(private_key.to_pem())

signer = dns.name.from_text('server.example.com.')

# dnssec-keygen -T KEY -a ECDSAP256SHA256 -n HOST -s 1 server.example.com
key = KEY(
    rdclass='IN',
    rdtype='KEY',
    # RFC2535, section 3.1.2
    # RFC2137, section 3.1.3
    # https://github.com/Abhayakara/mdnsresponder/blob/main/ServiceRegistration/towire.c:dns_rdata_key_to_wire()
    # https://github.com/openthread/openthread/blob/main/src/core/net/srp_client.cpp:Client::AppendKeyRecord()
    # Only two fields are non-zero: 0b10 at bits 8-9 and 0b0001 in the low
    # four bits (matching "-n HOST" and "-s 1" in the dnssec-keygen line).
    flags=(
        (0b00 << 14)
        + (0b0 << 13)
        + (0b0 << 12)
        + (0b0 << 11)
        + (0b0 << 10)
        + (0b10 << 8)
        + (0b0 << 7)
        + (0b0 << 6)
        + (0b0 << 5)
        + (0b0 << 4)
        + (0b0001 << 0)
    ),
    # RFC2535, section 3.1.3
    # RFC3445, section 4
    protocol=3,
    algorithm='ECDSAP256SHA256',
    key=private_key.public_key().encode_key_bytes(),
)
# uncomment to obtain a KEY RR suitable for adding to your zone file
#print(key)
update.add('server', 300, key)

update.add('Example Test Server._x-test._tcp', 300, 'SRV', '0 0 1234 server')
update.add('Example Test Server._x-test._tcp', 300, 'TXT', '"txtvers=1" "test=valid" "eggs=ham"')
update.add('_x-test._tcp', 300, 'PTR', 'Example\\032Test\\032Server._x-test._tcp')

update.use_edns(edns=True, options=[
    LEASE(LEASE_OPTION_CODE, lease=300, keylease=3600)
])

# RFC 2931, section 3.1
# https://github.com/mikepultz/netdns2/blob/master/Net/DNS2/RR/SIG.php:rrGet()
now = datetime.now(tz=timezone.utc)


def _make_sig(signature):
    """Build the SIG(0) record for this update carrying *signature*.

    Every field other than the signature is identical between the record
    that is signed and the record that is sent, so both come from here.
    """
    return SIG(
        rdclass='ANY',
        rdtype='SIG',
        type_covered=0,
        algorithm=key.algorithm,
        labels=0,
        original_ttl=0,
        expiration=int((now + timedelta(seconds=FUDGE)).timestamp()),
        inception=int((now - timedelta(seconds=FUDGE)).timestamp()),
        key_tag=dns.dnssec.key_id(key),
        signer=signer,
        signature=signature,
    )


# The signature covers the SIG RDATA with an empty signature field followed
# by the message as rendered before the SIG record is appended.
sig0 = _make_sig(b'')
data = sig0.to_wire() + update.to_wire()
signature = private_key.sign(data)
sig = _make_sig(signature)
update.additional.append(dns.rrset.from_rdata('.', 0, sig))

response = dns.query.udp(update, '127.0.0.1', port=55353, timeout=3)
#response = dns.query.tcp(update, '127.0.0.1', port=55353, timeout=3)
print(response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment