Last active
February 15, 2022 05:55
-
-
Save mikofski/7537522 to your computer and use it in GitHub Desktop.
time series examples that subclass numpy ndarray
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
import numpy as np | |
from datetime import datetime, time, timedelta | |
import pytz | |
class Timeseries(object):
    """Sample values ``x`` paired with the timestamps ``t`` they belong to.

    Instances can be indexed by position (anything a NumPy array accepts)
    or by a point in time.  A time that falls between two samples is
    resolved by linear interpolation between its neighbours.
    """

    def __init__(self, x, t):
        """Store ``x`` as an array and ``t`` as ``datetime64[s]`` timestamps.

        :param x: sample values, converted with ``np.array``.
        :param t: one timestamp per sample.  The time lookup takes the
            first timestamp later than the key as the upper neighbour, so
            ``t`` is expected to be in ascending order.
        """
        self.x = np.array(x)
        self.t = np.array(t, dtype='datetime64[s]')

    def __getitem__(self, dt):
        """Return the value at position or time ``dt``.

        :param dt: an integer, slice or index array, or anything that
            ``np.datetime64(dt, 's')`` accepts (for example a ``datetime``).
        :returns: the indexed value(s); for a time key, the sample taken at
            that time or the linear interpolation of its two neighbours.
        :raises IndexError: if a time key lies before the first or after
            the last timestamp.
        """
        # Duck typing: let NumPy try positional indexing first.
        try:
            return self.x[dt]
        except (TypeError, IndexError):
            pass
        # Not a usable position, so treat the key as a point in time.
        # (An out-of-range integer also lands here and is converted like
        # any other key.)
        when = np.datetime64(dt, 's')
        exact = when == self.t
        if np.any(exact):
            return self.x[exact][0]
        later = np.argwhere(when < self.t)
        # No later timestamp -> key is past the end.  First later timestamp
        # at position 0 -> key is before the start; without this check
        # ``hi - 1`` would wrap around to the last sample.
        if later.size == 0 or later[0][0] == 0:
            raise IndexError(
                '%s is outside the time range covered by the series' % when)
        hi = later[0][0]
        lo = hi - 1
        x_lo, x_hi = self.x[lo], self.x[hi]
        t_lo, t_hi = self.t[lo], self.t[hi]
        return x_lo + (x_hi - x_lo) * ((when - t_lo) / (t_hi - t_lo))

    def __repr__(self):
        """Return the ``repr`` of the value array."""
        return repr(self.x)

    def __str__(self):
        """Return the ``str`` of the value array."""
        return str(self.x)
if __name__ == '__main__':
    # Demo: build a four-sample hourly series and exercise each lookup path.
    x = [1, 2, 3, 4]
    PST = pytz.timezone('America/Los_Angeles')
    # pytz zones have to be attached with localize(); passing one as
    # ``tzinfo=`` to the datetime constructor selects the zone's historical
    # local-mean-time offset instead of the offset valid on that date.
    # NOTE(review): datetime64 has no time zone of its own -- confirm how the
    # installed NumPy converts aware datetimes.
    dt = [PST.localize(datetime(2013, 1, 1, 12, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 13, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 14, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 15, 30, 0))]
    ts = Timeseries(x, dt)
    # Single-argument print calls behave the same on Python 2 and Python 3.
    print(ts.x + 1)
    print(ts)
    print(' '.join(map(str, (ts[0], ts[1], ts[2], ts[3]))))
    # exact timestamp match
    print(ts[PST.localize(datetime(2013, 1, 1, 15, 30, 0))])
    # interpolated between the 14:30 and 15:30 samples
    print(ts[PST.localize(datetime(2013, 1, 1, 14, 45, 0))])
    print(ts.__class__)
    print(ts.__class__.__bases__)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
http://docs.scipy.org/doc/numpy/user/basics.subclassing.html | |
""" | |
import numpy as np | |
from datetime import datetime, time, timedelta | |
import pytz | |
class TimeseriesNP(np.ndarray):
    """``ndarray`` subclass that can also be indexed by a point in time.

    The array data are the sample values.  Two extra attributes travel with
    every instance: ``x`` (a float copy of the values passed to the
    constructor) and ``t`` (their timestamps as ``datetime64[s]``).

    ``__array_finalize__`` copies ``x`` and ``t`` unchanged onto views and
    ufunc results, so a slice or an arithmetic result still carries the
    full-length attributes of the array it came from.
    """

    def __new__(cls, x, t):
        """Create the array as a view of ``x`` and attach ``x`` and ``t``.

        :param x: sample values.
        :param t: one timestamp per sample; the time lookup expects them in
            ascending order.
        """
        # View-cast the input to this class, then hang the extra attributes
        # on the new instance.
        obj = np.asarray(x).view(cls)
        obj.x = np.array(x, dtype=float)
        obj.t = np.array(t, dtype='datetime64[s]')
        return obj

    def __array_finalize__(self, obj):
        """Inherit ``x`` and ``t`` from the array this one was derived from.

        ``obj`` is ``None`` for explicit construction and a plain array for
        view casting; in both cases the attributes default to ``None`` so
        they always exist.
        """
        self.x = getattr(obj, 'x', None)
        self.t = getattr(obj, 't', None)

    def __getitem__(self, dt):
        """Return the value at position or time ``dt``.

        :param dt: an integer, slice or index array, or anything that
            ``np.datetime64(dt, 's')`` accepts (for example a ``datetime``).
        :returns: the indexed value(s); for a time key, the sample taken at
            that time or the linear interpolation of its two neighbours.
        :raises IndexError: if a time key lies before the first or after
            the last timestamp.
        """
        # Duck typing: let ndarray try positional indexing first.
        try:
            return super(TimeseriesNP, self).__getitem__(dt)
        except (TypeError, IndexError):
            pass
        # Not a usable position, so treat the key as a point in time.
        when = np.datetime64(dt, 's')
        exact = when == self.t
        if np.any(exact):
            return self[exact][0]
        later = np.argwhere(when < self.t)
        # No later timestamp -> key is past the end.  First later timestamp
        # at position 0 -> key is before the start; without this check
        # ``hi - 1`` would wrap around to the last sample.
        if later.size == 0 or later[0][0] == 0:
            raise IndexError(
                '%s is outside the time range covered by the series' % when)
        hi = later[0][0]
        lo = hi - 1
        x_lo, x_hi = self[lo], self[hi]
        t_lo, t_hi = self.t[lo], self.t[hi]
        return x_lo + (x_hi - x_lo) * ((when - t_lo) / (t_hi - t_lo))

    def __array_prepare__(self, out_arr, context=None):
        """Trace the call, then defer to ``ndarray.__array_prepare__``."""
        print('In __array_prepare__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        # NumPy 2.0 removed this hook from ndarray; hand the output array
        # back untouched when there is no parent implementation to call.
        parent_hook = getattr(np.ndarray, '__array_prepare__', None)
        if parent_hook is None:
            return out_arr
        return parent_hook(self, out_arr, context)

    def __array_wrap__(self, out_arr, context=None, return_scalar=False):
        """Trace the call, then defer to ``ndarray.__array_wrap__``.

        ``return_scalar`` is passed by NumPy 2 and later; it is forwarded
        only when set, so the two-argument parent of older NumPy releases
        keeps working.
        """
        print('In __array_wrap__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        if return_scalar:
            return np.ndarray.__array_wrap__(self, out_arr, context,
                                             return_scalar)
        return np.ndarray.__array_wrap__(self, out_arr, context)
if __name__ == '__main__':
    # Demo: build a four-sample hourly series, run a ufunc on it and
    # exercise each lookup path.
    x = [1, 2, 3, 4]
    PST = pytz.timezone('America/Los_Angeles')
    # pytz zones have to be attached with localize(); passing one as
    # ``tzinfo=`` to the datetime constructor selects the zone's historical
    # local-mean-time offset instead of the offset valid on that date.
    # NOTE(review): datetime64 has no time zone of its own -- confirm how the
    # installed NumPy converts aware datetimes.
    dt = [PST.localize(datetime(2013, 1, 1, 12, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 13, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 14, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 15, 30, 0))]
    ts = TimeseriesNP(x, dt)
    ts2 = ts + 1
    # Single-argument print calls behave the same on Python 2 and Python 3.
    print(' '.join(map(str, (ts2, ts2.x, ts2.t, ts2.__class__,
                             ts2.__class__.__bases__))))
    print(ts)
    print(' '.join(map(str, (ts[0], ts[1], ts[2], ts[3]))))
    # exact timestamp match
    print(ts[PST.localize(datetime(2013, 1, 1, 15, 30, 0))])
    # interpolated between the 14:30 and 15:30 samples
    print(ts[PST.localize(datetime(2013, 1, 1, 14, 45, 0))])
    print(ts.__class__)
    print(ts.__class__.__bases__)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
http://docs.scipy.org/doc/numpy/user/basics.subclassing.html | |
""" | |
import numpy as np | |
from datetime import datetime, time, timedelta | |
import pytz | |
class TimeseriesNP2(np.ndarray):
    """Structured ``ndarray`` of ``(x, t)`` records, indexable by time.

    Every element has a float field ``'x'`` (the sample value) and a
    ``datetime64[s]`` field ``'t'`` (its timestamp).  The constructor inputs
    are additionally kept as the attributes ``x`` and ``t``, which
    ``__array_finalize__`` copies unchanged onto views and ufunc results.
    """

    def __new__(cls, x, t):
        """Build the record array from parallel sequences ``x`` and ``t``.

        :param x: sample values.
        :param t: one timestamp per sample; the time lookup expects them in
            ascending order.
        """
        record_dtype = np.dtype([('x', float), ('t', 'datetime64[s]')])
        # zip() is a lazy iterator on Python 3, which np.array cannot turn
        # into records, so materialise the pairs first.
        records = np.array(list(zip(x, t)), dtype=record_dtype)
        obj = super(TimeseriesNP2, cls).__new__(
            cls, records.shape, dtype=record_dtype, buffer=records)
        obj.x = np.array(x, dtype='float')
        obj.t = np.array(t, dtype='datetime64[s]')
        return obj

    def __array_finalize__(self, obj):
        """Inherit ``x`` and ``t`` from the array this one was derived from.

        ``obj`` is ``None`` while ``__new__`` is still running (it assigns
        the real values right afterwards), the source array for view
        casting, and an instance of this class for new-from-template
        results such as slices.  The attributes default to ``None`` so they
        exist in every case.
        """
        self.x = getattr(obj, 'x', None)
        self.t = getattr(obj, 't', None)

    def __getitem__(self, dt):
        """Return the element(s) at position, field name or time ``dt``.

        :param dt: an integer, slice, index array or field name, or
            anything that ``np.datetime64(dt, 's')`` accepts.
        :returns: whatever ``ndarray`` indexing yields for positions and
            field names.  For a time key that matches a timestamp exactly,
            the ``'x'`` value alone; for a time between two samples, a 0-d
            record ``(interpolated x, time)`` of this array's dtype.
        :raises IndexError: if a time key lies before the first or after
            the last timestamp.
        """
        # Duck typing: let ndarray try positional / field indexing first.
        try:
            return super(TimeseriesNP2, self).__getitem__(dt)
        except (TypeError, IndexError, ValueError):
            pass
        # Not a usable index, so treat the key as a point in time.  The
        # fields are fetched through the parent class: going through
        # ``self[...]`` would re-enter this fallback forever on a view that
        # has no such fields.
        values = super(TimeseriesNP2, self).__getitem__('x')
        stamps = super(TimeseriesNP2, self).__getitem__('t')
        when = np.datetime64(dt, 's')
        exact = when == stamps
        if np.any(exact):
            # NOTE(review): returns the bare value, whereas the interpolated
            # case below returns a record -- kept as is for compatibility.
            return values[exact][0]
        later = np.argwhere(when < stamps)
        # No later timestamp -> key is past the end.  First later timestamp
        # at position 0 -> key is before the start; without this check
        # ``hi - 1`` would wrap around to the last sample.
        if later.size == 0 or later[0][0] == 0:
            raise IndexError(
                '%s is outside the time range covered by the series' % when)
        hi = later[0][0]
        lo = hi - 1
        x_lo, x_hi = values[lo], values[hi]
        t_lo, t_hi = stamps[lo], stamps[hi]
        interpolated = x_lo + (x_hi - x_lo) * ((when - t_lo) / (t_hi - t_lo))
        return np.array((interpolated, when), dtype=self.dtype)

    def __array_prepare__(self, out_arr, context=None):
        """Trace the call, then defer to ``ndarray.__array_prepare__``."""
        print('In __array_prepare__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        # NumPy 2.0 removed this hook from ndarray; hand the output array
        # back untouched when there is no parent implementation to call.
        parent_hook = getattr(np.ndarray, '__array_prepare__', None)
        if parent_hook is None:
            return out_arr
        return parent_hook(self, out_arr, context)

    def __array_wrap__(self, out_arr, context=None, return_scalar=False):
        """Trace the call, then defer to ``ndarray.__array_wrap__``.

        ``return_scalar`` is passed by NumPy 2 and later; it is forwarded
        only when set, so the two-argument parent of older NumPy releases
        keeps working.
        """
        print('In __array_wrap__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        if return_scalar:
            return np.ndarray.__array_wrap__(self, out_arr, context,
                                             return_scalar)
        return np.ndarray.__array_wrap__(self, out_arr, context)
if __name__ == '__main__':
    # Demo: build a four-record hourly series, run a ufunc on its value
    # field and exercise each lookup path.
    x = [1, 2, 3, 4]
    PST = pytz.timezone('America/Los_Angeles')
    # pytz zones have to be attached with localize(); passing one as
    # ``tzinfo=`` to the datetime constructor selects the zone's historical
    # local-mean-time offset instead of the offset valid on that date.
    # NOTE(review): datetime64 has no time zone of its own -- confirm how the
    # installed NumPy converts aware datetimes.
    dt = [PST.localize(datetime(2013, 1, 1, 12, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 13, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 14, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 15, 30, 0))]
    ts = TimeseriesNP2(x, dt)
    ts2 = ts['x'] + 1.0
    # Single-argument print calls behave the same on Python 2 and Python 3.
    print(' '.join(map(str, (ts2, ts2.x, ts2.t, ts2.__class__,
                             ts2.__class__.__bases__))))
    print(ts)
    print(' '.join(map(str, (ts[0], ts[1], ts[2], ts[3]))))
    # exact timestamp match
    print(ts[PST.localize(datetime(2013, 1, 1, 15, 30, 0))])
    # interpolated between the 14:30 and 15:30 samples
    print(ts[PST.localize(datetime(2013, 1, 1, 14, 45, 0))])
    print(ts.__class__)
    print(ts.__class__.__bases__)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
http://docs.scipy.org/doc/numpy/user/basics.subclassing.html | |
""" | |
import numpy as np | |
from datetime import datetime, time, timedelta | |
import pytz | |
import quantities as pq | |
class TimeseriesPQ(pq.quantity.Quantity):
    """``quantities.Quantity`` subclass that can also be indexed by time.

    The array data are the sample values (with units).  Two extra
    attributes travel with every instance: ``x`` (a float copy of the
    values passed to the constructor) and ``t`` (their timestamps as
    ``datetime64[s]``).  ``__array_finalize__`` copies both unchanged onto
    views and ufunc results.

    NOTE(review): ``__array_finalize__``, ``__array_prepare__`` and
    ``__array_wrap__`` below do not chain to ``pq.quantity.Quantity``; if
    that class relies on its own versions of these hooks for unit
    bookkeeping, it is bypassed here -- confirm against the installed
    ``quantities`` release.
    """

    def __new__(cls, x, t):
        """Create the array from ``x`` and attach ``x`` and ``t``.

        :param x: sample values; a ``Quantity`` keeps its units, anything
            else is treated as dimensionless.
        :param t: one timestamp per sample; the time lookup expects them in
            ascending order.
        """
        # View-cast the bare numbers to this class, then re-apply the units.
        if isinstance(x, pq.quantity.Quantity):
            obj = np.asarray(x.magnitude).view(cls) * x.units
        else:
            obj = np.asarray(x).view(cls) * pq.dimensionless
        obj.x = np.array(x, dtype=float)
        obj.t = np.array(t, dtype='datetime64[s]')
        return obj

    def __array_finalize__(self, obj):
        """Inherit ``x`` and ``t`` from the array this one was derived from.

        The attributes default to ``None`` (also when ``obj`` is ``None``)
        so they always exist.
        """
        self.x = getattr(obj, 'x', None)
        self.t = getattr(obj, 't', None)

    def __getitem__(self, dt):
        """Return the value at position or time ``dt``.

        :param dt: an integer, slice or index array, or anything that
            ``np.datetime64(dt, 's')`` accepts (for example a ``datetime``).
        :returns: the indexed value(s); for a time key, the sample taken at
            that time or the linear interpolation of its two neighbours.
        :raises IndexError: if a time key lies before the first or after
            the last timestamp.
        """
        # Duck typing: let the parent class try positional indexing first.
        try:
            return super(TimeseriesPQ, self).__getitem__(dt)
        except (TypeError, IndexError):
            pass
        # Not a usable position, so treat the key as a point in time.
        when = np.datetime64(dt, 's')
        exact = when == self.t
        if np.any(exact):
            return self[exact][0]
        later = np.argwhere(when < self.t)
        # No later timestamp -> key is past the end.  First later timestamp
        # at position 0 -> key is before the start; without this check
        # ``hi - 1`` would wrap around to the last sample.
        if later.size == 0 or later[0][0] == 0:
            raise IndexError(
                '%s is outside the time range covered by the series' % when)
        hi = later[0][0]
        lo = hi - 1
        x_lo, x_hi = self[lo], self[hi]
        t_lo, t_hi = self.t[lo], self.t[hi]
        return x_lo + (x_hi - x_lo) * ((when - t_lo) / (t_hi - t_lo))

    def __array_prepare__(self, out_arr, context=None):
        """Trace the call, then defer to ``ndarray.__array_prepare__``."""
        print('In __array_prepare__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        # NumPy 2.0 removed this hook from ndarray; hand the output array
        # back untouched when there is no parent implementation to call.
        parent_hook = getattr(np.ndarray, '__array_prepare__', None)
        if parent_hook is None:
            return out_arr
        return parent_hook(self, out_arr, context)

    def __array_wrap__(self, out_arr, context=None, return_scalar=False):
        """Trace the call, then defer to ``ndarray.__array_wrap__``.

        ``return_scalar`` is passed by NumPy 2 and later; it is forwarded
        only when set, so the two-argument parent of older NumPy releases
        keeps working.
        """
        print('In __array_wrap__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        if return_scalar:
            return np.ndarray.__array_wrap__(self, out_arr, context,
                                             return_scalar)
        return np.ndarray.__array_wrap__(self, out_arr, context)
if __name__ == '__main__':
    # Demo: build a four-sample hourly series, run a ufunc on it and
    # exercise each lookup path.
    x = [1, 2, 3, 4]
    PST = pytz.timezone('America/Los_Angeles')
    # pytz zones have to be attached with localize(); passing one as
    # ``tzinfo=`` to the datetime constructor selects the zone's historical
    # local-mean-time offset instead of the offset valid on that date.
    # NOTE(review): datetime64 has no time zone of its own -- confirm how the
    # installed NumPy converts aware datetimes.
    dt = [PST.localize(datetime(2013, 1, 1, 12, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 13, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 14, 30, 0)),
          PST.localize(datetime(2013, 1, 1, 15, 30, 0))]
    ts = TimeseriesPQ(x, dt)
    ts2 = ts + 1
    # Single-argument print calls behave the same on Python 2 and Python 3.
    print(' '.join(map(str, (ts2, ts2.x, ts2.t, ts2.__class__,
                             ts2.__class__.__bases__))))
    print(ts)
    print(' '.join(map(str, (ts[0], ts[1], ts[2], ts[3]))))
    # exact timestamp match
    print(ts[PST.localize(datetime(2013, 1, 1, 15, 30, 0))])
    # interpolated between the 14:30 and 15:30 samples
    print(ts[PST.localize(datetime(2013, 1, 1, 14, 45, 0))])
    print(ts.__class__)
    print(ts.__class__.__bases__)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Kindly check - in timeseriesNP.py, once you call the .view(cls) method, the rest of the code after that line in `__new__()` doesn't seem to execute.