Utilities for Uchuu Converter (ascii -> hdf5)
#!/usr/bin/env python
from __future__ import print_function

__author__ = "Manodeep Sinha"
__all__ = ["convert_single_ascii_halocat", "convert_ascii_halocat_files"]

import os

from uchuu_utils import get_parser, get_approx_totnumhalos, generic_reader, \
    get_metadata, resize_halo_datasets, write_halos


def convert_single_ascii_halocat(input_file, rank=0,
                                 outputdir="./", write_halo_props_cont=True,
                                 fields=None, drop_fields=None,
                                 chunksize=10000, compression='gzip',
                                 show_progressbar=False):
    """
    Convert a single Rockstar/Consistent Trees (an hlist catalogue) file
    into an (optionally compressed) hdf5 file.

    Parameters
    ----------
    input_file: string, required
        The input filename for the Rockstar/Consistent Trees file. Can
        be a compressed (.gz, .bz2) file.

    rank: integer, optional, default: 0
        The (MPI) rank for the process. The output filename is determined
        with this rank to ensure unique filenames when running in parallel.

    outputdir: string, optional, default: current working directory ('./')
        The directory where the converted hdf5 file will be written to. The
        output filename is obtained by appending '.h5' to the ``input_file``.
        If the output file already exists, then it will be truncated.

    write_halo_props_cont: boolean, optional, default: True
        Controls whether the individual halo properties are written as
        distinct datasets such that any given property for ALL halos is
        written contiguously (structure of arrays, SOA).

        When set to False, only one dataset ('halos') is created, and ALL
        properties of a halo are written out contiguously (array of
        structures, AOS).

    fields: list of strings, optional, default: None
        Describes which specific columns in the input file to carry across
        to the hdf5 file. Default action is to convert ALL columns.

    drop_fields: list of strings, optional, default: None
        Describes which columns are not carried through to the hdf5 file.
        Processed after ``fields``, i.e., you can specify ``fields=None`` to
        create an initial list of *all* columns in the ascii file, and then
        specify ``drop_fields = [colname2, colname7, ...]``, and those columns
        will not be present in the hdf5 output.

    chunksize: integer, optional, default: 10000
        Controls how many lines are read in from the input file before being
        written out to the hdf5 file.

    compression: string, optional, default: 'gzip'
        Controls the kind of compression applied. Valid options are anything
        that ``h5py`` accepts.

    show_progressbar: boolean, optional, default: False
        Controls whether a progressbar is printed. Only enables the
        progressbar on rank==0; the remaining ranks ignore this keyword.

    Returns
    -------
    Returns ``True`` on successful completion.
    """
    import numpy as np
    import h5py
    import time
    import sys
    import pandas as pd
    import warnings

    # Progress-bar related setup
    try:
        from tqdm import tqdm
    except ImportError:
        tqdm = None

    if rank != 0:
        show_progressbar = False

    if show_progressbar and not tqdm:
        msg = "Warning: You have requested to show a progress-bar but could not "\
              "import 'tqdm'. Please install 'tqdm' to display a progressbar. "\
              "Disabling the progressbar for this run"
        warnings.warn(msg)
        show_progressbar = False
    # Done with progressbar setup

    if not os.path.isdir(outputdir):
        msg = f"Error: The first parameter (output directory) = '{outputdir}' "\
              "should be of type directory"
        raise ValueError(msg)

    if chunksize < 1:
        msg = "Error: chunksize (the number of lines read in one shot) = "\
              f"'{chunksize}' must be at least 1"
        raise ValueError(msg)

    print(f"[Rank={rank}]: processing file '{input_file}'...")
    sys.stdout.flush()
    t0 = time.perf_counter()

    # Read the entire header meta-data
    metadata_dict = get_metadata(input_file)
    metadata = metadata_dict['metadata']
    version_info = metadata_dict['version']
    input_catalog_type = metadata_dict['catalog_type']
    hdrline = metadata_dict['headerline']

    # Check that this is not a Consistent-Trees "tree_*.dat" file
    if ('Consistent' in input_catalog_type) and \
       ('hlist' not in input_catalog_type):
        msg = "Error: This script can *only* convert the 'hlist' halo catalogues "\
              "generated by Consistent-Trees. It appears that a Consistent-Trees "\
              f"*tree* catalogue was supplied in the file = '{input_file}'...exiting"
        raise ValueError(msg)

    parser = get_parser(input_file, fields=fields, drop_fields=drop_fields)
    approx_totnumhalos = get_approx_totnumhalos(input_file)
    if show_progressbar:
        pbar = tqdm(total=approx_totnumhalos, unit=' halos', disable=None)

    halos_offset = 0
    input_filebase = os.path.basename(input_file)
    output_file = f"{outputdir}/{input_filebase}.h5"
    with h5py.File(output_file, "w") as hf:
        line_with_scale_factor = ([l for l in metadata if l.startswith("#a")])[0]
        scale_factor = float((line_with_scale_factor.split('='))[1])
        redshift = 1.0/scale_factor - 1.0

        # give the HDF5 root some more attributes
        hf.attrs[u"input_filename"] = np.string_(input_file)
        hf.attrs[u"input_filedatestamp"] = np.array(os.path.getmtime(input_file))
        hf.attrs[u"input_catalog_type"] = np.string_(input_catalog_type)
        hf.attrs[f"{input_catalog_type}_version"] = np.string_(version_info)
        hf.attrs[f"{input_catalog_type}_columns"] = np.string_(hdrline)
        hf.attrs[f"{input_catalog_type}_metadata"] = np.string_(metadata)
        sim_grp = hf.create_group('simulation_params')
        simulation_params = metadata_dict['simulation_params']
        for k, v in simulation_params.items():
            sim_grp.attrs[f"{k}"] = v
        hf.attrs[u"HDF5_version"] = np.string_(h5py.version.hdf5_version)
        hf.attrs[u"h5py_version"] = np.string_(h5py.version.version)
        hf.attrs[u"TotNhalos"] = -1
        hf.attrs[u"scale_factor"] = scale_factor
        hf.attrs[u"redshift"] = redshift

        halos_grp = hf.create_group('HaloCatalogue')
        halos_grp.attrs['scale_factor'] = scale_factor
        halos_grp.attrs['redshift'] = redshift

        dset_size = approx_totnumhalos
        if write_halo_props_cont:
            halos_dset = dict()
            # Create a dataset for every halo property. For any given halo
            # property, the values for all halos will be written
            # contiguously (structure of arrays)
            for name, dtype in parser.dtype.descr:
                halos_dset[name] = halos_grp.create_dataset(name, (dset_size, ),
                                                            dtype=dtype,
                                                            chunks=True,
                                                            compression=compression,
                                                            maxshape=(None,))
        else:
            # Create a single dataset that contains all properties
            # of a given halo, then all properties of the next halo,
            # and so on (array of structures)
            halos_dset = halos_grp.create_dataset('halos', (dset_size,),
                                                  dtype=parser.dtype,
                                                  chunks=True,
                                                  compression=compression,
                                                  maxshape=(None,))

        # Open the file with the generic reader ('inp' will be
        # a generator). Compressed files are also fine
        with generic_reader(input_file, 'rt') as inp:
            # Create a generator with pandas ->
            # the generator will yield 'chunksize' lines at a time
            gen = pd.read_csv(inp, dtype=parser.dtype, memory_map=True,
                              names=parser.dtype.names, delim_whitespace=True,
                              index_col=False, chunksize=chunksize, comment='#')
            for chunk in gen:
                halos = chunk.to_records(index=False)  # convert to a rec-array
                halos = np.asarray(halos, dtype=parser.dtype)  # convert to structured nd-array
                nhalos = halos.shape[0]
                if (halos_offset + nhalos) > dset_size:
                    resize_halo_datasets(halos_dset, halos_offset + nhalos,
                                         write_halo_props_cont, parser.dtype)
                    dset_size = halos_offset + nhalos

                write_halos(halos_dset, halos_offset, halos, nhalos,
                            write_halo_props_cont)
                halos_offset += nhalos

                if show_progressbar:
                    # Since the total number of halos being processed
                    # is approximate, update the progressbar total to
                    # the best guess at the moment
                    pbar.total = dset_size
                    pbar.update(nhalos)

        # The ascii file has now been read in entirely -> Now fix the actual
        # dataset sizes to reflect the total number of halos written
        resize_halo_datasets(halos_dset, halos_offset, write_halo_props_cont, parser.dtype)
        dset_size = halos_offset
        hf.attrs['TotNhalos'] = halos_offset

    if show_progressbar:
        pbar.close()

    totnumhalos = halos_offset
    t1 = time.perf_counter()
    print(f"[Rank={rank}]: processing file {input_file}.....done. "
          f"Wrote {totnumhalos} halos in {t1-t0:.2f} seconds")
    sys.stdout.flush()
    return True
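

# A minimal sketch of calling the converter above directly (the filename and
# output directory below are hypothetical):
#
#   convert_single_ascii_halocat('hlist_1.00000.list.gz', rank=0,
#                                outputdir='./halocat_h5',
#                                drop_fields=['Tidal_Force', 'Tidal_ID'])
#
# would write './halocat_h5/hlist_1.00000.list.gz.h5' containing every
# column except the two dropped ones.
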
def convert_ascii_halocat_files(filenames, outputdir="./",
                                write_halo_props_cont=True,
                                fields=None, drop_fields=None,
                                chunksize=100000, compression='gzip',
                                show_progressbar=False):
    """
    Converts a list of Rockstar/Consistent-Trees halo catalogues from ascii
    to hdf5. Can be used with MPI, but requires that the number of files be
    at least as large as the number of MPI tasks spawned.

    Parameters
    ----------
    filenames: list of strings, required
        A list of filename(s) for the Rockstar/Consistent Trees files. Can
        be compressed (.gz, .bz2, .xz, .zip) files.

    outputdir: string, optional, default: current working directory ('./')
        The directory where the converted hdf5 files will be written to. Each
        output filename is obtained by appending '.h5' to the input filename.
        If the output file already exists, then it will be truncated.

    write_halo_props_cont: boolean, optional, default: True
        Controls whether the individual halo properties are written as
        distinct datasets such that any given property for ALL halos is
        written contiguously (structure of arrays, SOA).

        When set to False, only one dataset ('halos') is created, and ALL
        properties of a halo are written out contiguously (array of
        structures, AOS).

    fields: list of strings, optional, default: None
        Describes which specific columns in the input file to carry across
        to the hdf5 file. Default action is to convert ALL columns.

    drop_fields: list of strings, optional, default: None
        Describes which columns are not carried through to the hdf5 file.
        Processed after ``fields``, i.e., you can specify ``fields=None`` to
        create an initial list of *all* columns in the ascii file, and then
        specify ``drop_fields = [colname2, colname7, ...]``, and those columns
        will not be present in the hdf5 output.

    chunksize: integer, optional, default: 100000
        Controls how many lines are read in from the input file before being
        written out to the hdf5 file.

    compression: string, optional, default: 'gzip'
        Controls the kind of compression applied. Valid options are anything
        that ``h5py`` accepts.

    show_progressbar: boolean, optional, default: False
        Controls whether a progressbar is printed. Only enables the
        progressbar on rank==0; the remaining ranks ignore this keyword.

    Returns
    -------
    Returns ``True`` on successful completion.
    """
    import sys
    import time

    rank = 0
    ntasks = 1
    MPI = None
    comm = None
    try:
        from mpi4py import MPI
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        ntasks = comm.Get_size()
    except ImportError:
        pass

    sys.stdout.flush()
    nfiles = len(filenames)
    if nfiles < ntasks:
        msg = f"Nfiles = {nfiles} must be >= the number of tasks = {ntasks}"
        raise ValueError(msg)

    tstart = time.perf_counter()
    if rank == 0:
        print(f"[Rank={rank}]: Converting nfiles = {nfiles} over ntasks = {ntasks}...")

    # Convert files in MPI parallel (if requested):
    # the range will produce filenum starting with "rank"
    # and then incrementing by "ntasks" all the way up to,
    # and inclusive of, [nfiles-1]. That is, the range [0, nfiles-1]
    # will be uniquely distributed over ntasks.
    for filenum in range(rank, nfiles, ntasks):
        convert_single_ascii_halocat(filenames[filenum], rank,
                                     outputdir=outputdir,
                                     write_halo_props_cont=write_halo_props_cont,
                                     fields=fields, drop_fields=drop_fields,
                                     chunksize=chunksize, compression=compression,
                                     show_progressbar=show_progressbar)

    # The barrier is only essential so that the total time printed
    # out on rank==0 is correct.
    if comm:
        comm.Barrier()

    if rank == 0:
        t1 = time.perf_counter()
        print(f"[Rank={rank}]: Converting nfiles = {nfiles} over ntasks = "
              f"{ntasks}...done. Time taken = {t1-tstart:0.2f} seconds")
    return True


if __name__ == "__main__":
    import sys

    # Do we want to write each halo as a struct (i.e., ALL the
    # properties of any given halo are written together), or do we want
    # to write out individual datasets for each of the
    # halo properties (i.e., any given property of ALL halos is written
    # together)?
    write_halo_props_cont = True   # True -> structure of arrays,
                                   # False -> array of structures

    if len(sys.argv) < 3:
        print("Error: Please provide at least two command-line arguments")
        print("First argument -- <output directory>")
        print("Second (and all following arguments) -- <input filenames to convert>")
        sys.exit(1)

    outputdir = sys.argv[1]
    # The entire list of filenames to be converted should
    # be passed at the command-line
    filenames = sys.argv[2:]
    convert_ascii_halocat_files(filenames, outputdir=outputdir,
                                write_halo_props_cont=write_halo_props_cont)
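
For reference, a typical invocation of the script above might look like the
following (the script name and file paths are hypothetical; the MPI variant
requires mpi4py):

    python convert_halocat.py ./halocat_h5 hlist_0.50000.list hlist_1.00000.list
    mpiexec -n 2 python convert_halocat.py ./halocat_h5 hlist_0.50000.list hlist_1.00000.list
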
#!/usr/bin/env python
from __future__ import print_function

__author__ = "Manodeep Sinha"
__all__ = ["convert_ctrees_files"]

import os
import time
import io

from uchuu_utils import get_metadata, get_parser, \
    distribute_array_over_ntasks, get_approx_totnumhalos, \
    check_and_decompress, resize_halo_datasets, write_halos

# Note: 'get_forests_locations_filenames' is called below for standard
# Consistent-Trees runs; it is assumed to live in ctrees_utils alongside
# the other filename helpers
from ctrees_utils import read_locations_and_forests, get_aggregate_forest_info, \
    get_all_parallel_ctrees_filenames, get_forests_locations_filenames, \
    validate_inputs_are_ctrees_files, \
    get_treewalk_dtype_descr, add_tree_walk_indices


def _create_and_validate_halos_dset(hf, dtype, write_halo_props_cont=True):
    """
    Internal utility function to check the existing halo dataset in the file,
    and return a reference where the halos dataset can be written to.
    """
    forests_grp = hf['Forests']
    if write_halo_props_cont:
        halos_dset = dict()
        # Fetch the dataset for every halo property. For any given halo
        # property, the values for all halos are stored contiguously
        # (structure of arrays)
        for name in dtype.names:
            halos_dset[name] = forests_grp[name]
            if hf.attrs['Nhalos'] != halos_dset[name].shape[0]:
                msg = f"Error: The dataset for halo property = '{name}' "\
                      f"does not contain *exactly* the same number of halos "\
                      f"as specified in the file attribute. "\
                      f"shape of halo property dataset = '{halos_dset[name].shape}' "\
                      f"nhalos in file attribute = {hf.attrs['Nhalos']}"
                raise AssertionError(msg)
    else:
        # Fetch the single dataset that contains all properties of a given
        # halo, then all properties of the next halo, and so on
        # (array of structures)
        halos_dset = forests_grp['halos']
        if hf.attrs['Nhalos'] != halos_dset.shape[0]:
            msg = "Error: The halos dataset does not contain *exactly* the "\
                  "same number of halos as specified in the file attribute. "\
                  f"shape of halo property dataset = '{halos_dset.shape}' nhalos in file "\
                  f"attribute = {hf.attrs['Nhalos']}"
            raise AssertionError(msg)
    return halos_dset


def _convert_ctrees_forest_range(forest_info, trees_and_locations, rank,
                                 outputdir="./", output_filebase="forest",
                                 write_halo_props_cont=True,
                                 fields=None, drop_fields=None,
                                 truncate=True, compression='gzip',
                                 buffersize=1024*1024, use_pread=True,
                                 show_progressbar=False):
    """
    Convert a set of forests from Consistent Trees ascii file(s) into an
    (optionally compressed) hdf5 file.

    Parameters
    ----------
    forest_info: numpy structured array, required, shape: (nforests, )
        The numpy structured array containing the following info
        <ForestID ForestNhalos Input_ForestNbytes Ntrees> for the
        *forests* that are to be converted by this task

    trees_and_locations: numpy structured array, required, shape: (ntrees, )
        The numpy structured array containing the following info
        <ForestID TreeRootID FileName Offset TreeNbytes> for the
        *trees* that are to be converted by this task

    rank: integer, required
        The (MPI) rank for the process. The output filename is determined
        with this rank to ensure unique filenames when running in parallel.

    outputdir: string, optional, default: current working directory ('./')
        The directory where the converted hdf5 file will be written to.

    output_filebase: string, optional, default: "forest"
        The output filename is constructed using
        '<outputdir>/<output_filebase>_<rank>.h5'

    write_halo_props_cont: boolean, optional, default: True
        Controls whether the individual halo properties are written as
        distinct datasets such that any given property for *all* halos is
        written contiguously (structure of arrays, SOA).

        When set to False, only one dataset ('halos') is created under the
        group 'Forests', and *all* properties of a halo are written out
        contiguously (array of structures, AOS).

    fields: list of strings, optional, default: None
        Describes which specific columns in the input file to carry across
        to the hdf5 file. Default action is to convert ALL columns.

    drop_fields: list of strings, optional, default: None
        Describes which columns are not carried through to the hdf5 file.
        Processed after ``fields``, i.e., you can specify ``fields=None`` to
        create an initial list of *all* columns in the ascii file, and then
        specify ``drop_fields = [colname2, colname7, ...]``, and those columns
        will not be present in the hdf5 output.

    truncate: boolean, default: True
        Controls whether a new file is created on this 'rank'. When set to
        ``True``, the header info file is written out. Otherwise, the file
        is appended to.

    compression: string, optional, default: 'gzip'
        Controls the kind of compression applied. Valid options are anything
        that ``h5py`` accepts.

    buffersize: integer, optional, default: 1 MB
        Controls the size of the buffer for how many halos are written out
        per write call to the hdf5 file. The number of halos written out is
        this buffersize divided by the size of the datatype for individual
        halos.

    use_pread: boolean, optional, default: True
        Controls whether low-level i/o operations (through ``os.pread``) are
        used. Otherwise, higher-level i/o operations (via ``io.open``) are
        used. This option is only meaningful on linux systems (and python3+).
        Since ``pread`` does not change the file offset, additional
        parallelisation can be implemented reasonably easily.

    show_progressbar: boolean, optional, default: False
        Controls whether a progressbar is printed. Only enables the
        progressbar on rank==0; the remaining ranks ignore this keyword.

    Returns
    -------
    Returns ``True`` on successful completion.
    """
    import numpy as np
    import h5py
    import sys
    import warnings

    # Progress-bar related setup
    try:
        from tqdm import tqdm
    except ImportError:
        tqdm = None

    if rank != 0:
        show_progressbar = False

    if show_progressbar and not tqdm:
        msg = "Warning: You have requested to show a progress-bar but could not "\
              "import 'tqdm'. Please install 'tqdm' to display a progressbar. "\
              "Disabling the progressbar for this run"
        warnings.warn(msg)
        show_progressbar = False
    # Done with progressbar setup

    # os.pread is not available on all platforms; accessing a missing
    # attribute on the os module raises an AttributeError
    try:
        os.pread
    except AttributeError:
        use_pread = False
    sys.stdout.flush()
    tstart = time.perf_counter()

    # Set the datalen for strings
    string_dtype = 'S1024'

    if not os.path.isdir(outputdir):
        msg = f"Error: The first parameter (output directory) = '{outputdir}' "\
              "should be of type directory"
        raise ValueError(msg)

    ntrees = trees_and_locations.shape[0]
    nforests = forest_info.shape[0]
    if nforests > ntrees:
        msg = f"Error: Expected the number of forests = '{nforests}' "\
              "to be *at most* equal to the number of "\
              f"trees = '{ntrees}'"
        raise AssertionError(msg)

    if ntrees <= 0:
        msg = f"[Rank={rank}] Error: ntrees = {ntrees} should be > 0"
        raise AssertionError(msg)
print(f"[Rank {rank}]: processing {forest_info['Input_ForestNbytes'].sum()} "\ | |
f"bytes (in {ntrees} trees) spread over {nforests} forests...") | |
alltreedatafiles = list(set(trees_and_locations['Filename'])) | |
assert len(alltreedatafiles) > 0 | |
validate_inputs_are_ctrees_files(alltreedatafiles) | |
metadata_dict = get_metadata(alltreedatafiles[0]) | |
metadata = metadata_dict['metadata'] | |
version_info = metadata_dict['version'] | |
input_catalog_type = metadata_dict['catalog_type'] | |
hdrline = metadata_dict['headerline'] | |
parser = get_parser(alltreedatafiles[0], fields=fields, | |
drop_fields=drop_fields) | |
mergertree_descr = get_treewalk_dtype_descr() | |
output_dtype = np.dtype(parser.dtype.descr + mergertree_descr) | |
approx_totnumhalos = 0 | |
for fname in alltreedatafiles: | |
ind = np.where(trees_and_locations['Filename'] == fname) | |
nbytes = np.sum(trees_and_locations['TreeNbytes'][ind]) | |
approx_totnumhalos += get_approx_totnumhalos(fname, ndatabytes=nbytes) | |
if show_progressbar: | |
pbar = tqdm(total=ntrees, unit=' trees', disable=None) | |
if (not buffersize) or (buffersize < output_dtype.itemsize): | |
buffersize = 1024*1024 # 1 MB | |
nbuffer_halos = buffersize // output_dtype.itemsize | |
chunks = (nbuffer_halos, ) | |
output_file = f"{outputdir}/{output_filebase}_{rank}.h5" | |
if truncate: | |
with h5py.File(output_file, "w") as hf: | |
# give the HDF5 root some more attributes | |
hf.attrs['input_files'] = np.string_(alltreedatafiles) | |
mtimes = [os.path.getmtime(f) for f in alltreedatafiles] | |
hf.attrs['input_filedatestamp'] = np.array(mtimes) | |
hf.attrs["input_catalog_type"] = np.string_(input_catalog_type) | |
hf.attrs[f"{input_catalog_type}_version"] = np.string_(version_info) | |
hf.attrs[f"{input_catalog_type}_columns"] = np.string_(hdrline) | |
hf.attrs[f"{input_catalog_type}_metadata"] = np.string_(metadata) | |
sim_grp = hf.create_group('simulation_params') | |
simulation_params = metadata_dict['simulation_params'] | |
for k, v in simulation_params.items(): | |
sim_grp.attrs[f"{k}"] = v | |
hf.attrs['HDF5_version'] = np.string_(h5py.version.hdf5_version) | |
hf.attrs['h5py_version'] = np.string_(h5py.version.version) | |
hf.attrs['Nforests'] = 0 | |
hf.attrs['Ntrees'] = 0 | |
hf.attrs['Nhalos'] = 0 | |
forest_dtype = np.dtype([('ForestID', np.int64), | |
('ForestHalosOffset', np.int64), | |
('ForestNhalos', np.int64), | |
('ForestNtrees', np.int64), | |
]) | |
hf.create_dataset('ForestInfo', (0,), dtype=forest_dtype, | |
chunks=True, compression=compression, | |
maxshape=(None,)) | |
tree_dtype = np.dtype([('ForestID', np.int64), | |
('TreeRootID', np.int64), | |
('TreeHalosOffset', np.int64), | |
('TreeNhalos', np.int64), | |
('Input_Filename', string_dtype), | |
('Input_FileDateStamp', np.float), | |
('Input_TreeByteOffset', np.int64), | |
('Input_TreeNbytes', np.int64), | |
]) | |
hf.create_dataset('TreeInfo', (0,), dtype=tree_dtype, | |
chunks=True, compression=compression, | |
maxshape=(None,)) | |
            forests_grp = hf.create_group('Forests')
            if write_halo_props_cont:
                # Create a dataset for every halo property. For any given
                # halo property, the values for all halos will be written
                # contiguously (structure of arrays)
                for name, dtype in output_dtype.descr:
                    forests_grp.create_dataset(name, (0,), dtype=dtype,
                                               chunks=chunks,
                                               compression=compression,
                                               maxshape=(None,))
            else:
                # Create a single dataset that contains all properties
                # of a given halo, then all properties of the next halo,
                # and so on (array of structures)
                forests_grp.create_dataset('halos', (0,),
                                           dtype=output_dtype,
                                           chunks=chunks,
                                           compression=compression,
                                           maxshape=(None,))

    halos_buffer = np.empty(nbuffer_halos, dtype=output_dtype)
    nhalos_in_buffer = 0
    with h5py.File(output_file, "a") as hf:
        # The filenames are written as byte-arrays (through np.string_) into
        # the hdf5 file. Therefore, we will need to convert back into `str`
        # objects
        existing_files = hf.attrs['input_files']
        target_all_files = np.unique(np.hstack((existing_files, alltreedatafiles)))
        # Strictly speaking this decode is not necessary but without this
        # extra decode we end up with an array that contains both np.str_
        # and str -- MS 01/05/2020
        target_all_files = [x.decode() if isinstance(x, bytes) else str(x)
                            for x in target_all_files]
        if len(target_all_files) > len(existing_files):
            # Since we are appending to the hdf5 file, let's make sure that
            # *all* the files belong to the same setup of simulation +
            # mergertree. However, the ascii files corresponding to the
            # existing data might have been deleted, so we should pass the
            # metadata info directly from the hdf5 file
            base_input_catalog_type = hf.attrs['input_catalog_type'].decode()
            base_metadata = hf.attrs[f'{base_input_catalog_type}_metadata']
            base_version = hf.attrs[f'{base_input_catalog_type}_version'].decode()

            # Only validate the *current* files being processed
            assert len(alltreedatafiles) > 0
            validate_inputs_are_ctrees_files(alltreedatafiles,
                                             base_metadata=base_metadata,
                                             base_version=base_version,
                                             base_input_catalog_type=base_input_catalog_type)

            # We need to update how many *unique* input files have gone into
            # this hdf5 file
            hf.attrs['input_files'] = np.string_(target_all_files)
            hf.attrs['input_filedatestamp'] = np.array([os.path.getmtime(f)
                                                        for f in target_all_files])

        tree_dset = hf['TreeInfo']
        forest_dset = hf['ForestInfo']
        if forest_dset.shape[0] != hf.attrs['Nforests']:
            msg = "Error: The forest dataset does not contain *exactly* "\
                  "the same number of forests as specified in the file "\
                  f"attribute. Shape of forest dataset = '{forest_dset.shape}', "\
                  f"nforests in file attribute = '{hf.attrs['Nforests']}'"
            raise AssertionError(msg)
        forest_offset = hf.attrs['Nforests']

        if tree_dset.shape[0] != hf.attrs['Ntrees']:
            msg = "Error: The tree dataset does not contain *exactly* the same "\
                  "number of trees as specified in the file attribute. "\
                  f"shape of tree dataset = '{tree_dset.shape}' ntrees in "\
                  f"file attribute = '{hf.attrs['Ntrees']}'"
            raise AssertionError(msg)
        tree_offset = hf.attrs['Ntrees']

        # resize both the datasets containing the forest-level info and
        # tree-level info
        forest_dset.resize((forest_offset + nforests, ))
        tree_dset.resize((tree_offset + ntrees, ))

        # Now check the halos dataset
        halos_dset = _create_and_validate_halos_dset(hf, output_dtype,
                                                     write_halo_props_cont)

        # Okay - we have validated the halos offset
        halos_offset = hf.attrs['Nhalos']
        halos_dset_offset = halos_offset

        # resize the halos dataset so we don't have to resize at every step
        dset_size = halos_offset + approx_totnumhalos
        resize_halo_datasets(halos_dset, dset_size, write_halo_props_cont, output_dtype)

        forest_dset[-nforests:, 'ForestID'] = forest_info['ForestID'][:]
        forest_dset[-nforests:, 'ForestNtrees'] = forest_info['Ntrees'][:]
        tree_dset[-ntrees:, 'ForestID'] = trees_and_locations['ForestID'][:]
        tree_dset[-ntrees:, 'TreeRootID'] = trees_and_locations['TreeRootID'][:]

        # These quantities relate to the input files
        tree_dset[-ntrees:, 'Input_Filename'] = np.string_(trees_and_locations['Filename'][:])
        mtimes = [os.path.getmtime(fn) for fn in trees_and_locations['Filename']]
        tree_dset[-ntrees:, 'Input_FileDateStamp'] = np.array(mtimes)
        tree_dset[-ntrees:, 'Input_TreeByteOffset'] = trees_and_locations['Offset'][:]
        tree_dset[-ntrees:, 'Input_TreeNbytes'] = trees_and_locations['TreeNbytes'][:]
        alltreedatafiles = list(set(trees_and_locations['Filename']))
        if use_pread:
            filehandlers = {f: os.open(f, os.O_RDONLY) for f in alltreedatafiles}
        else:
            filehandlers = {f: io.open(f, 'rt') for f in alltreedatafiles}

        ntrees_processed = 0
        treenhalos = np.empty(ntrees, dtype=np.int64)
        treehalos_offset = np.empty(ntrees, dtype=np.int64)
        forestnhalos = np.empty(nforests, dtype=np.int64)
        foresthalos_offset = np.empty(nforests, dtype=np.int64)

        for iforest in range(nforests):
            foresthalos_offset[iforest] = halos_offset
            forest_halos = np.empty(0, dtype=output_dtype)
            for _ in range(forest_info['Ntrees'][iforest]):
                treedata_file = trees_and_locations['Filename'][ntrees_processed]
                offset = trees_and_locations['Offset'][ntrees_processed]
                numbytes = trees_and_locations['TreeNbytes'][ntrees_processed]

                inp = filehandlers[treedata_file]
                if use_pread:
                    chunk = os.pread(inp, numbytes, offset)
                else:
                    inp.seek(offset, os.SEEK_SET)
                    chunk = inp.read(numbytes)

                parse_line = parser.parse_line
                halos = parser.pack([parse_line(l) for l in chunk.splitlines()])
                nhalos = halos.shape[0]
                forest_halos.resize(forest_halos.shape[0] + nhalos)

                # forest_halos have additional mergertree indices, therefore
                # the datatypes are not the same between halos (parser.dtype)
                # and forest_halos (output_dtype) -> assign by columns
                for name in parser.dtype.names:
                    forest_halos[name][-nhalos:] = halos[name][:]

                # Add the tree level info
                treenhalos[ntrees_processed] = nhalos
                treehalos_offset[ntrees_processed] = halos_offset
                ntrees_processed += 1
                if show_progressbar:
                    pbar.update(1)

                # Update the total number of halos read-in with
                # the number of halos in this tree
                halos_offset += nhalos

            # Entire forest has been loaded. Reset nhalos
            # to be the number of halos in the forest
            nhalos = forest_halos.shape[0]
            # Add the forest level info
            forestnhalos[iforest] = nhalos

            # Entire forest is now loaded -> add the mergertree indices
            add_tree_walk_indices(forest_halos, rank)

            # If there are not enough halos to trigger a write, simply fill
            # up the halos_buffer
            if (nhalos_in_buffer + nhalos) < nbuffer_halos:
                assert halos_buffer.dtype == forest_halos.dtype
                halos_buffer[nhalos_in_buffer:nhalos_in_buffer+nhalos] = forest_halos[:]
                nhalos_in_buffer += nhalos
                continue

            # Need to write to disk.
            # Resize to make sure there is enough space to append the new halos
            if halos_offset > dset_size:
                resize_halo_datasets(halos_dset, halos_offset, write_halo_props_cont, output_dtype)
                dset_size = halos_offset

            # write the halos that are already in the buffer
            write_halos(halos_dset, halos_dset_offset, halos_buffer,
                        nhalos_in_buffer, write_halo_props_cont)
            halos_dset_offset += nhalos_in_buffer
            nhalos_in_buffer = 0

            # Now write the halos that have just been read-in.
            # Note: The halos in the buffer *must* be written out before the
            # halos that have just been read-in. Otherwise, there will be
            # data corruption
            write_halos(halos_dset, halos_dset_offset, forest_halos,
                        nhalos, write_halo_props_cont)
            halos_dset_offset += nhalos

            if halos_offset != halos_dset_offset:
                msg = "Error: After writing out halos into the hdf5 file, expected "\
                      f"to find that halos_offset = '{halos_offset}' to be *exactly* "\
                      f"equal to the offset in the hdf5 dataset = '{halos_dset_offset}'"
                raise AssertionError(msg)

        if nhalos_in_buffer > 0:
            write_halos(halos_dset, halos_dset_offset, halos_buffer,
                        nhalos_in_buffer, write_halo_props_cont)
            halos_dset_offset += nhalos_in_buffer
            nhalos_in_buffer = 0

        if halos_offset != halos_dset_offset:
            msg = "Error: After writing *all* the halos into the hdf5 file, expected "\
                  f"to find that halos_offset = '{halos_offset}' to be *exactly* "\
                  f"equal to the offset in the hdf5 dataset = '{halos_dset_offset}'"
            raise AssertionError(msg)

        msg = f"Error: Expected to process {ntrees} trees but processed "\
              f"{ntrees_processed} trees instead"
        assert ntrees_processed == ntrees, msg

        # All the trees for this call have now been read in entirely -> Now
        # fix the actual dataset sizes to reflect the total number of halos
        # written
        resize_halo_datasets(halos_dset, halos_offset, write_halo_props_cont, output_dtype)

        # all halos from all forests have been written out and the halo
        # dataset has been correctly resized. Now write the aggregate
        # quantities at the tree and forest levels
        tree_dset[-ntrees:, 'TreeNhalos'] = treenhalos[:]
        tree_dset[-ntrees:, 'TreeHalosOffset'] = treehalos_offset[:]
        forest_dset[-nforests:, 'ForestNhalos'] = forestnhalos[:]
        forest_dset[-nforests:, 'ForestHalosOffset'] = foresthalos_offset[:]

        hf.attrs['Nforests'] += nforests
        hf.attrs['Ntrees'] += ntrees
        hf.attrs['Nhalos'] = halos_offset

    if show_progressbar:
        pbar.close()

    # Close all the open file handlers
    if use_pread:
        for f in filehandlers.values():
            os.close(f)
    else:
        for f in filehandlers.values():
            f.close()

    totnumhalos = halos_offset
    t1 = time.perf_counter()
    print(f"[Rank {rank}]: processing {forest_info['Input_ForestNbytes'].sum()} "
          f"bytes (in {ntrees} trees) spread over {nforests} forests...done. "
          f"Wrote {totnumhalos} halos in {t1-tstart:.2f} seconds")
    sys.stdout.flush()
    return True


def convert_ctrees_files(filenames, standard_consistent_trees=None,
                         outputdir="./", output_filebase="forest",
                         write_halo_props_cont=True, fields=None, drop_fields=None,
                         truncate=True, compression='gzip', buffersize=None,
                         use_pread=True, show_progressbar=False):
    """
    Convert a set of forests from Consistent Trees ascii file(s) into an
    (optionally compressed) hdf5 file. Can be invoked with MPI.

    Parameters
    ----------
    filenames: list of strings for Consistent-Trees catalogues, required
        The input ascii files will be decompressed, if required.

    standard_consistent_trees: boolean, optional, default: None
        Whether the input files were generated by the standard (public)
        Consistent-Trees code, rather than the Uchuu collaboration's
        parallel Consistent-Trees code. If only two files are specified in
        ``filenames``, and these two filenames end with 'forests.list' and
        'locations.dat', then a standard Consistent-Trees output will be
        inferred. If all files specified in ``filenames`` end with '.tree',
        then a parallel Consistent-Trees output will be inferred.

    outputdir: string, optional, default: current working directory ('./')
        The directory where the converted hdf5 file(s) will be written to.

    output_filebase: string, optional, default: "forest"
        The output filename is constructed using
        '<outputdir>/<output_filebase>_<rank>.h5'

    write_halo_props_cont: boolean, optional, default: True
        Controls whether the individual halo properties are written as
        distinct datasets such that any given property for *all* halos is
        written contiguously (structure of arrays, SOA).

        When set to False, only one dataset ('halos') is created under the
        group 'Forests', and *all* properties of a halo are written out
        contiguously (array of structures, AOS).

    fields: list of strings, optional, default: None
        Describes which specific columns in the input file to carry across
        to the hdf5 file. Default action is to convert ALL columns.

    drop_fields: list of strings, optional, default: None
        Contains a list of column names that will *not* be carried through
        to the hdf5 file. If ``drop_fields`` is not set for a
        parallel Consistent-Trees run, then [``Tidal_Force``, ``Tidal_ID``]
        will be used.

        ``drop_fields`` is processed after ``fields``, i.e., you can specify
        ``fields=None`` to create an initial list of *all* columns in the
        ascii file, and then specify ``drop_fields = [colname2, colname7, ...]``,
        and *only* those columns will not be present in the hdf5 output.

    truncate: boolean, default: True
        Controls whether a new file is created on this 'rank'. When set to
        ``True``, the header info file is written out. Otherwise, the file
        is appended to. The code checks to make sure that the existing metadata
        in the hdf5 file is identical to the new metadata in the ascii files
        being currently converted (i.e., tries to avoid different
        simulation + mergertree results being present in the same file).

    compression: string, optional, default: 'gzip'
        Controls the kind of compression applied. Valid options are anything
        that ``h5py`` accepts.

    buffersize: integer, optional, default: 1 MB
        Controls the size of the buffer for how many halos are written out
        per write call to the hdf5 file. The number of halos written out is
        this buffersize divided by the size of the datatype for individual
        halos.

    use_pread: boolean, optional, default: True
        Controls whether low-level i/o operations (through ``os.pread``) are
        used. Otherwise, higher-level i/o operations (via ``io.open``) are
        used. This option is only meaningful on linux systems (and python3+).
        Since ``pread`` does not change the file offset, additional
        parallelisation can be implemented reasonably easily.

    show_progressbar: boolean, optional, default: False
        Controls whether a progressbar is printed. Only enables the
        progressbar on rank==0; the remaining ranks ignore this keyword.

    Returns
    -------
    Returns ``True`` on successful completion.
    """
    import os
    import sys
    import time
    import numpy as np
    import h5py

    rank = 0
    ntasks = 1
    MPI = None
    comm = None
    try:
        from mpi4py import MPI
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        ntasks = comm.Get_size()
    except ImportError:
        pass

    if not os.path.isdir(outputdir):
        msg = f"Error: Output directory = {outputdir} is not a valid directory"
        raise ValueError(msg)

    tstart = time.perf_counter()
    sys.stdout.flush()
    if not standard_consistent_trees:
        standard_consistent_trees = True
        # The Uchuu collaboration has a special parallel version of
        # Consistent-Trees developed by @Tomo. That code generates one set of
        # files equivalent to (forests.list, locations.dat, tree_*.dat)
        # per CPU task. In that code, forests are guaranteed to be located
        # completely within one tree data file. However, the public version
        # of Consistent-Trees is different: there is exactly one 'forests.list'
        # and one 'locations.dat' file for the entire simulation, and as many
        # tree_*.dat files as BOX_DIVISIONS^3, set in the
        # Consistent-Trees config file at runtime.
        # This script aims to handle both scenarios with this logic -- if only
        # the "forests.list" and "locations.dat" files are supplied as
        # command-line arguments, then the public version of the
        # Consistent-Trees catalog is assumed. Otherwise, a list of *all* the
        # "XXXXXX.tree" files should be passed (i.e., for people in the Uchuu
        # collaboration). The corresponding 'forest' and 'location' file names
        # will be automatically generated by assuming the Uchuu convention:
        #   -> tree data files are called '<prefix>.tree'
        #   -> the associated forests.list file is called '<prefix>.forest'
        #   -> the associated locations.dat file is called '<prefix>.loc'
        #
        # If all files supplied at the command-line end with
        # '.tree(.bz2, .zip, .gz)' then it is a parallel Consistent-Trees run.
        check_pctrees_files = ['tree' in f.split('.') for f in filenames]
        if np.all(check_pctrees_files):
            standard_consistent_trees = False

    if standard_consistent_trees:
        if len(filenames) != 2:
            msg = "Error: To convert a standard Consistent-Trees output, "\
                  "please specify *exactly* two files -- the 'forests.list' and "\
                  "the 'locations.dat' files (order is unimportant). "\
                  f"Instead found filenames = '{filenames}'"
            raise ValueError(msg)

        filebasenames = set([os.path.basename(f) for f in filenames])
        expected_filebasenames = set(['forests.list', 'locations.dat'])
        if filebasenames != expected_filebasenames:
            msg = "Error: To convert a standard Consistent-Trees output, "\
                  "please specify *exactly* two files -- the 'forests.list' and "\
                  "the 'locations.dat' files (order is unimportant). While exactly "\
                  "two files were specified, at least one of the 'forests.list' or "\
                  "'locations.dat' files was not present in the supplied filenames = "\
                  f"'{filenames}'"
            raise ValueError(msg)

    if standard_consistent_trees:
        forests_file, locations_file = get_forests_locations_filenames(filenames)
        forests_and_locations_fnames = [(forests_file, locations_file)]
    else:
        # We are converting parallel CTrees files; however, these files might
        # still be compressed and we need to decompress them before processing.
        forests_and_locations_fnames = []
        decompressed_filenames = []
        for fname in filenames:
            decomp_fname = check_and_decompress(fname)
            extname = '.tree'
            if extname not in decomp_fname:
                msg = "Error: Should pass the tree data file names (i.e., "\
                      f"the filenames should end in '{extname}'). "\
                      f"Instead got filename = '{decomp_fname}'"
                raise ValueError(msg)

            decompressed_filenames.append(decomp_fname)
            forests_file, locations_file, _ = get_all_parallel_ctrees_filenames(decomp_fname)
            forests_and_locations_fnames.append((forests_file, locations_file))

        # Since multiple tree files are specified at the command-line, let's
        # make sure that all the files belong to the same simulation +
        # mergertree setup
        assert len(decompressed_filenames) > 0
        validate_inputs_are_ctrees_files(decompressed_filenames)

    if (not drop_fields) and (not standard_consistent_trees):
        # The Tidal fields can not be correctly calculated in the
        # parallel CTrees code and are dropped from the hdf5 file
        drop_fields = ['Tidal_Force', 'Tidal_ID']
    nfiles = len(forests_and_locations_fnames)
    if rank == 0:
        print(f"[Rank={rank}]: Converting {nfiles} sets of (forests, locations) "
              f"files over {ntasks} tasks ... ")

    nconverted = 0
    for (forests_file, locations_file) in forests_and_locations_fnames:
        t0 = time.perf_counter()
        print(f"[Rank={rank}]: Reading forests and locations files...")
        trees_and_locations = read_locations_and_forests(forests_file, locations_file, rank)
        ntrees = trees_and_locations.shape[0]
        t1 = time.perf_counter()
        print(f"[Rank={rank}]: Reading forests and locations files...done. "
              f"Time taken = {t1-t0:.2f} seconds")

        alltreedatafiles = list(set(trees_and_locations['Filename']))
        if standard_consistent_trees:
            # Validate that all the files are the same version and all
            # contain a Consistent Trees catalog
            assert len(alltreedatafiles) > 0
            validate_inputs_are_ctrees_files(alltreedatafiles)
        else:
            # Since we are processing parallel CTrees output, *every* tree
            # data file has an associated forests and locations file. Which
            # means that every locations file should only have one tree data
            # file. Check that that is the case
            if len(alltreedatafiles) != 1:
                msg = "Error: Expected to find *exactly* one tree data file "\
                      "per locations file. However, while processing the "\
                      f"locations_file = '{locations_file}', found "\
                      f"{len(alltreedatafiles)} tree data files. "\
                      f"The unique tree data files are: {alltreedatafiles}"
                raise AssertionError(msg)
            # No need to validate that the input files are valid CTrees
            # files -- that has already been done

        if rank == 0:
            print(f"[Rank={rank}]: Converting a single set of (forests, locations) "
                  f"over {ntasks} tasks...")

        # We have the tree-level info; let's create similar info for
        # the forests (by grouping trees by ForestID)
        forest_info = get_aggregate_forest_info(trees_and_locations, rank)

        # If you need to convert only an initial subset of the forests,
        # then just uncomment the following two lines -- MS 22/06/2020
        # nforests = 50000
        # forest_info = forest_info[0:nforests]

        # Distribute the forests over ntasks
        (forest_start, forest_stop) = distribute_array_over_ntasks(forest_info['Input_ForestNbytes'], rank, ntasks)
        forest_ntrees_offset = forest_info['Ntrees'][:].cumsum()
        tree_start = 0
        if forest_start >= 1:
            tree_start = forest_ntrees_offset[forest_start - 1]
        # tree_stop is not inclusive, and can be directly used in a
        # slicing context
        tree_stop = forest_ntrees_offset[forest_stop]
        print(f"[Rank={rank}]: Processing trees [tree_start, tree_stop) = "
              f"[{tree_start}, {tree_stop}) totntrees = {ntrees}")

        # Now we can start converting the forests
        # *Note* ``forest_stop`` is inclusive but since we are slicing,
        # we need to account for the python convention; hence, the slice
        # on ``forest_info`` goes up to ``forest_stop + 1``
        _convert_ctrees_forest_range(forest_info[forest_start:forest_stop+1],
                                     trees_and_locations[tree_start:tree_stop],
                                     rank, outputdir=outputdir, output_filebase=output_filebase,
                                     write_halo_props_cont=write_halo_props_cont,
                                     fields=fields, drop_fields=drop_fields,
                                     truncate=truncate, compression=compression,
                                     buffersize=buffersize, use_pread=use_pread,
                                     show_progressbar=show_progressbar)
        truncate = False
        nconverted += 1
        if rank == 0:
            print(f"[Rank={rank}]: Converting a single set of (forests, locations) "
                  f"over {ntasks} tasks...done. Converted {nconverted} out "
                  f"of {nfiles} sets of files")

    # Done converting all files
    if comm:
        comm.Barrier()

    if rank != 0:
        return True
    # Create the main file that contains the other files as (hdf5-sym-)links
    fname = f'{outputdir}/{output_filebase}.h5'
    try:
        hf = h5py.File(fname, 'r')
        existing_nfiles = hf['/'].attrs['Nfiles']
        if existing_nfiles < ntasks:
            msg = "Error: You should not have been able to re-run the "\
                  "converter on a larger number of tasks. The converter "\
                  "should have crashed before reaching this point. In "\
                  f"the main file '{fname}', the previous run was spread "\
                  f"over '{existing_nfiles}' individual files. However, "\
                  f"the new run uses {ntasks} tasks, which is larger."
            raise AssertionError(msg)
        hf.close()
    except OSError:
        pass

    with h5py.File(fname, 'w') as hf:
        hf['/'].attrs['Nfiles'] = ntasks
        hf['/'].attrs['TotNforests'] = 0
        hf['/'].attrs['TotNtrees'] = 0
        hf['/'].attrs['TotNhalos'] = 0
        attr_props = [('TotNforests', 'Nforests'),
                      ('TotNtrees', 'Ntrees'),
                      ('TotNhalos', 'Nhalos')]
        for itask in range(ntasks):
            outfile = f'{outputdir}/{output_filebase}_{itask}.h5'
            with h5py.File(outfile, 'a') as hf_task:
                if standard_consistent_trees:
                    hf_task.attrs['consistent-trees-type'] = 'standard'
                else:
                    hf_task.attrs['consistent-trees-type'] = 'parallel'
                hf_task.attrs['contiguous-halo-props'] = write_halo_props_cont
                for (out, inp) in attr_props:
                    hf['/'].attrs[out] += hf_task['/'].attrs[inp]
            hf[f'File{itask}'] = h5py.ExternalLink(outfile, '/')

    t1 = time.perf_counter()
    print(f"Converting {nfiles} sets of (forests, locations) files "
          f"over {ntasks} tasks ...done. Time taken = "
          f"{t1-tstart:.2f} seconds")
    return True
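

# A minimal sketch of reading the converted output (assumes the default
# output_filebase='forest', an output directory of './h5', and a run that
# used 2 MPI tasks):
#
#   import h5py
#   with h5py.File('./h5/forest.h5', 'r') as hf:
#       print(hf.attrs['TotNhalos'])     # aggregated over all task files
#       task0 = hf['File0']              # external link to ./h5/forest_0.h5
#       print(task0.attrs['Nhalos'], task0['ForestInfo'].shape)
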
if __name__ == "__main__":
    import sys
    import argparse

    # # Taken from https://stackoverflow.com/a/34736291
    # class NegateAction(argparse.Action):
    #     def __call__(self, parser, ns, values, option):
    #         setattr(ns, self.dest, option[2:4] != 'no')

    helpmsg = "\nUsage Scenario 1:\n"
    helpmsg += "-----------------\n"
    helpmsg += "If you are converting the output of the standard Consistent-Trees code, then \n"\
               "please provide the full-path to the 'forests.list' and 'locations.dat' files \n"\
               "(order is unimportant).\n\n"
    helpmsg += "Usage Scenario 2:\n"
    helpmsg += "-----------------\n"
    helpmsg += "If you are converting the output of the parallel Consistent-Trees code \n"\
               "from the Uchuu collaboration, then please provide all the tree filenames \n"\
               "that you would like to convert (i.e., files ending with '<prefix>.tree').\n"\
               "The names for the relevant 'forests.list (<prefix>.forest)' and \n"\
               "'locations.dat (<prefix>.loc)' files will be automatically constructed.\n\n"

    parser = argparse.ArgumentParser(description="Convert ascii Consistent-Trees files "
                                                 "into hdf5 (optionally in MPI parallel)")
    parser.add_argument('outputdir', metavar='<Output directory>', type=str,
                        help='the output directory for the hdf5 file(s)')
    parser.add_argument("filenames", metavar="<CTrees filenames>",
                        type=str, nargs='+',
                        help="list of input (ascii) Consistent-Trees filenames")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-m", "--multiple-dsets", dest='write_halo_props_cont',
                       action="store_true", default=True,
                       help="write a separate dataset for each halo property (default)")
    group.add_argument("-s", "--single-dset", dest='write_halo_props_cont',
                       action="store_false",
                       help="write a single dataset containing all halo properties")
    parser.add_argument("-p", "--progressbar", dest='show_progressbar',
                        action="store_true", default=False,
                        help="display a progressbar on rank=0")

    try:
        args = parser.parse_args()
    except SystemExit as e:
        print(helpmsg)
        raise e

    convert_ctrees_files(args.filenames, outputdir=args.outputdir,
                         write_halo_props_cont=args.write_halo_props_cont,
                         show_progressbar=args.show_progressbar)
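
For reference, typical invocations of the converter above might look like the
following (the script name and paths are hypothetical; the MPI variant
requires mpi4py):

    python convert_ctrees.py ./h5 /sim/trees/forests.list /sim/trees/locations.dat
    mpiexec -n 8 python convert_ctrees.py ./h5 /sim/trees/0_0_0.tree /sim/trees/0_0_1.tree
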
#!/usr/bin/env python
from __future__ import print_function

__author__ = "Manodeep Sinha"
__all__ = ["read_locations_and_forests", "get_aggregate_forest_info",
           "get_all_parallel_ctrees_filenames", "validate_inputs_are_ctrees_files",
           "get_treewalk_dtype_descr", "add_tree_walk_indices"]

import time
import functools

from uchuu_utils import generic_reader, check_and_decompress, get_metadata, \
    sanitize_ctrees_header, BaseParseFields


# requires numpy >= 1.9.0
def read_locations_and_forests(forests_fname, locations_fname, rank=0):
    """
    Returns a numpy structured array that contains *both* the forest and
    tree level info.

    Parameters
    ----------
    forests_fname: string, required
        The name of the file containing forest-level info, like the
        Consistent-Trees 'forests.list' file

    locations_fname: string, required
        The name of the file containing tree-level info, like the
        Consistent-Trees 'locations.dat' file

    rank: integer, optional, default: 0
        An integer identifying which task is calling this function. Only
        used in status messages

    Returns
    -------
    trees_and_locations: A numpy structured array
        A numpy structured array containing the fields
        <TreeRootID ForestID Filename FileID Offset TreeNbytes>
        The array is sorted by ``(ForestID, Filename, Offset)`` in that order.
        This sorting means that *all* trees belonging to the same forest
        *will* appear consecutively, regardless of the file that the
        corresponding tree data might appear in. The number of elements in
        the array is equal to the number of trees.

        Note: Sorting by ``Filename`` is implemented by an equivalent, but
        faster, sort on ``FileID``.
    """
    import os  # use os.stat to get filesize
    import numpy as np
    import numpy.lib.recfunctions as rfn

    def _guess_dtype_from_val(val):
        converters_and_type = [(np.float64, np.float64),
                               (str, 'U1024')]
        fallback_dtype = 'V'
        for (conv, dtype) in converters_and_type:
            try:
                _ = conv(val)
                return dtype
            except ValueError:
                pass
        print(f"Warning: Could not guess datatype for val = '{val}'. Returning "
              f"datatype = '{fallback_dtype}'")
        return fallback_dtype

    def _get_dtype_from_header(fname):
        with generic_reader(fname, 'r') as f:
            f = iter(f)
            hdr = next(f)
            hdr = hdr.strip('#').rstrip()
            dataline = next(f)

        known_fields = {
            'Offset': np.int64,
            'Filename': 'U512',
        }
        # Now split on whitespace
        fields = hdr.split()
        datavals = dataline.split()
        fields_and_type = [None]*len(fields)
        for ii, (fld, val) in enumerate(zip(fields, datavals)):
            if 'ID' in fld.upper():
                dtype = np.int64
            else:
                try:
                    dtype = known_fields[fld]
                except KeyError:
                    dtype = _guess_dtype_from_val(val)
                    print(f"Warning: Did not find key = '{fld}' in the known "
                          f"columns. Assuming '{dtype}'")
            fields_and_type[ii] = (fld, dtype)
        return np.dtype(fields_and_type)
    # The main `read_locations_and_forests` function begins here
    forests_dtype = _get_dtype_from_header(forests_fname)
    locations_dtype = _get_dtype_from_header(locations_fname)

    # Check that both the dtypes have the 'common' field that
    # we are later going to use to join the two files
    join_key = 'TreeRootID'
    if join_key not in forests_dtype.names or \
       join_key not in locations_dtype.names:
        msg = f"Error: Expected to find column = '{join_key}' in "\
              f"both the forests file ('{forests_fname}') and "\
              f"the locations file ('{locations_fname}')\n"
        msg += f"Columns in forests file = {forests_dtype.names}\n"
        msg += f"Columns in locations file = {locations_dtype.names}"
        raise KeyError(msg)

    t0 = time.perf_counter()
    print(f"[Rank={rank}]: Reading forests file '{forests_fname}'...")
    forests = np.loadtxt(forests_fname, comments='#', dtype=forests_dtype)
    t1 = time.perf_counter()
    print(f"[Rank={rank}]: Reading forests file '{forests_fname}'...done. "
          f"Time taken = {t1-t0:.2f} seconds")

    t0 = time.perf_counter()
    print(f"[Rank={rank}]: Reading locations file '{locations_fname}'...")
    locations = np.loadtxt(locations_fname, comments='#', dtype=locations_dtype)
    t1 = time.perf_counter()
    print(f"[Rank={rank}]: Reading locations file '{locations_fname}'...done. "
          f"Time taken = {t1-t0:.2f} seconds")
# Check that the base directory is the same | |
# for the 'forests.list' and 'locations.dat' files | |
# Otherwise, the code logic will fail during the conversion | |
dirname = list(set([os.path.dirname(f) for f in [forests_fname, locations_fname]])) | |
if len(dirname) != 1: | |
msg = "Error: Standard Consistent-Trees catalogs *should* "\ | |
"create the 'forests.list' and 'locations.dat' in the "\ | |
f"same directory. Instead found directories = {dirname}\n"\ | |
f"Input files were = ({forests_fname}, {loocations_fname})" | |
raise ValueError(msg) | |
dirname = dirname[0] | |
# The 'locations.dat' file does not have fully-qualified paths | |
# Add the prefix so all future file accesses work out just fine | |
filenames = [fname if '/' in fname else f'{dirname}/{fname}' | |
for fname in locations['Filename']] | |
locations['Filename'][:] = filenames | |
t0 = time.perf_counter() | |
print(f"[Rank={rank}]: Joining the forests and locations arrays...") | |
trees_and_locations = rfn.join_by(join_key, forests, locations, jointype='inner', usemask=False) | |
if trees_and_locations.shape[0] != forests.shape[0]: | |
raise AssertionError("Error: Inner join failed to preserve the shape") | |
trees_and_locations = rfn.append_fields(trees_and_locations, 'TreeNbytes', | |
np.zeros(trees_and_locations.shape[0], | |
dtype=np.int64), usemask=False) | |
t1 = time.perf_counter() | |
print(f"[Rank={rank}]: Joining the forests and locations arrays...done. " | |
f"Time taken = {t1-t0:.2f} seconds") | |
# All the fields have been assigned into the combined structure | |
# Now, we need to figure out the bytes size of each forest by sorting on | |
# FileID and Offset. The locations.dat file contains the offset where | |
# the tree data starts but does not contain where the tree data ends. By | |
# sorting on ('FileID', 'Offset'), we can have a guess at where the current | |
# tree ends by looking at where the next tree starts, and then removing all | |
# intervening bytes. Here, the intervening bytes are what's written out by | |
# this line -> https://bitbucket.org/pbehroozi/consistent-trees/src/9668e1f4d396dcecdcd1afab6ac0c80cbd561d72/src/assemble_halo_trees.c#lines-202 | |
# | |
# fprintf(tree_outputs[file_id], "#tree %"PRId64"\n", halos[0].id); | |
# | |
# Since we already have the TreeRootID, we know *exactly* how many bytes | |
# are taken up by that printf statement. We subtract that many bytes from | |
# the next-tree-starting-offset, then we have the current-tree-ending-offset. | |
# The last tree in the file will have to special-cased -- in this case, the | |
# end of tree data is known to be at end-of-file (EOF). -- MS 16/04/2020 | |
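# To make the byte-accounting concrete, here is a worked example with | |
# hypothetical numbers: suppose two consecutive trees in the same file | |
# start at Offset = 1000 and Offset = 1520, and the second tree has | |
# TreeRootID = 12345. The separator line is "#tree 12345\n", i.e., | |
# len("#tree 12345\n") = 12 bytes, so the first tree spans | |
# (1520 - 12) - 1000 = 508 bytes. | |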
# Sort the array on filename, and then by offset | |
# Sorting by fileID is faster than sorting by filename | |
trees_and_locations.sort(order=['FileID', 'Offset']) | |
ntrees = trees_and_locations[join_key].shape[0] | |
# Calculate the number of bytes in every tree | |
t0 = time.perf_counter() | |
print(f"[Rank={rank}]: Computing number of bytes per tree...") | |
treenbytes = np.zeros(ntrees, dtype=np.int64) | |
nexttid = trees_and_locations[join_key][1:] | |
nexttidlen = np.array([len(f"#tree {x:d}\n") for x in nexttid], dtype=np.int64) | |
nextstart = trees_and_locations['Offset'][1:] | |
thisend = nextstart - nexttidlen | |
treenbytes[0:-1] = thisend - trees_and_locations['Offset'][0:-1] | |
# Now fix the last tree for any file, (special-case the absolute last tree) | |
uniq_tree_filenames = list(set(trees_and_locations['Filename'])) | |
filesizes_dict = {fname: os.stat(fname).st_size for fname in uniq_tree_filenames} | |
fileid_changes_ind = [ntrees-1] | |
if len(uniq_tree_filenames) > 1: | |
fileid = trees_and_locations['FileID'][:] | |
fileid_diffs = np.diff(fileid) | |
ind = (np.where(fileid_diffs != 0))[0] | |
if ind.size > 0: | |
fileid_changes_ind.extend(ind) | |
for ii in fileid_changes_ind: | |
fname, off = trees_and_locations['Filename'][ii], trees_and_locations['Offset'][ii] | |
treenbytes[ii] = filesizes_dict[fname] - off | |
if treenbytes.min() <= 0: | |
msg = "Error: Trees must span more than 0 bytes. "\ | |
f"Found treenbytes.min() = {treenbytes.min()}" | |
raise AssertionError(msg) | |
trees_and_locations['TreeNbytes'][:] = treenbytes | |
t1 = time.perf_counter() | |
print(f"[Rank={rank}]: Computing number of bytes per tree...done. " | |
f"Time taken = {t1-t0:.2f} seconds") | |
# Now every tree has an associated size in bytes | |
# Now let's re-sort the trees such that all trees from | |
# the same forest are contiguous in the array. | |
trees_and_locations.sort(order=['ForestID', 'Filename', 'Offset'], kind='heapsort') | |
return trees_and_locations | |
def get_aggregate_forest_info(trees_and_locations, rank=0): | |
""" | |
Returns forest-level information from the tree-level information | |
supplied. | |
Parameters | |
----------- | |
trees_and_locations: numpy structured array, required | |
A numpy structured array that contains the tree-level | |
information. Should be the output from the function | |
``read_locations_and_forests`` | |
rank: integer, optional, default: 0 | |
An integer identifying which task is calling this function. Only used in | |
status messages | |
Returns | |
------- | |
forest_info: A numpy structured array | |
The structured array contains the fields | |
['ForestID', 'ForestNhalos', 'Input_ForestNbytes', 'Ntrees'] | |
The 'ForestNhalos' field is set to 0, and is populated as the | |
trees are processed. The number of elements in the array is | |
equal to the number of forests. | |
""" | |
import numpy as np | |
import time  # used below for the perf_counter() timing calls | |
# The smallest granularity for parallelisation is a | |
# single forest --> therefore, we also create this | |
# additional array with info at the forest level. | |
ntrees = trees_and_locations.shape[0] | |
# The strategy requires the 'trees_and_locations' to be | |
# already sorted on 'ForestID' | |
uniq_forestids, ntrees_per_forest = np.unique(trees_and_locations['ForestID'], \ | |
return_counts=True) | |
nforests = uniq_forestids.shape[0] | |
forest_info_dtype = np.dtype([('ForestID', np.int64), | |
('ForestNhalos', np.int64), | |
('Input_ForestNbytes', np.int64), | |
('Ntrees', np.int64)]) | |
forest_info = np.empty(nforests, dtype=forest_info_dtype) | |
forest_info['ForestID'][:] = uniq_forestids | |
# We can only fill in the number of halos *after* reading in | |
# the halos (across all the trees) | |
forest_info['ForestNhalos'][:] = 0 | |
forest_info['Ntrees'][:] = ntrees_per_forest | |
t0 = time.perf_counter() | |
print(f"[Rank={rank}]: Calculating number of bytes per forest...") | |
ends = np.cumsum(ntrees_per_forest) | |
if ends[-1] != ntrees: | |
msg = "Error: Something strange happened while computing the number of bytes "\ | |
"per tree. Expected the last element of the array containing the "\ | |
"cumulative sum of number of trees per forest = '{}' to equal the total number "\ | |
"of trees being processed = '{}'. Please check that the TreeRootID and ForestID "\ | |
"values are unique...exiting" | |
raise AssertionError(msg) | |
starts = np.zeros_like(ends, dtype=np.int64) | |
starts[1:] = ends[0:-1] | |
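# A small worked example with hypothetical counts: if | |
# ntrees_per_forest = [2, 3, 1], then ends = [2, 5, 6] and | |
# starts = [0, 2, 5], i.e., forest k spans trees_and_locations[starts[k]:ends[k]] | |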
treenbytes = trees_and_locations['TreeNbytes'][:] | |
if treenbytes.min() <= 0: | |
msg = "Error: While computing the number of bytes per forest. Expected *all* trees "\ | |
"to contain at least one entry (i.e., span non-zero bytes in the input file)"\ | |
"However, minimum of the array containing the number of bytes per tree "\ | |
f"= '{treenbytes.min()}'. Perhaps the input files are corrupt? Exiting..." | |
raise AssertionError(msg) | |
forestnbytes = np.array([treenbytes[start:end].sum() for start, end in zip(starts, ends)]) | |
if forestnbytes.shape[0] != nforests: | |
msg = "Error: There must be some logic issue in the code. Expected "\ | |
"that the shape of the array containing the number of bytes "\ | |
f"per forest = '{forestnbytes.shape[0]}' to be *exactly* "\ | |
f"equal to nforests = '{nforests}'. Exiting..." | |
raise AssertionError(msg) | |
# Since we have already checked that 'treenbytes.min() > 0' | |
# --> no need to check that forestnbytes.min() is > 0 | |
forest_info['Input_ForestNbytes'][:] = forestnbytes | |
t1 = time.perf_counter() | |
print(f"[Rank={rank}]: Calculating number of bytes per forest...done. " | |
f"Time taken = {t1-t0:0.2f} seconds") | |
return forest_info | |
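# A minimal usage sketch for the two functions above (the filenames are | |
# hypothetical; a standard Consistent-Trees run writes these two files | |
# into its output directory): | |
# | |
#   trees_and_locations = read_locations_and_forests('forests.list', 'locations.dat') | |
#   forest_info = get_aggregate_forest_info(trees_and_locations) | |
#   print(forest_info.shape[0], forest_info['Ntrees'].sum())  # nforests, ntrees | |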
def get_all_parallel_ctrees_filenames(fname): | |
""" | |
Returns three filenames corresponding to the 'forests.list', | |
'locations.dat', and 'tree_*.dat' files, *assuming* the naming | |
convention of the Uchuu collaboration's parallel Consistent-Trees code | |
Parameters | |
----------- | |
fname: string, required | |
A filename specifying the tree data file for a parallel Consistent-Trees | |
output | |
Returns | |
-------- | |
forests_file, locations_file, treedata_file: strings, filenames | |
The filenames corresponding to the 'forests.list', 'locations.dat' and | |
the 'tree_*.dat' file *generated using* the convention of the Uchuu | |
collaboration. The convention is: | |
'forests.list' ---> '<prefix>.forest' | |
'locations.dat' ---> '<prefix>.loc' | |
'tree_*.dat' ---> '<prefix>.tree' | |
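For example (with a hypothetical path), ``fname = '/sims/00000148.tree'`` | |
returns ``('/sims/00000148.forest', '/sims/00000148.loc', | |
'/sims/00000148.tree')``. | |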
""" | |
import os | |
fname = check_and_decompress(fname) | |
# Need to create the three filenames for the | |
# forests.list, locations.dat and tree*.dat files | |
dirname = os.path.dirname(fname) | |
# Get the filename and remove the file extension | |
basename, _ = os.path.splitext(os.path.basename(fname)) | |
forests_file = f'{dirname}/{basename}.forest' | |
locations_file = f'{dirname}/{basename}.loc' | |
treedata_file = f'{dirname}/{basename}.tree' | |
return forests_file, locations_file, treedata_file | |
def get_forests_locations_filenames(filenames): | |
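""" | |
Given a pair of filenames (in either order), returns them ordered as | |
(forests_file, locations_file). The correct order is determined by | |
inspecting the header line of each file. | |
""" | |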
import os | |
if len(filenames) != 2: | |
msg = "Error: Expected to get only two filenames. Instead got "\ | |
f"filenames = '{filenames}' with len(filenames) = {len(filenames)}" | |
raise AssertionError(msg) | |
forests_file, locations_file = filenames[0], filenames[1] | |
# Check if the first line in the potential `forests_file` contains | |
# the expected 'ForestID'. We could even check that the entire | |
# first line matches '#TreeRootID ForestID' | |
with generic_reader(forests_file, 'rt') as f: | |
line = f.readline() | |
if 'ForestID' not in line: | |
forests_file, locations_file = locations_file, forests_file | |
# Now check that the locations file is a valid CTrees file | |
with generic_reader(locations_file, 'rt') as f: | |
line = f.readline() | |
if 'FileID' not in line or \ | |
'Offset' not in line or \ | |
'Filename' not in line: | |
msg = f"Error: The first line in the locations_file = '{locations_file}' "\ | |
f"does not contain the expected fields. First line = '{line}'" | |
raise AssertionError(msg) | |
return forests_file, locations_file | |
def validate_inputs_are_ctrees_files(ctrees_filenames, base_metadata=None, | |
base_version=None, base_input_catalog_type=None): | |
""" | |
Checks the files contain Consistent-Trees catalogues derived from | |
the same simulation. | |
Parameters | |
----------- | |
ctrees_filenames: list of filenames, string, required | |
The input filenames (potentially) containing Consistent-Trees | |
catalogues. | |
Note: Only unique filenames within this list are checked | |
Returns | |
-------- | |
Returns ``True`` when all the files contain valid Consistent-Trees | |
catalogues *and* the same header info, i.e., the same simulation + | |
Consistent-Trees setup. Otherwise, a ``ValueError`` is raised. | |
""" | |
import numpy as np | |
## Read the entire header meta-data from the first file | |
files = np.unique(ctrees_filenames) | |
if not base_version: | |
base_metadata_dict = get_metadata(files[0]) | |
base_metadata = np.string_(base_metadata_dict['metadata']) | |
base_version = base_metadata_dict['version'] | |
base_input_catalog_type = base_metadata_dict['catalog_type'] | |
nfiles = files.shape[0] | |
for fname in files: | |
metadata_dict = get_metadata(fname) | |
metadata = np.string_(metadata_dict['metadata']) | |
version = metadata_dict['version'] | |
input_catalog_type = metadata_dict['catalog_type'] | |
if 'Consistent' not in input_catalog_type: | |
msg = "Error: This script is meant *only* to process "\ | |
"Consistent-Tree catalogs. Found catalog type = "\ | |
f"{input_catalog_type} instead (input file = {fname})" | |
raise ValueError(msg) | |
if 'hlist' in input_catalog_type: | |
msg = "Error: This script is meant *only* to process "\ | |
"Consistent-Tree catalogs containing tree data. "\ | |
"Found a halo catalogue (generated by "\ | |
"Consistent-Trees)\" instead. catalog type = "\ | |
f"{input_catalog_type} instead (input file = {fname})" | |
raise ValueError(msg) | |
if base_input_catalog_type != input_catalog_type: | |
msg = f"Error: Catalog type = '{input_catalog_type}' for file '{fname}' does *not* "\ | |
f"match the catalog type = '{base_input_catalog_type}' for file '{files[0]}'" | |
raise ValueError(msg) | |
if base_version != version: | |
msg = "Error: Version = '{version}' for file '{fname}' does *not* "\ | |
"match the version = '{base_version}' for file '{files[0]}'" | |
raise ValueError(msg) | |
if not np.array_equal(base_metadata, metadata): | |
msg = "Error: metadata = '{metadata}' for file '{fname}' does *not* "\ | |
"match the metadata = '{base_metadata}' for file '{files[0]}'" | |
raise ValueError(msg) | |
return True | |
def assign_fofids(forest, rank=0): | |
""" | |
Fills the FofID field for all halos. | |
Parameters | |
----------- | |
forest: numpy structured array, required | |
An array containing all halos from the same forest | |
Note: The tree-walking indices (i.e., columns returned by | |
the function ``get_treewalk_dtype_descr``) are expected to be | |
filled with -1. | |
rank: integer, optional, default=0 | |
The unique identifier for the current task. Only used | |
within an error statement | |
Returns | |
-------- | |
The FofID field is filled in-place, and this function has | |
no return value | |
""" | |
import numpy as np | |
# Assign fofhaloids to the halos | |
fofhalos = (np.where(forest['pid'] == -1))[0] | |
if len(fofhalos) == 0: | |
nhalos = forest.shape[0] | |
msg = f"Error: There are no FOF halos among these {nhalos} passed.\n" | |
msg += f"forest = {forest}" | |
raise ValueError(msg) | |
# First fix the upid, and assign the (self) FofIDs | |
# to the FOF halos | |
fofhalo_ids = forest['id'][fofhalos] | |
forest['upid'][fofhalos] = fofhalo_ids | |
forest['FofID'][fofhalos] = fofhalo_ids | |
# Now assign the FofIDs for the subhalos by matching their (upid, pid) | |
haloids = forest['id'][:] | |
sorted_halo_idx = np.argsort(haloids) | |
rem_sub_inds = (np.where(forest['FofID'] == -1))[0] | |
nleft = len(rem_sub_inds) | |
while nleft > 0: | |
# Need to match un-assigned (pid, upid) with fofid, id of halos with assigned fofid | |
nassigned = 0 | |
for fld in ['upid', 'pid']: | |
if len(rem_sub_inds) != nleft: | |
msg = "Error: (Bug in code) Expected to still have some "\ | |
f"remaining subhalos that needed assigning, since nleft = {nleft} "\ | |
f"with len(rem_sub_inds) = {len(rem_sub_inds)}\n"\ | |
f"rem_sub_inds = {rem_sub_inds} " | |
raise AssertionError(msg) | |
fld_id = forest[fld][rem_sub_inds] | |
# 'pid'/'upid' might be duplicated -> | |
# np.intersect1d will only return one value; other | |
# halos with the same 'pid' will not get updated to the | |
# correct fof. Need a 'searchsorted' like implementation | |
# but know that not *all* pids will be found | |
fld_idx = np.searchsorted(haloids, fld_id, sorter=sorted_halo_idx) | |
forest['FofID'][rem_sub_inds] = forest['FofID'][sorted_halo_idx[fld_idx]] | |
nassigned += len(rem_sub_inds) | |
rem_sub_inds = np.where(forest['FofID'] == -1)[0] | |
nassigned -= len(rem_sub_inds) | |
assert nassigned >= 0, f"Error: nassigned = {nassigned} must be at least 0." | |
nleft = len(rem_sub_inds) | |
if nleft > 0 and nassigned == 0: | |
msg = f"[Rank={rank}]: Error: There are {nleft} halos left without fofhalos assigned but "\ | |
"could not assign a single fof halo in this iteration\n" | |
xx = np.where(forest['FofID'] == -1)[0] | |
msg += f"forest['pid'][xx] = {forest['pid'][xx]}\n" | |
msg += f"forest['upid'][xx] = {forest['upid'][xx]}\n" | |
msg += f"forest[xx] = {forest[xx]}\n" | |
pid = forest['pid'][xx[0]] | |
yy = np.where(forest['id'] == pid)[0] | |
msg += f"pid = {pid} forest['id'] = {forest['id'][yy]} (should be equal to 'pid')\n" | |
msg += f"fofid for the 'pid' halo = {forest['FofID'][yy]}" | |
raise ValueError(msg) | |
# Reset the FOF upid field | |
assert forest['pid'][fofhalos].min() == -1 | |
assert forest['pid'][fofhalos].max() == -1 | |
forest['upid'][fofhalos] = -1 | |
return | |
def get_treewalk_dtype_descr(): | |
""" | |
Returns the description for the additional fields | |
to add to the forest for walking the mergertree | |
Parameters | |
----------- | |
None | |
Returns | |
-------- | |
mergertree_descr: list of tuples | |
A list of tuples containing the names and datatypes | |
for the additional columns needed for walking the | |
mergertree. This list can be used to create a numpy | |
datatype suitable to contain the additional mergertree | |
indices | |
""" | |
import numpy as np | |
mergertree_descr = [('FofID', np.int64), | |
('FirstHaloInFOFgroup', np.int64), | |
('NextHaloInFOFgroup', np.int64), | |
('PrevHaloInFOFgroup', np.int64), | |
('FirstProgenitor', np.int64), | |
('NextProgenitor', np.int64), | |
('PrevProgenitor', np.int64), | |
('Descendant', np.int64)] | |
return mergertree_descr | |
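# For example, the tree-walk columns can be appended to an existing halo | |
# description before allocating the forest array (``halos_descr`` below | |
# is a hypothetical base dtype description): | |
# | |
#   forest_dtype = np.dtype(halos_descr + get_treewalk_dtype_descr()) | |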
def add_tree_walk_indices(forest, rank=0): | |
""" | |
Adds the various mergertree walking indices | |
Parameters | |
----------- | |
forest: numpy structured array, required | |
An array containing all halos from the same forest | |
rank: integer, optional, default=0 | |
The unique identifier for the current task. Only used | |
within an error statement | |
Returns | |
-------- | |
None | |
The mergertree indices are filled in-place and no additional | |
return occurs | |
""" | |
import numpy as np | |
mergertree_descr = get_treewalk_dtype_descr() | |
# initialise all the mergertree indices | |
for name, _ in mergertree_descr: | |
forest[name][:] = -1 | |
# Assign the FOFhalo IDs | |
assign_fofids(forest, rank) | |
# Assign the index for the fofhalo | |
forest['Mvir'] *= -1 # hack so that the sort is decreasing order | |
sorted_fof_order = forest.argsort(order=['scale', 'FofID', 'upid', 'Mvir', 'id']) | |
forest['Mvir'] *= -1 # reset mass back | |
# upid of FOFs is set to -1, therefore the FOF halo should be the first | |
# halo with that FOFID (i.e., equal to its own haloID) | |
uniq_fofs, firstfof_inds = np.unique(forest['FofID'][sorted_fof_order], | |
return_index=True) | |
nextsub_loc = np.split(sorted_fof_order, firstfof_inds[1:]) | |
for lhs in nextsub_loc: | |
assert forest['FofID'][lhs].min() == forest['FofID'][lhs].max() | |
assert forest['FofID'][lhs].min() == forest['id'][lhs[0]] | |
forest['FirstHaloInFOFgroup'][lhs] = lhs[0] | |
forest['NextHaloInFOFgroup'][lhs] = [*lhs[1:], -1] | |
forest['PrevHaloInFOFgroup'][lhs] = [-1, *lhs[:-1]] | |
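# For instance, if the sorted member indices of one FOF group are | |
# lhs = [4, 7, 9] (hypothetical), then: | |
#   FirstHaloInFOFgroup --> [4, 4, 4] | |
#   NextHaloInFOFgroup  --> [7, 9, -1] | |
#   PrevHaloInFOFgroup  --> [-1, 4, 7] | |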
# Now match the descendants | |
haloids = forest['id'][:] | |
desc_ids = forest['desc_id'][:] | |
sorted_halo_idx = np.argsort(haloids) | |
valid_desc_idx = desc_ids != -1 | |
desc_ind = np.searchsorted(haloids, desc_ids[valid_desc_idx], side='left', sorter=sorted_halo_idx) | |
np.testing.assert_array_equal(desc_ids[valid_desc_idx], haloids[sorted_halo_idx[desc_ind]], | |
err_msg="searchsorted did not work as expected") | |
forest['Descendant'][valid_desc_idx] = sorted_halo_idx[desc_ind] | |
# Check that *all* desc_ids were found | |
assert set(haloids[sorted_halo_idx[desc_ind]]) == set(desc_ids[valid_desc_idx]) | |
np.testing.assert_array_equal(forest['desc_id'][valid_desc_idx], | |
forest['id'][sorted_halo_idx[desc_ind]], | |
err_msg='descendant ids are incorrect') | |
np.testing.assert_array_equal(forest['desc_scale'][valid_desc_idx], | |
forest['scale'][sorted_halo_idx[desc_ind]], | |
err_msg='descendant scales are incorrect') | |
# Now assign (FirstProgenitor, NextProgenitor). | |
# Sort on the descendant ids and then on mass, but only get the index. | |
# This is confusing, but what the sorted index really refers to are the | |
# progenitors, and this is the order in which the progenitors should be | |
# assigned as FirstProgenitor, NextProgenitor | |
forest['Mvir'] *= -1 # hack so that the sort is decreasing order | |
sorted_prog_inds = np.argsort(forest, order=['desc_id', 'Mvir']) | |
forest['Mvir'] *= -1 # reset the masses back | |
# t0 = time.perf_counter() | |
valid_desc_idx = np.where(forest['desc_id'][sorted_prog_inds] != -1)[0] | |
uniq_descid, firstprog_inds = np.unique(forest['desc_id'][sorted_prog_inds[valid_desc_idx]], | |
return_index=True) | |
desc_for_firstprog_idx = np.searchsorted(haloids, uniq_descid, | |
sorter=sorted_halo_idx) | |
# np.testing.assert_array_equal(firstprog, forest['FirstProgenitor']) | |
forest['FirstProgenitor'][sorted_halo_idx[desc_for_firstprog_idx]] = \ | |
sorted_prog_inds[valid_desc_idx[firstprog_inds]] | |
# Check that the firstprog assignment was okay | |
xx = np.where(forest['FirstProgenitor'] != -1)[0] | |
firstprog = forest['FirstProgenitor'][xx] | |
np.testing.assert_array_equal(forest['desc_id'][firstprog], forest['id'][xx]) | |
valid_prog_loc = np.split(sorted_prog_inds[valid_desc_idx], firstprog_inds[1:]) | |
for lhs in valid_prog_loc: | |
assert forest['desc_id'][lhs].min() == forest['desc_id'][lhs].max() | |
desc = forest['Descendant'][lhs[0]] | |
assert forest['FirstProgenitor'][desc] == lhs[0] | |
mvir = forest['Mvir'][lhs] | |
assert np.all(np.diff(mvir) <= 0.0) | |
forest['NextProgenitor'][lhs] = [*lhs[1:], -1] | |
forest['PrevProgenitor'][lhs] = [-1, *lhs[0:-1]] | |
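# For instance, if one descendant has progenitors lhs = [3, 8] | |
# (hypothetical, sorted by decreasing Mvir), then: | |
#   NextProgenitor[3] = 8, NextProgenitor[8] = -1 | |
#   PrevProgenitor[3] = -1, PrevProgenitor[8] = 3 | |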
return |
#!/bin/bash | |
####################################################### | |
# Author: Manodeep Sinha | |
# Email: manodeep@gmail.com | |
# Date: 23/01/2020 | |
# License: MIT | |
# Purpose: Combine multiple Rockstar catalogues into | |
# a single ascii file. The generated files | |
# will conform to the file-format of the | |
# input files, and the header will appear | |
# once. | |
# ALL input files are checked that they are | |
# of the same type (i.e., will prevent you from | |
# accidentally merging Rockstar catalogues from | |
# different simulations) | |
####################################################### | |
function get_header() { | |
fname="$1" | |
hdr="$2" | |
# Print all lines preceding the first data line (something that begins with digits) | |
# Adapted from https://unix.stackexchange.com/a/11323 | |
# Awk prints until it encounters a data line -> but then awk | |
# exits before printing that line. Therefore, all comment lines | |
# are captured. | |
# NOTE: Does not assume a comment character | |
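# For example, if a file starts with the lines "#a", "#b", "123 456 ...", | |
# then only "#a" and "#b" are written to "$hdr" | |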
awk '/^[0-9]+/ {exit} {print}' "$fname" > "$hdr" | |
return $? | |
} | |
function verify_headers() { | |
numfiles="$#" | |
firstfile="$1" | |
# Check that the file exists | |
if [ ! -f "$firstfile" ]; then | |
echo "Error: Could not locate input file: '$firstfile'" | |
return 1 | |
fi | |
#echo "$firstfile is the first file" | |
# This 'hdrfile' will only contain the comments | |
# (which are expected to be lines beginning with '#') | |
# Add in the maximum expected number of header lines and | |
# first print only those lines | |
hdrfile="$(mktemp)" | |
tmpfile="$(mktemp)" | |
# Shift moves the parameter list to the left (think of | |
# it as popping off the first argument, since we have | |
# stored that in "$firstfile") | |
shift | |
# Now get the header in the first file | |
get_header "$firstfile" "$hdrfile" | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "Error with extracting header (i.e., comments) lines from input file $firstfile and writing to $hdrfile" | |
return $retval | |
fi | |
# Check that 'Rockstar' is in the header | |
required_string='Rockstar' | |
grep "$required_string" "$hdrfile" > /dev/null | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "Error: This script only works for specific kinds of files" | |
echo "Could not find in the string '$required_string' in the file '$firstfile'" | |
return $retval | |
fi | |
# Loop over the remaining files that need to be combined (i.e., second file onwards) | |
for fname in "$@"; do | |
if [ ! -f "$fname" ]; then | |
echo "Error: Could not locate input file: '$fname'" | |
return 1 | |
fi | |
#echo "Now checking whether $fname has the same header as $firstfile" | |
# Extract the header | |
get_header "$fname" "$tmpfile" | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "Error with extracting header (i.e., comments) lines from input file $fname and writing to $tmpfile" | |
return $retval | |
fi | |
# Check that the header matches with the header from first file | |
diff -q "$hdrfile" "$tmpfile" >/dev/null | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "Error: The headers are not identical between two files" | |
echo "Offending files $fname (header in $tmpfile) is not identical to the header for $firstfile (header in $hdrfile)" | |
return $retval | |
fi | |
done | |
# Remove the temporary header files | |
rm $hdrfile $tmpfile | |
echo "Success! All of the $numfiles input files have the same header" | |
return 0 | |
} | |
function join_files() { | |
outputfile="$1" | |
# shift pops the outputfile from the (variable) argument list | |
# stored in "$@" | |
shift | |
# All arguments succeeding are input files | |
numinputfiles="$#" | |
# Now ensure that ALL the input files have the same header | |
verify_headers "$@" | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
return $retval | |
fi | |
# All the files exist and have been verified to have the same header | |
firstfile="$1" | |
shift | |
#echo "$firstfile is the first file" | |
cat "$firstfile" > "$outputfile" | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "Error: Could not write the first file into the output file" | |
echo "Offending files are: Input file is $firstfile, and output file is $outputfile" | |
return $retval | |
fi | |
for fname in "$@"; do | |
#echo "Adding the data from $fname into $outputfile" | |
grep -v "^#" "$fname" >> "$outputfile" | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "Error: Could not concatenate $file into the output file: $outputfile" | |
return $retval | |
fi | |
done | |
echo "Success! Combined all $numinputfiles into a single output file = '$outputfile'" | |
return 0 | |
} | |
# Now call the join_files function with the output file | |
# and the list of input files | |
# Invocation could be `/bin/bash join_files.sh 'combined.rockstar' 'out_28_*.rockstar'` | |
if [ "$#" -lt 3 ]; then | |
echo "Error: Please at least three command-line arguments" | |
echo "<output file> -- string specifying the *combined* ascii filename (*gets overwritten*)" | |
echo "<list of filenames> -- input filenames that will be combined (all files must exist, must be of same type)" | |
exit 1 | |
fi | |
join_files "$@" |
#!/bin/bash | |
####################################################### | |
# Author: Manodeep Sinha | |
# Email: manodeep@gmail.com | |
# Date: 22/01/2019 | |
# License: MIT | |
# Purpose: Split Rockstar ascii catalogues into | |
# smaller files while retaining the header. | |
# The generated files will conform to the | |
# file-format of the input files | |
####################################################### | |
split_filter() { | |
{ cat "$hdrfile"; cat; } > "$FILE"; | |
} | |
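# 'split' invokes the filter once per output chunk, exporting the | |
# chunk's destination name in $FILE; the filter prepends the saved | |
# header ("$hdrfile") before streaming the chunk's data lines through | |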
function split_file() { | |
fullfile=$1 | |
nfiles=$2 | |
outdir=$3 | |
# Trim any repeating trailing slashes | |
# Taken from https://stackoverflow.com/a/32845647/2237582 | |
outdir=$(echo "$outdir" | sed 's:/*$::') | |
# Check that the file exists | |
if [ ! -f "$fullfile" ]; then | |
echo "Error: Could not locate file: '$fullfile'" | |
return 1 | |
fi | |
# If only requesting <= 1 chunk -> there is nothing to do | |
if [ $nfiles -le 1 ]; then | |
echo "Requested chunks = $nfiles implies no work...returning" | |
return 0 | |
fi | |
# Check that output directory exists | |
if [ ! -d "$outdir" ]; then | |
echo "Error: Output directory '$outdir' does not exist" | |
return 1 | |
fi | |
# Check that output directory is writable | |
if [ ! -w "$outdir" ]; then | |
echo "Error: Output directory '$outdir' is not writable" | |
echo "Please select a different output directory where you have write permissions" | |
return 1 | |
fi | |
# Taken from https://unix.stackexchange.com/a/285928 | |
currentver=$(split --version | head -n 1 | awk '{print $NF}') | |
# The bottleneck is the coreutils tool 'split' | |
# We need the '--filter' functionality (introduced in v8.11) | |
# and the '--additional-suffix' keyword (introduced in v8.16) | |
requiredver="8.16" | |
# Taken from: https://unix.stackexchange.com/a/285928 | |
if [ ! "$(printf '%s\n' "$requiredver" "$currentver" | sort -V | head -n1)" = "$requiredver" ]; then | |
echo "Error: The version of the system command 'split' is too old" | |
echo "Minimum version of the coreutils tool required is $requiredver and only found version = $currentver" | |
echo "You will need to upgrade coreutils - either ask your system administrator" | |
echo "Or install through conda with 'conda install -c conda-forge coreutils'" | |
return 1 | |
fi | |
# This 'hdrfile' will only contain the comments | |
# (which are expected to be lines beginning with '#') | |
# Add in the maximum expected number of header lines and | |
# first print only those lines | |
export hdrfile="$(mktemp -p $outdir)" | |
# Print all lines preceding the first data line (something that begins with digits) | |
# Adapted from https://unix.stackexchange.com/a/11323 | |
awk '/^[0-9]+/ {exit} {print}' "$fullfile" > "$hdrfile" | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "Error with extracting header (i.e., comments) lines from input file $fullfile and writing to $hdrfile" | |
return $retval | |
fi | |
# Check that the input file is a Rockstar catalogue | |
# (since there is a logical grouping for the Consistent-Trees | |
# catalogues where all halos belonging to the same tree | |
# are sequential -> therefore splitting a CTrees file requires | |
# a more specialised splitting tool) | |
required_string='Rockstar' | |
grep "$required_string" "$hdrfile" > /dev/null | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "Error: This script only works for specific kinds of files" | |
echo "Could not find in the string '$required_string' in the file '$fullfile'" | |
return $retval | |
fi | |
# All input validation and tool validation is done -> let's split the input file | |
echo "Splitting '$fullfile' into '$nfiles' chunks and writing to '$outdir' ..." | |
export -f split_filter | |
# The header file has already been generated successfully | |
nhdrlines=$(wc -l < "$hdrfile") | |
firstdataline=$((nhdrlines+1)) | |
# Extract the actual filename (remove the paths, if present) | |
filename=$(basename -- "$fullfile") | |
extension=".${filename##*.}" | |
filebasename="${filename%.*}_" | |
# Split the input file "nfiles" chunks | |
# The generated filename will be "'outdir/filebasename' + '%02d' + 'extension'" | |
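# For example, splitting 'out_28.rockstar' into 3 chunks with outdir='./' | |
# produces './out_28_00.rockstar', './out_28_01.rockstar' and | |
# './out_28_02.rockstar', each carrying the full header | |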
tail -n +"$firstdataline" "$fullfile" | split -n r/$nfiles --filter=split_filter -d --additional-suffix="$extension" - "${outdir}/${filebasename}" | |
retval=$? | |
if [ $retval -ne 0 ]; then | |
echo "****** Error with split *******" | |
echo "If you see an error message relating to 'unrecognized option --additional-suffix='" | |
echo "Then your version of the system command 'split' is too old" | |
echo "You will need to upgrade coreutils - either ask your system administrator" | |
echo "Or install through conda with 'conda install -c conda-forge coreutils'" | |
echo "Intermediate file created is: " "$hdrfile" | |
echo "*******************************" | |
return $retval | |
fi | |
# Remove the temporary header file | |
rm $hdrfile | |
echo "Splitting '$fullfile' into '$nfiles' chunks and writing to '$outdir' ......done" | |
return 0 | |
} | |
# Now call the function with the input file and | |
# the number of chunks, and the output directory | |
# as the (only) three arguments | |
# Invocation could be `/bin/bash split_file.sh 'out_28.rockstar' 5 './'` | |
if [ "$#" -ne 3 ]; then | |
echo "Error: Please pass exactly two command-line arguments" | |
echo "<input file> -- string specifying the ascii filename to be split" | |
echo "<number of split files> -- integer specifying the number of files to split into" | |
echo "<output dir> -- string specifying the directory where the output files will be written to (and temporary files)" | |
exit 1 | |
fi | |
split_file "$@" |
#!/usr/bin/env python | |
from __future__ import print_function | |
import os | |
## From Yao-Yuan Mao's SimulationAnalysis.py (part of helper package) | |
import re | |
import io | |
import numpy as np | |
from builtins import range, zip | |
import inspect | |
def _isstring(s): | |
try: | |
s + '' | |
except TypeError: | |
return False | |
return True | |
class BaseParseFields(): | |
def __init__(self, header, fields=None): | |
if len(header)==0: | |
if all(isinstance(f, int) for f in fields): | |
self._usecols = fields | |
self._formats = [float]*len(fields) | |
self._names = ['f%d'%f for f in fields] | |
else: | |
raise ValueError('header is empty, so fields must be a list '\ | |
'of int.') | |
else: | |
header_s = [self._name_strip(__) for __ in header] | |
if not fields: | |
self._names = header | |
names_s = header_s | |
self._usecols = list(range(len(names_s))) | |
else: | |
if _isstring(fields): | |
fields = [fields] | |
self._names = [header[f] if isinstance(f, int) else str(f) \ | |
for f in fields] | |
names_s = [self._name_strip(__) for __ in self._names] | |
wrong_fields = [str(f) for s, f in zip(names_s, fields) \ | |
if s not in header_s] | |
if wrong_fields: | |
raise ValueError('The following field(s) are not available'\ | |
': %s.\nAvailable fields: %s.'%(\ | |
', '.join(wrong_fields), ', '.join(header))) | |
self._usecols = [header_s.index(__) for __ in names_s] | |
self._formats = [self._get_format(__) for __ in names_s] | |
self.dtype = np.dtype({'names':self._names, \ | |
'formats':self._formats}) | |
def parse_line(self, l): | |
items = l.split() | |
try: | |
return tuple(c(items[i]) for i, c in zip(self._usecols, self._formats)) | |
except Exception as _error: | |
print('Something wrong when parsing this line:\n{0}'.format(l)) | |
raise _error | |
def pack(self, X): | |
return np.array(X, self.dtype) | |
def _name_strip(self, s): | |
return self._re_name_strip.sub('', s).lower() | |
def _get_format(self, s): | |
return float if self._re_formats.search(s) is None else int | |
_re_name_strip = re.compile(r'\W|_') | |
_re_formats = re.compile(r'^phantom$|^mmp$|id$|^num|num$') | |
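# With the two regexes above, (stripped, lower-cased) column names like | |
# 'id', 'depthfirstid', 'numprog', 'phantom' or 'mmp' parse as int, | |
# while names like 'mvir' or 'scale' parse as float | |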
## End of code lifted from Yao-Yuan Mao's SimulationAnalysis.py | |
def get_parser(filename, fields=None): | |
with io.open(filename, 'r') as f: | |
# Taken from SimulationAnalysis.py | |
hdrline = next(f) | |
hdrline = hdrline.strip().lstrip('#').lstrip() | |
header = [re.sub(r'\(\d+\)$', '', s) for s in hdrline.split()] | |
# print("Header = {}".format(header)) | |
p = BaseParseFields(header, fields) | |
# print("p.dtype = {}".format(p.dtype)) | |
# Above taken from SimulationAnalysis.py | |
return p | |
def read_lines_chunks(filename, chunksize, usecols, formats): | |
import io | |
def parse_line(l, usecols, formats): | |
items = l.split() | |
return tuple(c(items[i]) for i, c in zip(usecols, formats)) | |
with io.open(filename, 'rt') as f: | |
X = [None]*chunksize | |
size = 0 | |
for l in f: | |
if l.startswith('#'): continue | |
X[size] = parse_line(l, usecols, formats) | |
size += 1 | |
if size == chunksize: | |
size = 0 | |
yield X | |
if size > 0: | |
yield X[0:size] | |
return | |
def parse_ascii_file_default(input_file, parser, chunksize): | |
from itertools import islice | |
print_line=True | |
nread = 0 | |
with io.open(input_file, "r") as f: | |
while True: | |
# Read from the ascii file in (up to) chunksize lines | |
# parse the line based on the columns wanted and create | |
# list (where each item of the list is a tuple returned | |
# by the parse_line method) | |
X = [parser.parse_line(l) for l in islice(f, chunksize) \ | |
if not l.startswith('#')] | |
if not X: | |
break | |
halos = parser.pack(X) | |
nread += halos.shape[0] | |
# if print_line: | |
# print_line=False | |
# print("type(halos) = {}\nhalos shape = {} dtype = {}\nhalos[0] = {}".format(type(halos), | |
# halos.shape, | |
# halos.dtype, | |
# halos[0])) | |
print("In {}: nread = {}".format(inspect.currentframe().f_code.co_name, nread)) | |
return | |
def parse_ascii_file_pandas(input_file, parser, chunksize): | |
import pandas as pd | |
gen = pd.read_csv(input_file, dtype=parser.dtype, | |
names=parser.dtype.names, delim_whitespace=True, | |
index_col=False, chunksize=chunksize, comment='#') | |
print_line=True | |
nread = 0 | |
for chunk in gen: | |
halos = np.array(chunk.to_records(index=False), dtype=parser.dtype) | |
nread += halos.shape[0] | |
if print_line: | |
print_line=False | |
print("type(halos) = {}\nhalos shape = {} dtype = {}\nhalos[0] = {}".format(type(halos), | |
halos.shape, | |
halos.dtype, | |
halos[0])) | |
print("In {}: nread = {}".format(inspect.currentframe().f_code.co_name, nread)) | |
return | |
def parse_ascii_file_csv(input_file, parser, chunksize): | |
import csv | |
from itertools import islice | |
def decomment(csvfile): | |
for row in csvfile: | |
raw = row.split('#')[0].strip() | |
if raw: yield raw | |
nread = 0 | |
print_line = True | |
with io.open(input_file, 'r') as f: | |
reader = csv.reader(decomment(f), delimiter=' ') | |
while True: | |
X = [tuple(row) for row in islice(reader, chunksize)] | |
if not X: | |
break | |
halos = np.array(X, dtype=parser.dtype) | |
nread += halos.shape[0] | |
# if print_line: | |
# print_line=False | |
# print("type(halos) = {}\nhalos shape = {} dtype = {}\n".format(type(halos), | |
# halos.shape, | |
# halos.dtype)) | |
print("In {}: nread = {}".format(inspect.currentframe().f_code.co_name, nread)) | |
return | |
def main(filename): | |
import timeit | |
import functools | |
import sys | |
chunksize = 100000 | |
nrepeats = 1 | |
parser = get_parser(filename) | |
#parse_ascii_file_default(filename, parser, chunksize) | |
parse_ascii_file_pandas(filename, parser, chunksize) | |
#parse_ascii_file_csv(filename, parser, chunksize) | |
readers = [parse_ascii_file_default, parse_ascii_file_pandas, parse_ascii_file_csv] | |
for func in readers: | |
t = timeit.Timer(functools.partial(func, filename, parser, chunksize)) | |
print("Time required for function {} is {:.3f}".format(func.__name__, | |
t.timeit(nrepeats))) | |
sys.stdout.flush() | |
if __name__ == "__main__": | |
filename = "test_split/out_35.list_00_00.p5" | |
main(filename) | |
print("This is untested code - please use with caution") | |
#!/usr/bin/env python | |
from __future__ import print_function | |
__author__ = "Manodeep Sinha" | |
__all__ = ["test_parallel_ctrees"] | |
import numpy as np | |
from uchuu_utils import iterTrees | |
from ctrees_utils import get_treewalk_dtype_descr | |
def _validate_prog_desc(forest): | |
print(f"forest.dtype.names = {forest.dtype.names}") | |
for prog, desc in enumerate(forest['Descendant']): | |
if desc == -1: | |
continue | |
if forest['scale'][desc] <= forest['scale'][prog]: | |
msg = f"Error: desc_scale = {forest['scale'][desc]} must be greater "\ | |
f"than progenitor scale = {forest['scale'][prog]}"\ | |
f"desc = {desc} prog = {prog} desc_id = {forest['desc_id'][prog]}"\ | |
f"id of desc = {forest['id'][desc]}, prog, Tree_root_ID = {forest['Tree_root_ID'][prog]} "\ | |
f"desc, Tree_root_ID = {forest['Tree_root_ID'][desc]}" | |
raise AssertionError(msg) | |
assert forest['id'][desc] == forest['desc_id'][prog] | |
if forest['FirstProgenitor'][desc] == prog: | |
# count the number of progenitors | |
num_prog = 1 | |
nextprog = forest['NextProgenitor'][prog] | |
while nextprog != -1: | |
num_prog += 1 | |
nextprog = forest['NextProgenitor'][nextprog] | |
assert forest['num_prog'][desc] == num_prog | |
else: | |
# This progenitor is not the primary progenitor | |
# -> we need to check the nextprog values -> nextprog | |
# *must* exist | |
firstprog = forest['FirstProgenitor'][desc] | |
nextprog = forest['NextProgenitor'][firstprog] | |
msg = f"firstprog = {firstprog} desc = {desc} prog = {prog} nextprog = {nextprog}" | |
assert nextprog != -1, msg | |
while nextprog != prog: | |
assert forest['desc_id'][nextprog] == forest['id'][desc] | |
ii = forest['NextProgenitor'][nextprog] | |
assert ii != -1 | |
assert forest['Mvir'][nextprog] >= forest['Mvir'][ii] | |
nextprog = ii | |
def test_parallel_ctrees(h5file, show_progressbar=True): | |
""" | |
Validates a single converted hdf5 file against the original ascii | |
Consistent-Trees tree data by comparing every halo property, tree by tree. | |
""" | |
import h5py | |
from tqdm import tqdm | |
from numpy.testing import assert_array_equal | |
with h5py.File(h5file, 'r') as hf: | |
ctrees_type = hf.attrs['consistent-trees-type'] | |
if ctrees_type != 'parallel': | |
msg = "Error: This test script is only intended for Parallel-Ctrees output" | |
raise ValueError(msg) | |
# Need to do a many-to-many matching | |
# Re-open the file to do the tree-by-tree matching | |
with h5py.File(h5file, 'r') as hf: | |
ntrees = hf['TreeInfo'].shape[0] | |
tree_roots = hf['TreeInfo'] | |
print(f"Reading {ntrees} TreeRootIDs from hdf5 file ...") | |
tree_rootids = tree_roots['TreeRootID'][:] | |
print(f"Reading {ntrees} TreeRootIDs from hdf5 file ...done.") | |
# tree_roots.sort(order=['Input_Filename', 'Input_TreeByteOffset'], kind='heapsort') | |
cont_halo_props = False | |
try: | |
dtype = hf['Forests/halos'].dtype | |
except KeyError: | |
cont_halo_props = True | |
if cont_halo_props: | |
halo_props = dict() | |
props = [] | |
for name in tqdm(hf['Forests'].keys()): | |
if isinstance(hf[f"Forests/{name}"], h5py.Dataset): | |
props.append(name) | |
halo_props[name] = np.asarray(hf[f"Forests/{name}"]) | |
else: | |
props = dtype.names | |
halo_props = np.asarray(hf["Forests/halos"]) | |
print(f"properties = {props}") | |
mergertree_dtype = np.dtype(get_treewalk_dtype_descr()) | |
if show_progressbar: | |
pbar = tqdm(total=ntrees, unit=' trees', disable=None) | |
# h5_treeindex = 0 | |
# print(f"tree roots = {tree_roots['TreeRootID'][:]}") | |
treefiles = [tf.decode() for tf in hf.attrs[u'input_files']] | |
# Create the generators from the ascii file | |
for tf in treefiles: | |
trees = iterTrees(tf, return_tree_root_id=True) | |
skipped = 0 | |
tested = 0 | |
numtrees = 0 | |
for root, tree in trees: | |
numtrees += 1 | |
rootindex = (np.where(tree_rootids == root))[0] | |
if len(rootindex) == 0: | |
print(f"Did not find tree-root-id = {root}") | |
skipped += 1 | |
continue | |
if len(rootindex) != 1: | |
msg = f"Error: Should have found *exactly* one instance of TreeRootID = {root}\n"\ | |
f"Instead, found {len(rootindex)} matches, with indices = {rootindex}" | |
raise AssertionError(msg) | |
tested += 1 | |
h5_treeindex = rootindex[0] | |
h5_tree_root = tree_roots['TreeRootID'][h5_treeindex] | |
msg = f"Error: For tree # {h5_treeindex}, the TreeRootID = {h5_tree_root} in hdf5 file = {h5file} "\ | |
f"does not match the TreeRootID in the ascii file = {root}" | |
assert root == h5_tree_root, msg | |
h5_start = tree_roots['TreeHalosOffset'][h5_treeindex] | |
h5_end = h5_start + tree_roots['TreeNhalos'][h5_treeindex] | |
#print(f"Checking treerootid = {root}, (start, stop) = [{h5_start}, {h5_end}) h5_treeindex = {h5_treeindex}") | |
for prop in props: | |
if prop in mergertree_dtype.names: | |
continue | |
# print(f"Checking property = {prop}...") | |
msg = f"Error: Halo property = '{prop}' is not the same "\ | |
"between the hdf5 and ascii files" | |
assert_array_equal(tree[prop][:], | |
halo_props[prop][h5_start:h5_end], | |
err_msg=msg) | |
# print(f"Checking property = {prop}...done") | |
# h5_treeindex += 1 | |
if show_progressbar: | |
pbar.update(1) | |
if show_progressbar: | |
pbar.close() | |
print(f"File {h5file} passed validation - hooray!") | |
return True | |
if __name__ == "__main__": | |
base = "" | |
# h5file = "/home/msinha/scratch/simulations/uchuu/U2000/trees/hdf5/treerootid/tmp/forest_0.h5" | |
# treefile = "/home/msinha/scratch/simulations/uchuu/U2000/trees/00000148.tree" | |
base = "/home/msinha/scratch/simulations/uchuu/U2000/trees/hdf5/with-tree-indices" | |
h5files = [f"{base}/forest_{rank}.h5" for rank in range(5)] | |
for h5file in h5files: | |
test_parallel_ctrees(h5file) |
#!/usr/bin/env python | |
from __future__ import print_function | |
__author__ = "Manodeep Sinha" | |
__all__ = ["get_parser", "get_approx_totnumhalos", "generic_reader", | |
"get_metadata", "resize_halo_datasets", | |
"check_and_decompress", "distribute_array_over_ntasks", | |
"write_halos", "iterTrees"] | |
## From Yao-Yuan Mao's SimulationAnalysis.py (part of YYM's helper package) | |
import re | |
import numpy as np | |
from builtins import range, zip | |
from contextlib import contextmanager | |
def sanitize_ctrees_header(headerline): | |
import re | |
header = [re.sub(r'\(\d+\)$', '', s) for s in headerline] | |
# print("After normal sub: header = {}\n".format(header)) | |
header = [re.sub(r'[^a-zA-Z0-9 \n\.]', '_', s) for s in header] | |
# print("After replacing special characters with underscore: header = {}\n".format(header)) | |
header = [re.sub('_$', '', s) for s in header] | |
# print("After replacing trailing underscore: header = {}\n".format(header)) | |
header = [re.sub('(_)+', '_', s) for s in header] | |
# print("After replacing multiple underscores: header = {}".format(header)) | |
return header | |
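# For example, 'Tree_root_ID(29)' -> 'Tree_root_ID' and 'A[x](14)' -> 'A_x': | |
# the trailing column number is dropped, special characters become | |
# underscores, and trailing/duplicated underscores are removed | |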
def _isstring(s): | |
try: | |
s + '' | |
except TypeError: | |
return False | |
return True | |
class BaseParseFields(): | |
def __init__(self, header, fields=None): | |
if len(header)==0: | |
if all(isinstance(f, int) for f in fields): | |
self._usecols = fields | |
self._formats = [float]*len(fields) | |
self._names = ['f%d'%f for f in fields] | |
else: | |
raise ValueError('header is empty, so fields must be a list '\ | |
'of int.') | |
else: | |
# print(f"in baseparsefields: header = {header}") | |
header_s = [self._name_strip(__) for __ in header] | |
# print(f"header_s = {header_s}") | |
if not fields: | |
self._names = header | |
names_s = header_s | |
self._usecols = list(range(len(names_s))) | |
else: | |
# print(f"fields = {fields}") | |
if _isstring(fields): | |
fields = [fields] | |
self._names = [header[f] if isinstance(f, int) else str(f) \ | |
for f in fields] | |
names_s = [self._name_strip(__) for __ in self._names] | |
wrong_fields = [str(f) for s, f in zip(names_s, fields) \ | |
if s not in header_s] | |
if wrong_fields: | |
raise ValueError('The following field(s) are not available'\ | |
': %s.\nAvailable fields: %s.'%(\ | |
', '.join(wrong_fields), ', '.join(header))) | |
self._usecols = [header_s.index(__) for __ in names_s] | |
# print(f"_usecols = {self._usecols}, names_s = {names_s}") | |
self._formats = [self._get_format(__) for __ in names_s] | |
#xx = list(zip(self._usecols, self._names, self._formats)) | |
# print(f"self._usecols = {self._usecols} self._names = {self._names} self._formats = {self._formats}") | |
# print(f"zipped (usecols, name, format) = {xx}") | |
self.dtype = np.dtype({'names':self._names, \ | |
'formats':self._formats}) | |
def parse_line(self, l): | |
items = l.split() | |
try: | |
return tuple(c(items[i]) for i, c in zip(self._usecols, self._formats)) | |
except Exception as _error: | |
print(f'Something wrong when parsing this line:\n{l}') | |
raise _error | |
def pack(self, X): | |
return np.array(X, self.dtype) | |
def _name_strip(self, s): | |
return self._re_name_strip.sub('', s).lower() | |
def _get_format(self, s): | |
return float if self._re_formats.search(s) is None else int | |
_re_name_strip = re.compile(r'\W|_') | |
_re_formats = re.compile(r'^phantom$|^mmp$|id$|^num|num$') | |
def _loadtree(locations, parser, tree_root): | |
ind = (np.where(locations['TreeRootID'] == tree_root))[0] | |
if len(ind) == 0: | |
msg = f"Error: Could not find tree root = {tree_root}" | |
raise ValueError(msg) | |
fname, offset = locations['Filename'][ind], locations['Offset'][ind] | |
with open(fname, 'rt') as f: | |
pass | |
raise NotImplementedError() | |
def iterTrees(tree_dat, fields=None, buffering=100000000, return_tree_root_id=False): | |
""" | |
Iterate over all the trees in a tree_X_X_X.dat file. | |
Parameters | |
---------- | |
tree_dat : str or file obj | |
The path to the file (can be an URL) or a file object. | |
fields : str, int, array_like, optional | |
The desired fields. It can be a list of string or int. If fields is None (default), return all the fields listed in the header. | |
buffering : int, optional | |
The buffer size (in bytes) passed to ``open`` when reading local files. | |
return_tree_root_id : bool, optional | |
If True, yield ``(tree_root_id, tree)`` tuples instead of just the trees. | |
Returns | |
------- | |
trees : generator | |
A python generator which iterates over all the trees in that file. | |
Example | |
------- | |
>>> trees = iterTrees('tree_0_0_0.dat', ['id', 'mvir', 'upid', 'num_prog']) | |
>>> tree = next(trees) | |
>>> tree.dtype | |
dtype([('id', '<i8'), ('mvir', '<f8'), ('upid', '<i8'), ('num_prog', '<i8')]) | |
>>> mb = tree[getMainBranch(tree)] | |
""" | |
def _generic_open(f, buffering=100000000): | |
""" | |
Returns | |
------- | |
fp : file handle | |
need_to_close : bool | |
""" | |
if hasattr(f, 'read'): | |
return f, False | |
else: | |
if re.match(r'(s?ftp|https?)://', f, re.I): | |
import requests  # deferred import; only needed for remote files | |
r = requests.get(f, stream=True) | |
r.raw.decode_content = True | |
fp = r.raw | |
elif f.endswith('.gz'): | |
import gzip  # deferred import; only needed for gzipped files | |
fp = gzip.open(f, 'r') | |
else: | |
fp = open(f, 'r', int(buffering)) | |
return fp, True | |
f, need_to_close = _generic_open(tree_dat, buffering) | |
try: | |
l = next(f) | |
if l[0] != '#': | |
raise ValueError('File does not have a proper header line.') | |
split_headerline = [s for s in l[1:].split()] | |
header = sanitize_ctrees_header(split_headerline) | |
# header = [re.sub('\(\d+\)$', '', s) for s in l[1:].split()] | |
p = BaseParseFields(header, fields) | |
while l[0] == '#': | |
try: | |
l = next(f) | |
except StopIteration: | |
num_trees = 0 | |
break | |
else: | |
try: | |
num_trees = int(l) | |
except ValueError: | |
raise ValueError('File does not have a proper tree_X_X_X.dat format.') | |
try: | |
l = next(f) | |
except StopIteration: | |
if num_trees: | |
raise ValueError('File does not have a proper tree_X_X_X.dat format.') | |
eof = False | |
for _ in range(num_trees): | |
if eof or l[0] != '#': | |
raise ValueError('File does not have a proper tree_X_X_X.dat format.') | |
try: | |
tree_root_id = int(l.rpartition(' ')[-1]) | |
except ValueError: | |
raise ValueError('File does not have a proper tree_X_X_X.dat format.') | |
X = [] | |
for l in f: | |
if l[0] == '#': | |
if return_tree_root_id: | |
yield tree_root_id, p.pack(X) | |
else: | |
yield p.pack(X) | |
break | |
X.append(p.parse_line(l)) | |
else: | |
eof = True | |
finally: | |
if need_to_close: | |
f.close() | |
## End of code taken straight from Yao-Yuan Mao's SimulationAnalysis.py | |
def get_parser(filename, fields=None, drop_fields=None): | |
""" | |
Returns a parser that parses a single line from the input ascii file | |
Parameters | |
---------- | |
filename: string, required | |
A filename containg Rockstar/Consistent-Trees data. Can be a compressed | |
file if the compression is one of the types supported by the | |
``generic_reader`` function. | |
fields: list of strings, optional, default: None | |
Describes which specific columns in the input file to carry across | |
to the hdf5 file. Default action is to convert ALL columns. | |
drop_fields: list of strings, optional, default: None | |
Describes which columns are not carried through to the hdf5 file. | |
Processed after ``fields``, i.e., you can specify ``fields=None`` to | |
create an initial list of *all* columns in the ascii file, and then | |
specify ``drop_fields = [colname2, colname7, ...]``, and those columns | |
will not be present in the hdf5 output. | |
Returns | |
------- | |
parser: an instance of BaseParseFields | |
A parser that will parse a single line (read from a Rockstar/Consistent-Trees file) | |
and create a tuple containing *only* the relevant columns. | |
""" | |
with generic_reader(filename, 'rt') as f: | |
# Taken from SimulationAnalysis.py | |
hdrline = next(f) | |
hdrline = hdrline.strip().lstrip('#').lstrip() | |
#print(f"hdrline = {hdrline}") | |
split_headerline = [s for s in hdrline.split()] | |
header = sanitize_ctrees_header(split_headerline) | |
#print(f"sanitised header = {header}") | |
keep_fields = header[:] | |
if fields: | |
print(f"Since Fields = {fields} has been specified, only keeping those") | |
fields = [s.upper() for s in fields] | |
for fld in split_headerline: | |
upfld = fld.upper() | |
if upfld not in fields: | |
keep_fields.remove(fld) | |
print(f"Removing fld = {fld} since it does not exist " | |
"in 'fields'") | |
else: | |
print(f"Keeping fld = {fld}") | |
if drop_fields: | |
for fld in drop_fields: | |
try: | |
keep_fields.remove(fld) | |
except ValueError: | |
print(f"Field = '{fld}' was specified in the list of fields "\ | |
f"to be dropped but could not find in the list "\ | |
f"of current fields = {keep_fields}. Perhaps the case needs "\ | |
"to be fixed (the matching is case-sensitive)?") | |
raise ValueError(msg) | |
p = BaseParseFields(header, keep_fields) | |
# Above taken from SimulationAnalysis.py | |
return p | |
@contextmanager | |
def generic_reader(filename, mode='rt'): | |
""" | |
Returns a file-reader with capability to read line-by-line | |
for both compressed and normal text files. | |
Parameters | |
----------- | |
filename: string, required | |
The filename for the associated input/output. Can be a | |
compressed (.bz2, .gz, .xz, .zip) file as well as a regular | |
ascii file | |
mode: string, optional, default: 'rt' (readonly-text mode) | |
Controls the kind of i/o operation that will be | |
performed | |
Returns | |
-------- | |
f: file handle, generator | |
Yields a generator that has the ``readline`` feature (i.e., | |
supports the paradigm ``for line in f:``). This file-reader | |
generator is suitable for use in ``with`` statements, e.g., | |
``with generic_reader(<fname>) as f:`` | |
""" | |
if not isinstance(filename, str) and hasattr(filename, 'decode'): | |
filename = filename.decode() | |
if filename.endswith(".bz2"): | |
import bz2 | |
f = bz2.open(filename, mode=mode) | |
elif filename.endswith(".gz"): | |
import gzip | |
f = gzip.open(filename, mode=mode) | |
elif filename.endswith('.xz'): | |
import lzma | |
f = lzma.open(filename, mode=mode) | |
elif filename.endswith('.zip'): | |
validmodes = set(['r', 'w']) | |
if mode not in validmodes: | |
msg = f"Error: filename = '{filename}' is a zipfile but the "\ | |
f"requested mode = '{mode}' is not the valid zipfile "\ | |
f"modes = '{validmodes}'" | |
raise ValueError(msg) | |
import zipfile | |
# Note: zipfile has no module-level ``open``; open the (assumed | |
# single) archive member instead. Only read mode is really supported. | |
zf = zipfile.ZipFile(filename, mode) | |
f = zf.open(zf.namelist()[0], mode) | |
else: | |
import io | |
f = io.open(filename, mode=mode) | |
try: | |
yield f | |
finally: | |
f.close() | |
def get_metadata(input_file):
    """
    Returns metadata information for ``input_file``. Includes all
    comment lines in the header, the Rockstar/Consistent-Trees version,
    and the input catalog type (either Rockstar or Consistent Trees).

    Assumes that the only comment lines in the file occur
    at the beginning. Comment lines are assumed to begin with '#'.

    Parameters
    -----------
    input_file: string, required
        The input filename for the Rockstar/Consistent Trees file.
        Compressed files ('.bz2', '.gz', '.xz', '.zip') are also allowed
        as valid kinds of ``input_file``

    Returns
    --------
    metadata_dict: dictionary
        The dictionary contains five key-value pairs corresponding to the
        keys: ['metadata', 'version', 'catalog_type', 'headerline',
        'simulation_params'].

        metadata: list of strings
            All lines at the beginning of the file that start with the
            character '#'

        version: string
            The Rockstar or Consistent-Trees version that was used to
            generate ``input_file``

        catalog_type: string
            One of [``Rockstar``, ``Consistent Trees``,
            ``Consistent Trees (hlist)``], indicating what kind of
            catalog is contained in ``input_file``

        headerline: string
            The first line in the input file with any leading/trailing
            white-space, and any leading '#', removed

        simulation_params: dictionary
            The cosmological parameters and boxsize of the simulation,
            as parsed from the comment lines (see
            ``get_simulation_params_from_metadata``)
    """
    def get_catalog_and_version(line):
        input_catalog_type = None
        version = None
        ## The Rockstar version line has the pattern
        ## '#Rockstar Version: <VERSION_STRING>'
        ## We split the line on ':', then take the second chunk
        ## (by indexing with [1]) to retrieve the version string,
        ## and finally strip all surrounding white-space.
        ## The Consistent Trees version line has the pattern
        ## '#Consistent Trees Version <VERSION_STRING>'
        ## We split the line on ' ' and take the last chunk,
        ## stripping the white-space as usual.
        catalog_types = [('Rockstar', lambda line: (line.split(":")[1]).strip()),
                         ('Consistent Trees', lambda line: (line.split(" ")[-1]).strip())]
        for (catalog, fn) in catalog_types:
            if catalog in line:
                input_catalog_type = catalog
                version = fn(line)
                break
        return input_catalog_type, version

    metadata = []
    ctrees_hlist = True
    # Initialise so that the check below raises the intended ValueError
    # (rather than a NameError) when no version line is found
    input_catalog_type, version = None, None
    with generic_reader(input_file, 'rt') as f:
        for line in f:
            if not line.startswith('#'):
                # Check if the first 'data' line contains
                # multiple columns. This is required to distinguish
                # between CTrees-generated halo catalogues ('hlist' files)
                # and the CTrees-generated tree files ('tree_?_?_?.dat' files)
                if len(line.split()) == 1:
                    ctrees_hlist = False
                break
            if 'VERSION' in line.upper():
                input_catalog_type, version = get_catalog_and_version(line)
            metadata.append(line)

    if not version:
        msg = f"Error: Could not locate version in the input file = '{input_file}'"
        raise ValueError(msg)

    if ctrees_hlist and 'Consistent' in input_catalog_type:
        input_catalog_type = f"{input_catalog_type} (hlist)"

    hdrline = metadata[0]
    hdrline = hdrline.strip().lstrip('#').lstrip()
    simulation_params = get_simulation_params_from_metadata(metadata)

    metadata_dict = dict()
    metadata_dict['metadata'] = metadata
    metadata_dict['version'] = version
    metadata_dict['catalog_type'] = input_catalog_type
    metadata_dict['headerline'] = hdrline
    metadata_dict['simulation_params'] = simulation_params
    return metadata_dict

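
# Example usage of ``get_metadata`` (a minimal sketch; the filename
# 'hlist_1.00000.list' is hypothetical):
#
# meta = get_metadata('hlist_1.00000.list')
# print(meta['catalog_type'], meta['version'])
# print(meta['simulation_params'])
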
def get_simulation_params_from_metadata(metadata):
    """
    Returns a dictionary containing the cosmological parameters
    ('Omega_M', 'Omega_L', 'hubble') and the simulation boxsize
    ('Boxsize'), parsed from the header lines of a
    Rockstar/Consistent-Trees file.

    Parameters
    -----------
    metadata: list of strings, required
        The comment lines from the beginning of the catalogue
        (as collected within ``get_metadata``)

    Returns
    --------
    simulation_params: dictionary
    """
    simulation_params = dict()
    for line in metadata[1:]:
        # Cosmological parameters: assumed to be on a line of
        # the form '#Om = <val>; Ol = <val>; h = <val>'
        if "Omega_M" in line or ("Om" in line and "Ol" in line):
            pars = line[1:].split(";")
            for j, par in enumerate(["Omega_M",
                                     "Omega_L",
                                     "hubble"]):
                v = float(pars[j].split(" = ")[1])
                simulation_params[par] = v
        # Box size: assumed to be on a line containing
        # '<...>box size = <val> <units>'
        elif "Full box size" in line or "Box size" in line:
            pars = line.split("=")[1].strip().split()
            box = float(pars[0])
            simulation_params['Boxsize'] = box
            # We can break here because the boxsize always occurs later
            # than the cosmological parameters (regardless of
            # CTrees/Rockstar origin)
            break

    return simulation_params

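
# Example usage of ``get_simulation_params_from_metadata`` (a minimal
# sketch; the header lines below are made-up but follow the assumed
# format):
#
# metadata = ['#scale(0) id(1) ...\n',
#             '#Om = 0.31; Ol = 0.69; h = 0.68\n',
#             '#Full box size = 400.000000 Mpc/h\n']
# params = get_simulation_params_from_metadata(metadata)
# # params -> {'Omega_M': 0.31, 'Omega_L': 0.69,
# #            'hubble': 0.68, 'Boxsize': 400.0}
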
def get_approx_totnumhalos(input_file, ndatabytes=None):
    """
    Returns an (approximate) number of lines containing data
    in the ``input_file``.

    Assumes that the only comment lines in the file occur
    at the beginning. Comment lines are assumed to begin with '#'.

    Parameters
    -----------
    input_file: string, required
        The input filename for the Rockstar/Consistent Trees file

    ndatabytes: integer, optional
        The total number of bytes being processed. If not passed, the
        entire on-disk size of ``input_file``, minus the initial header
        lines, will be used (i.e., assumes that the entire file is
        being processed)

    Returns
    --------
    approx_totnumhalos: integer
        The approximate number of halos in the input file. The actual
        number of halos should be close, but can be smaller/greater
        than the approximate value.
    """
    import os

    with generic_reader(input_file, 'rt') as f:
        hdr_bytes = 0
        for line in f:
            if line.startswith('#'):
                hdr_bytes += len(line)
            else:
                # The first line of a CTrees file is the number of trees
                # in that file. We should skip that line as well
                if len(line.split()) == 1:
                    hdr_bytes += len(line)
                    continue
                data_line_len = len(line)
                break

    if not ndatabytes:
        statinfo = os.stat(input_file)
        totbytes = statinfo.st_size
        ndatabytes = totbytes - hdr_bytes

    approx_nhalos = (ndatabytes // data_line_len) + 1
    return approx_nhalos

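
# Example usage of ``get_approx_totnumhalos`` (a minimal sketch; the
# filename and ``halo_dtype`` are hypothetical). The estimate divides
# the number of data bytes by the length of the first data line, so it
# is only approximate and should be used for pre-allocation:
#
# import numpy as np
# nhalos = get_approx_totnumhalos('hlist_1.00000.list')
# halos = np.empty(nhalos, dtype=halo_dtype)
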
def check_and_decompress(fname):
    """
    Decompresses the input file (if necessary) and returns the
    decompressed filename

    Parameters
    ----------
    fname: string, required
        Input filename, can be compressed

    Returns
    -------
    decomp_fname: string
        The decompressed filename
    """
    decompressors = [('.bz2', 'bunzip2'),
                     ('.gz', 'gunzip'),
                     ('.zip', 'unzip')]
    if not isinstance(fname, str) and hasattr(fname, 'decode'):
        fname = fname.decode()

    for (ext, decomp) in decompressors:
        if fname.endswith(ext):
            import subprocess
            print(f"Uncompressing compressed file '{fname}'...")
            subprocess.run([decomp, fname], check=True)
            print(f"Uncompressing compressed file '{fname}'...done")
            # Strip the extension *suffix*. Note that ``str.rstrip``
            # would be a bug here -- it strips a *set of characters*,
            # e.g., 'catalog.gz'.rstrip('.gz') -> 'catalo'
            fname = fname[:-len(ext)]
            break
    return fname

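
# Example usage of ``check_and_decompress`` (a minimal sketch; requires
# the relevant command-line tool, e.g., 'gunzip', to be installed, and
# the filename is hypothetical):
#
# fname = check_and_decompress('hlist_1.00000.list.gz')
# # fname -> 'hlist_1.00000.list', with the file decompressed on disk
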
def distribute_array_over_ntasks(cost_array, rank, ntasks):
    """
    Calculates the subscript range for the ``rank``'th task
    such that the work-load is evenly distributed across
    ``ntasks``.

    Parameters
    -----------
    cost_array: numpy array, required
        Contains the cost associated with processing *each* element
        of the array

    rank: integer, required
        The integer rank of the task that we need to compute the
        work-load division for

    ntasks: integer, required
        Total number of tasks that the array should be (evenly)
        distributed across

    Returns
    --------
    (start, stop): A tuple of (np.int64, np.int64)
        Contains the initial and final subscripts that the ``rank``'th
        task should process.

        Note: start, stop are both inclusive, i.e., all elements from
        ``start`` to ``stop`` should be included. For python array
        indexing with slices, this translates to arr[start:stop + 1].
    """
    import numpy as np

    if rank >= ntasks or rank < 0 or ntasks < 1:
        msg = f"Error: rank = {rank} and NTasks = {ntasks} must satisfy "\
              " i) ThisTask < NTasks, "\
              " ii) ThisTask >= 0, and "\
              " iii) NTasks >= 1"
        raise ValueError(msg)

    ncosts = cost_array.shape[0]
    if ncosts == 0:
        print(f"Warning: On rank = {rank}, got 0 elements to distribute...returning")
        start, stop = None, None
        return (start, stop)

    if ntasks == 1:
        start, stop = 0, ncosts - 1
        print(f"Only a single task -- distributing the entire array [start, stop] = [{start}, {stop}]")
        return (start, stop)

    cumul_cost_array = cost_array.cumsum()
    total_cost = cumul_cost_array[-1]
    target_cost = total_cost / ntasks
    cost_remaining, cost_assigned, ntasks_left = total_cost, 0.0, ntasks
    start = 0
    for icore in range(rank + 1):
        full_array_target_cost = cost_assigned + target_cost
        if icore == (ntasks - 1):
            # The last task takes all remaining elements
            stop = ncosts - 1
        else:
            # The smallest index at which the cumulative cost reaches
            # the target cost for this task
            stop = min(np.where(cumul_cost_array >= full_array_target_cost)[0])
        if stop < ncosts:
            cost_assigned_this_rank = cumul_cost_array[stop] - cost_assigned
            # Only print the work divisions on the last rank
            if rank == (ntasks - 1):
                # Yes, it is intentional that `icore` is used here. The
                # intent is to show which forest ranges are assigned to
                # which task (i.e., rank) - MS 22/06/2020
                print(f"[Rank={icore}]: Assigning forests: start, stop = [{start}, {stop}]. "
                      f"Target cost = {target_cost}, cost actually assigned = {cost_assigned_this_rank}")
            if icore == rank:
                break
            # Recompute the target from what remains, so that rounding
            # does not accumulate against the later tasks
            cost_remaining -= cost_assigned_this_rank
            cost_assigned += cost_assigned_this_rank
            ntasks_left -= 1
            target_cost = cost_remaining / ntasks_left
            start = stop + 1
        else:
            msg = "Error: While determining optimal distribution of forests "\
                  "across MPI tasks. Could not locate an appropriate "\
                  f"stopping point for rank = {rank}. Expected stop = {stop} "\
                  "to be less than the number of elements in the cost "\
                  f"array = {ncosts}"
            raise ValueError(msg)
    return (start, stop)

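
# Example usage of ``distribute_array_over_ntasks`` (a minimal sketch;
# the cost array below is made-up). With 10 equal-cost elements split
# over 2 tasks, rank 0 is assigned [0, 4] and rank 1 is assigned [5, 9]:
#
# import numpy as np
# costs = np.ones(10)
# start, stop = distribute_array_over_ntasks(costs, rank=0, ntasks=2)
# my_elements = costs[start:stop + 1]  # both ends inclusive
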
def resize_halo_datasets(halos_dset, new_size, write_halo_props_cont, dtype):
    """
    Resizes the halo dataset(s) within a hdf5 file

    Parameters
    -----------
    halos_dset: dictionary of h5py datasets, or a single h5py dataset, required
        Contains the halo dataset(s) that should be resized. See the
        parameter ``write_halo_props_cont`` for further details

    new_size: scalar integer, required
        The new size (number of halos) for the dataset(s)

    write_halo_props_cont: boolean, required
        Controls if the individual halo properties are written as distinct
        datasets such that any given property for ALL halos is written
        contiguously (structure of arrays, SOA). If set, ``halos_dset``
        is expected to be a dictionary of datasets keyed by the halo
        property names; otherwise, a single dataset

    dtype: numpy datatype, required
        The datatype of the halos; the field names supply the keys into
        ``halos_dset`` when ``write_halo_props_cont`` is set

    Returns
    -------
    Returns ``True`` on successful completion
    """
    if write_halo_props_cont:
        for name in dtype.names:
            dset = halos_dset[name]
            dset.resize((new_size, ))
    else:
        halos_dset.resize((new_size, ))
    return True

def write_halos(halos_dset, halos_dset_offset, halos, nhalos_to_write,
                write_halo_props_cont):
    """
    Writes halos into the relevant dataset(s) within a hdf5 file

    Parameters
    -----------
    halos_dset: dictionary, required
        Contains the halos dataset(s) within a hdf5 file where either
        the entire halos array or the individual halo properties should
        be written to. See the parameter ``write_halo_props_cont`` for
        further details

    halos_dset_offset: scalar integer, required
        Contains the index within the halos dataset(s) where the write
        should start

    halos: numpy structured array, required
        An array containing the halo properties that should be written
        out into the hdf5 file. The entire array may not be written out,
        see the parameter ``nhalos_to_write``

    nhalos_to_write: scalar integer, required
        Number of halos from the ``halos`` array that should be written
        out. Can be smaller than the shape of the ``halos`` array

    write_halo_props_cont: boolean, required
        Controls if the individual halo properties are written as distinct
        datasets such that any given property for ALL halos is written
        contiguously (structure of arrays, SOA)

    Returns
    -------
    Returns ``True`` on successful completion of the write
    """
    if write_halo_props_cont:
        for name in halos.dtype.names:
            dset = halos_dset[name]
            dset[halos_dset_offset:halos_dset_offset + nhalos_to_write] \
                = halos[name][0:nhalos_to_write]
    else:
        halos_dset[halos_dset_offset:halos_dset_offset + nhalos_to_write] = \
            halos[0:nhalos_to_write]
    return True

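
# Example usage of ``resize_halo_datasets`` and ``write_halos`` together
# (a minimal sketch for the SOA layout; the filename, dtype, and sizes
# are all made-up):
#
# import h5py
# import numpy as np
# dtype = np.dtype([('id', np.int64), ('mvir', np.float64)])
# halos = np.zeros(100, dtype=dtype)
# with h5py.File('halos.h5', 'w') as hf:
#     halos_dset = dict()
#     for name in dtype.names:
#         halos_dset[name] = hf.create_dataset(name, (0, ),
#                                              dtype=dtype[name],
#                                              maxshape=(None, ))
#     resize_halo_datasets(halos_dset, 100, True, dtype)
#     write_halos(halos_dset, 0, halos, 100, True)
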
# # Taken from https://stackoverflow.com/a/53507580
# # UNTESTED! -- MS 16/06/2020
# def better_np_unique(arr):
#     import numpy as np
#     sort_indexes = np.argsort(arr)
#     arr = np.asarray(arr)[sort_indexes]
#     vals, first_indexes, inverse, counts = \
#         np.unique(arr, return_index=True,
#                   return_inverse=True, return_counts=True)
#     indexes = np.split(sort_indexes, first_indexes[1:])
#     for x in indexes:
#         x.sort()
#     return vals, indexes, inverse, counts


# Another untested option: https://stackoverflow.com/a/54736464
# def ndix_unique(x):
#     """
#     Returns an N-dimensional array of indices
#     of the unique values in x
#
#     Parameters
#     ----------
#     x: np.array
#         Array with arbitrary dimensions
#
#     Returns
#     -------
#     - 1D-array of sorted unique values
#     - Array of arrays. Each array contains the indices where a
#       given value in x is found
#     """
#     x_flat = x.ravel()
#     ix_flat = np.argsort(x_flat)
#     u, ix_u = np.unique(x_flat[ix_flat], return_index=True)
#     ix_ndim = np.unravel_index(ix_flat, x.shape)
#     ix_ndim = np.c_[ix_ndim] if x.ndim > 1 else ix_flat
#     return u, np.split(ix_ndim, ix_u[1:])