bulk-copy-bucket.py
##########################################################################
# Function for bulk copy of objects from one bucket to another.
# Note: an OCI Function invocation is limited to 5 minutes of runtime,
# so large copies may need to be re-run.
# Pass ignore_exist=true to re-run the function and skip objects that
# already exist in the destination bucket.
##########################################################################
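# Example invocation payload (a sketch with placeholder bucket, region and
# namespace values, not values from the original gist); the keys match the
# body.get() calls in handler() below:
#
#   {
#     "source_bucket": "src-bucket",
#     "source_region": "us-ashburn-1",
#     "destination_region": "us-phoenix-1",
#     "source_namespace": "mytenancynamespace",
#     "destination_namespace": "mytenancynamespace",
#     "destination_bucket": "dst-bucket",
#     "ignore_exist": "true"
#   }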
from fdk import response
import pickle
import threading
import time
import queue
import oci
import datetime
import io
import json
import logging
##########################################################################
# Create signer for authentication
# Input  - none (uses the resource principal of the running function)
# Output - config and signer objects
##########################################################################
def create_signer():
    # Functions authenticate with a resource principals signer
    try:
        signer = oci.auth.signers.get_resource_principals_signer()
        config = {'region': signer.region, 'tenancy': signer.tenancy_id}
        return config, signer
    except Exception:
        logger.info("Error obtaining resource principals signer, aborting")
        raise SystemExit
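# For local testing outside an OCI Function (an illustrative alternative,
# not part of the deployed code), the clients could instead be built from a
# local API-key config file:
#
#   config = oci.config.from_file()  # reads ~/.oci/config, DEFAULT profile
#   client = oci.object_storage.ObjectStorageClient(config)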
##############################################################################
# get time
##############################################################################
def get_time(full=False):
    if full:
        return str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    else:
        return str(datetime.datetime.now().strftime("%H:%M:%S"))
##############################################################################
# copy_request_worker
##############################################################################
def copy_request_worker(source_namespace, source_bucket, destination_namespace, destination_region, destination_bucket, object_storage_client):
    while True:
        object_ = known_q.get()
        state = get_state_for_object(object_)
        response = None
        interval_exp = base_retry_timeout
        while True:
            try:
                response = copy_object(source_namespace, source_bucket, object_, destination_namespace, destination_region, destination_bucket, object_, object_storage_client)
                break
            except Exception as e:
                if interval_exp > max_retry_timeout:
                    raise
                logger.info(" Received %s from API for object %s, will wait %s seconds before retrying." % (e, object_, interval_exp))
                time.sleep(interval_exp)
                interval_exp **= 2
                continue
        state['work-request-id'] = response.headers.get('opc-work-request-id')
        state['status'] = 'REQUESTED'
        set_state_for_object(object_, state, persist=False)
        known_q.task_done()
##############################################################################
# work_request_status_worker
##############################################################################
def work_request_status_worker(object_storage_client):
    while True:
        object_ = update_q.get()
        state = get_state_for_object(object_)
        interval_exp = base_retry_timeout
        while True:
            try:
                response = object_storage_client.get_work_request(state['work-request-id'])
                state['status'] = response.data.status
                break
            except Exception as e:
                if interval_exp > max_retry_timeout:
                    raise
                logger.info(" Received %s from API for work request %s, will wait %s seconds before retrying." % (e, state['work-request-id'], interval_exp))
                time.sleep(interval_exp)
                interval_exp **= 2
                continue
        set_state_for_object(object_, state, persist=False)
        update_q.task_done()
##############################################################################
# load_dest_bucket_to_mem
##############################################################################
def load_dest_bucket_to_mem(object_storage_client_dest, destination_namespace, destination_bucket):
    loaded_page = 0
    next_starts_with = None
    while True:
        response = object_storage_client_dest.list_objects(destination_namespace, destination_bucket, start=next_starts_with, prefix=source_prefix, fields="md5", retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)
        next_starts_with = response.data.next_start_with
        if loaded_page % 100 == 0 and loaded_page > 0:
            logger.info(get_time() + " - Loaded " + str(len(dest_bucket_memory)) + " files...")
        for osb in response.data.objects:
            dest_bucket_memory[str(osb.name)] = str(osb.md5)
        if not next_starts_with:
            break
        loaded_page += 1
    logger.info(get_time() + " - Loaded " + str(len(dest_bucket_memory)) + " files.")
##############################################################################
# add_objects_to_queue
##############################################################################
def add_objects_to_queue(ns, bucket, object_storage_client):
    count = 0
    skipped = 0
    next_starts_with = None
    while True:
        response = object_storage_client.list_objects(ns, bucket, start=next_starts_with, prefix=source_prefix, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)
        next_starts_with = response.data.next_start_with
        for object_ in response.data.objects:
            if source_prefix and not object_.name.startswith(source_prefix):
                continue
            if source_prefix_exclude and object_.name.startswith(source_prefix_exclude):
                continue
            # skip if the object already exists in the destination bucket
            # (an option would be to also compare MD5 checksums)
            if str(object_.name) in dest_bucket_memory:
                skipped += 1
                if skipped % 100000 == 0:
                    logger.info(get_time() + " - Skipped " + str(skipped) + " existing files...")
                continue
            set_state_for_object(object_.name, {'status': 'KNOWN'}, persist=False)
            known_q.put(object_.name)
            count += 1
            if count % 100000 == 0:
                logger.info(get_time() + " - Added " + str(count) + " files to queue...")
        if not next_starts_with:
            break
    # if files were skipped, log the total
    if skipped > 0:
        logger.info(get_time() + " - Skipped " + str(skipped) + " existing files.")
    save_all_state()
    return count
##############################################################################
# set_state_for_object
##############################################################################
def set_state_for_object(object_, state, persist=True):
    worker_lock.acquire()
    worker_data[object_] = state
    if persist:
        with open(state_file, 'wb') as sf:
            pickle.dump(worker_data, sf, protocol=pickle.HIGHEST_PROTOCOL)
    worker_lock.release()
    return worker_data[object_]
##############################################################################
# save_all_state
##############################################################################
def save_all_state():
    worker_lock.acquire()
    with open(state_file, 'wb') as sf:
        pickle.dump(worker_data, sf, protocol=pickle.HIGHEST_PROTOCOL)
    worker_lock.release()
##############################################################################
# get_state_for_object
##############################################################################
def get_state_for_object(object_):
    return worker_data[object_]
##############################################################################
# get_work_request_count_by_status
##############################################################################
def get_work_request_count_by_status(status):
    return len([x for x in worker_data.keys() if worker_data[x].get('status') == status])
##############################################################################
# copy_object
##############################################################################
def copy_object(src_ns, src_b, src_o, dst_ns, dst_r, dst_b, dst_o, object_storage_client):
    copy_request = oci.object_storage.models.CopyObjectDetails()
    copy_request.source_object_name = src_o
    copy_request.destination_namespace = dst_ns
    copy_request.destination_region = dst_r
    copy_request.destination_bucket = dst_b
    copy_request.destination_object_name = dst_o
    return object_storage_client.copy_object(src_ns, src_b, copy_request)
##############################################################################
# update_all_work_requests_status
##############################################################################
def update_all_work_requests_status(ns, bucket):
    for object_ in worker_data.keys():
        state = get_state_for_object(object_)
        if state['status'] not in ('KNOWN', 'COMPLETED', 'FAILED', 'CANCELED'):
            update_q.put(object_)
    update_q.join()
    save_all_state()
##############################################################################
# connect to object storage
##############################################################################
def connect_to_object_storage():
    logger.info("Connecting to Object Storage")
    # get signer
    config, signer = create_signer()
    try:
        # connect to the source region
        logger.info("\nConnecting to Object Storage Service for source region - " + source_region)
        object_storage_client = oci.object_storage.ObjectStorageClient(config, signer=signer)
    except Exception as e:
        logger.info("\nError connecting to object storage at source region - " + str(e))
        raise SystemExit
    try:
        # connect to the destination region (copy the config dict so the
        # source client's region setting is not mutated by aliasing)
        logger.info("\nConnecting to Object Storage Service for destination region - " + destination_region)
        config_destination = dict(config)
        config_destination['region'] = destination_region
        object_storage_client_dest = oci.object_storage.ObjectStorageClient(config_destination, signer=signer)
    except Exception as e:
        logger.info("\nError connecting to object storage at destination region - " + str(e))
        raise SystemExit
    return object_storage_client, object_storage_client_dest
##############################################################################
# main
##############################################################################
def main(source_bucket, source_region, destination_region, source_namespace, destination_namespace, destination_bucket, ignore_exist, source_prefix, source_prefix_exclude, state_file):
    # connect to object storage
    object_storage_client, object_storage_client_dest = connect_to_object_storage()
    logger.info("Start Processing")
    logger.info(get_time() + " - Creating %s copy request workers." % (request_worker_count))
    for i in range(request_worker_count):
        worker = threading.Thread(target=copy_request_worker, args=(source_namespace, source_bucket, destination_namespace, destination_region, destination_bucket, object_storage_client,))
        worker.daemon = True
        worker.start()
    logger.info(get_time() + " - Creating %s status workers." % (status_worker_count))
    for i in range(status_worker_count):
        worker = threading.Thread(target=work_request_status_worker, args=(object_storage_client,))
        worker.daemon = True
        worker.start()
    if ignore_exist.lower() == "true":
        logger.info(get_time() + " - Loading list of objects from destination bucket (%s) to ignore existing files." % (destination_bucket))
        load_dest_bucket_to_mem(object_storage_client_dest, destination_namespace, destination_bucket)
    logger.info(get_time() + " - Getting list of objects from source bucket (%s). Copies will start immediately." % (source_bucket))
    count = add_objects_to_queue(source_namespace, source_bucket, object_storage_client)
    logger.info(get_time() + " - Enqueued %s objects to be copied" % (count))
    if count > 0:
        logger.info("Finished queuing files, starting status checks")
    while count > 0:
        logger.info(get_time() + " - Waiting %s seconds before checking status." % (status_interval))
        time.sleep(status_interval)
        if get_work_request_count_by_status('KNOWN') > 0 or get_work_request_count_by_status('REQUESTED') > 0:
            logger.info(get_time() + " - Determining copy status")
            update_all_work_requests_status(source_namespace, source_bucket)
        worker_lock.acquire()
        logger.info(get_time() + " - KNOWN: %s, REQUESTED: %s, COMPLETED: %s, FAILED: %s, CANCELED: %s"
                    % (
                        get_work_request_count_by_status('KNOWN'),
                        get_work_request_count_by_status('REQUESTED'),
                        get_work_request_count_by_status('COMPLETED'),
                        get_work_request_count_by_status('FAILED'),
                        get_work_request_count_by_status('CANCELED'))
                    )
        if get_work_request_count_by_status('KNOWN') == 0 and get_work_request_count_by_status('REQUESTED') == 0:
            worker_lock.release()
            break
        else:
            worker_lock.release()
    known_q.join()
    logger.info("Copy Completed at " + get_time())
    known_object_count = get_work_request_count_by_status('KNOWN')
    requested_object_count = get_work_request_count_by_status('REQUESTED')
    failed_object_count = get_work_request_count_by_status('FAILED')
    canceled_object_count = get_work_request_count_by_status('CANCELED')
    completed_object_count = get_work_request_count_by_status('COMPLETED')
    return failed_object_count, canceled_object_count, completed_object_count, known_object_count, requested_object_count
##############################################################################
# Execute
##############################################################################
def handler(ctx, data: io.BytesIO = None):
    func_name = ctx.FnName()
    # global variables used
    global logger, request_worker_count, status_worker_count, status_interval
    global worker_data, worker_lock, dest_bucket_memory, known_q, update_q, object_storage_client, object_storage_client_dest, base_retry_timeout, max_retry_timeout
    global source_bucket, source_region, destination_region, source_namespace, destination_namespace, destination_bucket, source_prefix, source_prefix_exclude, state_file, ignore_exist
    global config, signer
    logger = logging.getLogger(func_name)
    logger.setLevel(logging.INFO)
    logger.info("Function started.")
    base_retry_timeout = 2
    max_retry_timeout = 16 ** 2
    worker_data = {}
    worker_lock = threading.Lock()
    dest_bucket_memory = {}
    known_q = queue.Queue()
    update_q = queue.Queue()
    object_storage_client = None
    object_storage_client_dest = None
    try:
        cfg = ctx.Config()
        request_worker_count = int(cfg["REQUEST_WORKER_COUNT"])
        status_worker_count = int(cfg["STATUS_WORKER_COUNT"])
        status_interval = int(cfg["STATUS_INTERVAL"])
    except Exception:
        logger.error("Missing function parameters.")
        raise
    try:
        body = json.loads(data.getvalue())
        source_bucket = body.get("source_bucket")
        source_region = body.get("source_region")
        destination_region = body.get("destination_region")
        source_namespace = body.get("source_namespace")
        destination_namespace = body.get("destination_namespace")
        destination_bucket = body.get("destination_bucket")
        ignore_exist = body.get("ignore_exist", "false")
        source_prefix = ""
        source_prefix_exclude = ""
        state_file = "/tmp/" + source_bucket + "." + destination_bucket + ".wrk"
        failed_object_count, canceled_object_count, completed_object_count, known_object_count, requested_object_count = main(source_bucket, source_region, destination_region, source_namespace, destination_namespace, destination_bucket, ignore_exist, source_prefix, source_prefix_exclude, state_file)
        return_json = {
            "failed_object_count": failed_object_count,
            "canceled_object_count": canceled_object_count,
            "completed_object_count": completed_object_count,
            "known_object_count": known_object_count,
            "requested_object_count": requested_object_count
        }
    except Exception as ex:
        logger.error("Function error: " + str(ex))
        raise
    logger.info("Function complete.")
    return response.Response(
        ctx,
        response_data=json.dumps(return_json),
        headers={"Content-Type": "application/json"}
    )
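##############################################################################
# Usage sketch (illustrative only, not part of the function). The handler
# reads three configuration values and the JSON payload shown at the top of
# this file. With the Fn CLI and a hypothetical app name "copy-app", the
# setup and invocation might look like:
#
#   fn config function copy-app bulk-copy-bucket REQUEST_WORKER_COUNT 50
#   fn config function copy-app bulk-copy-bucket STATUS_WORKER_COUNT 10
#   fn config function copy-app bulk-copy-bucket STATUS_INTERVAL 60
#   cat payload.json | fn invoke copy-app bulk-copy-bucket
#
# where payload.json holds the example body shown above. The function returns
# the per-status object counts as JSON.
##############################################################################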