@michalc
Last active December 7, 2023 19:17
Bulk delete files from an AWS S3 bucket in Python using multiple threads via a parallel (eventually) depth-first search
# Deletes objects in bulk using boto3's delete_objects, but using multiple threads to achieve some
# parallelism: in spite of the GIL, multiple HTTP requests to S3 should happen at the same time.
# Instead of looping over all keys under the root prefix, it walks the tree of delimiter-defined
# "folders" of keys in a depth-first way, which allows each page to be processed by a separate
# thread as it's discovered. Depth-first is used because the depth is limited by the maximum key
# length of 1024 bytes in S3, which bounds the memory the algorithm uses to store the next
# requests to make. This would not be the case with breadth-first, because there is no limit to
# how many keys are in any folder.
#
# To do the search in parallel, each bit of work (i.e. an HTTP request to fetch a page of keys
# under a prefix) in the tree is assigned a priority, added to a queue, and the work with the
# lowest valued priority in the queue is chosen to be done when a worker becomes available.
#
# The priority of each request is a tuple, where each item is itself a tuple of the index of the
# prefix in the page in which it appeared, and the current page number under that prefix. Going
# to the next page increments page number, and going "deeper" into the tree appends to the
# priority with another tuple. Say for example if we set page_size=2, then an entire tree of
# prefixes and priorities could look like the below.
#
# Prefix                      Priority
# root/                       (0,0),
# ├─ apples/                  (0,0), (0,0),
# │  ├─ ash/                  (0,0), (0,0), (0,0),
# │  ├─ beech/                (0,0), (0,0), (1,0),
# │  apples/ (next page)      (0,0), (0,1),
# │  ├─ cedar/                (0,0), (0,1), (0,0),
# │  └─ douglas-fir/          (0,0), (0,1), (1,0),
# ├─ bananas/                 (0,0), (1,0),
# root/ (next page)           (0,1),
# ├─ cherries/                (0,1), (0,0),
# └─ dates/                   (0,1), (1,0),
#
# At any given moment only some of the tree is known, and because nodes are explored in parallel,
# several requests that correspond to exploring several nodes could be in-flight at once, so the
# tree is not even discovered in a deterministic order. However, the priorities are constructed
# so that the minimum of the ones in the queue corresponds to the best node to explore to make
# the search as depth-first as possible. As nodes are explored and new priorities are added to
# the queue, this minimum can change and even be smaller than already explored or in-flight
# nodes. So the search is maybe better described as "eventually depth-first".
#
# This priority construction also has the property that going to the next page under a prefix does
# not increase the length of the priority itself, which is important because chains of pages are
# not limited in their length. The length of the priority does increase with the depth in the
# tree, but as mentioned above this is limited due to the limit on the length of keys.
# Technically Python requires more memory to store larger integers, but storing the index of page
# 1,000,000,000 takes the same memory as storing 1 (28 bytes in 64-bit CPython), and so in this
# case it can be treated as constant for all realistically reachable page indexes.
#
# In some light testing it deletes between 2000 and 6000 objects per second, and it works best if
# you have objects distributed into folders/CommonPrefixes as specified by the delimiter.
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from dataclasses import dataclass, field
from functools import partial
from typing import Callable, Optional, Tuple
from queue import PriorityQueue
import boto3
def bulk_delete(
    bucket, prefix,
    workers=8, page_size=1000, delimiter='/',
    get_s3_client=lambda: boto3.client('s3'), on_delete=lambda num: None,
):
    s3 = get_s3_client()
    queue = PriorityQueue()

    @dataclass(order=True)
    class Work:
        # A tuple that determines the priority of the bit of work in "func". This is a sort of
        # "coordinate" in the paginated node tree that prioritises a depth-first search.
        priority: Tuple[Tuple[int,int], ...]
        # The work function itself that fetches a page of Keys/CommonPrefixes, or deletes a page
        func: Optional[Callable[[], None]] = field(compare=False)

    # A sentinel "stop" Work instance with priority chosen to be before all work. So when it's
    # queued the workers will stop at the very next opportunity
    stop = Work(((-1,-1),), None)

    def do_work():
        while (work := queue.get()) is not stop:
            work.func()
            queue.task_done()
            # Read the counter under the queue's mutex, but release it before calling queue.put,
            # which acquires the same (non-reentrant) mutex
            with queue.mutex:
                unfinished_tasks = queue.unfinished_tasks
            if unfinished_tasks == 0:
                for _ in range(0, workers):
                    queue.put(stop)

    def list_iter(prefix):
        return iter(s3.get_paginator('list_objects_v2').paginate(
            Bucket=bucket, Prefix=prefix,
            Delimiter=delimiter, MaxKeys=page_size, PaginationConfig={'PageSize': page_size},
        ))

    def delete_page(page):
        s3.delete_objects(Bucket=bucket, Delete={'Objects': page})
        on_delete(len(page))

    def process_page(page_iter, priority):
        try:
            page = next(page_iter)
        except StopIteration:
            return

        # Deleting a page is done at the same priority as this function. It will often be done
        # straight after this call because this call must have been the highest priority for it to
        # have run, but there could be unfinished nodes earlier in the depth-first search that have
        # since submitted work, and so would be prioritised over the deletion
        if contents := page.get('Contents'):
            delete_priority = priority
            queue.put(Work(
                priority=delete_priority,
                func=partial(delete_page, [{'Key': obj['Key']} for obj in contents]),
            ))

        # Processing of child prefixes is done after deletion and in order. Importantly anything
        # each child prefix itself enqueues should be done before the work of any later child
        # prefixes to make it a depth-first search. Appending the index of the child prefix to the
        # priority tuple of this function does this, because the work inside each child prefix
        # will only ever enqueue work at its priority or greater, but always less than the
        # priority of subsequent child prefixes or the next page.
        for prefix_index, obj in enumerate(page.get('CommonPrefixes', [])):
            prefix_priority = priority + ((prefix_index,0),)
            queue.put(Work(
                priority=prefix_priority,
                func=partial(process_page,
                    page_iter=list_iter(obj['Prefix']), priority=prefix_priority)
            ))

        # The next page in pagination for this prefix is processed after delete for this page,
        # after all the child prefixes are processed, and after anything the child prefixes
        # themselves enqueue.
        next_page_priority = priority[:-1] + ((priority[-1][0], priority[-1][1] + 1),)
        queue.put(Work(
            priority=next_page_priority,
            func=partial(process_page, page_iter=page_iter, priority=next_page_priority),
        ))

    with ThreadPoolExecutor(max_workers=workers) as worker_pool:
        # Bootstrap with the first page
        priority = ((0,0),)
        queue.put(Work(
            priority=priority,
            func=partial(process_page, page_iter=list_iter(prefix), priority=priority),
        ))

        # Run workers, waiting for the first exception, if any, raised by them
        done, _ = wait(
            tuple(worker_pool.submit(do_work) for _ in range(0, workers)),
            return_when=FIRST_EXCEPTION,
        )

        # If an exception was raised, stop all the other workers, because otherwise exiting the
        # ThreadPoolExecutor context will block, and then re-raise the exception
        if e := next(iter(done)).exception():
            for _ in range(0, workers - 1):
                queue.put(stop)
            raise e from None
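
The ordering described in the header comment relies only on Python's built-in tuple comparison, so it can be checked in isolation. The sketch below is not part of the gist: child_priority and next_page_priority are hypothetical helper names that mirror the two expressions used in process_page, and the priorities are those of the example tree with page_size=2.

```python
import random

def child_priority(priority, prefix_index):
    # Going "deeper": append (index of the child prefix in its page, page 0)
    return priority + ((prefix_index, 0),)

def next_page_priority(priority):
    # Going to the next page: increment the page number of the last item only,
    # so the length of the priority stays the same
    return priority[:-1] + ((priority[-1][0], priority[-1][1] + 1),)

root = ((0, 0),)
apples = child_priority(root, 0)
apples_page_2 = next_page_priority(apples)
root_page_2 = next_page_priority(root)

# The example tree from the header comment, listed in depth-first order
depth_first = [
    root,
    apples,
    child_priority(apples, 0),         # ash/
    child_priority(apples, 1),         # beech/
    apples_page_2,
    child_priority(apples_page_2, 0),  # cedar/
    child_priority(apples_page_2, 1),  # douglas-fir/
    child_priority(root, 1),           # bananas/
    root_page_2,
    child_priority(root_page_2, 0),    # cherries/
    child_priority(root_page_2, 1),    # dates/
]

# Whatever order the priorities are discovered in, sorting recovers depth-first order
shuffled = random.sample(depth_first, k=len(depth_first))
assert sorted(shuffled) == depth_first

# The "stop" sentinel priority sorts before every real priority
stop_priority = ((-1, -1),)
assert all(stop_priority < priority for priority in depth_first)
```

Taking the minimum of whatever is currently in the PriorityQueue is the same comparison, applied only to the part of the tree discovered so far.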
@michalc
Author

michalc commented Nov 3, 2023

Usage

To delete everything under the my-prefix/ prefix in my-bucket you can use the bucket and prefix parameters:

bulk_delete(
    bucket='my-bucket', prefix='my-prefix/',
)

Or to do the same thing, but integrated with tqdm to give feedback when run from the command line you can use the on_delete parameter:

from tqdm import tqdm

print('Deleting...')
with tqdm(unit=' objects') as pbar:
    bulk_delete(
        bucket='my-bucket', prefix='my-prefix/',
        on_delete=lambda num: pbar.update(num),
    )
print('Done')

Or to customise the boto3 S3 client that's used, you can use the get_s3_client parameter:

import boto3

client = boto3.client('s3')
bulk_delete(
    bucket='my-bucket', prefix='my-prefix/',
    get_s3_client=lambda: client,
)
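
Because on_delete is called from the worker threads, a callback that keeps its own running total should guard that state. A minimal sketch, assuming a hypothetical DeleteCounter helper that is not part of the gist; the thread pool here only stands in for bulk_delete's workers:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class DeleteCounter:
    """Hypothetical helper: a thread-safe running total of deleted objects."""

    def __init__(self):
        self._lock = threading.Lock()
        self.total = 0

    def __call__(self, num):
        # bulk_delete calls on_delete(num) from whichever worker thread did the delete
        with self._lock:
            self.total += num

counter = DeleteCounter()

# Stand-in for bulk_delete's workers: 100 reports of 1000 deletions across 8 threads
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(100):
        pool.submit(counter, 1000)

print(counter.total)  # 100000
```

With the real function this would be passed as on_delete=counter, and counter.total read after bulk_delete returns.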

@tanayagrawa1

@michalc will this delete all the objects from a versioned bucket?

@michalc
Author

michalc commented Dec 7, 2023

@tanayagrawa1 If you mean all the versions of objects in a versioned bucket, then no.

I suspect it wouldn't be too much work to tweak it to use list_object_versions rather than list_objects_v2 to do this, although the delete markers shouldn't be forgotten.

A rough, untested, initial guess as to the changes needed:

  • Change the paginator in list_iter from list_objects_v2 to list_object_versions
  • Change the block in process_page that starts with if contents := page.get('Contents'): to
    if contents := page.get('Versions', []) + page.get('DeleteMarkers', []):
        delete_priority = priority
        queue.put(Work(
            priority=delete_priority,
            func=partial(delete_page, [{'Key': obj['Key'], 'VersionId': obj['VersionId']} for obj in contents]),
        ))
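
The list comprehension in the second change can be tried against a hand-written page without touching S3. The sketch below is equally untested against a real bucket. It assumes the 'Versions' and 'DeleteMarkers' keys of a list_object_versions response page, as in the change above; version_delete_entries is a hypothetical helper name, and the sample page is made up for illustration.

```python
def version_delete_entries(page):
    # Hypothetical helper: every version and every delete marker on the page is deleted
    # by its VersionId. Without a VersionId, a delete in a versioned bucket only adds
    # a new delete marker rather than removing anything.
    objs = page.get('Versions', []) + page.get('DeleteMarkers', [])
    return [{'Key': obj['Key'], 'VersionId': obj['VersionId']} for obj in objs]

# A made-up page in the shape of a list_object_versions response
page = {
    'Versions': [
        {'Key': 'my-prefix/a.txt', 'VersionId': 'v2', 'IsLatest': False},
        {'Key': 'my-prefix/a.txt', 'VersionId': 'v1', 'IsLatest': False},
    ],
    'DeleteMarkers': [
        {'Key': 'my-prefix/a.txt', 'VersionId': 'v3', 'IsLatest': True},
    ],
}

entries = version_delete_entries(page)
assert len(entries) == 3
assert {'Key': 'my-prefix/a.txt', 'VersionId': 'v3'} in entries

# A page with neither key yields nothing to delete
assert version_delete_entries({}) == []
```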
