Skip to content

Instantly share code, notes, and snippets.

@iagoalonsomrf
Last active September 13, 2023 13:38
Show Gist options
  • Save iagoalonsomrf/ed6d4913653bc480f1c0a36cf82f31e5 to your computer and use it in GitHub Desktop.
Save iagoalonsomrf/ed6d4913653bc480f1c0a36cf82f31e5 to your computer and use it in GitHub Desktop.
Parse Ansible's json output of an UFW rules change step.
import json
import os
import re
import tempfile
COMMENT_RE = r'(route\:allow|allow)\s(tcp|udp)\s([0-9]{1,4})\s((?:(?:25[0-5]|(?:2[0-4]|1\d|[1-9]|)\d)\.?\b){4}/[0-9]{1,2})[\sa-zA-Z]+((?:(?:25[0-5]|(?:2[0-4]|1\d|[1-9]|)\d)\.?\b){4})' # noqa: E501
def get_diff(data):
diff_object = data['results'][2]['diff'][0]
before_fp = tempfile.NamedTemporaryFile('w+')
before_fp.write(diff_object['before_header'])
before_fp.write(diff_object['before'])
after_fp = tempfile.NamedTemporaryFile('w+')
after_fp.write(diff_object['after_header'])
after_fp.write(diff_object['after'])
try:
with os.popen(f'git diff {before_fp.name} {after_fp.name}') as cmd_stream:
return cmd_stream.read()
except Exception as e:
print('Error obtaining diff', e)
finally:
before_fp.close()
after_fp.close()
def parse_diff(diff_lines):
output_lines = []
for line in diff_lines:
action = None
if line.startswith('-### tuple ###'):
action = '-'
elif line.startswith('+### tuple ###'):
action = '+'
if action is not None:
comment = bytes.fromhex(line.split('comment=')[-1]).decode('utf-8')
try:
type_, protocol, port, _, ip = re.compile(COMMENT_RE).findall(line)[0]
line = f'{action} {"forward" if "route" in type_ else "allow "} {protocol} {port.ljust(6, " ")} {ip.ljust(16, " ")} {comment}' # noqa: E501
output_lines.append(f'{line}\n')
except IndexError:
print('Cannot parse IPV6 yet')
return output_lines
def main():
with open('ufw_rules.json', 'r') as fp:
data = json.load(fp)
diff = get_diff(data)
with open('rules.diff', 'w') as f:
f.write(diff)
parsed_diff = parse_diff(diff.splitlines())
with open('parsed.diff', 'w') as f:
f.writelines(parsed_diff)
if __name__ == '__main__':
main()
@iagoalonsomrf
Copy link
Author

iagoalonsomrf commented Sep 13, 2023

Just add register: ufw_rules to the Generate ufw rules step, and then save the output to a file, adding later a step like this

- name: Debug
  ansible.builtin.copy:
    dest: ufw_rules.json
    content: "{{ ufw_rules }}"
  delegate_to: localhost
  become: false
  check_mode: false

Note: You might have to tinker with the indexes in diff_object = data['results'][2]['diff'][0]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment