Skip to content

Instantly share code, notes, and snippets.

@ernestoongaro
Created May 19, 2023 16:53
Show Gist options
  • Save ernestoongaro/8ff2ce1c763af2e3429024d2a4d30d9f to your computer and use it in GitHub Desktop.
Save ernestoongaro/8ff2ce1c763af2e3429024d2a4d30d9f to your computer and use it in GitHub Desktop.
import enum
import os
import time
# Be sure to `pip install requests` in your python environment
import requests
ACCOUNT_ID = 39
JOB_ID = 302
# Store your dbt Cloud API token securely in your workflow tool
API_KEY = '<put in your dbt Cloud API key here'
def _trigger_job() -> int:
res = requests.post(
url=f"https://emea.dbt.com/api/v2/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/",
headers={'Authorization': f"Token {API_KEY}"},
json={
'cause': f"Triggered by Fivetran",
}
)
response_payload = res.json()
return response_payload['data']['id']
def run(request):
job_run_id = _trigger_job()
return(f"job_run_id = {job_run_id}")
Display the source blob
Display the rendered blob
Raw
import requests
import json
# define the api endpoint
url = "https://api.fivetran.com/v1/webhooks/account"
# define the headers for the post request
headers = {
'Content-Type': 'application/json',
}
# define the payload
payload = {
"url": "https://us-central1-utility-pad-323811.cloudfunctions.net/function-1",
"events": [
"sync_end"
],
"active": True,
}
# define your username and password for basic authentication
api_key = '<yours>'
api_secret = '<yours>'
# send a post request to the endpoint with basic authentication
response = requests.post(url, headers=headers, data=json.dumps(payload), auth=(api_key, api_secret))
# print the response from the server
print(response.text)
# define the GET endpoint
list_url = "https://api.fivetran.com/v1/webhooks"
# send a GET request to the endpoint with basic authentication
response = requests.get(list_url, auth=(api_key, api_secret))
# print the response from the server
print(response.text)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment