Skip to content

Instantly share code, notes, and snippets.

@innovia
Created October 21, 2018 20:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save innovia/56f102be2f24c521d0dec00563125a44 to your computer and use it in GitHub Desktop.
Save innovia/56f102be2f24c521d0dec00563125a44 to your computer and use it in GitHub Desktop.
Vault Runner - get secret from vault and replace process
#!/usr/bin/env python
import os
import json
import logging
import tarfile
import requests
import sys
# CA bundle passed as `verify=` when talking to Vault over TLS.
CA_PATH = "/etc/tls/ca.pem"
# Base URL of the Vault server; every API endpoint below is built on it.
VAULT_URL = "https://vault.default.svc.cluster.local:8200"
# NOTE(review): the next two values look like Helm template placeholders that
# are substituted when the chart is rendered - confirm against the chart.
# Path (under /v1/) of the secret to read.
VAULT_SECRETS_PATH = "{{ .Values.vaultSecretsPartition }}"
# Role name sent to Vault's Kubernetes login endpoint.
VAULT_ROLE = "{{ .Values.vaultRole }}"
# Send log records at INFO level and above to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.info("VAULT_URL: %s", VAULT_URL)
class ConfigurationError(Exception):
    """Raised when Vault login fails or a secret cannot be loaded."""
class VaultSecretsError(Exception):
    """Raised when Vault denies access (HTTP 403) to the secrets path."""
class RequirementsInstallError(Exception):
    """Error type for requirements-install failures.

    Nothing in the visible part of this file raises it.
    """
def get_kubernetes_token():
    """Return the contents of the pod's mounted service-account token file.

    The file content is returned exactly as read; nothing is stripped.
    """
    token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
    logging.info("Getting POD service account %s token", VAULT_ROLE )
    with open(token_path, "r") as token_file:
        return token_file.read()
def get_vault_client_token():
    """Log in to Vault's Kubernetes auth backend and return a client token.

    Posts the pod's service-account JWT together with VAULT_ROLE to the
    login endpoint under VAULT_URL, verifying TLS against CA_PATH.

    Returns:
        The ``client_token`` value from the ``auth`` section of the reply.

    Raises:
        ConfigurationError: if the reply is not a JSON object, if Vault
            reports errors, or if the reply carries no client token.
    """
    payload = {
        "role": VAULT_ROLE,
        "jwt": get_kubernetes_token()
    }
    logging.info("Authenticating to Kubernetes via Vault")
    reply = requests.post(
        "%s/v1/auth/kubernetes/login" % VAULT_URL,
        data=json.dumps(payload),
        # Use the shared constant rather than repeating the CA path here.
        verify=CA_PATH
    )

    # A proxy error page or an empty body is not JSON; report it as a
    # configuration problem instead of leaking a bare ValueError.
    try:
        response = json.loads(reply.content)
    except ValueError:
        response = None
    if not isinstance(response, dict):
        error_message = (
            "Unexpected non-JSON response from vault kubernetes login (HTTP %s)"
            % reply.status_code
        )
        logging.error(error_message)
        raise ConfigurationError(error_message)

    if response.get("errors") is not None:
        error_message = (
            "Error authenticating with kubernetes backend in vault - %s" % response["errors"]
        )
        logging.error(error_message)
        raise ConfigurationError(error_message)

    # "auth" may be absent or null when the login did not succeed.
    auth = response.get("auth")
    client_token = auth.get("client_token") if isinstance(auth, dict) else None
    if not client_token:
        error_message = "Vault kubernetes login response did not contain a client token"
        logging.error(error_message)
        raise ConfigurationError(error_message)
    return client_token
def get_secrets():
    """Fetch the secret stored at VAULT_SECRETS_PATH from Vault.

    Authenticates through get_vault_client_token() and issues a GET for
    the secret, verifying TLS against CA_PATH.

    Returns:
        The non-empty ``data`` member of Vault's JSON reply.

    Raises:
        VaultSecretsError: if Vault answers 403 for the secrets path.
        ConfigurationError: for any other non-200 status, or when the
            reply contains no data.
    """
    headers = {"X-Vault-Token": get_vault_client_token()}
    url = "{vault_url}/v1/{secrets_path}".format(
        vault_url=VAULT_URL,
        secrets_path=VAULT_SECRETS_PATH
    )
    response = requests.get(url, headers=headers, verify=CA_PATH)

    if response.status_code == 403:
        raise VaultSecretsError(
            "Failed retrieving secret on path: %s. Either the path does not exist "
            "or the policy does not grant access to this path" % VAULT_SECRETS_PATH
        )
    if response.status_code != 200:
        # Surface the status so 404s and 5xx replies can be told apart.
        raise ConfigurationError(
            "Could not load data from secret (HTTP %s)." % response.status_code
        )

    data = json.loads(response.content).get("data")
    if not data:
        raise ConfigurationError("Could not load data from secret.")
    return data
def set_environment_variables():
    """Export every key/value pair of the Vault secret into os.environ.

    Only the key names are logged; the values are not written to the log.
    """
    secrets = get_secrets()
    # items() exists on both Python 2 and Python 3 dicts, whereas
    # iteritems() is Python 2 only and raises AttributeError on Python 3.
    # NOTE(review): assumes every secret value is a string - confirm, since
    # os.environ rejects other types.
    for key, value in secrets.items():
        logging.info("Now setting env var for %s.", key)
        os.environ[key] = value
def launch_command():
    """Replace the current process with the gunicorn WSGI server.

    os.execv does not return on success, so nothing after this call runs.
    """
    logging.info("Running WSGI Server.")
    executable = "{{ .Values.runner.command.fullPath }}"
    # argv[0] is the program name as seen by the new process.
    argv = [
        "gunicorn",
        "-c",
        "/etc/gunicorn/gunicorn_config.py",
        "{{ .Values.gunicorn.wsgiModule }}",
    ]
    os.execv(executable, argv)
def main():
    """Load the Vault secrets into the environment, then exec the server."""
    # Order matters: the variables must be set before the process is replaced.
    set_environment_variables()
    launch_command()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment