Skip to content

Instantly share code, notes, and snippets.

@mhasself
Created May 21, 2021 16:24
Show Gist options
  • Save mhasself/e2abc0749ac4552085d37734ef97d007 to your computer and use it in GitHub Desktop.
Save mhasself/e2abc0749ac4552085d37734ef97d007 to your computer and use it in GitHub Desktop.
Extracted from actmergelib (vectors.py)
# This code is being released into the public domain, but if you want to
# use it and state its provenance, it is taken from actmergelib, part of
# the Atacama Cosmology Telescope site software.
import numpy as np
def find_linear_sequence(x, delta_x=1, tolerance=100, establishment=400):
"""
Find the linear sequence v that most closely agrees with x, which
is assumed to be drawn from v with a small number of missing or
extra elements.
Returns three arrays the same size as x:
vx, v_ok, x_ok
The mask x_ok selects elements from x that would (with some
corrections) match a linear series. vx is the linear series, but
with gaps or padding as necessary to have it match x element for
element. The monotonic subset is vx[v_ok]. The monotonic subset
for which the input data appear to be valid is vx[v_ok*x_ok].
In the simplest case, there is gap or error in the sequence; e.g.:
>>> find_linear_sequence([0,1,2,4,5])
(array([0, 1, 2, 4, 5]),
array([ True, True, True, True, True], dtype=bool),
array([ True, True, True, True, True], dtype=bool))
>>> find_linear_sequence([0,1,2,1000,4,5])
(array([0, 1, 2, 3, 4, 5]),
array([ True, True, True, True, True, True], dtype=bool),
array([ True, True, True, False, True, True], dtype=bool))
There could be extra samples though:
>>> find_linear_sequence([0,1,2,1000,1000,4,5])
(array([0, 1, 2, 3, 4, 4, 5]),
array([ True, True, True, True, False, True, True], dtype=bool),
array([ True, True, True, False, False, True, True], dtype=bool))
Note that v_ok is always a superset of x_ok, in the sense that
np.any(x_ok*~v_ok) is False.
"""
x = np.asarray(x,int)
dx = np.diff(x)
# Index where slope changes
x_changes = np.hstack((0, np.diff(dx).nonzero()[0]+2, len(x)))
superx = np.hstack((x[0], x, x[-1]))
good_intervals = np.diff((np.diff(superx)==delta_x).astype('int8')).nonzero()[0].reshape((-1,2))
v_ok = np.zeros(x.shape, bool)
idx_v = np.zeros(x.shape, int)
# Take largest interval as reference
imax = np.dot(good_intervals, [-1,1]).argmax()
idx0 = good_intervals[imax][0]
x0 = x[idx0]
for i0, i1 in good_intervals:
delta = x[i0] - x0 + idx0 - i0
# Permit small, short, slips, or longer gaps as long as they
# are persistent.
make_ok = ( (abs(delta) < tolerance) or
( (establishment >= 0) and
(i1-i0 > establishment) and
(abs(delta) < i1-i0) ) )
if make_ok:
idx_v[i0:i1+1] = delta
v_ok[i0:i1+1] = True
# Give isolated samples one last chance
new_goods = []
## Check left of first good interval...
i1 = good_intervals[0][0]
delta_hi = idx_v[i1]
shiftable = 0
for i in xrange(i1-1, -1, -1):
delta = x[i] - x0 + idx0 - i
if delta <= delta_hi+shiftable and delta > delta_hi - tolerance:
idx_v[i] = delta
delta_hi = delta
v_ok[i] = True
shiftable = 0
new_goods.append((i,i))
else:
shiftable += 1
## Check right of all intervals
for ival in xrange(1, len(good_intervals)+1):
if ival == len(good_intervals):
i0, i1 = good_intervals[-1][1], len(x)
delta_lo = idx_v[i0]
delta_hi = delta_lo + tolerance
else:
i0, i1 = good_intervals[ival-1][1], good_intervals[ival][0]
delta_lo = idx_v[i0]
delta_hi = idx_v[i1]
shiftable = 0
for i in range(i0+1, i1):
delta = x[i] - x0 + idx0 - i
if delta >= delta_lo-shiftable and delta <= delta_hi:
idx_v[i] = delta
v_ok[i] = True
delta_lo = delta
shiftable = 0
new_goods.append((i,i))
else:
shiftable += 1
good_intervals = np.array(sorted([(i0,i1) for (i0,i1) in good_intervals] + new_goods))
x_ok = v_ok.copy()
# Simple flat-line over the gaps.
i0 = good_intervals[0][0]
idx_v[:i0] = idx_v[i0]
v_ok[:i0] = True
i1 = good_intervals[-1][-1]
if i1 < len(x)-1:
idx_v[i1+1:] = idx_v[i1]
v_ok[i1+1:] = True
# If the timeline jogs down, we have to drop some frames.
for i0, i1 in good_intervals.reshape(-1)[1:-1].reshape(-1,2):
jogs = idx_v[i1] - idx_v[i0]
idx_v[i0+1:i1] = idx_v[i0]
v_ok[i0+1:i1] = True
if jogs < 0:
start = max(i0+1, i1+jogs)
end = start - jogs
v_ok[start:end] = False
idx_v += np.arange(len(x)) + (x0 - idx0)
return idx_v, v_ok, x_ok
def get_mapping(source, target):
"""
Let source and target be two non-decreasing sequences. Return
disjoint pairs of ranges ((source_start, source_end),
(target_start, target_end)) over which the two sequences are
perfectly overlapping.
"""
def requeue_vector(vector, index, target):
# Increase index so that vector[i]>=target).
s = (vector[index:] >= target)
if not np.any(s):
return len(vector)
return index + s.nonzero()[0][0]
def length_of_match(vector0, vector1):
# Return number of samples n for which vector0[0:n]==vector1[0:n].
max_len = min(len(vector0), len(vector1))
s = (vector0[:max_len] == vector1[:max_len])
if np.all(s):
return max_len
return (~s).nonzero()[0][0]
vector0, vector1 = np.asarray(source), np.asarray(target)
i0, i1 = 0, 0
intervals = []
while i0 < len(vector0) and i1 < len(vector1):
if vector0[i0] < vector1[i1]:
i0 = requeue_vector(vector0, i0, vector1[i1])
elif vector1[i1] < vector0[i0]:
i1 = requeue_vector(vector1, i1, vector0[i0])
else:
n = length_of_match(vector0[i0:], vector1[i1:])
intervals.append(((i0,i0+n),(i1,i1+n)))
i0 += n
i1 += n
return intervals
class VectorMapper:
"""
This class is used to manage the remapping of data vectors from
one gappy timeline into another gappy timeline.
"""
@classmethod
def from_sync_in(cls, sync_in):
# Clean up the input sync numbers by getting a mask and monotonic reference.
sync_in_mono, mono_ok, in_ok = find_linear_sequence(sync_in, 1, 400)
# Use the cleaned sync vector to set the substrate parameters.
sync_in_cleaned = sync_in_mono[in_ok]
self = cls(sync_in_cleaned[0], sync_in_cleaned[-1]+1)
self.mapping = get_mapping(sync_in_cleaned, self.get_substrate())
self.in_mask = in_ok
return self
def __init__(self, sync_start, sync_stop):
# Save the reference timeline.
self.sync_range = (sync_start, sync_stop)
def get_substrate(self):
return np.arange(self.sync_range[0], self.sync_range[1])
def revector(self, x, default=0):
"""
The input array "x" is mapped into a vector with the substrate
shape, according to the mapping stored within.
"""
output = np.empty((self.sync_range[1] - self.sync_range[0]), x.dtype)
output[:] = default
x = x[self.in_mask]
for (s0,s1),(d0,d1) in self.mapping:
output[d0:d1] = x[s0:s1]
return output
"""
Resampling functions. Depending on the payload, it makes sense to
"interpolate" data in different ways. For example, flags should be
or-ed together, while periodic signals like HWP angles need to be
interpolated with care.
"""
def _get_delta(d, idx):
if isinstance(d.dtype, np.unsignedinteger):
dp = d.astype('int64')
else:
dp = d
return d[idx], dp[idx+1] - dp[idx]
def _condition_output(d, dtype):
if isinstance(dtype, np.integer) and not isinstance(d.dtype, np.integer):
return d.round().astype(dtype)
return d.astype(dtype)
def resample_nearest(index_left, frac, v_in):
nearest = index_left + np.round(frac).astype(int)
return v_in[nearest]
def resample_ctime_split32(index_left, frac, s_in, us_in):
# You could probably get away with just working with
# doubles... but let's be perfect.
d_us = ((s_in[index_left+1].astype(int) - s_in[index_left])*1000000 +
(us_in[index_left+1].astype(int) - us_in[index_left]))
step_us = np.round(d_us*frac).astype(int) + us_in[index_left]
step_s, step_us = (step_us / 1000000, step_us % 1000000)
return (s_in[index_left] + step_s.astype('uint32'),
step_us.astype('uint32'))
def resample_periodic(index_left, frac, lo, hi, v_in):
d0, delta = _get_delta(v_in, index_left)
# Move the raw delta into unwrapped space.
delta[delta*2 >= (hi-lo)] -= (hi-lo)
delta[delta*2 <= -(hi-lo)] += (hi-lo)
# Scale.
d_out = d0.astype(delta.dtype)
if issubclass(v_in.dtype.type, np.integer):
d_out += (delta*frac).round().astype(delta.dtype)
else:
d_out += (delta*frac)
d_out[d_out >= hi] -= (hi-lo)
d_out[d_out < lo] += (hi-lo)
return d_out.astype(d0.dtype)
def resample_interpolate(index_left, frac, v_in):
vtype = v_in.dtype.type
if issubclass(vtype, np.integer):
v_in = v_in.astype(int)
dy = (np.diff(v_in)[index_left] * frac).round().astype(int)
return (v_in[index_left] + dy).astype(vtype)
y, dy = v_in[index_left], np.diff(v_in)[index_left]
return (y + dy*frac).astype(vtype)
def resample_bitcombine(index_left, frac, v_in, inverted_mask=0):
assert issubclass(v_in.dtype.type, np.integer)
v_out = v_in[index_left]
mix = frac > 0
# Invert some bits before or-ing.
v_in_inv = v_in[index_left[mix]+1] ^ inverted_mask
v_out_inv = (v_out[mix] ^ inverted_mask) | v_in_inv
v_out[mix] = v_out_inv ^ inverted_mask
#v_out[mix] |= v_in[index_left[mix]+1]
return v_out
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment