Skip to content

Instantly share code, notes, and snippets.

@Jammink2
Last active April 5, 2016 00:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Jammink2/2296101e062ab43567487fef67f78263 to your computer and use it in GitHub Desktop.
Save Jammink2/2296101e062ab43567487fef67f78263 to your computer and use it in GitHub Desktop.
import urllib, urllib2, base64, time, json, sys
import dateutil, dateutil.parser, datetime
# Destination in Treasure Data: upload_td() joins these as "<database>.<table>"
# to form the key of the JSON payload it posts.
td_database = "kinesis"
td_table = "events"
# Sent as the X-TD-Write-Key request header; replace the placeholder before deploying.
td_master_key = "<YOUR_TD_MASTER_API_KEY>"
# URL that upload_td() posts each batch of records to.
td_endpoint = "https://in.treasuredata.com/js/v3/event"
def upload_td(records):
    """Send a batch of records to Treasure Data in a single HTTP request.

    The batch is posted to ``td_endpoint`` as a JSON object with one key,
    ``"<td_database>.<td_table>"``, whose value is the list of records.

    Args:
        records: List of JSON-serializable event payloads.

    Raises:
        urllib2.URLError: If the request cannot be completed
            (``urllib2.HTTPError``, a subclass, for HTTP error statuses).
    """
    # https://docs.treasuredata.com/articles/javascript-sdk#appendix-api-endpoint
    headers = {
        'Content-Type': 'application/json',
        'X-TD-Data-Type': 'k',
        'X-TD-Write-Key': td_master_key,
    }
    data = json.dumps({'%s.%s' % (td_database, td_table): records})
    req = urllib2.Request(td_endpoint, data=data, headers=headers)
    response = urllib2.urlopen(req, timeout=180)
    try:
        # Drain the body so the exchange is complete before reporting success.
        response.read()
    finally:
        # Always release the connection, even if reading the body fails;
        # otherwise the socket stays open for the life of the process.
        response.close()
    print("Success: %s records" % len(records))
def _to_epoch_seconds(timestamp):
    """Convert a date/time string to whole seconds since the Unix epoch."""
    # Import explicitly instead of relying on dateutil.parser having loaded
    # the tz submodule as a side effect.
    from dateutil import tz

    utc = tz.tzutc()
    parsed = dateutil.parser.parse(timestamp)
    if parsed.tzinfo is None:
        # No offset in the string: interpret the value as UTC.
        parsed = parsed.replace(tzinfo=utc)
    # When the string carries its own offset, keep it: overwriting tzinfo
    # would shift the instant by that offset.
    epoch = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=utc)
    return int((parsed - epoch).total_seconds())


def lambda_handler(event, context):
    """Decode the Kinesis records in ``event`` and upload them as one batch.

    Each entry of ``event['Records']`` is expected to hold a base64-encoded
    JSON object under ``['kinesis']['data']``. Every decoded payload gets a
    ``time`` field in epoch seconds:

    * missing ``time``  -> the current time;
    * string ``time``   -> parsed with dateutil and converted to epoch seconds
      (strings without a UTC offset are treated as UTC);
    * any other type    -> left unchanged.

    Args:
        event: Lambda event dict containing a ``Records`` list.
        context: Lambda context object (unused).
    """
    records = []
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        # We also assume payload comes as JSON form
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        if 'time' not in payload:
            payload['time'] = int(time.time())
        elif isinstance(payload['time'], basestring):
            payload['time'] = _to_epoch_seconds(payload['time'])
        records.append(payload)
    # Upload once per invocation so the whole batch goes out in one request.
    upload_td(records)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment