"cells": [
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"import random\n",
"import os\n",
"from typing import NamedTuple, Union, Tuple\n",
"from itertools import zip_longest\n",
"import multiprocessing as mp\n",
"import queue\n",
"from collections import defaultdict\n",
"from pprint import pprint as pp\n",
"import matplotlib.pyplot as plt\n",
"from tqdm.notebook import tqdm\n",
"import numpy as np\n",
"from hangar import Repository\n",
"from hangar.utils import random_string\n",
"from hangar.utils import is_suitable_user_key\n",
"from hangar.columns.common import open_file_handles"
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"[('00002437_043', '/home/rick/xray/raw/images_002/images/00002437_043.png'), ('00003698_000', '/home/rick/xray/raw/images_002/images/00003698_000.png')]\n"
"source": [
"fnames_fpaths = []\n",
"for root, dirs, files in os.walk(\"/home/rick/xray/raw/\"):\n",
" for file in files:\n",
" if file.endswith(\".png\"):\n",
" fnames_fpaths.append((file, os.path.join(root, file)))\n",
" \n",
"fnames_fpaths = [(fname.strip('.png'), fpth) for fname, fpth in fnames_fpaths]\n",
"fnames_fpaths = fnames_fpaths[0:10_000]\n",
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"repo = Repository('./multirepo/')"
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Hangar Repo initialized at: /home/rick/xray/multirepo/.hangar\n"
"data": {
"text/plain": [
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
"source": [
"repo.init('test', '', remove_old=True)"
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"co = repo.checkout(write=True)"
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"col = co.add_ndarray_column('images', shape=(1024, 1024), dtype=np.float32)"
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def image_reader(file_name):\n",
" arr = plt.imread(file_name)\n",
" if arr.shape != (1024, 1024):\n",
" arr = None\n",
" return arr"
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"KeyType = Union[str, int]\n",
"class BatchDataSpec(NamedTuple):\n",
" key: Union[Tuple[KeyType, KeyType], KeyType]\n",
" backend_spec: bytes\n",
" digest: str\n",
" \n",
"def grouper(iterable, n, fillvalue=None):\n",
" args = [iter(iterable)] * n\n",
" return zip_longest(*args, fillvalue=(None, None))"
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"def check_user_input(sample_keys_reader_args):\n",
" \"\"\"Validate user input specifying sample keys and reader func args/kwargs\n",
" \n",
" Parameters\n",
" ----------\n",
" sample_keys_reader_args: list/tuple\n",
" two element list/tuple where \n",
" \n",
" element 0 --> sample key (flat layout) or size == 2 list/tuple of \n",
" samples/subsample key (nested layout). \n",
" \n",
" element 1 --> single arg or dict of kwargs which are passed to\n",
" user specified data reader func to retrieve a single samples data\n",
" from disk.\n",
" \"\"\"\n",
" if not isinstance(sample_keys_reader_args, (list, tuple)):\n",
" raise TypeError(\n",
" f'expected arg of type list or tuple, recieved {type(sample_keys_reader_args)}')\n",
" \n",
" if len(sample_keys_reader_args) <= 1:\n",
" raise ValueError(f'batch input must specify more than 1 data sample')\n",
" \n",
" hashable_sample_keys = []\n",
" \n",
" for idx, sample_definition in enumerate(sample_keys_reader_args):\n",
" if len(sample_definition) != 2:\n",
" raise ValueError(\n",
" f'all items in batch input collection must be length two. Recieved '\n",
" f'{sample_definition} which is length {len(sample_definition)}')\n",
" \n",
" if idx == 0:\n",
" const_elem0_type = type(sample_definition[0])\n",
" const_elem1_type = type(sample_definition[1])\n",
" \n",
" if const_elem0_type in (list, tuple):\n",
" CHECK_ELEM0_SIZE = len(sample_definition[0])\n",
" else:\n",
" CHECK_ELEM0_SIZE = None\n",
" \n",
" elem0, elem1 = sample_definition\n",
" \n",
" if not isinstance(elem0, const_elem0_type):\n",
" raise TypeError(\n",
" f'all sample key arguments must be same type. sample 0 has type '\n",
" f'{const_elem0_type} and sample {idx} of value {elem0} is type {type(elem0)}')\n",
" if len(elem0) != CHECK_ELEM0_SIZE:\n",
" raise Value(\n",
" f'all sample key arguments must be same length. sample 0 has length '\n",
" f'{CHECK_ELEM0_SIZE} and sample {idx} of length {len(elem0)}')\n",
" \n",
" if not all([is_suitable_user_key(k) for k in elem0]):\n",
" raise ValueError(f'one of key values in {elem0} is not valid')\n",
" \n",
" else:\n",
" if isinstance(elem0, (list, tuple)):\n",
" raise TypeError(\n",
" f'all sample key arguments must be same type. sample 0 has type '\n",
" f'{const_elem0_type} and sample {idx} of value {elem0} is type {type(elem0)}')\n",
" \n",
" if not is_suitable_user_key(elem0):\n",
" raise ValueError(f'key {elem0} is not valid')\n",
" \n",
" hashable_sample_keys.append(tuple(elem0))\n",
" \n",
" # check that each key is unique\n",
" unique_sample_keys = set(hashable_sample_keys)\n",
" if len(hashable_sample_keys) != len(unique_sample_keys):\n",
" raise ValueError(f'sample keys cannot be duplicated')\n",
" "
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"class BatchProcessThread(mp.Process):\n",
" \"\"\"Image Thread\"\"\"\n",
" def __init__(self, read_func, backend_instance, schema, in_queue, out_queue, *args, **kwargs):\n",
" \"\"\"\n",
" read_func:\n",
" user provided function which takes some set of kwargs to generate one data sample\n",
" backend_instance:\n",
" initialized hangar backend class instance which will write data to disk for all\n",
" samples read in via this thread. \n",
" schema:\n",
" initialized schema object for the column. This is required in order to properly\n",
" calculate the data hash digests.\n",
" in_queue:\n",
" multiprocessing.Queue object which passes in kwargs to read data for one sample via `read_func`\n",
" as well as sample/subsample names to assign to the resulting data. \n",
" tuple in form of `(kwargs, (samplen, [subsamplen,]))`\n",
" out_queue:\n",
" multiprocessing.Queue object which passes back sample keys formated for storage in ref db,\n",
" serialized location spec, and hash digest of read / saved data. \n",
" \"\"\"\n",
" super().__init__(*args, **kwargs)\n",
" self.read_func = read_func\n",
" self.backend_instance = backend_instance\n",
" self.in_queue = in_queue\n",
" self.out_queue = out_queue\n",
" self.schema = schema\n",
" def run(self):\n",
" while True:\n",
" # Grabs image path from queue\n",
" try:\n",
" sample_keys_and_reader_kwargs = self.in_queue.get(True, 5)\n",
" except queue.Empty:\n",
" print(f'queue empty in worker')\n",
" break\n",
" \n",
" # Grab image\n",
" sample_keys_and_data = (\n",
" (keys, self.read_func(kwargs)) for keys, kwargs in sample_keys_and_reader_kwargs\n",
" if ((keys is not None) and (kwargs is not None))\n",
" )\n",
" \n",
" # Place image in out queue\n",
" saved_key_location_digests = []\n",
" for k, data in sample_keys_and_data:\n",
" if k is None or data is None:\n",
" continue\n",
" iscompat = self.schema.verify_data_compatible(data)\n",
" if not iscompat.compatible:\n",
" raise ValueError(f'data for key {k} incompatible due to {iscompat.reason}')\n",
" \n",
" digest = self.schema.data_hash_digest(data)\n",
" location_spec = self.backend_instance.write_data(data)\n",
" res = BatchDataSpec(k, location_spec, digest)\n",
" saved_key_location_digests.append(res)\n",
" \n",
" self.out_queue.put(saved_key_location_digests)"
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"user_provided_sample_keys_reader_kwargs = fnames_fpaths\n",
"reader_func = image_reader\n",
"n_cpus = 7\n",
"q_size = n_cpus + 2\n",
"batch_size = 5"
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "ccb6c72d2403438695e704da819f6278",
"version_major": 2,
"version_minor": 0
"text/plain": [
"HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))"
"metadata": {},
"output_type": "display_data"
"name": "stdout",
"output_type": "stream",
"text": [
"total time: 2.901878833770752\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n",
"queue empty in worker\n"
"source": [
"in_queue = mp.Queue(maxsize=q_size)\n",
"out_queue = mp.Queue(maxsize=q_size)\n",
"grouped_sample_keys_and_reader_kwargs = grouper(user_provided_sample_keys_reader_kwargs, batch_size)\n",
"out = []\n",
"jobs = []\n",
"for i in range(n_cpus):\n",
" backend = col.backend\n",
" schema = col._schema\n",
" be_instance = open_file_handles([backend], col._path, mode='a', schema=col._schema)[backend]\n",
" t = BatchProcessThread(reader_func, be_instance, schema, in_queue, out_queue)\n",
" jobs.append(t)\n",
" t.start()\n",
" \n",
"# Populate queue with some paths to image data\n",
"for sample_keys_and_reader_kwargs_group in range(q_size):\n",
" sample_keys_and_reader_kwargs_group = next(grouped_sample_keys_and_reader_kwargs)\n",
" in_queue.put(sample_keys_and_reader_kwargs_group)\n",
"tot = 0\n",
"itr = 1\n",
"ngroups = 0\n",
"remaining = True\n",
"with tqdm(total=len(user_provided_sample_keys_reader_kwargs)) as pbar:\n",
" while remaining is True:\n",
" data_key_location_hash_digests = out_queue.get()\n",
" start = time.time()\n",
" for saved in data_key_location_hash_digests:\n",
" pbar.update(1)\n",
" out.append(saved)\n",
" itr += 1\n",
" stop = time.time()\n",
" tot += stop - start\n",
" try:\n",
" sz = in_queue.qsize()\n",
" if sz < min((2, q_size)):\n",
" for sample_keys_and_reader_kwargs_group in range(q_size-sz):\n",
" sample_keys_and_reader_kwargs_group = next(grouped_sample_keys_and_reader_kwargs)\n",
" in_queue.put(sample_keys_and_reader_kwargs_group)\n",
" except StopIteration:\n",
" if in_queue.qsize() == 0:\n",
" if ngroups == n_cpus or itr == len(user_provided_sample_keys_reader_kwargs):\n",
" remaining = False\n",
" break\n",
" else:\n",
" ngroups += 1\n",
" \n",
"print(f'total time: {tot}')\n",
"for j in jobs:\n",
" j.join()"
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
"source": [
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"[BatchDataSpec(key='00002586_005', backend_spec=b'01:oi1d44bd:c971b2dca031426e:0:0:1024 1024', digest='0=19fc78e2d8f3dcfc7c1885b311e9b644fdd55a8c'),\n",
" BatchDataSpec(key='00003297_000', backend_spec=b'01:oi1d44bd:6c369bea1befc5aa:0:10:1024 1024', digest='0=dfaa7f6662ee328a3f5cbc2f5bdf354f8d09af1b'),\n",
" BatchDataSpec(key='00002403_001', backend_spec=b'01:oi1d44bd:4ebfff4ac8fa6eca:0:20:1024 1024', digest='0=c7003c3ed206cb09ef3375ef7ad1f324a36e4d44'),\n",
" BatchDataSpec(key='00001836_109', backend_spec=b'01:k1941ub5:fe39e2338e6569a1:0:0:1024 1024', digest='0=f2ded33933f60a0a2b385ee94425cf2bc44b04e2'),\n",
" BatchDataSpec(key='00003528_012', backend_spec=b'01:0nrbd3du:97b581b7e632cb0d:0:0:1024 1024', digest='0=ac2893179d41ae00aec4bcc36aaacad5cf457643'),\n",
" BatchDataSpec(key='00002735_000', backend_spec=b'01:8p0vlaz7:4b762d0d8868ab00:0:0:1024 1024', digest='0=2b1cbe51accc005b6732026e5c4217027a41539b'),\n",
" BatchDataSpec(key='00003534_005', backend_spec=b'01:0nrbd3du:96288f520ecb9077:0:5:1024 1024', digest='0=16eddcad3f42bcb62c3f79a7beefb86ac7c65755')]"
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
"source": [
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"percentage duplicated\n",
"0.030% (2 of 9918)\n",
"duplicated digests\n",
"duplicated specs\n",
"[BatchDataSpec(key='00002658_002', backend_spec=b'01:k1941ub5:8da68bd4ea97f38c:10:42:1024 1024', digest='0=e3f1126ceed6b9606172ebaef15bd6eed9c37fb8'),\n",
" BatchDataSpec(key='00002658_001', backend_spec=b'01:gk5gjri3:8da68bd4ea97f38c:10:94:1024 1024', digest='0=e3f1126ceed6b9606172ebaef15bd6eed9c37fb8')]\n",
"[BatchDataSpec(key='00001989_004', backend_spec=b'01:8p0vlaz7:96f6c389836548a2:11:94:1024 1024', digest='0=5cd3d21d1abc1135c4d56fdadbdced5dd9dd8319'),\n",
" BatchDataSpec(key='00001989_005', backend_spec=b'01:k1941ub5:96f6c389836548a2:10:85:1024 1024', digest='0=5cd3d21d1abc1135c4d56fdadbdced5dd9dd8319')]\n",
"source": [
"digest_dict = defaultdict(list)\n",
"for spec in out:\n",
" digest = spec.digest\n",
" digest_dict[digest].append(spec)\n",
" \n",
"n_total = len(out)\n",
"n_unique = len(digest_dict)\n",
"percentage = (1 - (n_unique - n_total)) / n_total\n",
"print('percentage duplicated')\n",
"print(f'{percentage * 100:.3f}% ({n_total - n_unique} of {n_total})')\n",
"print('duplicated digests')\n",
"duplicate_digests = []\n",
"for digest, spec in digest_dict.items():\n",
" if len(spec) > 1:\n",
" duplicate_digests.append(digest)\n",
" print(digest)\n",
"print('duplicated specs')\n",
"for dup in duplicate_digests:\n",
" pp(digest_dict[dup])\n",
" print('')"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
"nbformat": 4,
"nbformat_minor": 4
