Skip to content

Instantly share code, notes, and snippets.

@corny
Created May 19, 2020 18:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save corny/fc7322aa61901a21b74674cd858da5e5 to your computer and use it in GitHub Desktop.
Save corny/fc7322aa61901a21b74674cd858da5e5 to your computer and use it in GitHub Desktop.
journald-nftables blacklist script for Caddy
#!/usr/bin/python3 -u
import select
import re
import subprocess
from systemd import journal
from datetime import timedelta
from collections import defaultdict
blockAfter = 10
resetAfter = timedelta(seconds=120)
pattern = re.compile(r"TLS handshake error from (([\d\.]+)|\[([0-9a-f:]+)\])")
# runtime variables
addresses = None
lastReset = None
def handle(line, time):
global addresses
global lastReset
result = pattern.search(line)
if not result:
return
address = result.group(2) or result.group(3)
# do we have to (re)initialize the address map?
if not lastReset or (time-lastReset) > resetAfter:
lastReset = time
addresses = defaultdict(int)
# count IP address
addresses[address] += 1
value = addresses[address]
# print the current counter value
print("attempt %d of %s" % (value, address))
# not enough failures?
if value < blockAfter:
return
# block address
args = ['nft', 'add', "element", "inet", "filter"]
args.append("blackhole6" if ":" in address else "blackhole4")
args.append("{ %s timeout 5m }" % address)
print("executing", args)
subprocess.check_output(args)
j = journal.Reader()
j.this_boot()
j.this_machine()
j.seek_tail()
j.get_previous()
j.log_level(journal.LOG_INFO)
j.add_match(_SYSTEMD_UNIT="caddy.service")
p = select.poll()
p.register(j.fileno(), j.get_events())
while True:
# Poll for new journal entries every second
if p.poll(250):
if j.process() == journal.APPEND:
for entry in j:
handle(entry['MESSAGE'], entry['__MONOTONIC_TIMESTAMP'].timestamp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment