@eprigorodov
Last active April 10, 2018 19:29
Race scenario for andbag-zcatalog-caching branch of zopefoundation/Products.ZCatalog
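The script below reproduces the race: two mock Zope worker threads, each with its own ZODB connection, index a new object and then verify in the same transaction that a catalog query returns the object they just indexed. Three Event objects pin down the interleaving: thread 2 indexes and queries first, so its result set lands in the shared query cache; thread 1 then runs the same query and, with the leaking cache, receives thread 2's cached result set instead of its own, so its verification fails.
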
from __future__ import print_function
from persistent import Persistent
from threading import current_thread, Event, local, Lock, Thread
import sys
import transaction
from ZODB import DB
from ZODB.DemoStorage import DemoStorage
from ZODB.POSException import ConflictError
from Products.PluginIndexes.FieldIndex.FieldIndex import FieldIndex
from Products.ZCatalog.ZCatalog import ZCatalog
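
# CatalogCacheKey is only available on the andbag-zcatalog-caching branch;
# fall back so the script also runs against stock ZCatalog, where there is
# no shared query cache to race on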
try:
    from Products.ZCatalog.cache import CatalogCacheKey
except ImportError:
    CatalogCacheKey = None

thread_locals = local()
print_lock = Lock()

def synced_print(*args, **kwargs):
    try:
        print_lock.acquire()
        print(current_thread().name, *args, **kwargs)
        sys.stdout.flush()
    finally:
        print_lock.release()

def publish(method, args):
    # emulate Zope's publisher: retry the request a few times on ZODB
    # write conflicts
    for i in range(3):
        try:
            transaction.begin()
            method(*args)
            synced_print('committing transaction')
            transaction.commit()
        except ConflictError:
            synced_print('ConflictError!')
            transaction.abort()
            continue
        except:
            transaction.abort()
            raise
        else:
            break

class DataObject(Persistent):

    def __init__(self, object_id, attribute):
        self.id = object_id
        self.attribute = attribute

    @classmethod
    def catalog_new_object(cls, app, object_id, value):
        obj = cls(object_id, value)
        setattr(app, object_id, obj)
        app.catalog.catalog_object(obj, uid=object_id)

def handle_request(object_id, value, events=[]):
    global query, thread_locals
    app = thread_locals.app
    cat = app.catalog
    if events:
        events.pop(0).wait()
    synced_print('indexing new object:', object_id)
    DataObject.catalog_new_object(app, object_id, value)
    if events:
        events.pop(0).wait()
    if CatalogCacheKey is not None:
        key = CatalogCacheKey(cat._catalog, query).make_key(query)
        synced_print('verifying indexing results, cache key:', key)
    else:
        synced_print('verifying indexing results')
    indexed_ids = {rec.id for rec in cat(**query)}
    if object_id not in indexed_ids:
        # the leaking cache will already fail in the set comprehension above,
        # because it will return a result set that references the ID of a
        # document that has not yet been indexed in this thread
        synced_print('new object has not been indexed:', indexed_ids)
        raise ValueError(object_id)

class MockZopeThread(Thread):
    # each thread works on its own ZODB connection, like a Zope worker thread

    def __init__(self, db, *args, **kwargs):
        self.db = db
        super(MockZopeThread, self).__init__(*args, **kwargs)

    def run(self):
        thread_locals.conn = conn = self.db.open()
        thread_locals.app = conn.root()
        super(MockZopeThread, self).run()
        conn.close()
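
# a range query with only a 'min' bound matches every object whose 'attribute'
# value sorts at or after 'A', i.e. all objects created below, so each thread
# expects to find its own freshly indexed object in the query result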
query = {'attribute': {'query': 'A', 'range': 'min'}}

if __name__ == '__main__':
    db = DB(DemoStorage())
    # single-threaded setup
    conn = db.open()
    app = conn.root()
    cat = app.catalog = ZCatalog('catalog')
    cat.addIndex('attribute', FieldIndex('attribute'))
    cat.addColumn('id')
    DataObject.catalog_new_object(app, 'object0', 'A')
    transaction.commit()
    print('catalog contents:', {rec.id for rec in cat(**query)})
    # run concurrent threads and orchestrate a cache overwrite: thread 2
    # indexes and verifies first, placing its result set in the cache, then
    # thread 1 verifies against that stale cached entry
    thread1_proceed_to_index = e1 = Event()
    thread2_proceed_to_overwrite = e2 = Event()
    thread1_proceed_to_fail = e3 = Event()
    t1 = MockZopeThread(db, target=publish,
                        args=(handle_request, ('object1', 'B', [e1, e3])))
    t2 = MockZopeThread(db, target=publish,
                        args=(handle_request, ('object2', 'C', [e2])))
    t1.start()
    t2.start()
    thread1_proceed_to_index.set()
    thread2_proceed_to_overwrite.set()
    t2.join()
    thread1_proceed_to_fail.set()
    t1.join()
    # begin a fresh transaction so the main connection sees the commits
    # made by the worker threads
    transaction.begin()
    print('catalog contents:', {rec.id for rec in cat(**query)})
    conn.close()
    db.close()
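
To reproduce, run the script with the andbag-zcatalog-caching branch of Products.ZCatalog installed (ZODB, transaction and Products.PluginIndexes are also required). With the leaking cache, thread 1's verification query is expected to return thread 2's cached result set, so thread 1 should die with an unhandled exception and the final catalog listing should be missing object1; on stock ZCatalog both threads pass and all three objects show up.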