Skip to content

Instantly share code, notes, and snippets.

@shevron
Created October 6, 2020 12:01
Show Gist options
  • Save shevron/5df67cdb846955295d1d53fe13816cde to your computer and use it in GitHub Desktop.
Demo of using Azure Blob Storage API for simple multipart upload flow
"""Test Azure Multipart Uploads
"""
import base64
import logging
import os
from itertools import count
from typing import BinaryIO, Generator, Tuple
import click
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient, BlobBlock, BlockState
_log = logging.getLogger(__name__)
def encode_id(id: int, id_length=16) -> str:
    """Encode an integer block ID as a fixed-width, base64-encoded string.

    The integer is serialized as *id_length* big-endian bytes before
    encoding, so every ID produces a string of the same length (an Azure
    block-list requirement: all block IDs of a blob must be equal length).
    """
    raw = id.to_bytes(id_length, 'big')
    return base64.b64encode(raw).decode('ascii')
def get_file_size(open_file: BinaryIO) -> int:
    """Return the total size in bytes of an open, seekable binary file.

    The file's current read position is saved before seeking to the end
    and restored afterwards, so callers see no change in position.
    """
    saved_position = open_file.tell()
    open_file.seek(0, os.SEEK_END)
    total = open_file.tell()
    open_file.seek(saved_position)
    return total
def get_file_blocks(file_size: int, block_size: int) -> Generator[Tuple[int, int, int], None, None]:
    """Yield ``(block_id, start_offset, size)`` tuples covering a file.

    Block IDs are consecutive integers starting at 0. Every block is
    *block_size* bytes except possibly the last, which holds the
    remainder. A zero-byte file still yields one block of size 0.
    """
    offset = 0
    block_id = 0
    while True:
        # The final block may be shorter than block_size
        chunk = min(block_size, file_size - offset)
        yield block_id, offset, chunk
        block_id += 1
        offset += block_size
        if offset >= file_size:
            break
def upload_to_azure(open_file: BinaryIO, container_name: str, file_name: str, block_size: int = 1024*1000,
                    noop: bool = False):
    """Upload *open_file* to Azure Blob Storage, resuming staged blocks.

    The file is split into blocks of at most *block_size* bytes. Blocks that
    already exist server-side as uncommitted blocks with a matching size are
    skipped; the rest are staged, and finally the complete block list is
    committed. If a committed blob of identical size already exists, the
    upload is skipped entirely.

    :param open_file: seekable binary file object to upload
    :param container_name: name of the target storage container
    :param file_name: name of the target blob
    :param block_size: maximum size in bytes of each staged block
    :param noop: if True, only report what would be uploaded and return
    :raises RuntimeError: if the blob has committed blocks but a size
        mismatch (a partially-committed blob is unexpected here)
    """
    blob_svc = BlobServiceClient.from_connection_string(os.getenv('AZURE_CONNECTION_STRING'))
    blob = blob_svc.get_blob_client(container=container_name, blob=file_name)
    _log.info("Uploading %s to %s ...", file_name, container_name)
    file_size = get_file_size(open_file)

    # Check if an identical file has already been fully committed
    try:
        size = blob.get_blob_properties().size
        if file_size == size:
            _log.info("File has already been uploaded to storage")
            return
        else:
            # BUGFIX: original message had an unbalanced parenthesis
            _log.warning("File exists but has wrong size (%s instead of %s bytes), re-uploading all parts",
                         size, file_size)
    except ResourceNotFoundError:
        _log.info("File does not exist or has not been committed, checking for uploaded blocks")

    try:
        committed_blocks, uncommitted_blocks = blob.get_block_list(block_list_type='all')
    except ResourceNotFoundError:
        _log.debug("Blob was not initialized and no blocks have been uploaded")
        committed_blocks, uncommitted_blocks = None, []
    if committed_blocks:
        raise RuntimeError("File exists and has some committed blocks; This is not expected")

    # Map already-staged block IDs to their sizes so matching blocks are skipped
    _log.debug("Current uncommitted uploaded blocks: %s", uncommitted_blocks)
    skip_blocks = {b['id']: b['size'] for b in uncommitted_blocks}
    _log.debug("Blocks to be skipped: %s", list(skip_blocks.keys()))

    if noop:
        _log.info("NOOP flag was passed, aborting here")
        return

    commit_blocks = []
    # Reuse file_size computed above rather than re-measuring the file
    for b_id, b_start, b_size in get_file_blocks(file_size, block_size):
        b_id = encode_id(b_id)
        if b_id in skip_blocks:
            if skip_blocks[b_id] == b_size:
                _log.info("Block %s was already uploaded, skipping", b_id)
                commit_blocks.append(BlobBlock(b_id, BlockState.Uncommitted))
                continue
            else:
                _log.warning("Block %s has wrong size, will re-upload", b_id)
        _log.info("Uploading block %s of %d bytes", b_id, b_size)
        open_file.seek(b_start)
        block_data = open_file.read(b_size)
        blob.stage_block(b_id, block_data, length=b_size)
        commit_blocks.append(BlobBlock(b_id, BlockState.Uncommitted))
    blob.commit_block_list(commit_blocks)
@click.command('upload_to_azure')
@click.argument('source', type=click.Path(readable=True, allow_dash=True, dir_okay=False), required=True)
@click.argument('target', type=str, required=True)
@click.option('--block-size', '-s', type=int, default=1024*1000)
@click.option('--dont-upload', '-N', is_flag=True, help='Do not perform any uploads, just checks')
def main(source, target, block_size, dont_upload):
    """Upload SOURCE to the Azure blob given by TARGET (<container_name>/<blob_name>)."""
    # Set up logging; silence the very chatty azure.core transport logger
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)-15s %(name)-15s %(levelname)s %(message)s')
    logging.getLogger('azure.core').setLevel(logging.WARNING)
    try:
        container_name, file_name = target.split('/', maxsplit=1)
    except ValueError:
        raise click.UsageError('target must be of the form <container_name>/<blob_name>')
    # BUGFIX: allow_dash=True lets the user pass '-' for stdin, but plain
    # open() cannot open '-'; click.open_file handles the dash convention.
    with click.open_file(source, 'rb') as f:
        return upload_to_azure(f, container_name, file_name, block_size, noop=dont_upload)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment