Created
July 11, 2024 12:24
-
-
Save lubosmj/1d736226c1816fb019430e7fb78cdd55 to your computer and use it in GitHub Desktop.
Experiment with parallel chunked uploading (pulp-glue)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/pulp-glue/pulp_glue/core/context.py b/pulp-glue/pulp_glue/core/context.py | |
index 38c4bbf..979de80 100644 | |
--- a/pulp-glue/pulp_glue/core/context.py | |
+++ b/pulp-glue/pulp_glue/core/context.py | |
@@ -1,3 +1,5 @@ | |
+from multiprocess import Pool | |
+ | |
import datetime | |
import hashlib | |
import os | |
@@ -494,6 +496,12 @@ class PulpUploadContext(PulpEntityContext): | |
def upload_file(self, file: t.IO[bytes], chunk_size: int = 1000000) -> t.Any: | |
"""Upload a file and return the uncommitted upload_href.""" | |
+ sha256_hasher = hashlib.sha256()  # first pass: hash the whole file in chunk_size reads | |
+ for chunk in iter(lambda: file.read(chunk_size), b""): | |
+ sha256_hasher.update(chunk) | |
+ sha256_digest = sha256_hasher.hexdigest()  # NOTE(review): unused while the commit step below stays commented out | |
+ file.seek(0)  # rewind so the upload pass below starts again from byte 0 | |
+ | |
start = 0 | |
size = os.path.getsize(file.name) | |
upload_href = self.create(body={"size": size})["pulp_href"] | |
@@ -512,8 +520,71 @@ class PulpUploadContext(PulpEntityContext): | |
self.delete(upload_href) | |
raise e | |
self.pulp_ctx.echo(_("Upload complete."), err=True) | |
+ | |
+ # self.pulp_ctx.echo(_("Creating artifact."), err=True) | |
+ # try: | |
+ # task = self.commit(sha256_digest) | |
+ # except Exception as e: | |
+ # self.delete(upload_href)  # NOTE(review): stub previously called self.delete() with no href; the handler above passes upload_href | |
+ # raise e | |
return upload_href | |
+    def upload_file_in_parallel(
+        self, file: t.IO[bytes], chunk_size: int = 1000000, processes: int = 4
+    ) -> t.Any:
+        """Upload a file from several processes and return the uncommitted upload_href.
+
+        Each of up to `processes` workers sends one contiguous byte range through `up`. If
+        any worker fails, the upload is deleted once and the first error is re-raised.
+        """
+        if processes < 1:
+            raise ValueError("processes must be at least 1")
+        # NOTE(review): sha256_digest is only needed by the pending commit step (see TODO).
+        sha256_hasher = hashlib.sha256()
+        for chunk in iter(lambda: file.read(chunk_size), b""):
+            sha256_hasher.update(chunk)
+        sha256_digest = sha256_hasher.hexdigest()
+        file.seek(0)
+
+        size = os.path.getsize(file.name)
+        upload_href = self.create(body={"size": size})["pulp_href"]
+
+        # Boundaries 0 = edges[0] <= ... <= edges[-1] = size tile the file with no gap.
+        edges = [i * size // processes for i in range(processes + 1)]
+        args = [
+            [self, upload_href, size, chunk_size, file, start, end]
+            for start, end in zip(edges, edges[1:])
+            if start < end  # skip empty ranges (empty file, or fewer bytes than processes)
+        ]
+        if args:
+            try:
+                # The context manager shuts the pool down instead of leaking its workers.
+                with Pool(processes=len(args)) as pool:
+                    pool.map(up, args)
+            except Exception:
+                self.delete(upload_href)
+                raise
+        self.pulp_ctx.echo(_("Upload complete."), err=True)
+
+        # TODO: commit via self.commit(sha256_digest), deleting upload_href if that fails.
+        return upload_href
+
+
+def up(args):
+    """Upload bytes [start, end) of `file` to `upload_href` in `chunk_size` pieces.
+
+    `args` is `[ctx, upload_href, size, chunk_size, file, start, end]`. Errors propagate
+    to the caller, which is responsible for deleting the upload.
+    """
+    self, upload_href, size, chunk_size, file, start, end = args
+    file.seek(start)
+    self.pulp_href = upload_href  # NOTE(review): presumably the target of upload_chunk
+    while start < end:
+        chunk = file.read(min(chunk_size, end - start))
+        self.upload_chunk(chunk=chunk, size=size, start=start)
+        start += chunk_size
+        self.pulp_ctx.echo(".", nl=False, err=True)
+
class PulpUserContext(PulpEntityContext): | |
ENTITY = _("user") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment