Skip to content

Instantly share code, notes, and snippets.

@lubosmj
Created July 11, 2024 12:24
Show Gist options
  • Save lubosmj/1d736226c1816fb019430e7fb78cdd55 to your computer and use it in GitHub Desktop.
Save lubosmj/1d736226c1816fb019430e7fb78cdd55 to your computer and use it in GitHub Desktop.
Experiment with parallel chunked uploading (pulp-glue)
diff --git a/pulp-glue/pulp_glue/core/context.py b/pulp-glue/pulp_glue/core/context.py
index 38c4bbf..979de80 100644
--- a/pulp-glue/pulp_glue/core/context.py
+++ b/pulp-glue/pulp_glue/core/context.py
@@ -1,3 +1,5 @@
+from multiprocess import Pool
+
import datetime
import hashlib
import os
@@ -494,6 +496,12 @@ class PulpUploadContext(PulpEntityContext):
def upload_file(self, file: t.IO[bytes], chunk_size: int = 1000000) -> t.Any:
"""Upload a file and return the uncommitted upload_href."""
+ sha256_hasher = hashlib.sha256()
+ for chunk in iter(lambda: file.read(chunk_size), b""):
+ sha256_hasher.update(chunk)
+ sha256_digest = sha256_hasher.hexdigest()
+ file.seek(0)
+
start = 0
size = os.path.getsize(file.name)
upload_href = self.create(body={"size": size})["pulp_href"]
@@ -512,8 +520,71 @@ class PulpUploadContext(PulpEntityContext):
self.delete(upload_href)
raise e
self.pulp_ctx.echo(_("Upload complete."), err=True)
+
+ # self.pulp_ctx.echo(_("Creating artifact."), err=True)
+ # try:
+ # task = self.commit(sha256_digest)
+ # except Exception as e:
+ # self.delete()
+ # raise e
return upload_href
+     def upload_file_in_parallel(self, file: t.IO[bytes], chunk_size: int = 1000000) -> t.Any:
+         """Upload a file with several worker processes and return the uncommitted upload_href.
+
+         The file is divided into up to four contiguous byte ranges. Each range is handed to
+         its own worker process running `up`, which sends it in pieces of at most `chunk_size`
+         bytes.
+
+         Parameters:
+             file: Binary file object to upload. `file.name` must be a path on disk, because
+                 the total size is taken from `os.path.getsize(file.name)`.
+             chunk_size: Maximum number of bytes sent per chunk request.
+
+         Returns:
+             The `pulp_href` of the created upload. The upload is not committed here.
+
+         Raises:
+             Exception: Whatever a worker raised; `up` deletes the upload before re-raising
+                 and `Pool.map` propagates the error to the caller.
+         """
+         worker_count = 4
+         size = os.path.getsize(file.name)
+         upload_href = self.create(body={"size": size})["pulp_href"]
+
+         # Range boundaries: equal pieces, with the last one absorbing the remainder, so that
+         # no byte is skipped and no extra process is needed for a few trailing bytes.
+         piece_size = size // worker_count
+         bounds = [index * piece_size for index in range(worker_count)] + [size]
+         args = [
+             [self, upload_href, size, chunk_size, file, begin, end]
+             for begin, end in zip(bounds, bounds[1:])
+             if begin < end  # Files smaller than worker_count bytes produce empty ranges.
+         ]
+         if args:  # An empty file has nothing to send, and Pool rejects zero processes.
+             # Leaving the with-block tears the workers down once map() has returned.
+             with Pool(processes=len(args)) as pool:
+                 pool.map(up, args)
+
+         # TODO: committing the upload (which needs the file's sha256 digest) is not done here.
+         return upload_href
+
+
+ def up(args):
+     """Upload one byte range of a file to an existing upload.
+
+     This is the worker function handed to `Pool.map`, hence the single `args` sequence. It
+     holds, in order: the upload context, the href of the upload, the total file size, the
+     maximum chunk size, the file object, the offset of the first byte of the range and the
+     offset just past its last byte.
+
+     If anything fails, the upload is deleted and the exception is raised again.
+     """
+     ctx, upload_href, size, chunk_size, file, offset, end = args
+     try:
+         file.seek(offset)
+         ctx.pulp_href = upload_href
+         while offset < end:
+             # Never read beyond the end of this worker's range.
+             length = min(chunk_size, end - offset)
+             data = file.read(length)
+             ctx.upload_chunk(chunk=data, size=size, start=offset)
+             offset += chunk_size
+             ctx.pulp_ctx.echo(".", nl=False, err=True)
+     except Exception:
+         # Clean up the partial upload before handing the error back to the parent process.
+         ctx.delete(upload_href)
+         raise
+     ctx.pulp_ctx.echo(_("Upload complete."), err=True)
+
class PulpUserContext(PulpEntityContext):
ENTITY = _("user")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment