Skip to content

Instantly share code, notes, and snippets.

@isaacharrisholt
Created April 23, 2022 15:14
Show Gist options
  • Save isaacharrisholt/21146cf06e90303f90d75e0386cd5934 to your computer and use it in GitHub Desktop.
Save isaacharrisholt/21146cf06e90303f90d75e0386cd5934 to your computer and use it in GitHub Desktop.
from typing import List
import requests
# Base URL that every Airbyte API endpoint path in this script is appended to.
API_ROOT = 'http://localhost:8000/api/v1' # This is the default for Airbyte
def get_workspaces() -> List[str]:
    """Return the IDs of all workspaces reported by the Airbyte API.

    Sends a POST request to ``{API_ROOT}/workspaces/list`` and collects the
    ``workspaceId`` of every entry under the ``workspaces`` key of the JSON
    response.

    Returns:
        The workspace IDs, in the order the API listed them.

    Raises:
        requests.HTTPError: If the API responds with an error status code.
    """
    response = requests.post(f'{API_ROOT}/workspaces/list')
    response.raise_for_status()  # Either handle this yourself, or use a tool like Sentry for logging
    return [
        workspace['workspaceId']
        # The original read `repsonse` here, which raised NameError on every call.
        for workspace in response.json()['workspaces']
    ]
def get_connections_for_workspace(workspace_id: str) -> List[str]:
    """Return the IDs of the active connections in one workspace.

    Posts the workspace ID to ``{API_ROOT}/connections/list`` and keeps the
    ``connectionId`` of each listed connection whose ``status`` is
    ``'active'``.

    Args:
        workspace_id: ID of the workspace whose connections are listed.

    Returns:
        IDs of the active connections, in the order the API listed them.

    Raises:
        requests.HTTPError: If the API responds with an error status code.
    """
    list_url = f'{API_ROOT}/connections/list'
    payload = {'workspaceId': workspace_id}
    reply = requests.post(list_url, json=payload)
    reply.raise_for_status()

    active_ids = []
    for entry in reply.json()['connections']:
        # Skipping non-active connections means they can still be disabled in the UI.
        if entry['status'] != 'active':
            continue
        active_ids.append(entry['connectionId'])
    return active_ids
def trigger_connection_sync(connection_id: str) -> dict:
    """Request a sync for a single connection.

    Posts the connection ID to ``{API_ROOT}/connections/sync``.

    Args:
        connection_id: ID of the connection to sync.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        requests.HTTPError: If the API responds with an error status code.
    """
    sync_url = f'{API_ROOT}/connections/sync'
    payload = {'connectionId': connection_id}
    reply = requests.post(sync_url, json=payload)
    reply.raise_for_status()
    return reply.json()
if __name__ == '__main__':
    # Collect the active connection IDs of every workspace first, then
    # trigger a sync for each of them in turn.
    workspaces = get_workspaces()
    connections = [
        connection_id
        for workspace_id in workspaces
        for connection_id in get_connections_for_workspace(workspace_id)
    ]
    for connection_id in connections:
        trigger_connection_sync(connection_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment