Skip to content

Instantly share code, notes, and snippets.

@juzna
Created January 14, 2020 12:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save juzna/120abd08e1f6cfc052660a829a788290 to your computer and use it in GitHub Desktop.
Save juzna/120abd08e1f6cfc052660a829a788290 to your computer and use it in GitHub Desktop.
Bulk retrieve data from S3 Glacier storage class.
import asyncio
import collections
import sys
import aiobotocore
import asyncio_pool
import botocore
import botocore.session, botocore.credentials
from botocore.exceptions import ClientError
from tqdm import tqdm
# S3 bucket whose objects this script lists, inspects and restores.
BUCKET = "kiwi-datadog-logs-archive"
def patch_auth():
    """Monkey-patch botocore's AssumeRole credential fetcher to use a plain botocore STS client.

    ``botocore.credentials.AssumeRoleCredentialFetcher._create_client`` is
    replaced class-wide. The replacement builds the STS client from a lazily
    created, per-fetcher ``botocore.session.Session``, using the fetcher's
    frozen source credentials.

    NOTE(review): this is presumably needed because, under aiobotocore, the
    fetcher's default client creator yields an async client that the
    synchronous credential-refresh path cannot call. Confirm against the
    installed botocore/aiobotocore versions.
    """
    def _create_client(self):
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        # Create the synchronous botocore session once per fetcher and reuse it.
        if not getattr(self, '_boto_session', None):
            self._boto_session = botocore.session.Session()
        return self._boto_session.create_client(
            'sts',
            aws_access_key_id=frozen_credentials.access_key,
            aws_secret_access_key=frozen_credentials.secret_key,
            aws_session_token=frozen_credentials.token,
        )

    botocore.credentials.AssumeRoleCredentialFetcher._create_client = _create_client
async def list_all_objects(c, prefix, bucket=None):
    """Return every object under *prefix*, following ListObjects pagination.

    :param c: S3 client whose ``list_objects`` is awaitable.
    :param prefix: key prefix to list.
    :param bucket: bucket name; defaults to the module-level ``BUCKET``.
    :returns: list of the object dicts from each page's ``Contents``;
        an empty list when nothing matches.
    """
    if bucket is None:
        bucket = BUCKET
    marker = ''
    ret = []
    while True:
        r = await c.list_objects(Bucket=bucket, Prefix=prefix, Marker=marker)
        # S3 leaves out 'Contents' entirely when a page (or the whole prefix) is empty.
        contents = r.get('Contents', [])
        ret.extend(contents)
        if not r.get('IsTruncated', False):
            return ret
        # Prefer the server-supplied NextMarker; otherwise continue after the last key seen.
        marker = r.get('NextMarker') or contents[-1]['Key']
async def f_head(c, k, bucket=None):
    """Fetch the storage class and restore status of a single object.

    :param c: S3 client whose ``head_object`` is awaitable.
    :param k: object key.
    :param bucket: bucket name; defaults to the module-level ``BUCKET``.
    :returns: ``(StorageClass, Restore)`` taken from the HEAD response; each
        is ``None`` when the response does not contain that field.
    """
    if bucket is None:
        bucket = BUCKET
    head = await c.head_object(Bucket=bucket, Key=k)
    return head.get('StorageClass'), head.get('Restore')
async def f_restore(c, k, bucket=None, days=3, tier=None):
    """Request a temporary restore of one archived object.

    :param c: S3 client whose ``restore_object`` is awaitable.
    :param k: object key.
    :param bucket: bucket name; defaults to the module-level ``BUCKET``.
    :param days: number of days the restored copy should remain available
        (default 3, as before).
    :param tier: optional retrieval tier sent as
        ``GlacierJobParameters.Tier`` (e.g. ``'Bulk'``). When ``None``, no
        tier is sent and the service default applies.
    :returns: ``None``.
    :raises ClientError: for any S3 error except ``RestoreAlreadyInProgress``.
        That error is ignored so the script can safely be re-run over the
        same keys.
    """
    if bucket is None:
        bucket = BUCKET
    restore_request = {'Days': days}
    if tier is not None:
        restore_request['GlacierJobParameters'] = {'Tier': tier}
    try:
        await c.restore_object(Bucket=bucket, Key=k, RestoreRequest=restore_request)
    except ClientError as e:
        if e.response['Error']['Code'] != 'RestoreAlreadyInProgress':
            raise
async def _retrieve_all(c, ks):
    """Request a restore of every key in *ks* (20 concurrent requests) and print the failures."""
    with tqdm(total=len(ks)) as bar:
        bar.update(0)  # Make sure the bar is drawn before the first request finishes.

        async def f(k):
            await f_restore(c, k)
            bar.update()

        r = await asyncio_pool.AioPool(size=20).map(f, ks)
    # f returns None on success, so only non-None results (failures) remain.
    # Assumes AioPool.map returns raised exceptions as results; confirm.
    print(list(filter(None, r)))


async def _report_status(c, ks):
    """HEAD every key in *ks* (20 concurrent requests), tally the results and print them."""
    cnt = collections.Counter()
    with tqdm(total=len(ks)) as bar:
        bar.update(0)  # Make sure it's shown.

        async def fh(k):
            sc, r = await f_head(c, k)
            # NOTE(review): this counts the storage class and the Restore value as
            # two separate entries, not as one (storage_class, restore) pair.
            # Confirm this is intended.
            cnt.update((sc, r))
            bar.set_description_str(f"{cnt.most_common()}", refresh=False)
            bar.update()

        r = await asyncio_pool.AioPool(size=20).map(fh, ks)
    print(cnt.most_common())
    print(list(filter(None, r)))  # fh returns None on success, so only failures remain.


async def main():
    """Command-line entry point: ``<retrieve|status> PREFIX [PREFIX ...]``.

    Lists every object in ``BUCKET`` under the given prefixes, then either:

    * ``retrieve``: requests a restore of each object, or
    * ``status``: tallies storage class and restore status via HEAD requests.

    :raises ValueError: when arguments are missing or the action is unknown.
    """
    patch_auth()
    args = sys.argv[1:]
    if not args:
        raise ValueError('Usage: <retrieve|status> <path prefix> [<path prefix> ...]')
    action, paths = args[0], args[1:]
    if not paths:
        raise ValueError('Run with list of path prefixes to restore.')
    # Reject an unknown action before spending time listing objects.
    handlers = {'retrieve': _retrieve_all, 'status': _report_status}
    handler = handlers.get(action)
    if handler is None:
        raise ValueError("Bad action")

    s = aiobotocore.get_session()
    c = s.create_client('s3')
    try:
        print("Listing objects...")
        ks = []
        for p in tqdm(paths):
            ks.extend(o['Key'] for o in await list_all_objects(c, p))
        print(f"Found {len(ks):_d} objects to restore")
        await handler(c, ks)
    finally:
        # Close the client even when listing or a pool run raises.
        await c.close()
if __name__ == "__main__":
    # Script entry point: run the async main() to completion on a new event loop.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment