# Source: GitHub gist maethu/7c1dec3489d68f71b0b0b10442cf23a8
# Author: @maethu -- created March 17, 2022 18:31
from argparse import ArgumentParser
from plone import api
from plone.app.contenttypes.content import Document
from Products.CMFPlone.factory import _DEFAULT_PROFILE
from Products.CMFPlone.factory import addPloneSite
from z3c.relationfield import RelationValue
from zope.component import getUtility
from zope.intid.interfaces import IIntIds
import sys
import transaction
class Detective:
    """Count persistent objects of a given class stored in a site's ZODB.

    The helper creates and removes content on ``plone`` and scans the data
    records of the underlying storage, so a caller can check whether objects
    such as ``RelationValue`` instances remain stored after content removal.
    """

    def __init__(self, plone):
        """Remember the Plone site to inspect.

        ``plone`` has to expose ``_p_jar``; every database and storage
        access in this class goes through that attribute.
        """
        self.plone = plone

    def pack(self):
        """Pack the database reached through the site's ``_p_jar``."""
        self.plone._p_jar.db().pack()

    def assert_relations(self, amount):
        """Raise ``AssertionError`` unless ``amount`` RelationValues are found."""
        self._assert_stored(
            RelationValue, amount, 'Invalid amount of RelationValues found')

    def assert_documents(self, amount):
        """Raise ``AssertionError`` unless ``amount`` Documents are found."""
        self._assert_stored(
            Document, amount, 'Invalid amount of Documents found')

    def create_content_with_relation(self):
        """Create two documents, the second one related to the first.

        ``document_b`` gets a ``relatedItems`` list holding a single
        ``RelationValue`` built from the intid of ``document_a``. The
        transaction is committed before returning.

        Returns the tuple ``(document_a, document_b)``.
        """
        intids = getUtility(IIntIds)
        document_a = api.content.create(
            container=self.plone,
            type='Document',
            id='document_a',
            title='Document A',
        )
        document_b = api.content.create(
            container=self.plone,
            type='Document',
            id='document_b',
            title='Document B',
            relatedItems=[RelationValue(intids.getId(document_a))],
        )
        transaction.commit()
        return document_a, document_b

    def remove_document(self, document):
        """Delete ``document`` with link-integrity checking and commit."""
        api.content.delete(document, check_linkintegrity=True)
        transaction.commit()

    def get_content_from_storage(self, klass):
        """Return the stored objects that are instances of ``klass``.

        Every data record of every transaction yielded by the storage
        iterator is looked up through the connection. An object is listed
        once per record that refers to it, so the same object can appear
        several times in the result.
        """
        connection = self.plone._p_jar
        matches = []
        for storage_transaction in connection.db()._storage.iterator():
            for record in storage_transaction:
                obj = connection[record.oid]
                if isinstance(obj, klass):
                    matches.append(obj)
        return matches

    def _assert_stored(self, klass, amount, msg):
        """Check that exactly ``amount`` instances of ``klass`` are found.

        Raises ``AssertionError`` whose text starts with ``msg`` and reports
        the expected and the actual count.
        """
        found = len(self.get_content_from_storage(klass))
        if found != amount:
            # Raised explicitly instead of using an ``assert`` statement so
            # the check still runs when Python is started with -O.
            raise AssertionError(
                '{}: expected {}, found {}'.format(msg, amount, found))
def main(app, siteid=None):
    """Recreate a Plone site and check that removed content leaves no relation.

    An existing object with id ``siteid`` is deleted from ``app`` first, then
    a new site is added. Two documents are created, the second one relating
    to the first; after the relating document is removed and the database is
    packed, no ``RelationValue`` is expected to be found in the storage.

    Raises ``ValueError`` when ``siteid`` is empty. The ``Detective`` checks
    raise ``AssertionError`` when a count differs from the expected one.
    """
    if not siteid:
        raise ValueError('Please provide a siteid for the plone site')

    # Drop an object left under the same id before creating the site.
    if siteid in app.objectIds():
        app.manage_delObjects(ids=[siteid])
        transaction.commit()

    site_options = {
        'title': siteid,
        'profile_id': _DEFAULT_PROFILE,
        'setup_content': False,
        'default_language': 'en',
        'portal_timezone': 'UTC',
    }
    plone = addPloneSite(app, siteid, **site_options)
    inspector = Detective(plone)

    # Baseline: the new site holds neither relations nor documents.
    inspector.pack()
    inspector.assert_relations(0)
    inspector.assert_documents(0)

    _, relating_document = inspector.create_content_with_relation()
    inspector.assert_relations(1)
    # api.content.create does a rename (both documents are twice stored)
    inspector.assert_documents(4)

    inspector.remove_document(relating_document)
    inspector.pack()
    # This will break if there are relations left
    inspector.assert_relations(0)
    inspector.assert_documents(2)

    print('All good - no relation found in DB')
if __name__ == '__main__':
    # Discard the first two argv entries so that argparse only sees the
    # options meant for this script.
    # NOTE(review): presumably those entries belong to the launcher that
    # runs this file -- confirm against how the script is invoked.
    sys.argv = sys.argv[2:]

    cli = ArgumentParser(prog='Prove relation value issue')
    cli.add_argument(
        '-s', '--siteid', type=str, default='', help='Plone site ID.')
    options = cli.parse_args()

    # ``app`` is not defined in this file.
    # NOTE(review): presumably provided by the environment that executes
    # the script -- confirm.
    main(app, options.siteid)
# GitHub page footer (not part of the script): sign up or sign in to comment on this gist.