Skip to content

Instantly share code, notes, and snippets.

@matthewturk
Created June 5, 2021 13:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matthewturk/d1aa92738518f676e79e01f99287bd3c to your computer and use it in GitHub Desktop.
Particle Indexing Code Tour
{
"$schema": "https://aka.ms/codetour-schema",
"title": "particle indexing",
"steps": [
{
"title": "Introduction",
"description": "We're going to walk through how particle indexing works in yt 4.0.\n\nImagine you've got a bunch of LEGO blocks that you know build up a model. (For fun, let's pretend it's the Arendelle castle.) They're all separated into little bags, and you don't exactly know which pieces are in which bags, but maybe you vaguely know that there's some organizational scheme to them.\n\nWhen you build your castle, you *could* read the instructions step by step and inspect every single piece in every single bag until you find the one you're looking for. But that wouldn't be terribly efficient! Wouldn't it be easier if you had some way of saying, \"I know that this piece is in one of *these* bags, so I'll only look in those bags until I find it.\"\n\nThat's what we do with indexing in yt, and why particle indexing is particularly tricky -- because when we want to select data, we want to minimize the amount of time we spend searching, checking, and discarding the parts of the dataset that we don't need.\n\nSo how do we do this in yt?"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "The `ParticleIndex` class is used by all of the frontends that have particles as their principal component. In `yt`, there are basically four main management classes for a given dataset -- the `Dataset` object itself, which contains metadata and pointers to the other managers, along with a `FieldInfoContainer` that describes the different \"fields\" that `yt` knows how to generate, an `IOHandler` that manages access to the actual data, and -- most relevant for our purposes today! -- the `Index` object.\n\nThere are a few different types, but the one we're going to look at today is the `ParticleIndex` class.",
"line": 17,
"selection": {
"start": {
"line": 17,
"character": 1
},
"end": {
"line": 17,
"character": 28
}
},
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "The `_initialize_index` method is called only once on each `Index` object. It's where we set up the in-memory data structures that tell us how to map spatial regions in the domain of the dataset to places \"on disk\" (but not necessarily on an actual disk! They could be in dictionaries, in remote URLs, etc.) that the data can be found. This is where the bulk of the operations occur, and it can be expensive in some situations, so we try to minimize the time cost incurred.",
"line": 100,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "With particle datasets, if we can't guess the left and right edges, we do a pass over the data to figure them out.\n\nThis isn't amazing! It's definitely not my personal favorite. But, it is what we do. There are ways out there of avoiding this -- in fact, it would be completely feasible to just set the domain to be all the way from the minimum double precision representation to the maximum double precision representation, and then only occupy a small portion in a hierarchical mapping. But we don't do this now.",
"line": 111,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "When we don't have a lot of data, for instance if the `data_files` attribute only contains one thing, we don't do any indexing. We'll have to read the whole thing anyway, so, whatever, right?\n\nOne thing that's worth keeping in mind here is that `data_files` are not always just the \"files\" that live on disk. Some data formats, for instance, use just a single file, but include a huge number of particles in it. We sub-chunk these so they appear as several \"virtual\" data files.\n\nThis helps us keep our indexing efficient, since there's no point in doing a potentially-expensive indexing operation on a small dataset, and we also don't want to give up all of our ability to set the size we read from disk.",
"line": 135,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "And here's where we start actually setting up our index.\n\nBecause the indexing operation can be expensive, we build in ways of caching. That way, the *second* time you load a dataset, it won't have to conduct the indexing operation unless it really needs to -- it'll just use what it came up with the first time.",
"line": 174,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "This is where we build our new index. Let's take a look at what that looks like!\n\nWe have two steps to this process. The first is to build a \"coarse\" index (this is a reasonably low-resolution index to let us know, \"Hey, this bit might be relevant!\") and the second is a much more \"refined\" index for when we need to do very detail-oriented subselection.",
"line": 184,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "To generate a coarse index, we loop over all of our files, and look at the particles. (This sure does sound expensive, right? Good thing we're going to cache the results!)\n\nEach `IOHandler` for the particle objects has to provide a `_yield_coordinates` method. This method just has to yield a bunch of tuples of the particle types and their positions.",
"line": 198,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "We won't be diving into this routine, but let's talk about what it does.\n\nThe `regions` attribute on a `ParticleIndex` object is where we set \"on\" and \"off\" bits that encode the correspondence between spatial locations and `data_file` objects.\n\nIf we think about our domain as a big 3D volume, we could divide it up into a grid of positions. Each of these positions -- `i, j, k` -- can be turned into a single number. To do this we use a simple Morton index.\n\nThe purpose of this loop is to turn each `data_file` into a set of \"on\" and \"off\" marks, where \"off\" indicates that no particles exist, and \"on\" indicates they do.\n\nSo, going *in*, each `data_file` is given an array of all zeros, and the array has `I*J*K` *bits*, where `I` is the number of subdivisions in x, `J` is the number in y and `K` is the number in z. The way we set it up, these are always the same, and they are always equal to `2**index_order1`. So if your `index_order1` is 3, this would be `I==J==K==8`, and you'd have a total of `8*8*8` bits in the array, corresponding to a total size of 64 bytes.\n\nOne important thing to keep in mind is that we save on memory by only storing the *bits*, not the counts! That's because this is just an indexing system meant to tell us where to look, so we want to keep it as lightweight as possible.",
"line": 208,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "This line, which happens *outside* the loop over particle positions, compresses the bitarrays. That way we keep our memory usage to a minimum.",
"line": 209,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "This line is crucial for the next step of computing the refined indexes. It conducts a set of logical operations to see which bits in the array are set *more than once* -- which means that there's more than one place you'll have to look if you're looking for particles in that region. It's in those places that we make a second, more refined index.",
"line": 211,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
},
{
"file": "yt/geometry/particle_geometry_handler.py",
"description": "This is the tricky part, where we conduct a second bitmapping operation.\n\nHere, we look at those places where collisions in the coarse index have occurred. We want to do our best to disambiguate the regions that different data files encompass, so in all of those regions, we insert a new *subdivided* region. So for instance, if the region in `i,j,k` of the coarse index is touched by a couple of files, we insert in `i,j,k` a whole new index, this time based on `index_order2`. And, we index that. Because we're using compressed indexes, this usually does not take up that much memory, but it can get unwieldy in those cases where you have particles with big smoothing lengths that need to be taken into account.\n\nThis loop requires a bit more of a dance, because for each collision in the coarse index, we have to construct a refined index. So the memory usage is a bit higher here, because we can't compress the arrays until we're totally done with them.\n\nBut, at the end of this, what we have is a set of bitmaps -- one for each data file -- that tell us where the data file exerts its influence. And then, wherever more than one data file exerts its influence, we have *another* set of bitmaps for each of *those* data files that tell us which sub-regions are impacted.\n\nThis then gets used whenever we want to read data, to tell us which data files we have to look at. For sub-selecting from a really big set of particles, we can save an awful lot of time and IO!",
"line": 235,
"contents": "import collections\nimport errno\nimport os\nimport struct\nimport weakref\n\nimport numpy as np\n\nfrom yt.data_objects.index_subobjects.particle_container import ParticleContainer\nfrom yt.funcs import get_pbar, only_on_root\nfrom yt.geometry.geometry_handler import Index, YTDataChunk\nfrom yt.geometry.particle_oct_container import ParticleBitmap\nfrom yt.utilities.lib.fnv_hash import fnv_hash\nfrom yt.utilities.logger import ytLogger as mylog\n\n\nclass ParticleIndex(Index):\n \"\"\"The Index subclass for particle datasets\"\"\"\n\n def __init__(self, ds, dataset_type):\n self.dataset_type = dataset_type\n self.dataset = weakref.proxy(ds)\n self.float_type = np.float64\n super().__init__(ds, dataset_type)\n self._initialize_index()\n\n def _setup_geometry(self):\n self.regions = None\n\n def get_smallest_dx(self):\n \"\"\"\n Returns (in code units) the smallest cell size in the simulation.\n \"\"\"\n return self.ds.arr(0, \"code_length\")\n\n def _get_particle_type_counts(self):\n result = collections.defaultdict(lambda: 0)\n for df in self.data_files:\n for k in df.total_particles.keys():\n result[k] += df.total_particles[k]\n return dict(result)\n\n def convert(self, unit):\n return self.dataset.conversion_factors[unit]\n\n @property\n def chunksize(self):\n # This can be overridden in subclasses\n return 64 ** 3\n\n _data_files = None\n\n @property\n def data_files(self):\n if self._data_files is not None:\n return self._data_files\n\n self._setup_filenames()\n return self._data_files\n\n @data_files.setter\n def data_files(self, value):\n self._data_files = value\n\n _total_particles = None\n\n @property\n def total_particles(self):\n if self._total_particles is not None:\n return self._total_particles\n\n self._total_particles = sum(\n sum(d.total_particles.values()) for d in self.data_files\n )\n return self._total_particles\n\n def _setup_filenames(self):\n template = self.dataset.filename_template\n ndoms = self.dataset.file_count\n cls = 
self.dataset._file_class\n self.data_files = []\n fi = 0\n for i in range(int(ndoms)):\n start = 0\n if self.chunksize > 0:\n end = start + self.chunksize\n else:\n end = None\n while True:\n df = cls(self.dataset, self.io, template % {\"num\": i}, fi, (start, end))\n if max(df.total_particles.values()) == 0:\n break\n fi += 1\n self.data_files.append(df)\n if self.chunksize <= 0:\n break\n start = end\n end += self.chunksize\n\n def _initialize_index(self):\n ds = self.dataset\n only_on_root(\n mylog.info,\n \"Allocating for %0.3e particles\",\n self.total_particles,\n global_rootonly=True,\n )\n\n # if we have not yet set domain_left_edge and domain_right_edge then do\n # an I/O pass over the particle coordinates to determine a bounding box\n if self.ds.domain_left_edge is None:\n min_ppos = np.empty(3, dtype=\"float64\")\n min_ppos[:] = np.nan\n max_ppos = np.empty(3, dtype=\"float64\")\n max_ppos[:] = np.nan\n only_on_root(\n mylog.info,\n \"Bounding box cannot be inferred from metadata, reading \"\n \"particle positions to infer bounding box\",\n )\n for df in self.data_files:\n for _, ppos in self.io._yield_coordinates(df):\n min_ppos = np.nanmin(np.vstack([min_ppos, ppos]), axis=0)\n max_ppos = np.nanmax(np.vstack([max_ppos, ppos]), axis=0)\n only_on_root(\n mylog.info,\n \"Load this dataset with bounding_box=[%s, %s] to avoid I/O \"\n \"overhead from inferring bounding_box.\" % (min_ppos, max_ppos),\n )\n ds.domain_left_edge = ds.arr(1.05 * min_ppos, \"code_length\")\n ds.domain_right_edge = ds.arr(1.05 * max_ppos, \"code_length\")\n ds.domain_width = ds.domain_right_edge - ds.domain_left_edge\n\n # use a trivial morton index for datasets containing a single chunk\n if len(self.data_files) == 1:\n order1 = 1\n order2 = 1\n else:\n order1 = ds.index_order[0]\n order2 = ds.index_order[1]\n\n if order1 == 1 and order2 == 1:\n dont_cache = True\n else:\n dont_cache = False\n\n # If we have applied a bounding box then we can't cache the\n # ParticleBitmap 
because it is doman dependent\n if getattr(ds, \"_domain_override\", False):\n dont_cache = True\n\n if not hasattr(self.ds, \"_file_hash\"):\n self.ds._file_hash = self._generate_hash()\n\n self.regions = ParticleBitmap(\n ds.domain_left_edge,\n ds.domain_right_edge,\n ds.periodicity,\n self.ds._file_hash,\n len(self.data_files),\n index_order1=order1,\n index_order2=order2,\n )\n\n # Load Morton index from file if provided\n if getattr(ds, \"index_filename\", None) is None:\n fname = ds.parameter_filename + \".index{}_{}.ewah\".format(\n self.regions.index_order1, self.regions.index_order2\n )\n else:\n fname = ds.index_filename\n\n dont_load = dont_cache and not hasattr(ds, \"index_filename\")\n try:\n if dont_load:\n raise OSError\n rflag = self.regions.load_bitmasks(fname)\n rflag = self.regions.check_bitmasks()\n self._initialize_frontend_specific()\n if rflag == 0:\n raise OSError\n except (OSError, struct.error):\n self.regions.reset_bitmasks()\n self._initialize_coarse_index()\n self._initialize_refined_index()\n wdir = os.path.dirname(fname)\n if not dont_cache and os.access(wdir, os.W_OK):\n # Sometimes os mis-reports whether a directory is writable,\n # So pass if writing the bitmask file fails.\n try:\n self.regions.save_bitmasks(fname)\n except OSError:\n pass\n rflag = self.regions.check_bitmasks()\n\n def _initialize_coarse_index(self):\n pb = get_pbar(\"Initializing coarse index \", len(self.data_files))\n for i, data_file in enumerate(self.data_files):\n pb.update(i + 1)\n for ptype, pos in self.io._yield_coordinates(data_file):\n ds = self.ds\n if hasattr(ds, \"_sph_ptypes\") and ptype == ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n self.regions._coarse_index_data_file(pos, hsml, data_file.file_id)\n self.regions._set_coarse_index_data_file(data_file.file_id)\n pb.finish()\n self.regions.find_collisions_coarse()\n\n def _initialize_refined_index(self):\n mask = 
self.regions.masks.sum(axis=1).astype(\"uint8\")\n max_npart = max(sum(d.total_particles.values()) for d in self.data_files) * 28\n sub_mi1 = np.zeros(max_npart, \"uint64\")\n sub_mi2 = np.zeros(max_npart, \"uint64\")\n pb = get_pbar(\"Initializing refined index\", len(self.data_files))\n mask_threshold = getattr(self, \"_index_mask_threshold\", 2)\n count_threshold = getattr(self, \"_index_count_threshold\", 256)\n mylog.debug(\n \"Using estimated thresholds of %s and %s for refinement\",\n mask_threshold,\n count_threshold,\n )\n total_refined = 0\n total_coarse_refined = (\n (mask >= 2) & (self.regions.particle_counts > count_threshold)\n ).sum()\n mylog.debug(\n \"This should produce roughly %s zones, for %s of the domain\",\n total_coarse_refined,\n 100 * total_coarse_refined / mask.size,\n )\n for i, data_file in enumerate(self.data_files):\n coll = None\n pb.update(i + 1)\n nsub_mi = 0\n for ptype, pos in self.io._yield_coordinates(data_file):\n if pos.size == 0:\n continue\n if hasattr(self.ds, \"_sph_ptypes\") and ptype == self.ds._sph_ptypes[0]:\n hsml = self.io._get_smoothing_length(\n data_file, pos.dtype, pos.shape\n )\n else:\n hsml = None\n nsub_mi, coll = self.regions._refined_index_data_file(\n coll,\n pos,\n hsml,\n mask,\n sub_mi1,\n sub_mi2,\n data_file.file_id,\n nsub_mi,\n count_threshold=count_threshold,\n mask_threshold=mask_threshold,\n )\n total_refined += nsub_mi\n self.regions.bitmasks.append(data_file.file_id, coll)\n pb.finish()\n self.regions.find_collisions_refined()\n\n def _detect_output_fields(self):\n # TODO: Add additional fields\n dsl = []\n units = {}\n pcounts = self._get_particle_type_counts()\n field_cache = {}\n for dom in self.data_files:\n if dom.filename in field_cache:\n fl, _units = field_cache[dom.filename]\n else:\n fl, _units = self.io._identify_fields(dom)\n field_cache[dom.filename] = fl, _units\n units.update(_units)\n dom._calculate_offsets(fl, pcounts)\n for f in fl:\n if f not in dsl:\n dsl.append(f)\n 
self.field_list = dsl\n ds = self.dataset\n ds.particle_types = tuple({pt for pt, ds in dsl})\n # This is an attribute that means these particle types *actually*\n # exist. As in, they are real, in the dataset.\n ds.field_units.update(units)\n ds.particle_types_raw = ds.particle_types\n\n def _identify_base_chunk(self, dobj):\n # Must check that chunk_info contains the right number of ghost zones\n if getattr(dobj, \"_chunk_info\", None) is None:\n if isinstance(dobj, ParticleContainer):\n dobj._chunk_info = [dobj]\n else:\n # TODO: only return files\n if getattr(dobj.selector, \"is_all_data\", False):\n nfiles = self.regions.nfiles\n dfi = np.arange(nfiles)\n else:\n dfi, file_masks, addfi = self.regions.identify_file_masks(\n dobj.selector\n )\n nfiles = len(file_masks)\n dobj._chunk_info = [None for _ in range(nfiles)]\n\n # The following was moved here from ParticleContainer in order\n # to make the ParticleContainer object pickleable. By having\n # the base_selector as its own argument, we avoid having to\n # rebuild the index on unpickling a ParticleContainer.\n if hasattr(dobj, \"base_selector\"):\n base_selector = dobj.base_selector\n base_region = dobj.base_region\n else:\n base_region = dobj\n base_selector = dobj.selector\n\n for i in range(nfiles):\n domain_id = i + 1\n dobj._chunk_info[i] = ParticleContainer(\n base_region,\n base_selector,\n [self.data_files[dfi[i]]],\n domain_id=domain_id,\n )\n # NOTE: One fun thing about the way IO works is that it\n # consolidates things quite nicely. So we should feel free to\n # create as many objects as part of the chunk as we want, since\n # it'll take the set() of them. 
So if we break stuff up like\n # this here, we end up in a situation where we have the ability\n # to break things down further later on for buffer zones and the\n # like.\n (dobj._current_chunk,) = self._chunk_all(dobj)\n\n def _chunk_all(self, dobj):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n yield YTDataChunk(dobj, \"all\", oobjs, None)\n\n def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):\n sobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for og in sobjs:\n with og._expand_data_files():\n if ngz > 0:\n g = og.retrieve_ghost_zones(ngz, [], smoothed=True)\n else:\n g = og\n yield YTDataChunk(dobj, \"spatial\", [g])\n\n def _chunk_io(self, dobj, cache=True, local_only=False):\n oobjs = getattr(dobj._current_chunk, \"objs\", dobj._chunk_info)\n for container in oobjs:\n yield YTDataChunk(dobj, \"io\", [container], None, cache=cache)\n\n def _generate_hash(self):\n # Generate an FNV hash by creating a byte array containing the\n # modification time of as well as the first and last 1 MB of data in\n # every output file\n ret = bytearray()\n for pfile in self.data_files:\n\n # only look at \"real\" files, not \"fake\" files generated by the\n # chunking system\n if pfile.start not in (0, None):\n continue\n try:\n mtime = os.path.getmtime(pfile.filename)\n except OSError as e:\n if e.errno == errno.ENOENT:\n # this is an in-memory file so we return with a dummy\n # value\n return -1\n else:\n raise\n ret.extend(str(mtime).encode(\"utf-8\"))\n size = os.path.getsize(pfile.filename)\n if size > 1e6:\n size = int(1e6)\n with open(pfile.filename, \"rb\") as fh:\n # read in first and last 1 MB of data\n data = fh.read(size)\n fh.seek(-size, os.SEEK_END)\n data = fh.read(size)\n ret.extend(data)\n return fnv_hash(ret)\n\n def _initialize_frontend_specific(self):\n \"\"\"This is for frontend-specific initialization code\n\n If there are frontend-specific things that need to be set while\n creating the index, this 
function forces these operations to happen\n in cases where we are reloading the index from a sidecar file.\n \"\"\"\n pass\n"
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment