Last active
June 5, 2022 22:27
-
-
Save smessmer/1cc47a4c4050979afa5edcf2e2277c97 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## This script can either be used as a script | |
## $ python3 codaio_export.py [api-token] [dest-dir] | |
## | |
## or as a library by importing this into your own python project | |
## and calling the `export_all_docs` function | |
## | |
## Dependencies on pypi packages (those need to be installed with `pip install` first) | |
## - ensure | |
## - codaio | |
## | |
import os, csv, html, argparse | |
from io import StringIO | |
from typing import Dict, Any, List | |
from codaio import Coda # type: ignore | |
from ensure import check # type: ignore | |
## Main entry point if used as a library. Call this and pass in your api token and a destination path. | |
## The script will then download all your coda tables into the dest_path directory. | |
## Great for backup purposes. It will download them both in csv and html format. | |
## Unfortunately, because of limitations of the coda.io API, it only downloads your | |
## tables. It cannot download text content of pages. | |
def export_all_docs(api_token: str, dest_path: str) -> None: | |
api = _API(api_token) | |
for doc in api.get_all_docs(): | |
_export_doc(dest_path, api, doc) | |
class _Doc: | |
def __init__(self, data: Dict[str, Any]): | |
self._data = data | |
def data(self) -> Dict[str, Any]: | |
return self._data | |
def id(self) -> str: | |
return _parse_str(self._data["id"]) | |
def name(self) -> str: | |
return _parse_str(self._data["name"]) | |
def folder_id(self) -> str: | |
return _parse_str(self._data["folder"]["id"]) | |
def folder_name(self) -> str: | |
return _parse_str(self._data["folder"]["name"]) | |
def relative_dest_path(self) -> str: | |
folder_name = _remove_path_unsafe_characters(self.folder_name() + " " + self.folder_id()) | |
doc_name = _remove_path_unsafe_characters(self.name() + " " + self.id()) | |
return os.path.join(folder_name, doc_name) | |
class _Table: | |
def __init__(self, data: Dict[str, Any]): | |
self._data = data | |
def data(self) -> Dict[str, Any]: | |
return self._data | |
def id(self) -> str: | |
return _parse_str(self._data["id"]) | |
def name(self) -> str: | |
return _parse_str(self._data["name"]) | |
def dest_name(self) -> str: | |
relative_root = "tables" | |
table_type = _parse_str(self._data["tableType"]) | |
if table_type == "view": | |
relative_root = os.path.join(relative_root, "view") | |
elif table_type == "table": | |
relative_root = os.path.join(relative_root, "table") | |
else: | |
raise Exception("Unknown table type: {}".format(table_type)) | |
table_name = _remove_path_unsafe_characters(self.name() + " " + self.id()) | |
return os.path.join(relative_root, table_name) | |
class _Column: | |
def __init__(self, data: Dict[str, Any]): | |
self._data = data | |
def id(self) -> str: | |
return _parse_str(self._data["id"]) | |
def data(self) -> Dict[str, Any]: | |
return self._data | |
def name(self) -> str: | |
return _parse_str(self._data["name"]) | |
def dest_name(self) -> str: | |
return _remove_path_unsafe_characters(self.name() + " " + self.id()) | |
def formula(self) -> str: | |
if "formula" in self._data: | |
return _parse_str(self._data["formula"]) | |
else: | |
return "no formula" | |
class _Row: | |
def __init__(self, data: Dict[str, Any]): | |
self._data = data | |
def id(self) -> str: | |
return _parse_str(self._data["id"]) | |
def index(self) -> int: | |
return _parse_int(self._data["index"]) | |
def num_cells(self) -> int: | |
return len(self._data["values"]) | |
def get_cell_value(self, column_id: str) -> str: | |
return str(self._data["values"][column_id]) | |
class _API: | |
def __init__(self, api_token: str): | |
self._coda = Coda(api_token) | |
def get_all_docs(self) -> List[_Doc]: | |
# TODO https://github.com/Blasterai/codaio/issues/55 | |
docs = self._coda.list_docs(is_owner=None) | |
return [_Doc(doc) for doc in docs["items"]] | |
def get_all_tables_of_doc(self, doc: _Doc) -> List[_Table]: | |
tables = self._coda.list_tables(doc.id()) | |
return [_Table(table) for table in tables["items"]] | |
def get_all_columns_of_table(self, doc: _Doc, table: _Table) -> List[_Column]: | |
columns = self._coda.list_columns(doc.id(), table.id()) | |
return [_Column(column) for column in columns["items"]] | |
def get_all_rows_of_table(self, doc: _Doc, table: _Table) -> List[_Row]: | |
rows = self._coda.list_rows(doc.id(), table.id()) | |
return [_Row(row) for row in rows["items"]] | |
def _export_doc(dest_path: str, api: _API, doc: _Doc) -> None: | |
doc_path = _path_join_safe(dest_path, doc.relative_dest_path()) | |
os.makedirs(doc_path, exist_ok=False) | |
with open(_path_join_safe(doc_path, "doc.json"), 'w') as file: | |
file.write(str(doc.data())) | |
for table in api.get_all_tables_of_doc(doc): | |
_export_table(doc_path, api, doc, table) | |
def _export_table(doc_path: str, api: _API, doc: _Doc, table: _Table) -> None: | |
table_path = _path_join_safe(doc_path, table.dest_name()) | |
os.makedirs(table_path, exist_ok=False) | |
with open(_path_join_safe(table_path, "table.json"), 'w') as file: | |
file.write(str(table.data())) | |
columns = api.get_all_columns_of_table(doc, table) | |
_export_columns(table_path, columns) | |
rows = api.get_all_rows_of_table(doc, table) | |
_export_rows(table_path, columns, rows) | |
def _export_columns(table_path: str, columns: List[_Column]) -> None: | |
columns_path = _path_join_safe(table_path, "columns") | |
os.makedirs(columns_path, exist_ok=False) | |
for column in columns: | |
with open(_path_join_safe(columns_path, column.dest_name() + ".json"), 'w') as file: | |
file.write(str(column.data())) | |
def _export_rows(table_path: str, columns: List[_Column], rows: List[_Row]) -> None: | |
values = _parse_rows(columns, rows) | |
table_csv = _make_csv(columns, values) | |
with open(_path_join_safe(table_path, "values.csv"), 'w') as file: | |
file.write(table_csv) | |
table_html = _make_html(columns, values) | |
with open(_path_join_safe(table_path, "values.html"), 'w') as file: | |
file.write(table_html) | |
def _parse_rows(columns: List[_Column], rows: List[_Row]) -> List[List[str]]: | |
rows.sort(key=lambda row: row.index()) | |
return [_parse_row(columns, row) for row in rows] | |
def _parse_row(columns: List[_Column], row: _Row) -> List[str]: | |
if row.num_cells() != len(columns): | |
raise Exception(f"_Row {row.id()} has wrong number of cells. Expected {len(columns)} columns but found {row.num_cells()}") | |
return [row.get_cell_value(column.id()) for column in columns] | |
def _make_csv(columns: List[_Column], values: List[List[str]]) -> str: | |
output = StringIO() | |
wr = csv.writer(output, quoting=csv.QUOTE_ALL) | |
wr.writerow([column.name() for column in columns]) | |
wr.writerows(values) | |
return output.getvalue() | |
def _make_html(columns: List[_Column], values: List[List[str]]) -> str: | |
column_headers_html = "".join(f"<th title=\"{html.escape(column.formula())}\">{html.escape(str(column.name()))}</th>" for column in columns) | |
rows = [ | |
"".join([f"<td>{html.escape(str(cell))}</td>" for cell in row]) for row in values | |
] | |
rows_html = "".join(f"<tr>{row}</tr>" for row in rows) | |
table_html = f"<table><thead><tr>{column_headers_html}</tr></thead><tbody>{rows_html}</tbody></table>" | |
return f"<html><head/><body>{table_html}</body></html>" | |
def _remove_path_unsafe_characters(name: str) -> str: | |
return name.replace('/', '_') | |
# Like os.path.join, but ensures that the result is in a subdir of the basepath. | |
# If the subpath is an absolute path, os.path.join would just ignore basepath | |
# while _path_join_safe instead treats subpath as if it were a relative path. | |
def _path_join_safe(basepath: str, *subpaths: str) -> str: | |
# For convenience, allow subpaths that start with "/" and just treat them as relative paths | |
cleaned_subpaths = [subpath.lstrip("/") for subpath in subpaths] | |
result = os.path.join(basepath, *cleaned_subpaths) | |
# Security check: Paths starting with "/" aren't the only way we could escape basepath, there's also ".." and symlinks. | |
# Let's assert that there is no trickery and we actually return something inside basepath. | |
real_result = os.path.realpath(result) | |
real_base = os.path.realpath(basepath) | |
assert os.path.commonprefix([real_base, real_result]) == real_base, "_path_join_safe({}, {}) tried to access {} which escapes the base path {}".format(basepath, ", ".join(cleaned_subpaths), real_result, real_base) | |
# Let's check we're also within all intermediate directories | |
current_base = real_base | |
for subpath in cleaned_subpaths: | |
current_base = os.path.realpath(os.path.join(current_base, subpath)) | |
assert os.path.commonprefix([current_base, real_result]) == current_base, "_path_join_safe({}, {}) tried to access {} which escapes the base path {}".format(basepath, ", ".join(cleaned_subpaths), real_result, current_base) | |
return result | |
def _parse_int(v: Any) -> int: | |
check(v).is_a(int).or_raise( | |
lambda _: Exception(f"Tried to read {v} as int")) | |
assert isinstance(v, int) | |
return v | |
def _parse_str(v: Any) -> str: | |
check(v).is_a(str).or_raise( | |
lambda _: Exception(f"Tried to read {v} as str")) | |
assert isinstance(v, str) | |
return v | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description='Export tables from coda.io') | |
parser.add_argument('api_token', type=str) | |
parser.add_argument('dest_dir', type=str) | |
args = parser.parse_args() | |
export_all_docs(args.api_token, args.dest_dir) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment