Created
October 11, 2022 23:00
-
-
Save dsoprea/8105fe491a92114574e6640fa2cf5963 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import requests | |
_PUBLIC_SECRET = os.environ['KLAVIYO_PUBLIC_SECRET'] | |
_API_ROOT = 'https://a.klaviyo.com/api' | |
_VALID_EXTRA_ATTRIBUTES = [ | |
'first_name', | |
'last_name', | |
'phone_number', | |
'city', | |
'region', | |
'country', | |
'zip', | |
] | |
def post_identity(email, **kwargs): | |
try: | |
kwargs['region'] = kwargs.pop('state') | |
except KeyError: | |
pass | |
properties = {} | |
for key, value in kwargs.items(): | |
if value is None: | |
continue | |
assert \ | |
key in _VALID_EXTRA_ATTRIBUTES, \ | |
"Extra attribute not valid: [{}]".format(key) | |
effective_key = '${}'.format(key) | |
properties[effective_key] = value | |
properties['$email'] = email | |
request = { | |
'token': _PUBLIC_SECRET, | |
'properties': properties, | |
} | |
url = '{}/identify'.format(_API_ROOT) | |
r = requests.post(url, json=request) | |
r.raise_for_status() | |
assert \ | |
r.text == '1', \ | |
"Request not successful: [{}]\nPROPERTIES:\n{}".format( | |
r.text, json.dumps(properties)) | |
def post_identity_activity(email, event_name, properties): | |
request = { | |
'token': _PUBLIC_SECRET, | |
'event': event_name, | |
'customer_properties': { | |
'$email': email | |
}, | |
'properties': properties, | |
} | |
url = '{}/track'.format(_API_ROOT) | |
r = requests.post(url, json=request) | |
r.raise_for_status() | |
assert \ | |
r.text == '1', \ | |
"Request not successful: [{}]\nPROPERTIES:\n{}".format( | |
r.text, json.dumps(properties)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment