Created
September 14, 2022 15:45
-
-
Save brk21/76aa122badba54aa0349911a0b521511 to your computer and use it in GitHub Desktop.
Add, Update, or Delete Emails from Hubspot Contacts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import time | |
def get_contact_by_email(email, access_token, properties=None):
    """Fetch a HubSpot contact by email address.

    Issues a GET against the CRM v3 contacts endpoint with
    ``idProperty=email`` so the email is used as the lookup key.

    Args:
        email: Email address identifying the contact.
        access_token: Token sent as the ``Bearer`` authorization credential.
        properties: Optional iterable of property API names to request.
            Each one is sent as a repeated ``properties`` query parameter.
            ``None`` (the default) requests no extra properties.

    Returns:
        The decoded JSON body of the response, or ``{}`` when the response
        body is empty.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails
            (connection error, timeout, ...).
        ValueError: If a non-empty response body is not valid JSON.
    """
    from urllib.parse import quote

    # Escape the email so characters such as "/", "?" or "#" cannot alter
    # the request path; "@" is legal in a path segment and is left readable.
    url = (
        "https://api.hubapi.com/crm/v3/objects/contacts/"
        f"{quote(email, safe='@')}"
    )
    # Let requests build (and URL-encode) the query string. A list value is
    # emitted as one "properties=<name>" pair per element.
    params = {
        "idProperty": "email",
        "properties": list(properties) if properties else [],
    }
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {access_token}",
    }
    r = requests.request("GET", url, headers=headers, params=params, timeout=10)
    if r.content:
        return r.json()
    return {}
def add_secondary_email_to_contact(contact_id, email, access_key):
    """Add a secondary email to a HubSpot contact.

    Sends a PUT to the v1 ``secondary-email`` endpoint. If the request
    fails at the transport level, the error and the email are printed, the
    function waits 10 seconds, and the request is attempted one more time.

    Args:
        contact_id: ID of the contact, interpolated into the URL path.
        email: Email address to add. Must contain ``"@"``.
        access_key: Token sent as the ``Bearer`` authorization credential.

    Returns:
        The raw response body (``bytes``) of the PUT request, or ``""``
        when ``email`` does not contain ``"@"`` (no request is made).

    Raises:
        requests.exceptions.RequestException: If the retry also fails.
    """
    from urllib.parse import quote

    if "@" not in email:
        print("not a valid email")
        return ""

    # Escape the email so characters such as "/", "?" or "#" cannot alter
    # the request path; "@" is legal in a path segment and is left readable.
    url = (
        "https://api.hubapi.com/contacts/v1/secondary-email/"
        f"{contact_id}/email/{quote(email, safe='@')}"
    )
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {access_key}",
    }

    # The HubSpot API has had stability issues, so a failed request gets one
    # wait-and-retry. Only transport-level failures are retried; unrelated
    # errors (e.g. programming mistakes) propagate immediately.
    for is_last_attempt in (False, True):
        try:
            return requests.request(
                "PUT", url, headers=headers, timeout=10
            ).content
        except requests.exceptions.RequestException as ex:
            if is_last_attempt:
                raise
            print(ex)
            print(email)
            time.sleep(10)
def delete_secondary_email_from_contact(contact_id, email, access_token):
    """Delete a secondary email from a HubSpot contact.

    Sends a DELETE to the v1 ``secondary-email`` endpoint.

    Args:
        contact_id: ID of the contact, interpolated into the URL path.
        email: Secondary email address to remove. Must contain ``"@"``.
        access_token: Token sent as the ``Bearer`` authorization credential.

    Returns:
        The decoded JSON body of the response; the raw body (``bytes``) if
        it is not valid JSON (for example, an empty body); or the string
        ``"Not a valid email"`` when ``email`` does not contain ``"@"`` (no
        request is made).

    Raises:
        requests.exceptions.RequestException: If the HTTP request itself
            fails (connection error, timeout, ...).
    """
    from urllib.parse import quote

    if "@" not in email:
        print("Not a valid email")
        return "Not a valid email"

    # Escape the email so characters such as "/", "?" or "#" cannot alter
    # the request path; "@" is legal in a path segment and is left readable.
    url = (
        "https://api.hubapi.com/contacts/v1/secondary-email/"
        f"{contact_id}/email/{quote(email, safe='@')}"
    )
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {access_token}",
    }

    # The request is made outside the try block: if it raises, there is no
    # response object to fall back on, so the real error must propagate
    # instead of being masked by a reference to an unbound variable.
    r = requests.request("DELETE", url, headers=headers, timeout=10)
    try:
        return r.json()
    except ValueError:
        # Body is not JSON (JSON decode errors subclass ValueError).
        return r.content
def update_primary_email(contact_id, email, access_token):
    """Overwrite the primary email of a HubSpot contact.

    Sends a POST to the v1 contact profile endpoint setting the ``email``
    property. If you want to swap the primary for a secondary email, you
    must add the previous primary back as a secondary after running this.

    Args:
        contact_id: ID (vid) of the contact, interpolated into the URL path.
        email: New primary email address. Must contain ``"@"``.
        access_token: Token sent as the ``Bearer`` authorization credential.

    Returns:
        The decoded JSON body of the response; the raw body (``bytes``) if
        it is not valid JSON (for example, an empty body); or the string
        ``"Not a valid email"`` when ``email`` does not contain ``"@"`` (no
        request is made).

    Raises:
        requests.exceptions.RequestException: If the HTTP request itself
            fails (connection error, timeout, ...).
    """
    if "@" not in email:
        print("Not a valid email")
        return "Not a valid email"

    headers = {
        "authorization": f"Bearer {access_token}",
    }
    json_data = {"properties": [{"property": "email", "value": email}]}

    # The request is made outside the try block: if it raises, there is no
    # response object to fall back on, so the real error must propagate
    # instead of being masked by a reference to an unbound variable.
    r = requests.request(
        "POST",
        f"https://api.hubapi.com/contacts/v1/contact/vid/{contact_id}/profile",
        headers=headers,
        json=json_data,
        timeout=10,
    )
    try:
        return r.json()
    except ValueError:
        # Body is not JSON (JSON decode errors subclass ValueError).
        return r.content
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @brk21. Thanks for sharing this! I've ported these functions to TypeScript and they seem to work, but where did you find those endpoints? I couldn't find them in HubSpot's official documentation.