Skip to content

Instantly share code, notes, and snippets.

@1st1
Created September 28, 2018 17:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 1st1/79caa457434aa7f728395162f7f6f4dd to your computer and use it in GitHub Desktop.
Save 1st1/79caa457434aa7f728395162f7f6f4dd to your computer and use it in GitHub Desktop.
rolling average
#
# This source file is part of the EdgeDB open source project.
#
# Copyright 2016-present MagicStack Inc. and the EdgeDB authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from cpython.mem cimport PyMem_Malloc, PyMem_Free
from libc.stdint cimport uint32_t
cdef class RollingAverage:
    """Rolling average over the most recent ``length`` ticks.

    Samples are kept in a fixed-size ring buffer of C doubles and the
    average is updated incrementally on every ``add()`` call, so reading
    ``avg`` never iterates over the history.

    Parameters:
        length: capacity of the history buffer; must be greater than 2.

    Raises:
        ValueError: if ``length`` is not greater than 2.
        MemoryError: if the history buffer cannot be allocated.
    """

    cdef:
        uint32_t _cur       # index of the slot written by the latest add()
        uint32_t _maxlen    # capacity of the ring buffer
        uint32_t _len       # number of samples recorded, at most _maxlen
        double *_vals       # ring buffer owned by this object (or NULL)
        readonly double avg

    def __cinit__(self, uint32_t length):
        # The buffer itself is allocated in __init__; starting from NULL
        # keeps __dealloc__ safe when __init__ raises or never runs.
        self._vals = NULL

    def __dealloc__(self):
        # PyMem_Free(NULL) is a no-op, so no guard is needed here.
        PyMem_Free(self._vals)

    def __init__(self, uint32_t length):
        cdef:
            double *vals
            uint32_t i

        if length <= 2:
            raise ValueError(
                f'RollingAverage history length is expected '
                f'to be greater than 2, got {length}')

        # Build the new buffer in a local first: if allocation fails the
        # object keeps whatever state it had before this call.
        vals = <double *>PyMem_Malloc(length * sizeof(double))
        if vals == NULL:
            raise MemoryError

        # Empty slots must read as 0 so that add() can subtract the
        # "evicted" value unconditionally while the buffer is filling up.
        for i in range(length):
            vals[i] = 0

        # __init__ may legitimately be called more than once; release the
        # buffer from a previous call instead of leaking it.
        PyMem_Free(self._vals)
        self._vals = vals

        self._maxlen = length
        self._len = 0
        self._cur = 0
        self.avg = 0

    def add(self, double tick):
        """Record ``tick`` and update ``avg``.

        Negative ticks are clamped to 0.  Once the buffer is full the
        oldest sample is overwritten.

        Raises:
            RuntimeError: if the object was created without running
                ``__init__`` and therefore has no history buffer.
        """
        cdef:
            double oldval
            uint32_t oldlen
            uint32_t newlen

        if self._vals == NULL:
            # Without this guard the write below would dereference NULL.
            raise RuntimeError('RollingAverage is not initialized')

        if tick < 0:
            tick = 0

        # Advance the write cursor, wrapping around at the end.
        self._cur += 1
        if self._cur >= self._maxlen:
            self._cur = 0

        oldlen = self._len
        if self._len < self._maxlen:
            self._len += 1
        newlen = self._len

        oldval = self._vals[self._cur]
        self._vals[self._cur] = tick

        # avg * oldlen is the previous sum; drop the overwritten sample
        # (0 while the buffer is still filling), add the new one and
        # divide by the updated sample count.
        self.avg = ((self.avg * oldlen) - oldval + tick) / newlen
@1st1
Copy link
Author

1st1 commented Oct 10, 2018

#
# This source file is part of the EdgeDB open source project.
#
# Copyright 2016-present MagicStack Inc. and the EdgeDB authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import math
import unittest

from edb.server2 import avg


class TestServerAvg(unittest.TestCase):
    """Tests for ``avg.RollingAverage``."""

    def _feed(self, roller, steps):
        """Add each tick to *roller*, checking ``avg`` after every add.

        *steps* is an iterable of ``(tick, expected_avg)`` pairs.
        """
        for tick, expected in steps:
            roller.add(tick)
            self.assertTrue(math.isclose(roller.avg, expected))

    def test_server_avg_01(self):
        # A constant input must yield that constant as the average,
        # both while the window fills and long after it has wrapped.
        roller = avg.RollingAverage(3)
        self._feed(roller, [(1, 1.0), (1, 1.0), (1, 1.0)])

        for _ in range(3 * 100):
            roller.add(1.1)

        self.assertTrue(math.isclose(roller.avg, 1.1))

    def test_server_avg_02(self):
        # Window of 3, pre-filled with zeros before real values arrive.
        roller = avg.RollingAverage(3)
        self.assertEqual(roller.avg, 0)

        for _ in range(100):
            roller.add(0)

        self.assertEqual(roller.avg, 0)

        self._feed(roller, [
            (1, 1 / 3),
            (2, 3 / 3),
            (3, 6 / 3),
            (5, 10 / 3),
            (1, 9 / 3),
        ])

    def test_server_avg_03(self):
        # Window of 5: the divisor grows until the window is full, then
        # the oldest sample is dropped on every add.
        roller = avg.RollingAverage(5)
        self.assertEqual(roller.avg, 0)

        self._feed(roller, [
            (1, 1),
            (2, 3 / 2),
            (3, 6 / 3),
            (5, 11 / 4),
            (5, 16 / 5),
            (5, 20 / 5),
        ])

        # Negative ticks are recorded as 0.
        self._feed(roller, [
            (-5, 18 / 5),
            (-0.1, 15 / 5),
            (0, 10 / 5),
        ])

    def test_server_avg_04(self):
        # A history length that is too small is rejected.
        expected_message = 'expected to be greater than 2'
        with self.assertRaisesRegex(ValueError, expected_message):
            avg.RollingAverage(1)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment