Skip to content

Instantly share code, notes, and snippets.

@1st1
Created October 10, 2018 19:27
Show Gist options
  • Save 1st1/eee88be8735f27b0c523584f9b4cf1c3 to your computer and use it in GitHub Desktop.
Save 1st1/eee88be8735f27b0c523584f9b4cf1c3 to your computer and use it in GitHub Desktop.
LRUIndex & MappedDeque data structures
#
# This source file is part of the EdgeDB open source project.
#
# Copyright 2016-present MagicStack Inc. and the EdgeDB authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import collections
class MappedDeque:
    """A deque-like object with an O(1) discard operation.

    Items are stored as the keys of an ``OrderedDict``, so they must be
    hashable and unique within the deque.  Every item may carry an
    optional associated value (``None`` by default), retrievable with
    ``deque[item]``.
    """

    def __init__(self, source=None):
        """Create the deque, optionally pre-filled from *source*.

        Items taken from *source* keep their iteration order and are
        all associated with the value ``None``.
        """
        if source is None:
            self._list = collections.OrderedDict()
        else:
            self._list = collections.OrderedDict.fromkeys(source)

    def __contains__(self, item):
        return item in self._list

    def __getitem__(self, item):
        """Return the value associated with *item* (``KeyError`` if absent)."""
        return self._list[item]

    def discard(self, item):
        """Remove *item* from the deque.

        Raises ``LookupError`` if *item* is not in the deque.
        """
        try:
            del self._list[item]
        except KeyError:
            # Hide the internal KeyError; report the failure in terms of
            # this container instead.
            raise LookupError(f'{item!r} is not in {self!r}') from None

    def append(self, item, val=None):
        """Add *item* to the right end, associating it with *val*.

        Raises ``ValueError`` if *item* is already in the deque.
        """
        if item in self._list:
            raise ValueError(f'{item!r} is already in the list {self!r}')
        self._list[item] = val

    def pop(self):
        """Remove and return the rightmost (most recently appended) item.

        Raises ``KeyError`` if the deque is empty.
        """
        item, _ = self._list.popitem(last=True)
        return item

    def popleft(self):
        """Remove and return the leftmost (oldest) item.

        Raises ``KeyError`` if the deque is empty.
        """
        item, _ = self._list.popitem(last=False)
        return item

    def popleftitem(self):
        """Remove the leftmost item and return it as ``(item, value)``.

        Raises ``KeyError`` if the deque is empty.
        """
        return self._list.popitem(last=False)

    def __len__(self):
        return len(self._list)

    def __bool__(self):
        return bool(self._list)

    def __iter__(self):
        """Iterate over the items from the left end to the right end."""
        return iter(self._list)

    def __repr__(self):
        return f'<{type(self).__name__} {list(self)!r} {id(self):#x}>'
class LRUIndex:
    """A multidict-like mapping with internal LRU lists.

    Key properties:

    * One key can be mapped to a list of objects.

    * Objects must be unique for the entire mapping.  It's an error
      if two different keys point to one object, or if one object is
      referenced by the same key more than once.

    * Every key maps to a LIFO list of objects internally.  LIFO
      is essential to make the global LRU list of connections work:
      if a DB has too many open connections some of them will become
      unused for long enough period of time to be closed.

    * There's a global list of all objects, ordered from the least
      recently appended to the most recently appended, accessible via
      the "lru()" method.  The "count()" method returns the total
      number of objects in the index.
    """

    def __init__(self):
        # key -> MappedDeque of the objects appended under that key.
        self._index = {}
        # Every object in the index, oldest first; each maps to its key.
        self._lru_list = MappedDeque()

    def pop(self, key):
        """Remove and return the object most recently appended for *key*.

        Returns ``None`` if there are no objects stored under *key*.
        """
        try:
            items = self._index[key]
        except KeyError:
            return None

        o = items.pop()
        self._lru_list.discard(o)
        if not items:
            # Never keep empty per-key lists around, so that
            # count_keys() only counts keys that still have objects.
            del self._index[key]
        return o

    def append(self, key, o):
        """Store *o* under *key* as the most recently used object.

        Raises ``ValueError`` if *o* is already in the index.
        """
        if o in self._lru_list:
            raise ValueError(f'{key!r}:{o!r} is already in the index {self!r}')

        try:
            items = self._index[key]
        except KeyError:
            items = self._index[key] = MappedDeque()

        items.append(o)
        self._lru_list.append(o, key)

    def popleft(self):
        """Remove and return the least recently appended object.

        Raises ``KeyError`` if the index is empty.
        """
        o, key = self._lru_list.popleftitem()
        items = self._index[key]
        items.discard(o)
        if not items:
            del self._index[key]
        return o

    def discard(self, o):
        """Remove *o* from the index.

        Returns ``True`` if *o* was removed and ``False`` if it was not
        in the index.
        """
        try:
            key = self._lru_list[o]
        except KeyError:
            return False

        items = self._index[key]
        items.discard(o)
        self._lru_list.discard(o)
        if not items:
            del self._index[key]
        return True

    def count_keys(self):
        """Return the number of keys that currently have objects."""
        return len(self._index)

    def count(self):
        """Return the total number of objects in the index."""
        return len(self._lru_list)

    def lru(self):
        """Return an iterator over all objects, least recently appended first."""
        return iter(self._lru_list)
@1st1
Copy link
Author

1st1 commented Oct 10, 2018

#
# This source file is part of the EdgeDB open source project.
#
# Copyright 2016-present MagicStack Inc. and the EdgeDB authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import asyncio
import dataclasses
import random
import time

from edb.server2 import pgpool
from edb.server2 import taskgroup
from edb.server import _testbase as tb


@dataclasses.dataclass(frozen=True)
class Named:
    """Frozen dataclass with a single ``name`` field.

    Instances compare equal by ``name`` and are hashable, so the tests
    in this file use them as stand-in objects for the containers.
    """

    name: str


class TestExc(Exception):
    """Plain ``Exception`` subclass that adds no behaviour of its own."""


class TestLRUIndex(tb.TestCase):
    """Exercise ``pgpool.LRUIndex``: per-key pops, discards and LRU order."""

    # (key, object) pairs used to seed the index in the first two tests,
    # listed in insertion order.
    _SEED = ((1, '1'), (2, '2'), (1, '11'), (2, '22'), (1, '111'))

    def _seeded_index(self):
        """Return a fresh index populated with the ``_SEED`` pairs."""
        index = pgpool.LRUIndex()
        for key, obj in self._SEED:
            index.append(key, obj)
        return index

    def _check_lru(self, index, expected):
        """Assert the global LRU order and the matching object count."""
        self.assertEqual(list(index.lru()), expected)
        self.assertEqual(index.count(), len(expected))

    def test_lru_index_1(self):
        """Pops return the newest object per key; emptied keys disappear."""
        index = self._seeded_index()

        self.assertEqual(index.count_keys(), 2)

        for key, expected in ((1, '111'), (1, '11'), (2, '22')):
            self.assertEqual(index.pop(key), expected)

        index.append(1, '11')
        self.assertEqual(index.pop(1), '11')
        self.assertEqual(index.pop(2), '2')
        self.assertEqual(index.count_keys(), 1)
        self.assertEqual(index.pop(1), '1')
        self.assertEqual(index.count_keys(), 0)

        # Popping from keys that no longer exist yields None.
        for missing_key in (2, 1):
            self.assertIsNone(index.pop(missing_key))

        self.assertEqual(index.count_keys(), 0)

    def test_lru_index_2(self):
        """A discarded object is skipped by later pops and can be re-added."""
        index = self._seeded_index()

        self.assertTrue(index.discard('11'))
        # Discarding an object that is already gone reports False each time.
        for _ in range(2):
            self.assertFalse(index.discard('11'))

        self.assertEqual(index.pop(1), '111')
        self.assertEqual(index.pop(2), '22')

        index.append(1, '11')
        for key, expected in ((1, '11'), (2, '2'), (1, '1')):
            self.assertEqual(index.pop(key), expected)

        for missing_key in (2, 1):
            self.assertIsNone(index.pop(missing_key))

        self.assertEqual(index.count_keys(), 0)

    def test_lru_index_3(self):
        """Appending a duplicate object raises; discard frees it for re-use."""
        index = pgpool.LRUIndex()
        first, second = Named('o1'), Named('o11')

        index.append(1, first)
        index.append(1, second)

        with self.assertRaisesRegex(ValueError, 'already in the index'):
            index.append(1, first)

        self.assertTrue(index.discard(first))
        self.assertFalse(index.discard(first))

        index.append(1, first)
        self.assertIs(index.pop(1), first)
        self.assertIs(index.pop(1), second)

        self.assertEqual(index.count_keys(), 0)

        self.assertFalse(index.discard(first))

    def test_lru_index_4(self):
        """The global LRU list tracks appends, pops, discards and popleft."""
        index = pgpool.LRUIndex()
        a1, a2, a3 = Named('o1'), Named('o11'), Named('o111')
        b1 = Named('o2')

        for obj in (a1, a2, a3):
            index.append(1, obj)
        index.append(2, b1)
        self._check_lru(index, [a1, a2, a3, b1])

        self.assertIs(index.pop(1), a3)
        self.assertIs(index.pop(1), a2)
        self.assertIs(index.pop(2), b1)
        self._check_lru(index, [a1])

        index.append(1, a3)
        index.append(1, a2)
        self._check_lru(index, [a1, a3, a2])

        index.append(2, b1)

        self.assertIs(index.pop(1), a2)
        self._check_lru(index, [a1, a3, b1])

        index.discard(a3)
        self._check_lru(index, [a1, b1])

        self.assertIs(index.popleft(), a1)
        self._check_lru(index, [b1])

        self.assertIs(index.pop(2), b1)
        self._check_lru(index, [])


class TestMappedDeque(tb.TestCase):
    """Exercise ``pgpool.MappedDeque``: ordering, membership and errors."""

    def test_mapped_deque_1(self):
        """Walk one deque through append, discard and pops from both ends."""
        dq = pgpool.MappedDeque()
        first, second, third, fourth = (
            Named(label) for label in ('o1', 'o2', 'o3', 'o4')
        )

        for obj in (first, second, third, fourth):
            dq.append(obj)

        self.assertEqual(list(dq), [first, second, third, fourth])

        dq.discard(second)
        self.assertEqual(list(dq), [first, third, fourth])

        self.assertIn(first, dq)
        self.assertNotIn(second, dq)

        # Items appended without a value are paired with None.
        self.assertEqual(dq.popleftitem(), (first, None))
        self.assertEqual(list(dq), [third, fourth])

        with self.assertRaisesRegex(ValueError, 'already in the list'):
            dq.append(third)

        dq.append(first)
        self.assertEqual(list(dq), [third, fourth, first])

        with self.assertRaises(LookupError):
            dq.discard(second)

        self.assertEqual(len(dq), 3)
        self.assertTrue(bool(dq))

        self.assertIs(dq.popleft(), third)
        self.assertIs(dq.pop(), first)
        self.assertIs(dq.pop(), fourth)

        self.assertEqual(list(dq), [])
        self.assertEqual(len(dq), 0)
        self.assertFalse(bool(dq))

        # Popping from either end of an empty deque raises KeyError.
        for pop_from_empty in (dq.pop, dq.popleft):
            with self.assertRaises(KeyError):
                pop_from_empty()

    def test_mapped_deque_2(self):
        """Mutating the source list afterwards does not affect the deque."""
        source = [1, 2, 3]
        dq = pgpool.MappedDeque(source)
        self.assertEqual(list(dq), [1, 2, 3])

        source.pop()
        self.assertEqual(list(dq), [1, 2, 3])

    def test_mapped_deque_3(self):
        """A value passed to append() is retrievable by indexing the item."""
        dq = pgpool.MappedDeque()
        dq.append(1, '1')
        self.assertEqual(dq[1], '1')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment