Created
October 11, 2023 06:58
-
-
Save mikelgg93/6ac56a621c740290b0ffec2641179d89 to your computer and use it in GitHub Desktop.
Cloud API utilities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import glob | |
import logging | |
import os | |
import shutil | |
from pathlib import Path | |
import pandas as pd | |
import requests | |
API_URL = "https://api.cloud.pupil-labs.com/v2" | |
log = logging.getLogger(__name__) | |
logging.basicConfig(level=logging.INFO) | |
def api_get(path: str) -> dict: | |
url = f"{API_URL}/{path}" | |
if requests.get(url, headers={"api-key": API_KEY}).json()["status"] == "success": | |
return requests.get(url, headers={"api-key": API_KEY}).json()["result"] | |
else: | |
error = requests.get(url, headers={"api-key": API_KEY}).json()["message"] | |
log.error(error) | |
raise (Exception(error)) | |
def get_enrichment_dict(enrichment_id: str, project_id: str, workspace_id: str) -> dict: | |
log.info(f"fetching enrichment:{enrichment_id}") | |
enrichment_dict = api_get(f"/workspaces/{workspace_id}/projects/{project_id}/enrichments/{enrichment_id}") | |
return enrichment_dict | |
def get_workspace_recordings(workspace_id: str) -> dict: | |
log.info(f"fetching recordings in project:{workspace_id}") | |
project_recordings = api_get(f"/workspaces/{workspace_id}/recordings") | |
return project_recordings | |
def download_url(path: str, save_path: str, chunk_size=128) -> None: | |
url = f"{API_URL}/{path}" | |
r = requests.get(url, stream=True, headers={"api-key": API_KEY}) | |
with open(save_path, 'wb') as fd: | |
for chunk in r.iter_content(chunk_size=chunk_size): | |
fd.write(chunk) | |
return r.status_code | |
def download_recording(recording_id: str, workspace_id: str, download_path: str) -> None: | |
os.makedirs(download_path, exist_ok=True) | |
status= download_url(f"/workspaces/{workspace_id}/recordings:raw-data-export?ids={recording_id}", download_path / f"{recording_id}.zip", chunk_size=128) | |
shutil.unpack_archive(download_path / f"{recording_id}.zip", download_path / f"{recording_id}") | |
os.remove(download_path / f"{recording_id}.zip") | |
for file_source in glob.glob(str(download_path / f"{recording_id}/*/*")): | |
file_source = Path(file_source) | |
file_destination = file_source.parents[1] / file_source.name | |
shutil.move(file_source,file_destination) | |
def download_raw_recording(recording_id: str, workspace_id: str, download_path: str) -> None: | |
os.makedirs(download_path, exist_ok=True) | |
status= download_url(f"/workspaces/{workspace_id}/recordings/{recording_id}.zip", download_path / f"{recording_id}.zip", chunk_size=128) | |
shutil.unpack_archive(download_path / f"{recording_id}.zip", download_path / f"{recording_id}") | |
os.remove(download_path / f"{recording_id}.zip") | |
for file_source in glob.glob(str(download_path / f"{recording_id}/*/*")): | |
file_source = Path(file_source) | |
file_destination = file_source.parents[1] / file_source.name | |
shutil.move(file_source,file_destination) | |
def download_file(recording_id: str, workspace_id: str, download_path: str, file_name: str) -> None: | |
os.makedirs(download_path, exist_ok=True) | |
files_on_rec = api_get(f"/workspaces/{workspace_id}/recordings/{recording_id}/files") | |
file_exists=False | |
for f in files_on_rec: | |
if f["name"]==file_name: | |
file_exists=True | |
if not file_exists: | |
log.error(f"file {file_name} not found in recording {recording_id}") | |
raise Exception(f"file {file_name} not found in recording {recording_id}") | |
else: | |
status= download_url(f"/workspaces/{workspace_id}/recordings/{recording_id}/files/{file_name}", download_path / f"{file_name}", chunk_size=128) | |
return status, download_path / f"{file_name}" | |
def download_events(recording_id: str, workspace_id: str, download_path: str, **kwargs): | |
os.makedirs(download_path, exist_ok=True) | |
events= api_get(f"/workspaces/{workspace_id}/recordings/{recording_id}/events") | |
filename= "events" | |
for k, v in kwargs.items(): | |
filename = filename + f"_{v}" | |
filename= filename +".csv" | |
if len(events)==0: | |
log.warning(f"No events found in recording {recording_id}") | |
else: | |
events_df= pd.DataFrame(events).to_csv(download_path / filename, index=False) | |
log.info(filename) | |
if __name__ == "__main__": | |
import argparse | |
parser= argparse.ArgumentParser() | |
parser.add_argument("--workspace_id", type=str, default="", help="The ID of the workspace workspace_id") | |
parser.add_argument("--download_path", type=str, default=Path(""), help="Where to download the results download_path") | |
parser.add_argument("--API_KEY", type=str, default="", help="The API key to use"), | |
args= parser.parse_args() | |
API_KEY= args.API_KEY | |
if API_KEY is None: | |
raise Exception("--API_KEY is None. Please provide an API_KEY") | |
workspace_id= args.workspace_id | |
if workspace_id is None: | |
raise Exception("--workspace_id is None. Please provide a workspace_id") | |
download_path= Path(args.download_path) | |
if download_path is None: | |
raise Exception("--download_path is None. Please provide a download_path") | |
download_path = os.path.normpath(download_path) | |
recs= get_workspace_recordings(workspace_id=workspace_id) | |
log.info(f"Found {len(recs)} recordings in project {workspace_id}") | |
for i, rec in enumerate(recs): | |
log.info(f"Downloading events from recording {i} out of {len(recs)}: {rec['id']}") | |
wearer_name= api_get(f"/workspaces/{workspace_id}/wearers/{rec['wearer_id']}")['name'] | |
download_events(recording_id=rec["id"], workspace_id=workspace_id, download_path=download_path, wearer_name=wearer_name, recording_name=rec["name"]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment