Skip to content

Instantly share code, notes, and snippets.

@tejastank
Forked from bigsnarfdude/azure_load.py
Created March 31, 2021 06:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tejastank/1693439ab88f169264b09dcdda6c50d3 to your computer and use it in GitHub Desktop.
Save tejastank/1693439ab88f169264b09dcdda6c50d3 to your computer and use it in GitHub Desktop.
azure_load.py
import os
from os import listdir
from os.path import isfile, join
import asyncio
from azure.storage.blob import BlobServiceClient
from azure.storage.blob.aio import BlobClient
from azure.storage.blob.aio import ContainerClient
from datetime import datetime, timedelta
async def upload_single_file_async(connect_str, target_container, folder_path, file_name):
    """Upload one local file to a blob that carries the same name.

    Args:
        connect_str: Azure Storage connection string.
        target_container: Name of the container that receives the blob.
        folder_path: Local directory that contains the file.
        file_name: Name of the file inside ``folder_path``; also used as the
            blob name.
    """
    blob = BlobClient.from_connection_string(
        conn_str=connect_str,
        container_name=target_container,
        blob_name=file_name,
    )
    # The async client owns a network session; leaving the ``async with``
    # block closes it instead of leaking one session per uploaded file.
    async with blob:
        with open(join(folder_path, file_name), "rb") as data:
            # NOTE(review): no ``overwrite`` argument is passed, so this
            # presumably fails if the blob already exists - confirm.
            await blob.upload_blob(data)
async def upload_files(connect_str, target_container, folder_path):
    """Upload every regular file found directly inside ``folder_path``.

    Sub-directories are skipped (only entries for which ``isfile`` is true
    are uploaded). Files are uploaded one after another.

    Args:
        connect_str: Azure Storage connection string.
        target_container: Name of the container that receives the blobs.
        folder_path: Local directory whose files are uploaded.

    Returns:
        Whatever the last ``upload_single_file_async`` call returned, or
        ``None`` when the folder holds no regular files.
    """
    file_names = [
        entry for entry in listdir(folder_path)
        if isfile(join(folder_path, entry))
    ]
    # Start from None so an empty folder returns cleanly instead of raising
    # UnboundLocalError on the final ``return``.
    result = None
    for file_name in file_names:
        result = await upload_single_file_async(
            connect_str, target_container, folder_path, file_name
        )
    return result
async def get_list_files(connect_str, target_container):
    """Return the blobs currently stored in ``target_container``.

    Args:
        connect_str: Azure Storage connection string.
        target_container: Name of the container to enumerate.

    Returns:
        A list with one item per entry yielded by ``list_blobs()``.
    """
    container = ContainerClient.from_connection_string(
        conn_str=connect_str,
        container_name=target_container,
    )
    # Close the async client's network session once the listing is complete
    # rather than leaving it open until interpreter shutdown.
    async with container:
        return [blob async for blob in container.list_blobs()]
async def main():
    """Upload the backup folder, then list what is in the container.

    Reads the module-level ``connect_str``, ``target_container`` and
    ``folder_path`` settings.

    Returns:
        The list produced by ``get_list_files`` for ``target_container``.
    """
    await upload_files(connect_str, target_container, folder_path)
    # Hand the listing back instead of discarding it, so a caller can check
    # what actually landed in the container.
    return await get_list_files(connect_str, target_container)
# --- Script settings and entry point ---------------------------------------

# Storage account connection string, supplied through the environment.
connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
if not connect_str:
    # Stop with a clear message rather than handing None to the SDK.
    raise SystemExit('AZURE_STORAGE_CONNECTION_STRING is not set')

# The container is named after yesterday's local date,
# e.g. "serverdumps-2021-03-30".
yesterday = datetime.now() - timedelta(days=1)
date_time = yesterday.strftime('-%Y-%m-%d')
target_container = 'serverdumps' + date_time
folder_path = 'D:\\data\\all_server_backups'

# Create the target container with the synchronous client before the async
# uploads start.
# NOTE(review): create_container presumably raises if the container already
# exists (e.g. a second run on the same day) - confirm whether that is wanted.
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client = blob_service_client.create_container(target_container)

# asyncio.run() creates the event loop, runs main() to completion and closes
# the loop; it replaces get_event_loop()/run_until_complete(), which is
# deprecated when no loop is already running.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment