Skip to content

Instantly share code, notes, and snippets.

@trstringer
Created May 4, 2021 17:33
Show Gist options
  • Save trstringer/11cf33b185d342e540a657be4c1e672a to your computer and use it in GitHub Desktop.
Save trstringer/11cf33b185d342e540a657be4c1e672a to your computer and use it in GitHub Desktop.
Search an Azure Linux VM for the resource disk using walinuxagent logic
import os
# Directory whose entries are the VMBus devices to inspect; each entry is
# expected to contain a "device_id" file holding the device GUID.
STORAGE_DEVICE_PATH = '/sys/bus/vmbus/devices/'
def search_for_resource_disk(gen1_device_prefix, gen2_device_id):
    """
    Search the filesystem for a device by ID or prefix.

    Every VMBus device whose GUID starts with ``gen1_device_prefix`` or is
    equal to ``gen2_device_id`` is walked, and each step of the search is
    printed. The scan deliberately keeps going after a hit so that the
    complete diagnostic trace is produced; the first device that was found
    is the one returned.

    Args:
        gen1_device_prefix (str): Gen1 resource disk prefix.
        gen2_device_id (str): Gen2 resource device ID.

    Returns:
        str: The first device found, or None when nothing matched (or an
        OSError/IOError interrupted the scan before any match).
    """
    first_found = None

    # We have to try device IDs for both Gen1 and Gen2 VMs.
    print('Searching gen1 prefix {0} or gen2 {1}'.format(gen1_device_prefix, gen2_device_id))
    try:
        for vmbus, guid in _enumerate_device_id():
            print("vmbus: {}, guid: {}".format(vmbus, guid))
            if not (guid.startswith(gen1_device_prefix) or guid == gen2_device_id):
                continue

            print("guid matches device search")
            for root, dirs, files in os.walk(STORAGE_DEVICE_PATH + vmbus):  # pylint: disable=W0612
                print("root: {}".format(root))
                print("dirs: {}".format(dirs))
                print("files: {}".format(files))
                root_path_parts = root.split('/')
                # For Gen1 VMs we only have to check for the block dir in the
                # current device. But for Gen2 VMs all of the disks (sda, sdb,
                # sr0) are presented in this device on the same SCSI controller.
                # Because of that we need to also read the LUN. It will be:
                #   0 - OS disk
                #   1 - Resource disk
                #   2 - CDROM
                print("root_path_parts: {}".format(root_path_parts))
                if root_path_parts[-1] == 'block' and (
                        guid != gen2_device_id or
                        root_path_parts[-2].split(':')[-1] == '1'):
                    print("Root path parts matches, returning device")
                    # An empty "block" directory has no device to report;
                    # skip it instead of failing with an IndexError.
                    if not dirs:
                        continue
                    device = dirs[0]
                    print("Found device is {} but continuing".format(device))
                    if first_found is None:
                        first_found = device
                else:
                    # older distros
                    print("Searching through dirs")
                    for d in dirs:
                        print("Current dir: {}".format(d))
                        if ':' in d and "block" == d.split(':')[0]:
                            device = d.split(':')[1]
                            print("Block found, returning device")
                            print("Found device is {} but continuing".format(device))
                            if first_found is None:
                                first_found = device
    except (OSError, IOError) as exc:
        # print() does not interpolate extra positional arguments the way a
        # logger does, so the message has to be formatted explicitly.
        print(
            'Error getting device for {0} or {1}: {2}'.format(
                gen1_device_prefix,
                gen2_device_id,
                str(exc)
            )
        )
    return first_found
def _enumerate_device_id():
    """
    Enumerate all storage device IDs.

    Lists the entries of STORAGE_DEVICE_PATH and, for each one, reads its
    "device_id" file and strips surrounding braces and newlines from the
    contents. Yields nothing when STORAGE_DEVICE_PATH does not exist.

    Args:
        None

    Returns:
        Iterator[Tuple[str, str]]: VmBus and storage devices.
    """
    if not os.path.exists(STORAGE_DEVICE_PATH):
        return

    for entry in os.listdir(STORAGE_DEVICE_PATH):
        id_file = os.path.join(STORAGE_DEVICE_PATH, entry, "device_id")
        raw_id = read_file(id_file)
        yield entry, raw_id.strip('{}\n')
def read_file(filepath, asbin=False, remove_bom=False, encoding='utf-8'):
    """
    Read and return contents of 'filepath'.

    Args:
        filepath (str): Path of the file to read.
        asbin (bool): When true, return the raw bytes without decoding
            (and without BOM removal).
        remove_bom (bool): When true, drop a leading UTF-8 byte order mark
            (0xEF 0xBB 0xBF) from the bytes before they are decoded.
        encoding (str): Encoding used to decode the bytes into text.

    Returns:
        bytes when ``asbin`` is true, otherwise the decoded str.
    """
    with open(filepath, 'rb') as in_file:
        data = in_file.read()

    if asbin:
        return data

    if remove_bom:
        # The ``remove_bom`` parameter shadows any module-level helper of the
        # same name inside this function, so the BOM is stripped inline here
        # rather than by calling that helper (which would call a bool).
        utf8_bom = b'\xef\xbb\xbf'
        if data.startswith(utf8_bom):
            data = data[len(utf8_bom):]

    return data.decode(encoding)
def remove_bom(c):
    """
    Strip a leading UTF-8 byte order mark from ``c`` and return the rest.

    bom is comprised of a sequence of three chars,0xef, 0xbb, 0xbf, in case of utf-8.
    Only that exact sequence is removed; other leading non-ASCII data (for
    example a multi-byte UTF-8 character) is left untouched.

    Args:
        c: bytes, bytearray or str to inspect. For str input the BOM is
            matched as the three characters U+00EF, U+00BB, U+00BF.

    Returns:
        ``c`` without the leading BOM, or ``c`` unchanged (including None
        and empty input) when no BOM is present.
    """
    if c is None:
        return c

    if isinstance(c, (bytes, bytearray)):
        bom = b'\xef\xbb\xbf'
    else:
        bom = u'\xef\xbb\xbf'

    if c[:3] == bom:
        return c[3:]
    return c
def is_str_none_or_whitespace(s):
    """
    Return True when ``s`` is None, has length zero, or contains only
    whitespace; otherwise return False.
    """
    if s is None:
        return True
    if len(s) == 0:
        return True
    return s.isspace()
def str_to_ord(a):
    """
    Allows indexing into a string or an array of integers transparently.
    Generic utility function.

    Args:
        a: A single-character str or single-byte bytes value (including
            instances of their subclasses), or any other value such as an
            int obtained by indexing a bytes object.

    Returns:
        ``ord(a)`` when ``a`` is a str or bytes instance, otherwise ``a``
        unchanged.
    """
    # isinstance (rather than an exact type comparison) so that subclasses
    # of str/bytes are converted as well.
    if isinstance(a, (bytes, str)):
        return ord(a)
    return a
if __name__ == "__main__":
    # Script entry point: run the search with a fixed Gen1 GUID prefix and a
    # fixed Gen2 device ID; the search prints its progress as it goes.
    gen1_prefix = "{0}-000{1}".format("00000000", 1)
    gen2_id = 'f8b3781a-1e82-4818-a1c3-63d806ec15bb'
    search_for_resource_disk(gen1_device_prefix=gen1_prefix, gen2_device_id=gen2_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment