Skip to content

Instantly share code, notes, and snippets.

@lemon24
Last active February 5, 2024 21:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lemon24/558955ad82ba2e4f50c0184c630c668c to your computer and use it in GitHub Desktop.
Save lemon24/558955ad82ba2e4f50c0184c630c668c to your computer and use it in GitHub Desktop.
Using model-based testing on a search sync implementation for https://github.com/lemon24/reader/issues/323
"""
...in which we use property-based / model-based testing to find
an eventually consistent way of synchronizing entries
from the main reader storage to an (unrelated) search index,
without coupling the storage and search with transactions / triggers.
A real model checker like TLA+ or Alloy may have been a better tool for this,
but I don't know how to use them.
This is the first step in https://github.com/lemon24/reader/issues/323.
---
Note that even the current implementation does not update the search index
inside a trigger / transaction, because this would keep the database
write-locked for too long.
Instead, in triggers[1] we only record which entries need to be updated.
In a later search update step, we pre-process entries (e.g. strip HTML)
outside of a transaction, and only use transactions to atomically:
1. write the entry to the search index
2. clear the "needs to be updated" flag
3. ensure entries were not updated during processing;
we use entry.ts for this[2], but the entry hash[3] or
a random value uniquely associated with the entry version would work too
In a world where the storage and the search index are separate systems,
1-2-3 above cannot happen atomically (hence, this).
There are three events when an entry needs to be synced:
* update – the entry was added to, or updated in, the storage;
also used when the title of a feed changes (not modeled here)
* delete – the entry was deleted from the storage
* move – the entry moved from one feed to another
(change_feed_url()), but its content has not changed;
equivalent to delete+update, and is only an optimization
The property we are testing for is that after an arbitrary number
of entry changes and (possibly concurrent / partial) syncs,
a final sync will make the storage and search index match.
---
Assuming that (1) Sync models things correctly and
(2) there are no exceedingly rare bugs Hypothesis couldn't find,
a correct solution looks like this:
* Each entry has a "sequence" that changes every time the entry changes
in a way the search index cares about. The sequence can be
a global counter, a random number, or a timestamp;
the only requirement is that it won't be used again
(or it is extremely unlikely that this will happen).
* Each sequence change gets recorded. For changes to an existing entry,
an update+delete pair is recorded, with the new/old sequence, respectively.
* Some time later, search processes pending changes and deletes them.
For update, search checks the change sequence against the entry sequence;
if the sequences do not match, the change is ignored
(deleted without an update).
---
Prior versions of this that modeled storage and search in separate classes[4]
supported the move event. I decided not having it is acceptable,
since it produces the same amount of (re-)processing as feed title changes,
which right now aren't optimized in any way.
[1]: https://github.com/lemon24/reader/blob/3.11/src/reader/_search.py#L247-L404
[2]: https://github.com/lemon24/reader/blob/3.11/src/reader/_search.py#L695-L707
[3]: https://github.com/lemon24/reader/blob/3.11/src/reader/_types.py#L166-L168
[4]: https://gist.github.com/lemon24/558955ad82ba2e4f50c0184c630c668c/e3da12e7b16a4f4bf63ed0f81b86f5ea1f2b42cd
"""
import time
import random
from typing import NamedTuple
import pytest
import hypothesis.strategies as st
from hypothesis import *
from hypothesis.stateful import *
class Entry(NamedTuple):
    """A stored entry: its searchable text plus the sequence of this version."""

    # the content the search index is supposed to mirror
    text: str
    # identifies this particular version of the entry; see next_seq()
    seq: int
# Last sequence number handed out by next_seq(); module-wide, so values
# are not repeated across state machine instances within one process.
_seq = 1000


def next_seq():
    """Return the next value of the global sequence counter.

    Each call increments the module-level ``_seq`` by one
    and returns the new value.
    """
    global _seq
    _seq = _seq + 1
    return _seq
class Sync(RuleBasedStateMachine):
    """Stateful model of a storage, a search index, and a change log.

    Storage rules mutate ``self.storage`` and record the work to be done
    in ``self.changes``; search rules read those changes and apply them
    to ``self.search``.

    See https://hypothesis.readthedocs.io/en/latest/stateful.html
    """

    def __init__(self):
        super().__init__()
        # feed -> {entry id -> Entry}
        self.storage = {}
        # (feed, entry id, seq) -> 'update' or 'delete'
        self.changes = {}
        # (feed, entry id, seq) -> indexed text
        self.search = {}

    @precondition(lambda self: not self.changes)
    @invariant()
    def in_sync(self):
        """Check the property under test.

        Once there are no pending changes, the storage and the search
        index must contain the same (feed, id, text) triples.
        """
        expected = sorted(
            (feed, id, entry.text)
            for feed, entries in self.storage.items()
            for id, entry in entries.items()
        )
        # the seq part of the search key is deliberately ignored here
        actual = sorted(
            (feed, id, text)
            for (feed, id, _), text in self.search.items()
        )
        assert expected == actual, str(self)

    # storage methods

    feeds = Bundle("feeds")

    @rule(target=feeds, feed=st.integers(0, 3))
    def add_feed(self, feed):
        """Create an empty feed; the example is rejected if it already exists."""
        assume(feed not in self.storage)
        self.storage[feed] = {}
        return feed

    @rule(feed=feeds, id=st.integers(0, 2), text=st.sampled_from('abc'))
    def add_or_update_entry(self, feed, id, text, seq=None):
        """Store a new version of an entry and record the resulting changes.

        An 'update' is recorded for the new sequence; if the entry already
        existed, a 'delete' is recorded for the sequence it had before.
        A fresh sequence is generated unless ``seq`` is given explicitly.
        """
        seq = next_seq() if seq is None else seq
        # None both when the feed is unknown and when the entry is new
        previous = self.storage.get(feed, {}).get(id)
        self.storage[feed][id] = Entry(text, seq)
        self.changes[feed, id, seq] = 'update'
        if previous is not None:
            self.changes[feed, id, previous.seq] = 'delete'

    @rule(feed=feeds, data=st.data())
    def delete_entry(self, feed, data, *, id=None):
        """Remove one entry of a non-empty feed and record a 'delete' for it.

        The entry is drawn from ``data`` unless ``id`` is given explicitly.
        """
        entries = self.storage.get(feed)
        assume(entries)
        if id is None:
            id = data.draw(st.sampled_from(list(entries)), 'entry')
        removed = entries.pop(id)
        self.changes[feed, id, removed.seq] = 'delete'

    @rule(feed=consumes(feeds))
    def delete_feed(self, feed):
        """Remove a feed and record a 'delete' for each entry it had."""
        for id, entry in self.storage.pop(feed).items():
            self.changes[feed, id, entry.seq] = 'delete'

    @rule(target=feeds, old=consumes(feeds), new=st.integers(0, 3))
    def change_feed_url(self, old, new):
        """Move all entries of feed ``old`` to a feed ``new`` that does not exist yet.

        Each moved entry gets a fresh sequence; an 'update' is recorded
        for the new location and a 'delete' for the old one.
        """
        assume(new not in self.storage)
        moved = self.storage.pop(old)
        self.storage[new] = {}
        for id, old_entry in moved.items():
            # a new seq per moved entry, rather than reusing the old one
            new_seq = next_seq()
            self.storage[new][id] = Entry(old_entry.text, new_seq)
            self.changes[new, id, new_seq] = 'update'
            self.changes[old, id, old_entry.seq] = 'delete'
        return new

    # search methods

    @precondition(lambda self: self.changes)
    @rule(random=st.randoms())
    def update(self, random=random):
        """Run a full sync: apply and clear every pending change, in random order."""
        pending = list(self.changes.items())
        random.shuffle(pending)
        for change in pending:
            self.do_change(change)
            self.change_done(change)

    # The three rules below split a sync into separate steps
    # (fetch a change, apply it, mark it done), so that steps of different
    # syncs can interleave -- presumably to simulate concurrent syncs.

    todo_changes = Bundle("todo_changes")
    done_changes = Bundle("done_changes")

    @precondition(lambda self: self.changes)
    @rule(target=todo_changes, data=st.data())
    def get_change(self, data, *, key=None):
        """Return one pending ``(key, action)`` change, without applying it.

        The change is drawn from ``data`` unless ``key`` is given explicitly.
        """
        if key is None:
            candidates = list(self.changes.items())
            return data.draw(st.sampled_from(candidates), 'change')
        return key, self.changes[key]

    @rule(target=done_changes, change=consumes(todo_changes))
    def do_change(self, change):
        """Apply a change to the search index by dispatching to do_<action>()."""
        (feed, id, seq), action = change
        handler = getattr(self, 'do_' + action)
        handler(feed, id, seq)
        return change

    @rule(change=consumes(done_changes))
    def change_done(self, change):
        """Drop a processed change from the pending ones.

        Nothing is removed if the key is gone, or if its pending action
        is no longer the one that was processed.
        """
        key, action = change
        if self.changes.get(key) != action:
            return
        del self.changes[key]

    def do_update(self, feed, id, seq):
        """Index the entry, but only if storage still has exactly this version."""
        entry = self.storage.get(feed, {}).get(id)
        if entry is None or entry.seq != seq:
            # the entry is gone or has changed since; ignore the change
            return
        self.search[feed, id, seq] = entry.text

    def do_delete(self, feed, id, seq):
        """Remove this exact entry version from the search index, if present."""
        self.search.pop((feed, id, seq), None)

    def __str__(self):
        return '\n'.join(self._str_parts())

    def _str_parts(self):
        """Yield the lines of a dump of storage, search and pending changes."""
        yield "---"

        yield "storage:"
        for feed, entries in sorted(self.storage.items()):
            yield from (
                f" {feed} {id} {entry.text!r:>6} {entry.seq}"
                for id, entry in sorted(entries.items())
            )

        yield "search:"
        yield from (
            f" {feed} {id} {text!r:>6} {seq}"
            for (feed, id, seq), text in sorted(self.search.items())
        )

        yield "changes:"
        yield from (
            f" {feed} {id} {action:>6} {seq}"
            for (feed, id, seq), action in sorted(self.changes.items())
        )
def test_late_change_of_reverted_move():
    """A change fetched during a move is applied after the move was reverted.

    Fixed by changing entry seq on change_feed_url().
    """
    machine = Sync()

    # one synced entry in feed 1
    machine.add_feed(feed=1)
    machine.add_or_update_entry(feed=1, id=0, text='a', seq=123)
    machine.update()

    # move the feed away, and fetch (but do not apply) the change
    # recorded for the entry's old location
    machine.change_feed_url(old=1, new=0)
    late_change = machine.get_change(None, key=(1, 0, 123))

    # move the feed back and let a full sync go through
    machine.change_feed_url(old=0, new=1)
    machine.update()

    # only now apply the change fetched earlier
    machine.do_change(change=late_change)

    # used to fail with:
    # AssertionError: assert [(1, 0, 'a')] == []
    machine.in_sync()
def test_late_change_of_old_update():
    """A change fetched for an old entry version is applied after a newer sync.

    Fixed by checking seq in do_delete().
    """
    machine = Sync()

    # an entry whose first version has not been synced yet
    machine.add_feed(feed=0)
    machine.add_or_update_entry(feed=0, id=0, text='a', seq=123)

    # fetch (but do not apply) the change for the first version
    late_change = machine.get_change(None, key=(0, 0, 123))

    # the entry gets a second version, and a full sync goes through
    machine.add_or_update_entry(feed=0, id=0, text='b', seq=456)
    machine.update()

    # only now apply the change fetched earlier
    machine.do_change(change=late_change)

    # historical failure, as originally noted:
    # AssertionError: assert [(0, 0, 'a')] == [(0, 0, 'a'), (0, 0, 'a')]
    machine.in_sync()
# Expose the state machine as a test case under a test_* name.
test_sync = Sync.TestCase

# Hypothesis settings for the state machine run.
test_sync.settings = settings(
    max_examples=1000,
    deadline=1000,
    suppress_health_check=[HealthCheck.filter_too_much],
    # uncomment for a trace of the executed steps:
    # verbosity=Verbosity.debug,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment