Skip to content

Instantly share code, notes, and snippets.

@loretoparisi
Created December 20, 2018 16:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save loretoparisi/449a8dc1050bf6c38d029564e2a69914 to your computer and use it in GitHub Desktop.
Save loretoparisi/449a8dc1050bf6c38d029564e2a69914 to your computer and use it in GitHub Desktop.
Python Circular Buffer - adapted from https://github.com/eric-wieser/numpy_ringbuffer
import numpy as np
class CircularBuffer:
    """Fixed-capacity ring buffer backed by a one-dimensional NumPy array.

    Values are written at the tail; once the buffer is full every new
    value overwrites the oldest one.  Two indices describe the live
    region:

    * ``_head_index`` -- position of the oldest value, kept in
      ``[0, capacity)`` by :meth:`fix_indices`.
    * ``_tail_index`` -- one past the newest value.  It is *not* reduced
      modulo the capacity on its own, so it may exceed ``capacity`` while
      the live region wraps around the end of the storage array;
      ``_tail_index - _head_index`` is always the number of stored values.
    """

    def __init__(self, capacity, dtype=float):
        """Allocate zero-filled storage and reset the head/tail indices.

        Args:
            capacity: Maximum number of values the buffer holds.
            dtype: NumPy dtype of the storage array (default ``float``).
        """
        self._buffer = np.zeros(capacity, dtype)
        self._head_index = 0
        self._tail_index = 0
        self._capacity = capacity

    def fix_indices(self):
        """Shift both indices by one capacity so the head stays in range.

        Head and tail are moved together, so the element count
        (``tail - head``) is unaffected.
        """
        if self._head_index >= self._capacity:
            self._head_index -= self._capacity
            self._tail_index -= self._capacity
        elif self._head_index < 0:
            self._head_index += self._capacity
            self._tail_index += self._capacity

    def insert(self, value):
        """Append ``value``, overwriting the oldest value when full.

        Inserting into a zero-capacity buffer is a no-op.
        """
        # A zero-capacity buffer has nowhere to store the value, and the
        # modulo below would raise ZeroDivisionError, so bail out before
        # touching any index.
        if self._capacity == 0:
            return
        if self.is_full():
            # Drop the oldest value: its slot is the one written below.
            self._head_index += 1
        self._buffer[self._tail_index % self._capacity] = value
        self._tail_index += 1
        self.fix_indices()

    def unwrap(self):
        """Return the stored values, oldest first, as a new array."""
        return np.concatenate((
            # Part from the head up to the tail or the end of storage.
            self._buffer[self._head_index:min(self._tail_index, self._capacity)],
            # Part that wrapped around to the front of storage (may be empty).
            self._buffer[:max(self._tail_index - self._capacity, 0)],
        ))

    def is_full(self):
        """Return True when the buffer holds ``capacity`` values."""
        return self.count() == self._capacity

    def count(self):
        """Return the number of values currently stored."""
        return self._tail_index - self._head_index
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment