-
-
Save maethu/7c1dec3489d68f71b0b0b10442cf23a8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from argparse import ArgumentParser | |
from plone import api | |
from plone.app.contenttypes.content import Document | |
from Products.CMFPlone.factory import _DEFAULT_PROFILE | |
from Products.CMFPlone.factory import addPloneSite | |
from z3c.relationfield import RelationValue | |
from zope.component import getUtility | |
from zope.intid.interfaces import IIntIds | |
import sys | |
import transaction | |
class Detective:
    """Inspect the ZODB storage behind a Plone site for stored objects.

    The helper creates and removes content on the given site and counts
    how many objects of a given class can be loaded from the records of
    the underlying storage.
    """

    def __init__(self, plone):
        """Remember the Plone site whose database connection is used."""
        self.plone = plone

    def pack(self):
        """Pack the database reached through the site's connection."""
        self.plone._p_jar.db().pack()

    def assert_relations(self, amount):
        """Raise AssertionError unless ``amount`` RelationValues are found."""
        self._assert_count(
            RelationValue,
            amount,
            'Invalid amount of RelationValues found',
        )

    def assert_documents(self, amount):
        """Raise AssertionError unless ``amount`` Documents are found."""
        self._assert_count(
            Document,
            amount,
            'Invalid amount of Documents found',
        )

    def _assert_count(self, klass, amount, msg):
        """Compare the number of stored ``klass`` objects with ``amount``.

        The error is raised explicitly rather than through an ``assert``
        statement, so the check is not dropped when Python runs with
        ``-O`` (which strips ``assert`` statements).
        """
        if len(self.get_content_from_storage(klass)) != amount:
            raise AssertionError(msg)

    def create_content_with_relation(self):
        """Create two Documents, the second one relating to the first.

        ``document_b`` gets a ``relatedItems`` list holding one
        RelationValue built from the intid of ``document_a``. The
        transaction is committed before returning.

        Returns:
            The tuple ``(document_a, document_b)``.
        """
        intids = getUtility(IIntIds)
        document_a = api.content.create(
            container=self.plone,
            type='Document',
            id='document_a',
            title='Document A',
        )
        document_b = api.content.create(
            container=self.plone,
            type='Document',
            id='document_b',
            title='Document B',
            relatedItems=[RelationValue(intids.getId(document_a))],
        )
        transaction.commit()
        return document_a, document_b

    def remove_document(self, document):
        """Delete ``document`` (link integrity checked) and commit."""
        api.content.delete(document, check_linkintegrity=True)
        transaction.commit()

    def get_content_from_storage(self, klass):
        """Return the objects of type ``klass`` found in the storage.

        Every record of every transaction yielded by the storage
        iterator is visited and the object for the record's oid is
        loaded through the connection. Results are not de-duplicated:
        an object whose oid shows up in several records is returned
        once per record.
        """
        jar = self.plone._p_jar
        transactions = jar.db()._storage.iterator()
        result = []
        try:
            for tx in transactions:
                for record in tx:
                    obj = jar[record.oid]
                    if isinstance(obj, klass):
                        result.append(obj)
        finally:
            # Some storage iterators expose close() to release what they
            # hold; call it when available so nothing is leaked.
            close = getattr(transactions, 'close', None)
            if close is not None:
                close()
        return result
def main(app, siteid=None):
    """Recreate a Plone site and check that no RelationValue survives.

    A site with id ``siteid`` is removed from ``app`` if present, then
    created again. Two related documents are created, the one holding
    the relation is deleted, the database is packed and the storage is
    checked for leftover RelationValues and Documents.

    Args:
        app: The application root that contains the Plone site.
        siteid: Id of the Plone site to (re)create; required.

    Raises:
        ValueError: If ``siteid`` is empty.
        AssertionError: If an object count differs from the expectation.
    """
    if not siteid:
        raise ValueError('Please provide a siteid for the plone site')

    # Start from a clean slate: drop a previous site with the same id.
    existing_ids = app.objectIds()
    if siteid in existing_ids:
        app.manage_delObjects(ids=[siteid])
        transaction.commit()

    site_options = {
        'title': siteid,
        'profile_id': _DEFAULT_PROFILE,
        'setup_content': False,
        'default_language': 'en',
        'portal_timezone': 'UTC',
    }
    portal = addPloneSite(app, siteid, **site_options)

    sleuth = Detective(portal)

    # Baseline: nothing of interest is stored yet.
    sleuth.pack()
    sleuth.assert_relations(amount=0)
    sleuth.assert_documents(amount=0)

    _, document_b = sleuth.create_content_with_relation()
    sleuth.assert_relations(amount=1)
    # api.content.create does a rename (both documents are twice stored)
    sleuth.assert_documents(amount=4)

    sleuth.remove_document(document_b)
    sleuth.pack()

    # This will break if there are relations left
    sleuth.assert_relations(amount=0)
    sleuth.assert_documents(amount=2)

    print('All good - no relation found in DB')
if __name__ == '__main__':
    # Drop the first two argv entries before parsing.
    # NOTE(review): presumably these belong to the launcher that runs
    # this script (e.g. a Zope "run" command) - confirm.
    sys.argv = sys.argv[2:]

    cli = ArgumentParser(prog='Prove relation value issue')
    cli.add_argument('-s', '--siteid', type=str, default='', help='Plone site ID.')
    options = cli.parse_args()

    # NOTE(review): `app` is not defined in this file; it is presumably
    # provided by the environment that executes the script - confirm.
    main(app, options.siteid)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment