Skip to content

Instantly share code, notes, and snippets.

@smessmer
Last active June 5, 2022 22:27
Show Gist options
  • Save smessmer/1cc47a4c4050979afa5edcf2e2277c97 to your computer and use it in GitHub Desktop.
Save smessmer/1cc47a4c4050979afa5edcf2e2277c97 to your computer and use it in GitHub Desktop.
## This script can either be used as a script
## $ python3 codaio_export.py [api-token] [dest-dir]
##
## or as a library by importing this into your own python project
## and calling the `export_all_docs` function
##
## Dependencies on pypi packages (those need to be installed with `pip install` first)
## - ensure
## - codaio
##
import os, csv, html, argparse
from io import StringIO
from typing import Dict, Any, List
from codaio import Coda # type: ignore
from ensure import check # type: ignore
## Main entry point if used as a library. Call this and pass in your api token and a destination path.
## The script will then download all your coda tables into the dest_path directory.
## Great for backup purposes. It will download them both in csv and html format.
## Unfortunately, because of limitations of the coda.io API, it only downloads your
## tables. It cannot download text content of pages.
def export_all_docs(api_token: str, dest_path: str) -> None:
api = _API(api_token)
for doc in api.get_all_docs():
_export_doc(dest_path, api, doc)
class _Doc:
def __init__(self, data: Dict[str, Any]):
self._data = data
def data(self) -> Dict[str, Any]:
return self._data
def id(self) -> str:
return _parse_str(self._data["id"])
def name(self) -> str:
return _parse_str(self._data["name"])
def folder_id(self) -> str:
return _parse_str(self._data["folder"]["id"])
def folder_name(self) -> str:
return _parse_str(self._data["folder"]["name"])
def relative_dest_path(self) -> str:
folder_name = _remove_path_unsafe_characters(self.folder_name() + " " + self.folder_id())
doc_name = _remove_path_unsafe_characters(self.name() + " " + self.id())
return os.path.join(folder_name, doc_name)
class _Table:
def __init__(self, data: Dict[str, Any]):
self._data = data
def data(self) -> Dict[str, Any]:
return self._data
def id(self) -> str:
return _parse_str(self._data["id"])
def name(self) -> str:
return _parse_str(self._data["name"])
def dest_name(self) -> str:
relative_root = "tables"
table_type = _parse_str(self._data["tableType"])
if table_type == "view":
relative_root = os.path.join(relative_root, "view")
elif table_type == "table":
relative_root = os.path.join(relative_root, "table")
else:
raise Exception("Unknown table type: {}".format(table_type))
table_name = _remove_path_unsafe_characters(self.name() + " " + self.id())
return os.path.join(relative_root, table_name)
class _Column:
def __init__(self, data: Dict[str, Any]):
self._data = data
def id(self) -> str:
return _parse_str(self._data["id"])
def data(self) -> Dict[str, Any]:
return self._data
def name(self) -> str:
return _parse_str(self._data["name"])
def dest_name(self) -> str:
return _remove_path_unsafe_characters(self.name() + " " + self.id())
def formula(self) -> str:
if "formula" in self._data:
return _parse_str(self._data["formula"])
else:
return "no formula"
class _Row:
def __init__(self, data: Dict[str, Any]):
self._data = data
def id(self) -> str:
return _parse_str(self._data["id"])
def index(self) -> int:
return _parse_int(self._data["index"])
def num_cells(self) -> int:
return len(self._data["values"])
def get_cell_value(self, column_id: str) -> str:
return str(self._data["values"][column_id])
class _API:
def __init__(self, api_token: str):
self._coda = Coda(api_token)
def get_all_docs(self) -> List[_Doc]:
# TODO https://github.com/Blasterai/codaio/issues/55
docs = self._coda.list_docs(is_owner=None)
return [_Doc(doc) for doc in docs["items"]]
def get_all_tables_of_doc(self, doc: _Doc) -> List[_Table]:
tables = self._coda.list_tables(doc.id())
return [_Table(table) for table in tables["items"]]
def get_all_columns_of_table(self, doc: _Doc, table: _Table) -> List[_Column]:
columns = self._coda.list_columns(doc.id(), table.id())
return [_Column(column) for column in columns["items"]]
def get_all_rows_of_table(self, doc: _Doc, table: _Table) -> List[_Row]:
rows = self._coda.list_rows(doc.id(), table.id())
return [_Row(row) for row in rows["items"]]
def _export_doc(dest_path: str, api: _API, doc: _Doc) -> None:
doc_path = _path_join_safe(dest_path, doc.relative_dest_path())
os.makedirs(doc_path, exist_ok=False)
with open(_path_join_safe(doc_path, "doc.json"), 'w') as file:
file.write(str(doc.data()))
for table in api.get_all_tables_of_doc(doc):
_export_table(doc_path, api, doc, table)
def _export_table(doc_path: str, api: _API, doc: _Doc, table: _Table) -> None:
table_path = _path_join_safe(doc_path, table.dest_name())
os.makedirs(table_path, exist_ok=False)
with open(_path_join_safe(table_path, "table.json"), 'w') as file:
file.write(str(table.data()))
columns = api.get_all_columns_of_table(doc, table)
_export_columns(table_path, columns)
rows = api.get_all_rows_of_table(doc, table)
_export_rows(table_path, columns, rows)
def _export_columns(table_path: str, columns: List[_Column]) -> None:
columns_path = _path_join_safe(table_path, "columns")
os.makedirs(columns_path, exist_ok=False)
for column in columns:
with open(_path_join_safe(columns_path, column.dest_name() + ".json"), 'w') as file:
file.write(str(column.data()))
def _export_rows(table_path: str, columns: List[_Column], rows: List[_Row]) -> None:
values = _parse_rows(columns, rows)
table_csv = _make_csv(columns, values)
with open(_path_join_safe(table_path, "values.csv"), 'w') as file:
file.write(table_csv)
table_html = _make_html(columns, values)
with open(_path_join_safe(table_path, "values.html"), 'w') as file:
file.write(table_html)
def _parse_rows(columns: List[_Column], rows: List[_Row]) -> List[List[str]]:
rows.sort(key=lambda row: row.index())
return [_parse_row(columns, row) for row in rows]
def _parse_row(columns: List[_Column], row: _Row) -> List[str]:
if row.num_cells() != len(columns):
raise Exception(f"_Row {row.id()} has wrong number of cells. Expected {len(columns)} columns but found {row.num_cells()}")
return [row.get_cell_value(column.id()) for column in columns]
def _make_csv(columns: List[_Column], values: List[List[str]]) -> str:
output = StringIO()
wr = csv.writer(output, quoting=csv.QUOTE_ALL)
wr.writerow([column.name() for column in columns])
wr.writerows(values)
return output.getvalue()
def _make_html(columns: List[_Column], values: List[List[str]]) -> str:
column_headers_html = "".join(f"<th title=\"{html.escape(column.formula())}\">{html.escape(str(column.name()))}</th>" for column in columns)
rows = [
"".join([f"<td>{html.escape(str(cell))}</td>" for cell in row]) for row in values
]
rows_html = "".join(f"<tr>{row}</tr>" for row in rows)
table_html = f"<table><thead><tr>{column_headers_html}</tr></thead><tbody>{rows_html}</tbody></table>"
return f"<html><head/><body>{table_html}</body></html>"
def _remove_path_unsafe_characters(name: str) -> str:
return name.replace('/', '_')
# Like os.path.join, but ensures that the result is in a subdir of the basepath.
# If the subpath is an absolute path, os.path.join would just ignore basepath
# while _path_join_safe instead treats subpath as if it were a relative path.
def _path_join_safe(basepath: str, *subpaths: str) -> str:
# For convenience, allow subpaths that start with "/" and just treat them as relative paths
cleaned_subpaths = [subpath.lstrip("/") for subpath in subpaths]
result = os.path.join(basepath, *cleaned_subpaths)
# Security check: Paths starting with "/" aren't the only way we could escape basepath, there's also ".." and symlinks.
# Let's assert that there is no trickery and we actually return something inside basepath.
real_result = os.path.realpath(result)
real_base = os.path.realpath(basepath)
assert os.path.commonprefix([real_base, real_result]) == real_base, "_path_join_safe({}, {}) tried to access {} which escapes the base path {}".format(basepath, ", ".join(cleaned_subpaths), real_result, real_base)
# Let's check we're also within all intermediate directories
current_base = real_base
for subpath in cleaned_subpaths:
current_base = os.path.realpath(os.path.join(current_base, subpath))
assert os.path.commonprefix([current_base, real_result]) == current_base, "_path_join_safe({}, {}) tried to access {} which escapes the base path {}".format(basepath, ", ".join(cleaned_subpaths), real_result, current_base)
return result
def _parse_int(v: Any) -> int:
check(v).is_a(int).or_raise(
lambda _: Exception(f"Tried to read {v} as int"))
assert isinstance(v, int)
return v
def _parse_str(v: Any) -> str:
check(v).is_a(str).or_raise(
lambda _: Exception(f"Tried to read {v} as str"))
assert isinstance(v, str)
return v
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Export tables from coda.io')
parser.add_argument('api_token', type=str)
parser.add_argument('dest_dir', type=str)
args = parser.parse_args()
export_all_docs(args.api_token, args.dest_dir)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment