Skip to content

Instantly share code, notes, and snippets.

@nhamilakis
Last active April 13, 2021 11:39
Show Gist options
  • Save nhamilakis/5a896c0679b27de6c55607864111ae71 to your computer and use it in GitHub Desktop.
Save nhamilakis/5a896c0679b27de6c55607864111ae71 to your computer and use it in GitHub Desktop.
durable_upload
""" Sample code for an tolerant to failure upload
using UploadManifest class to iterate over items to upload/process
UploadManifest does not close the loop if all the items do not have a status done or failed.
Customisable number of retries (via argument)
Checkpoint saving to local json file
"""
import json
import random
import time
from pathlib import Path
class UploadManifest:
@classmethod
def load(cls, filename: Path, retries: int = 2):
with filename.open('r') as fp:
dd = json.load(fp)
return cls(dd, filename, retries)
def __init__(self, list_manifest, save_file: Path, retries: int = 2):
if isinstance(list_manifest, dict):
self.man = list_manifest
else:
self.man = {
f"{name}": 'todo'
for name in list_manifest
}
self.save_file = save_file
self.retries = retries
def __iter__(self):
return self
def __next__(self):
for k, v in self.man.items():
if v == 'todo':
return k
for k, v in self.man.items():
if v == 'waiting':
return k
for k, v in self.man.items():
if 'retry' in v:
return k
raise StopIteration
def status(self, key):
return self.man[key]
def set_waiting(self, key):
if self.man[key] == 'todo':
self.man[key] = "waiting"
self.save()
def set_done(self, key):
self.man[key] = "done"
self.save()
def set_failed(self, key):
k = self.man[key]
if k in ["waiting", "todo"]:
self.man[key] = "retry_1"
elif "retry" in k:
nb = int(k.split('_')[1])
nb += 1
if nb > self.retries:
st = 'failed'
else:
st = f"retry_{nb}"
self.man[key] = st
self.save()
def save(self):
with self.save_file.open('w') as fp:
json.dump(self.man, fp)
def is_complete(self):
for k, v in self.man.items():
if v != "done":
return False
return True
def get_failed(self):
return [k for k, v in self.man.items() if v == 'failed']
def upload(name):
print(f'attempting upload of {name}')
time.sleep(len(name))
if name == 'file2':
return 500
return random.choice([200, 500])
def failure_chance(percent):
return random.random() < percent
if __name__ == '__main__':
things = ['file1', 'file2', 'file3', 'file4', 'file5', 'file6', 'file7', 'file8', 'file9']
manifest_file = Path('upload.manifest.json')
explode = False
# resume upload if interrupted
if manifest_file.is_file():
man = UploadManifest.load(manifest_file)
else:
man = UploadManifest(things, manifest_file)
man.save()
for item in man:
man.set_waiting(item)
response = upload(item)
# process response
if response == 200:
man.set_done(item)
else:
man.set_failed(item)
if failure_chance(0.1):
explode = True
print("explosion")
break
if not explode and man.is_complete():
print("upload completed.")
elif not explode:
print("upload completed.")
print(f"failed to upload the following: {man.get_failed()}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment