Skip to content

Instantly share code, notes, and snippets.

@mmore500
Created February 5, 2023 07:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mmore500/06a359e528f59f3feb0c72dfc01b8fef to your computer and use it in GitHub Desktop.
Save mmore500/06a359e528f59f3feb0c72dfc01b8fef to your computer and use it in GitHub Desktop.
DecantingPhyloTracker in-progress
import typing
import numpy as np
import pandas as pd
from ..._auxiliary_lib import indices_of_unique
from ._compile_phylogeny_from_lineage_iters import (
compile_phylogeny_from_lineage_iters,
)
class DecantingPhyloTracker:
"""Data structure to enable perfect tracking over a fixed-size population
with synchronous generations.
Generally less performant than GarbageCollectingPhyloGracker; represents
organism records as independent objects (which each require an independent
allocations and, on lineage extinction, deletions). Uses a decanting buffer
to reduce object creation overhead by only allocating record objects for
organisms that produce extant lineages beyond a threshold length.
Does not include organism population loc and trait values in phylogenetic
record.
"""
# static class constant
_nan_val = -1
@staticmethod
def _is_nan(v: int) -> bool:
return v < 0
@staticmethod
@np.vectorize
def _descendant_wrap(
ancestor: typing.Any,
loc: int,
trait: float,
) -> typing.Tuple:
return {
"ancestor": ancestor,
"loc": loc,
"trait": trait,
}
# decanting buffer layout
#
# population loc of extant organisms
# |
# |
# generations ago ----|-> 0 1 2 3 4 ... buffer_size - 1
# from extant V +----------------------------------------
# organisms 0 |
# 1 | POPULATION
# 2 | POSITION
# 3 | IDS
# 4 |
# ... |
# population_size - 1 |
#
# layer 0 (z axis): organism's own population loc
# layer 1 (z axis): parent's population loc
#
# every generation,
# * the backtrack tree is updated using the subset population loc ids'
# that survived to the rightmost colun,
# * population loc ids roll one column to the right,
# * new population loc id's are pasted over column 0,
# * and rows shuffle/duplicate according to the selected parent indices
#
_decanting_buffer: np.array # [int]
_trait_buffer: np.array # [float]
_buffer_pos: int # current start position of circular decanting buffer
# permanent phylogeny storage after consolidation traversing decant buffer
# using tuples instead of PerfectBacktrackHandle gives significant speedup
_decanted_tree_tips: np.array # [typing.Tuple]
def __init__(
self: "DecantingPhyloTracker",
initial_population: typing.Union[int, np.array], # [float]
buffer_size: int = 10,
share_common_ancestor: bool = True,
) -> None:
"""Initialize data structure to perfectly track phylogenetic history of
a population.
Parameters
----------
population_size : int
How many locations are available within the tracked fixed-size
population?
buffer_size : int, optional
How many generations should lineages be decanted before creating
record objects for organisms with extant lineages?
share_common_ancestor : bool, default True
Should a dummy common ancestor be inserted as the first entry in
the tracker?
If True, all initial population members will be recorded as
children of this dummy ancestor. If False, all initial
population members will be recorded as having no parent.
"""
if isinstance(initial_population, int):
initial_population = np.full(initial_population, np.nan)
population_size = len(initial_population)
# initialize decanting buffer with all nan values
self._decanting_buffer = np.full(
(population_size, buffer_size, 2),
self._nan_val,
dtype=np.min_scalar_type(-population_size + 1),
)
self._trait_buffer = np.full(
(population_size, buffer_size),
np.nan,
dtype=float,
)
self._buffer_pos = 0
handles = []
if share_common_ancestor:
common_ancestor = {
"ancestor": None,
"loc": 0,
"trait": np.nan,
}
handles = [
{
"ancestor": common_ancestor,
"loc": loc,
"trait": trait,
}
for loc, trait in enumerate(initial_population)
]
assert len(set(map(id, handles))) == len(handles)
assert len(set(map(lambda x: id(x["ancestor"]), handles))) == 1
else:
handles = [
{
"ancestor": None,
"loc": loc,
"trait": trait,
}
for loc, trait in enumerate(initial_population)
]
assert len(set(map(id, handles))) == len(handles)
handles_array = np.empty(len(handles), dtype=object)
handles_array[:] = handles
self._decanted_tree_tips = handles_array
assert len(set(map(id, self._decanted_tree_tips))) == len(handles)
def _ArchiveBufferTail(
self: "DecantingPhyloTracker",
):
"""Create record objects for decanted organisms with extant
lineages."""
assert not self._is_nan(self._decanting_buffer[0, self._buffer_pos, 0])
# find locs of unique ancestors with extant descendants
unique_row_indices = indices_of_unique(
self._decanting_buffer[:, self._buffer_pos, 0],
)
unique_locs = self._decanting_buffer[
unique_row_indices, self._buffer_pos, 0
]
unique_parent_locs = self._decanting_buffer[
unique_row_indices, self._buffer_pos, 1
]
# apply unique ancestors' generational step
# to relevant perfect tracking handles
# step 1: copy parents
self._decanted_tree_tips[unique_locs] = self._descendant_wrap(
self._decanted_tree_tips[unique_parent_locs],
unique_locs,
self._trait_buffer[unique_locs, self._buffer_pos],
)
def _AdvanceBuffer(
self: "DecantingPhyloTracker",
) -> None:
"""Progress circular decanting buffer, archiving buffer tail first if
necessary."""
# if rightmost column not empty, archive it before overwriting
if not self._is_nan(self._decanting_buffer[0, self._buffer_pos, 0]):
self._ArchiveBufferTail()
# copy columns 0 thru buffer_size - 2 one column rightwards
self._buffer_pos += 1
# wrap buffer position around
self._buffer_pos -= self._decanting_buffer.shape[1] * (
self._buffer_pos >= self._decanting_buffer.shape[1]
)
def ElapseGeneration(
self: "DecantingPhyloTracker",
parent_locs: typing.List[int],
traits: typing.Optional[typing.List[float]] = None, # np.array ok
) -> None:
"""Append generational turnover to the phylogenetic record.
Parameters
----------
parent_locs : array_like of int
Parent's population loc for each population loc.
Position within array corresponds to post-turnover population
members' population positions. Values within array correspond to
those members' parents' population positions within the pre-
turnover population.
"""
# consolidate rows that now share common ancestry
self._decanting_buffer = self._decanting_buffer[parent_locs, :, :]
# shift buffer one position right,
# archiving rightmost column if necessary
self._AdvanceBuffer()
# set new (leftmost) column
# note: 0 - 1 == -1 is valid for indexing
# layer 0: own population loc
self._decanting_buffer[:, self._buffer_pos - 1, 0] = np.arange(
self._decanting_buffer.shape[0]
)
# layer 1: parent population loc
self._decanting_buffer[:, self._buffer_pos - 1, 1] = parent_locs
if traits is not None:
self._trait_buffer[:, self._buffer_pos - 1] = traits
def _FlushBuffer(self: "DecantingPhyloTracker") -> None:
"""Clear decanting buffer, creating record objects for all organisms
currently progressing through the decanting process."""
# advance buffer and fill column 0 with nan until empty
# note: buffer may only be partway full, in which case
# columns will shift rightwards until reaching rightmost column
# and then begin to be archived
while not np.all(self._decanting_buffer.flatten(), self._nan_val):
self._AdvanceBuffer()
self._decanting_buffer[:, self._buffer_pos - 1, :] = self._nan_val
def CompilePhylogeny(
self: "DecantingPhyloTracker",
progress_wrap=lambda x: x,
) -> pd.DataFrame:
"""Generate full phylogenetic record of extant organisms.
Parameters
----------
progress_wrap : Callable, default identity function
Wrapper applied around record row iterator; pass tqdm or equivalent
to display progress bar for compilation process.
Returns
-------
pandas.DataFrame
Full phylogenetic record of extant organisms alife standard format.
Notes
-----
This operation is non-destructive; further record-keeping may be
performed on the tracker object afterwards.
"""
self._FlushBuffer()
assert set(self._decanting_buffer.flatten()) == {self._nan_val}
def iter_lineage(
tuple_handle: typing.Tuple,
) -> typing.Iterable[typing.Tuple]:
while True:
yield tuple_handle
if tuple_handle[0] is None:
break
else:
tuple_handle = tuple_handle[0]
return compile_phylogeny_from_lineage_iters(
iter_lineage(tip)
for tip in progress_wrap(self._decanted_tree_tips)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment