@hanleybrand
Last active February 10, 2024 21:35
Importing instructure-dap-client and using it programmatically - quickstart example scripts
import os
from pathlib import Path
import shutil
from urllib.parse import urlparse
from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery, SnapshotQuery
import requests
output_dir_base = Path("downloads")
table_name = 'accounts'
ns = 'canvas'
fmt = Format('csv')
snapshot = SnapshotQuery(format=fmt, filter=None)
with DAPClient() as dc:
    start_job = dc.query_snapshot(ns, table_name, snapshot)
    job_results = dc.await_job(start_job)
    objects = dc.get_objects(job_results.id)
    resources = dc.get_resources(objects)
    # downloads = dc.download_resources(resources, job_results.id, output_dir_base)
    # dc.download_resources is omitted and replaced with
    # a copy-paste of the method's code, altered to separate the downloads
    # into per-table folders and prepend table_name to each job folder name
    downloads = []
    dir = output_dir_base / table_name / f"{table_name}_job_{job_results.id}"
    os.makedirs(dir, exist_ok=True)
    for u in resources:
        url = str(u.url)
        url_path = urlparse(url).path
        file_base_name = os.path.basename(url_path)
        file_path = os.path.join(dir, file_base_name)
        with requests.get(url, stream=True) as data, open(file_path, "wb") as file:
            # save the gzipped data to file without decompressing it
            shutil.copyfileobj(data.raw, file)
        downloads.append(file_path)
    print(f"Files from server downloaded to folder: {dir}")
    print(downloads)
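
# optional follow-up (sketch): the files saved above are gzipped CSV, so a downstream step can
# stream-read them without unpacking to disk first. This uses only the standard-library gzip and
# csv modules (nothing here is part of the dap client), and it assumes each part file starts
# with a header row.
import csv
import gzip

for gz_path in downloads:
    with gzip.open(gz_path, mode="rt", newline="") as fh:
        reader = csv.reader(fh)
        header = next(reader, None)         # assumed header row
        row_count = sum(1 for _ in reader)  # count the remaining data rows
    print(f"{gz_path}: {row_count} rows, columns: {header}")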
import os
from datetime import datetime
from pathlib import Path
from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery, SnapshotQuery
# -- Note, this script assumes you've set up dap correctly
# more specifically that you have ENV variables DAP_API_URL and DAP_API_KEY set up as per
# https://pypi.org/project/instructure-dap-client/ so that
# in a shell dap commands (e.g. `dap list`, `dap incremental --namespace canvas --table accounts` ) work locally
# if this is not the case, set up the cli first
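# optional sanity check (sketch): fail fast with a readable message if the two environment
# variables named above are missing; the variable names are taken from the note above, so adjust
# them if your dap version expects different ones.
for var in ("DAP_API_URL", "DAP_API_KEY"):
    if not os.environ.get(var):
        raise RuntimeError(f"{var} is not set - configure the dap CLI first (see the pypi page above)")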
# setup variables
table_name = 'accounts'
output_dir_base = Path("downloads")  # this can be replaced with an absolute path (e.g. '/users/me/cd2_dl', etc.)
print(f"Data downloads will be saved here: {output_dir_base.absolute()}")
# DAPClient works with a variety of types (defined in dap.dap_types), many of which use Python's
# new-ish dataclasses to pass data between steps. You'll need to instantiate a couple of them
# (specifically SnapshotQuery or IncrementalQuery) to start, and the DAPClient methods you'll call
# return dap.dap_types objects as well.
# see https://realpython.com/python-data-classes/ or https://docs.python.org/3/library/dataclasses.html for more
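# illustration only (not part of dap, names made up for the example): a dataclass is just a class
# with an auto-generated __init__ and __repr__, so query objects like the ones below are built
# with keyword arguments and print their fields when inspected.
from dataclasses import dataclass
from typing import Optional

@dataclass
class ExampleQuery:
    format: str
    filter: Optional[str] = None

# ExampleQuery(format='csv') -> ExampleQuery(format='csv', filter=None)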
ns = 'canvas'
# note: Format is an enum, not a dataclass
fmt = Format('csv')
snapshot = SnapshotQuery(format=fmt, filter=None)
# under the hood note:
# DAPClient uses the python-requests library to implement all the tedious 'do i have to' functionality like authentication,
# session maintenance, and constructing the GET and POST requests needed to interact with the API.
# If you have experience with python-requests, note that DAPClient should be invoked as a context manager as described in
# https://requests.readthedocs.io/en/latest/user/advanced/#session-objects
with DAPClient() as dc:
print("Beginning query/retrieval, this may take a few minutes - wait for ALL_CLEAR before evaluating variables")
# optional, you can get a list of the table names for writing loops that iterate through all the tables:
# dap_tables = dc.get_tables(ns)
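    # sketch of such a loop (commented out; assumes get_tables(ns) yields table-name strings
    # and that you want the same snapshot flow shown below applied to every table):
    # for t in dc.get_tables(ns):
    #     t_job = dc.await_job(dc.query_snapshot(ns, t, SnapshotQuery(format=fmt, filter=None)))
    #     dc.download_resources(dc.get_resources(dc.get_objects(t_job.id)), t_job.id, output_dir_base)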
    start_job = dc.query_snapshot(ns, table_name, snapshot)
    # TableJob(
    #   id='ea55451d-7515-4f4c-8b42-2c2c373c7540',
    #   status=<JobStatus.Waiting: 'waiting'>,
    #   expires_at=datetime.datetime(2023, 2, 1, 19, 18, 47, tzinfo=datetime.timezone.utc))
    print(f"job id {start_job.id} is now pending")
    job_results = dc.await_job(start_job)  # note: await_job polls until the job finishes, so this call blocks
    # CompleteSnapshotJob(
    #   id='ea55451d-7515-4f4c-8b42-2c2c373c7540',
    #   status=<JobStatus.Complete: 'complete'>,
    #   expires_at=datetime.datetime(expires_dt),
    #   objects=[Object(id='self.id/part-00000-e45ddcac-8c4a-4058-87f6-f05a6278034d-c000.csv.gz')],
    #   schema_version=1,
    #   at=datetime.datetime(2023, 1, 31, 16, 30, 37, tzinfo=datetime.timezone.utc))
    print(f"Update on job id {start_job.id} : {job_results.status}")
    # -- sidebar --
    # if you are automating, you need to save at least the table name and the job_results.at value
    # so that you can schedule future incremental jobs, i.e.
    # next_incremental = IncrementalQuery(format=fmt, since=job_results.at, filter=None, until=None)
    # note: you don't need the table name to create the IncrementalQuery, but you'll need it to know
    # which table a given previous_query_job_results.at is paired with
    next_incremental = IncrementalQuery(format=fmt, since=job_results.at, filter=None, until=None)
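    # one way to persist that pairing between runs (sketch, commented out; the file name,
    # layout, and use of JSON are arbitrary choices here, not anything the dap client requires):
    # import json
    # output_dir_base.mkdir(parents=True, exist_ok=True)
    # state_path = output_dir_base / f"{table_name}_last_sync.json"
    # state_path.write_text(json.dumps({"table": table_name, "since": job_results.at.isoformat()}))
    # # ...and on a later run, rebuild the query from the saved value:
    # # since = datetime.fromisoformat(json.loads(state_path.read_text())["since"])
    # # next_incremental = IncrementalQuery(format=fmt, since=since, filter=None, until=None)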
    # -- end sidebar --
    objects = dc.get_objects(job_results.id)
    # [Object(
    #   id='ea55451d-7515-4f4c-8b42-2c2c373c7540/part-00000-e45ddcac-8c4a-4058-87f6-f05a6278034d-c000.csv.gz')]
    resources = dc.get_resources(objects)
# [Resource(url=
# URL(url='https://data-access-platform-output-prod-iad.s3.amazonaws.com/output/rootAccountId%3DndqgwsoafylHTTXdvHbgAfcLstsHCInrvmRfjLzU/queryId%3D97f43c5a-463c-4207-b24a-60e8954d1bb0/part-00000-068e82e4-99c2-47ac-b6d1-1555900627c4-c000.csv.gz?X-Amz-Security-Token=FwoGZXIvYXdzEFsaDO%2Ft%2BAIlm6FW34A%2FtyK7AUQmSLHgXLsS%2FnRG%2FYsD5TB%2BFNKH%2F%2F0xmQ9KjUXeNTHAnshJqLMB9p7B723kUwGftrCO9BOSXXw3bjhkjFdjAPU1CI6xE%2Bo46wMOqnH%2FQTEXYbFtg%2B60FukzsIcx8MtQpQy2nOTwWPry%2Bbd3%2BB06O0kesOUl54x3E5B5zY7xlUcdRPnqcMfZwThkEGFgXdOHu16j81LWd4otdi4goI0EEYSSCYoGDu0oqPef3mf7zOl%2BZ%2F4LE%2BsLQLJg704o4MzqngYyLSWuHaY%2BHekiZvL%2FAWfqvCmGGWR5%2B6Ya%2F%2Fs9T%2ByLAhG5v5PQRMewAAKm8c7uvA%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20230201T175242Z&X-Amz-SignedHeaders=host&X-Amz-Expires=3599&X-Amz-Credential=ASIAXX2PINZLJ24HOBP3%2F20230201%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=ac8b11d84869bab0785d3cd973cc277b887a830c6c98fea945dc2f11eb023cd2'))]
print(f"beginning download of {[dr.url.url for dr in resources]}")
downloads = dc.download_resources(resources, job_results.id, output_dir_base)
# downloads the file[s] to the path specified in output_dir_base
# ['/Users/hanley/dev/k8s-airflow/canvas_data/downloads/job_ea55451d-7515-4f4c-8b42-2c2c373c7540/part-00000-ee952c38-4185-4cc3-ac55-16e123706759-c000.csv.gz']
print(f"files downloaded to {output_dir_base.absolute()}/job_{job_results.id}/ as if you had invoked")
print(f"\t'dap snapshot --namespace {ns} --table {table_name} --output-directory {output_dir_base}' \nin a shell")
print("""if in an interactive shell, inspect the variables
start_job, job_results, objects, resources, downloads""")
print("\nALL_CLEAR")
print(f"\n\nNext Incremental job for table {table_name} can use this IncrementalQuery:\n{next_incremental}")
import os
from pathlib import Path
from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery, SnapshotQuery
output_dir_base = Path("downloads")
table_name = 'accounts'
ns = 'canvas'
fmt = Format('csv')
snapshot = SnapshotQuery(format=fmt, filter=None)
with DAPClient() as dc:
    start_job = dc.query_snapshot(ns, table_name, snapshot)
    job_results = dc.await_job(start_job)
    objects = dc.get_objects(job_results.id)
    resources = dc.get_resources(objects)
    downloads = dc.download_resources(resources, job_results.id, output_dir_base)