Skip to content

Instantly share code, notes, and snippets.

@dsoprea
Created October 11, 2022 23:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dsoprea/8105fe491a92114574e6640fa2cf5963 to your computer and use it in GitHub Desktop.
Save dsoprea/8105fe491a92114574e6640fa2cf5963 to your computer and use it in GitHub Desktop.
import os
import json
import requests
# Token sent as the `token` field of every request body. It is read at import
# time, so importing this module raises KeyError when the
# KLAVIYO_PUBLIC_SECRET environment variable is not set.
_PUBLIC_SECRET = os.environ['KLAVIYO_PUBLIC_SECRET']
# Base URL to which the endpoint paths (`/identify`, `/track`) are appended.
_API_ROOT = 'https://a.klaviyo.com/api'
# Attribute names that post_identity() accepts as keyword arguments. Each one
# is sent with a `$` prefix (e.g. `first_name` becomes `$first_name`).
_VALID_EXTRA_ATTRIBUTES = [
'first_name',
'last_name',
'phone_number',
'city',
'region',
'country',
'zip',
]
def post_identity(email, **kwargs):
    """Post profile properties for `email` to the Klaviyo `/identify` endpoint.

    Args:
        email: Value sent as the `$email` property.
        **kwargs: Optional profile attributes. Every key with a non-None
            value must appear in `_VALID_EXTRA_ATTRIBUTES`; `state` is
            accepted as an alias for `region` and takes precedence over it
            when both are given. Attributes whose value is None are omitted
            from the request.

    Raises:
        AssertionError: If an attribute name is not allowed, or if the
            response body is anything other than '1'.
        requests.HTTPError: If the response carries an error HTTP status.
        requests.Timeout: If the server does not respond in time.
    """

    # `state` is an alias for `region`. An absent or None `state` must not
    # clobber a `region` that the caller supplied explicitly.
    state = kwargs.pop('state', None)
    if state is not None:
        kwargs['region'] = state

    properties = {}
    for key, value in kwargs.items():
        if value is None:
            continue

        # Raised explicitly instead of via `assert` so the validation still
        # runs under `python -O`; the exception type is kept for callers
        # that already catch AssertionError.
        if key not in _VALID_EXTRA_ATTRIBUTES:
            raise AssertionError(
                "Extra attribute not valid: [{}]".format(key))

        properties['${}'.format(key)] = value

    properties['$email'] = email

    request = {
        'token': _PUBLIC_SECRET,
        'properties': properties,
    }

    url = '{}/identify'.format(_API_ROOT)

    # Without a timeout `requests` can block indefinitely on a stalled
    # connection.
    r = requests.post(url, json=request, timeout=30)
    r.raise_for_status()

    # The code treats a body of exactly '1' as success; anything else is
    # reported together with the properties that were sent.
    if r.text != '1':
        raise AssertionError(
            "Request not successful: [{}]\nPROPERTIES:\n{}".format(
                r.text, json.dumps(properties)))
def post_identity_activity(email, event_name, properties):
    """Post an event for `email` to the Klaviyo `/track` endpoint.

    Args:
        email: Value sent as `$email` inside `customer_properties`.
        event_name: Value sent as the `event` field.
        properties: Event properties, sent unchanged as the `properties`
            field. Must be JSON-serializable.

    Raises:
        AssertionError: If the response body is anything other than '1'.
        requests.HTTPError: If the response carries an error HTTP status.
        requests.Timeout: If the server does not respond in time.
    """

    request = {
        'token': _PUBLIC_SECRET,
        'event': event_name,
        'customer_properties': {
            '$email': email,
        },
        'properties': properties,
    }

    url = '{}/track'.format(_API_ROOT)

    # Without a timeout `requests` can block indefinitely on a stalled
    # connection.
    r = requests.post(url, json=request, timeout=30)
    r.raise_for_status()

    # Raised explicitly instead of via `assert` so the check still runs under
    # `python -O`; the exception type is kept for callers that already catch
    # AssertionError. A body of exactly '1' is what the code treats as
    # success.
    if r.text != '1':
        raise AssertionError(
            "Request not successful: [{}]\nPROPERTIES:\n{}".format(
                r.text, json.dumps(properties)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment