Last active
March 7, 2023 16:07
-
-
Save sathya-oracle/e7c78cffa5358993d11a0248c5364072 to your computer and use it in GitHub Desktop.
bulk-copy-bucket.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
########################################################################## | |
# Function for Bulk Copy of Objects from One Bucket to Another | |
# The limitation is the Function can only run 5 mins. | |
# Use ignore_exist=true parameter to re-run the function to skip already existing objects | |
########################################################################## | |
from fdk import response | |
import pickle | |
import threading | |
import time | |
import queue | |
import oci | |
import argparse | |
import datetime | |
import sys | |
import os | |
import io | |
import json | |
import logging | |
import logging.handlers | |
import logging.config | |
########################################################################## | |
# Create signer for Authentication | |
# Input - instance_principals | |
# Output - config and signer objects | |
########################################################################## | |
def create_signer():
    """Create an OCI auth signer using resource principals.

    Returns:
        tuple: (config dict with 'region' and 'tenancy' keys, signer object).

    Raises:
        SystemExit: if the resource principal credentials cannot be obtained.
    """
    try:
        signer = oci.auth.signers.get_resource_principals_signer()
        config = {'region': signer.region, 'tenancy': signer.tenancy_id}
        return config, signer
    except Exception:
        # BUGFIX: the old message said "instance principals" but this function
        # obtains a *resource* principals signer (the Functions identity);
        # also log at ERROR level since we are aborting.
        logger.error("Error obtaining resource principals certificate, aborting")
        raise SystemExit
############################################################################## | |
# get time | |
############################################################################## | |
def get_time(full=False):
    """Return the current local time as a formatted string.

    full=True  -> "YYYY-MM-DD HH:MM:SS"
    full=False -> "HH:MM:SS" (default)
    """
    fmt = "%Y-%m-%d %H:%M:%S" if full else "%H:%M:%S"
    return datetime.datetime.now().strftime(fmt)
############################################################################## | |
# copy_request_worker | |
############################################################################## | |
def copy_request_worker(source_namespace, source_bucket, destination_namespace, destination_region, destination_bucket, object_storage_client):
    """Daemon worker: pull object names from known_q and issue copy requests.

    Retries each copy with exponential backoff (interval squared per attempt,
    capped by max_retry_timeout), then records the returned work-request id
    and marks the object REQUESTED in the shared state.
    """
    while True:
        object_ = known_q.get()
        state = get_state_for_object(object_)
        response = None
        interval_exp = base_retry_timeout
        while True:
            try:
                response = copy_object(source_namespace, source_bucket, object_, destination_namespace, destination_region, destination_bucket, object_, object_storage_client)
                break
            except Exception as e:
                if interval_exp > max_retry_timeout:
                    raise
                # BUGFIX: when copy_object raises, `response` is still None,
                # so the old log line (`response.status`) crashed with
                # AttributeError and hid the real error. Log the exception.
                logger.info(" Received error '%s' from API for object %s, will wait %s seconds before retrying." % (str(e), object_, interval_exp))
                time.sleep(interval_exp)
                interval_exp **= 2
                continue
        state['work-request-id'] = response.headers.get('opc-work-request-id')
        state['status'] = 'REQUESTED'
        set_state_for_object(object_, state, persist=False)
        known_q.task_done()
############################################################################## | |
# work_request_status_worker | |
############################################################################## | |
def work_request_status_worker(object_storage_client):
    """Daemon worker: pull object names from update_q and refresh each
    object's copy work-request status into the shared state.

    Retries with exponential backoff (interval squared per attempt, capped
    by max_retry_timeout).
    """
    while True:
        object_ = update_q.get()
        state = get_state_for_object(object_)
        interval_exp = base_retry_timeout
        while True:
            try:
                response = object_storage_client.get_work_request(state['work-request-id'])
                state['status'] = response.data.status
                break
            except Exception as e:
                if interval_exp > max_retry_timeout:
                    raise
                # BUGFIX: `response` is unbound when get_work_request raises
                # on the first attempt, so the old log line died with
                # NameError. Log the exception itself instead.
                logger.info(" Received error '%s' from API for work request %s, will wait %s seconds before retrying." % (str(e), state['work-request-id'], interval_exp))
                time.sleep(interval_exp)
                interval_exp **= 2
                continue
        set_state_for_object(object_, state, persist=False)
        update_q.task_done()
############################################################################## | |
# add_objects_to_queue | |
############################################################################## | |
def load_dest_bucket_to_mem(object_storage_client_dest, destination_namespace, destination_bucket):
    """Page through the destination bucket and cache object name -> md5
    into the global dest_bucket_memory dict (used later to skip copies
    of objects that already exist)."""
    page_number = 0
    start_token = None
    while True:
        listing = object_storage_client_dest.list_objects(
            destination_namespace,
            destination_bucket,
            start=start_token,
            prefix=source_prefix,
            fields="md5",
            retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY,
        )
        start_token = listing.data.next_start_with
        # Progress message every 100 pages (not on the very first page).
        if page_number and page_number % 100 == 0:
            logger.info(get_time() + " - Loaded " + str(len(dest_bucket_memory)) + " files...")
        for item in listing.data.objects:
            dest_bucket_memory[str(item.name)] = str(item.md5)
        if not start_token:
            break
        page_number += 1
    logger.info(get_time() + " - Loaded " + str(len(dest_bucket_memory)) + " files.")
############################################################################## | |
# add_objects_to_queue | |
############################################################################## | |
def add_objects_to_queue(ns, bucket, object_storage_client):
    """List the source bucket and enqueue every object that needs copying.

    Objects whose names are already present in dest_bucket_memory (loaded
    from the destination bucket) are skipped. Persists the state once at
    the end and returns the number of enqueued objects.
    """
    enqueued = 0
    skipped = 0
    start_token = None
    while True:
        listing = object_storage_client.list_objects(
            ns, bucket, start=start_token, prefix=source_prefix,
            retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)
        start_token = listing.data.next_start_with
        for obj in listing.data.objects:
            # Defensive prefix filters (list_objects already applies prefix).
            if source_prefix and not obj.name.startswith(source_prefix):
                continue
            if source_prefix_exclude and obj.name.startswith(source_prefix_exclude):
                continue
            # Skip if it exists in the dest bucket (name-only comparison;
            # using MD5 for comparison would be an option).
            if str(obj.name) in dest_bucket_memory:
                skipped += 1
                if skipped % 100000 == 0:
                    logger.info(get_time() + " - Skipped " + str(skipped) + " exist files...")
                continue
            set_state_for_object(obj.name, {'status': 'KNOWN'}, persist=False)
            known_q.put(obj.name)
            enqueued += 1
            if enqueued % 100000 == 0:
                logger.info(get_time() + " - Added " + str(enqueued) + " files to queue...")
        if not start_token:
            break
    # if skipped files, print the final tally
    if skipped > 0:
        logger.info(get_time() + " - Skipped " + str(skipped) + " exist files...")
    save_all_state()
    return enqueued
############################################################################## | |
# set_state_for_object | |
############################################################################## | |
def set_state_for_object(object_, state, persist=True):
    """Store `state` for `object_` in the shared worker_data dict.

    When persist=True the whole worker_data dict is pickled to state_file.
    Returns the stored state.

    BUGFIX: use `with worker_lock` so the lock is released even if the file
    open or pickle.dump raises — the old manual acquire/release pair would
    leave the lock held forever and deadlock every worker thread.
    """
    with worker_lock:
        worker_data[object_] = state
        if persist:
            with open(state_file, 'wb') as sf:
                pickle.dump(worker_data, sf, protocol=pickle.HIGHEST_PROTOCOL)
        return worker_data[object_]
############################################################################## | |
# save_all_state | |
############################################################################## | |
def save_all_state():
    """Pickle the entire worker_data dict to state_file under the lock.

    BUGFIX: use `with worker_lock` so the lock is released even if the file
    open or pickle.dump raises — the old manual acquire/release pair would
    leave the lock held and deadlock every worker thread.
    """
    with worker_lock:
        with open(state_file, 'wb') as sf:
            pickle.dump(worker_data, sf, protocol=pickle.HIGHEST_PROTOCOL)
############################################################################## | |
# get_state_for_object | |
############################################################################## | |
def get_state_for_object(object_):
    """Return the tracked state dict for `object_`.

    Raises KeyError if the object has never been registered.
    """
    current_state = worker_data[object_]
    return current_state
############################################################################## | |
# get_work_request_count_by_status | |
############################################################################## | |
def get_work_request_count_by_status(status):
    """Count tracked objects whose state 'status' field equals `status`."""
    return sum(1 for state in worker_data.values() if state.get('status') == status)
############################################################################## | |
# copy_object | |
############################################################################## | |
def copy_object(src_ns, src_b, src_o, dst_ns, dst_r, dst_b, dst_o, object_storage_client):
    """Issue a server-side copy request for one object and return the API
    response (the copy itself completes asynchronously via a work request,
    whose id is in the response headers)."""
    details = oci.object_storage.models.CopyObjectDetails(
        source_object_name=src_o,
        destination_namespace=dst_ns,
        destination_region=dst_r,
        destination_bucket=dst_b,
        destination_object_name=dst_o,
    )
    return object_storage_client.copy_object(src_ns, src_b, details)
############################################################################## | |
# update_all_work_requests_status | |
############################################################################## | |
def update_all_work_requests_status(ns, bucket):
    """Enqueue every in-flight object for a status refresh, wait for the
    status workers to drain the queue, then persist the state.

    NOTE(review): `ns` and `bucket` are accepted for call-site symmetry but
    are not used by this function.
    """
    for obj_name, state in worker_data.items():
        # Terminal or not-yet-requested states need no status check.
        if state['status'] in ('KNOWN', 'COMPLETED', 'FAILED', 'CANCELED'):
            continue
        update_q.put(obj_name)
    update_q.join()
    save_all_state()
############################################################################## | |
# connect to object storage | |
############################################################################## | |
def connect_to_object_storage():
    """Build Object Storage clients for the source and destination regions.

    Returns:
        tuple: (source_client, destination_client).

    Raises:
        SystemExit: if either client cannot be created.
    """
    logger.info("Connecting to Object Storage")
    # get signer (resource principals)
    config, signer = create_signer()
    try:
        # connect to source region
        logger.info("\nConnecting to Object Storage Service for source region - " + source_region)
        object_storage_client = oci.object_storage.ObjectStorageClient(config, signer=signer)
    except Exception as e:
        logger.info("\nError connecting to object storage at source region - " + str(e))
        raise SystemExit
    try:
        # connect to destination object storage
        logger.info("\nConnecting to Object Storage Service for destination region - " + destination_region)
        # BUGFIX: copy the dict before overriding the region. The old code
        # did `config_destination = config`, aliasing the same dict, so the
        # source client's config was silently repointed at the destination
        # region as well.
        config_destination = dict(config)
        config_destination['region'] = destination_region
        object_storage_client_dest = oci.object_storage.ObjectStorageClient(config_destination, signer=signer)
    except Exception as e:
        logger.info("\nError connecting to object storage at destination region - " + str(e))
        raise SystemExit
    return object_storage_client, object_storage_client_dest
############################################################################## | |
# main | |
############################################################################## | |
def main(source_bucket,source_region,destination_region,source_namespace,destination_namespace,destination_bucket,ignore_exist,source_prefix,source_prefix_exclude,state_file):
    """Orchestrate the bulk copy: start worker threads, enqueue source
    objects, then poll copy work-request statuses until none remain in
    the KNOWN or REQUESTED states.

    Returns:
        tuple: (failed, canceled, completed, known, requested) object counts.

    NOTE(review): several parameters shadow same-named globals assigned in
    handler(); the helper functions called here read the globals.
    """
    # connect to object storage
    object_storage_client,object_storage_client_dest =connect_to_object_storage()
    logger.info("Start Processing")
    logger.info(get_time() + " - Creating %s copy request workers." % (request_worker_count))
    # Daemon threads: they terminate with the process after main() returns.
    for i in range(request_worker_count):
        worker = threading.Thread(target=copy_request_worker,args=(source_namespace,source_bucket,destination_namespace,destination_region,destination_bucket,object_storage_client,))
        worker.daemon = True
        worker.start()
    logger.info(get_time() + " - Creating %s status workers." % (status_worker_count))
    for i in range(status_worker_count):
        worker = threading.Thread(target=work_request_status_worker,args=(object_storage_client,))
        worker.daemon = True
        worker.start()
    # Optionally pre-load destination object names so existing files are skipped.
    # ignore_exist arrives as a string from the JSON request body.
    if (ignore_exist.lower() == "true"):
        print(get_time() + " - Loading list of objects from destination bucket (%s) to ignore exiting files." % (destination_bucket))
        load_dest_bucket_to_mem(object_storage_client_dest, destination_namespace, destination_bucket)
    logger.info(get_time() + " - Getting list of objects from source bucket (%s). Copies will start immediately." % (source_bucket))
    count = add_objects_to_queue(source_namespace, source_bucket,object_storage_client)
    logger.info(get_time() + " - Enqueued %s objects to be copied" % (count))
    if count > 0:
        logger.info("Finish queuing files, start checking")
    # Poll loop: sleep, refresh statuses, and exit once no object is
    # KNOWN or REQUESTED any more.
    while count > 0:
        logger.info(get_time() + " - Waiting %s seconds before checking status." % (status_interval))
        time.sleep(status_interval)
        if get_work_request_count_by_status('KNOWN') > 0 or get_work_request_count_by_status('REQUESTED') > 0:
            logger.info(get_time() + " - Determining copy status")
            update_all_work_requests_status(source_namespace, source_bucket)
        # Hold the lock so the status snapshot logged below is consistent
        # with the completion check that follows it.
        worker_lock.acquire()
        logger.info(get_time() + " - KNOWN: %s, REQUESTED: %s, COMPLETED: %s, FAILED: %s, CANCELED: %s"
                    % (
                        get_work_request_count_by_status('KNOWN'),
                        get_work_request_count_by_status('REQUESTED'),
                        get_work_request_count_by_status('COMPLETED'),
                        get_work_request_count_by_status('FAILED'),
                        get_work_request_count_by_status('CANCELED'))
                    )
        if get_work_request_count_by_status('KNOWN') == 0 and get_work_request_count_by_status('REQUESTED') == 0:
            worker_lock.release()
            break
        else:
            worker_lock.release()
    # Wait until every enqueued copy request has been issued.
    known_q.join()
    logger.info("Copy Completed at " + get_time())
    # Final per-status tallies returned to the caller.
    known_object_count = get_work_request_count_by_status('KNOWN')
    requested_object_count = get_work_request_count_by_status('REQUESTED')
    failed_object_count = get_work_request_count_by_status('FAILED')
    canceled_object_count = get_work_request_count_by_status('CANCELED')
    completed_object_count = get_work_request_count_by_status('COMPLETED')
    return failed_object_count,canceled_object_count,completed_object_count,known_object_count,requested_object_count
############################################################################## | |
# Execute | |
############################################################################## | |
def handler(ctx, data: io.BytesIO = None):
    """OCI Functions entry point for the bulk bucket-to-bucket copy.

    Reads worker counts and polling interval from the function
    configuration, per-invocation bucket/region parameters from the JSON
    request body, runs main(), and returns the per-status object counts
    as a JSON response.

    NOTE(review): Functions invocations are time-limited (see file header:
    ~5 minutes), so large copies rely on re-running with ignore_exist=true.
    """
    func_name = ctx.FnName()
    # Global Variables Used — all shared state lives at module level so the
    # worker threads and helper functions can reach it.
    global logger,request_worker_count,status_worker_count,status_interval
    global worker_data,worker_lock,dest_bucket_memory,known_q,update_q,object_storage_client,object_storage_client_dest,base_retry_timeout,max_retry_timeout
    global source_bucket,source_region,destination_region,source_namespace,destination_namespace,destination_bucket,source_prefix,source_prefix_exclude,state_file,ignore_exist
    global config,signer
    logger = logging.getLogger(func_name)
    logger.setLevel(logging.INFO)
    logger.info("Function started.")
    # Retry backoff bounds (seconds): start at 2, squared each retry, cap 256.
    base_retry_timeout = 2
    max_retry_timeout = 16**2
    worker_data = {}                # object name -> state dict
    worker_lock = threading.Lock()  # guards worker_data and state_file writes
    dest_bucket_memory = {}         # destination object name -> md5
    known_q = queue.Queue()         # objects awaiting a copy request
    update_q = queue.Queue()        # objects awaiting a status refresh
    object_storage_client = None
    object_storage_client_dest = None
    try:
        # Worker counts and polling interval come from the function config.
        cfg = ctx.Config()
        request_worker_count = int(cfg["REQUEST_WORKER_COUNT"])
        status_worker_count = int(cfg["STATUS_WORKER_COUNT"])
        status_interval = int(cfg["STATUS_INTERVAL"])
    except Exception:
        logger.error("Missing function parameters.")
        raise
    try:
        # Per-invocation parameters come from the JSON request body.
        body = json.loads(data.getvalue())
        source_bucket = body.get("source_bucket")
        source_region = body.get("source_region")
        destination_region = body.get("destination_region")
        source_namespace = body.get("source_namespace")
        destination_namespace = body.get("destination_namespace")
        destination_bucket = body.get("destination_bucket")
        ignore_exist = body.get("ignore_exist")
        source_prefix = ""
        source_prefix_exclude = ""
        # /tmp is the only writable path inside the Functions container.
        state_file = "/tmp/"+ source_bucket + "." + destination_bucket + ".wrk"
        failed_object_count,canceled_object_count,completed_object_count,known_object_count,requested_object_count = main(source_bucket,source_region,destination_region,source_namespace,destination_namespace,destination_bucket,ignore_exist,source_prefix,source_prefix_exclude,state_file)
        return_json = {
            "failed_object_count" : failed_object_count,
            "canceled_object_count" : canceled_object_count,
            "completed_object_count" : completed_object_count,
            "known_object_count" : known_object_count,
            "requested_object_count" : requested_object_count
        }
    except (Exception) as ex:
        logger.error("Function error: " + str(ex))
        raise
    logger.info("Function complete.")
    return response.Response(
        ctx,
        response_data=json.dumps(return_json),
        headers={"Content-Type": "application/json"}
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment