Skip to content

Instantly share code, notes, and snippets.

@BlackthornYugen
Created February 24, 2024 18:48
Show Gist options
  • Save BlackthornYugen/064771731f8c75deb78e6d3da4df82c6 to your computer and use it in GitHub Desktop.
Save BlackthornYugen/064771731f8c75deb78e6d3da4df82c6 to your computer and use it in GitHub Desktop.
Python modify haproxy acls
backend be_ipwhitelist
option forwardfor
http-request set-header x-forwarded-proto %[ssl_fc,iif(https,http)]
acl is_authorized src -f /etc/haproxy/ip_pass.lst
acl is_authorized http_auth(basic-auth-list)
acl is_logout_path path_end logout
http-request auth realm myrealm.example.org unless is_authorized !is_logout_path
http-request add-header Cache-Control no-cache
server python1 127.0.0.1:5333 check
<!doctype html>
<html>
<head>
<title>IP Allowlist</title>
</head>
<body>
<h2>Allow an IP Address</h2>
<form action="/whitelist" method="post">
<label for="ip">IP Address:</label>
<input type="text" id="ip" name="ip" value="{{ user_ip }}">
<input type="submit" value="Whitelist">
</form>
<h3>Allowed IPs</h3>
<ul>
{% for item in whitelisted_ips_info %}
<li>{{ item.ip }}
{% if not item.from_disk %}
<form action="/delete" method="post" style="display:inline;">
<input type="hidden" name="ip" value="{{ item.ip }}">
<input type="submit" value="Delete">
</form>
{% endif %}
</li>
{% endfor %}
</ul>
{% with messages = get_flashed_messages(category_filter=["ip_status"], with_categories=true)%}
{% if messages%}
{% for _, last_ip_status in messages%}
<p>{{last_ip_status}}</p>
{%endfor%}
{%endif%}
{%endwith%}
</body>
</html>
#!/usr/bin/env python3
from werkzeug.middleware.proxy_fix import ProxyFix
from flask import Flask, flash, request, render_template, redirect, url_for
import logging
import os
import socket
# Initialize Flask app
app = Flask(__name__)
# Apply ProxyFix to respect the X-Forwarded-Proto header
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
# Setup logging
logging.basicConfig(level=logging.DEBUG)
# Initialize list to store IPs
whitelisted_ips = []
def read_whitelist_from_disk():
whitelist_path = '/etc/haproxy/ip_pass.lst'
try:
with open(whitelist_path, 'r') as file:
whitelisted_ips = file.read().splitlines()
except Exception as e:
app.logger.error(f'Failed to read whitelist from disk: {e}')
whitelisted_ips = []
return whitelisted_ips
def get_client_ip():
if "X-Forwarded-For" in request.headers:
# In some cases, the 'X-Forwarded-For' header can contain multiple IPs
# separated by commas. The first one is the original client IP.
ip = request.headers["X-Forwarded-For"].split(",")[0]
else:
# Fallback to the direct connection IP if the header is missing.
ip = request.remote_addr
return ip
def send_command_to_socket(command):
# Make sure haproxy has a socket that you can read/write to,
# you could add the following to /etc/haproxy/haproxy.cfg:
# stats socket /var/lib/haproxy/stats mode 660 group wheel
# https://sleeplessbeastie.eu/2020/01/29/how-to-use-haproxy-stats-socket/
# https://docs.haproxy.org/2.4/management.html#9.3
socket_path = os.getenv("TARGET_SOCK", "/var/lib/haproxy/stats")
response = ""
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client_socket:
client_socket.connect(socket_path)
# Ensure the command ends with a newline character
if not command.endswith('\n'):
command += '\n'
client_socket.sendall(command.encode('utf-8'))
# Wait for the response
response = client_socket.recv(4096).decode('utf-8') # Adjust buffer size as needed
except Exception as e:
app.logger.error(f"Failed to send command to socket: {e}")
return response
def fetch_whitelisted_ips():
global whitelisted_ips
data = send_command_to_socket("show acl /etc/haproxy/ip_pass.lst\n")
# Parse the output to update the whitelisted_ips list
whitelisted_ips = [line.split()[1] for line in data.strip().split('\n')]
return whitelisted_ips
def get_dynamic_whitelisted_ips():
whitelisted_ips = fetch_whitelisted_ips()
disk_whitelisted_ips = read_whitelist_from_disk() # IPs read from the disk
# Remove IPs that are saved on disk from the whitelisted_ips list
dynamic_whitelisted_ips = [ip for ip in whitelisted_ips if ip not in disk_whitelisted_ips]
return dynamic_whitelisted_ips
@app.route('/')
def home():
user_ip = get_client_ip()
disk_whitelisted_ips = read_whitelist_from_disk()
all_whitelisted_ips = fetch_whitelisted_ips() # Assume this fetches all whitelisted IPs
# Mark each IP with whether it is from disk
whitelisted_ips_info = [
{'ip': ip, 'from_disk': (ip in disk_whitelisted_ips)}
for ip in all_whitelisted_ips
]
return render_template("whitelist-ips.html", user_ip=user_ip, whitelisted_ips_info=whitelisted_ips_info)
@app.route('/whitelist', methods=['POST'])
def whitelist():
ip_address = request.form['ip']
disk_whitelisted_ips = read_whitelist_from_disk()
current_whitelisted_ips = fetch_whitelisted_ips()
if ip_address in current_whitelisted_ips:
app.logger.info(f'IP {ip_address} is already whitelisted.')
flash(f"{ip_address} can already connect.", 'ip_status')
else:
# Construct the command to add the IP to the ACL
command = f"add acl /etc/haproxy/ip_pass.lst {ip_address}\n"
response = send_command_to_socket(command)
app.logger.debug(f'Whitelisted {ip_address}')
flash(f"{ip_address} can now connect.", 'ip_status')
return redirect(url_for('home'))
@app.route('/delete', methods=['POST'])
def delete_ip():
ip_to_delete = request.form['ip']
if ip_to_delete in read_whitelist_from_disk():
app.logger.debug(f'IP cannot be removed from whitelist: {ip_to_delete}')
flash(f'IP cannot be removed from whitelist: {ip_to_delete}', 'ip_status')
return redirect(url_for('home'))
# Construct the command to delete the IP from the ACL
command = f"del acl /etc/haproxy/ip_pass.lst {ip_to_delete}"
response = send_command_to_socket(command)
app.logger.debug(f'Deleted {ip_to_delete}')
message = f'{ip_to_delete} can no longer connect.'
flash(message, 'ip_status')
app.logger.debug(message)
return redirect(url_for('home'))
if __name__ == '__main__':
# You'll need to set this for flash messages
# app.config['SECRET_KEY'] = ""
app.run("127.0.0.1", port=5333, debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment