Skip to content

Instantly share code, notes, and snippets.

@panzi
Last active March 9, 2024 05:43
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save panzi/81892af865a4818e9ccf578ab5766d36 to your computer and use it in GitHub Desktop.
Save panzi/81892af865a4818e9ccf578ab5766d36 to your computer and use it in GitHub Desktop.
Re-key all the embedded vaults in an Ansible vars file.
#!/usr/bin/env python3
# derived from https://stackoverflow.com/a/67161907/277767
# Changes to the StackOverflow version:
# * delete temporary files that contain vaults!
# * prompt for passwords instead of passing them as program argument
# * more precise vault replacement
# * a bit nicer error messages that points at the line where re-keying failed
# * decryption if no password is provided
from typing import Optional
import sys
import re
import os
import json
from os.path import join as join_path
from tempfile import gettempdir
from ansible.parsing.vault import VaultEditor, VaultLib, VaultSecret
from ansible.constants import DEFAULT_VAULT_IDENTITY # type: ignore
from ansible.errors import AnsibleError
from getpass import getpass
VAULT_REGEX = re.compile(r'(?P<prefix>!vault\s+[|>]-?\s*\n)?(?P<vault>^(?P<indent>\s*)\$ANSIBLE_VAULT\S*\n(?:\s*\w+\n)*)', re.MULTILINE)
temp_count = 0
class ReKeyError(Exception):
__slots__ = 'lineno', 'cause'
lineno: int
cause: Optional[Exception]
def __init__(self, lineno: int, cause: Optional[Exception]=None) -> None:
super().__init__()
self.lineno = lineno
self.cause = cause
def __str__(self) -> str:
return f'at line {self.lineno}: {self.cause if self.cause is not None else "an error occured"}'
def rekey(content: str, old_secret: VaultSecret, new_secret: Optional[VaultSecret]=None) -> str:
global temp_count
temp_name = join_path(gettempdir(), f'ansible-rekey-{os.getpid()}-{temp_count}.tmp')
temp_count += 1
prev_index = 0
new_content: list[str] = []
while True:
match = VAULT_REGEX.search(content, prev_index)
if match is None:
new_content.append(content[prev_index:])
break
prefix = match.group('prefix')
indentation = match.group('indent')
old_vault = match.group('vault')
index = match.start()
if index > prev_index:
new_content.append(content[prev_index:index])
string_content = old_vault.replace(indentation, '')
try:
with open(temp_name, 'w') as fout:
fout.write(string_content)
editor = VaultEditor(VaultLib([ (DEFAULT_VAULT_IDENTITY, old_secret) ]))
if new_secret is None:
editor.decrypt_file(temp_name)
else:
editor.rekey_file(temp_name, new_secret)
with open(temp_name) as fin:
lines = fin.readlines()
if new_secret is None:
if prefix:
new_content.append(json.dumps(''.join(lines)))
new_content.append('\n')
else:
new_content.append(indentation)
new_content.append(indentation.join(lines))
else:
if prefix:
new_content.append(prefix)
new_content.append(indentation)
new_content.append(indentation.join(lines))
except Exception as exc:
lineno = content.count('\n', 0, index) + 1
if isinstance(exc, AnsibleError):
exc.message = exc.message.replace(temp_name, f'line {lineno}')
raise ReKeyError(lineno, exc)
finally:
os.unlink(temp_name)
prev_index = match.end()
return ''.join(new_content)
def rekey_files(old_password: str, new_password: Optional[str], files: list[str]) -> None:
old_secret = VaultSecret(old_password.encode())
new_secret = VaultSecret(new_password.encode()) if new_password is not None else None
for file_name in files:
with open(file_name) as f:
content = f.read()
try:
new_content = rekey(content, old_secret, new_secret)
except ReKeyError as exc:
print(f'{file_name}:{exc.lineno}: {exc.cause if exc.cause is not None else "an error occured"}', file=sys.stderr)
else:
with open(file_name, 'w') as f:
f.write(new_content)
print('rekeyed', file_name)
def main() -> None:
if len(sys.argv) < 2:
print("Usage: rekey.py <file...>", file=sys.stderr)
sys.exit(1)
old_password = getpass('Vault password: ')
new_password = getpass('New Vault password (empty for decrypt-only): ')
new_password_confirmation = getpass('Confirm New Vault password: ')
if new_password != new_password_confirmation:
print('ERROR! Passwords do not match', file=sys.stderr)
sys.exit(1)
rekey_files(old_password, new_password if new_password else None, sys.argv[1:])
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment