Skip to content

Instantly share code, notes, and snippets.

@theonlyway
Forked from Kentzo/CustomVolumeInUse.py
Created August 2, 2023 01:35
Show Gist options
  • Save theonlyway/2b6cf882c7a9b513f4e3e4c2361db2c7 to your computer and use it in GitHub Desktop.
Save theonlyway/2b6cf882c7a9b513f4e3e4c2361db2c7 to your computer and use it in GitHub Desktop.
import botocore.waiter
import botocore.session
session = botocore.session.get_session()
client = session.create_client('ec2')
VOLUME_ID = ... # e.g. 'vol-049df61146c4d7901'
INSTANCE_ID = ... # e.g. 'i-1234567890abcdef0'
DEVICE = ... # e.g. '/dev/xvdba'
WAITER_ID = ... # e.g. 'MyWaiter'
model = botocore.waiter.WaiterModel({
'version': 2,
'waiters': {
WAITER_ID: {
'delay': 15,
'operation': 'DescribeVolumes',
'maxAttempts': 40,
'acceptors': [
{
'expected': True,
'matcher': 'path',
'state': 'success',
'argument': "length(Volumes[?State == 'in-use'].Attachments[] | [?"
f"InstanceId == '{INSTANCE_ID}' &&"
f"Device == '{DEVICE}' &&"
"State == 'attached'"
"]) == `1`"
},
{
'expected': True,
'matcher': 'path',
'state': 'failure',
'argument': "length(Volumes[?State == 'in-use'].Attachments[] | [?"
f"InstanceId != '{INSTANCE_ID}' ||"
f"Device != '{DEVICE}'"
"]) > `0`"
},
{
'expected': 'deleted',
'matcher': 'pathAny',
'state': 'failure',
'argument': 'Volumes[].State'
},
{
'expected': 'error',
'matcher': 'pathAny',
'state': 'failure',
'argument': 'Volumes[].State'
}
]
}
}
})
waiter = botocore.waiter.create_waiter_with_client(WAITER_ID, model, client)
waiter.wait(VolumeIds=[VOLUME_ID])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment