Skip to content

Instantly share code, notes, and snippets.

@mikofski
Last active February 15, 2022 05:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikofski/7537522 to your computer and use it in GitHub Desktop.
Save mikofski/7537522 to your computer and use it in GitHub Desktop.
time series examples that subclass numpy ndarray
#! /usr/bin/env python
import numpy as np
from datetime import datetime, time, timedelta
import pytz
class Timeseries(object):
    """Sampled series that can be indexed by position or by timestamp.

    ``x`` holds the sample values and ``t`` the matching sample times, stored
    as ``datetime64[s]``.  Timestamp lookups bracket the requested time with
    the first later sample and the one before it, so ``t`` is expected to be
    sorted in ascending order.
    """

    def __init__(self, x, t):
        """Store the values *x* and their timestamps *t* as NumPy arrays."""
        self.x = np.array(x)
        self.t = np.array(t, dtype='datetime64[s]')

    def __getitem__(self, dt):
        """Return the sample at position *dt* or the value at time *dt*.

        Anything the underlying array accepts as an index (integer, slice,
        integer/boolean sequence) is forwarded to ``self.x``.  Any other key
        is converted with ``np.datetime64(dt, 's')``; an exact timestamp match
        returns the stored sample, otherwise the value is linearly
        interpolated between the two neighbouring samples.

        Raises:
            IndexError: for an out-of-range integer index, or for a timestamp
                that lies before the first or after the last sample.
        """
        try:
            return self.x[dt]
        except (TypeError, IndexError):
            # A real positional index that is out of range must fail loudly;
            # otherwise it would be reinterpreted below as "seconds since the
            # epoch" and yield a meaningless interpolated value.
            if isinstance(dt, (int, np.integer)):
                raise
        # Not an array index, so treat the key as a point in time.
        when = np.datetime64(dt, 's')
        hits = np.flatnonzero(self.t == when)
        if hits.size:
            return self.x[hits[0]]
        later = np.flatnonzero(when < self.t)
        # No later sample -> past the end; first later sample at position 0 ->
        # before the start (idx - 1 would silently wrap around to x[-1]).
        if later.size == 0 or later[0] == 0:
            raise IndexError(
                'timestamp %s is outside the sampled time range' % when)
        hi = later[0]
        lo = hi - 1
        x_lo, x_hi = self.x[lo], self.x[hi]
        t_lo, t_hi = self.t[lo], self.t[hi]
        # timedelta64 / timedelta64 yields the fractional position in [0, 1).
        fraction = (when - t_lo) / (t_hi - t_lo)
        return x_lo + (x_hi - x_lo) * fraction

    def __repr__(self):
        """Return the ``repr`` of the value array."""
        return repr(self.x)

    def __str__(self):
        """Return the ``str`` of the value array."""
        return str(self.x)
if __name__ == '__main__':
    # Demo: build an hourly series and look values up by position and by time.
    x = [1, 2, 3, 4]
    PST = pytz.timezone('America/Los_Angeles')

    def at(hour, minute):
        # pytz zones have to be attached with localize(); pytz documents that
        # passing them as tzinfo= to the datetime constructor "does not work"
        # for many zones.
        return PST.localize(datetime(2013, 1, 1, hour, minute, 0))

    dt = [at(hour, 30) for hour in (12, 13, 14, 15)]
    ts = Timeseries(x, dt)
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(ts.x + 1)
    print(ts)
    print('%s %s %s %s' % (ts[0], ts[1], ts[2], ts[3]))
    print(ts[at(15, 30)])  # exact sample
    print(ts[at(14, 45)])  # interpolated between two samples
    print(ts.__class__)
    print(ts.__class__.__bases__)
#! /usr/bin/env python
"""
http://docs.scipy.org/doc/numpy/user/basics.subclassing.html
"""
import numpy as np
from datetime import datetime, time, timedelta
import pytz
class TimeseriesNP(np.ndarray):
    """``ndarray`` subclass whose elements can also be looked up by time.

    The array data are the values passed as ``x``.  Two extra attributes are
    attached: ``x`` (a float copy of the input values) and ``t`` (the sample
    times as ``datetime64[s]``).  Both are copied by reference from the
    template object in ``__array_finalize__``, so on derived arrays (for
    example ``ts + 1``) ``x`` still holds the original input values.
    Timestamp lookups expect ``t`` to be sorted in ascending order.
    """

    def __new__(cls, x, t):
        """Create the array from values *x* and attach timestamps *t*."""
        # View-cast the input to this class, then attach the extra attributes.
        obj = np.asarray(x).view(cls)
        obj.x = np.array(x, dtype=float)
        obj.t = np.array(t, dtype='datetime64[s]')
        return obj

    def __array_finalize__(self, obj):
        """Copy ``x`` and ``t`` from the template *obj* (``None`` if absent)."""
        if obj is None:
            return
        self.x = getattr(obj, 'x', None)
        self.t = getattr(obj, 't', None)

    def __getitem__(self, dt):
        """Return the element at index *dt* or the value at time *dt*.

        Regular array indices are forwarded to ``ndarray.__getitem__``.  Any
        other key is converted with ``np.datetime64(dt, 's')``; an exact match
        in ``self.t`` returns the stored element, otherwise the value is
        linearly interpolated between the two neighbouring samples.

        Raises:
            IndexError: for an out-of-range integer index, or for a timestamp
                before the first or after the last entry of ``self.t``.
        """
        try:
            return super(TimeseriesNP, self).__getitem__(dt)
        except (TypeError, IndexError):
            # An out-of-range integer is a genuine indexing error; without
            # this it would be reinterpreted as seconds since the epoch.
            if isinstance(dt, (int, np.integer)):
                raise
        # Not an array index, so treat the key as a point in time.
        when = np.datetime64(dt, 's')
        hits = np.flatnonzero(self.t == when)
        if hits.size:
            return self[hits[0]]
        later = np.flatnonzero(when < self.t)
        # Empty -> past the last sample; position 0 -> before the first one
        # (idx - 1 would otherwise wrap around to the last element).
        if later.size == 0 or later[0] == 0:
            raise IndexError(
                'timestamp %s is outside the sampled time range' % when)
        hi = later[0]
        lo = hi - 1
        x_lo, x_hi = self[lo], self[hi]
        t_lo, t_hi = self.t[lo], self.t[hi]
        fraction = (when - t_lo) / (t_hi - t_lo)
        return x_lo + (x_hi - x_lo) * fraction

    def __array_prepare__(self, out_arr, context=None):
        """Trace the ufunc "prepare" hook, then defer to ``ndarray``."""
        print('In __array_prepare__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        # NumPy 2.0 removed this hook; fall back to returning the output
        # unchanged when the parent implementation is not available.
        parent = getattr(np.ndarray, '__array_prepare__', None)
        if parent is None:
            return out_arr
        return parent(self, out_arr, context)

    def __array_wrap__(self, out_arr, context=None, return_scalar=False):
        """Trace the ufunc "wrap" hook, then defer to ``ndarray``.

        ``return_scalar`` is the extra argument NumPy 2 passes to this hook;
        it defaults to ``False`` so older call signatures keep working.
        """
        print('In __array_wrap__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        wrapped = np.ndarray.__array_wrap__(self, out_arr, context)
        if return_scalar and wrapped.ndim == 0:
            return wrapped[()]
        return wrapped
if __name__ == '__main__':
    # Demo: arithmetic on the subclass plus positional and time-based lookups.
    x = [1, 2, 3, 4]
    PST = pytz.timezone('America/Los_Angeles')

    def at(hour, minute):
        # pytz zones have to be attached with localize(); pytz documents that
        # passing them as tzinfo= to the datetime constructor "does not work"
        # for many zones.
        return PST.localize(datetime(2013, 1, 1, hour, minute, 0))

    dt = [at(hour, 30) for hour in (12, 13, 14, 15)]
    ts = TimeseriesNP(x, dt)
    ts2 = ts + 1
    # Formatting into one string keeps the output identical on Python 2 and 3.
    print('%s %s %s %s %s' % (ts2, ts2.x, ts2.t, ts2.__class__,
                              ts2.__class__.__bases__))
    print(ts)
    print('%s %s %s %s' % (ts[0], ts[1], ts[2], ts[3]))
    print(ts[at(15, 30)])  # exact sample
    print(ts[at(14, 45)])  # interpolated between two samples
    print(ts.__class__)
    print(ts.__class__.__bases__)
#! /usr/bin/env python
"""
http://docs.scipy.org/doc/numpy/user/basics.subclassing.html
"""
import numpy as np
from datetime import datetime, time, timedelta
import pytz
class TimeseriesNP2(np.ndarray):
    """Structured ``ndarray`` subclass of ``(x, t)`` records.

    Each element has a float field ``'x'`` (the value) and a
    ``datetime64[s]`` field ``'t'`` (the sample time).  Separate ``x`` and
    ``t`` attributes holding the original inputs are attached as well and are
    copied by reference to derived arrays in ``__array_finalize__``.
    Timestamp lookups expect the ``'t'`` field to be sorted ascending.
    """

    def __new__(cls, x, t):
        """Build the record array from values *x* and timestamps *t*."""
        dt = np.dtype([('x', float), ('t', 'datetime64[s]')])
        # zip() is lazy on Python 3, so materialise the (x, t) pairs before
        # handing them to NumPy.
        records = np.array(list(zip(x, t)), dtype=dt)
        obj = super(TimeseriesNP2, cls).__new__(cls, records.shape, dtype=dt,
                                                buffer=records)
        obj.x = np.array(x, dtype='float')
        obj.t = np.array(t, dtype='datetime64[s]')
        return obj

    def __array_finalize__(self, obj):
        """Copy ``x`` and ``t`` from the template *obj* (``None`` if absent).

        *obj* is ``None`` when called from the explicit constructor; in that
        case ``__new__`` sets the attributes itself after this returns.
        """
        if obj is None:
            return
        self.x = getattr(obj, 'x', None)
        self.t = getattr(obj, 't', None)

    def __getitem__(self, dt):
        """Return the element/field at *dt* or the record at time *dt*.

        Regular indices and field names (``'x'``, ``'t'``) are forwarded to
        ``ndarray.__getitem__``.  Any other key is converted with
        ``np.datetime64(dt, 's')`` and looked up in the ``'t'`` field.

        Returns:
            For an exact timestamp match, the stored ``'x'`` value.  Otherwise
            a 0-d array of this array's dtype holding the linearly
            interpolated value and the requested time.  (The two shapes differ
            in the original code as well and are kept for compatibility.)

        Raises:
            IndexError: for an out-of-range integer index, or for a timestamp
                before the first or after the last sample.
        """
        try:
            return super(TimeseriesNP2, self).__getitem__(dt)
        except (TypeError, IndexError, ValueError):
            # An out-of-range integer is a genuine indexing error; without
            # this it would be reinterpreted as seconds since the epoch.
            if isinstance(dt, (int, np.integer)):
                raise
        # Work on plain ndarray views so the comparisons below do not run
        # through this class's tracing ufunc hooks.
        values = np.asarray(self['x'])
        times = np.asarray(self['t'])
        when = np.datetime64(dt, 's')
        hits = np.flatnonzero(times == when)
        if hits.size:
            return values[hits[0]]
        later = np.flatnonzero(when < times)
        # Empty -> past the last sample; position 0 -> before the first one
        # (idx - 1 would otherwise wrap around to the last element).
        if later.size == 0 or later[0] == 0:
            raise IndexError(
                'timestamp %s is outside the sampled time range' % when)
        hi = later[0]
        lo = hi - 1
        x_lo, x_hi = values[lo], values[hi]
        t_lo, t_hi = times[lo], times[hi]
        fraction = (when - t_lo) / (t_hi - t_lo)
        return np.array((x_lo + (x_hi - x_lo) * fraction, when),
                        dtype=self.dtype)

    def __array_prepare__(self, out_arr, context=None):
        """Trace the ufunc "prepare" hook, then defer to ``ndarray``."""
        print('In __array_prepare__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        # NumPy 2.0 removed this hook; fall back to returning the output
        # unchanged when the parent implementation is not available.
        parent = getattr(np.ndarray, '__array_prepare__', None)
        if parent is None:
            return out_arr
        return parent(self, out_arr, context)

    def __array_wrap__(self, out_arr, context=None, return_scalar=False):
        """Trace the ufunc "wrap" hook, then defer to ``ndarray``.

        ``return_scalar`` is the extra argument NumPy 2 passes to this hook;
        it defaults to ``False`` so older call signatures keep working.
        """
        print('In __array_wrap__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        wrapped = np.ndarray.__array_wrap__(self, out_arr, context)
        if return_scalar and wrapped.ndim == 0:
            return wrapped[()]
        return wrapped
if __name__ == '__main__':
    # Demo: field arithmetic plus positional and time-based record lookups.
    x = [1, 2, 3, 4]
    PST = pytz.timezone('America/Los_Angeles')

    def at(hour, minute):
        # pytz zones have to be attached with localize(); pytz documents that
        # passing them as tzinfo= to the datetime constructor "does not work"
        # for many zones.
        return PST.localize(datetime(2013, 1, 1, hour, minute, 0))

    dt = [at(hour, 30) for hour in (12, 13, 14, 15)]
    ts = TimeseriesNP2(x, dt)
    ts2 = ts['x'] + 1.0
    # Formatting into one string keeps the output identical on Python 2 and 3.
    print('%s %s %s %s %s' % (ts2, ts2.x, ts2.t, ts2.__class__,
                              ts2.__class__.__bases__))
    print(ts)
    print('%s %s %s %s' % (ts[0], ts[1], ts[2], ts[3]))
    print(ts[at(15, 30)])  # exact sample
    print(ts[at(14, 45)])  # interpolated between two samples
    print(ts.__class__)
    print(ts.__class__.__bases__)
#! /usr/bin/env python
"""
http://docs.scipy.org/doc/numpy/user/basics.subclassing.html
"""
import numpy as np
from datetime import datetime, time, timedelta
import pytz
import quantities as pq
class TimeseriesPQ(pq.quantity.Quantity):
    """``quantities.Quantity`` subclass that can also be indexed by time.

    The array data are the values passed as ``x`` (with their units when *x*
    is already a ``Quantity``, dimensionless otherwise).  Two extra attributes
    are attached: ``x`` (a float copy of the input values) and ``t`` (the
    sample times as ``datetime64[s]``).  Both are copied by reference from the
    template object in ``__array_finalize__``.  Timestamp lookups expect ``t``
    to be sorted in ascending order.
    """

    def __new__(cls, x, t):
        """Create the quantity from values *x* and attach timestamps *t*."""
        # View-cast the bare magnitudes to this class, then re-apply units.
        if isinstance(x, pq.quantity.Quantity):
            obj = np.asarray(x.magnitude).view(cls) * x.units
        else:
            obj = np.asarray(x).view(cls) * pq.dimensionless
        obj.x = np.array(x, dtype=float)
        obj.t = np.array(t, dtype='datetime64[s]')
        return obj

    def __array_finalize__(self, obj):
        """Copy ``x`` and ``t`` from the template *obj* (``None`` if absent)."""
        if obj is None:
            return
        self.x = getattr(obj, 'x', None)
        self.t = getattr(obj, 't', None)

    def __getitem__(self, dt):
        """Return the element at index *dt* or the value at time *dt*.

        Regular array indices are forwarded to the parent ``__getitem__``.
        Any other key is converted with ``np.datetime64(dt, 's')``; an exact
        match in ``self.t`` returns the stored element, otherwise the value is
        linearly interpolated between the two neighbouring samples.

        Raises:
            IndexError: for an out-of-range integer index, or for a timestamp
                before the first or after the last entry of ``self.t``.
        """
        try:
            return super(TimeseriesPQ, self).__getitem__(dt)
        except (TypeError, IndexError):
            # An out-of-range integer is a genuine indexing error; without
            # this it would be reinterpreted as seconds since the epoch.
            if isinstance(dt, (int, np.integer)):
                raise
        # Not an array index, so treat the key as a point in time.
        when = np.datetime64(dt, 's')
        hits = np.flatnonzero(self.t == when)
        if hits.size:
            return self[hits[0]]
        later = np.flatnonzero(when < self.t)
        # Empty -> past the last sample; position 0 -> before the first one
        # (idx - 1 would otherwise wrap around to the last element).
        if later.size == 0 or later[0] == 0:
            raise IndexError(
                'timestamp %s is outside the sampled time range' % when)
        hi = later[0]
        lo = hi - 1
        x_lo, x_hi = self[lo], self[hi]
        t_lo, t_hi = self.t[lo], self.t[hi]
        fraction = (when - t_lo) / (t_hi - t_lo)
        return x_lo + (x_hi - x_lo) * fraction

    def __array_prepare__(self, out_arr, context=None):
        """Trace the ufunc "prepare" hook, then defer to ``ndarray``."""
        print('In __array_prepare__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        # NumPy 2.0 removed this hook; fall back to returning the output
        # unchanged when the parent implementation is not available.
        parent = getattr(np.ndarray, '__array_prepare__', None)
        if parent is None:
            return out_arr
        return parent(self, out_arr, context)

    def __array_wrap__(self, out_arr, context=None, return_scalar=False):
        """Trace the ufunc "wrap" hook, then defer to ``ndarray``.

        ``return_scalar`` is the extra argument NumPy 2 passes to this hook;
        it defaults to ``False`` so older call signatures keep working.
        """
        print('In __array_wrap__:')
        print(' self is %s' % repr(self))
        print(' arr is %s' % repr(out_arr))
        wrapped = np.ndarray.__array_wrap__(self, out_arr, context)
        if return_scalar and wrapped.ndim == 0:
            return wrapped[()]
        return wrapped
if __name__ == '__main__':
    # Demo: arithmetic on the quantity plus positional and time-based lookups.
    x = [1, 2, 3, 4]
    PST = pytz.timezone('America/Los_Angeles')

    def at(hour, minute):
        # pytz zones have to be attached with localize(); pytz documents that
        # passing them as tzinfo= to the datetime constructor "does not work"
        # for many zones.
        return PST.localize(datetime(2013, 1, 1, hour, minute, 0))

    dt = [at(hour, 30) for hour in (12, 13, 14, 15)]
    ts = TimeseriesPQ(x, dt)
    ts2 = ts + 1
    # Formatting into one string keeps the output identical on Python 2 and 3.
    print('%s %s %s %s %s' % (ts2, ts2.x, ts2.t, ts2.__class__,
                              ts2.__class__.__bases__))
    print(ts)
    print('%s %s %s %s' % (ts[0], ts[1], ts[2], ts[3]))
    print(ts[at(15, 30)])  # exact sample
    print(ts[at(14, 45)])  # interpolated between two samples
    print(ts.__class__)
    print(ts.__class__.__bases__)
@InfinityByTen
Copy link

Kindly check - in timeseriesNP.py, once you call the `.view(cls)` method, the rest of the code after that line in `__new__()` doesn't seem to execute.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment