Skip to content

Instantly share code, notes, and snippets.

@teer823
Last active April 3, 2024 09:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save teer823/cc23a13ee34578a7eeffb1f5ebc71fb3 to your computer and use it in GitHub Desktop.
Save teer823/cc23a13ee34578a7eeffb1f5ebc71fb3 to your computer and use it in GitHub Desktop.
Copy OBS object from daily to weekly or monthly folder
# -*- coding:utf-8 -*-
import json
import os
from urllib.parse import unquote
from datetime import datetime, timezone, timedelta
from obs import ObsClient, Object, DeleteObjectsRequest
def delete_object(obsClient, bucket_name, object_key):
    """Delete a single object from an OBS bucket.

    Args:
        obsClient: Client object exposing
            ``deleteObject(bucketName=..., objectKey=...)``.
        bucket_name: Name of the bucket that holds the object.
        object_key: Key of the object to delete.

    Returns:
        True when the response status is below 300. False when the status is
        300 or above, or when the call raises any exception (the error is
        printed, not propagated).
    """
    print('deleting source in daily folder')
    print('bucket = ', bucket_name)
    print('object_key =', object_key)
    try:
        resp = obsClient.deleteObject(
            bucketName=bucket_name,
            objectKey=object_key)
        if resp.status < 300:
            print('Delete object ' + object_key + ' successfully!\n')
            # Report success explicitly so callers can distinguish it from
            # the False returned on every failure path.
            return True
        else:
            print('Delete Failed')
            print(resp)
            return False
    except Exception as e:
        # Best effort: log the problem and signal failure to the caller.
        print('Error', e)
        return False
def copy_daily_to_weekly_folder(obsClient, bucket_name, object_key):
    """Copy an object to its weekly counterpart inside the same bucket.

    The destination key is the source key with every occurrence of the
    substring ``'daily'`` replaced by ``'weekly'``.

    Args:
        obsClient: Client object exposing ``copyObject(sourceBucketName=...,
            sourceObjectKey=..., destBucketName=..., destObjectKey=...)``.
        bucket_name: Bucket used as both source and destination.
        object_key: Key of the source object.

    Returns:
        True when the response status is below 300. False when the status is
        300 or above, or when the call raises any exception (the error is
        printed, not propagated).
    """
    print('start copying weekly backup')
    # NOTE(review): str.replace rewrites *all* occurrences, so a file name
    # that itself contains 'daily' is renamed too - confirm this is intended.
    dest_object_key = object_key.replace('daily', 'weekly')
    print('bucket = ', bucket_name)
    print('from =', object_key)
    print('to = ', dest_object_key)
    try:
        resp = obsClient.copyObject(
            sourceBucketName=bucket_name,
            sourceObjectKey=object_key,
            destBucketName=bucket_name,
            destObjectKey=dest_object_key)
        if resp.status < 300:
            print('Copy object ' + dest_object_key + ' successfully!\n')
            # Report success explicitly so callers can distinguish it from
            # the False returned on every failure path.
            return True
        else:
            print('Copy Failed')
            print(resp)
            return False
    except Exception as e:
        # Best effort: log the problem and signal failure to the caller.
        print('Error', e)
        return False
def copy_daily_to_monthly_folder(obsClient, bucket_name, object_key):
    """Duplicate an object under its monthly key within one bucket.

    The target key is derived from ``object_key`` by substituting
    ``'monthly'`` for each ``'daily'`` substring.

    Args:
        obsClient: Client object providing a ``copyObject`` method.
        bucket_name: Bucket acting as both origin and target.
        object_key: Key of the object to duplicate.

    Returns:
        True if the response status is under 300; False if it is 300 or
        more, or if anything raised while copying (the error is printed).
    """
    print('start copying monthly backup')
    monthly_key = object_key.replace('daily', 'monthly')
    print('bucket = ', bucket_name)
    print('from =', object_key)
    print('to = ', monthly_key)
    try:
        response = obsClient.copyObject(
            sourceBucketName=bucket_name, sourceObjectKey=object_key,
            destBucketName=bucket_name, destObjectKey=monthly_key,
        )
        # Guard clause: bail out first on any non-success status.
        if resp_failed(response):
            print('Copy Failed')
            print(response)
            return False
        print('Copy object ' + monthly_key + ' successfully!\n')
        return True
    except Exception as err:
        print('Error', err)
        return False


def resp_failed(response):
    """Return True when the response status is not below 300."""
    return not response.status < 300
def _read_schedule_env(name):
    """Read an integer schedule setting from environment variable *name*.

    Returns:
        The integer value, or None when the variable is unset or blank so
        the caller can skip that schedule.

    Raises:
        ValueError: The variable is set but is not an integer.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        print(name, 'is not set, this schedule is disabled')
        return None
    try:
        return int(raw)
    except ValueError:
        raise ValueError('%s must be an integer, got %r' % (name, raw))


def _parse_event_time(event_time):
    """Parse an event timestamp with or without fractional seconds."""
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
        try:
            return datetime.strptime(event_time, fmt)
        except ValueError:
            continue
    raise ValueError('Unsupported eventTime format: %r' % (event_time,))


def handler(event, context):
    """Copy a newly created daily object to the weekly/monthly folders.

    Reads the first entry of ``event['Records']``, takes its ``eventTime``
    plus bucket name and object key, and then:

    * calls ``copy_daily_to_monthly_folder`` when the event's day of month
      equals the ``monthly_backup_date`` environment variable;
    * calls ``copy_daily_to_weekly_folder`` when the event's ISO weekday
      (Monday = 1 ... Sunday = 7) equals the ``weekly_backup_day_of_week``
      environment variable.

    A schedule whose environment variable is unset or blank is skipped
    without affecting the other one.

    Args:
        event: Event dict carrying a ``Records`` list in the S3-style layout
            (``eventTime``, ``s3.bucket.name``, ``s3.object.key``).
        context: Runtime context providing ``getAccessKey()`` and
            ``getSecretKey()``.

    Raises:
        ValueError: A schedule variable is not an integer, or ``eventTime``
            matches neither supported timestamp format.
    """
    print(event)
    # Obtain AK / SK from Agency
    ak = context.getAccessKey()
    sk = context.getSecretKey()
    obs_server = "https://obs.ap-southeast-2.myhuaweicloud.com"
    # Create ObsClient
    obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=obs_server)
    try:
        # Get Event Date from OBS event.
        # NOTE(review): the timestamp is used as-is (the trailing 'Z'
        # suggests UTC); possible improvement - convert to UTC+7 first.
        record = event.get('Records')[0]
        event_dt = _parse_event_time(record.get('eventTime'))

        # Get Bucket Information from OBS Event
        s3_info = record.get('s3')
        bucket_name = s3_info.get('bucket').get('name')
        encoded_object_key = s3_info.get('object').get('key')
        # OBS events encode a space as '+' instead of '%20', so convert '+'
        # back to a space before unquoting to recover the real key.
        object_key = unquote(encoded_object_key.replace('+', ' '))
        print('bucket = ', bucket_name)
        print('object = ', object_key)

        # ISO weekday starts with Monday = 1, Sunday = 7
        iso_week_day = event_dt.isoweekday()
        print('iso_week_day = ', iso_week_day)
        # From ENV, configurable day of week to do the weekly backup
        weekly_backup_day_of_week = _read_schedule_env('weekly_backup_day_of_week')
        print('weekly_backup_day_of_week target = ', weekly_backup_day_of_week)

        # Day of month and monthly backup date target
        day_of_month = event_dt.day
        print('day_of_month = ', day_of_month)
        monthly_backup_date = _read_schedule_env('monthly_backup_date')
        print('monthly_backup_date target = ', monthly_backup_date)

        # Do Monthly backup if on specified date of month
        if monthly_backup_date is not None and day_of_month == monthly_backup_date:
            print('do monthly copy')
            copy_daily_to_monthly_folder(obsClient, bucket_name, object_key)

        # Do Weekly backup if on specified day of week
        if weekly_backup_day_of_week is not None and iso_week_day == weekly_backup_day_of_week:
            print('do weekly copy')
            copy_daily_to_weekly_folder(obsClient, bucket_name, object_key)
    finally:
        # Release the client's connections even when processing fails.
        obsClient.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment