""" Utils for S3 | |
pip install utilmy python-fire | |
##### Install LocalStack: | |
pip install localstack | |
Start LocalStack: Use the following command to start LocalStack with S3 service: | |
localstack start --services=s3 | |
Configure AWS CLI and Boto3: Set up AWS CLI and Boto3 to use the LocalStack endpoint for S3. Typically, LocalStack runs on http://localhost:4566. | |
aws configure | |
# Use access key: test, secret key: test, region: us-east-1, output format: json | |
Python Example: Here’s a Python script using boto3 to interact with S3 on LocalStack: | |
import boto3 | |
# Configure boto3 to use LocalStack | |
s3 = boto3.client('s3', endpoint_url='http://localhost:4566', aws_access_key_id='test', aws_secret_access_key='test', region_name='us-east-1') | |
# Create a bucket | |
s3.create_bucket(Bucket='my-test-bucket') | |
# Upload a file | |
s3.put_object(Bucket='my-test-bucket', Key='testfile.txt', Body=b'Hello LocalStack!') | |
# Retrieve the file | |
response = s3.get_object(Bucket='my-test-bucket', Key='testfile.txt') | |
data = response['Body'].read().decode('utf-8') | |
print(data) | |
""" | |
import time
import random
from io import StringIO
from datetime import datetime

import boto3
import pandas as pd

from utilmy import log, loge


#########################################################################################################
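### Hedged sketch: pointing fsspec/s3fs (which cloudLock below relies on) at the LocalStack
### endpoint described in the docstring above. cloudLock itself does not expose these options,
### so this helper only illustrates the underlying s3fs call; bucket and file names are placeholders.
def example_fsspec_localstack():
    import fsspec
    fs = fsspec.filesystem("s3", key="test", secret="test",
                           client_kwargs={"endpoint_url": "http://localhost:4566"})
    fs.mkdir("my-test-bucket")                                  # create a bucket on LocalStack
    with fs.open("my-test-bucket/testfile.txt", "wb") as f:     # upload a small object
        f.write(b"Hello LocalStack!")
    print(fs.cat("my-test-bucket/testfile.txt"))                # b'Hello LocalStack!'
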
def test2():
    ### Usage for S3
    s3lock = cloudLock(dirlock="s3://mybucket/lock_hash")
    s3lock.lock("s3://myotherbucket/myfile.csv")
    # do something
    s3lock.unlock("s3://myotherbucket/myfile.csv")

    ### Usage for GCS
    gcslock = cloudLock(dirlock="gs://mybucket/lock_hash")
    gcslock.lock("gs://myotherbucket/myfile.csv")
    # do something
    gcslock.unlock("gs://myotherbucket/myfile.csv")

    ### Usage for local filesystem
    locallock = cloudLock(dirlock="/local/path/lock_hash")
    locallock.lock("/local/path/myfile.csv")
    # do something
    locallock.unlock("/local/path/myfile.csv")

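### Hedged sketch (not part of the original gist): a small context-manager wrapper so the lock
### is always released even if the protected block raises. It relies only on cloudLock.lock /
### cloudLock.unlock defined below; the name `locked` is illustrative.
from contextlib import contextmanager

@contextmanager
def locked(locker, file_path: str):
    """ Acquire the lock on entry, release it on exit (including on exceptions). """
    if not locker.lock(file_path):
        raise RuntimeError(f"could not acquire lock for {file_path}")
    try:
        yield
    finally:
        locker.unlock(file_path)

### Example:
###     with locked(cloudLock("s3://mybucket/lock_hash"), "s3://myotherbucket/myfile.csv"):
###         ...  # read/write the file safely
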
class cloudLock:
    def __init__(self, dirlock="s3://bucket", ntry_max=20, ntry_sleep=5):
        """ Cooperative file lock backed by a marker file on S3, GCS or local disk. """
        import fsspec

        self.dirlock = dirlock if dirlock[-1] != "/" else dirlock[:-1]

        storage_type = "local"
        if   "s3://" in dirlock: storage_type = "s3"
        elif "gs://" in dirlock: storage_type = "gcs"
        self.storage_type = storage_type

        if storage_type == 'local':
            self.fs = fsspec.filesystem('file')
        else:
            self.fs = fsspec.filesystem(storage_type, anon=False)

        self.ntry_max   = ntry_max
        self.ntry_sleep = ntry_sleep

    def lock(self, file_path: str):
        """ Acquire the lock: write a nanosecond timestamp marker and verify the write. """
        lock_path = self._get_lock_path(file_path)
        ntry = 0
        while ntry < self.ntry_max:
            try:
                time0 = str(time.time_ns())
                with self.fs.open(lock_path, 'wb') as lock_file:
                    lock_file.write(time0.encode('utf-8'))

                val = self.read_file(lock_path)   ### check if writing is correct
                if val is not None and val.decode('utf-8') == time0:
                    break
            except Exception as e:
                print(lock_path, "blocked")
            ntry += 1
            self.sleep(ntry)

        if ntry >= self.ntry_max:
            print("Maximum retries reached. The file is blocked")
            return False
        return True

    def unlock(self, file_path: str):
        """ Release the lock by removing the marker file. """
        lock_path = self._get_lock_path(file_path)
        ntry = 0
        while ntry < self.ntry_max:
            try:
                val = self.read_file(lock_path)   ### fetch the marker before removing it
                if val is not None:
                    self.delete_file(lock_path)
                return True
            except Exception as e:
                print(lock_path, "blocked")
            ntry += 1
            self.sleep(ntry)

        print("Maximum retries reached. The file is blocked")
        return False

    def islock(self, file_path: str) -> bool:
        """ True if a lock marker currently exists for file_path. """
        lock_path = self._get_lock_path(file_path)
        try:
            return self.fs.exists(lock_path)
        except Exception as e:
            return False

    def read_file(self, lock_path: str):
        """ Read the raw bytes of the lock marker, retrying on failure; None if all retries fail. """
        ntry = 0
        while ntry < self.ntry_max:
            try:
                with self.fs.open(lock_path, 'rb') as lock_file:
                    return lock_file.read()
            except Exception as e:
                pass
            ntry += 1
            self.sleep(ntry)
        return None

    def delete_file(self, lock_path: str):
        """ Delete the lock marker, retrying on failure. """
        ntry = 0
        while ntry < self.ntry_max:
            try:
                if self.fs.exists(lock_path):
                    self.fs.rm(lock_path)
                return
            except Exception as e:
                ntry += 1
                self.sleep(ntry)

    def _get_lock_path(self, file_path: str) -> str:
        """ Lock marker path: dirlock/<xxhash of the target file path>. """
        return self.dirlock + "/" + str(self.hashx(file_path))

    def hashx(self, xstr: str, seed=123) -> int:
        """ Computes xxhash value """
        import xxhash
        return xxhash.xxh64_intdigest(str(xstr), seed=seed)

    def sleep(self, ntry):
        """ Linear backoff with jitter between retries. """
        dtsleep = 2.0 + self.ntry_sleep * ntry + random.uniform(0, 1)
        print(f"Retry - {ntry}, retry in {dtsleep}")
        time.sleep(dtsleep)

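### Hedged local smoke test for cloudLock (not part of the original gist): it acquires and
### releases a lock for a path under a temporary directory and checks the return values.
def test_cloudlock_local():
    import os, tempfile
    tmpdir = tempfile.mkdtemp()
    os.makedirs(os.path.join(tmpdir, "lock_hash"), exist_ok=True)

    locker = cloudLock(dirlock=os.path.join(tmpdir, "lock_hash"), ntry_max=3, ntry_sleep=1)
    target = os.path.join(tmpdir, "myfile.csv")

    assert locker.lock(target)   is True     # marker file written and verified
    assert locker.islock(target) is True     # marker visible through islock()
    assert locker.unlock(target) is True     # marker removed
    assert locker.islock(target) is False
    log("cloudLock local smoke test passed")
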
class S3Lock_csv:
    def __init__(self, dircsv: str, dirlock: str):
        self.s3     = boto3.client('s3')
        self.dircsv = dircsv
        self.s3lock = cloudLock(dirlock)

    def s3_split(self, path: str):
        """ Split 's3://bucket/key' into (bucket, key). """
        bucket, key = path.replace("s3://", "").split("/", 1)
        return bucket, key

    def read_atomic(self, ntry_max=10):
        """ Read the CSV as raw text while holding the lock. """
        ntry = 0
        while self.s3lock.islock(self.dircsv) and ntry < ntry_max:
            time.sleep(5 * ntry)
            ntry += 1

        self.s3lock.lock(self.dircsv)
        bucket, key = self.s3_split(self.dircsv)
        txt = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()
        self.s3lock.unlock(self.dircsv)
        return txt

    def update(self, newrow: list, ntry_max=10):
        """ Append new rows to the CSV while holding the lock. """
        ntry = 0
        while self.s3lock.islock(self.dircsv) and ntry < ntry_max:
            time.sleep(5 * ntry)
            ntry += 1
        if ntry >= ntry_max:
            return False

        self.s3lock.lock(self.dircsv)
        bucket, key = self.s3_split(self.dircsv)

        txt     = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()
        df      = pd.read_csv(StringIO(txt))
        newrows = pd.DataFrame(newrow)
        df      = pd.concat((df, newrows))

        buf = StringIO()
        df.to_csv(buf, index=False)
        self.s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue().encode())

        self.s3lock.unlock(self.dircsv)
        return True

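### Hypothetical usage sketch for S3Lock_csv (bucket names, paths and column names are
### placeholders, not taken from the gist). read_atomic returns the raw CSV text; update appends rows.
def example_s3lock_csv():
    csvlock = S3Lock_csv(dircsv="s3://mybucket/data/myfile.csv",
                         dirlock="s3://mybucket/lock_hash")
    txt = csvlock.read_atomic(ntry_max=10)                      # CSV text read under the lock
    ok  = csvlock.update(newrow=[{"colA": 1, "colB": "x"}])     # append one row under the lock
    log(txt[:200], ok)
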
if __name__ == "__main__": | |
import fire | |
fire.Fire() | |