""" S3 lock mechanism: utils for S3.

##### Install dependencies:
pip install utilmy python-fire boto3 pandas fsspec s3fs gcsfs xxhash

##### Install LocalStack (for local testing):
pip install localstack

##### Start LocalStack with the S3 service:
localstack start --services=s3

##### Configure AWS CLI and Boto3:
Point them at the LocalStack endpoint for S3. By default, LocalStack listens on http://localhost:4566.
aws configure
# Use access key: test, secret key: test, region: us-east-1, output format: json

##### Python example: using boto3 against S3 on LocalStack:
import boto3

# Configure boto3 to use LocalStack
s3 = boto3.client('s3', endpoint_url='http://localhost:4566',
                  aws_access_key_id='test', aws_secret_access_key='test',
                  region_name='us-east-1')

# Create a bucket
s3.create_bucket(Bucket='my-test-bucket')

# Upload a file
s3.put_object(Bucket='my-test-bucket', Key='testfile.txt', Body=b'Hello LocalStack!')

# Retrieve the file
response = s3.get_object(Bucket='my-test-bucket', Key='testfile.txt')
data = response['Body'].read().decode('utf-8')
print(data)
"""
import boto3
import pandas as pd
from io import StringIO
import time
import random
from datetime import datetime
from utilmy import log, loge
#########################################################################################################
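# A minimal sketch (not part of the original gist) tying cloudLock to the LocalStack
# setup described in the docstring above. The endpoint override uses s3fs's
# `key`/`secret`/`client_kwargs` parameters; the bucket name is hypothetical and
# must exist before locking (e.g. created via the boto3 snippet in the docstring).
def test_localstack():
    import fsspec
    lock = cloudLock(dirlock="s3://my-test-bucket/lock_hash")
    # Replace the default S3 filesystem with one pointed at LocalStack:
    lock.fs = fsspec.filesystem(
        "s3", key="test", secret="test",
        client_kwargs={"endpoint_url": "http://localhost:4566"},
    )
    assert lock.lock("s3://my-test-bucket/myfile.csv")
    lock.unlock("s3://my-test-bucket/myfile.csv")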
def test2():
    """Usage examples for S3, GCS, and the local filesystem."""
    # Usage for S3
    s3lock = cloudLock(dirlock="s3://mybucket/lock_hash")
    s3lock.lock("s3://myotherbucket/myfile.csv")
    # do something
    s3lock.unlock("s3://myotherbucket/myfile.csv")

    # Usage for GCS
    gcslock = cloudLock(dirlock="gs://mybucket/lock_hash")
    gcslock.lock("gs://myotherbucket/myfile.csv")
    # do something
    gcslock.unlock("gs://myotherbucket/myfile.csv")

    # Usage for the local filesystem
    locallock = cloudLock(dirlock="/local/path/lock_hash")
    locallock.lock("/local/path/myfile.csv")
    # do something
    locallock.unlock("/local/path/myfile.csv")
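# A hedged sketch showing two lockers contending for the same file on the local
# filesystem. It relies on the exists-check in lock() below, so the second locker
# fails fast instead of silently overwriting the first lock. Paths are hypothetical;
# small ntry values keep the test quick.
def test3():
    import os
    os.makedirs("/tmp/lock_hash", exist_ok=True)

    lock_a = cloudLock(dirlock="/tmp/lock_hash", ntry_max=2, ntry_sleep=1)
    lock_b = cloudLock(dirlock="/tmp/lock_hash", ntry_max=2, ntry_sleep=1)

    assert lock_a.lock("/tmp/myfile.csv") is True    # first locker wins
    assert lock_b.lock("/tmp/myfile.csv") is False   # second locker gives up after retries

    lock_a.unlock("/tmp/myfile.csv")
    assert lock_b.lock("/tmp/myfile.csv") is True    # lock is free again
    lock_b.unlock("/tmp/myfile.csv")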
class cloudLock:
    """File-based lock for S3, GCS, or the local filesystem, using fsspec."""
    def __init__(self, dirlock="s3://bucket", ntry_max=20, ntry_sleep=5):
        import fsspec
        self.dirlock = dirlock if dirlock[-1] != "/" else dirlock[:-1]

        storage_type = "local"
        if   "s3://" in dirlock: storage_type = "s3"
        elif "gs://" in dirlock: storage_type = "gcs"
        self.storage_type = storage_type

        if storage_type == 'local':
            self.fs = fsspec.filesystem('file')
        else:
            self.fs = fsspec.filesystem(storage_type, anon=False)

        self.ntry_max   = ntry_max
        self.ntry_sleep = ntry_sleep
    def lock(self, file_path: str):
        """Acquire the lock for file_path. Returns True on success, False after ntry_max retries."""
        lock_path = self._get_lock_path(file_path)
        ntry = 0
        while ntry < self.ntry_max:
            try:
                if self.fs.exists(lock_path):
                    raise Exception("lock file already exists")

                time0 = str(time.time_ns())
                with self.fs.open(lock_path, 'wb') as lock_file:
                    lock_file.write(time0.encode())

                val = self.read_file(lock_path)   ### check the write went through
                if val is not None and val.decode() == time0:
                    break
            except Exception:
                print(lock_path, "blocked")
            ntry += 1
            self.sleep(ntry)

        if ntry >= self.ntry_max:
            print("Maximum retries reached. The file is locked by another process")
            return False
        return True
    def unlock(self, file_path: str):
        """Release the lock for file_path by deleting the lock file. Returns True on success."""
        lock_path = self._get_lock_path(file_path)
        ntry = 0
        while ntry < self.ntry_max:
            try:
                self.delete_file(lock_path)
                return True
            except Exception:
                print(lock_path, "blocked")
                ntry += 1
                self.sleep(ntry)

        print("Maximum retries reached. The file is still locked")
        return False
    def read_file(self, lock_path):
        """Read raw bytes from lock_path; returns None after ntry_max failed attempts."""
        ntry = 0
        while ntry < self.ntry_max:
            try:
                with self.fs.open(lock_path, 'rb') as lock_file:
                    return lock_file.read()
            except Exception:
                ntry += 1
                self.sleep(ntry)
        return None
    def delete_file(self, lock_path):
        """Delete lock_path if it exists; retries up to ntry_max times."""
        ntry = 0
        while ntry < self.ntry_max:
            try:
                if self.fs.exists(lock_path):
                    self.fs.rm(lock_path)
                return True
            except Exception:
                ntry += 1
                self.sleep(ntry)
        return False
    def _get_lock_path(self, file_path):
        return self.dirlock + "/" + str(self.hashx(file_path))

    def hashx(self, xstr: str, seed=123) -> int:
        """Computes the xxhash value of a string."""
        import xxhash
        return xxhash.xxh64_intdigest(str(xstr), seed=seed)

    def sleep(self, ntry):
        dtsleep = 2.0 + self.ntry_sleep * ntry + random.uniform(0, 1)
        print(f"Retry {ntry}, next attempt in {dtsleep:.1f}s")
        time.sleep(dtsleep)
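# A hedged convenience sketch (not in the original gist): a context manager that
# pairs lock/unlock even when the body raises. The name `cloud_locked` is hypothetical.
from contextlib import contextmanager

@contextmanager
def cloud_locked(locker, file_path: str):
    if not locker.lock(file_path):
        raise TimeoutError(f"could not acquire lock for {file_path}")
    try:
        yield
    finally:
        locker.unlock(file_path)

# Usage:
#   with cloud_locked(cloudLock(dirlock="s3://mybucket/lock_hash"), "s3://mybucket/myfile.csv"):
#       ...  # do something while holding the lock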
class S3Lock_csv:
    """CSV file on S3 guarded by a cloudLock, for atomic read/append."""
    def __init__(self, dircsv: str, dirlock: str):
        self.s3     = boto3.client('s3')
        self.dircsv = dircsv
        self.s3lock = cloudLock(dirlock)

    def s3_split(self, path: str):
        """Split 's3://bucket/key' into (bucket, key)."""
        bucket, _, key = path.replace("s3://", "").partition("/")
        return bucket, key

    def read_atomic(self):
        """Read the CSV under the lock; returns a DataFrame, or None if the lock cannot be acquired."""
        if not self.s3lock.lock(self.dircsv):
            return None
        bucket, key = self.s3_split(self.dircsv)
        csv_str = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()
        df = pd.read_csv(StringIO(csv_str))
        self.s3lock.unlock(self.dircsv)
        return df

    def update(self, newrows: list):
        """Append newrows to the CSV under the lock; returns True on success."""
        if not self.s3lock.lock(self.dircsv):
            return False
        bucket, key = self.s3_split(self.dircsv)
        csv_str = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()
        df = pd.read_csv(StringIO(csv_str))
        df = pd.concat((df, pd.DataFrame(newrows)))
        self.s3.put_object(Bucket=bucket, Key=key, Body=df.to_csv(index=False).encode())
        self.s3lock.unlock(self.dircsv)
        return True
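# A hedged usage sketch for S3Lock_csv. Bucket and key are hypothetical; it assumes
# table.csv already exists in the bucket, and that boto3 is configured for a real
# AWS account or for LocalStack via environment variables (see docstring above).
def test_s3lock_csv():
    csvlock = S3Lock_csv(dircsv="s3://my-test-bucket/table.csv",
                         dirlock="s3://my-test-bucket/lock_hash")
    csvlock.update([{"name": "alice", "score": 1}])   # append one row atomically
    df = csvlock.read_atomic()                        # read the table back atomically
    print(df)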
if __name__ == "__main__":
    import fire
    fire.Fire()