Skip to content

Instantly share code, notes, and snippets.

@jcrubino
Forked from mtambos/ring_buffer
Last active August 29, 2015 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jcrubino/74dca3d3991d3a2dcb85 to your computer and use it in GitHub Desktop.
Save jcrubino/74dca3d3991d3a2dcb85 to your computer and use it in GitHub Desktop.
import numpy as np
class RingBuffer(np.ndarray):
    """A multidimensional, fixed-capacity ring buffer stored in an ndarray.

    The first axis is the buffer axis: its length is the capacity and never
    changes. Index 0 holds the oldest element and index -1 the newest. New
    data is pushed in at the end and the oldest elements fall off the front.

    The instance is a view of ``np.asarray(input_array)``; ``np.asarray``
    does not copy when given an ndarray, so in that case the buffer shares
    memory with the array it was built from.
    """

    def __new__(cls, input_array):
        """Create a buffer whose initial contents and shape are `input_array`."""
        obj = np.asarray(input_array).view(cls)
        return obj

    def __array_finalize__(self, obj):
        """Subclassing hook; the buffer carries no extra attributes to copy."""
        if obj is None:
            return

    def __array_wrap__(self, out_arr, context=None, return_scalar=False):
        """Wrap ufunc results using ndarray's default behaviour.

        `return_scalar` is accepted (and forwarded) because NumPy 2 passes it
        positionally and emits a DeprecationWarning for implementations that
        do not take it.
        """
        return np.ndarray.__array_wrap__(self, out_arr, context,
                                         return_scalar)

    def extend(self, xs):
        """Push every element of `xs` into the buffer, oldest first.

        If `xs` holds at least as many elements as the buffer, only the last
        ``len(self)`` elements of `xs` are kept. An empty `xs` or a
        zero-capacity buffer leaves the buffer untouched.

        Raises:
            ValueError: if the element shape of `xs` (``xs.shape[1:]``)
                differs from the buffer's element shape.
        """
        xs = np.asarray(xs)
        if self.shape[1:] != xs.shape[1:]:
            raise ValueError("Element's shape mismatch. RingBuffer.shape={}. "
                             "xs.shape={}".format(self.shape, xs.shape))
        capacity = len(self)
        count = len(xs)
        # Nothing to store, or nowhere to store it. Without this guard the
        # slices below degenerate (``self[:-0]`` is empty) and the assignment
        # fails with a broadcasting error.
        if capacity == 0 or count == 0:
            return
        if count >= capacity:
            # The new data replaces the whole buffer.
            self[:] = xs[-capacity:]
            return
        # Shift the surviving elements to the front, then write the new tail.
        self[:-count] = self[count:]
        self[-count:] = xs

    def append(self, x):
        """Push the single element `x` into the buffer, dropping the oldest.

        A zero-capacity buffer discards `x`, matching the behaviour of
        `extend`.

        Raises:
            ValueError: if the shape of `x` differs from the buffer's
                element shape (``self.shape[1:]``).
        """
        x = np.asarray(x)
        if self.shape[1:] != x.shape:
            raise ValueError("Element's shape mismatch. RingBuffer.shape={}. "
                             "x.shape={}".format(self.shape, x.shape))
        if len(self) == 0:
            return
        self[:-1] = self[1:]
        self[-1] = x
#!/usr/bin/env python
import sys
import unittest
import numpy as np
import ring_buffer
class RingBufferTests(unittest.TestCase):
    """Tests for ``ring_buffer.RingBuffer.extend`` and ``.append``.

    Each scenario is run against a 1-d buffer of length 10 and a 2-d buffer
    of shape (10, 4), with fewer, exactly as many, and more new elements
    than the buffer can hold. Every test method has a unique name; a
    duplicated name would silently replace the earlier test of that name.
    """

    # ------------------------------------------------------------ extend

    def test_extend_less_than_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(8)
        ringbuff.extend(expected)
        self.assertEqual(expected.tolist(),
                         ringbuff[-len(expected):].tolist())

    def test_extend_less_than_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((8, 4))
        ringbuff.extend(expected)
        self.assertEqual(expected.tolist(),
                         ringbuff[-len(expected):].tolist())

    def test_extend_equal_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(10)
        ringbuff.extend(expected)
        self.assertEqual(expected.tolist(), ringbuff.tolist())

    def test_extend_equal_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((10, 4))
        ringbuff.extend(expected)
        self.assertEqual(expected.tolist(), ringbuff.tolist())

    def test_extend_more_than_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(12)
        ringbuff.extend(expected)
        self.assertEqual(expected[-len(ringbuff):].tolist(),
                         ringbuff.tolist())

    def test_extend_more_than_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((12, 4))
        ringbuff.extend(expected)
        self.assertEqual(expected[-len(ringbuff):].tolist(),
                         ringbuff.tolist())

    # ------------------------------------------------------------ append

    def test_append_less_than_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(8)
        for item in expected:
            ringbuff.append(item)
        self.assertEqual(expected.tolist(),
                         ringbuff[-len(expected):].tolist())

    def test_append_less_than_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((8, 4))
        for item in expected:
            ringbuff.append(item)
        self.assertEqual(expected.tolist(),
                         ringbuff[-len(expected):].tolist())

    def test_append_equal_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(10)
        for item in expected:
            ringbuff.append(item)
        self.assertEqual(expected.tolist(), ringbuff.tolist())

    def test_append_equal_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((10, 4))
        for item in expected:
            ringbuff.append(item)
        self.assertEqual(expected.tolist(), ringbuff.tolist())

    def test_append_more_than_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(12)
        for item in expected:
            ringbuff.append(item)
        self.assertEqual(expected[-len(ringbuff):].tolist(),
                         ringbuff.tolist())

    def test_append_more_than_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((12, 4))
        for item in expected:
            ringbuff.append(item)
        self.assertEqual(expected[-len(ringbuff):].tolist(),
                         ringbuff.tolist())
# Run the test suite only when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment