@manodeep
Last active July 14, 2020 03:17
Utilities for Uchuu Converter (ascii -> hdf5)
#!/usr/bin/env python
from __future__ import print_function
__author__ = "Manodeep Sinha"
__all__ = ["convert_single_ascii_halocat", "convert_ascii_halocat_files"]
import os
from uchuu_utils import get_parser, get_approx_totnumhalos, generic_reader,\
get_metadata, resize_halo_datasets, write_halos
def convert_single_ascii_halocat(input_file, rank=0,
outputdir="./", write_halo_props_cont=True,
fields=None, drop_fields=None,
chunksize=10000, compression='gzip',
show_progressbar=False):
"""
Convert a single Rockstar/Consistent Trees (an hlist catalogue) file
into an (optionally compressed) hdf5 file.
Parameters
-----------
input_file: string, required
The input filename for the Rockstar/Consistent Trees file. Can
be a compressed (.gz, .bz2) file.
rank: integer, optional, default: 0
The (MPI) rank for the process. The output filename is determined
with this rank to ensure unique filenames when running in parallel.
outputdir: string, optional, default: current working directory ('./')
The directory to which the converted hdf5 file will be written. The
output filename is obtained by appending '.h5' to the ``input_file``.
If the output file already exists, then it will be truncated.
write_halo_props_cont: boolean, optional, default: True
Controls if the individual halo properties are written as distinct
datasets such that any given property for ALL halos is written
contiguously (structure of arrays, SOA).
When set to False, only one dataset ('halos') is created, and ALL
properties of a halo are written out contiguously (array of
structures).
fields: list of strings, optional, default: None
Describes which specific columns in the input file to carry across
to the hdf5 file. Default action is to convert ALL columns.
drop_fields: list of strings, optional, default: None
Describes which columns are not carried through to the hdf5 file.
Processed after ``fields``, i.e., you can specify ``fields=None`` to
create an initial list of *all* columns in the ascii file, and then
specify ``drop_fields = [colname2, colname7, ...]``, and those columns
will not be present in the hdf5 output.
chunksize: integer, optional, default: 10000
Controls how many lines are read in from the input file before being
written out to the hdf5 file.
compression: string, optional, default: 'gzip'
Controls the kind of compression applied. Valid options are anything
that ``h5py`` accepts.
show_progressbar: boolean, optional, default: False
Controls whether a progressbar is printed. The progressbar is only
shown on rank==0; the remaining ranks ignore this keyword.
Returns
-------
Returns ``True`` on successful completion.
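Example
-------
A minimal, illustrative sketch of converting one hlist catalogue (the
input filename and output directory below are hypothetical):
>>> convert_single_ascii_halocat('hlist_1.00000.list.gz',
...                              outputdir='/scratch/uchuu-hdf5',
...                              compression='gzip')
True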
"""
import numpy as np
import h5py
import time
import sys
import pandas as pd
import warnings
# Progress-bar related setup
try:
from tqdm import tqdm
except ImportError:
tqdm = None
if rank != 0:
show_progressbar = False
if show_progressbar and not tqdm:
msg = "Warning: You have requested to show a progress-bar but could not "\
"import 'tqdm'. Please install 'tqdm' to display a progressbar."\
"Disabling the progressbar for this run"
warnings.warn(msg)
show_progressbar = False
# Done with progressbar setup
if not os.path.isdir(outputdir):
msg = f"Error: The first parameter (output directory) = '{outputdir}' "\
"should be of type directory"
raise ValueError(msg)
if chunksize < 1:
msg = f"Error: chunksize (the number of lines read in one shot) = "\
f"'{chunksize}' must be at least 1"
raise ValueError(msg)
print(f"[Rank={rank}]: processing file '{input_file}'...")
sys.stdout.flush()
t0 = time.perf_counter()
## Read the entire header meta-data
metadata_dict = get_metadata(input_file)
metadata = metadata_dict['metadata']
version_info = metadata_dict['version']
input_catalog_type = metadata_dict['catalog_type']
hdrline = metadata_dict['headerline']
# Check that this is not a Consistent-Tree "tree_*.dat" file
if ('Consistent' in input_catalog_type) and \
('hlist' not in input_catalog_type):
msg = f"Error: This script can *only* convert the 'hlist' halo catalogues "\
f"generated by Consistent-Trees. Seems like a Consistent-Tree generated "\
f"tree catalogue was present in the file = '{input_file}' supplied...exiting"
raise ValueError(msg)
parser = get_parser(input_file, fields=fields, drop_fields=drop_fields)
approx_totnumhalos = get_approx_totnumhalos(input_file)
if show_progressbar:
pbar = tqdm(total=approx_totnumhalos, unit=' halos', disable=None)
halos_offset = 0
input_filebase = os.path.basename(input_file)
output_file = f"{outputdir}/{input_filebase}.h5"
with h5py.File(output_file, "w") as hf:
line_with_scale_factor = ([l for l in metadata if l.startswith("#a")])[0]
scale_factor = float((line_with_scale_factor.split('='))[1])
redshift = 1.0/scale_factor - 1.0
# give the HDF5 root some more attributes
hf.attrs[u"input_filename"] = np.string_(input_file)
hf.attrs[u"input_filedatestamp"] = np.array(os.path.getmtime(input_file))
hf.attrs[u"input_catalog_type"] = np.string_(input_catalog_type)
hf.attrs[f"{input_catalog_type}_version"] = np.string_(version_info)
hf.attrs[f"{input_catalog_type}_columns"] = np.string_(hdrline)
hf.attrs[f"{input_catalog_type}_metadata"] = np.string_(metadata)
sim_grp = hf.create_group('simulation_params')
simulation_params = metadata_dict['simulation_params']
for k, v in simulation_params.items():
sim_grp.attrs[f"{k}"] = v
hf.attrs[u"HDF5_version"] = np.string_(h5py.version.hdf5_version)
hf.attrs[u"h5py_version"] = np.string_(h5py.version.version)
hf.attrs[u"TotNhalos"] = -1
hf.attrs[u"scale_factor"] = scale_factor
hf.attrs[u"redshift"] = redshift
halos_grp = hf.create_group('HaloCatalogue')
halos_grp.attrs['scale_factor'] = scale_factor
halos_grp.attrs['redshift'] = redshift
dset_size = approx_totnumhalos
if write_halo_props_cont:
halos_dset = dict()
# Create a dataset for every halo property
# For any given halo property, the value
# for halos will be written contiguously
# (structure of arrays)
for name, dtype in parser.dtype.descr:
halos_dset[name] = halos_grp.create_dataset(name, (dset_size, ),
dtype=dtype,
chunks=True,
compression=compression,
maxshape=(None,)
)
else:
# Create a single dataset that contains all properties
# of a given halo, then all properties of the next halo,
# and so on (array of structures)
halos_dset = halos_grp.create_dataset('halos', (dset_size,),
dtype=parser.dtype,
chunks=True,
compression=compression,
maxshape=(None,)
)
# Open the file with the generic reader ('inp' will be
# a generator). Compressed files are also fine
with generic_reader(input_file, 'rt') as inp:
# Create a generator with pandas ->
# the generator will yield 'chunksize' lines at a time
gen = pd.read_csv(inp, dtype=parser.dtype, memory_map=True,
names=parser.dtype.names, delim_whitespace=True,
index_col=False, chunksize=chunksize, comment='#')
for chunk in gen:
halos = chunk.to_records(index=False) # convert to a rec-array
halos = np.asarray(halos, dtype=parser.dtype) # convert to structured nd-array
nhalos = halos.shape[0]
if (halos_offset + nhalos) > dset_size:
resize_halo_datasets(halos_dset, halos_offset + nhalos,
write_halo_props_cont, parser.dtype)
dset_size = halos_offset + nhalos
write_halos(halos_dset, halos_offset, halos, nhalos,
write_halo_props_cont)
halos_offset += nhalos
if show_progressbar:
# Since the total number of halos being
# processed is approximate -- let's
# update the progressbar total to the
# best guess at the moment
pbar.total = dset_size
pbar.update(nhalos)
# The ascii file has now been read in entirely -> Now fix the actual
# dataset sizes to reflect the total number of halos written
resize_halo_datasets(halos_dset, halos_offset, write_halo_props_cont, parser.dtype)
dset_size = halos_offset
hf.attrs['TotNhalos'] = halos_offset
if show_progressbar:
pbar.close()
totnumhalos = halos_offset
t1 = time.perf_counter()
print(f"[Rank={rank}]: processing file {input_file}.....done. "
f"Wrote {totnumhalos} halos in {t1-t0:.2f} seconds")
sys.stdout.flush()
return True
def convert_ascii_halocat_files(filenames, outputdir="./",
write_halo_props_cont=True,
fields=None, drop_fields=None,
chunksize=100000, compression='gzip',
show_progressbar=False):
"""
Converts a list of Rockstar/Consistent-Trees halo catalogues from ascii to hdf5.
Can be used with MPI but requires that the number of files be at least
as large as the number of MPI tasks spawned.
Parameters
-----------
filenames: list of strings, required
A list of filename(s) for the Rockstar/Consistent Trees file. Can
be compressed (.gz, .bz2, .xz, .zip) files.
outputdir: string, optional, default: current working directory ('./')
The directory to which the converted hdf5 file will be written. The
output filename is obtained by appending '.h5' to the ``input_file``.
If the output file already exists, then it will be truncated.
write_halo_props_cont: boolean, optional, default: True
Controls if the individual halo properties are written as distinct
datasets such that any given property for ALL halos is written
contiguously (structure of arrays, SOA).
When set to False, only one dataset ('halos') is created, and ALL
properties of a halo are written out contiguously (array of
structures).
fields: list of strings, optional, default: None
Describes which specific columns in the input file to carry across
to the hdf5 file. Default action is to convert ALL columns.
drop_fields: list of strings, optional, default: None
Describes which columns are not carried through to the hdf5 file.
Processed after ``fields``, i.e., you can specify ``fields=None`` to
create an initial list of *all* columns in the ascii file, and then
specify ``drop_fields = [colname2, colname7, ...]``, and those columns
will not be present in the hdf5 output.
chunksize: integer, optional, default: 100000
Controls how many lines are read in from the input file before being
written out to the hdf5 file.
compression: string, optional, default: 'gzip'
Controls the kind of compression applied. Valid options are anything
that ``h5py`` accepts.
show_progressbar: boolean, optional, default: False
Controls whether a progressbar is printed. The progressbar is only
shown on rank==0; the remaining ranks ignore this keyword.
Returns
-------
Returns ``True`` on successful completion.
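Example
-------
A minimal, illustrative sketch (the catalogue filenames and output
directory below are hypothetical):
>>> convert_ascii_halocat_files(['hlist_0.50000.list', 'hlist_1.00000.list'],
...                             outputdir='/scratch/uchuu-hdf5')
True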
"""
import sys
import time
rank = 0
ntasks = 1
MPI = None
comm = None
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
ntasks = comm.Get_size()
except ImportError:
pass
sys.stdout.flush()
nfiles = len(filenames)
if nfiles < ntasks:
msg = f"Nfiles = {nfiles} must be >= the number of tasks = {ntasks}"
raise ValueError(msg)
tstart = time.perf_counter()
if rank == 0:
print(f"[Rank={rank}]: Converting nfiles = {nfiles} over ntasks = {ntasks}...")
# Convert files in MPI parallel (if requested)
# the range produces filenum starting at "rank" and then
# incrementing by "ntasks", up to and including (nfiles - 1).
# That is, the range [0, nfiles-1] is uniquely distributed
# over the ntasks.
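# For example (hypothetical numbers): with nfiles=10 and ntasks=4,
# rank 0 processes files 0, 4, 8; rank 1 processes files 1, 5, 9;
# rank 2 processes files 2, 6; and rank 3 processes files 3, 7.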
for filenum in range(rank, nfiles, ntasks):
convert_single_ascii_halocat(filenames[filenum], rank,
outputdir=outputdir,
write_halo_props_cont=write_halo_props_cont,
fields=fields, drop_fields=drop_fields,
chunksize=chunksize, compression=compression,
show_progressbar=show_progressbar)
# The barrier is only essential so that the total time printed
# out on rank==0 is correct.
if comm:
comm.Barrier()
if rank == 0:
t1 = time.perf_counter()
print(f"[Rank={rank}]: Converting nfiles = {nfiles} over ntasks = "\
f"{ntasks}...done. Time taken = {t1-tstart:0.2f} seconds")
return True
if __name__ == "__main__":
import sys
# Do we want to write each halo as a struct (i.e., ALL the
# properties of any given halo are written together), or do we want
# to write out individual datasets for each of the
# halo properties (i.e., any given property of ALL halos are written
# together)
write_halo_props_cont = True # True -> structure of arrays,
# False-> array of structures
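# For example (the column names here are hypothetical): with the
# structure-of-arrays layout, the output file contains one dataset per
# column, e.g., '/HaloCatalogue/Mvir', '/HaloCatalogue/Rvir', ...;
# with the array-of-structures layout, it contains a single compound
# dataset '/HaloCatalogue/halos' with fields ('Mvir', 'Rvir', ...)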
if len(sys.argv) < 3:
print("Error: Please provide at least two command-line arguments")
print("First argument -- <output directory>")
print("Second (and all following arguments) -- <input filenames to convert>")
sys.exit(1)
outputdir = sys.argv[1]
# The entire list of filenames to be converted should
# be passed at the command-line
filenames = sys.argv[2:]
convert_ascii_halocat_files(filenames, outputdir=outputdir,
write_halo_props_cont=write_halo_props_cont)
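# Example command-line usage (illustrative; the script name and the
# catalogue filenames below are hypothetical):
#   python convert_halocat_to_hdf5.py /scratch/uchuu-hdf5 hlist_1.00000.list.gz
# or, to convert many catalogues in MPI parallel (requires mpi4py):
#   mpirun -np 4 python convert_halocat_to_hdf5.py /scratch/uchuu-hdf5 hlist_*.list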
#!/usr/bin/env python
from __future__ import print_function
__author__ = "Manodeep Sinha"
__all__ = ["convert_ctrees_files"]
import os
import time
import io
from uchuu_utils import get_metadata, get_parser, \
distribute_array_over_ntasks, get_approx_totnumhalos, \
check_and_decompress, resize_halo_datasets, write_halos
from ctrees_utils import read_locations_and_forests, get_aggregate_forest_info,\
get_all_parallel_ctrees_filenames, \
validate_inputs_are_ctrees_files, \
get_treewalk_dtype_descr, add_tree_walk_indices
def _create_and_validate_halos_dset(hf, dtype, write_halo_props_cont=True):
"""
Internal utility function to check the existing halo dataset in the file,
and return a reference where the halos dataset can be written to.
"""
forests_grp = hf['Forests']
if write_halo_props_cont:
halos_dset = dict()
# Create a dataset for every halo property
# For any given halo property, the value
# for halos will be written contiguously
# (structure of arrays)
for name in dtype.names:
halos_dset[name] = forests_grp[name]
if hf.attrs['Nhalos'] != halos_dset[name].shape[0]:
msg = f"Error: The dataset for halo property = '{name}' "\
f"does not contain *exactly* the same number of halos "\
f"as specified in the file attribute. "\
f"shape of halo property dataset = '{halos_dset[name].shape}' "\
f"nhalos in file attribute = {hf.attrs['Nhalos']}"
raise AssertionError(msg)
else:
# Create a single dataset that contains all properties
# of a given halo, then all properties of the next halo,
# and so on (array of structures)
halos_dset = forests_grp['halos']
if hf.attrs['Nhalos'] != halos_dset.shape[0]:
msg = f"Error: The halos dataset does not contain *exactly* the "\
f"same number of halos as specified in the file attribute. "\
f"shape of halo property dataset = '{halos_dset.shape}' nhalos in file "\
f"attribute = {hf.attrs['Nhalos']}"
raise AssertionError(msg)
return halos_dset
def _convert_ctrees_forest_range(forest_info, trees_and_locations, rank,
outputdir="./", output_filebase="forest",
write_halo_props_cont=True,
fields=None, drop_fields=None,
truncate=True, compression='gzip',
buffersize=1024*1024, use_pread=True,
show_progressbar=False):
"""
Convert a set of forests from Consistent Trees ascii file(s) into an
(optionally compressed) hdf5 file.
Parameters
-----------
forest_info: numpy structured array, required, shape: (nforests, )
The numpy structured array containing the following info
<ForestID ForestNhalos Input_ForestNbytes Ntrees> for the
*forests* that are to be converted by this task
trees_and_locations: numpy structured array, required, shape: (ntrees, )
The numpy structured array containing the following info
<ForestID TreeRootID FileName Offset TreeNbytes> for the
*trees* that are to be converted by this task
rank: integer, required
The (MPI) rank for the process. The output filename is determined
with this rank to ensure unique filenames when running in parallel.
outputdir: string, optional, default: current working directory ('./')
The directory to which the converted hdf5 file will be written. The
output filename is controlled by the ``output_filebase`` parameter below.
output_filebase: string, optional, default: "forest"
The output filename is constructed using '<outputdir>/<output_filebase>_<rank>.h5'
write_halo_props_cont: boolean, optional, default: True
Controls if the individual halo properties are written as distinct
datasets such that any given property for *all* halos is written
contiguously (structure of arrays, SOA).
When set to False, only one dataset ('halos') is created under the
group 'Forests', and *all* properties of a halo are written out
contiguously (array of structures).
fields: list of strings, optional, default: None
Describes which specific columns in the input file to carry across
to the hdf5 file. Default action is to convert ALL columns.
drop_fields: list of strings, optional, default: None
Describes which columns are not carried through to the hdf5 file.
Processed after ``fields``, i.e., you can specify ``fields=None`` to
create an initial list of *all* columns in the ascii file, and then
specify ``drop_fields = [colname2, colname7, ...]``, and those columns
will not be present in the hdf5 output.
truncate: boolean, default: True
Controls whether a new file is created on this 'rank'. When set to
``True``, a new file is created and the header info is written out.
Otherwise, the existing file is appended to.
compression: string, optional, default: 'gzip'
Controls the kind of compression applied. Valid options are anything
that ``h5py`` accepts.
buffersize: integer, optional, default: 1 MB
Controls the size (in bytes) of the write buffer. The number of halos
written out per write call to the hdf5 file is this buffersize divided
by the itemsize of the datatype for individual halos.
use_pread: boolean, optional, default: True
Controls whether low-level i/o operations (through ``os.pread``) are
used. Otherwise, higher-level i/o operations (via ``io.open``) are
used. This option is only meaningful on linux systems (and python3+).
Since ``pread`` does not change the file offset, additional
parallelisation can be implemented reasonably easily.
show_progressbar: boolean, optional, default: False
Controls whether a progressbar is printed. The progressbar is only
shown on rank==0; the remaining ranks ignore this keyword.
Returns
-------
Returns ``True`` on successful completion.
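Example
-------
An illustrative sketch, continuing from ``read_locations_and_forests``
and ``get_aggregate_forest_info`` (the filenames and output directory
below are hypothetical):
>>> trees_and_locations = read_locations_and_forests('run.forest', 'run.loc')
>>> forest_info = get_aggregate_forest_info(trees_and_locations)
>>> _convert_ctrees_forest_range(forest_info, trees_and_locations, rank=0,
...                              outputdir='/scratch/uchuu-hdf5')
True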
"""
import numpy as np
import h5py
import sys
import warnings
# Progress-bar related setup
try:
from tqdm import tqdm
except ImportError:
tqdm = None
if rank != 0:
show_progressbar = False
if show_progressbar and not tqdm:
msg = "Warning: You have requested to show a progress-bar but could not "\
"import 'tqdm'. Please install 'tqdm' to display a progressbar."\
"Disabling the progressbar for this run"
warnings.warn(msg)
show_progressbar = False
# Done with progressbar setup
try:
os.pread
except NameError:
use_pread = False
sys.stdout.flush()
tstart = time.perf_counter()
# Set the datalen for strings
string_dtype = 'S1024'
if not os.path.isdir(outputdir):
msg = f"Error: The first parameter (output directory) = '{outputdir}' "\
"should be of type directory"
raise ValueError(msg)
ntrees = trees_and_locations.shape[0]
nforests = forest_info.shape[0]
if nforests > ntrees:
msg = f"Error: Expected the number of trees = '{ntrees}' "\
"to be *at most* equal to the number of "\
f"forests = '{nforests}'"
raise AssertionError(msg)
if ntrees <= 0:
msg = f"[Rank={rank}] Error: ntrees = {ntrees} should be >= 0"
raise AssertionError(msg)
print(f"[Rank {rank}]: processing {forest_info['Input_ForestNbytes'].sum()} "\
f"bytes (in {ntrees} trees) spread over {nforests} forests...")
alltreedatafiles = list(set(trees_and_locations['Filename']))
assert len(alltreedatafiles) > 0
validate_inputs_are_ctrees_files(alltreedatafiles)
metadata_dict = get_metadata(alltreedatafiles[0])
metadata = metadata_dict['metadata']
version_info = metadata_dict['version']
input_catalog_type = metadata_dict['catalog_type']
hdrline = metadata_dict['headerline']
parser = get_parser(alltreedatafiles[0], fields=fields,
drop_fields=drop_fields)
mergertree_descr = get_treewalk_dtype_descr()
output_dtype = np.dtype(parser.dtype.descr + mergertree_descr)
approx_totnumhalos = 0
for fname in alltreedatafiles:
ind = np.where(trees_and_locations['Filename'] == fname)
nbytes = np.sum(trees_and_locations['TreeNbytes'][ind])
approx_totnumhalos += get_approx_totnumhalos(fname, ndatabytes=nbytes)
if show_progressbar:
pbar = tqdm(total=ntrees, unit=' trees', disable=None)
if (not buffersize) or (buffersize < output_dtype.itemsize):
buffersize = 1024*1024 # 1 MB
nbuffer_halos = buffersize // output_dtype.itemsize
chunks = (nbuffer_halos, )
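# For example (hypothetical sizes): with the default 1 MB buffer and an
# output_dtype.itemsize of 512 bytes per halo, nbuffer_halos would be
# 1048576 // 512 = 2048 halos per write call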
output_file = f"{outputdir}/{output_filebase}_{rank}.h5"
if truncate:
with h5py.File(output_file, "w") as hf:
# give the HDF5 root some more attributes
hf.attrs['input_files'] = np.string_(alltreedatafiles)
mtimes = [os.path.getmtime(f) for f in alltreedatafiles]
hf.attrs['input_filedatestamp'] = np.array(mtimes)
hf.attrs["input_catalog_type"] = np.string_(input_catalog_type)
hf.attrs[f"{input_catalog_type}_version"] = np.string_(version_info)
hf.attrs[f"{input_catalog_type}_columns"] = np.string_(hdrline)
hf.attrs[f"{input_catalog_type}_metadata"] = np.string_(metadata)
sim_grp = hf.create_group('simulation_params')
simulation_params = metadata_dict['simulation_params']
for k, v in simulation_params.items():
sim_grp.attrs[f"{k}"] = v
hf.attrs['HDF5_version'] = np.string_(h5py.version.hdf5_version)
hf.attrs['h5py_version'] = np.string_(h5py.version.version)
hf.attrs['Nforests'] = 0
hf.attrs['Ntrees'] = 0
hf.attrs['Nhalos'] = 0
forest_dtype = np.dtype([('ForestID', np.int64),
('ForestHalosOffset', np.int64),
('ForestNhalos', np.int64),
('ForestNtrees', np.int64),
])
hf.create_dataset('ForestInfo', (0,), dtype=forest_dtype,
chunks=True, compression=compression,
maxshape=(None,))
tree_dtype = np.dtype([('ForestID', np.int64),
('TreeRootID', np.int64),
('TreeHalosOffset', np.int64),
('TreeNhalos', np.int64),
('Input_Filename', string_dtype),
('Input_FileDateStamp', np.float64),
('Input_TreeByteOffset', np.int64),
('Input_TreeNbytes', np.int64),
])
hf.create_dataset('TreeInfo', (0,), dtype=tree_dtype,
chunks=True, compression=compression,
maxshape=(None,))
forests_grp = hf.create_group('Forests')
if write_halo_props_cont:
# Create a dataset for every halo property
# For any given halo property, the value
# for halos will be written contiguously
# (structure of arrays)
for name, dtype in output_dtype.descr:
forests_grp.create_dataset(name, (0,), dtype=dtype,
chunks=chunks,
compression=compression,
maxshape=(None,))
else:
# Create a single dataset that contains all properties
# of a given halo, then all properties of the next halo,
# and so on (array of structures)
forests_grp.create_dataset('halos', (0,),
dtype=output_dtype,
chunks=chunks,
compression=compression,
maxshape=(None,))
halos_buffer = np.empty(nbuffer_halos, dtype=output_dtype)
nhalos_in_buffer = 0
with h5py.File(output_file, "a") as hf:
# The filenames are written as byte-arrays (through np.string_) into the hdf5 file
# Therefore, we will need to convert back into `str` objects
existing_files = hf.attrs['input_files']
target_all_files = np.unique(np.hstack((existing_files, alltreedatafiles)))
# Strictly speaking this decode is not necessary but without this extra decode
# we end up with an array that contains both np.str_ and str -- MS 01/05/2020
target_all_files = [x.decode() if isinstance(x, bytes) else str(x) for x in target_all_files]
if len(target_all_files) > len(existing_files):
# Since we are appending to the hdf5 file, let's make sure that *all* the files belong
# to the same setup of simulation + mergertree. However, the ascii files corresponding
# to the existing data might have been deleted, so we should pass the metadata info
# directly from the hdf5 file
base_input_catalog_type = hf.attrs['input_catalog_type'].decode()
base_metadata = hf.attrs[f'{base_input_catalog_type}_metadata']
base_version = hf.attrs[f'{base_input_catalog_type}_version'].decode()
# Only validate the *current* files being processed
assert len(alltreedatafiles) > 0
validate_inputs_are_ctrees_files(alltreedatafiles,
base_metadata=base_metadata,
base_version=base_version,
base_input_catalog_type=base_input_catalog_type)
# We need to update how many *unique* input files have gone into this hdf5 file
hf.attrs['input_files'] = np.string_(target_all_files)
hf.attrs['input_filedatestamp'] = np.array([os.path.getmtime(f) for f in target_all_files])
tree_dset = hf['TreeInfo']
forest_dset = hf['ForestInfo']
if forest_dset.shape[0] != hf.attrs['Nforests']:
msg = "Error: The forest dataset does not contain *exactly* "\
"the same number of forests as specified in the file "\
f"attribute. Shape of forest dataset = '{forest_dset.shape}', "\
f"nforests in file attribute = '{hf.attrs['Nforests']}'"
raise AssertionError(msg)
forest_offset = hf.attrs['Nforests']
if tree_dset.shape[0] != hf.attrs['Ntrees']:
msg = "Error: The tree dataset does not contain *exactly* the same "\
"number of trees as specified in the file attribute. "\
f"shape of tree dataset = '{tree_dset.shape}' ntrees in "\
f"file attribute = '{hf.attrs['Ntrees']}'"
raise AssertionError(msg)
tree_offset = hf.attrs['Ntrees']
# resize both the datasets containing the forestlevel info and
# treelevel info
forest_dset.resize((forest_offset + nforests, ))
tree_dset.resize((tree_offset + ntrees, ))
# Now check the halos dataset
halos_dset = _create_and_validate_halos_dset(hf, output_dtype,
write_halo_props_cont)
# Okay - we have validated the halos offset
halos_offset = hf.attrs['Nhalos']
halos_dset_offset = halos_offset
# resize the halos dataset so we don't have to resize at every step
dset_size = halos_offset + approx_totnumhalos
resize_halo_datasets(halos_dset, dset_size, write_halo_props_cont, output_dtype)
forest_dset[-nforests:, 'ForestID'] = forest_info['ForestID'][:]
forest_dset[-nforests:, 'ForestNtrees'] = forest_info['Ntrees'][:]
tree_dset[-ntrees:, 'ForestID', ] = trees_and_locations['ForestID'][:]
tree_dset[-ntrees:, 'TreeRootID'] = trees_and_locations['TreeRootID'][:]
# These quantities relate to the input files
tree_dset[-ntrees:, 'Input_Filename'] = np.string_(trees_and_locations['Filename'][:])
mtimes = [os.path.getmtime(fn) for fn in trees_and_locations['Filename']]
tree_dset[-ntrees:, 'Input_FileDateStamp'] = np.array(mtimes)
tree_dset[-ntrees:, 'Input_TreeByteOffset'] = trees_and_locations['Offset'][:]
tree_dset[-ntrees:, 'Input_TreeNbytes'] = trees_and_locations['TreeNbytes'][:]
alltreedatafiles = list(set(trees_and_locations['Filename']))
if use_pread:
filehandlers = {f: os.open(f, os.O_RDONLY) for f in alltreedatafiles}
else:
filehandlers = {f: io.open(f, 'rt') for f in alltreedatafiles}
ntrees_processed = 0
treenhalos = np.empty(ntrees, dtype=np.int64)
treehalos_offset = np.empty(ntrees, dtype=np.int64)
forestnhalos = np.empty(nforests, dtype=np.int64)
foresthalos_offset = np.empty(nforests, dtype=np.int64)
for iforest in range(nforests):
foresthalos_offset[iforest] = halos_offset
forest_halos = np.empty(0, dtype=output_dtype)
for _ in range(forest_info['Ntrees'][iforest]):
treedata_file = trees_and_locations['Filename'][ntrees_processed]
offset = trees_and_locations['Offset'][ntrees_processed]
numbytes = trees_and_locations['TreeNbytes'][ntrees_processed]
inp = filehandlers[treedata_file]
if use_pread:
chunk = os.pread(inp, numbytes, offset)
else:
inp.seek(offset, os.SEEK_SET)
chunk = inp.read(numbytes)
parse_line = parser.parse_line
halos = parser.pack([parse_line(l) for l in chunk.splitlines()])
nhalos = halos.shape[0]
forest_halos.resize(forest_halos.shape[0] + nhalos)
# forest_halos have additional mergertree indices, therefore
# the datatypes are not the same between halos (parser.dtype)
# and forest_halos (output_dtype) -> assign by columns
for name in parser.dtype.names:
forest_halos[name][-nhalos:] = halos[name][:]
# Add the tree level info
treenhalos[ntrees_processed] = nhalos
treehalos_offset[ntrees_processed] = halos_offset
ntrees_processed += 1
if show_progressbar:
pbar.update(1)
# Update the total number of halos read-in with
# the number of halos in this tree
halos_offset += nhalos
# Entire forest has been loaded. Reset nhalos
# to be the number of halos in the forest
nhalos = forest_halos.shape[0]
# Add the forest level info
forestnhalos[iforest] = nhalos
# Entire forest is now loaded -> add the mergertree indices
add_tree_walk_indices(forest_halos, rank)
# If there are not enough halos to trigger a write, simply fill up
# the halos_buffer
if (nhalos_in_buffer + nhalos) < nbuffer_halos:
assert halos_buffer.dtype == forest_halos.dtype
halos_buffer[nhalos_in_buffer:nhalos_in_buffer+nhalos] = forest_halos[:]
nhalos_in_buffer += nhalos
continue
# Need to write to disk
# Resize to make sure there is enough space to append the new halos
if halos_offset > dset_size:
resize_halo_datasets(halos_dset, halos_offset, write_halo_props_cont, output_dtype)
dset_size = halos_offset
# write the halos that are already in the buffer
write_halos(halos_dset, halos_dset_offset, halos_buffer,
nhalos_in_buffer, write_halo_props_cont)
halos_dset_offset += nhalos_in_buffer
nhalos_in_buffer = 0
# Now write the halos that have just been read-in
# Note: The halos in the buffer *must* be written out before the halos
# that have just been read-in. Otherwise, there will be data corruption
write_halos(halos_dset, halos_dset_offset, forest_halos,
nhalos, write_halo_props_cont)
halos_dset_offset += nhalos
if halos_offset != halos_dset_offset:
msg = f"Error: After writing out halos into the hdf5 file, expected "\
f"to find that halos_offset = '{halos_offset}' to be *exactly* "\
f"equal to the offset in the hdf5 dataset = '{halos_dset_offset}'"
raise AssertionError(msg)
if nhalos_in_buffer > 0:
write_halos(halos_dset, halos_dset_offset, halos_buffer,
nhalos_in_buffer, write_halo_props_cont)
halos_dset_offset += nhalos_in_buffer
nhalos_in_buffer = 0
if halos_offset != halos_dset_offset:
msg = f"Error: After writing *all* the halos into the hdf5 file, expected "\
f"to find that halos_offset = '{halos_offset}' to be *exactly* "\
f"equal to the offset in the hdf5 dataset = '{halos_dset_offset}'"
raise AssertionError(msg)
msg = f"Error: Expected to process {ntrees} trees but processed {ntrees_processed} trees instead"
assert ntrees_processed == ntrees, msg
# All the trees for this call have now been read in entirely -> Now fix the actual
# dataset sizes to reflect the total number of halos written
resize_halo_datasets(halos_dset, halos_offset, write_halo_props_cont, output_dtype)
# all halos from all forests have been written out and the halo dataset has been
# correctly resized. Now write the aggregate quantities at the tree and forest levels
tree_dset[-ntrees:, 'TreeNhalos'] = treenhalos[:]
tree_dset[-ntrees:, 'TreeHalosOffset'] = treehalos_offset[:]
forest_dset[-nforests:, 'ForestNhalos'] = forestnhalos[:]
forest_dset[-nforests:, 'ForestHalosOffset'] = foresthalos_offset[:]
hf.attrs['Nforests'] += nforests
hf.attrs['Ntrees'] += ntrees
hf.attrs['Nhalos'] = halos_offset
if show_progressbar:
pbar.close()
# Close all the open file handlers
if use_pread:
for f in filehandlers.values():
os.close(f)
else:
for f in filehandlers.values():
f.close()
totnumhalos = halos_offset
t1 = time.perf_counter()
print(f"[Rank {rank}]: processing {forest_info['Input_ForestNbytes'].sum()} "\
f"bytes (in {ntrees} trees) spread over {nforests} forests...done. "\
f"Wrote {totnumhalos} halos in {t1-tstart:.2f} seconds")
sys.stdout.flush()
return True
def convert_ctrees_files(filenames, standard_consistent_trees=None,
outputdir="./", output_filebase="forest",
write_halo_props_cont=True, fields=None, drop_fields=None,
truncate=True, compression='gzip', buffersize=None, use_pread=True,
show_progressbar=False):
"""
Convert a set of forests from Consistent Trees ascii file(s) into an
(optionally compressed) hdf5 file. Can be invoked with MPI.
Parameters
-----------
filenames: list of strings for Consistent-Trees catalogues, required
The input ascii files will be decompressed, if required.
standard_consistent_trees: boolean, optional, default: None
Whether the input files were generated by the standard (public)
Consistent-Trees code rather than the Uchuu collaboration's parallel
Consistent-Trees code. The type is inferred from ``filenames``: if
*exactly* two files are specified, and these two filenames end with
'forests.list' and 'locations.dat', then a standard Consistent-Trees
output is inferred; if all files specified in ``filenames`` end with
'.tree', then a parallel Consistent-Trees output is inferred.
outputdir: string, optional, default: current working directory ('./')
The directory to which the converted hdf5 file will be written. The
output filename is controlled by the ``output_filebase`` parameter below.
output_filebase: string, optional, default: "forest"
The output filename is constructed using '<outputdir>/<output_filebase>_<rank>.h5'
write_halo_props_cont: boolean, optional, default: True
Controls if the individual halo properties are written as distinct
datasets such that any given property for *all* halos is written
contiguously (structure of arrays, SOA).
When set to False, only one dataset ('halos') is created under the
group 'Forests', and *all* properties of a halo are written out
contiguously (array of structures).
fields: list of strings, optional, default: None
Describes which specific columns in the input file to carry across
to the hdf5 file. Default action is to convert ALL columns.
drop_fields: list of strings, optional, default: None
Contains a list of column names that will *not* be carried through
to the hdf5 file. If ``drop_fields`` is not set for a
parallel Consistent-Trees run, then [``Tidal_Force``, ``Tidal_ID``]
will be used.
``drop_fields`` is processed after ``fields``, i.e., you can specify
``fields=None`` to create an initial list of *all* columns in the
ascii file, and then specify ``drop_fields = [colname2, colname7, ...]``,
and those columns will not be present in the hdf5 output.
truncate: boolean, default: True
Controls whether a new file is created on this 'rank'. When set to
``True``, a new file is created and the header info is written out.
Otherwise, the existing file is appended to. The code checks to make
sure that the existing metadata
in the hdf5 file is identical to the new metadata in the ascii files
being currently converted (i.e., tries to avoid different
simulation + mergertree results being present in the same file)
compression: string, optional, default: 'gzip'
Controls the kind of compression applied. Valid options are anything
that ``h5py`` accepts.
buffersize: integer, optional, default: 1 MB
Controls the size (in bytes) of the write buffer. The number of halos
written out per write call to the hdf5 file is this buffersize divided
by the itemsize of the datatype for individual halos.
use_pread: boolean, optional, default: True
Controls whether low-level i/o operations (through ``os.pread``) are
used. Otherwise, higher-level i/o operations (via ``io.open``) are
used. This option is only meaningful on linux systems (and python3+).
Since ``pread`` does not change the file offset, additional
parallelisation can be implemented reasonably easily.
show_progressbar: boolean, optional, default: False
Controls whether a progressbar is printed. The progressbar is only
shown on rank==0; the remaining ranks ignore this keyword.
Returns
-------
Returns ``True`` on successful completion.
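Example
-------
Illustrative sketches (the filenames and output directory below are
hypothetical):
>>> # standard (public) Consistent-Trees output
>>> convert_ctrees_files(['/sim/trees/forests.list', '/sim/trees/locations.dat'],
...                      outputdir='/scratch/uchuu-hdf5')
True
>>> # parallel (Uchuu) Consistent-Trees output
>>> convert_ctrees_files(['/sim/trees/0_0_0.tree', '/sim/trees/0_0_1.tree'],
...                      outputdir='/scratch/uchuu-hdf5')
True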
"""
import os
import sys
import time
import numpy as np
import h5py
rank = 0
ntasks = 1
MPI = None
comm = None
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
ntasks = comm.Get_size()
except ImportError:
pass
if not os.path.isdir(outputdir):
msg = f"Error: Output directory = {outputdir} is not a valid directory"
raise ValueError(msg)
tstart = time.perf_counter()
sys.stdout.flush()
if not standard_consistent_trees:
standard_consistent_trees = True
# The Uchuu collaboration has a special parallel version of the
# Consistent-Tree developed by @Tomo. This code generates a set of
# files equivalent to (forests.list, locations.dat, tree_*.dat) file
# per CPU task. In that code, forests are guaranteed to be located
# completely within one tree data file. However, the public version
# of Consistent-Trees is different and there is exactly one 'forests.list'
# and one 'locations.dat' file for the entire simulation, and as many
# tree_*.dat files as the number of BOX_DIVISIONS^3 set in the
# Consistent-Trees config file at runtime.
# This script aims to handle both scenarios with this logic -- if only the
# "forests.list" and "locations.dat" files are supplied as command-line
# arguments, then the public version of the Consistent-Trees catalog
# is assumed. Otherwise, a list of *all* the "XXXXXX.tree" files
# should be passed (i.e., for people in the Uchuu collaboration). The
# corresponding 'forest' and 'location' file names will be automatically
# generated by assuming the Uchuu convention:
# -> tree data files are called '<prefix>.tree'
# -> associated forest.list file is called '<prefix>.forest'
# -> associated locations.dat file is called '<prefix>.loc'
#
# If all files supplied at the command-line end with '.tree(.bz2,.zip,.gz)' then
# it is a parallel Consistent-Trees run.
check_pctrees_files = ['tree' in f.split('.') for f in filenames]
if np.all(check_pctrees_files):
standard_consistent_trees = False
if standard_consistent_trees:
if len(filenames) != 2:
msg = "Error: To convert a standard Consistent-Trees output, "\
"please specify *exactly* two files -- the 'forests.list' and "\
"the 'locations.dat' files (order is unimportant). "\
f"Instead found filenames = '{filenames}'"
raise ValueError(msg)
filebasenames = set([os.path.basename(f) for f in filenames])
expected_filebasenames = set(['forests.list', 'locations.dat'])
if filebasenames != expected_filebasenames:
msg = "Error: To convert a standard Consistent-Trees output, "\
"please specify *exactly* two files -- the 'forests.list' and "\
"the 'locations.dat' files (order is unimportant). While exactly "\
"two files were specified, at least one of the 'forests.list' or "\
"'locations.dat' files was not present in the supplied filenames = "\
f" '{filenames}'"
raise ValueError(msg)
if standard_consistent_trees:
forests_file, locations_file = get_forests_locations_filenames(filenames)
forests_and_locations_fnames = [(forests_file, locations_file)]
else:
# We are converting parallel Ctrees files; however, these files might still be
# compressed and we need to decompress them before processing.
forests_and_locations_fnames = []
decompressed_filenames = []
for fname in filenames:
decomp_fname = check_and_decompress(fname)
extname = '.tree'
if extname not in decomp_fname:
msg = "Error: Should pass the tree data file names (i.e., "\
f"the filenames should end in '{extname}'. "\
f"Instead got filename = '{decomp_fname}'"
raise ValueError(msg)
decompressed_filenames.append(decomp_fname)
forests_file, locations_file, _ = get_all_parallel_ctrees_filenames(decomp_fname)
forests_and_locations_fnames.append((forests_file, locations_file))
# Since multiple tree files are specified at the command-line, let's make
# sure that all the files belong to the same simulation + mergertree setup
assert len(decompressed_filenames) > 0
validate_inputs_are_ctrees_files(decompressed_filenames)
if (not drop_fields) and (not standard_consistent_trees):
# The Tidal fields can not be correctly calculated in the
# parallel CTrees code and are dropped from the hdf5 file
drop_fields = ['Tidal_Force', 'Tidal_ID']
nfiles = len(forests_and_locations_fnames)
if rank == 0:
print(f"[Rank={rank}]: Converting {nfiles} sets of (forests, locations) "\
f"files over {ntasks} tasks ... ")
nconverted = 0
for (forests_file, locations_file) in forests_and_locations_fnames:
t0 = time.perf_counter()
print(f"[Rank={rank}]: Reading forests and locations files...")
trees_and_locations = read_locations_and_forests(forests_file, locations_file, rank)
ntrees = trees_and_locations.shape[0]
t1 = time.perf_counter()
print(f"[Rank={rank}]: Reading forests and locations files...done. "\
f"Time taken = {t1-t0:.2f} seconds")
alltreedatafiles = list(set(trees_and_locations['Filename']))
if standard_consistent_trees:
# Validate that all the files are the same version and all
# contain a Consistent Trees catalog
assert len(alltreedatafiles) > 0
validate_inputs_are_ctrees_files(alltreedatafiles)
else:
# Since we are processing parallel CTrees output, *every* tree data
# file has an associated forests and locations file. Which means
# that every locations file should only have one treedata file
# Check that that is the case
if len(alltreedatafiles) != 1:
msg = "Error: Expected to find *exactly* one tree data file "\
"per locations file. However, while processing the "\
f"locations_file = '{locations_file}', found "\
f"{len(alltreedatafiles)} tree data files. "\
f"The unique tree data files are: {alltreedatafiles}"
raise AssertionError(msg)
# No need to validate that the input files are valid CTrees files
# That has already been done
if rank == 0:
print(f"[Rank={rank}]: Converting a single set of (forests, locations) "\
f"over {ntasks} tasks...")
# We have the tree-level info, let's create a similar info for
# the forests (by grouping trees by ForestID)
forest_info = get_aggregate_forest_info(trees_and_locations, rank)
# If you need to convert the initial subset of trees
# then just uncomment the following two lines -- MS 22/06/2020
# nforests = 50000
# forest_info = forest_info[0:nforests]
# Distribute the forests over ntasks
(forest_start, forest_stop) = distribute_array_over_ntasks(forest_info['Input_ForestNbytes'], rank, ntasks)
forest_ntrees_offset = forest_info['Ntrees'][:].cumsum()
tree_start = 0
if forest_start >= 1:
tree_start = forest_ntrees_offset[forest_start - 1]
# tree_stop is not-inclusive, and can be directly used in a
# slicing context
tree_stop = forest_ntrees_offset[forest_stop]
print(f"[Rank={rank}]: Processing trees [tree_start, tree_stop) = "\
f"[{tree_start}, {tree_stop}) totntrees = {ntrees}")
# Now we can start converting the forests
# *Note* ``forest_stop`` is inclusive but since we are slicing,
# we need to account for the python convention hence, the slice
# on ``forest_info`` goes up to ``forest_stop + 1``
_convert_ctrees_forest_range(forest_info[forest_start:forest_stop+1],
trees_and_locations[tree_start:tree_stop],
rank, outputdir=outputdir, output_filebase=output_filebase,
write_halo_props_cont=write_halo_props_cont,
fields=fields, drop_fields=drop_fields,
truncate=truncate, compression=compression,
buffersize=buffersize, use_pread=use_pread,
show_progressbar=show_progressbar)
truncate = False
nconverted += 1
if rank == 0:
print(f"[Rank={rank}]: Converting a single set of (forests, locations) "
f"over {ntasks} tasks...done. Converted {nconverted} out "\
f"of {nfiles} sets of files")
# Done converting all files
if comm:
comm.Barrier()
if rank != 0:
return True
# Create the main file that contains the other files as (hdf5-sym-)links
fname = f'{outputdir}/{output_filebase}.h5'
try:
hf = h5py.File(fname, 'r')
existing_nfiles = hf['/'].attrs['Nfiles']
if existing_nfiles < ntasks:
msg = "Error: You should not have been able to re-run the "\
"converter on a larger number of tasks. The converter "\
"should have crashed before reaching this point. In "\
f"the main file '{fname}', the previous run was spread "\
f"over '{existing_nfiles}' individual files. However "\
f"the new run uses {ntasks} tasks which is larger. "
raise AssertionError(msg)
hf.close()
except OSError:
pass
with h5py.File(fname, 'w') as hf:
hf['/'].attrs['Nfiles'] = ntasks
hf['/'].attrs['TotNforests'] = 0
hf['/'].attrs['TotNtrees'] = 0
hf['/'].attrs['TotNhalos'] = 0
attr_props = [('TotNforests', 'Nforests'),
('TotNtrees', 'Ntrees'),
('TotNhalos', 'Nhalos')]
for itask in range(ntasks):
outfile = f'{outputdir}/{output_filebase}_{itask}.h5'
with h5py.File(outfile, 'a') as hf_task:
if standard_consistent_trees:
hf_task.attrs['consistent-trees-type'] = 'standard'
else:
hf_task.attrs['consistent-trees-type'] = 'parallel'
hf_task.attrs['contiguous-halo-props'] = write_halo_props_cont
for (out, inp) in attr_props:
hf['/'].attrs[out] += hf_task['/'].attrs[inp]
hf[f'File{itask}'] = h5py.ExternalLink(outfile, '/')
t1 = time.perf_counter()
print(f"Converting {nfiles} sets of (forests, locations) files "
f"over {ntasks} tasks ...done. Time taken = "
f"{t1-tstart:.2f} seconds")
return True
if __name__ == "__main__":
# import argparse
# # Taken from https://stackoverflow.com/a/34736291
# class NegateAction(argparse.Action):
# def __call__(self, parser, ns, values, option):
# setattr(ns, self.dest, option[2:4] != 'no')
import argparse
# helpmsg = "\nThis script needs at least two command-line arguments\n"
# helpmsg += "First argument -- <output directory>\n"
# helpmsg += "Second (and all following arguments) -- <input Consistent-Trees filenames to convert>\n\n"
helpmsg = "\nUsage Scenario 1:\n"
helpmsg += "-----------------\n"
helpmsg += "If you are converting the output of the standard Consistent-Trees code, then \n"\
"please provide the full-path to the 'forests.list' and 'locations.dat' (order is unimportant).\n\n"
helpmsg += "Usage Scenario 2:\n"
helpmsg += "-----------------\n"
helpmsg += "If you are converting the output of the parallel Consistent-Trees code \n"\
"from the Uchuu collaboration, then please provide all the tree filenames \n"\
"that you would like to convert (i.e., files ending with '<prefix>.tree').\n"\
"The names for relevant 'forests.list (<prefix>.forest)' and \n"\
"'locations.dat (<prefix>.loc)' will be automatically constructed.\n\n"
parser = argparse.ArgumentParser(description="Convert ascii Consistent-Trees files "
"into hdf5 (optionally in MPI parallel)")
parser.add_argument('outputdir', metavar='<Output directory>', type=str,
help='the output directory for the hdf5 file(s)')
parser.add_argument("filenames", metavar="<CTrees filenames>",
type=str, nargs='+',
help="list of input (ascii) Consistent-Trees filenames")
group = parser.add_mutually_exclusive_group()
group.add_argument("-m", "--multiple-dsets", dest='write_halo_props_cont',
action="store_true", default=True,
help="write a separate dataset for each halo property")
group.add_argument("-s", "--single-dset", dest='write_halo_props_cont',
action="store_false", default=False,
help="write a single dataset containing all halo properties")
parser.add_argument("-p", "--progressbar", dest='show_progressbar',
action="store_true", default=True,
help="display a progressbar on rank=0")
parser.add_argument("-v", "--verbose", dest='verbose',
action="store_true", default=False,
help="output additional info messages")
try:
args = parser.parse_args()
except SystemExit as e:
print(helpmsg)
raise e
print(f"args = {args}")
print(f"args.outputdir = {args.outputdir}")
print(f"args.filenames = {args.filenames}")
print(f"args.write_halo_props_cont = {args.write_halo_props_cont}")
print(f"args.show_progressbar = {args.show_progressbar}")
print(f"args.verbose = {args.verbose}")
# Do you want to write each halo as a struct (i.e., ALL the
# properties of any given halo are written together, array of structures),
# or do you want to write out individual datasets for each of the
# halo properties (i.e., any given property of ALL halos are written
# together, structure of arrays -> default)
# write_halo_props_cont = True # True -> structure of arrays,
# # False-> array of structures
import sys
if len(sys.argv) < 3:
print(helpmsg)
sys.exit(1)
# The output directory and the entire list of filenames to convert are
# taken from the parsed command-line arguments ('args') above
convert_ctrees_files(args.filenames, outputdir=args.outputdir,
write_halo_props_cont=args.write_halo_props_cont,
show_progressbar=args.show_progressbar)
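# Example command-line usage (illustrative; the script name and the
# input filenames below are hypothetical):
#   python convert_ctrees_to_hdf5.py /scratch/uchuu-hdf5 forests.list locations.dat
# or, for the Uchuu parallel Consistent-Trees output (optionally with MPI):
#   mpirun -np 8 python convert_ctrees_to_hdf5.py /scratch/uchuu-hdf5 /sim/trees/*.tree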
#!/usr/bin/env python
from __future__ import print_function
__author__ = "Manodeep Sinha"
__all__ = ["read_locations_and_forests", "get_aggregate_forest_info",
"get_all_parallel_ctrees_filenames", "validate_inputs_are_ctrees_files",
"get_treewalk_dtype_descr", "add_tree_walk_indices"]
import time
import functools
from uchuu_utils import generic_reader, check_and_decompress, get_metadata, \
sanitize_ctrees_header, BaseParseFields
# requires numpy >= 1.9.0
def read_locations_and_forests(forests_fname, locations_fname, rank=0):
"""
Returns a numpy structured array that contains *both* the forest and
tree level info.
Parameters
----------
forests_fname: string, required
The name of the file containing forest-level info like the Consistent-Trees
'forests.list' file
locations_fname: string, required
The name of the file containing tree-level info like the Consistent-Trees
'locations.dat' file
rank: integer, optional, default:0
An integer identifying which task is calling this function. Only used in
status messages
Returns
-------
trees_and_locations: A numpy structured array
A numpy structured array containing the fields
<TreeRootID ForestID Filename FileID Offset TreeNbytes>
The array is sorted by ``(ForestID, Filename, Offset)`` in that order.
This sorting means that *all* trees belonging to the same forest
*will* appear consecutively regardless of the file that the corresponding
tree data might appear in. The number of elements in the array is equal
to the number of trees.
Note: Sorting by ``Filename`` is implemented by an equivalent, but faster
sorting on ``FileID``.
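Example
-------
A minimal, illustrative sketch (the filenames below are hypothetical):
>>> trees_and_locations = read_locations_and_forests('forests.list',
...                                                  'locations.dat')
>>> ntrees = trees_and_locations.shape[0]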
"""
import os # use os.stat to get filesize
import numpy as np
import numpy.lib.recfunctions as rfn
def _guess_dtype_from_val(val):
converters_and_type = [(float, np.float64),
(str, 'U1024')]
fallback_dtype = 'V'
for (conv, dtype) in converters_and_type:
try:
_ = conv(val)
return dtype
except ValueError:
pass
print(f"Warning: Could not guess datatype for val = '{val}'. Returning "\
f"datatype = '{fallback_dtype}'")
return fallback_dtype
def _get_dtype_from_header(fname):
with generic_reader(fname, 'r') as f:
f = iter(f)
hdr = next(f)
hdr = hdr.strip('#').rstrip()
dataline = next(f)
known_fields = {
'Offset':np.int64,
'Filename':'U512',
}
# Now split on whitespace
fields = hdr.split()
datavals = dataline.split()
fields_and_type = [None]*len(fields)
for ii, (fld, val) in enumerate(zip(fields, datavals)):
if 'ID' in fld.upper():
dtype = np.int64
else:
try:
dtype = known_fields[fld]
except KeyError:
dtype = _guess_dtype_from_val(val)
print(f"Warning: Did not find key = '{fld}' in the known "\
f"columns. Assuming '{dtype}'")
pass
fields_and_type[ii] = (fld, dtype)
return np.dtype(fields_and_type)
# The main `read_locations_and_forests` function begins here
forests_dtype = _get_dtype_from_header(forests_fname)
locations_dtype = _get_dtype_from_header(locations_fname)
# Check that both the dtypes have the 'common' field that
# we are later going to use to join the two files
join_key = 'TreeRootID'
if not join_key in forests_dtype.names or \
not join_key in locations_dtype.names:
msg = f"Error: Expected to find column = '{join_key}' in "\
f"both the forests file ('{forests_fname}') and "\
f"the locations file ('{locations_fname}')"
msg += f"Columns in forests file = {forests_dtype.names}"
msg += f"Columns in locations file = {locations_dtype.names}"
raise KeyError(msg)
t0 = time.perf_counter()
print(f"[Rank={rank}]: Reading forests file '{forests_fname}'")
forests = np.loadtxt(forests_fname, comments='#', dtype=forests_dtype)
t1 = time.perf_counter()
print(f"[Rank={rank}]: Reading forests file '{forests_fname}'...done. "
f"Time taken = {t1-t0:.2f} seconds")
t0 = time.perf_counter()
print(f"[Rank={rank}]: Reading locations file '{locations_fname}' ...")
locations = np.loadtxt(locations_fname, comments='#', dtype=locations_dtype)
t1 = time.perf_counter()
print(f"[Rank={rank}]: Reading locations file '{locations_fname}' ...done. "
f"Time taken = {t1-t0:.2f} seconds")
# Check that the base directory is the same
# for the 'forests.list' and 'locations.dat' files
# Otherwise, the code logic will fail later in the conversion
dirname = list(set([os.path.dirname(f) for f in [forests_fname, locations_fname]]))
if len(dirname) != 1:
msg = "Error: Standard Consistent-Trees catalogs *should* "\
"create the 'forests.list' and 'locations.dat' in the "\
f"same directory. Instead found directories = {dirname}\n"\
f"Input files were = ({forests_fname}, {loocations_fname})"
raise ValueError(msg)
dirname = dirname[0]
# The 'locations.dat' file does not have fully-qualified paths
# Add the prefix so all future queries work out just fine
filenames = [fname if '/' in fname else f'{dirname}/{fname}' for fname in locations['Filename']]
locations['Filename'][:] = filenames
t0 = time.perf_counter()
print(f"[Rank={rank}]: Joining the forests and locations arrays...")
trees_and_locations = rfn.join_by(join_key, forests, locations, jointype='inner')
if trees_and_locations.shape[0] != forests.shape[0]:
raise AssertionError("Error: Inner join failed to preserve the shape")
trees_and_locations = rfn.append_fields(trees_and_locations, 'TreeNbytes',
np.zeros(trees_and_locations.shape[0],
dtype=np.int64))
t1 = time.perf_counter()
print(f"[Rank={rank}]: Joining the forests and locations arrays...done. "
f"Time taken = {t1-t0:.2f} seconds")
# All the fields have been assigned into the combined structure
# Now, we need to figure out the bytes size of each forest by sorting on
# FileID and Offset. The locations.dat file contains the offset where
# the tree data starts but does not contain where the tree data ends. By
# sorting on ('FileID', 'Offset'), we can have a guess at where the current
# tree ends by looking at where the next tree starts, and then removing all
# intervening bytes. Here, the intervening bytes are what's written out by
# this line -> https://bitbucket.org/pbehroozi/consistent-trees/src/9668e1f4d396dcecdcd1afab6ac0c80cbd561d72/src/assemble_halo_trees.c#lines-202
#
# fprintf(tree_outputs[file_id], "#tree %"PRId64"\n", halos[0].id);
#
# Since we already have the TreeRootID, we know *exactly* how many bytes
# are taken up by that printf statement. We subtract that many bytes from
# the next-tree-starting-offset, then we have the current-tree-ending-offset.
# The last tree in the file will have to be special-cased -- in this case, the
# end of tree data is known to be at end-of-file (EOF). -- MS 16/04/2020
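# For example (hypothetical offsets): if the current tree starts at byte
# offset 1000 and the next tree, with TreeRootID 12345, starts at offset
# 5000, then the separator line "#tree 12345\n" takes up 12 bytes, and the
# current tree spans 5000 - 12 - 1000 = 3988 bytes.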
# Sort the array on filename, and then by offset
# Sorting by fileID is faster than sorting by filename
trees_and_locations.sort(order=['FileID', 'Offset'])
ntrees = trees_and_locations[join_key].shape[0]
# Calculate the number of bytes in every tree
t0 = time.perf_counter()
print(f"[Rank={rank}]: Computing number of bytes per tree...")
treenbytes = np.zeros(ntrees, dtype=np.int64)
nexttid = trees_and_locations[join_key][1:]
nexttidlen = np.array([len(f"#tree {x:d}\n") for x in nexttid], dtype=np.int64)
nextstart = trees_and_locations['Offset'][1:]
thisend = nextstart - nexttidlen
treenbytes[0:-1] = thisend - trees_and_locations['Offset'][0:-1]
# Now fix the last tree for any file, (special-case the absolute last tree)
uniq_tree_filenames = list(set(trees_and_locations['Filename']))
filesizes_dict = {fname: os.stat(fname).st_size for fname in uniq_tree_filenames}
fileid_changes_ind = [ntrees-1]
if len(uniq_tree_filenames) > 1:
fileid = trees_and_locations['FileID'][:]
fileid_diffs = np.diff(fileid)
ind = (np.where(fileid_diffs != 0))[0]
if ind.size > 0:
fileid_changes_ind.extend(ind)
for ii in fileid_changes_ind:
fname, off = trees_and_locations['Filename'][ii], trees_and_locations['Offset'][ii]
treenbytes[ii] = filesizes_dict[fname] - off
if treenbytes.min() <= 0:
msg = "Error: Trees must span more than 0 bytes. "\
f"Found treenbytes.min() = {treenbytes.min()}"
raise AssertionError(msg)
trees_and_locations['TreeNbytes'][:] = treenbytes
t1 = time.perf_counter()
print(f"[Rank={rank}]: Computing number of bytes per tree...done. "
f"Time taken = {t1-t0:.2f} seconds")
# Now every tree has an associated size in bytes
# Now let's re-sort the trees such that all trees from
# the same forest are contiguous in the array.
trees_and_locations.sort(order=['ForestID', 'Filename', 'Offset'], kind='heapsort')
return trees_and_locations
def get_aggregate_forest_info(trees_and_locations, rank=0):
"""
Returns forest-level information from the tree-level information
supplied.
Parameters
-----------
trees_and_locations: numpy structured array, required
A numpy structured array that contains the tree-level
information. Should be the output from the function
``read_locations_and_forests``
rank: integer, optional, default:0
An integer identifying which task is calling this function. Only used in
status messages
Returns
-------
forest_info: A numpy structured array
The structured array contains the fields
['ForestID', 'ForestNhalos', 'Input_ForestNbytes', 'Ntrees']
The 'ForestNhalos' field is set to 0, and is populated as the
trees are processed. The number of elements in the array is
equal to the number of forests.
"""
import numpy as np
# The smallest granularity for parallelisation is a
# single forest --> therefore, we also create this
# additional array with info at the forest level.
ntrees = trees_and_locations.shape[0]
# The strategy requires the 'trees_and_locations' to be
# already sorted on 'ForestID'
uniq_forestids, ntrees_per_forest = np.unique(trees_and_locations['ForestID'], \
return_counts=True)
nforests = uniq_forestids.shape[0]
forest_info_dtype = np.dtype([('ForestID', np.int64),
('ForestNhalos', np.int64),
('Input_ForestNbytes', np.int64),
('Ntrees', np.int64)])
forest_info = np.empty(nforests, dtype=forest_info_dtype)
forest_info['ForestID'][:] = uniq_forestids
# We can only fill in the number of halos *after* reading in
# the halos (across all the trees)
forest_info['ForestNhalos'][:] = 0
forest_info['Ntrees'][:] = ntrees_per_forest
t0 = time.perf_counter()
print(f"[Rank={rank}]: Calculating number of bytes per forest...")
ends = np.cumsum(ntrees_per_forest)
if ends[-1] != ntrees:
msg = "Error: Something strange happened while computing the number of bytes "\
"per tree. Expected the last element of the array containing the "\
"cumulative sum of number of trees per forest = '{}' to equal the total number "\
"of trees being processed = '{}'. Please check that the TreeRootID and ForestID "\
"values are unique...exiting"
raise AssertionError(msg)
starts = np.zeros_like(ends, dtype=np.int64)
starts[1:] = ends[0:-1]
treenbytes = trees_and_locations['TreeNbytes'][:]
if treenbytes.min() <= 0:
msg = "Error: While computing the number of bytes per forest. Expected *all* trees "\
"to contain at least one entry (i.e., span non-zero bytes in the input file)"\
"However, minimum of the array containing the number of bytes per tree "\
f"= '{treenbytes.min()}'. Perhaps the input files are corrupt? Exiting..."
raise AssertionError(msg)
forestnbytes = np.array([treenbytes[start:end].sum() for start, end in zip(starts, ends)])
if forestnbytes.shape[0] != nforests:
msg = "Error: There must be some logic issue in the code. Expected "\
"that the shape of the array containing the number of bytes "\
f"per forest = '{forestnbytes.shape[0]}' to be *exactly* "\
f"equal to nforests = '{nforests}'. Exiting..."
raise AssertionError(msg)
# Since we have already checked that 'treenbytes.min() > 0'
# --> no need to check that forestnbytes.min() is > 0
forest_info['Input_ForestNbytes'][:] = forestnbytes
t1 = time.perf_counter()
print(f"[Rank={rank}]: Calculating number of bytes per forest...done. "
f"Time taken = {t1-t0:0.2f} seconds")
return forest_info
def get_all_parallel_ctrees_filenames(fname):
"""
Returns three filenames corresponding to the 'forests.list',
'locations.dat', and 'tree_*.dat' files, *assuming* the naming
convention of the Uchuu collaboration's parallel Consistent-Trees code
Parameters
-----------
fname: string, required
A filename specifying the tree data file for a parallel Consistent-Trees
output
Returns
--------
forests_file, locations_file, treedata_file: strings, filenames
The filenames corresponding to the 'forests.list', 'locations.dat' and
the 'tree_*.dat' file *generated using* the convention of the Uchuu
collaboration. The convention is:
'forests.list' ---> '<prefix>.forest'
'locations.dat' ---> '<prefix>.loc'
'tree_*.dat' ---> '<prefix>.tree'
"""
import os
fname = check_and_decompress(fname)
# Need to create the three filenames for the
# forests.list, locations.dat and tree*.dat files
dirname = os.path.dirname(fname)
# Get the filename and remove the file extension
basename, _ = os.path.splitext(os.path.basename(fname))
forests_file = f'{dirname}/{basename}.forest'
locations_file = f'{dirname}/{basename}.loc'
treedata_file = f'{dirname}/{basename}.tree'
return forests_file, locations_file, treedata_file
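# Example with a hypothetical filename: for fname = '/data/U2000/00000148.tree.gz',
# the file is first decompressed in place and this helper then returns
# ('/data/U2000/00000148.forest', '/data/U2000/00000148.loc',
#  '/data/U2000/00000148.tree')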
def get_forests_locations_filenames(filenames):
"""
Given a two-element list of filenames, figures out which one is the
'forests.list' file and which one is the 'locations.dat' file, and
returns them in the order (forests_file, locations_file).
"""
import os
if len(filenames) != 2:
msg = "Error: Expected to get only two filenames. Instead got "\
f"filenames = '{filenames}' with len(filenames) = {len(filenames)}"
raise AssertionError(msg)
forests_file, locations_file = filenames[0], filenames[1]
# Check if the first line in the potential `forests_file` contains
# the expected 'ForestID'. We could even check that the entire
# first line matches '#TreeRootID ForestID'
with generic_reader(forests_file, 'rt') as f:
line = f.readline()
if 'ForestID' not in line:
forests_file, locations_file = locations_file, forests_file
# Now check that the locations file is a valid CTrees file
with generic_reader(locations_file, 'rt') as f:
line = f.readline()
if 'FileID' not in line or \
'Offset' not in line or \
'Filename' not in line:
msg = f"Error: The first line in the locations_file = '{locations_file}' "\
f"does not contain the expected fields. First line = '{line}'"
raise AssertionError(msg)
return forests_file, locations_file
def validate_inputs_are_ctrees_files(ctrees_filenames, base_metadata=None,
base_version=None, base_input_catalog_type=None):
"""
Checks the files contain Consistent-Trees catalogues derived from
the same simulation.
Parameters
-----------
ctrees_filenames: list of filenames, string, required
The input filenames (potentially) containing Consistent-Trees
catalogues.
Note: Only unique filenames within this list are checked
base_metadata, base_version, base_input_catalog_type: optional, default: None
Reference header values to validate against. When not supplied, they
are taken from the first (unique) input file.
Returns
--------
Returns ``True`` when all the files contain valid Consistent-Trees
catalogues *and* share the same header info, i.e., the same simulation +
Consistent-Trees setup. Otherwise, a ``ValueError`` is raised.
"""
import numpy as np
## Read the entire header meta-data from the first file
files = np.unique(ctrees_filenames)
if not base_version:
base_metadata_dict = get_metadata(files[0])
base_metadata = np.string_(base_metadata_dict['metadata'])
base_version = base_metadata_dict['version']
base_input_catalog_type = base_metadata_dict['catalog_type']
nfiles = files.shape[0]
for fname in files:
metadata_dict = get_metadata(fname)
metadata = np.string_(metadata_dict['metadata'])
version = metadata_dict['version']
input_catalog_type = metadata_dict['catalog_type']
if 'Consistent' not in input_catalog_type:
msg = "Error: This script is meant *only* to process "\
"Consistent-Tree catalogs. Found catalog type = "\
f"{input_catalog_type} instead (input file = {fname})"
raise ValueError(msg)
if 'hlist' in input_catalog_type:
msg = "Error: This script is meant *only* to process "\
"Consistent-Tree catalogs containing tree data. "\
"Found a halo catalogue (generated by "\
"Consistent-Trees)\" instead. catalog type = "\
f"{input_catalog_type} instead (input file = {fname})"
raise ValueError(msg)
if base_input_catalog_type != input_catalog_type:
msg = f"Error: Catalog type = '{input_catalog_type}' for file '{fname}' does *not* "\
f"match the catalog type = '{base_input_catalog_type}' for file '{files[0]}'"
raise ValueError(msg)
if base_version != version:
msg = "Error: Version = '{version}' for file '{fname}' does *not* "\
"match the version = '{base_version}' for file '{files[0]}'"
raise ValueError(msg)
if not np.array_equal(base_metadata, metadata):
msg = "Error: metadata = '{metadata}' for file '{fname}' does *not* "\
"match the metadata = '{base_metadata}' for file '{files[0]}'"
raise ValueError(msg)
return True
def assign_fofids(forest, rank=0):
"""
Fills the FofID field for all halos.
Parameters
-----------
forest: numpy structured array, required
An array containing all halos from the same forest
Note: The tree-walking indices (i.e., columns returned by
the function ``get_treewalk_dtype_descr``) are expected to be
filled with -1.
rank: integer, optional, default=0
The unique identifier for the current task. Only used
within an error statement
Returns
--------
The FofID field is filled in-place, and this function has
no return value
"""
import numpy as np
# Assign fofhaloids to the halos
fofhalos = (np.where(forest['pid'] == -1))[0]
if len(fofhalos) == 0:
nhalos = forest.shape[0]
msg = f"Error: There are no FOF halos among these {nhalos} passed.\n"
msg += f"forest = {forest}"
raise ValueError(msg)
# First fix the upid, and assign the (self) FofIDs
# to the FOF halos
fofhalo_ids = forest['id'][fofhalos]
forest['upid'][fofhalos] = fofhalo_ids
forest['FofID'][fofhalos] = fofhalo_ids
# The FOFs are now done -> assign FofIDs to the remaining subhalos
haloids = forest['id'][:]
sorted_halo_idx = np.argsort(haloids)
rem_sub_inds = (np.where(forest['FofID'] == -1))[0]
nleft = len(rem_sub_inds)
while nleft > 0:
# Need to match un-assigned (pid, upid) with fofid, id of halos with assigned fofid
nassigned = 0
for fld in ['upid', 'pid']:
if len(rem_sub_inds) != nleft:
msg = "Error: (Bug in code) Expected to still have some "\
f"remaining subhalos that needed assigning, since nleft = {nleft} "\
f"with len(rem_sub_inds) = {len(rem_sub_inds)}\n"\
f"rem_sub_inds = {rem_sub_inds} "
raise AssertionError(msg)
fld_id = forest[fld][rem_sub_inds]
# 'pid'/'upid' might be duplicated ->
# np.intersect1d will only return one value; other
# halos with the same 'pid' will not get updated to the
# correct fof. Need a 'searchsorted' like implementation
# but know that not *all* pids will be found
fld_idx = np.searchsorted(haloids, fld_id, sorter=sorted_halo_idx)
forest['FofID'][rem_sub_inds] = forest['FofID'][sorted_halo_idx[fld_idx]]
nassigned += len(rem_sub_inds)
rem_sub_inds = np.where(forest['FofID'] == -1)[0]
nassigned -= len(rem_sub_inds)
assert nassigned >= 0, f"Error: nassigned = {nassigned} must be at least 0."
nleft = len(rem_sub_inds)
if nleft == 0:
break
if nleft > 0 and nassigned == 0:
msg = f"[Rank={rank}]: Error: There are {nleft} halos left without fofhalos assigned but "\
"could not assign a single fof halo in this iteration\n"
xx = np.where(forest['FofID'] == -1)[0]
msg += f"forest['pid'][xx] = {forest['pid'][xx]}\n"
msg += f"forest['upid'][xx] = {forest['upid'][xx]}\n"
msg += f"forest[xx] = {forest[xx]}\n"
pid = forest['pid'][xx]
xx = np.where(forest['id'] == pid)[0]
msg += f"pid = {pid} forest['id'] = {forest['id'][xx]} (should be equal to 'pid')\n"
msg += f"fofid for the 'pid' halo = {forest['FofID'][xx]}"
raise ValueError(msg)
# Reset the FOF upid field
assert forest['pid'][fofhalos].min() == -1
assert forest['pid'][fofhalos].max() == -1
forest['upid'][fofhalos] = -1
return
def get_treewalk_dtype_descr():
"""
Returns the description for the additional fields
to add to the forest for walking the mergertree
Parameters
-----------
None
Returns
--------
mergertree_descr: list of tuples
A list of tuples containing the names and datatypes
for the additional columns needed for walking the
mergertree. This list can be used to create a numpy
datatype suitable to contain the additional mergertree
indices
"""
import numpy as np
mergertree_descr = [('FofID', np.int64),
('FirstHaloInFOFgroup', np.int64),
('NextHaloInFOFgroup', np.int64),
('PrevHaloInFOFgroup', np.int64),
('FirstProgenitor', np.int64),
('NextProgenitor', np.int64),
('PrevProgenitor', np.int64),
('Descendant', np.int64)]
return mergertree_descr
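# A minimal sketch (assuming ``halos_dtype`` is the numpy dtype parsed from the
# ascii catalogue) of how these extra columns are typically appended to build
# the forest array that the tree-walking routines below expect:
#   import numpy as np
#   forest_dtype = np.dtype(halos_dtype.descr + get_treewalk_dtype_descr())
#   forest = np.empty(nhalos, dtype=forest_dtype)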
def add_tree_walk_indices(forest, rank=0):
"""
Adds the various mergertree walking indices
Parameters
-----------
forest: numpy structured array, required
An array containing all halos from the same forest
rank: integer, optional, default=0
The unique identifier for the current task. Only used
within an error statement
Returns
--------
None
The mergertree indices are filled in-place and no additional
return occurs
"""
import numpy as np
mergertree_descr = get_treewalk_dtype_descr()
# initialise all the mergertree indices
for name, _ in mergertree_descr:
forest[name][:] = -1
# Assign the FOFhalo IDs
assign_fofids(forest, rank)
# Assign the index for the fofhalo
forest['Mvir'] *= -1 # hack so that the sort is decreasing order
sorted_fof_order = forest.argsort(order=['scale', 'FofID', 'upid', 'Mvir', 'id'])
forest['Mvir'] *= -1 # reset mass back
# upid of FOFs is set to -1, therefore the FOF halo should be the first
# halo with that FOFID (i.e., equal to its own haloID)
uniq_fofs, firstfof_inds = np.unique(forest['FofID'][sorted_fof_order],
return_index=True)
nextsub_loc = np.split(sorted_fof_order, firstfof_inds[1:])
for lhs in nextsub_loc:
assert forest['FofID'][lhs].min() == forest['FofID'][lhs].max()
assert forest['FofID'][lhs].min() == forest['id'][lhs[0]]
forest['FirstHaloInFOFgroup'][lhs] = lhs[0]
forest['NextHaloInFOFgroup'][lhs] = [*lhs[1:], -1]
forest['PrevHaloInFOFgroup'][lhs] = [-1, *lhs[:-1]]
# Now match the descendants
haloids = forest['id'][:]
desc_ids = forest['desc_id'][:]
sorted_halo_idx = np.argsort(haloids)
valid_desc_idx = desc_ids != -1
desc_ind = np.searchsorted(haloids, desc_ids[valid_desc_idx], side='left', sorter=sorted_halo_idx)
np.testing.assert_array_equal(desc_ids[valid_desc_idx], haloids[sorted_halo_idx[desc_ind]],
err_msg="searchsorted did not work as expected")
forest['Descendant'][valid_desc_idx] = sorted_halo_idx[desc_ind]
# Check that *all* desc_ids were found
assert set(haloids[sorted_halo_idx[desc_ind]]) == set(desc_ids[valid_desc_idx])
np.testing.assert_array_equal(forest['desc_id'][valid_desc_idx],
forest['id'][sorted_halo_idx[desc_ind]],
err_msg='descendant ids are incorrect')
np.testing.assert_array_equal(forest['desc_scale'][valid_desc_idx],
forest['scale'][sorted_halo_idx[desc_ind]],
err_msg='descendant scales are incorrect')
# # Now assign (firstprog, nextprog)
# # Sort on the descendants and then mass, but only get the index
# # This is confusing, but really what the sorted index refers to are the progenitors
# # and this is the order in which the progenitors should be assigned as
# # FirstProg, NextProg
forest['Mvir'] *= -1 # hack so that the sort is decreasing order
sorted_prog_inds = np.argsort(forest, order=['desc_id', 'Mvir'])
forest['Mvir'] *= -1 # reset the masses back
# t0 = time.perf_counter()
valid_desc_idx = np.where(forest['desc_id'][sorted_prog_inds] != -1)[0]
uniq_descid, firstprog_inds = np.unique(forest['desc_id'][sorted_prog_inds[valid_desc_idx]],
return_index=True)
desc_for_firstprog_idx = np.searchsorted(haloids, uniq_descid,
sorter=sorted_halo_idx)
# np.testing.assert_array_equal(firstprog, forest['FirstProgenitor'])
forest['FirstProgenitor'][sorted_halo_idx[desc_for_firstprog_idx]] = \
sorted_prog_inds[valid_desc_idx[firstprog_inds]]
# Check that the firstprog assignment was okay
xx = np.where(forest['FirstProgenitor'] != -1)[0]
firstprog = forest['FirstProgenitor'][xx]
np.testing.assert_array_equal(forest['desc_id'][firstprog], forest['id'][xx])
valid_prog_loc = np.split(sorted_prog_inds[valid_desc_idx], firstprog_inds[1:])
for lhs in valid_prog_loc:
assert forest['desc_id'][lhs].min() == forest['desc_id'][lhs].max()
desc = forest['Descendant'][lhs[0]]
assert forest['FirstProgenitor'][desc] == lhs[0]
mvir = forest['Mvir'][lhs]
assert np.all(np.diff(mvir) <= 0.0)
forest['NextProgenitor'][lhs] = [*lhs[1:], -1]
forest['PrevProgenitor'][lhs] = [-1, *lhs[0:-1]]
return
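# A minimal sketch (not part of the converter) of how the indices assigned by
# ``add_tree_walk_indices`` can be used to walk a mergertree. ``rootidx`` is a
# hypothetical array index of a tree root within ``forest``:
#   idx = rootidx
#   while idx != -1:          # walk the main branch backwards in time
#       print(forest['scale'][idx], forest['Mvir'][idx])
#       idx = forest['FirstProgenitor'][idx]
# All subhalos of a FOF group can be visited similarly, by starting at
# forest['FirstHaloInFOFgroup'][idx] and following 'NextHaloInFOFgroup'
# until -1 is returned.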
#!/bin/bash
#######################################################
# Author: Manodeep Sinha
# Email: manodeep@gmail.com
# Date: 23/01/2020
# License: MIT
# Purpose: Combine multiple Rockstar catalogues into
# a single ascii file. The generated file
# will conform to the file-format of the
# input files, and the header will appear
# once.
# ALL input files are checked that they are
# of the same type (i.e., will prevent you from
# accidentally merging Rockstar catalogues from
# different simulations)
#######################################################
function get_header() {
fname="$1"
hdr="$2"
# Print all lines preceding the first data line (i.e., the first line that begins with digits)
# Adapted from https://unix.stackexchange.com/a/11323
# Awk prints until it encounters a data line -> but then awk
# exits before printing that line. Therefore, all comment lines
# are captured.
# NOTE: Does not assume a comment character
awk '/^[0-9]+/ {exit} {print}' "$fname" > "$hdr"
return $?
}
function verify_headers() {
numfiles="$#"
firstfile="$1"
# Check that the file exists
if [ ! -f "$firstfile" ]; then
echo "Error: Could not locate input file: '$firstfile'"
return 1
fi
#echo "$firstfile is the first file"
# This 'hdrfile' will only contain the comments
# (which are expected to be lines beginning with '#')
# Add in the maximum expected number of header lines and
# first print only those lines
hdrfile="$(mktemp)"
tmpfile="$(mktemp)"
# 'shift' moves the parameter list to the left (think of
# it as popping off the first argument, since we have
# stored that in "$firstfile")
shift
# Now get the header in the first file
get_header "$firstfile" "$hdrfile"
retval=$?
if [ $retval -ne 0 ]; then
echo "Error with extracting header (i.e., comments) lines from input file $firstfile and writing to $hdrfile"
return $retval
fi
# Check that 'Rockstar' is in the header
required_string='Rockstar'
grep "$required_string" "$hdrfile" > /dev/null
retval=$?
if [ $retval -ne 0 ]; then
echo "Error: This script only works for specific kinds of files"
echo "Could not find in the string '$required_string' in the file '$firstfile'"
return $retval
fi
# Loop over the remaining files that need to be combined (i.e., second file onwards)
for fname in "$@"; do
if [ ! -f "$fname" ]; then
echo "Error: Could not locate input file: '$fname'"
return 1
fi
#echo "Now checking whether $fname has the same header as $firstfile"
# Extract the header
get_header "$fname" "$tmpfile"
retval=$?
if [ $retval -ne 0 ]; then
echo "Error with extracting header (i.e., comments) lines from input file $fname and writing to $tmpfile"
return $retval
fi
# Check that the header matches with the header from first file
diff -q "$hdrfile" "$tmpfile" >/dev/null
retval=$?
if [ $retval -ne 0 ]; then
echo "Error: The headers are not identical between two files"
echo "Offending files $fname (header in $tmpfile) is not identical to the header for $firstfile (header in $hdrfile)"
return $retval
fi
done
# Remove the temporary header files
rm $hdrfile $tmpfile
echo "Success! All of the $numfiles input files have the same header"
return 0
}
function join_files() {
outputfile="$1"
# shift pops the outputfile from the (variable) argument list
# stored in "$@"
shift
# All arguments succeeding are input files
numinputfiles="$#"
# Now ensure that ALL the input files have the same header
verify_headers "$@"
retval=$?
if [ $retval -ne 0 ]; then
return $retval
fi
# All the files exist and have been verified to have the same header
firstfile="$1"
shift
#echo "$firstfile is the first file"
cat "$firstfile" > "$outputfile"
retval=$?
if [ $retval -ne 0 ]; then
echo "Error: Could not write the first file into the output file"
echo "Offending files are: Input file is $firstfile, and output file is $outputfile"
return $retval
fi
for fname in "$@"; do
#echo "Adding the data from $fname into $outputfile"
grep -v "^#" "$fname" >> "$outputfile"
retval=$?
if [ $retval -ne 0 ]; then
echo "Error: Could not concatenate $file into the output file: $outputfile"
return $retval
fi
done
echo "Success! Combined all $numinputfiles into a single output file = '$outputfile'"
return 0
}
# Now call the join_files function with the output file
# and the list of input files
# Invocation could be `/bin/bash join_files.sh 'combined.rockstar' 'out_28_*.rockstar'`
if [ "$#" -lt 3 ]; then
echo "Error: Please at least three command-line arguments"
echo "<output file> -- string specifying the *combined* ascii filename (*gets overwritten*)"
echo "<list of filenames> -- input filenames that will be combined (all files must exist, must be of same type)"
exit 1
fi
join_files "$@"
#!/bin/bash
#######################################################
# Author: Manodeep Sinha
# Email: manodeep@gmail.com
# Date: 22/01/2019
# License: MIT
# Purpose: Split Rockstar ascii catalogues into
# smaller files while retaining the header.
# The generated files will conform to the
# file-format of the input files
#######################################################
split_filter() {
{ cat "$hdrfile"; cat; } > "$FILE";
}
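# Note: GNU 'split --filter=CMD' runs CMD once per output chunk, streaming the
# chunk on stdin and exporting the destination filename as $FILE. Therefore,
# 'split_filter' simply prepends the saved header ($hdrfile) to every chunk.
# A standalone sketch with hypothetical filenames:
#   export hdrfile=header.txt
#   export -f split_filter
#   tail -n +2 data.txt | split -n r/4 --filter=split_filter -d --additional-suffix=.txt - chunk_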
function split_file() {
fullfile=$1
nfiles=$2
outdir=$3
# Trim any repeating trailing slashes
# Taken from https://stackoverflow.com/a/32845647/2237582
outdir=$(echo $outdir | sed 's:/*$::')
# Check that the file exists
if [ ! -f "$fullfile" ]; then
echo "Error: Could not locate file: '$fullfile'"
return 1
fi
# If only requesting <= 1 chunk -> there is nothing to do
if [ $nfiles -le 1 ]; then
echo "Requested chunks = $nfiles implies no work...returning"
return 0
fi
# Check that output directory exists
if [ ! -d "$outdir" ]; then
echo "Error: Output directory '$outdir' does not exist"
return 1
fi
# Check that output directory is writable
if [ ! -w "$outdir" ]; then
echo "Error: Output directory '$outdir' is not writable"
echo "Please select a different output directory where you have write permissions"
return 1
fi
# Taken from https://unix.stackexchange.com/a/285928
currentver=$(split --version | head -n 1 | awk '{print $NF}')
# The bottleneck is the coreutils tool 'split'
# We need the '--filter' functionality (introduced in v8.11)
# and the '--additional-suffix' keyword (introduced in v8.16)
requiredver="8.16"
# Taken from: https://unix.stackexchange.com/a/285928
if [ ! "$(printf '%s\n' "$requiredver" "$currentver" | sort -V | head -n1)" = "$requiredver" ]; then
echo "Error: The version of the system command 'split' is too old"
echo "Minimum version of the coreutils tool required is $requiredver and only found version = $currentver"
echo "You will need to upgrade coreutils - either ask your system administrator"
echo "Or install through conda with 'conda install -c conda-forge coreutils'"
return 1
fi
# This 'hdrfile' will only contain the comments
# (which are expected to be lines beginning with '#')
# Add in the maximum expected number of header lines and
# first print only those lines
export hdrfile="$(mktemp -p $outdir)"
# Print all lines preceding the first data line (i.e., the first line that begins with digits)
# Adapted from https://unix.stackexchange.com/a/11323
awk '/^[0-9]+/ {exit} {print}' "$fullfile" > "$hdrfile"
retval=$?
if [ $retval -ne 0 ]; then
echo "Error with extracting header (i.e., comments) lines from input file $fullfile and writing to $hdrfile"
return $retval
fi
# Check that the input file is a Rockstar catalogue
# (since there is a logical grouping for the Consistent-Trees
# catalogues where all halos belonging to the same tree
# are sequential -> therefore splitting a CTrees file requires
# a more specialised splitting tool)
required_string='Rockstar'
grep "$required_string" "$hdrfile" > /dev/null
retval=$?
if [ $retval -ne 0 ]; then
echo "Error: This script only works for specific kinds of files"
echo "Could not find in the string '$required_string' in the file '$fullfile'"
return $retval
fi
# All input validation and tool validation is done -> let's split the input file
echo "Splitting '$fullfile' into '$nfiles' chunks and writing to '$outdir' ..."
export -f split_filter
# The header file has already been generated successfully
nhdrlines=$(wc -l < "$hdrfile")
firstdataline=$((nhdrlines+1))
# Extract the actual filename (remove the paths, if present)
filename=$(basename -- "$fullfile")
extension=".${filename##*.}"
filebasename="${filename%.*}_"
# Split the input file into "nfiles" chunks
# The generated filename will be "'outdir/filebasename' + '%02d' + 'extension'"
tail -n +"$firstdataline" "$fullfile" | split -n r/$nfiles --filter=split_filter -d --additional-suffix="$extension" - "${outdir}/${filebasename}"
retval=$?
if [ $retval -ne 0 ]; then
echo "****** Error with split *******"
echo "If you see an error message relating to 'unrecognized option --additional-suffix='"
echo "Then your version of the system command 'split' is too old"
echo "You will need to upgrade coreutils - either ask your system administrator"
echo "Or install through conda with 'conda install -c conda-forge coreutils'"
echo "Intermediate file created is: " "$hdrfile"
echo "*******************************"
return $retval
fi
# Remove the temporary header file
rm $hdrfile
echo "Splitting '$fullfile' into '$nfiles' chunks and writing to '$outdir' ......done"
return 0
}
# Now call the function with the input file and
# the number of chunks, and the output directory
# as the (only) three arguments
# Invocation could be `/bin/bash split_file.sh 'out_28.rockstar' 5 './'`
if [ "$#" -ne 3 ]; then
echo "Error: Please pass exactly two command-line arguments"
echo "<input file> -- string specifying the ascii filename to be split"
echo "<number of split files> -- integer specifying the number of files to split into"
echo "<output dir> -- string specifying the directory where the output files will be written to (and temporary files)"
exit 1
fi
split_file "$@"
#!/usr/bin/env python
from __future__ import print_function
import os
## From Yao-Yuan Mao's SimulationAnalysis.py (part of helper package)
import re
import io
import numpy as np
from builtins import range, zip
import inspect
def _isstring(s):
try:
s + ''
except TypeError:
return False
return True
class BaseParseFields():
def __init__(self, header, fields=None):
if len(header)==0:
if all(isinstance(f, int) for f in fields):
self._usecols = fields
self._formats = [float]*len(fields)
self._names = ['f%d'%f for f in fields]
else:
raise ValueError('header is empty, so fields must be a list '\
'of int.')
else:
header_s = [self._name_strip(__) for __ in header]
if not fields:
self._names = header
names_s = header_s
self._usecols = list(range(len(names_s)))
else:
if _isstring(fields):
fields = [fields]
self._names = [header[f] if isinstance(f, int) else str(f) \
for f in fields]
names_s = [self._name_strip(__) for __ in self._names]
wrong_fields = [str(f) for s, f in zip(names_s, fields) \
if s not in header_s]
if wrong_fields:
raise ValueError('The following field(s) are not available'\
': %s.\nAvailable fields: %s.'%(\
', '.join(wrong_fields), ', '.join(header)))
self._usecols = [header_s.index(__) for __ in names_s]
self._formats = [self._get_format(__) for __ in names_s]
self.dtype = np.dtype({'names':self._names, \
'formats':self._formats})
def parse_line(self, l):
items = l.split()
try:
return tuple(c(items[i]) for i, c in zip(self._usecols, self._formats))
except Exception as _error:
print('Something wrong when parsing this line:\n{0}'.format(l))
raise _error
def pack(self, X):
return np.array(X, self.dtype)
def _name_strip(self, s):
return self._re_name_strip.sub('', s).lower()
def _get_format(self, s):
return float if self._re_formats.search(s) is None else int
_re_name_strip = re.compile('\W|_')
_re_formats = re.compile('^phantom$|^mmp$|id$|^num|num$')
## End of code lifted from Yao-Yuan Mao's SimulationAnalysis.py
def get_parser(filename, fields=None):
with io.open(filename, 'r') as f:
# Taken from SimulationAnalysis.py
hdrline = next(f)
hdrline = hdrline.strip().lstrip('#').lstrip()
header = [re.sub('\(\d+\)$', '', s) for s in hdrline.split()]
# print("Header = {}".format(header))
p = BaseParseFields(header, fields)
# print("p.dtype = {}".format(p.dtype))
# Above taken from SimulationAnalysis.py
return p
def read_lines_chunks(filename, chunksize, usecols, formats):
import io
def parse_line(l, usecols, formats):
items = l.split()
return tuple(c(items[i]) for i, c in zip(usecols, formats))
with io.open(filename, 'rt') as f:
X = [None]*chunksize
size = 0
for l in f:
if l.startswith('#'): continue
X[size] = parse_line(l, usecols, formats)
size += 1
if size == chunksize:
size = 0
yield X
if size > 0:
yield X[0:size]
return []
def parse_ascii_file_default(input_file, parser, chunksize):
from itertools import islice
print_line=True
nread = 0
with io.open(input_file, "r") as f:
while True:
# Read from the ascii file in (up to) chunksize lines
# parse the line based on the columns wanted and create
# list (where each item of the list is a tuple returned
# by the parse_line method)
X = [parser.parse_line(l) for l in islice(f, chunksize) \
if not l.startswith('#')]
if not X:
break
halos = parser.pack(X)
nread += halos.shape[0]
# if print_line:
# print_line=False
# print("type(halos) = {}\nhalos shape = {} dtype = {}\nhalos[0] = {}".format(type(halos),
# halos.shape,
# halos.dtype,
# halos[0]))
print("In {}: nread = {}".format(inspect.currentframe().f_code.co_name, nread))
return
def parse_ascii_file_pandas(input_file, parser, chunksize):
import pandas as pd
gen = pd.read_csv(input_file, dtype=parser.dtype,
names=parser.dtype.names, delim_whitespace=True,
index_col=False, chunksize=chunksize, comment='#')
print_line=True
nread = 0
for chunk in gen:
halos = np.array(chunk.to_records(index=False), dtype=parser.dtype)
nread += halos.shape[0]
if print_line:
print_line=False
print("type(halos) = {}\nhalos shape = {} dtype = {}\nhalos[0] = {}".format(type(halos),
halos.shape,
halos.dtype,
halos[0]))
print("In {}: nread = {}".format(inspect.currentframe().f_code.co_name, nread))
return
def parse_ascii_file_csv(input_file, parser, chunksize):
import csv
from itertools import islice
def decomment(csvfile):
for row in csvfile:
raw = row.split('#')[0].strip()
if raw: yield raw
nread = 0
print_line = True
with io.open(input_file, 'r') as f:
reader = csv.reader(decomment(f), delimiter=' ')
while True:
X = [tuple(row) for row in islice(reader, chunksize)]
if not X:
break
halos = np.array(X, dtype=parser.dtype)
nread += halos.shape[0]
# if print_line:
# print_line=False
# print("type(halos) = {}\nhalos shape = {} dtype = {}\n".format(type(halos),
# halos.shape,
# halos.dtype))
print("In {}: nread = {}".format(inspect.currentframe().f_code.co_name, nread))
return
def main(filename):
import timeit
import functools
import sys
chunksize = 100000
nrepeats = 1
parser = get_parser(filename)
#parse_ascii_file_default(filename, parser, chunksize)
parse_ascii_file_pandas(filename, parser, chunksize)
#parse_ascii_file_csv(filename, parser, chunksize)
readers = [parse_ascii_file_default, parse_ascii_file_pandas, parse_ascii_file_csv]
for func in readers:
t = timeit.Timer(functools.partial(func, filename, parser, chunksize))
print("Time required for function {} is {:.3f}".format(func.__name__,
t.timeit(nrepeats)))
sys.stdout.flush()
if __name__ == "__main__":
filename = "test_split/out_35.list_00_00.p5"
main(filename)
print("This is untested code - please use with caution")
#!/usr/bin/env python
from __future__ import print_function
__author__ = "Manodeep Sinha"
__all__ = ["test_parallel_ctrees"]
import numpy as np
from uchuu_utils import iterTrees
from ctrees_utils import get_treewalk_dtype_descr
def _validate_prog_desc(forest):
print(f"forest.dtype.names = {forest.dtype.names}")
for prog, desc in enumerate(forest['Descendant']):
if desc == -1:
continue
if forest['scale'][desc] <= forest['scale'][prog]:
msg = f"Error: desc_scale = {forest['scale'][desc]} must be greater "\
f"than progenitor scale = {forest['scale'][prog]}"\
f"desc = {desc} prog = {prog} desc_id = {forest['desc_id'][prog]}"\
f"id of desc = {forest['id'][desc]}, prog, Tree_root_ID = {forest['Tree_root_ID'][prog]} "\
f"desc, Tree_root_ID = {forest['Tree_root_ID'][desc]}"
raise AssertionError(msg)
assert forest['id'][desc] == forest['desc_id'][prog]
if forest['FirstProgenitor'][desc] == prog:
# count the number of progenitors
num_prog = 1
nextprog = forest['NextProgenitor'][prog]
while nextprog != -1:
num_prog += 1
nextprog = forest['NextProgenitor'][nextprog]
assert forest['num_prog'][desc] == num_prog
else:
# This progenitor is not the primary progenitor
# -> we need to check the nextprog values -> nextprog
# *must* exist
firstprog = forest['FirstProgenitor'][desc]
nextprog = forest['NextProgenitor'][firstprog]
msg = f"firstprog = {firstprog} desc = {desc} prog = {prog} nextprog = {nextprog}"
assert nextprog != -1, msg
while nextprog != prog:
assert forest['desc_id'][nextprog] == forest['id'][desc]
ii = forest['NextProgenitor'][nextprog]
assert ii != -1
assert forest['Mvir'][nextprog] >= forest['Mvir'][ii]
nextprog = ii
def test_parallel_ctrees(h5file, show_progressbar=True):
"""
Validates a single hdf5 file produced by the parallel Consistent-Trees
converter by comparing every tree against the original ascii tree files
listed in the hdf5 file's 'input_files' attribute.
"""
import h5py
from tqdm import tqdm
from numpy.testing import assert_array_equal
with h5py.File(h5file, 'r') as hf:
ctrees_type = hf.attrs['consistent-trees-type']
if ctrees_type != 'parallel':
msg = "Error: This test script is only intended for Parallel-Ctrees output"
raise ValueError(msg)
# raise NotImplementedError()
# Need to do a many-to-many matching
with h5py.File(h5file, 'r') as hf:
ntrees = hf['TreeInfo'].shape[0]
tree_roots = hf['TreeInfo']
print(f"Reading {ntrees} TreeRootIDs from hdf5 file ...")
tree_rootids = tree_roots['TreeRootID'][:]
print(f"Reading {ntrees} TreeRootIDs from hdf5 file ...done.")
# tree_roots.sort(order=['Input_Filename', 'Input_TreeByteOffset'], kind='heapsort')
cont_halo_props = False
try:
dtype = hf['Forests/halos'].dtype
except KeyError:
cont_halo_props = True
if cont_halo_props:
halo_props = dict()
props = []
for name in tqdm(hf['Forests'].keys()):
if isinstance(hf[f"Forests/{name}"], h5py.Dataset):
props.append(name)
halo_props[name] = np.asarray(hf[f"Forests/{name}"])
else:
props = dtype.names
halo_props = np.asarray(hf["Forests/halos"])
print(f"properties = {props}")
mergertree_dtype = np.dtype(get_treewalk_dtype_descr())
if show_progressbar:
pbar = tqdm(total=ntrees, unit=' trees', disable=None)
# h5_treeindex = 0
# print(f"tree roots = {tree_roots['TreeRootID'][:]}")
treefiles = [tf.decode() for tf in hf.attrs[u'input_files']]
# Create the generators from the ascii file
for tf in treefiles:
trees = iterTrees(tf, return_tree_root_id=True)
skipped = 0
tested = 0
numtrees = 0
for root, tree in trees:
numtrees += 1
rootindex = (np.where(tree_rootids == root))[0]
if len(rootindex) == 0:
print(f"Did not find tree-root-id = {root}")
skipped += 1
continue
if len(rootindex) != 1:
msg = f"Error: Should have found *exactly* one instance of TreeRootID = {root}\n"\
f"Instead, found {len(rootindex)} matches, with indices = {rootindex}"
raise AssertionError(msg)
tested += 1
h5_treeindex = rootindex[0]
h5_tree_root = tree_roots['TreeRootID'][h5_treeindex]
msg = f"Error: For tree # {h5_treeindex}, the TreeRootID = {h5_tree_root} in hdf5 file = {h5file} "\
f"does not match the TreeRootID in the ascii file = {root}"
assert root == h5_tree_root, msg
h5_start = tree_roots['TreeHalosOffset'][h5_treeindex]
h5_end = h5_start + tree_roots['TreeNhalos'][h5_treeindex]
#print(f"Checking treerootid = {root}, (start, stop) = [{h5_start}, {h5_end}) h5_treeindex = {h5_treeindex}")
for prop in props:
if prop in mergertree_dtype.names:
continue
# print(f"Checking property = {prop}...")
msg = f"Error: Halo property = '{prop}' is not the same "\
"between the hdf5 and ascii files"
assert_array_equal(tree[prop][:],
halo_props[prop][h5_start:h5_end],
err_msg=msg)
# print(f"Checking property = {prop}...done")
# h5_treeindex += 1
if show_progressbar:
pbar.update(1)
if show_progressbar:
pbar.close()
print(f"File {h5file} passed validation - hooray!")
return True
if __name__ == "__main__":
base = ""
# h5file = "/home/msinha/scratch/simulations/uchuu/U2000/trees/hdf5/treerootid/tmp/forest_0.h5"
# treefile = "/home/msinha/scratch/simulations/uchuu/U2000/trees/00000148.tree"
base = "/home/msinha/scratch/simulations/uchuu/U2000/trees/hdf5/with-tree-indices"
h5files = [f"{base}/forest_{rank}.h5" for rank in range(5)]
for h5file in h5files:
test_parallel_ctrees(h5file)
#!/usr/bin/env python
from __future__ import print_function
__author__ = "Manodeep Sinha"
__all__ = ["get_parser", "get_approx_totnumhalos", "generic_reader",
"get_metadata", "resize_halo_datasets",
"check_and_decompress", "distribute_array_over_ntasks",
"write_halos", "iterTrees"]
## From Yao-Yuan Mao's SimulationAnalysis.py (part of YYM's helper package)
import re
import numpy as np
from builtins import range, zip
from contextlib import contextmanager
def sanitize_ctrees_header(headerline):
import re
header = [re.sub('\(\d+\)$', '', s) for s in headerline]
# print("After normal sub: header = {}\n".format(header))
header = [re.sub('[^a-zA-Z0-9 \n\.]', '_', s) for s in header]
# print("After replacing special characters with underscore: header = {}\n".format(header))
header = [re.sub('_$', '', s) for s in header]
# print("After replacing trailing underscore: header = {}\n".format(header))
header = [re.sub('(_)+', '_', s) for s in header]
# print("After replacing multiple underscores: header = {}".format(header))
return header
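# For example, a Consistent-Trees header such as
#   ['scale(0)', 'id(1)', 'A[x](14)', 'Tree_root_ID(29)']
# is sanitised into valid (numpy/hdf5-friendly) column names:
#   ['scale', 'id', 'A_x', 'Tree_root_ID']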
def _isstring(s):
try:
s + ''
except TypeError:
return False
return True
class BaseParseFields():
def __init__(self, header, fields=None):
if len(header)==0:
if all(isinstance(f, int) for f in fields):
self._usecols = fields
self._formats = [float]*len(fields)
self._names = ['f%d'%f for f in fields]
else:
raise ValueError('header is empty, so fields must be a list '\
'of int.')
else:
# print(f"in baseparsefields: header = {header}")
header_s = [self._name_strip(__) for __ in header]
# print(f"header_s = {header_s}")
if not fields:
self._names = header
names_s = header_s
self._usecols = list(range(len(names_s)))
else:
# print(f"fields = {fields}")
if _isstring(fields):
fields = [fields]
self._names = [header[f] if isinstance(f, int) else str(f) \
for f in fields]
names_s = [self._name_strip(__) for __ in self._names]
wrong_fields = [str(f) for s, f in zip(names_s, fields) \
if s not in header_s]
if wrong_fields:
raise ValueError('The following field(s) are not available'\
': %s.\nAvailable fields: %s.'%(\
', '.join(wrong_fields), ', '.join(header)))
self._usecols = [header_s.index(__) for __ in names_s]
# print(f"_usecols = {self._usecols}, names_s = {names_s}")
self._formats = [self._get_format(__) for __ in names_s]
#xx = list(zip(self._usecols, self._names, self._formats))
# print(f"self._usecols = {self._usecols} self._names = {self._names} self._formats = {self._formats}")
# print(f"zipped (usecols, name, format) = {xx}")
self.dtype = np.dtype({'names':self._names, \
'formats':self._formats})
def parse_line(self, l):
items = l.split()
try:
return tuple(c(items[i]) for i, c in zip(self._usecols, self._formats))
except Exception as _error:
print(f'Something wrong when parsing this line:\n{l}')
raise _error
def pack(self, X):
return np.array(X, self.dtype)
def _name_strip(self, s):
return self._re_name_strip.sub('', s).lower()
def _get_format(self, s):
return float if self._re_formats.search(s) is None else int
_re_name_strip = re.compile('\W|_')
_re_formats = re.compile('^phantom$|^mmp$|id$|^num|num$')
def _loadtree(locations, parser, tree_root):
ind = (np.where(locations['TreeRootID'] == tree_root))[0]
if ind.size == 0:
msg = f"Error: Could not find tree root = {tree_root}"
raise ValueError(msg)
fname, offset = locations['Filename'][ind], locations['Offset'][ind]
with open(fname, 'rt') as f:
pass
raise NotImplementedError()
def iterTrees(tree_dat, fields=None, buffering=100000000, return_tree_root_id=False):
"""
Iterate over all the trees in a tree_X_X_X.dat file.
Parameters
----------
tree_dat : str or file obj
The path to the file (can be an URL) or a file object.
fields : str, int, array_like, optional
The desired fields. It can be a list of string or int. If fields is None (default), return all the fields listed in the header.
buffering : int, optional
Buffer size (in bytes) passed to ``open`` when reading a plain-text file.
Returns
-------
trees : generator
A python generator which iterates over all the trees in that file.
Example
-------
>>> trees = iterTrees('tree_0_0_0.dat', ['id', 'mvir', 'upid', 'num_prog'])
>>> tree = next(trees)
>>> tree.dtype
dtype([('id', '<i8'), ('mvir', '<f8'), ('upid', '<i8')])
>>> mb = tree[getMainBranch(tree)]
"""
def _generic_open(f, buffering=100000000):
"""
Returns
-------
fp : file handle
need_to_close : bool
"""
if hasattr(f, 'read'):
return f, False
else:
if re.match(r'(s?ftp|https?)://', f, re.I):
import requests
r = requests.get(f, stream=True)
r.raw.decode_content = True
fp = r.raw
elif f.endswith('.gz'):
import gzip
fp = gzip.open(f, 'r')
else:
fp = open(f, 'r', int(buffering))
return fp, True
f, need_to_close = _generic_open(tree_dat, buffering)
try:
l = next(f)
if l[0] != '#':
raise ValueError('File does not have a proper header line.')
split_headerline = [s for s in l[1:].split()]
header = sanitize_ctrees_header(split_headerline)
# header = [re.sub('\(\d+\)$', '', s) for s in l[1:].split()]
p = BaseParseFields(header, fields)
while l[0] == '#':
try:
l = next(f)
except StopIteration:
num_trees = 0
break
else:
try:
num_trees = int(l)
except ValueError:
raise ValueError('File does not have a proper tree_X_X_X.dat format.')
try:
l = next(f)
except StopIteration:
if num_trees:
raise ValueError('File does not have a proper tree_X_X_X.dat format.')
eof = False
for _ in range(num_trees):
if eof or l[0] != '#':
raise ValueError('File does not have a proper tree_X_X_X.dat format.')
try:
tree_root_id = int(l.rpartition(' ')[-1])
except ValueError:
raise ValueError('File does not have a proper tree_X_X_X.dat format.')
X = []
for l in f:
if l[0] == '#':
if return_tree_root_id:
yield tree_root_id, p.pack(X)
else:
yield p.pack(X)
break
X.append(p.parse_line(l))
else:
eof = True
finally:
if need_to_close:
f.close()
## End of code taken straight from Yao-Yuan Mao's SimulationAnalysis.py
def get_parser(filename, fields=None, drop_fields=None):
"""
Returns a parser that parses a single line from the input ascii file
Parameters
----------
filename: string, required
A filename containg Rockstar/Consistent-Trees data. Can be a compressed
file if the compression is one of the types supported by the
``generic_reader`` function.
fields: list of strings, optional, default: None
Describes which specific columns in the input file to carry across
to the hdf5 file. Default action is to convert ALL columns.
drop_fields: list of strings, optional, default: None
Describes which columns are not carried through to the hdf5 file.
Processed after ``fields``, i.e., you can specify ``fields=None`` to
create an initial list of *all* columns in the ascii file, and then
specify ``drop_fields = [colname2, colname7, ...]``, and those columns
will not be present in the hdf5 output.
Returns
-------
parser: an instance of BaseParseFields
A parser that will parse a single line (read from a Rockstar/Consistent-Trees file)
and create a tuple containing *only* the relevant columns.
"""
with generic_reader(filename, 'rt') as f:
# Taken from SimulationAnalysis.py
hdrline = next(f)
hdrline = hdrline.strip().lstrip('#').lstrip()
#print(f"hdrline = {hdrline}")
split_headerline = [s for s in hdrline.split()]
header = sanitize_ctrees_header(split_headerline)
#print(f"sanitised header = {header}")
keep_fields = header[:]
if fields:
print(f"Since Fields = {fields} has been specified, only keeping those")
fields = [s.upper() for s in fields]
for fld in split_headerline:
upfld = fld.upper()
if upfld not in fields:
keep_fields.remove(fld)
print(f"Removing fld = {fld} since it does not exist "
"in 'fields'")
else:
print(f"Keeping fld = {fld}")
if drop_fields:
for fld in drop_fields:
try:
keep_fields.remove(fld)
except ValueError:
print(f"Field = '{fld}' was specified in the list of fields "\
f"to be dropped but could not find in the list "\
f"of current fields = {keep_fields}. Perhaps the case needs "\
"to be fixed (the matching is case-sensitive)?")
raise ValueError(msg)
p = BaseParseFields(header, keep_fields)
# Above taken from SimulationAnalysis.py
return p
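# A minimal usage sketch (the filename and the dropped column are only
# illustrative):
#   parser = get_parser('tree_0_0_0.dat', drop_fields=['Tidal_Force'])
#   halo_tuple = parser.parse_line(data_line)   # one ascii data line -> tuple
#   halos = parser.pack([halo_tuple])           # list of tuples -> structured array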
@contextmanager
def generic_reader(filename, mode='rt'):
"""
Returns a file-reader with capability to read line-by-line
for both compressed and normal text files.
Parameters
-----------
filename: string, required
The filename for the associated input/output. Can be a
compressed (.bz2, .gz, .xz, .zip) file as well as a regular
ascii file
mode: string, optional, default: 'rt' (readonly-text mode)
Controls the kind of i/o operation that will be
performed
Returns
--------
f: file handle, generator
Yields a generator that has the ``readline`` feature (i.e.,
supports the paradigm ``for line in f:``). This file-reader
generator is suitable for use in ``with`` statements, e.g.,
``with generic_reader(<fname>) as f:``
"""
if not isinstance(filename, str) and hasattr(filename, 'decode'):
filename = filename.decode()
if filename.endswith(".bz2"):
import bz2
f = bz2.open(filename, mode=mode)
elif filename.endswith(".gz"):
import gzip
f = gzip.open(filename, mode=mode)
elif filename.endswith('.xz'):
import lzma
f = lzma.open(filename, mode=mode)
elif filename.endswith('.zip'):
validmodes = set(['r', 'w'])
if mode not in validmodes:
msg = f"Error: filename = '{filename}' is a zipfile but the "\
f"requested mode = '{mode}' is not the valid zipfile "\
f"modes = '{validmodes}'"
raise ValueError(msg)
import zipfile
f = zipfile.open(filename, mode)
else:
import io
f = io.open(filename, mode=mode)
yield f
f.close()
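# Usage sketch (works identically for plain ascii and .gz/.bz2/.xz files):
#   with generic_reader('out_0.list.gz', 'rt') as f:
#       for line in f:
#           if line.startswith('#'):
#               continue
#           # ... process the data line ...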
def get_metadata(input_file):
"""
Returns metadata information for ``input_file``. Includes all
comment lines in the header, Rockstar/Consistent-Trees version,
and the input catalog type (either Rockstar or Consistent-Trees).
Assumes that the only comment lines in the file occur
at the beginning. Comment lines are assumed to begin with '#'.
Parameters
-----------
input_file: string, required
The input filename for the Rockstar/Consistent Trees file
Compressed files ('.bz2', '.gz', '.xz', '.zip') are also allowed
as valid kinds of ``input_file``
Returns
--------
metadata_dict: dictionary
The dictionary contains five key-value pairs corresponding to the
keys: ['metadata', 'version', 'catalog_type', 'headerline', 'simulation_params'].
metadata: string
All lines in the beginning of the file that start with the
character '#'.
version: string
Rockstar or Consistent-Trees version that was used to generate
``input_file``
catalog_type: string
Is one of [``Rockstar``, ``Consistent Trees``, ``Consistent Trees (hlist)``]
and indicates what kind of catalog is contained in ``input_file``
headerline: string
The first line in the input file with any leading/trailing white-space, and
any leading '#' removed
simulation_params: dictionary
The cosmological parameters ('Omega_M', 'Omega_L', 'hubble') and the
simulation 'Boxsize' parsed from the header (when present)
"""
def get_catalog_and_version(line):
input_catalog_type = None
version = None
## The rockstar version has the pattern
## '#Rockstar Version: <VERSION STRING>'
## We split the input line on ':', then take
## the second chunk by indexing with [1] to
## retrieve the version string. The final step
## removes all white space from the string
## The Consistent Trees version has the pattern
## '#Consistent Trees Version <VERSION_STRING>'
## We split the line on ' ' to get four
## chunks, and take the last one. Remove
## the white-space as usual
catalog_types = [('Rockstar', lambda line:(line.split(":")[1]).strip()),
('Consistent Trees', lambda line:(line.split(" ")[-1]).strip())]
for (catalog, fn) in catalog_types:
if catalog in line:
input_catalog_type = catalog
version = fn(line)
break
return input_catalog_type, version
metadata = []
ctrees_hlist = True
with generic_reader(input_file, 'rt') as f:
for line in f:
if not line.startswith('#'):
# Check if the first 'data' line contains
# multiple columns. This is required to distinguish
# between CTrees-generated halo catalogues ('hlist' files)
# and the CTrees-generated tree files ('tree_?_?_?.dat' files)
if len(line.split()) == 1:
ctrees_hlist = False
break
if 'VERSION' in line.upper():
input_catalog_type, version = get_catalog_and_version(line)
metadata.append(line)
if not version:
msg = f"Error: Could not locate version in the input file = '{input_file}'"
raise ValueError(msg)
if ctrees_hlist and 'Consistent' in input_catalog_type:
input_catalog_type = f"{input_catalog_type} (hlist)"
hdrline = metadata[0]
hdrline = hdrline.strip().lstrip('#').lstrip()
simulation_params = get_simulation_params_from_metadata(metadata)
metadata_dict = dict()
metadata_dict['metadata'] = metadata
metadata_dict['version'] = version
metadata_dict['catalog_type'] = input_catalog_type
metadata_dict['headerline'] = hdrline
metadata_dict['simulation_params'] = simulation_params
return metadata_dict
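# Usage sketch (hypothetical filename):
#   meta = get_metadata('tree_0_0_0.dat')
#   print(meta['catalog_type'], meta['version'])
#   print(meta['simulation_params'])   # e.g., {'Omega_M': ..., 'hubble': ..., 'Boxsize': ...}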
def get_simulation_params_from_metadata(metadata):
simulation_params = dict()
for line in metadata[1:]:
# cosmological parameters
if "Omega_M" in line or ("Om" in line and "Ol" in line):
pars = line[1:].split(";")
for j, par in enumerate(["Omega_M",
"Omega_L",
"hubble"]):
v = float(pars[j].split(" = ")[1])
simulation_params[par] = v
# box size
elif "Full box size" in line or "Box size" in line:
pars = line.split("=")[1].strip().split()
box = float(pars[0])
simulation_params['Boxsize'] = box
# We can break because boxsize always occurs later than
# the cosmological parameters (regardless of CTrees/Rockstar origin)
break
return simulation_params
def get_approx_totnumhalos(input_file, ndatabytes=None):
"""
Returns an (approximate) number of lines containing data
in the ``input_file``.
Assumes that the only comment lines in the file occur
at the beginning. Comment lines are assumed to begin with '#'.
Parameters
-----------
input_file: string, required
The input filename for the Rockstar/Consistent Trees file
ndatabytes: integer, optional
The total number of bytes being processed. If not passed, the
entire disk size of the ``input_file`` minus the initial header
lines will be used (i.e. assumes that the entire file is being
processed)
Returns
--------
approx_totnumhalos: integer
The approximate number of halos in the input file. The actual
number of halos should be close but can be smaller/greater than
the approximate value.
"""
import io
import os
with generic_reader(input_file, 'rt') as f:
hdr_bytes = 0
for line in f:
if line.startswith('#'):
hdr_bytes += len(line)
else:
# The first line of a CTrees file is the number of trees
# in that file. We should skip that line
if len(line.split()) == 1:
hdr_bytes += len(line)
continue
data_line_len = len(line)
break
if not ndatabytes:
statinfo = os.stat(input_file)
totbytes = statinfo.st_size
ndatabytes = totbytes - hdr_bytes
approx_nhalos = (ndatabytes // data_line_len) + 1
return approx_nhalos
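# A worked example with hypothetical numbers: a 1.2 GiB catalogue with ~2 KiB
# of header and a first data line of 600 bytes gives
#   (1288490188 - 2048) // 600 + 1 = 2147481 halos (roughly 2.1 million),
# which is only approximate because individual data lines differ in length.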
def check_and_decompress(fname):
"""
Decompresses the input file (if necessary) and returns the
decompressed filename
Parameters
----------
fname: string, required
Input filename, can be compressed
Returns
-------
decomp_fname: string
The decompressed filename
"""
decompressors = [('.bz2', 'bunzip2'),
('.gz', 'gunzip'),
('.zip', 'unzip')]
if not isinstance(fname, str) and hasattr(fname, 'decode'):
fname = fname.decode()
for (ext, decomp) in decompressors:
if fname.endswith(ext):
import subprocess
print(f"Uncompressing compressed file '{fname}'...")
subprocess.run([decomp, fname], check=True)
print(f"Uncompressing compressed file '{fname}'...done")
fname = fname[:-len(ext)]
break
return fname
def distribute_array_over_ntasks(cost_array, rank, ntasks):
"""
Calculates the subscript range for the ``rank``'th task
such that the work-load is evenly distributed across
``ntasks``.
Parameters
-----------
cost_array: numpy array, required
Contains the cost associated with processing *each* element
of the array
rank: integer, required
The integer rank for the task that we need to compute the
work-load division for
ntasks: integer, required
Total number of tasks that the array should be (evenly) distributed
across
Returns
--------
(start, stop): A tuple of (np.int64, np.int64)
Contains the initial and final subscripts that the ``rank``
task should process.
Note: start, stop are both inclusive, i.e., all elements from ``start``
to ``stop`` should be included. For python array indexing with slices,
this translates to arr[start:stop + 1].
"""
if rank >= ntasks or rank < 0 or ntasks < 1:
msg = f"Error: rank = {rank} and NTasks = {ntasks} must satisfy "\
" i) ThisTask < NTasks, "\
" ii) ThisTask > 0, and "\
" iii) NTasks >= 1"
raise ValueError(msg)
ncosts = cost_array.shape[0]
if ncosts < 0:
msg = f"Error: On rank = {rank} total number of elements = {ncosts} must be >= 0"
raise ValueError(msg)
if ncosts == 0:
print(f"Warning: On rank = {rank}, got 0 elements to distribute...returning")
start, stop = None, None
return (start, stop)
if ntasks == 1:
start, stop = 0, ncosts - 1
print(f"Only a single task -- distributing the entire array [start, stop] = [{start}, {stop}]")
return (start, stop)
cumul_cost_array = cost_array.cumsum()
total_cost = cumul_cost_array[-1]
target_cost = total_cost / ntasks
cost_remaining, cost_assigned, ntasks_left = total_cost, 0.0, ntasks
start = 0
for icore in range(rank + 1):
full_array_target_cost = cost_assigned + target_cost
if icore == (ntasks - 1):
stop = ncosts - 1
else:
stop = min(np.where(cumul_cost_array >= full_array_target_cost)[0])
if stop < ncosts:
cost_assigned_this_rank = cumul_cost_array[stop] - cost_assigned
# Only print the work divisions on the last rank
if rank == (ntasks - 1):
# Yes, this is intentional that `icore` is used. The intent is to show
# which forests ranges are assigned to which task (i.e., rank) - MS 22/06/2020
print(f"[Rank={icore}]: Assigning forests: start, stop = [{start}, {stop}]. "\
f"Target cost = {target_cost}, cost actually assigned = {cost_assigned_this_rank}")
if icore == rank:
break
cost_remaining -= cost_assigned_this_rank
cost_assigned += cost_assigned_this_rank
ntasks_left -=1
target_cost = cost_remaining / ntasks_left
start = stop + 1
else:
msg = "Error: While determining optimal distribution of forests "\
"across MPI tasks. Could not locate an appropriate "\
f"stopping point for rank = {rank}. Expected stop = {stop} "\
"to be less than the number of elements in the cost "\
f"array = {ncosts}"
raise ValueError(msg)
return (start, stop)
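# Usage sketch (assuming ``forest_info`` was built by ``get_aggregate_forest_info``
# and the number of input bytes is used as the cost proxy):
#   cost = forest_info['Input_ForestNbytes']
#   start, stop = distribute_array_over_ntasks(cost, rank, ntasks)
#   my_forests = forest_info[start:stop + 1]   # 'stop' is inclusive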
def resize_halo_datasets(halos_dset, new_size, write_halo_props_cont, dtype):
"""
Resizes the halo datasets
Parameters
-----------
halos_dset: dictionary or h5py dataset, required
When ``write_halo_props_cont=True``, a dictionary of the individual
halo-property datasets keyed by column name; otherwise, the single
compound 'halos' dataset
new_size: scalar integer, required
The new number of halos the dataset(s) should be resized to hold
write_halo_props_cont: boolean, required
Controls if the individual halo properties are written as distinct
datasets such that any given property for ALL halos is written
contiguously (structure of arrays, SOA).
dtype: numpy datatype, required
The numpy datatype of the halos; the field names are used to look up
the individual property datasets when ``write_halo_props_cont=True``
Returns
-------
Returns ``True`` on successful completion
"""
if write_halo_props_cont:
for name in dtype.names:
dset = halos_dset[name]
dset.resize((new_size, ))
else:
halos_dset.resize((new_size, ))
return True
def write_halos(halos_dset, halos_dset_offset, halos, nhalos_to_write,
write_halo_props_cont):
"""
Writes halos into the relevant dataset(s) within a hdf5 file
Parameters
-----------
halos_dset: dictionary, required
Contains the halos dataset(s) within a hdf5 file where either
the entire halos array or the individual halo properties should
be written to. See parameter ``write_halo_props_cont`` for
further details
halos_dset_offset: scalar integer, required
Contains the index within the halos dataset(s) where the write
should start
halos: numpy structured array, required
An array containing the halo properties that should be written out
into the hdf5 file. The entire array may not be written out, see
the parameter ``nhalos_to_write``
nhalos_to_write: scalar integer, required
Number of halos from the ``halos`` array that should be written
out. Can be smaller than the shape of the ``halos`` array
write_halo_props_cont: boolean, required
Controls if the individual halo properties are written as distinct
datasets such that any given property for ALL halos is written
contiguously (structure of arrays, SOA).
Returns
-------
Returns ``True`` on successful completion of the write
"""
if write_halo_props_cont:
for name in halos.dtype.names:
dset = halos_dset[name]
dset[halos_dset_offset:halos_dset_offset + nhalos_to_write] \
= halos[name][0:nhalos_to_write]
else:
halos_dset[halos_dset_offset:halos_dset_offset + nhalos_to_write] = \
halos[0:nhalos_to_write]
return True
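# A minimal sketch (not the converter itself) of creating resizable hdf5
# datasets that ``resize_halo_datasets`` and ``write_halos`` can operate on,
# for the structure-of-arrays layout (``write_halo_props_cont=True``).
# ``halos`` is assumed to be a numpy structured array of parsed halos:
#   import h5py
#   with h5py.File('forest_0.h5', 'w') as hf:
#       grp = hf.create_group('Forests')
#       halos_dset = {name: grp.create_dataset(name, (0,), dtype=halos.dtype[name],
#                                              maxshape=(None,), chunks=True)
#                     for name in halos.dtype.names}
#       resize_halo_datasets(halos_dset, halos.shape[0], True, halos.dtype)
#       write_halos(halos_dset, 0, halos, halos.shape[0], True)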
# # Taken from https://stackoverflow.com/a/53507580
# # UNTESTED! -- MS 16/06/2020
# def better_np_unique(arr):
# import numpy as np
# sort_indexes = np.argsort(arr)
# arr = np.asarray(arr)[sort_indexes]
# vals, first_indexes, inverse, counts = \
# np.unique(arr, return_index=True,
# return_inverse=True, return_counts=True)
# indexes = np.split(sort_indexes, first_indexes[1:])
# for x in indexes:
# x.sort()
# return vals, indexes, inverse, counts
# Another untested option: https://stackoverflow.com/a/54736464
# def ndix_unique(x):
# """
# Returns an N-dimensional array of indices
# of the unique values in x
# ----------
# x: np.array
# Array with arbitrary dimensions
# Returns
# -------
# - 1D-array of sorted unique values
# - Array of arrays. Each array contains the indices where a
# given value in x is found
# """
# x_flat = x.ravel()
# ix_flat = np.argsort(x_flat)
# u, ix_u = np.unique(x_flat[ix_flat], return_index=True)
# ix_ndim = np.unravel_index(ix_flat, x.shape)
# ix_ndim = np.c_[ix_ndim] if x.ndim > 1 else ix_flat
# return u, np.split(ix_ndim, ix_u[1:])