Skip to content

Instantly share code, notes, and snippets.

@cra
Last active November 14, 2023 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cra/74bb7a9df8e29ac230f4001be31fb455 to your computer and use it in GitHub Desktop.
Save cra/74bb7a9df8e29ac230f4001be31fb455 to your computer and use it in GitHub Desktop.
pin bw version
import os
from datetime import timedelta
import boto3
import logging
# pip install bytewax==0.17.2 boto3
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.periodic import SimplePollingInput
from bytewax.dataflow import Dataflow
# Configure the root logger: emit INFO and above, each line prefixed with
# a timestamp and the level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class S3Input(SimplePollingInput):
    """Poll an S3 key prefix and emit the keys that appeared since the last poll.

    Every ``interval`` this input lists all objects under ``subfolder`` in
    ``bucket`` and emits one list per poll: the keys that were not present on
    the previous poll (an empty list when nothing changed).

    Credentials and endpoint are read from the environment variables
    ``S3_ENDPOINT_URL``, ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``.
    """

    def __init__(self, interval, bucket, subfolder, known_filenames=None):
        """Create the S3 client and the polling state.

        Args:
            interval: Time between polls (a ``timedelta``), forwarded to
                ``SimplePollingInput``.
            bucket: Name of the bucket to list.
            subfolder: Key prefix to list under; ``None`` or ``''`` lists the
                whole bucket.
            known_filenames: Optional iterable of keys to treat as already
                seen, so they are not emitted on the first poll.
        """
        # skip align_to argument
        super().__init__(interval, None)
        # a custom S3 endpoint requires going through a Session object
        session = boto3.session.Session()
        self.s3_client = session.client(
            service_name='s3',
            endpoint_url=os.getenv('S3_ENDPOINT_URL'),
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        )
        self.bucket_name = bucket
        self.subfolder = subfolder
        # keys seen on the previous poll; anything not in this set is "new"
        self.known_filenames = (
            set(known_filenames) if known_filenames is not None else set()
        )

    def _fetch_filenames(self):
        """Return every object key under the prefix, across all result pages.

        A single ``list_objects_v2`` call returns at most 1000 keys, so larger
        prefixes would be silently truncated; the paginator walks every page.
        """
        paginator = self.s3_client.get_paginator('list_objects_v2')
        pages = paginator.paginate(
            Bucket=self.bucket_name,
            # boto3 rejects Prefix=None; an unset subfolder means "no prefix"
            Prefix=self.subfolder or '',
        )
        keys = []
        for page in pages:
            # 'Contents' is absent from a page that has no objects
            keys.extend(item['Key'] for item in page.get('Contents', []))
        return keys

    def next_item(self):
        """Return the keys added since the previous poll, in sorted order."""
        current = set(self._fetch_filenames())
        # sorted() gives a deterministic order instead of set iteration order
        new_filenames = sorted(current - self.known_filenames)
        if new_filenames:
            logging.info('new filenames %s', new_filenames)
        else:
            logging.info('no new filenames *yawns*')
        # sorry, no transactions here: keys are marked as seen before any
        # downstream step has processed them
        self.known_filenames = current
        return new_filenames
def do_the_thing(filename):
    """Map an object key to a ``(lowercased_key, key_length)`` pair.

    This is a placeholder for real per-file processing. A cheap way to get
    pseudo-transactions is to rename each file once it has been handled.
    """
    lowered = filename.lower()
    length = len(filename)
    return lowered, length
# Dataflow: poll S3 for new keys -> flatten each poll's list into single keys
# -> spread keys across workers -> process each key -> print the results.
flow = Dataflow()
flow.input(
    'inp',
    S3Input(
        interval=timedelta(seconds=10),
        bucket=os.getenv('S3_BUCKET'),
        # S3_SUBFOLDER is the canonical name; also accept the legacy
        # S3_FOLDER name, and default to no prefix (boto3 rejects None).
        subfolder=os.getenv('S3_SUBFOLDER', os.getenv('S3_FOLDER', '')),
    ),
)
# each poll yields a list of keys; flatten so downstream sees one key per item
flow.flat_map(lambda x: x)
# rebalance keys across workers before the per-key work
# NOTE(review): presumably the polling input emits from a single partition — verify
flow.redistribute()
flow.map(do_the_thing)
flow.output('out', StdOutput())
# S3 connection settings read by S3Input
export S3_ENDPOINT_URL=https://storage....
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
# Bucket and key prefix to poll; the script reads S3_SUBFOLDER
export S3_BUCKET=some-data
export S3_SUBFOLDER=year=2023/month=05/day=02
@cra
Copy link
Author

cra commented Nov 14, 2023

pip install bytewax==0.17.2 boto3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment