Skip to content

Instantly share code, notes, and snippets.

@Znuff
Last active September 2, 2020 02:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Znuff/ad7abb2eb4f16b08defb92dc37a5426e to your computer and use it in GitHub Desktop.
Save Znuff/ad7abb2eb4f16b08defb92dc37a5426e to your computer and use it in GitHub Desktop.

This is a simple action for fail2ban that uses Cloudflare's API for Rules & Filters to block IP Addresses.

Usage is simple:

  • Add an IP address to the rule/filter:

    ./fail2cloudflare.py add <ip>
    
  • Delete an IP address from the rule/filter:

    ./fail2cloudflare delete <ip>
    

In order for this to work, there needs to exist a rule with the description fail2ban.

If the script doesn't find such a firewall rule on Cloudflare, it will attempt to create it, and it will block 127.0.0.235 by default (should be harmless).

This is very useful to be used with fail2ban's nginx-limit-req filter.

#!/usr/bin/env python3
#-*- coding: utf-8 -*-
import requests
import argparse
import sys
import re
# -- edit here
email = 'your-api-email'
zone_id = 'your-zone-id'
auth_key = 'your-auth-key'
debug = False
# -- stop editing here
parser = argparse.ArgumentParser()
parser.add_argument('action', choices=('add', 'delete'))
parser.add_argument('ip', type=str)
args = parser.parse_args()
headers = {
'X-Auth-Email': email,
'X-Auth-Key': auth_key,
'Content-Type': 'application/json',
}
params = (
('description', 'fail2ban'),
)
api_url = 'https://api.cloudflare.com/client/v4/zones/{0}/firewall/rules'.format(zone_id)
s = requests.Session()
s.headers.update(headers)
s.timeout = 1
def dLog(string):
if debug:
print(string)
def create_rule():
data = '[{ "filter": { "expression":"(ip.src in {127.0.0.235})" }, "action": "block", "description": "fail2ban" }]'
d = s.post(api_url, data=data)
if d.status_code == 200 and d.json()['success']:
dLog('Rule creation succesful: {0}'.format(str(d.json())) )
return d.json()['result'][0]['id'], d.json()['result'][0]['filter']['id'], d.json()['result'][0]['filter']['expression']
else:
dLog('Rule creation failed: {0}'.format( str(d.json())))
sys.exit(1)
def get_rule():
d = s.get(api_url, params=params)
if d.status_code == 200 and d.json()['result']:
dLog('Rule exists!')
return d.json()['result'][0]['id'], d.json()['result'][0]['filter']['id'], d.json()['result'][0]['filter']['expression']
else:
dLog('Rule doesn\'t exist. Creating...')
return create_rule()
def update_rule(ips):
list_of_ips = ' '.join(ips)
expression = 'ip.src in {%s}' % list_of_ips
data = '{ "id": "%s", "paused": false, "description": "fail2ban", "expression": "(%s)"}' % (filter_id, expression)
api_url = 'https://api.cloudflare.com/client/v4/zones/{0}/filters/{1}'.format(zone_id, filter_id)
dLog('Data: {0}'.format(data))
dLog('URL: {0}'.format(api_url))
d = s.put(api_url, data=data)
if d.status_code == 200:
dLog('Filter update succeeded: {0}'.format( str(d.json())) )
print('Action: {0} {1} from filter returned success'.format(args.action, args.ip))
sys.exit(0)
else:
dLog('Filter update failed: {0}'.format( str(d.json()) ))
sys.exit(1)
# stuff happens here:
rule_id, filter_id, expression = get_rule()
m = re.search('{(.+?)}', expression)
if m:
ips = m[1].split()
else:
print('could not retrieve a list of IPs from the expression?')
sys.exit(1)
if args.action == 'add':
if args.ip in ips:
print('IP {0} already exists in the filter expression'.format(args.ip))
sys.exit(1)
else:
ips.append(args.ip)
update_rule(ips)
elif args.action == 'delete':
if args.ip in ips:
ips.remove(args.ip)
update_rule(ips)
else:
print('IP {0} does not exist in the filter expression'.format(args.ip))
sys.exit(1)
else:
print('?!')
# vim: set sw=2 tabstop=2 expandtab :
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment