Skip to content

Instantly share code, notes, and snippets.

@ryancdotorg

ryancdotorg/rotate_dkim.py

Last active Nov 17, 2020
Embed
What would you like to do?
Experimental DKIM rotate/revoke/repudiate script for Exim+Route53. I take no responsibility for its use.
#!/usr/bin/env python
import os
import grp
import sys
import stat
import time
import hmac
import boto3
import tempfile
from datetime import datetime, timedelta
from hashlib import md5, sha1, sha256
from Crypto.PublicKey import RSA
from base64 import b64encode as b64e, b64decode as b64d
from binascii import hexlify, unhexlify
DOMAIN = sys.argv[1].rstrip('.')
SECRET = 'PUY_YOUR_RANDOM_SECRET_HERE'
BASEDIR = '/etc/exim4/dkim_keys'
# In the exim "remote_smpt" transport set the following (assuming Debian):
# DKIM_DOMAIN = ${domain:$return_path}
# DKIM_HMAC = PUT_YOUR_RANDOM_SECRET_HERE
# DKIM_DATE = ${substr{0}{8}{$tod_logfile}}
# DKIM_SELECTOR = DKIM_DATE-${substr{0}{23}{${hmac{sha1}{DKIM_HMAC}{DKIM_DATE}}}}
# DKIM_FILE = /etc/exim4/dkim_keys/${lc:DKIM_SELECTOR}_${lc:DKIM_DOMAIN}.key
# DKIM_PRIVATE_KEY = ${if exists{DKIM_FILE}{DKIM_FILE}{0}}
cli = boto3.Session(profile_name='dkim').client('route53')
zone_name = '_domainkey.'+DOMAIN+'.'
zone_list = cli.list_hosted_zones_by_name(DNSName=zone_name, MaxItems='1')
zone = zone_list['HostedZones'][0]
if zone['Name'] != zone_name:
raise Exception('zone %s not found!' % zone_name)
zone_id = zone['Id']
waiter = cli.get_waiter('resource_record_sets_changed')
waiter.config.delay = 15
waiter.config.max_attempts = 20
# This script is designed to work with exim, which as of the time of writing has
# built-in support for hmac, but only using md5 or sha1.
def hmac_sha1_hex(k, m):
return hmac.new(k, m, sha1).hexdigest()
def dkim_pub(rsa):
return 'v=DKIM1;t=s;p=' + b64e(rsa.publickey().exportKey('DER'))
# per RFC6376 empty p= means "revoked", and n= is a notes field
def dkim_priv(rsa):
return 'v=DKIM1;t=s;p=;n=e:%s,p:%s,q:%s' % (
long_to_b64(rsa.e),
long_to_b64(rsa.p),
long_to_b64(rsa.q),
)
# generate selectors that can't be guessed in advance without the key
def date_to_selector(date):
strdate = date.strftime('%Y%m%d')
return strdate + '-' + hmac_sha1_hex(SECRET, strdate)[0:23]
def long_to_b64(l):
a = bytearray()
while l:
a.append(l & 255)
l >>= 8
a.reverse()
return b64e(a)
def format_txt(txt):
# https://aws.amazon.com/premiumsupport/knowledge-center/txtrdatatoolong-error/
return ''.join([ '"%s"' % txt[i:i+255] for i in xrange(0, len(txt), 255) ])
def push_record(selector, txt):
res = cli.change_resource_record_sets(
HostedZoneId = zone_id,
ChangeBatch = {
'Changes': [{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': selector+'.'+zone_name,
'Type': 'TXT',
'TTL': 5,
'ResourceRecords': [
{ 'Value': '"%s"' % txt }
]
}
}]
}
)
print 'waiting on record propagation %s' % res['ChangeInfo']['Id']
return waiter.wait(Id=res['ChangeInfo']['Id'])
# as an intermediate step before publishing private parameters, the key is simply revoked
def revoke_key(selector):
dkim = 'v=DKIM1;t=s;p='
res = cli.list_resource_record_sets(
HostedZoneId = zone_id,
StartRecordName = selector+'.'+zone_name,
StartRecordType = 'TXT',
MaxItems = '1'
)
val = res['ResourceRecordSets'][0]['ResourceRecords'][0]['Value']
if val != '"%s"' % dkim:
print selector
print 'push dkim revocation'
push_record(selector, dkim)
# publish private parameters, allowing signatures on old mail to be forged
def repudiate_key(selector):
#print selector
filename = '%s_%s.key' % (selector, DOMAIN)
path = BASEDIR + '/' + filename
if not os.path.isfile(path):
#print 'does not exist'
return
print selector
print 'load private key'
rsa = None
with open(path, 'r') as fh:
rsa = RSA.importKey(fh.read())
print 'generate dkim repudiation'
dkim = dkim_priv(rsa)
print 'push dkim repudiation'
push_record(selector, dkim)
print 'delete repudiated key file'
os.remove(path)
def create_key(selector):
filename = '%s_%s.key' % (selector, DOMAIN)
path = BASEDIR + '/' + filename
if os.path.isfile(path):
#print 'already exists'
return
print selector
print 'generate rsa key'
rsa = RSA.generate(1024)
print 'generate dkim record'
dkim = dkim_pub(rsa)
print 'push dkim record'
push_record(selector, dkim)
print 'writing private key to temp file'
tmp_fd, tmp_name = tempfile.mkstemp('', '.'+filename+'.', BASEDIR, True)
os.write(tmp_fd, rsa.exportKey())
os.fchown(tmp_fd, 0, grp.getgrnam('Debian-exim')[2])
os.fchmod(tmp_fd, 0o0440)
os.fsync(tmp_fd)
os.close(tmp_fd)
print 'renaming temp file'
os.rename(tmp_name, path)
d_start = datetime.utcnow()
# expire keys
d = d_start
while True:
d = d - timedelta(days=1)
filename = '%s_%s.key' % (date_to_selector(d), DOMAIN)
path = BASEDIR + '/' + filename
if not os.path.isfile(path):
break
# publish private parameters after 10 days
while d < d_start - timedelta(days=10):
repudiate_key(date_to_selector(d))
d = d + timedelta(days=1)
# public key revokation after 7 days
while d < d_start - timedelta(days=7):
revoke_key(date_to_selector(d))
d = d + timedelta(days=1)
# create keys
d = d_start
for _ in xrange(28):
create_key(date_to_selector(d))
d = d + timedelta(days=1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.