Last active
April 10, 2018 19:29
-
-
Save eprigorodov/b0311434f1381ab08b163682f3d9ba10 to your computer and use it in GitHub Desktop.
Race scenario for andbag-zcatalog-caching branch of zopefoundation/Products.ZCatalog
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
from persistent import Persistent | |
from threading import current_thread, Event, local, Lock, Thread | |
import sys | |
import transaction | |
from ZODB import DB | |
from ZODB.DemoStorage import DemoStorage | |
from ZODB.POSException import ConflictError | |
from Products.PluginIndexes.FieldIndex.FieldIndex import FieldIndex | |
from Products.ZCatalog.ZCatalog import ZCatalog | |
try: | |
from Products.ZCatalog.cache import CatalogCacheKey | |
except ImportError: | |
CatalogCacheKey = None | |
thread_locals = local() | |
print_lock = Lock() | |
def synced_print(*args, **kwargs): | |
try: | |
print_lock.acquire() | |
print(current_thread().name, *args, **kwargs) | |
sys.stdout.flush() | |
finally: | |
print_lock.release() | |
def publish(method, args): | |
for i in range(3): | |
try: | |
transaction.begin() | |
method(*args) | |
synced_print('commiting transaction') | |
transaction.commit() | |
except ConflictError: | |
synced_print('ConflictError!') | |
transaction.abort() | |
continue | |
except: | |
transaction.abort() | |
raise | |
else: | |
break | |
class DataObject(Persistent): | |
def __init__(self, object_id, attribute): | |
self.id = object_id | |
self.attribute = attribute | |
@classmethod | |
def catalog_new_object(cls, app, object_id, value): | |
obj = cls(object_id, value) | |
setattr(app, object_id, obj) | |
app.catalog.catalog_object(obj, uid=object_id) | |
def handle_request(object_id, value, events=[]): | |
global query, thread_locals | |
app = thread_locals.app | |
cat = app.catalog | |
if events: | |
events.pop(0).wait() | |
synced_print('indexing new object:', object_id) | |
DataObject.catalog_new_object(app, object_id, value) | |
if events: | |
events.pop(0).wait() | |
if CatalogCacheKey is not None: | |
key = CatalogCacheKey(cat._catalog, query).make_key(query) | |
synced_print('verifying indexing results, cache key:', key) | |
else: | |
synced_print('verifying indexing results') | |
indexed_ids = {rec.id for rec in cat(**query)} | |
if object_id not in indexed_ids: | |
# the leaking cache will already fail in the set comprehension above, | |
# because it will return a result set that references an ID of a document | |
# which was not indexed yet in this thread | |
synced_print('new object has not been indexed:', indexed_ids) | |
raise ValueError(object_id) | |
class MockZopeThread(Thread): | |
def __init__(self, db, *args, **kwargs): | |
self.db = db | |
super(MockZopeThread, self).__init__(*args, **kwargs) | |
def run(self): | |
thread_locals.conn = conn = self.db.open() | |
thread_locals.app = conn.root() | |
super(MockZopeThread, self).run() | |
conn.close() | |
query = {'attribute': {'query': 'A', 'range': 'min'}} | |
if __name__ == '__main__': | |
db = DB(DemoStorage()) | |
# single-threaded setup | |
conn = db.open() | |
app = conn.root() | |
cat = app.catalog = ZCatalog('catalog') | |
cat.addIndex('attribute', FieldIndex('attribute')) | |
cat.addColumn('id') | |
DataObject.catalog_new_object(app, 'object0', 'A') | |
transaction.commit() | |
print('catalog contents:', {rec.id for rec in cat(**query)}) | |
# run concurrent threads and orchestrate a cache overwrite | |
thread1_proceed_to_index = e1 = Event() | |
thread2_proceed_to_overwrite = e2 = Event() | |
thread1_proceed_to_fail = e3 = Event() | |
t1 = MockZopeThread(db, target=publish, | |
args=(handle_request, ('object1', 'B', [e1, e3]))) | |
t2 = MockZopeThread(db, target=publish, | |
args=(handle_request, ('object2', 'C', [e2]))) | |
t1.start() | |
t2.start() | |
thread1_proceed_to_index.set() | |
thread2_proceed_to_overwrite.set() | |
t2.join() | |
thread1_proceed_to_fail.set() | |
t1.join() | |
txn = transaction.begin() | |
print('catalog contents:', {rec.id for rec in cat(**query)}) | |
conn.close() | |
db.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment