lambda-to-kafka
# First I created a working directory
~$ mkdir s3-kafka
# Then I installed the dependency I needed in that directory with pip
~$ cd s3-kafka
~$ pip install kafka-python -t $(pwd)
# Then I put my code into a file called s3-kafka.py
~$ vi s3-kafka.py
# Then I zipped up the entire directory into a zip file in my homedir (with a 'version' number for convenience)
~$ zip -r9 ~/s3-kafka-0.0.1.zip *
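# (Optional sanity check: the handler file and the kafka/ package directory need to sit at
# the root of the zip, not inside a subdirectory -- "unzip -l" makes that easy to verify)
~$ unzip -l ~/s3-kafka-0.0.1.zip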
# Then I uploaded the zip to s3
~$ cd ~
~$ aws s3 cp s3-kafka-0.0.1.zip s3://examplebucket/
# Now when I created the lambda function, I could just specify the s3 location of the deployment package.
# The key to making it all work is the "Handler" field of the lambda function definition:
#
# "Handler": "s3-kafka.handler"
#
# Your filename must match the part before the dot (my file is called s3-kafka.py, so it needed to be s3-kafka)
# ...and your function must match the part after the dot (the function that handles execution is called 'handler',
# as you can see in the code below)
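# Putting it together, creating the function from the CLI would look roughly like this
# (a sketch: the role ARN is a placeholder, the runtime depends on your Python version,
# and if your brokers are only reachable inside a VPC you'd also need --vpc-config):
~$ aws lambda create-function \
     --function-name s3-kafka \
     --runtime python3.9 \
     --role arn:aws:iam::123456789012:role/lambda-execution-role \
     --handler s3-kafka.handler \
     --code S3Bucket=examplebucket,S3Key=s3-kafka-0.0.1.zip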
import json

import boto3
from kafka import KafkaProducer

s3_client = boto3.client('s3')

# Create the producer once at module load so it's reused across warm invocations
producer = KafkaProducer(bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092')


def handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        record['s3_full_path'] = "s3://%s/%s" % (bucket, key)
        # KafkaProducer expects bytes when no value_serializer is configured,
        # so encode the JSON string before sending
        producer.send('new-objects', json.dumps(record).encode('utf-8'))
    producer.flush()
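# Each record in the event the handler receives looks roughly like this
# (abbreviated from the S3 event notification format):
#
#   {"eventSource": "aws:s3", "eventName": "ObjectCreated:Put",
#    "s3": {"bucket": {"name": "examplebucket", ...},
#           "object": {"key": "path/to/object", "size": 1234, ...}}}
#
# To verify end-to-end, upload an object to a bucket that's wired to trigger the function
# and watch the topic with the console consumer that ships with Kafka
# (the script path and broker hostname here are assumptions based on the producer config above):
~$ kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic new-objects --from-beginning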