Skip to content

Instantly share code, notes, and snippets.

@rlizzo
Created March 4, 2020 23:53
Show Gist options
  • Save rlizzo/cc44fb94211f38db6c42a65f63a0727d to your computer and use it in GitHub Desktop.
Save rlizzo/cc44fb94211f38db6c42a65f63a0727d to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"import random\n",
"import os\n",
"from typing import NamedTuple, Union, Tuple\n",
"from itertools import zip_longest\n",
"import multiprocessing as mp\n",
"import queue\n",
"from collections import defaultdict\n",
"from pprint import pprint as pp\n",
"\n",
"import matplotlib.pyplot as plt\n",
"from tqdm.notebook import tqdm\n",
"import numpy as np\n",
"\n",
"from hangar import Repository\n",
"from hangar.utils import random_string\n",
"from hangar.utils import is_suitable_user_key\n",
"from hangar.columns.common import open_file_handles"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate list of sample names and file paths"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: I'm using a rather large chest xray dataset for testing... you can download it at https://www.kaggle.com/nih-chest-xrays/data; but if you want to avoid the 42GB download then you'll need to adapt this to a local dataset or create method which generates data "
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('00002437_043', '/home/rick/xray/raw/images_002/images/00002437_043.png'), ('00003698_000', '/home/rick/xray/raw/images_002/images/00003698_000.png')]\n"
]
}
],
"source": [
"fnames_fpaths = []\n",
"for root, dirs, files in os.walk(\"/home/rick/xray/raw/\"):\n",
" for file in files:\n",
" if file.endswith(\".png\"):\n",
" fnames_fpaths.append((file, os.path.join(root, file)))\n",
" \n",
"fnames_fpaths = [(fname.strip('.png'), fpth) for fname, fpth in fnames_fpaths]\n",
"fnames_fpaths = fnames_fpaths[0:10_000]\n",
"\n",
"print(fnames_fpaths[0:2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initialize repo and column to store data in"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"repo = Repository('./multirepo/')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Hangar Repo initialized at: /home/rick/xray/multirepo/.hangar\n"
]
},
{
"data": {
"text/plain": [
"'/home/rick/xray/multirepo/.hangar'"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"repo.init('test', 'test@email.com', remove_old=True)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"co = repo.checkout(write=True)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"col = co.add_ndarray_column('images', shape=(1024, 1024), dtype=np.float32)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### User defined read function\n",
"\n",
" Input -> arg/kwargs specifying how to get data for a single (specific) sample \n",
" Output -> data associated with that single sample"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def image_reader(file_name):\n",
" arr = plt.imread(file_name)\n",
" if arr.shape != (1024, 1024): \n",
" arr = None\n",
" return arr"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Hangar internals allowing batch data loader"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"KeyType = Union[str, int]\n",
"\n",
"class BatchDataSpec(NamedTuple):\n",
" key: Union[Tuple[KeyType, KeyType], KeyType]\n",
" backend_spec: bytes\n",
" digest: str\n",
" \n",
"def grouper(iterable, n, fillvalue=None):\n",
" \"\"\"split iterable into n sized groups upon each call to `next()`\n",
" \"\"\"\n",
" args = [iter(iterable)] * n\n",
" return zip_longest(*args, fillvalue=(None, None))"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"def check_user_input(sample_keys_reader_args):\n",
" \"\"\"Validate user input specifying sample keys and reader func args/kwargs\n",
" \n",
" Parameters\n",
" ----------\n",
" sample_keys_reader_args: list/tuple\n",
" two element list/tuple where \n",
" \n",
" element 0 --> sample key (flat layout) or size == 2 list/tuple of \n",
" samples/subsample key (nested layout). \n",
" \n",
" element 1 --> single arg or dict of kwargs which are passed to\n",
" user specified data reader func to retrieve a single samples data\n",
" from disk.\n",
" \"\"\"\n",
" if not isinstance(sample_keys_reader_args, (list, tuple)):\n",
" raise TypeError(\n",
" f'expected arg of type list or tuple, recieved {type(sample_keys_reader_args)}')\n",
" \n",
" if len(sample_keys_reader_args) <= 1:\n",
" raise ValueError(f'batch input must specify more than 1 data sample')\n",
" \n",
" hashable_sample_keys = []\n",
" \n",
" for idx, sample_definition in enumerate(sample_keys_reader_args):\n",
" if len(sample_definition) != 2:\n",
" raise ValueError(\n",
" f'all items in batch input collection must be length two. Recieved '\n",
" f'{sample_definition} which is length {len(sample_definition)}')\n",
" \n",
" if idx == 0:\n",
" const_elem0_type = type(sample_definition[0])\n",
" const_elem1_type = type(sample_definition[1])\n",
" \n",
" if const_elem0_type in (list, tuple):\n",
" CHECK_ELEM0_COLLECTION = True\n",
" CHECK_ELEM0_SIZE = len(sample_definition[0])\n",
" else:\n",
" CHECK_ELEM0_COLLECTION = False\n",
" CHECK_ELEM0_SIZE = None\n",
" \n",
" elem0, elem1 = sample_definition\n",
" \n",
" if CHECK_ELEM0_COLLECTION:\n",
" if not isinstance(elem0, const_elem0_type):\n",
" raise TypeError(\n",
" f'all sample key arguments must be same type. sample 0 has type '\n",
" f'{const_elem0_type} and sample {idx} of value {elem0} is type {type(elem0)}')\n",
" if len(elem0) != CHECK_ELEM0_SIZE:\n",
" raise Value(\n",
" f'all sample key arguments must be same length. sample 0 has length '\n",
" f'{CHECK_ELEM0_SIZE} and sample {idx} of length {len(elem0)}')\n",
" \n",
" if not all([is_suitable_user_key(k) for k in elem0]):\n",
" raise ValueError(f'one of key values in {elem0} is not valid')\n",
" \n",
" else:\n",
" if isinstance(elem0, (list, tuple)):\n",
" raise TypeError(\n",
" f'all sample key arguments must be same type. sample 0 has type '\n",
" f'{const_elem0_type} and sample {idx} of value {elem0} is type {type(elem0)}')\n",
" \n",
" if not is_suitable_user_key(elem0):\n",
" raise ValueError(f'key {elem0} is not valid')\n",
" \n",
" hashable_sample_keys.append(tuple(elem0))\n",
" \n",
" # check that each key is unique\n",
" unique_sample_keys = set(hashable_sample_keys)\n",
" if len(hashable_sample_keys) != len(unique_sample_keys):\n",
" raise ValueError(f'sample keys cannot be duplicated')\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"check_user_input(fnames_fpaths)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"class BatchProcessThread(mp.Process):\n",
" \"\"\"Image Thread\"\"\"\n",
" def __init__(self, read_func, backend_instance, schema, in_queue, out_queue, *args, **kwargs):\n",
" \"\"\"\n",
" read_func:\n",
" user provided function which takes some set of kwargs to generate one data sample\n",
" backend_instance:\n",
" initialized hangar backend class instance which will write data to disk for all\n",
" samples read in via this thread. \n",
" schema:\n",
" initialized schema object for the column. This is required in order to properly\n",
" calculate the data hash digests.\n",
" in_queue:\n",
" multiprocessing.Queue object which passes in kwargs to read data for one sample via `read_func`\n",
" as well as sample/subsample names to assign to the resulting data. \n",
" tuple in form of `(kwargs, (samplen, [subsamplen,]))`\n",
" out_queue:\n",
" multiprocessing.Queue object which passes back sample keys formated for storage in ref db,\n",
" serialized location spec, and hash digest of read / saved data. \n",
" \"\"\"\n",
" super().__init__(*args, **kwargs)\n",
" self.read_func = read_func\n",
" self.backend_instance = backend_instance\n",
" self.in_queue = in_queue\n",
" self.out_queue = out_queue\n",
" self.schema = schema\n",
"\n",
" def run(self):\n",
" while True:\n",
" # Grabs image path from queue\n",
" try:\n",
" sample_keys_and_reader_kwargs = self.in_queue.get(True, 5)\n",
" except queue.Empty:\n",
" print(f'queue empty in worker')\n",
" break\n",
" \n",
" # Grab image\n",
" sample_keys_and_data = (\n",
" (keys, self.read_func(kwargs)) for keys, kwargs in sample_keys_and_reader_kwargs\n",
" if ((keys is not None) and (kwargs is not None))\n",
" )\n",
" \n",
" # Place image in out queue\n",
" saved_key_location_digests = []\n",
" for k, data in sample_keys_and_data:\n",
" if k is None or data is None:\n",
" continue\n",
" iscompat = self.schema.verify_data_compatible(data)\n",
" if not iscompat.compatible:\n",
" raise ValueError(f'data for key {k} incompatible due to {iscompat.reason}')\n",
" \n",
" digest = self.schema.data_hash_digest(data)\n",
" location_spec = self.backend_instance.write_data(data)\n",
" res = BatchDataSpec(k, location_spec, digest)\n",
" saved_key_location_digests.append(res)\n",
" \n",
" self.out_queue.put(saved_key_location_digests)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"# CONFIGURATION\n",
"\n",
"user_provided_sample_keys_reader_kwargs = fnames_fpaths\n",
"reader_func = image_reader\n",
"\n",
"n_cpus = 7\n",
"q_size = n_cpus + 2\n",
"batch_size = 5"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "ccb6c72d2403438695e704da819f6278",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"=====================\n",
"total time: 2.901878833770752\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n"
]
}
],
"source": [
"\"\"\"MAIN Method / Starting Point\n",
"\n",
"in_queue -> contains `batch_size` length list of (sample_key, reader_args) tuples\n",
"out_queue -> contains `batch_size` list of namedtuple specifying (sample_key, backend_location_spec, data_hash_digest)\n",
"col -> column instance to store data in\n",
"\n",
"because worker processes execute `run` in an infinite (while true) loop (so they can\n",
"reuse backend instance for every sample fed in to them), they will only `join` if the\n",
"`in_queue` is left empty for N (currently set to 5) seconds. Not a good solution,\n",
"should be changed...\n",
"\n",
"DESIRED BEHAVIOR (not sure if currently working):\n",
"\n",
"- should any sample read / write raise an exception, then the entire process should fail and all\n",
" data written to disk should be deleted. \n",
"\"\"\"\n",
"\n",
"# setup queues\n",
"in_queue = mp.Queue(maxsize=q_size)\n",
"out_queue = mp.Queue(maxsize=q_size)\n",
"grouped_sample_keys_and_reader_kwargs = grouper(user_provided_sample_keys_reader_kwargs, batch_size)\n",
"\n",
"\n",
"# start worker processes\n",
"out = []\n",
"jobs = []\n",
"for i in range(n_cpus):\n",
" backend = col.backend\n",
" schema = col._schema\n",
" be_instance = open_file_handles([backend], col._path, mode='a', schema=col._schema)[backend]\n",
" t = BatchProcessThread(reader_func, be_instance, schema, in_queue, out_queue)\n",
" jobs.append(t)\n",
" t.start()\n",
"\n",
" \n",
"# Populate queue with batched arguments\n",
"for sample_keys_and_reader_kwargs_group in range(q_size):\n",
" sample_keys_and_reader_kwargs_group = next(grouped_sample_keys_and_reader_kwargs)\n",
" in_queue.put(sample_keys_and_reader_kwargs_group)\n",
"\n",
" \n",
"# collect outputs and fill queue with more work if low\n",
"# terminate if no more work should be done. \n",
"with tqdm(total=len(user_provided_sample_keys_reader_kwargs)) as pbar:\n",
" start = time.time()\n",
" total_time = 0\n",
" itr = 1\n",
" ngroups = 0\n",
" remaining = True\n",
" \n",
" while remaining is True:\n",
" data_key_location_hash_digests = out_queue.get()\n",
" for saved in data_key_location_hash_digests:\n",
" pbar.update(1)\n",
" out.append(saved)\n",
" itr += 1\n",
" try:\n",
" sz = in_queue.qsize()\n",
" if sz < min((2, q_size)):\n",
" for sample_keys_and_reader_kwargs_group in range(q_size-sz):\n",
" sample_keys_and_reader_kwargs_group = next(grouped_sample_keys_and_reader_kwargs)\n",
" in_queue.put(sample_keys_and_reader_kwargs_group)\n",
" except StopIteration:\n",
" if in_queue.qsize() == 0:\n",
" if ngroups == n_cpus or itr == len(user_provided_sample_keys_reader_kwargs):\n",
" remaining = False\n",
" break\n",
" else:\n",
" ngroups += 1\n",
" \n",
"# calculate time elapsed\n",
"stop = time.time()\n",
"total_time += stop - start\n",
" \n",
"print('=====================')\n",
"print(f'total time: {total_time}')\n",
"\n",
"for j in jobs:\n",
" j.join()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Manual Sanity Checks"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"9918"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(out)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[BatchDataSpec(key='00002586_005', backend_spec=b'01:oi1d44bd:c971b2dca031426e:0:0:1024 1024', digest='0=19fc78e2d8f3dcfc7c1885b311e9b644fdd55a8c'),\n",
" BatchDataSpec(key='00003297_000', backend_spec=b'01:oi1d44bd:6c369bea1befc5aa:0:10:1024 1024', digest='0=dfaa7f6662ee328a3f5cbc2f5bdf354f8d09af1b'),\n",
" BatchDataSpec(key='00002403_001', backend_spec=b'01:oi1d44bd:4ebfff4ac8fa6eca:0:20:1024 1024', digest='0=c7003c3ed206cb09ef3375ef7ad1f324a36e4d44'),\n",
" BatchDataSpec(key='00001836_109', backend_spec=b'01:k1941ub5:fe39e2338e6569a1:0:0:1024 1024', digest='0=f2ded33933f60a0a2b385ee94425cf2bc44b04e2'),\n",
" BatchDataSpec(key='00003528_012', backend_spec=b'01:0nrbd3du:97b581b7e632cb0d:0:0:1024 1024', digest='0=ac2893179d41ae00aec4bcc36aaacad5cf457643'),\n",
" BatchDataSpec(key='00002735_000', backend_spec=b'01:8p0vlaz7:4b762d0d8868ab00:0:0:1024 1024', digest='0=2b1cbe51accc005b6732026e5c4217027a41539b'),\n",
" BatchDataSpec(key='00003534_005', backend_spec=b'01:0nrbd3du:96288f520ecb9077:0:5:1024 1024', digest='0=16eddcad3f42bcb62c3f79a7beefb86ac7c65755')]"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"out[0:70:10]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Post-Save Analysis and Prep for Saving\n",
"\n",
"At this point, we have the data written to disk in hangar format and have the necessary info for it to be included in a hangar checkout:\n",
"\n",
"- column name\n",
"- sample name\n",
"- data digest\n",
"- backend spec\n",
"\n",
"Even though the data is saved in the `.hangar` directory under the appropriate areas, it's not actually visible in Hangar book-keeping. \n",
"For this to occur, we need to write the followig info:\n",
"\n",
"- hash_db: `data_digest -> backend_location_spec` (p\n",
"- staged_hash_db: `data_digest -> backend_location_spec` (this allows the user to actually purge data from disk if they want to reset the staging area head)\n",
"- staged_ref_db: `column/sample key -> data_digest`\n",
"\n",
"Before we do that, we should (probably) do a few things:\n",
"\n",
"1. Check for large quantities of duplicated data written by the batch loader. \n",
"\n",
" Since batch loading doesn't check for / deduplicate identical sample data values (or check the global `hash_db` for already existing digests, it's possible that this operation would resave identical data multiple times in the repo. \n",
" \n",
" - As deduplicating data written in the batch loader would involve essentially rewriting all the data previously processed here, it's not a worthwhile operation if duplication is low. \n",
"\n",
" We need to set a threshold after which a repacking would occur, and build out the methods to dedup digests, sort, and rewrite the data if needed. If a full repack is not needed, then we should only choose one of the `backend_location_spec`'s to associate with a digest. (this means the other data location would be essentially un-referencable by hangar)\n",
" \n",
" - An analogous dedup method needs to be implemented on a check of the global `hash_db` table. \n",
" \n",
"2. Transform output of batch info into formated records expected by each of the three record databases we will write to. \n",
"\n",
" Use the methods defined in each column to transform records (but not save) into a list of two-tuples mapping db formatted records of `(keys, values)`...\n",
" \n",
"3. Wrap in Exit Stack / Context Manager which:\n",
"\n",
" - Makes record writes atomic\n",
" - If any error occurs, deletes all loaded data from hangar repo & rolls back any pending changes to the record dbs\n",
" - joines process pool upon completion / exception\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"percentage duplicated\n",
"---------------------\n",
"0.030% (2 of 9918)\n",
"\n",
"duplicated digests\n",
"------------------\n",
"0=e3f1126ceed6b9606172ebaef15bd6eed9c37fb8\n",
"0=5cd3d21d1abc1135c4d56fdadbdced5dd9dd8319\n",
"\n",
"duplicated specs\n",
"----------------\n",
"[BatchDataSpec(key='00002658_002', backend_spec=b'01:k1941ub5:8da68bd4ea97f38c:10:42:1024 1024', digest='0=e3f1126ceed6b9606172ebaef15bd6eed9c37fb8'),\n",
" BatchDataSpec(key='00002658_001', backend_spec=b'01:gk5gjri3:8da68bd4ea97f38c:10:94:1024 1024', digest='0=e3f1126ceed6b9606172ebaef15bd6eed9c37fb8')]\n",
"\n",
"[BatchDataSpec(key='00001989_004', backend_spec=b'01:8p0vlaz7:96f6c389836548a2:11:94:1024 1024', digest='0=5cd3d21d1abc1135c4d56fdadbdced5dd9dd8319'),\n",
" BatchDataSpec(key='00001989_005', backend_spec=b'01:k1941ub5:96f6c389836548a2:10:85:1024 1024', digest='0=5cd3d21d1abc1135c4d56fdadbdced5dd9dd8319')]\n",
"\n"
]
}
],
"source": [
"digest_dict = defaultdict(list)\n",
"\n",
"for spec in out:\n",
" digest = spec.digest\n",
" digest_dict[digest].append(spec)\n",
" \n",
"n_total = len(out)\n",
"n_unique = len(digest_dict)\n",
"percentage = (1 - (n_unique - n_total)) / n_total\n",
"print('percentage duplicated')\n",
"print('---------------------')\n",
"print(f'{percentage * 100:.3f}% ({n_total - n_unique} of {n_total})')\n",
"print('')\n",
"\n",
"print('duplicated digests')\n",
"print('------------------')\n",
"duplicate_digests = []\n",
"for digest, spec in digest_dict.items():\n",
" if len(spec) > 1:\n",
" duplicate_digests.append(digest)\n",
" print(digest)\n",
"print('')\n",
"\n",
"print('duplicated specs')\n",
"print('----------------')\n",
"for dup in duplicate_digests:\n",
" pp(digest_dict[dup])\n",
" print('')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment