-
-
Save mhasself/e2abc0749ac4552085d37734ef97d007 to your computer and use it in GitHub Desktop.
Extracted from actmergelib (vectors.py)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This code is being released into the public domain, but if you want to | |
# use it and state its provenance, it is taken from actmergelib, part of | |
# the Atacama Cosmology Telescope site software. | |
import numpy as np | |
def find_linear_sequence(x, delta_x=1, tolerance=100, establishment=400): | |
""" | |
Find the linear sequence v that most closely agrees with x, which | |
is assumed to be drawn from v with a small number of missing or | |
extra elements. | |
Returns three arrays the same size as x: | |
vx, v_ok, x_ok | |
The mask x_ok selects elements from x that would (with some | |
corrections) match a linear series. vx is the linear series, but | |
with gaps or padding as necessary to have it match x element for | |
element. The monotonic subset is vx[v_ok]. The monotonic subset | |
for which the input data appear to be valid is vx[v_ok*x_ok]. | |
In the simplest case, there is gap or error in the sequence; e.g.: | |
>>> find_linear_sequence([0,1,2,4,5]) | |
(array([0, 1, 2, 4, 5]), | |
array([ True, True, True, True, True], dtype=bool), | |
array([ True, True, True, True, True], dtype=bool)) | |
>>> find_linear_sequence([0,1,2,1000,4,5]) | |
(array([0, 1, 2, 3, 4, 5]), | |
array([ True, True, True, True, True, True], dtype=bool), | |
array([ True, True, True, False, True, True], dtype=bool)) | |
There could be extra samples though: | |
>>> find_linear_sequence([0,1,2,1000,1000,4,5]) | |
(array([0, 1, 2, 3, 4, 4, 5]), | |
array([ True, True, True, True, False, True, True], dtype=bool), | |
array([ True, True, True, False, False, True, True], dtype=bool)) | |
Note that v_ok is always a superset of x_ok, in the sense that | |
np.any(x_ok*~v_ok) is False. | |
""" | |
x = np.asarray(x,int) | |
dx = np.diff(x) | |
# Index where slope changes | |
x_changes = np.hstack((0, np.diff(dx).nonzero()[0]+2, len(x))) | |
superx = np.hstack((x[0], x, x[-1])) | |
good_intervals = np.diff((np.diff(superx)==delta_x).astype('int8')).nonzero()[0].reshape((-1,2)) | |
v_ok = np.zeros(x.shape, bool) | |
idx_v = np.zeros(x.shape, int) | |
# Take largest interval as reference | |
imax = np.dot(good_intervals, [-1,1]).argmax() | |
idx0 = good_intervals[imax][0] | |
x0 = x[idx0] | |
for i0, i1 in good_intervals: | |
delta = x[i0] - x0 + idx0 - i0 | |
# Permit small, short, slips, or longer gaps as long as they | |
# are persistent. | |
make_ok = ( (abs(delta) < tolerance) or | |
( (establishment >= 0) and | |
(i1-i0 > establishment) and | |
(abs(delta) < i1-i0) ) ) | |
if make_ok: | |
idx_v[i0:i1+1] = delta | |
v_ok[i0:i1+1] = True | |
# Give isolated samples one last chance | |
new_goods = [] | |
## Check left of first good interval... | |
i1 = good_intervals[0][0] | |
delta_hi = idx_v[i1] | |
shiftable = 0 | |
for i in xrange(i1-1, -1, -1): | |
delta = x[i] - x0 + idx0 - i | |
if delta <= delta_hi+shiftable and delta > delta_hi - tolerance: | |
idx_v[i] = delta | |
delta_hi = delta | |
v_ok[i] = True | |
shiftable = 0 | |
new_goods.append((i,i)) | |
else: | |
shiftable += 1 | |
## Check right of all intervals | |
for ival in xrange(1, len(good_intervals)+1): | |
if ival == len(good_intervals): | |
i0, i1 = good_intervals[-1][1], len(x) | |
delta_lo = idx_v[i0] | |
delta_hi = delta_lo + tolerance | |
else: | |
i0, i1 = good_intervals[ival-1][1], good_intervals[ival][0] | |
delta_lo = idx_v[i0] | |
delta_hi = idx_v[i1] | |
shiftable = 0 | |
for i in range(i0+1, i1): | |
delta = x[i] - x0 + idx0 - i | |
if delta >= delta_lo-shiftable and delta <= delta_hi: | |
idx_v[i] = delta | |
v_ok[i] = True | |
delta_lo = delta | |
shiftable = 0 | |
new_goods.append((i,i)) | |
else: | |
shiftable += 1 | |
good_intervals = np.array(sorted([(i0,i1) for (i0,i1) in good_intervals] + new_goods)) | |
x_ok = v_ok.copy() | |
# Simple flat-line over the gaps. | |
i0 = good_intervals[0][0] | |
idx_v[:i0] = idx_v[i0] | |
v_ok[:i0] = True | |
i1 = good_intervals[-1][-1] | |
if i1 < len(x)-1: | |
idx_v[i1+1:] = idx_v[i1] | |
v_ok[i1+1:] = True | |
# If the timeline jogs down, we have to drop some frames. | |
for i0, i1 in good_intervals.reshape(-1)[1:-1].reshape(-1,2): | |
jogs = idx_v[i1] - idx_v[i0] | |
idx_v[i0+1:i1] = idx_v[i0] | |
v_ok[i0+1:i1] = True | |
if jogs < 0: | |
start = max(i0+1, i1+jogs) | |
end = start - jogs | |
v_ok[start:end] = False | |
idx_v += np.arange(len(x)) + (x0 - idx0) | |
return idx_v, v_ok, x_ok | |
def get_mapping(source, target): | |
""" | |
Let source and target be two non-decreasing sequences. Return | |
disjoint pairs of ranges ((source_start, source_end), | |
(target_start, target_end)) over which the two sequences are | |
perfectly overlapping. | |
""" | |
def requeue_vector(vector, index, target): | |
# Increase index so that vector[i]>=target). | |
s = (vector[index:] >= target) | |
if not np.any(s): | |
return len(vector) | |
return index + s.nonzero()[0][0] | |
def length_of_match(vector0, vector1): | |
# Return number of samples n for which vector0[0:n]==vector1[0:n]. | |
max_len = min(len(vector0), len(vector1)) | |
s = (vector0[:max_len] == vector1[:max_len]) | |
if np.all(s): | |
return max_len | |
return (~s).nonzero()[0][0] | |
vector0, vector1 = np.asarray(source), np.asarray(target) | |
i0, i1 = 0, 0 | |
intervals = [] | |
while i0 < len(vector0) and i1 < len(vector1): | |
if vector0[i0] < vector1[i1]: | |
i0 = requeue_vector(vector0, i0, vector1[i1]) | |
elif vector1[i1] < vector0[i0]: | |
i1 = requeue_vector(vector1, i1, vector0[i0]) | |
else: | |
n = length_of_match(vector0[i0:], vector1[i1:]) | |
intervals.append(((i0,i0+n),(i1,i1+n))) | |
i0 += n | |
i1 += n | |
return intervals | |
class VectorMapper: | |
""" | |
This class is used to manage the remapping of data vectors from | |
one gappy timeline into another gappy timeline. | |
""" | |
@classmethod | |
def from_sync_in(cls, sync_in): | |
# Clean up the input sync numbers by getting a mask and monotonic reference. | |
sync_in_mono, mono_ok, in_ok = find_linear_sequence(sync_in, 1, 400) | |
# Use the cleaned sync vector to set the substrate parameters. | |
sync_in_cleaned = sync_in_mono[in_ok] | |
self = cls(sync_in_cleaned[0], sync_in_cleaned[-1]+1) | |
self.mapping = get_mapping(sync_in_cleaned, self.get_substrate()) | |
self.in_mask = in_ok | |
return self | |
def __init__(self, sync_start, sync_stop): | |
# Save the reference timeline. | |
self.sync_range = (sync_start, sync_stop) | |
def get_substrate(self): | |
return np.arange(self.sync_range[0], self.sync_range[1]) | |
def revector(self, x, default=0): | |
""" | |
The input array "x" is mapped into a vector with the substrate | |
shape, according to the mapping stored within. | |
""" | |
output = np.empty((self.sync_range[1] - self.sync_range[0]), x.dtype) | |
output[:] = default | |
x = x[self.in_mask] | |
for (s0,s1),(d0,d1) in self.mapping: | |
output[d0:d1] = x[s0:s1] | |
return output | |
""" | |
Resampling functions. Depending on the payload, it makes sense to | |
"interpolate" data in different ways. For example, flags should be | |
or-ed together, while periodic signals like HWP angles need to be | |
interpolated with care. | |
""" | |
def _get_delta(d, idx): | |
if isinstance(d.dtype, np.unsignedinteger): | |
dp = d.astype('int64') | |
else: | |
dp = d | |
return d[idx], dp[idx+1] - dp[idx] | |
def _condition_output(d, dtype): | |
if isinstance(dtype, np.integer) and not isinstance(d.dtype, np.integer): | |
return d.round().astype(dtype) | |
return d.astype(dtype) | |
def resample_nearest(index_left, frac, v_in): | |
nearest = index_left + np.round(frac).astype(int) | |
return v_in[nearest] | |
def resample_ctime_split32(index_left, frac, s_in, us_in): | |
# You could probably get away with just working with | |
# doubles... but let's be perfect. | |
d_us = ((s_in[index_left+1].astype(int) - s_in[index_left])*1000000 + | |
(us_in[index_left+1].astype(int) - us_in[index_left])) | |
step_us = np.round(d_us*frac).astype(int) + us_in[index_left] | |
step_s, step_us = (step_us / 1000000, step_us % 1000000) | |
return (s_in[index_left] + step_s.astype('uint32'), | |
step_us.astype('uint32')) | |
def resample_periodic(index_left, frac, lo, hi, v_in): | |
d0, delta = _get_delta(v_in, index_left) | |
# Move the raw delta into unwrapped space. | |
delta[delta*2 >= (hi-lo)] -= (hi-lo) | |
delta[delta*2 <= -(hi-lo)] += (hi-lo) | |
# Scale. | |
d_out = d0.astype(delta.dtype) | |
if issubclass(v_in.dtype.type, np.integer): | |
d_out += (delta*frac).round().astype(delta.dtype) | |
else: | |
d_out += (delta*frac) | |
d_out[d_out >= hi] -= (hi-lo) | |
d_out[d_out < lo] += (hi-lo) | |
return d_out.astype(d0.dtype) | |
def resample_interpolate(index_left, frac, v_in): | |
vtype = v_in.dtype.type | |
if issubclass(vtype, np.integer): | |
v_in = v_in.astype(int) | |
dy = (np.diff(v_in)[index_left] * frac).round().astype(int) | |
return (v_in[index_left] + dy).astype(vtype) | |
y, dy = v_in[index_left], np.diff(v_in)[index_left] | |
return (y + dy*frac).astype(vtype) | |
def resample_bitcombine(index_left, frac, v_in, inverted_mask=0): | |
assert issubclass(v_in.dtype.type, np.integer) | |
v_out = v_in[index_left] | |
mix = frac > 0 | |
# Invert some bits before or-ing. | |
v_in_inv = v_in[index_left[mix]+1] ^ inverted_mask | |
v_out_inv = (v_out[mix] ^ inverted_mask) | v_in_inv | |
v_out[mix] = v_out_inv ^ inverted_mask | |
#v_out[mix] |= v_in[index_left[mix]+1] | |
return v_out | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment