Skip to content

Instantly share code, notes, and snippets.

@zyv
Created February 20, 2021 17:27
Show Gist options
  • Save zyv/2798c886279e6d399d2f9bc0a9abdec5 to your computer and use it in GitHub Desktop.
Save zyv/2798c886279e6d399d2f9bc0a9abdec5 to your computer and use it in GitHub Desktop.
import json
import re
from base64 import b64encode
from enum import Enum, auto
from pathlib import Path
# pulumi config --json --show-secrets > pulumi_config.json
CONFIG = json.loads(Path("pulumi_config.json").read_text())
INPUT_FILE = Path("profiling/moneymeets-github.txt")
# INPUT_FILE = Path("profiling/moneymeets.trace")
OUTPUT_FILE = Path(str(INPUT_FILE.absolute()) + ".clean")
class InputType(Enum):
LOG = auto()
TRACE = auto()
INPUT_TYPE = None
if INPUT_FILE.name.endswith(".txt"):
INPUT_TYPE = InputType.LOG
elif INPUT_FILE.name.endswith(".trace"):
INPUT_TYPE = InputType.TRACE
else:
raise ValueError(f"unsupported input type: {INPUT_FILE.name}")
def mask(value: bytes, pattern: bytes) -> bytes:
return (pattern * (len(value) // len(pattern) + 1))[:len(value)]
def replacement(value: bytes, pattern: bytes = b"DEADBEEF") -> tuple[bytes, bytes]:
return value, mask(value, pattern)
def generic_replacements(search: bytes, pattern: bytes) -> list[tuple[bytes, bytes]]:
results = set(re.findall(search, file_content))
return [replacement(result, pattern) for result in results if result]
def sorted_translations(replacements: list[tuple[bytes, bytes]]) -> list[tuple[bytes, bytes]]:
return sorted(replacements, key=lambda translation: (len(translation[1]), translation[0]), reverse=True)
file_content = INPUT_FILE.read_bytes()
if INPUT_TYPE == InputType.LOG:
generic_translations = sorted_translations(
generic_replacements(br'__provider={(.+?)}', b"ABABABAB") +
generic_replacements(b'translated: (.+?)\n', b"CDCDCDCD")
)
elif INPUT_TYPE == InputType.TRACE:
generic_translations = sorted_translations(
generic_replacements(br'u003cstring_value:\\"(.+?)\\"', b"ABABABAB") +
generic_replacements(br'message:\\"[\w ]+ translated: (.+?)\\"', b"CDCDCDCD")
)
else:
raise RuntimeError
print(f"Generic translations: {len(generic_translations)}")
specific_translations = []
for config_key, config_value in CONFIG.items():
_, key_name = map(str.encode, config_key.split(":"))
key_content = config_value["value"].encode()
specific_translations.extend(
(
replacement(key_name),
replacement(key_content, b"FEEDFACE"),
replacement(b64encode(key_content), b"BADDCAFE"),
)
)
specific_translations = sorted_translations(specific_translations)
print(f"Specific translations: {len(specific_translations)}")
translations = generic_translations + specific_translations
# print(translations)
for item in translations:
file_content = file_content.replace(*item)
OUTPUT_FILE.write_bytes(file_content)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment