Skip to content

Instantly share code, notes, and snippets.

@hkulekci
Last active April 29, 2023 08:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hkulekci/db613ba510d0494cd1e414e922fd50e0 to your computer and use it in GitHub Desktop.
Save hkulekci/db613ba510d0494cd1e414e922fd50e0 to your computer and use it in GitHub Desktop.
Kinesis Queue Check
import boto3
def _iter_shard_ids(kinesis_client, stream_name):
    """Yield the ShardId of every shard in the stream, following pagination."""
    request = {'StreamName': stream_name}
    while True:
        description = kinesis_client.describe_stream(**request)['StreamDescription']
        shards = description['Shards']
        for shard in shards:
            yield shard['ShardId']
        # describe_stream signals further pages via HasMoreShards; the next
        # page starts after the last shard id already seen.
        if not description.get('HasMoreShards') or not shards:
            return
        request['ExclusiveStartShardId'] = shards[-1]['ShardId']


def check_kinesis_stream_data(stream_name, kinesis_client=None, max_reads_per_shard=10):
    """Return True if any shard of the stream holds at least one record.

    Every shard is read from its oldest available position (TRIM_HORIZON).
    A single GetRecords call may come back empty even though records exist
    further along the shard, so the returned NextShardIterator is followed
    until a record is seen, the read position has caught up with the tip of
    the shard (MillisBehindLatest == 0), the shard ends, or
    ``max_reads_per_shard`` calls have been made.

    Args:
        stream_name: Name of the Kinesis stream to inspect.
        kinesis_client: Optional pre-built boto3 Kinesis client (for example
            one bound to a specific region or session). When omitted, a
            client is created with ``boto3.client('kinesis')``.
        max_reads_per_shard: Upper bound on GetRecords calls per shard. This
            keeps the check cheap; if the bound is hit before the shard is
            proven empty, that shard is treated as empty (best effort).

    Returns:
        True as soon as one record is found in any shard, otherwise False.

    Raises:
        ValueError: If ``max_reads_per_shard`` is less than 1.
    """
    if max_reads_per_shard < 1:
        raise ValueError("max_reads_per_shard must be at least 1")

    if kinesis_client is None:
        # Create a Kinesis client using your AWS credentials
        kinesis_client = boto3.client('kinesis')

    for shard_id in _iter_shard_ids(kinesis_client, stream_name):
        shard_iterator = kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType='TRIM_HORIZON',
        )['ShardIterator']

        for _ in range(max_reads_per_shard):
            if shard_iterator is None:
                # No further iterator: the shard has been read to its end.
                break
            response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=1)
            if response['Records']:
                return True
            if response.get('MillisBehindLatest', 0) == 0:
                # Caught up with the tip of the shard without seeing a record.
                break
            shard_iterator = response.get('NextShardIterator')

    return False
# Name of the Kinesis stream to inspect; swap the placeholder for a real stream name.
stream_name = 'your-stream-name'
# Report whether the stream currently holds any records.
print(
    f"The {stream_name} has data."
    if check_kinesis_stream_data(stream_name)
    else f"The {stream_name} is empty."
)
import boto3
def list_files_by_creation_date(bucket, prefix, s3_client=None):
    """Return every object under ``prefix`` in ``bucket``, newest first.

    Ordering is by each object's ``LastModified`` value, descending. The
    listing is paginated so that keys beyond the first response page are
    included, and a prefix that matches nothing yields an empty list
    rather than a ``KeyError``.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix (folder) to restrict the listing to.
        s3_client: Optional pre-built boto3 S3 client. When omitted, a
            client is created with ``boto3.client('s3')``.

    Returns:
        A list of the object-description dicts from the listing responses,
        sorted by ``LastModified`` from newest to oldest.
    """
    if s3_client is None:
        # Create an S3 client using your AWS credentials
        s3_client = boto3.client('s3')

    paginator = s3_client.get_paginator('list_objects_v2')
    found = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page with no matching keys carries no 'Contents' entry at all.
        found.extend(page.get('Contents', []))

    found.sort(key=lambda obj: obj['LastModified'], reverse=True)
    return found
def read_most_recent_file(bucket, prefix):
    """Return the UTF-8 decoded content of the newest object under ``prefix``.

    The newest object is the first entry returned by
    ``list_files_by_creation_date(bucket, prefix)``.

    Args:
        bucket: Name of the S3 bucket to read from.
        prefix: Key prefix (folder) whose newest object should be read.

    Returns:
        The object's body decoded as UTF-8 text.

    Raises:
        IndexError: If the listing is empty, i.e. there is nothing to read.
        UnicodeDecodeError: If the object's bytes are not valid UTF-8.
    """
    newest_first = list_files_by_creation_date(bucket, prefix)
    if not newest_first:
        # Same exception type as indexing an empty list, but with a message
        # that says what was actually missing.
        raise IndexError(f"no objects found under s3://{bucket}/{prefix}")

    # Get the most recently created file
    most_recent_key = newest_first[0]['Key']

    # Read the content of the most recent file
    s3_client = boto3.client('s3')
    body = s3_client.get_object(Bucket=bucket, Key=most_recent_key)['Body']
    try:
        return body.read().decode('utf-8')
    finally:
        # Release the underlying stream even if reading or decoding fails.
        body.close()
# Bucket and key prefix (folder) whose newest object should be printed;
# swap both placeholders for real values.
s3_bucket, s3_prefix = 'your-s3-bucket', 'your-s3-prefix'
most_recent_file_content = read_most_recent_file(bucket=s3_bucket, prefix=s3_prefix)
print(f"The content of the most recent file is:\n{most_recent_file_content}")
import boto3
import fnmatch
def list_files_with_dynamic_prefix(bucket, pattern, s3_client=None):
    """Return the objects in ``bucket`` whose key matches a glob ``pattern``.

    Matching uses ``fnmatch.fnmatchcase`` so it is case-sensitive and does
    not rewrite path separators on any platform. The listing is paginated,
    and an empty bucket yields an empty list rather than a ``KeyError``.

    Any literal text before the first wildcard character (``*``, ``?`` or
    ``[``) is sent as the listing ``Prefix`` so that keys which cannot
    match are not listed at all.

    Args:
        bucket: Name of the S3 bucket to search.
        pattern: fnmatch-style pattern, e.g. ``'my-data-folder-*/*'``.
        s3_client: Optional pre-built boto3 S3 client. When omitted, a
            client is created with ``boto3.client('s3')``.

    Returns:
        A list of the object-description dicts whose ``'Key'`` matches
        ``pattern``, in listing order.
    """
    if s3_client is None:
        s3_client = boto3.client('s3')

    # Everything before the first wildcard must appear verbatim at the start
    # of any matching key, so it is safe to use as a listing prefix.
    literal_prefix = pattern
    for index, char in enumerate(pattern):
        if char in '*?[':
            literal_prefix = pattern[:index]
            break

    request = {'Bucket': bucket}
    if literal_prefix:
        request['Prefix'] = literal_prefix

    paginator = s3_client.get_paginator('list_objects_v2')
    matches = []
    for page in paginator.paginate(**request):
        # A page with no keys carries no 'Contents' entry at all.
        matches.extend(
            obj for obj in page.get('Contents', [])
            if fnmatch.fnmatchcase(obj['Key'], pattern)
        )
    return matches
# Bucket to scan; swap the placeholder for the name of a real S3 bucket.
s3_bucket = 'your-s3-bucket'
# Glob-style key pattern to match, for example 'my-data-folder-*/*'.
pattern = 'your-pattern'
filtered_objects = list_files_with_dynamic_prefix(bucket=s3_bucket, pattern=pattern)
print(f"Filtered objects:\n{filtered_objects}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment