Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save atharvai/bbdc437bfe21409e56215b11bafb73f8 to your computer and use it in GitHub Desktop.
Save atharvai/bbdc437bfe21409e56215b11bafb73f8 to your computer and use it in GitHub Desktop.
AWS Lambda Snowplow Analytics SDK example functions
from __future__ import print_function
import snowplow_analytics_sdk.event_transformer
import snowplow_analytics_sdk.snowplow_event_transformation_exception
import base64
import json
# Module-level statement: executed once when this module is imported,
# not on each call to lambda_handler.
print('Loading function')
def lambda_handler(event, context):
    """Transform each Kinesis record in *event* with the Snowplow Analytics SDK.

    For every record, ``record['kinesis']['data']`` is base64-decoded,
    printed, and handed to
    ``snowplow_analytics_sdk.event_transformer.transform``; the transformed
    result is printed. When the SDK raises
    ``SnowplowEventTransformationException``, each entry of its
    ``error_messages`` is printed and processing moves on to the next record.

    Args:
        event: Mapping with a ``'Records'`` list; each record holds its
            base64-encoded data under ``['kinesis']['data']``.
        context: Unused by this handler.

    Returns:
        A summary string containing ``len(event['Records'])``. The count
        includes records whose transformation failed.
    """
    #print("Received event: " + json.dumps(event, indent=2))
    records = event['Records']
    for record in records:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record['kinesis']['data'])
        # On Python 3 b64decode returns bytes, which cannot be concatenated
        # with str below. On Python 2 the result is already str and is left
        # untouched, so the original behavior is preserved there.
        if not isinstance(payload, str):
            payload = payload.decode('utf-8')
        print("Decoded payload: " + payload)
        try:
            print(snowplow_analytics_sdk.event_transformer.transform(payload))
        except snowplow_analytics_sdk.snowplow_event_transformation_exception.SnowplowEventTransformationException as e:
            for error_message in e.error_messages:
                print(error_message)
    return 'Successfully processed {} records.'.format(len(records))
import snowplow_analytics_sdk.event_transformer
import snowplow_analytics_sdk.snowplow_event_transformation_exception
import base64
import json
# Module-level statement: executed once when this module is imported,
# not on each call to lambda_handler.
print('Loading function')
def lambda_handler(event, context):
    """Transform each Kinesis record in *event* with the Snowplow Analytics SDK.

    For every record, ``record['kinesis']['data']`` is base64-decoded and
    then UTF-8 decoded to text. The text is printed and handed to
    ``snowplow_analytics_sdk.event_transformer.transform``; the transformed
    result is printed. When the SDK raises
    ``SnowplowEventTransformationException``, each entry of its
    ``error_messages`` is printed and processing moves on to the next record.

    Args:
        event: Mapping with a ``'Records'`` list; each record holds its
            base64-encoded data under ``['kinesis']['data']``.
        context: Unused by this handler.

    Returns:
        A summary string containing ``len(event['Records'])``. The count
        includes records whose transformation failed.
    """
    #print("Received event: " + json.dumps(event, indent=2))
    records = event['Records']
    for record in records:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record['kinesis']['data'])
        # Decode to text once and reuse it for both logging and transform.
        text = payload.decode('utf-8')
        print("Decoded payload: " + text)
        try:
            print(snowplow_analytics_sdk.event_transformer.transform(text))
        except snowplow_analytics_sdk.snowplow_event_transformation_exception.SnowplowEventTransformationException as e:
            for error_message in e.error_messages:
                print(error_message)
    return 'Successfully processed {} records.'.format(len(records))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment