Skip to content

Instantly share code, notes, and snippets.

@alexeldeib
Created June 19, 2023 17:16
Show Gist options
  • Save alexeldeib/5c4191d98f4568a61ae951e1a3951f48 to your computer and use it in GitHub Desktop.
Save alexeldeib/5c4191d98f4568a61ae951e1a3951f48 to your computer and use it in GitHub Desktop.
Azure Wireserver Extension config extraction
import http.client
import xml.etree.ElementTree as ET
from urllib.parse import urlparse
import json
from subprocess import Popen, PIPE, STDOUT
import base64
# Extract the Custom Script Extension (CSE) command for this VM from the Azure
# wireserver. Only the Python standard library and the `openssl` CLI are used,
# so the script needs no third-party packages and makes no egress calls.

WIRESERVER_HOST = "168.63.129.16"
WIRESERVER_HEADERS = {"x-ms-version": "2012-11-30"}
# Bound every socket operation so an unreachable wireserver cannot hang the script.
WIRESERVER_TIMEOUT_SECONDS = 30
GOALSTATE_PATH = "/machine/?comp=goalstate"
EXTENSIONS_CONFIG_XPATH = (
    "./Container/RoleInstanceList/RoleInstance/Configuration/ExtensionsConfig"
)
CSE_RUNTIME_SETTINGS_XPATH = (
    "./PluginSettings/Plugin[@name='Microsoft.Azure.Extensions.CustomScript']"
    "/RuntimeSettings"
)
WAAGENT_KEY_DIR = "/var/lib/waagent"


def http_get(conn, path: str) -> bytes:
    """GET *path* on an open wireserver connection and return the raw body.

    Raises:
        RuntimeError: if the response status is not 2xx.
    """
    conn.request("GET", path, headers=WIRESERVER_HEADERS)
    response = conn.getresponse()
    # Always drain the body: the connection cannot be reused for the next
    # request until the previous response has been fully read.
    body = response.read()
    if not 200 <= response.status < 300:
        raise RuntimeError(
            f"GET {path} returned HTTP {response.status} {response.reason}"
        )
    return body


def extension_config_path(goalstate_xml) -> str:
    """Return the request path (with query string) of the extensions config.

    The goal state XML carries an absolute URL such as
    http://168.63.129.16/foo/bar/baz?foo=bar&bar=baz. Scheme and host are
    dropped because the HTTP connection to the wireserver is already open.

    Raises:
        ValueError: if the goal state has no ExtensionsConfig URL.
    """
    node = ET.fromstring(goalstate_xml).find(EXTENSIONS_CONFIG_XPATH)
    if node is None or not (node.text or "").strip():
        raise ValueError("goal state does not contain an ExtensionsConfig URL")
    url = urlparse(node.text.strip())
    # Only append "?" when there actually is a query string.
    return f"{url.path}?{url.query}" if url.query else url.path


def parse_cse_settings(extension_config_xml):
    """Return ``(thumbprint, encrypted_settings)`` for the CustomScript plugin.

    The CSE runtime settings are JSON embedded in the extensions config XML.
    ``thumbprint`` identifies the certificate used to encrypt the protected
    settings; ``encrypted_settings`` is the base64-decoded (binary DER)
    protected settings blob as ``bytes``.

    Raises:
        ValueError: if the CustomScript plugin has no RuntimeSettings.
        KeyError / IndexError: if the JSON lacks the expected fields.
    """
    node = ET.fromstring(extension_config_xml).find(CSE_RUNTIME_SETTINGS_XPATH)
    if node is None or not (node.text or "").strip():
        raise ValueError(
            "extensions config has no RuntimeSettings for the CustomScript plugin"
        )
    handler_settings = json.loads(node.text)['runtimeSettings'][0]['handlerSettings']
    thumbprint = handler_settings['protectedSettingsCertThumbprint']
    encrypted = base64.b64decode(handler_settings['protectedSettings'])
    return thumbprint, encrypted


def decrypt_protected_settings(encrypted: bytes, key_path: str) -> bytes:
    """Decrypt DER-encoded protected settings by shelling out to openssl.

    Raises:
        RuntimeError: if openssl exits with a non-zero status.
    """
    # stderr is captured separately so openssl diagnostics can never be
    # mistaken for (or mixed into) the decrypted payload.
    proc = Popen(
        ['openssl', 'smime', '-decrypt', '-inform', 'der', '-inkey', key_path],
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
    )
    decrypted, errors = proc.communicate(input=encrypted)
    if proc.returncode != 0:
        detail = errors.decode(errors="replace").strip()
        raise RuntimeError(f"openssl exited with status {proc.returncode}: {detail}")
    return decrypted


def extract_cse_command():
    """Fetch, decrypt and return the CSE ``commandToExecute`` value."""
    conn = http.client.HTTPConnection(
        WIRESERVER_HOST, timeout=WIRESERVER_TIMEOUT_SECONDS
    )
    try:
        # 1st request: the goal state, which points at the extensions config.
        goalstate_xml = http_get(conn, GOALSTATE_PATH)
        # 2nd request: the extensions config itself, over the same connection.
        extension_config_xml = http_get(conn, extension_config_path(goalstate_xml))
    finally:
        conn.close()

    thumbprint, encrypted = parse_cse_settings(extension_config_xml)
    key_path = f"{WAAGENT_KEY_DIR}/{thumbprint}.prv"
    decrypted = decrypt_protected_settings(encrypted, key_path)
    # The decrypted payload is the original JSON extension settings; the
    # command we want is stored under 'commandToExecute'.
    return json.loads(decrypted.decode())['commandToExecute']


def main():
    """Print the CSE command, or a one-line failure message describing why not."""
    try:
        command = extract_cse_command()
    except Exception as err:
        # Top-level boundary: report the actual error instance (not the
        # Exception class) and keep the original best-effort exit behaviour.
        print(f"failed to extract cse: {type(err).__name__}: {err}")
    else:
        print(command)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment