@jazzl0ver
Created August 20, 2020 16:09
Replicate objects within or between regions in Oracle Cloud Object Storage
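The script pages through the source bucket, submits a server-side copy work request for each matching object via the OCI Python SDK, and then polls those work requests until every copy reports COMPLETED, FAILED, or CANCELED.
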
#!/usr/bin/env python
#
# The script requires that the OCI CLI be installed
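# (credentials are read from the OCI CLI configuration file, ~/.oci/config by
# default, via oci.config.from_file() below)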
#
#
# Edit these values to configure the script
#
oci_cli_profile_name = 'DEFAULT'
source_namespace = 'ocitsammut'
source_region = 'us-ashburn-1'
source_bucket = 'repl_source_small'
# Set source_prefix to None to copy all objects in source bucket
#source_prefix = 'foo/'
source_prefix = None
# Set source_prefix_exclude to exclude source objects by prefix
#source_prefix_exclude = 'bar/zaz/'
source_prefix_exclude = None
destination_namespace = 'ocitsammut'
destination_region = 'us-ashburn-1'
destination_bucket = 'repl_destination'
# Set this to a lambda to mutate the destination object name, or None to disable
#destination_object_name_mutation = lambda x: "%s%s" % ('2/', x[2:])
destination_object_name_mutation = None
#
# Should not need to edit below here
#
try:
    import cPickle as pickle  # Python 2
    import Queue
except ImportError:
    import pickle  # Python 3
    import queue as Queue
import threading
import time
import oci
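
# Tuning knobs: number of threads submitting copy requests and polling work
# requests, how often (in seconds) progress is reported, the path of the
# pickled state file, and the bounds for the exponential (squaring) retry
# backoff used when API calls fail.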
request_worker_count = 50
status_worker_count = 50
status_interval = 60
state_file = 'copy_status'
base_retry_timeout = 2
max_retry_timeout = 16**2
#
# Should REALLY not need to edit below here
#
data = {}
data_lock = threading.Lock()
known_q = Queue.Queue()
update_q = Queue.Queue()
config = oci.config.from_file(profile_name=oci_cli_profile_name)
object_storage_client = oci.object_storage.ObjectStorageClient(config)
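
# Worker thread: pulls object names off known_q, submits a server-side copy
# request for each one (retrying with backoff on API errors), and records the
# returned opc-work-request-id so its status can be polled later.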
def copy_request_worker():
    while True:
        object_ = known_q.get()
        state = get_state_for_object(object_)
        interval_exp = base_retry_timeout
        while True:
            try:
                response = copy_object(source_namespace, source_region, source_bucket, object_,
                                       destination_namespace, destination_region, destination_bucket, object_)
                break
            except Exception as e:
                if interval_exp > max_retry_timeout:
                    raise
                print(" Received %s from API for object %s, will wait %s seconds before retrying." % (
                    e, object_, interval_exp))
                time.sleep(interval_exp)
                interval_exp **= 2
                continue
        state['work-request-id'] = response.headers.get('opc-work-request-id')
        state['status'] = 'REQUESTED'
        set_state_for_object(object_, state, persist=False)
        known_q.task_done()
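
# Worker thread: pulls object names off update_q and refreshes their copy
# status from the corresponding Object Storage work request.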
def work_request_status_worker():
    while True:
        object_ = update_q.get()
        state = get_state_for_object(object_)
        interval_exp = base_retry_timeout
        while True:
            try:
                response = object_storage_client.get_work_request(state['work-request-id'])
                state['status'] = response.data.status
                break
            except Exception as e:
                if interval_exp > max_retry_timeout:
                    raise
                print(" Received %s from API for work request %s, will wait %s seconds before retrying." % (
                    e, state['work-request-id'], interval_exp))
                time.sleep(interval_exp)
                interval_exp **= 2
                continue
        set_state_for_object(object_, state, persist=False)
        update_q.task_done()
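
# Pages through the source bucket, applies the prefix include/exclude filters,
# and enqueues every matching object name for copying. Returns the count.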
def add_objects_to_queue(ns, bucket):
    global known_q
    count = 0
    next_starts_with = None
    while True:
        response = object_storage_client.list_objects(ns, bucket,
                                                      start=next_starts_with)
        next_starts_with = response.data.next_start_with
        for object_ in response.data.objects:
            if source_prefix and not object_.name.startswith(source_prefix):
                continue
            if source_prefix_exclude and object_.name.startswith(source_prefix_exclude):
                continue
            set_state_for_object(object_.name, {'status': 'KNOWN'}, persist=False)
            known_q.put(object_.name)
            count += 1
        if not next_starts_with:
            break
    save_all_state()
    return count
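
# Helpers for the shared copy-state dictionary, keyed by object name, guarded
# by data_lock and periodically pickled to state_file.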
def set_state_for_object(object_, state, persist=True):
    global data
    data_lock.acquire()
    data[object_] = state
    if persist:
        with open(state_file, 'wb') as sf:
            pickle.dump(data, sf, protocol=pickle.HIGHEST_PROTOCOL)
    data_lock.release()
    return data[object_]


def save_all_state():
    data_lock.acquire()
    with open(state_file, 'wb') as sf:
        pickle.dump(data, sf, protocol=pickle.HIGHEST_PROTOCOL)
    data_lock.release()


def get_state_for_object(object_):
    return data[object_]


def get_work_request_count_by_status(status):
    return len([x for x in data.keys() if data[x].get('status') == status])
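
# Builds a CopyObjectDetails request and submits it; the copy itself runs
# server-side as an Object Storage work request.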
def copy_object(src_ns, src_r, src_b, src_o, dst_ns, dst_r, dst_b, dst_o):
    copy_request = oci.object_storage.models.copy_object_details.CopyObjectDetails()
    # src_r not currently used here
    copy_request.source_object_name = src_o
    copy_request.destination_namespace = dst_ns
    copy_request.destination_region = dst_r
    copy_request.destination_bucket = dst_b
    if destination_object_name_mutation:
        copy_request.destination_object_name = destination_object_name_mutation(dst_o)
    else:
        copy_request.destination_object_name = dst_o
    return object_storage_client.copy_object(src_ns, src_b, copy_request)
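
# Re-queues every object whose copy is still in flight so the status workers
# can refresh it, waits for the queue to drain, then persists the state.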
def update_all_work_requests_status(ns, bucket):
    for object_ in data.keys():
        state = get_state_for_object(object_)
        if state['status'] not in ('KNOWN', 'COMPLETED', 'FAILED', 'CANCELED'):
            update_q.put(object_)
    update_q.join()
    save_all_state()
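
# Starts the worker pools, enqueues the source objects, and then reports
# progress every status_interval seconds until no copies remain outstanding.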
def main():
    print("Creating %s copy request workers." % (request_worker_count))
    for i in range(request_worker_count):
        worker = threading.Thread(target=copy_request_worker)
        worker.daemon = True
        worker.start()
    print("Creating %s status workers." % (status_worker_count))
    for i in range(status_worker_count):
        worker = threading.Thread(target=work_request_status_worker)
        worker.daemon = True
        worker.start()
    print("Getting list of objects from source bucket (%s). Copies will start immediately." % (source_bucket))
    count = add_objects_to_queue(source_namespace, source_bucket)
    print("Enqueued %s objects to be copied" % (count))
    while True:
        print("Waiting %s seconds before checking status." % (status_interval))
        time.sleep(status_interval)
        if get_work_request_count_by_status('KNOWN') > 0 or \
                get_work_request_count_by_status('REQUESTED') > 0:
            print("Determining copy status")
            update_all_work_requests_status(source_namespace, source_bucket)
        data_lock.acquire()
        print(" KNOWN: %s, REQUESTED: %s, COMPLETED: %s, FAILED: %s, CANCELED: %s" % (
            get_work_request_count_by_status('KNOWN'),
            get_work_request_count_by_status('REQUESTED'),
            get_work_request_count_by_status('COMPLETED'),
            get_work_request_count_by_status('FAILED'),
            get_work_request_count_by_status('CANCELED')))
        if get_work_request_count_by_status('KNOWN') == 0 and \
                get_work_request_count_by_status('REQUESTED') == 0:
            data_lock.release()
            break
        else:
            data_lock.release()
    known_q.join()


if __name__ == '__main__':
    main()
@jazzl0ver

Can be started within OCI Cloud Shell
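
For example (the file name here is only an assumption): upload the script to Cloud Shell, edit the configuration variables at the top (namespaces, regions, bucket names), and run `python copy_objects.py`. Progress counts are printed every status_interval seconds (60 by default) until no objects remain in the KNOWN or REQUESTED state.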
