Skip to content

Instantly share code, notes, and snippets.

@zopyx
Created March 9, 2018 12:53
Show Gist options
  • Save zopyx/dd94316653b9adb89a34adea7c02df30 to your computer and use it in GitHub Desktop.
Save zopyx/dd94316653b9adb89a34adea7c02df30 to your computer and use it in GitHub Desktop.
Plone 4 -> Plone 5.1 migration via plone.restapi
import os
import sys
import yaml
import pprint
import base64
import requests
import plone.api
from requests.auth import HTTPBasicAuth
from Testing.makerequest import makerequest
from zope.component.hooks import setSite
from AccessControl.SecurityManagement import newSecurityManager
from Products.CMFCore.WorkflowCore import WorkflowException
# Migration settings. Stays None until the script section at the bottom of
# this module loads the YAML configuration file.
CONFIG = None

# Maps the portal_type of a source container to the portal_type that is
# created for it on the target site when the container has to be created
# as a parent of migrated content.
FOLDER_MAPPING = {
    'Folder': 'richfolder',
    # find_objects() and _migrate_content() compare portal_type against
    # 'RichDocument' (no space), so that spelling must be resolvable here.
    'RichDocument': 'richdocument',
    # Original spelling of the key, kept for backward compatibility.
    'Rich Document': 'richdocument',
    'dynamore.seminarsdx.seminarfolder': 'dynamore.seminarsdx.seminarfolder',
}
def query_yes_no(question, default="yes"):
    """Prompt for a yes/no answer on stdin and return it as a boolean.

    ``question`` is written to stdout followed by a hint listing the
    accepted answers.  ``default`` is the answer assumed when the user only
    presses <Enter>: "yes" (the default), "no", or None to insist on an
    explicit answer.  Any other value raises ValueError.

    Returns True for "yes"/"ye"/"y" and False for "no"/"n".  The question
    is repeated until one of these answers is given.
    """
    answers = {"yes": True, "y": True, "ye": True,
               "no": False, "n": False}

    if default is None:
        prompt = " [y/n] "
    else:
        hints = (("yes", " [Y/n] "), ("no", " [y/N] "))
        matching = [hint for name, hint in hints if default == name]
        if not matching:
            raise ValueError("invalid default answer: '%s'" % default)
        prompt = matching[0]

    while True:
        sys.stdout.write(question + prompt)
        reply = raw_input().lower()
        # An empty reply selects the default, if there is one.
        if reply == '' and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        sys.stdout.write("Please respond with 'yes' or 'no' "
                         "(or 'y' or 'n').\n")
def get_object_data(obj):
    """Collect the values of ``obj`` that are sent to the target site.

    Returns a dict with the keys ``id``, ``title``, ``description``,
    ``teaser_image``, ``teaser_image_caption``, ``image`` and
    ``review_state``.  A ``text`` key is added only when ``obj`` has a
    ``getText()`` method that returns a non-empty value.

    ``review_state`` is None when looking up the workflow state raises
    WorkflowException.
    """
    try:
        review_state = plone.api.content.get_state(obj=obj)
    except WorkflowException:
        review_state = None

    # The file payload is intentionally not extracted here: it was never
    # part of the returned dict, so reading and base64-encoding it was
    # wasted work.
    data = dict(
        id=obj.getId(),
        title=obj.Title(),
        description=obj.Description(),
        teaser_image=extract_lead_image(obj),
        teaser_image_caption=extract_lead_image_caption(obj),
        image=extract_image(obj),
        review_state=review_state)

    try:
        text = obj.getText()
    except AttributeError:
        # Object has no getText() accessor.
        text = None
    if text:
        data['text'] = text
    return data
def find_objects(root):
    """Walk the content tree starting at ``root`` and group it by type.

    Returns a dict that maps each portal_type found to a list of dicts,
    one per object, with the keys:

    * ``path`` -- the physical path joined with '/'
    * ``review_state`` -- the workflow state, or None when the lookup
      raises WorkflowException
    * ``parents`` -- the containers between the Plone site and the object,
      outermost first, each as a dict with ``portal_type``,
      ``review_state`` and ``path``
    * ``portal_type``

    Only objects of type 'RichDocument', 'Folder' and
    'dynamore.seminarsdx.seminarfolder' are descended into.
    """
    container_types = (
        'RichDocument', 'Folder', 'dynamore.seminarsdx.seminarfolder')

    def _get_state(node):
        # Same WorkflowException handling for nodes and their parents.
        try:
            return plone.api.content.get_state(obj=node)
        except WorkflowException:
            return None

    def _find_parents(node):
        # Climb the acquisition chain until the Plone site is reached.
        parents = []
        current = node.aq_parent
        while current.portal_type != 'Plone Site':
            parents.append(dict(
                portal_type=current.portal_type,
                review_state=_get_state(current),
                path='/'.join(current.getPhysicalPath())))
            current = current.aq_parent
        # Collected innermost first; callers get outermost first.
        parents.reverse()
        return parents

    def _find_objects(node, result):
        result.setdefault(node.portal_type, []).append(dict(
            path='/'.join(node.getPhysicalPath()),
            # A plain value: the former trailing comma stored a 1-tuple.
            review_state=_get_state(node),
            parents=_find_parents(node),
            portal_type=node.portal_type))
        if node.portal_type in container_types:
            for child in node.contentValues():
                _find_objects(child, result)

    print('Finding objects in {}'.format(root.absolute_url(1)))
    result = dict()
    _find_objects(root, result)
    print('DONE - Finding objects in {}'.format(root.absolute_url(1)))
    return result
def old2new_path(path):
""" Replace old path with new path """
return path.replace(
'{0}'.format(CONFIG['old_site_id']),
'{0}'.format(CONFIG['new_site_id']))
def extract_lead_image(obj):
    """Return the ``leadImage`` field of ``obj`` as a base64 payload dict.

    The result has the keys ``data``, ``encoding`` and ``content-type``.
    An empty dict is returned when ``obj`` has no ``Schema()`` method, its
    schema has no ``leadImage`` field, or the field holds no data.
    """
    try:
        schema = obj.Schema()
    except AttributeError:
        return {}

    lead_image_field = schema.getField('leadImage')
    if lead_image_field is None:
        # Same guard as extract_lead_image_caption(): not every schema
        # carries the field.
        return {}

    value = lead_image_field.get(obj)
    if value is None:
        # str(None) would be the non-empty string 'None' and end up
        # encoded as if it were image data.
        return {}

    lead_image = str(value)
    if not lead_image:
        return {}
    return {
        'data': base64.b64encode(lead_image),
        'encoding': 'base64',
        # NOTE(review): the content type is hard-coded rather than read
        # from the stored image -- confirm all lead images are PNG.
        'content-type': 'image/png'
    }
def extract_lead_image_caption(obj):
    """Return the value of the ``leadImageCaption`` field of ``obj``.

    None is returned when ``obj`` has no ``Schema()`` method or its schema
    has no such field.
    """
    try:
        schema = obj.Schema()
    except AttributeError:
        return None
    field = schema.getField('leadImageCaption')
    return field.get(obj) if field else None
def extract_file(obj):
    """Return the ``file`` field of ``obj`` as a base64 payload dict.

    The result has the keys ``data``, ``encoding``, ``content-type`` and
    ``filename``.  None is returned when ``obj`` has no ``Schema()``
    method, its schema has no ``file`` field, or the file is empty.
    """
    try:
        schema = obj.Schema()
    except AttributeError:
        # Consistent with the lead-image helpers: objects without a
        # schema simply have nothing to extract.
        return None

    field = schema.getField('file')
    if not field:
        return None

    file_data = str(obj.getFile())
    if not file_data:
        return None

    stored = field.get(obj)  # looked up once for both metadata values
    return {
        'data': base64.b64encode(file_data),
        'encoding': 'base64',
        'content-type': stored.content_type,
        'filename': stored.filename,
    }
def extract_image(obj):
    """Return the ``image`` field of ``obj`` as a base64 payload dict.

    The result has the keys ``data``, ``encoding``, ``content-type`` and
    ``filename``.  None is returned when ``obj`` has no ``Schema()``
    method, its schema has no ``image`` field, the stored value has no
    content type starting with ``image/``, or the image is empty.
    """
    try:
        schema = obj.Schema()
    except AttributeError:
        # Consistent with the lead-image helpers: objects without a
        # schema simply have nothing to extract.
        return None

    field = schema.getField('image')
    if not field:
        return None

    stored = field.get(obj)
    # An unset field may hold a value without ``content_type``; treat it
    # as "no image" instead of failing with AttributeError.
    content_type = getattr(stored, 'content_type', None)
    if not content_type or not content_type.startswith('image/'):
        return None

    image_data = str(obj.getImage())
    if not image_data:
        return None
    return {
        'data': base64.b64encode(image_data),
        'encoding': 'base64',
        'content-type': content_type,
        'filename': stored.filename,
    }
def extract_image_dx(obj):
    """Extract the image from dexterity content as a base64 payload dict.

    The result has the keys ``data``, ``encoding``, ``content-type`` and
    ``filename``.  None is returned when ``obj.image.data`` cannot be read
    or is empty.
    """
    try:
        image_data = str(obj.image.data)
    except Exception:
        # Best effort: any failure to read the image means "no image".
        # ``Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt and SystemExit propagate.
        return None
    if not image_data:
        return None
    return {
        'data': base64.b64encode(image_data),
        'encoding': 'base64',
        'content-type': obj.image.contentType,
        'filename': obj.image.filename,
    }
def delete_path(path):
    """Send an HTTP DELETE for ``path`` below the configured endpoint URL.

    The response is not inspected, so a failing request (for example for
    a path that does not exist remotely) is silently ignored.
    """
    endpoint = CONFIG['endpoint']
    url = '{0}/{1}'.format(endpoint['url'], path)
    credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
    print('{0} {1}'.format('delete', url))
    requests.delete(
        url,
        auth=credentials,
        headers={'accept': 'application/json'})
def publish_resource(resource_path, review_state):
    """Trigger the ``publish`` transition for ``resource_path`` remotely.

    Nothing is done when ``review_state`` is empty or 'private'.  For any
    other state a POST is sent to
    ``<endpoint url>/<resource_path>/@workflow/publish``; the response is
    not inspected.
    """
    if review_state == 'private' or not review_state:
        return

    endpoint = CONFIG['endpoint']
    url = '{0}/{1}/@workflow/publish'.format(endpoint['url'], resource_path)
    print('publish: {0}'.format(resource_path))
    credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
    requests.post(
        url,
        auth=credentials,
        headers={'accept': 'application/json'})
def add_user(resource_path, username, password, fullname, email, roles):
    """Create a user by POSTing to ``<endpoint url>/<resource_path>/@users``.

    The user attributes are sent as a JSON body.  Raises RuntimeError
    carrying the response text when the status code is neither 200 nor
    201.
    """
    payload = {
        'username': username,
        'password': password,
        'fullname': fullname,
        'roles': roles,
        'email': email
    }
    endpoint = CONFIG['endpoint']
    url = '{0}/{1}/@users'.format(endpoint['url'], resource_path)
    print('+user {0}'.format(username))
    credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
    response = requests.post(
        url,
        auth=credentials,
        headers={
            'accept': 'application/json',
            'content-type': 'application/json'
        },
        json=payload)
    if response.status_code not in (200, 201):
        raise RuntimeError(response.text)
def create_ct(resource_path, portal_type, **kw):
    """Create a remote content object via ``_create_ct``.

    Returns whatever ``_create_ct`` returns.  When it raises, the error is
    printed and the Python debugger is entered so the operator can inspect
    the failure; after leaving the debugger None is returned.
    """
    try:
        outcome = _create_ct(resource_path, portal_type, **kw)
    except Exception as e:
        print('ERROR: create_ct({0}, "{1}", {2})'.format(resource_path, portal_type, e))
        import pdb; pdb.set_trace()
    else:
        return outcome
def _create_ct(resource_path, portal_type, **kw):
    """POST a new object of ``portal_type`` to ``<endpoint url>/<resource_path>``.

    ``kw`` holds the field values; together with ``@type`` they form the
    JSON body.  A missing or empty ``title`` falls back to ``id``, and
    '\\r\\n' sequences are stripped from the title.  Raises RuntimeError
    carrying the response text when the status code is neither 200 nor
    201.
    """
    payload = dict({'@type': portal_type}, **kw)
    title = payload.get('title') or payload['id']
    payload['title'] = title.replace('\r\n', '')

    endpoint = CONFIG['endpoint']
    url = '{0}/{1}'.format(endpoint['url'], resource_path)
    print('+{0}: {1}/{2}'.format(portal_type, resource_path, payload['id']))
    credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
    response = requests.post(
        url,
        auth=credentials,
        headers={
            'accept': 'application/json',
            'content-type': 'application/json'
        },
        json=payload)
    if response.status_code not in (200, 201):
        raise RuntimeError(response.text)
def _remote_exists(folder_path):
    """Return True when the remote site reports that ``folder_path`` exists.

    Sends a GET to ``<endpoint url>/@@remote-exists`` with ``folder_path``
    as the ``path`` query parameter and returns whether the response
    status code is 200.
    """
    url = '{0}/@@remote-exists'.format(CONFIG['endpoint']['url'])
    auth = HTTPBasicAuth(CONFIG['endpoint']['user'],
                         CONFIG['endpoint']['password'])
    # Let requests build the query string so that spaces, '&', '#' and
    # non-ASCII characters in the path are URL-encoded.
    result = requests.get(
        url,
        params={'path': folder_path},
        auth=auth)
    return result.status_code == 200
def recreate_remote_plone_site():
    """Ask the remote endpoint to recreate the target Plone site.

    POSTs ``CONFIG['new_site_id']`` and ``CONFIG['extension_ids']`` as
    JSON to ``<endpoint url>/@@recreate-plone-site``.  Raises RuntimeError
    carrying the response text when the status code is not 201.
    """
    url = '{0}/@@recreate-plone-site'.format(CONFIG['endpoint']['url'])
    auth = HTTPBasicAuth(CONFIG['endpoint']['user'],
                         CONFIG['endpoint']['password'])
    data = {
        'site_id': CONFIG['new_site_id'],
        'extension_ids': CONFIG['extension_ids']
    }
    result = requests.post(
        url,
        auth=auth,
        json=data)
    # An explicit check instead of ``assert``: asserts are stripped when
    # Python runs with -O, and the response text is needed for diagnosis.
    if result.status_code != 201:
        raise RuntimeError(result.text)
# Source portal types that get migrated, in the order they are processed.
_MIGRATED_TYPES = (
    'RichDocument',
    'File',
    'Link',
    'ImageAttachment',
    'FileAttachment',
    'dynamore.seminarsdx.person',
    'dynamore.seminarsdx.location',
    'dynamore.seminarsdx.seminarfolder',
    'dynamore.seminarsdx.seminar',
    'Image',
)

# Parent folders with one of these ids are always created as seminar folders.
_SEMINAR_FOLDER_IDS = ('seminar_teacher', 'seminar-locations', 'seminar_orte')


def _raw_text(obj):
    """Return ``obj.text.raw``, or None when ``obj.text`` is empty."""
    text = obj.text
    if text:
        return text.raw
    return None


def _create_missing_parents(parents, paths_created, parents_present):
    """Create each container in ``parents`` that is missing remotely.

    ``paths_created`` and ``parents_present`` are sets that are updated in
    place; they keep already handled parents from being checked again.
    """
    for parent in parents:
        parent_path = parent['path']
        if parent_path in paths_created or parent_path in parents_present:
            continue
        if _remote_exists(parent_path):
            # Remember the answer to avoid one HTTP request per item.
            parents_present.add(parent_path)
            continue
        parent_data = get_object_data(SITE.restrictedTraverse(parent_path))
        # Drop the first path component and the parent's own id
        # (original note: "no site root, no id").
        resource_path = '/'.join(parent_path.split('/')[1:-1])
        paths_created.add(parent_path)
        print('Creating PARENT')
        pt = FOLDER_MAPPING[parent['portal_type']]
        if parent_data['id'] in _SEMINAR_FOLDER_IDS:
            pt = 'dynamore.seminarsdx.seminarfolder'
        create_ct(resource_path, pt, **parent_data)


def _content_payload(obj):
    """Return ``(target portal_type, field dict)`` describing ``obj``.

    Returns None when nothing is to be created: the type is not handled,
    or a File/Image object carries no extractable data.
    """
    portal_type = obj.portal_type
    if portal_type == 'RichDocument':
        return 'richdocument', get_object_data(obj)

    fields = dict(
        id=obj.getId(),
        title=obj.Title(),
        description=obj.Description())

    if portal_type in ('File', 'FileAttachment'):
        file_data = extract_file(obj)
        if not file_data:
            return None
        fields['file'] = file_data
        return 'File', fields

    if portal_type in ('Image', 'ImageAttachment'):
        image_data = extract_image(obj)
        if not image_data:
            return None
        fields['image'] = image_data
        return 'Image', fields

    if portal_type == 'Link':
        fields['remoteUrl'] = obj.getRemoteUrl()
        return 'Link', fields

    if portal_type == 'dynamore.seminarsdx.seminarfolder':
        fields.update(
            year_min=obj.year_min,
            year_max=obj.year_max)
        return portal_type, fields

    if portal_type == 'dynamore.seminarsdx.person':
        fields.update(
            image=extract_image_dx(obj),
            salutation=obj.salutation,
            firstname=obj.firstname,
            lastname=obj.lastname,
            text=_raw_text(obj),
            role=obj.role,
            year_max=obj.year_max)
        return portal_type, fields

    if portal_type == 'dynamore.seminarsdx.seminar':
        fields.update(
            title_long=obj.title_long,
            text=_raw_text(obj),
            seminar_code=obj.seminar_code,
            seminar_type=obj.seminar_type,
            affiliation=obj.affiliation,
            image=extract_image_dx(obj),
            is_seminar_master=obj.is_seminar_master,
            price=obj.price,
            currency=obj.currency,
            sender_email=obj.sender_email,
            notification_email=obj.notification_email,
            available_lecturers=obj.available_lecturers,
            available_locations=obj.available_locations)
        return portal_type, fields

    if portal_type == 'dynamore.seminarsdx.location':
        fields['text'] = _raw_text(obj)
        return portal_type, fields

    print('UNHANDLED {0}'.format(portal_type))
    return None


def _migrate_content(hierarchy_path):
    """Recreate the content below ``hierarchy_path`` on the target site.

    The tree is collected with ``find_objects``.  For every object of a
    type listed in ``_MIGRATED_TYPES`` the missing parent containers are
    created first, then the object itself, and finally the object is
    published according to its source review state.  Objects for which
    nothing is created are not published.
    """
    all_objects = find_objects(SITE.restrictedTraverse(hierarchy_path))
    all_items = []
    for portal_type in _MIGRATED_TYPES:
        all_items.extend(all_objects.get(portal_type, []))

    paths_created = set()      # source paths already created remotely
    parents_present = set()    # parent paths known to exist remotely

    for item in all_items:
        obj_path = item['path']
        if obj_path in paths_created:
            # Already created, e.g. as the parent of an earlier item.
            continue
        obj = SITE.restrictedTraverse(obj_path)
        try:
            review_state = plone.api.content.get_state(obj=obj)
        except WorkflowException:
            review_state = None

        _create_missing_parents(
            item['parents'], paths_created, parents_present)

        # The container path is the object's path without its own id.
        # Deriving it from obj_path also works for objects that sit
        # directly below the site and therefore have no parents listed.
        resource_path = '/'.join(obj_path.split('/')[1:-1])
        paths_created.add(obj_path)
        print('CREATING: {0} {1}/{2}'.format(
            obj.portal_type, resource_path, obj.getId()))

        payload = _content_payload(obj)
        if payload is None:
            continue
        target_type, fields = payload
        create_ct(resource_path, target_type, **fields)
        publish_resource(resource_path + '/' + obj.getId(), review_state)
def migrate_hierarchy(hierarchy_path):
    """Delete ``hierarchy_path`` on the target site, then migrate it anew.

    The path is translated with ``old2new_path`` for the delete request;
    the migration itself works on the untranslated source path.
    """
    remote_path = old2new_path(hierarchy_path)
    delete_path(remote_path)
    _migrate_content(hierarchy_path)
def migrate_users():
    """Migrate all users to the target site.

    For every user returned by ``plone.api.user.get_users()`` the stored
    password entry, email, full name and roles are sent to the target
    site with ``add_user``.
    """
    # NOTE(review): these are the values kept by the ``source_users``
    # plugin; presumably hashes rather than clear text -- confirm the
    # target site accepts them as passwords.
    passwords = SITE.acl_users.source_users._user_passwords
    # The users are created in the new site.  Its id is used as is:
    # running it through old2new_path() would corrupt it whenever the old
    # site id is a substring of the new one.
    resource_path = CONFIG['new_site_id']
    for d in plone.api.user.get_users():
        user = plone.api.user.get(d.id)
        if not user:
            continue
        pw = passwords.get(d.id)
        email = user.getProperty('email')
        roles = user.getRoles()
        fullname = user.getProperty('fullname')
        add_user(resource_path, d.id, pw, fullname, email, roles)
def migrate_persons(hierarchy_path):
    """Recreate the persons folder at ``hierarchy_path`` on the target site.

    The remote path is deleted and recreated as a 'richfolder' titled
    'Persons'.  Then one 'dynaperson' object is created per user id found
    in ``dynamore.contact.config._users``; a user id that appears under
    several organizations is created only once, for the first
    organization encountered.
    """
    from dynamore.contact import config

    delete_path(hierarchy_path)
    container_path, container_id = hierarchy_path.rsplit('/', 1)
    create_ct(container_path, 'richfolder', id=container_id, title='Persons')

    seen_ids = set()
    for organization, users in config._users.items():
        print(organization)
        for user_id, user in users.items():
            if user_id in seen_ids:
                continue
            seen_ids.add(user_id)

            surname = user.get('surname', '')
            forename = user.get('forename', '')
            # One contact entry per kind of number the user record has.
            contacts = [
                dict(
                    number_type=kind,
                    country_code=user[kind].dict['intnr'],
                    area_code=user[kind].dict['cnr'],
                    number=user[kind].dict['nr'])
                for kind in ('phone', 'mobile', 'fax')
                if kind in user]
            create_ct(
                hierarchy_path,
                'dynaperson',
                id=user_id,
                title=u'{0}, {1}'.format(surname, forename),
                firstname=forename,
                lastname=surname,
                organizations=[organization],
                email=user.get('mail', ''),
                keywords=filter(None, user.get('keywords', [])),
                contacts=contacts
            )
def migrate_locations(hierarchy_path):
    """Recreate the locations folder at ``hierarchy_path`` on the target site.

    The remote path is deleted and recreated as a 'richfolder' titled
    'Locations'.  Then one 'dynalocation' object is created per entry in
    ``dynamore.contact.config._establishments``.  Up to three address
    lines are transferred; missing lines are sent as empty strings.
    """
    from dynamore.contact import config

    def _address_line(lines, index):
        # Missing address lines become empty strings.
        try:
            return lines[index]
        except IndexError:
            return ''

    delete_path(hierarchy_path)
    resource_path, folder_id = hierarchy_path.rsplit('/', 1)
    create_ct(resource_path, 'richfolder', id=folder_id, title='Locations')

    for location_id, location in config._establishments.items():
        # The title comes from the location record itself.  The former
        # "surname, forename" title was copied from the person migration,
        # referred to a name not defined in this function and was never
        # used.
        contacts = []
        for name in ('phone', 'mobile', 'fax'):
            if name in location:
                contacts.append(dict(
                    number_type=name,
                    country_code=location[name].dict['intnr'],
                    area_code=location[name].dict['cnr'],
                    number=location[name].dict['nr']))

        address = location['address']
        create_ct(
            hierarchy_path,
            'dynalocation',
            id=location_id,
            title=location['title'],
            address1=_address_line(address, 0),
            address2=_address_line(address, 1),
            address3=_address_line(address, 2),
            short_title=location['shorttitle'],
            organizations=[],
            contacts=contacts
        )
if __name__ == '__main__':
    # The last command line argument is the path of the YAML migration
    # configuration.
    yaml_fn = os.path.abspath(sys.argv[-1])
    print 'Reading {0}'.format(yaml_fn)
    if not os.path.exists(yaml_fn):
        raise IOError('Migration configuration {0} not found'.format(yaml_fn))
    with open(yaml_fn, 'rb') as fp:
        # NOTE(review): yaml.load() can construct arbitrary Python objects;
        # consider yaml.safe_load() unless the configuration relies on
        # custom YAML tags.
        CONFIG = yaml.load(fp)
    pprint.pprint(CONFIG)
    # setup Plone site and security context
    # NOTE(review): ``app`` is not defined in this module; presumably it is
    # injected by the Zope script runner -- confirm how the script is run.
    SITE = getattr(app, CONFIG['old_site_id'])
    SITE = makerequest(SITE)
    setSite(SITE)
    user = app.acl_users.getUser(CONFIG['local_admin_user'])
    if not user:
        raise ValueError('No admin account "{0}" found'.format(
            CONFIG['local_admin_user']))
    print 'Setting new security context'
    newSecurityManager(None, user.__of__(app.acl_users))
    # pre-check: every folder to migrate must exist in the source site
    for path in CONFIG['migrate_folders']:
        print 'Precheck....{0}'.format(path)
        folder = SITE.restrictedTraverse(path, None)
        if folder is None:
            raise ValueError('Folder {0} does not exist'.format(path))
    if query_yes_no('Clear and recreate remote Plone site?'):
        recreate_remote_plone_site()
    # Not referenced again in this module.
    CATALOG = plone.api.portal.get_tool('portal_catalog')
    # ``or []`` tolerates configuration keys that are present but empty.
    for remote_folder in CONFIG['initial_remote_remove'] or []:
        print 'Removing remote folder: {0}'.format(remote_folder)
        delete_path(old2new_path(remote_folder))
    for remote_folder in CONFIG['initial_create_folders'] or []:
        print 'Creating remote folder: {0}'.format(remote_folder)
        resource_path, folder_id = remote_folder.rsplit('/', 1)
        create_ct(old2new_path(resource_path), 'Folder', id=folder_id)
    # Plone accounts
    if CONFIG['migrate_users']:
        migrate_users()
    # `Person` objects
    if CONFIG['dynamore_contacts_migrate']:
        migrate_persons(CONFIG['dynamore_contacts_folder'])
    # `Location` objects
    if CONFIG['dynamore_locations_migrate']:
        migrate_locations(CONFIG['dynamore_locations_folder'])
    # Content hierarchies, one configured folder at a time
    for path in CONFIG['migrate_folders']:
        print '#' * 80
        print 'Migrating {0}'.format(path)
        print '#' * 80
        migrate_hierarchy(path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment