@mikelgg93
Created October 11, 2023 06:58
Cloud API utilities
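"""Utilities for the Pupil Labs Cloud API (https://api.cloud.pupil-labs.com/v2).

Helper functions to list workspace recordings and download recordings, individual
files, and event CSVs. When run as a script, it downloads the events of every
recording in a workspace. The module-level API_KEY is set from the --API_KEY
command-line argument in the __main__ block below.
"""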
import glob
import logging
import os
import shutil
from pathlib import Path

import pandas as pd
import requests

API_URL = "https://api.cloud.pupil-labs.com/v2"

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def api_get(path: str) -> dict:
    """Perform a GET request against the Cloud API and return the "result" payload."""
    # Tolerate paths passed with or without a leading "/".
    url = f"{API_URL}/{path.lstrip('/')}"
    # Make the request once and reuse the parsed response.
    response = requests.get(url, headers={"api-key": API_KEY}).json()
    if response["status"] == "success":
        return response["result"]
    error = response["message"]
    log.error(error)
    raise Exception(error)

def get_enrichment_dict(enrichment_id: str, project_id: str, workspace_id: str) -> dict:
    log.info(f"fetching enrichment:{enrichment_id}")
    enrichment_dict = api_get(
        f"/workspaces/{workspace_id}/projects/{project_id}/enrichments/{enrichment_id}"
    )
    return enrichment_dict

def get_workspace_recordings(workspace_id: str) -> dict:
    log.info(f"fetching recordings in workspace:{workspace_id}")
    workspace_recordings = api_get(f"/workspaces/{workspace_id}/recordings")
    return workspace_recordings

def download_url(path: str, save_path: Path, chunk_size: int = 128) -> int:
    """Stream a file from the API to save_path and return the HTTP status code."""
    url = f"{API_URL}/{path.lstrip('/')}"
    r = requests.get(url, stream=True, headers={"api-key": API_KEY})
    with open(save_path, "wb") as fd:
        for chunk in r.iter_content(chunk_size=chunk_size):
            fd.write(chunk)
    return r.status_code

def download_recording(recording_id: str, workspace_id: str, download_path: Path) -> None:
    os.makedirs(download_path, exist_ok=True)
    download_url(
        f"/workspaces/{workspace_id}/recordings:raw-data-export?ids={recording_id}",
        download_path / f"{recording_id}.zip",
        chunk_size=128,
    )
    shutil.unpack_archive(download_path / f"{recording_id}.zip", download_path / f"{recording_id}")
    os.remove(download_path / f"{recording_id}.zip")
    # Flatten the extracted archive: move files from the nested folder one level up.
    for file_source in glob.glob(str(download_path / f"{recording_id}/*/*")):
        file_source = Path(file_source)
        file_destination = file_source.parents[1] / file_source.name
        shutil.move(file_source, file_destination)

def download_raw_recording(recording_id: str, workspace_id: str, download_path: Path) -> None:
    os.makedirs(download_path, exist_ok=True)
    download_url(
        f"/workspaces/{workspace_id}/recordings/{recording_id}.zip",
        download_path / f"{recording_id}.zip",
        chunk_size=128,
    )
    shutil.unpack_archive(download_path / f"{recording_id}.zip", download_path / f"{recording_id}")
    os.remove(download_path / f"{recording_id}.zip")
    # Flatten the extracted archive: move files from the nested folder one level up.
    for file_source in glob.glob(str(download_path / f"{recording_id}/*/*")):
        file_source = Path(file_source)
        file_destination = file_source.parents[1] / file_source.name
        shutil.move(file_source, file_destination)

def download_file(recording_id: str, workspace_id: str, download_path: Path, file_name: str) -> tuple:
    os.makedirs(download_path, exist_ok=True)
    files_on_rec = api_get(f"/workspaces/{workspace_id}/recordings/{recording_id}/files")
    file_exists = any(f["name"] == file_name for f in files_on_rec)
    if not file_exists:
        log.error(f"file {file_name} not found in recording {recording_id}")
        raise Exception(f"file {file_name} not found in recording {recording_id}")
    status = download_url(
        f"/workspaces/{workspace_id}/recordings/{recording_id}/files/{file_name}",
        download_path / file_name,
        chunk_size=128,
    )
    return status, download_path / file_name

def download_events(recording_id: str, workspace_id: str, download_path: Path, **kwargs) -> None:
    os.makedirs(download_path, exist_ok=True)
    events = api_get(f"/workspaces/{workspace_id}/recordings/{recording_id}/events")
    # Build the CSV file name from any extra keyword values (e.g. wearer and recording name).
    filename = "events"
    for v in kwargs.values():
        filename += f"_{v}"
    filename += ".csv"
    if len(events) == 0:
        log.warning(f"No events found in recording {recording_id}")
    else:
        pd.DataFrame(events).to_csv(download_path / filename, index=False)
        log.info(filename)

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--workspace_id", type=str, default="", help="The ID of the workspace")
    parser.add_argument("--download_path", type=str, default="", help="Where to download the results")
    parser.add_argument("--API_KEY", type=str, default="", help="The API key to use")
    args = parser.parse_args()

    API_KEY = args.API_KEY
    if not API_KEY:
        raise Exception("--API_KEY is missing. Please provide an API key.")
    workspace_id = args.workspace_id
    if not workspace_id:
        raise Exception("--workspace_id is missing. Please provide a workspace ID.")
    if not args.download_path:
        raise Exception("--download_path is missing. Please provide a download path.")
    # Keep this as a Path; the download helpers join paths with the "/" operator.
    download_path = Path(args.download_path)

    recs = get_workspace_recordings(workspace_id=workspace_id)
    log.info(f"Found {len(recs)} recordings in workspace {workspace_id}")
    for i, rec in enumerate(recs):
        log.info(f"Downloading events from recording {i + 1} of {len(recs)}: {rec['id']}")
        wearer_name = api_get(f"/workspaces/{workspace_id}/wearers/{rec['wearer_id']}")["name"]
        download_events(
            recording_id=rec["id"],
            workspace_id=workspace_id,
            download_path=download_path,
            wearer_name=wearer_name,
            recording_name=rec["name"],
        )
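# Example invocation (the script file name below is a placeholder):
#   python cloud_api_utilities.py --API_KEY <your-api-key> --workspace_id <workspace-id> --download_path ./downloads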