Skip to content

Instantly share code, notes, and snippets.

@zopyx
Created June 2, 2019 08:46
Show Gist options
  • Save zopyx/8905a7c161f4e0c4578ded6670febb89 to your computer and use it in GitHub Desktop.
Save zopyx/8905a7c161f4e0c4578ded6670febb89 to your computer and use it in GitHub Desktop.
#-*- coding: utf-8 -*-
# This scripts imports a JSON export made with `collective.jsonify` into
# ArangoDB (database: ugent, collection: portal by default)
import argparse
import datetime
import functools
import itertools
import json
import os
import pprint
import time
import traceback

import attrdict
import dateparser
import furl
import requests
import tqdm
import yaml
from arango import ArangoClient
from requests.auth import HTTPBasicAuth

from .pfg import PFGMigrator
from .topic import TopicMigrator
# folderish portal type for which we reconstruct the initial hierarchy in phase
# 1
FOLDERISH_PT = ["Folder"]
# portal types that are not processed or just ignored because they are
# subobjects e.g. of FormFolder or Collection
IGNORED_TYPES = [
    "FormMailerAdapter",
    # BUG FIX: a missing comma used to concatenate the next two strings into
    # one bogus entry "FormRichLabelFieldFormSaveDataAdapter", so neither
    # type was actually ignored.
    "FormRichLabelField",
    "FormSaveDataAdapter",
    "FormStringField",
    "FormTextField",
    "FormThanksPage",
]
# list of ignored or obsolete permissions
IGNORED_PERMISSIONS = [
    'Change portal events'
]
# portal types for which we have actually a migration as primary objects
PROCESSED_TYPES = [
    "Document",
    "News Item",
    "Link",
    "File",
    "Image",
    "FormFolder",
    "Topic",
    "Event",
    "LibraryDocument",
    "Vacancy",
]
# marker interfaces directly provided by content object (_directly_provided JSON key)
# and supported by the migration
SUPPORTED_MARKER_INTERFACES = [
    'plone.app.layout.navigation.interfaces.INavigationRoot',
]
# remote-existence cache: parent path -> True (see Migrator.migrate_folder)
PARENT_EXISTS_CACHE = dict()
# print per-call timings from the @timeit decorator (set via --verbose)
VERBOSE = False
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that serializes date/datetime values as ISO-8601 strings."""

    def default(self, obj):
        # dates/datetimes are not JSON-serializable out of the box
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        # anything else: defer to the base class (which raises TypeError)
        return super().default(obj)
class MigrationError(Exception):
    """Raised when a request against the remote Plone site fails.

    `response` is the failed `requests` response object; it now defaults to
    None (backward compatible) so the error can also be raised when no HTTP
    response is at hand — the original mandatory parameter caused a
    TypeError at call sites that passed only a message.
    """

    def __init__(self, message, response=None):
        super().__init__(message)
        self.message = message
        self.response = response

    def __str__(self):
        # BUG FIX: the original dropped `message` from the rendered string
        return f"{self.__class__.__name__}: {self.message} {self.response}"
def timeit(method):
    """Decorator measuring the wall-clock runtime of `method`.

    If the call carries a `log_time` dict keyword, the elapsed milliseconds
    are stored there under `log_name` (default: upper-cased function name);
    the keywords are passed through to `method` unchanged.  Otherwise the
    timing is printed when the global VERBOSE flag is set.
    """
    @functools.wraps(method)  # BUG FIX: preserve __name__/__doc__ of `method`
    def timed(*args, **kw):
        start = time.time()
        result = method(*args, **kw)
        elapsed_ms = (time.time() - start) * 1000
        if "log_time" in kw:
            name = kw.get("log_name", method.__name__.upper())
            kw["log_time"][name] = int(elapsed_ms)
        elif VERBOSE:
            print("%r %2.2f ms" % (method.__name__, elapsed_ms))
        return result
    return timed
class Migrator:
    """Drives the migration of exported JSON content into a target Plone site."""

    def __init__(self, config):
        self.config = config
        # connect to the ArangoDB instance holding the collective.jsonify export
        arango_cfg = self.config.arango
        parsed = furl.furl(arango_cfg.url)
        self.client = ArangoClient(
            protocol=parsed.scheme, host=parsed.host, port=parsed.port
        )
        self.db = self.client.db(
            arango_cfg.database,
            username=arango_cfg.username,
            password=arango_cfg.password,
        )
        if not self.db.has_collection(arango_cfg.collection):
            raise RuntimeError(f'collection "{arango_cfg.collection}" does not exist')
        self.collection_name = arango_cfg.collection
        self.collection = self.db[arango_cfg.collection]
        # uid -> list of related-item uids, collected while creating content
        self._all_related_items = dict()
@timeit
def _query_aql(self, query):
    """Execute an AQL query against the database and return the result cursor."""
    return self.db.aql.execute(query)
@timeit
def _object_by_key(self, key):
    """Fetch a single exported document from the collection by its `_key`."""
    return self.collection.get({"_key": key})
@timeit
def _object_by_path(self, path):
    """Return path/portal_type/id/title/_key of the document matching `path`.

    Raises ValueError when no document or more than one document matches.
    """
    # NOTE(review): `path` is interpolated straight into AQL - acceptable
    # for trusted export data, not safe for untrusted input.
    query = f"""
        FOR doc in {self.collection_name}
            FILTER doc._path == '{path}'
            RETURN {{path: doc._path,
                     portal_type: doc._type,
                     id: doc._id,
                     title: doc.title,
                     _key: doc._key
                    }}
    """
    matches = list(self._query_aql(query))
    if not matches:
        raise ValueError(f'No object return for search by path "{path}"')
    if len(matches) > 1:
        raise ValueError(f'More than one object return for search by path "{path}"')
    return matches[0]
def _to_iso8601(self, s):
    """Convert a date string from the collective.jsonify export to ISO-8601.

    Returns None for missing values (None or the literal string "None") and
    - BUG FIX - also for strings dateparser cannot parse, which previously
    crashed with AttributeError on `None.isoformat()`.
    """
    if s in (None, "None"):
        return None
    dt = dateparser.parse(s)
    if dt is None:
        # unparsable date in the export - treat like a missing value
        return None
    return dt.isoformat()
@property
def _json_headers(self):
    """HTTP headers for JSON requests against plone.restapi."""
    return {
        "accept": "application/json",
        "content-type": "application/json",
    }
@property
def _auth(self):
    """HTTP basic-auth credentials for the target Plone site."""
    plone_cfg = self.config.plone
    return HTTPBasicAuth(plone_cfg.username, plone_cfg.password)
@timeit
def create_plone_site(self):
    """Wipe and recreate the target site via the @@recreate-plone-site view."""
    print("creating new plone site")
    url = "{0}/@@recreate-plone-site".format(self.config.plone.url)
    payload = {
        "site_id": self.config.site.id,
        "extension_ids": self.config.site.extension_ids,
    }
    response = requests.post(url, auth=self._auth, json=payload)
    # 201 Created is the only acceptable outcome
    if response.status_code != 201:
        raise MigrationError("Site could not be created", response=response)
@timeit
def remote_exists(self, path):
    """Check if the given `path` exists on the remote Plone site (HEAD request)."""
    url = f"{self.config.plone.url}/{self.config.site.id}/{path}"
    response = requests.head(url, auth=self._auth)
    if response.status_code == 200:
        return True
    if response.status_code == 404:
        return False
    # BUG FIX: MigrationError was raised without its required `response`
    # argument, turning every unexpected status into a TypeError.  Message
    # also corrected: the request is HEAD, not GET.
    raise MigrationError(
        f"HEAD {path} return error code {response.status_code} (expected 200 or 404)",
        response=response,
    )
@timeit
def _create_object(self, path, _key):
    """Create remote content for the given path and the _key data.

    Reads the exported document from ArangoDB, maps it onto a plone.restapi
    payload (with per-portal_type special handling), POSTs it into the
    parent container, and then applies workflow state, UID, local roles,
    timestamps, position, marker interfaces and permissions.
    """
    object_data = self._object_by_key(_key)
    if object_data["_type"] in IGNORED_TYPES:
        # BUG FIX: the original print was missing the f-prefix and used
        # attribute access on a dict
        print(f"IGNORED: {object_data['_type']}")
        return
    # common payload shared by all portal types
    data = {
        "@type": object_data["_type"],
        "id": object_data["_object_id"],
        "title": object_data.get("title", object_data["_object_id"]),
        "description": object_data.get("description", ""),
        "contributors": object_data.get("contributors", ()),
        "creators": object_data.get("creators", ()),
        "subject": object_data.get("subject", ()),
        "language": object_data.get("language", ""),
        "location": object_data.get("location", ""),
        "excludeFromNav": object_data.get("excludeFromNav", True),
    }
    related_items = object_data.get("relatedItems", ())
    if related_items:
        # collected here, applied globally in _update_all_related_items()
        self._all_related_items[object_data["_uid"]] = related_items
    effective = self._to_iso8601(object_data.get("effectiveDate"))
    if effective:
        data["effective"] = effective
    expires = self._to_iso8601(object_data.get("expirationDate"))
    if expires:
        # BUG FIX: the original assigned `effective` under the "expires" key
        data["expires"] = expires
    table_of_contents = object_data.get("tableContents")
    if table_of_contents:
        data["table_of_contents"] = table_of_contents
    portal_type = object_data["_type"]
    if portal_type == "Document":
        data["text"] = object_data["text"]
    elif portal_type == "LibraryDocument":
        # straight 1:1 field copies (may be None if absent in the export)
        for field in (
            "faculty_code",
            "id_socialmedia",
            "id_news",
            "id_highlights",
            "highlights_url",
            "id_dynamic_highlights",
            "dynamic_highlights_pagelength",
            "toptext",
            "bottomtext",
        ):
            data[field] = object_data.get(field)
    elif portal_type == "Vacancy":
        data["vacancy_type"] = object_data["vacancyType"]
        data["contract"] = object_data["contract"]
        data["occupancy_rate"] = object_data["occupancyRate"]
        data["grade"] = object_data["grade"]
        data["function_class"] = object_data["functionClass"]
        data["last_application_date"] = object_data["lastApplicationDate"]
        data["degree"] = object_data["diploma"]
        data["text"] = object_data["text"]
        # department is not migrated:
        # zExceptions.BadRequest: [{'message': 'Constraint not satisfied',
        # 'field': 'department', 'error': ConstraintNotSatisfied('CA10',
        # 'department')}]
        # data["department"] = object_data["department"]
    elif portal_type == "Event":
        data["startDate"] = object_data["startDate"]
        data["endDate"] = object_data["endDate"]
        data["eventUrl"] = object_data["eventUrl"]
        data["contactEmail"] = object_data["contactEmail"]
        data["contactName"] = object_data["contactName"]
        data["contactPhone"] = object_data["contactPhone"]
        # BUG FIX: the original copied contactPhone into attendees;
        # .get() because old events may lack the field - TODO confirm
        data["attendees"] = object_data.get("attendees", ())
        data["categories"] = object_data["categories"]
    elif portal_type == "News Item":
        data["text"] = object_data["text"]
    elif portal_type == "File":
        try:
            file_data = object_data["_datafield_file"]
        except KeyError:
            print(
                f"ERROR: JSON export has no _datafield_file for {path} - SKIPPING"
            )
            return
        data["file"] = {
            "data": file_data["data"],
            "encoding": "base64",
            "content-type": file_data["content_type"],
            "filename": file_data["filename"],
        }
    elif portal_type == "Image":
        file_data = object_data["_datafield_image"]
        data["image"] = {
            "data": file_data["data"],
            "encoding": "base64",
            "content-type": file_data["content_type"],
            "filename": file_data["filename"],
        }
    elif portal_type == "Link":
        data["remoteUrl"] = object_data["remoteUrl"]
    elif portal_type == "Folder":
        feedbackAddress = object_data.get("feedbackAddress")
        if feedbackAddress:
            data["feedbackAddress"] = feedbackAddress
    elif portal_type == "FormFolder":
        self._migrate_FormFolder(data, object_data)
    elif portal_type == "Topic":
        self._migrate_Topic(data, object_data)
    # ugent specific `show_description` flag
    show_description = object_data.get("show_description", False)
    if show_description:
        data["show_description"] = show_description
    # POST into the parent container (path minus the last component)
    resource_path = "/".join(path.split("/")[:-1])
    url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}"
    result = requests.post(
        url,
        auth=self._auth,
        headers=self._json_headers,
        data=json.dumps(data, cls=CustomJSONEncoder),
    )
    if result.status_code not in (200, 201):
        raise RuntimeError(result.text)
    # post-creation fixups
    self._set_review_state(path, object_data)
    self._set_related_items(path, object_data)
    self._set_local_roles(path, object_data)
    self._set_uid(path, object_data)
    self._set_created_modified(path, object_data)
    # apply folder restrictions after migration because otherwise we can not migrate properly
    # self._set_allowed_and_addable_types(path, object_data)
    self._set_position_in_parent(path, object_data)
    self._set_marker_interfaces(path, object_data)
    self._set_permissions(path, object_data)
@timeit
def _migrate_FormFolder(self, data, object_data):
    """Map an exported FormFolder onto an EasyForm creation payload."""
    data["@type"] = "EasyForm"
    data["formPrologue"] = object_data["formPrologue"]
    data["formEpilogue"] = object_data["formEpilogue"]
    # collect all child objects of the form (fields, adapters), in order
    form_path = object_data["_path"]
    query = f"""
        FOR doc in {self.collection_name}
            FILTER '{form_path}' in doc._paths_all
            SORT doc._gopip
            RETURN {{path: doc._path,
                     portal_type: doc._type,
                     id: doc._id,
                     title: doc.title,
                     _key: doc._key,
                     position_in_parent: doc._gopip
                    }}
    """
    child_keys = [row["_key"] for row in self._query_aql(query)]
    pfg_migrator = PFGMigrator(
        migration_data=data,      # data for plone.restapi (updated by reference)
        object_data=object_data,  # exported data of old FormFolder
        child_keys=child_keys,    # _key of all child objects
        migrator=self,            # main migrator
    )
    # updates `data` in place inside the migrator
    pfg_migrator.migrate()
@timeit
def _migrate_Topic(self, data, object_data):
    """Map an exported Topic onto a Collection creation payload."""
    data["@type"] = "Collection"
    # collect all child objects (criteria) of the topic, in order
    topic_path = object_data["_path"]
    query = f"""
        FOR doc in {self.collection_name}
            FILTER '{topic_path}' in doc._paths_all
            SORT doc._gopip
            RETURN {{path: doc._path,
                     portal_type: doc._type,
                     id: doc._id,
                     title: doc.title,
                     _key: doc._key,
                     position_in_parent: doc._gopip
                    }}
    """
    child_keys = [row["_key"] for row in self._query_aql(query)]
    topic_migrator = TopicMigrator(
        migration_data=data,      # data for plone.restapi (updated by reference)
        object_data=object_data,  # exported data of the old Topic
        child_keys=child_keys,    # _key of all child objects
        migrator=self,            # main migrator
    )
    # updates `data` in place inside the migrator
    topic_migrator.migrate()
@timeit
def _set_related_items(self, resource_path, object_data):
    """Intentionally a no-op.

    relatedItems are collected per object in _create_object() and applied
    in one pass by _update_all_related_items() after the whole migration,
    because referenced objects may not exist yet at creation time.
    BUG FIX: the original body after the early `return` was unreachable
    dead code that referenced an undefined `uid`; it has been removed.
    """
    return
@timeit
def _set_uid(self, resource_path, object_data):
    """Restore the original UID on the migrated object via @@setuid."""
    payload = json.dumps(dict(uid=object_data["_uid"]), cls=CustomJSONEncoder)
    url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@setuid"
    result = requests.post(
        url,
        auth=self._auth,
        headers=self._json_headers,
        data=payload,
    )
    if result.status_code != 204:
        raise RuntimeError(f"Error setting UID: {url}: {result.text}")
@timeit
def _set_position_in_parent(self, resource_path, object_data):
    """Set the position of the object within its parent (getObjPositionInParent)."""
    position = object_data["_gopip"]
    url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-position-in-parent"
    result = requests.post(
        url,
        auth=self._auth,
        headers=self._json_headers,
        data=json.dumps(dict(position=position), cls=CustomJSONEncoder),
    )
    if result.status_code != 204:
        # BUG FIX: duplicated word ("setting setting") in the original message
        raise RuntimeError(f"Error setting position: {url}: {result.text}")
@timeit
def _set_allowed_and_addable_types(self, resource_path, object_data):
    """Apply folder restrictions (locally allowed / immediately addable types)."""
    constrain_types_mode = object_data.get('constrainTypesMode', -1)  # -1 = ACQUIRE
    addable_types = object_data.get('immediatelyAddableTypes', ())
    allowed_types = object_data.get('locallyAllowedTypes', ())
    # acquiring from the parent, or nothing to restrict -> done
    if constrain_types_mode == -1 or not (allowed_types or addable_types):
        return
    payload = json.dumps(
        dict(
            allowed_types=allowed_types,
            addable_types=addable_types,
            constrain_types_mode=constrain_types_mode,
        )
    )
    url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-allowed-and-addable-types"
    result = requests.post(
        url,
        auth=self._auth,
        headers=self._json_headers,
        data=payload,
    )
    if result.status_code != 204:
        raise RuntimeError(f"Error setting allowed/addable types: {url}: {result.text}")
@timeit
def _update_all_related_items(self):
    """Update all relatedItems after the migration.

    `self._all_related_items` maps uid -> list of referenced uids.
    """
    payload = json.dumps(self._all_related_items)
    url = f"{self.config.plone.url}/{self.config.site.id}/@@update-all-related-items"
    result = requests.post(
        url,
        auth=self._auth,
        headers=self._json_headers,
        data=payload,
    )
    if result.status_code != 204:
        raise RuntimeError(f"Error updating related items: {url}: {result.text}")
@timeit
def _set_created_modified(self, resource_path, object_data):
    """Transfer the original creation + modification timestamps."""
    created = object_data.get("creation_date")
    modified = object_data.get("modification_date")
    # nothing to transfer -> done
    if not (modified or created):
        return
    payload = json.dumps(
        dict(created=created, modified=modified), cls=CustomJSONEncoder
    )
    url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-created-modified"
    result = requests.post(
        url,
        auth=self._auth,
        headers=self._json_headers,
        data=payload,
    )
    if result.status_code != 204:
        raise RuntimeError(f"Error setting created+modified: {url}: {result.text}")
@timeit
def _set_permissions(self, resource_path, object_data):
    """Transfer explicit permission settings, minus IGNORED_PERMISSIONS.

    BUG FIX: the original default for a missing `_permissions` key was a
    tuple, which crashed on `.items()`; the docstring and the error
    message also wrongly said "marker interfaces" (copy-paste).
    """
    permissions = object_data.get("_permissions", {})
    permissions = {
        k: v for k, v in permissions.items() if k not in IGNORED_PERMISSIONS
    }
    if not permissions:
        return
    url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-permissions"
    result = requests.post(
        url,
        auth=self._auth,
        headers=self._json_headers,
        data=json.dumps(dict(permissions=permissions), cls=CustomJSONEncoder),
    )
    if result.status_code != 204:
        raise RuntimeError(f"Error setting permissions: {url}: {result.text}")
@timeit
def _set_marker_interfaces(self, resource_path, object_data):
    """Re-apply supported marker interfaces (_directly_provided) on the object."""
    provided = [
        iface
        for iface in object_data.get("_directly_provided", ())
        if iface in SUPPORTED_MARKER_INTERFACES
    ]
    # nothing supported -> done
    if not provided:
        return
    url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-marker-interfaces"
    result = requests.post(
        url,
        auth=self._auth,
        headers=self._json_headers,
        data=json.dumps(dict(interfaces=provided), cls=CustomJSONEncoder),
    )
    if result.status_code != 204:
        raise RuntimeError(f"Error setting marker interfaces: {url}: {result.text}")
@timeit
def _set_local_roles(self, resource_path, object_data):
    """Transfer local roles via the plone.restapi @sharing endpoint.

    https://plonerestapi.readthedocs.io/en/latest/sharing.html
    """
    local_roles = object_data["_ac_local_roles"]
    if not local_roles:
        return
    # one sharing entry per user: {id, type, roles: {role: True, ...}}
    entries = [
        dict(
            id=username,
            type="user",
            roles={role: True for role in roles},
        )
        for username, roles in local_roles.items()
    ]
    if entries:
        url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@sharing"
        result = requests.post(
            url,
            auth=self._auth,
            headers=self._json_headers,
            data=json.dumps(dict(entries=entries), cls=CustomJSONEncoder),
        )
        if result.status_code != 204:
            raise RuntimeError(f"Error setting local roles: {url}: {result.text}")
def _set_review_state(self, resource_path, object_data):
    """Apply workflow transitions so the object ends up in its old review state."""
    review_state = object_data['review_state']
    # images are a published by default (see ugent_image_workflow)
    if object_data['_type'] == 'Image':
        return
    if review_state == "private":  # nothing to do
        return
    # map review state to transition name(s, '|'-separated) for moving an
    # object from the default "visible" state into the target state
    # (visible is the default state of the ugent* workflows)
    state2action = {
        'visible': '',  # default state -> nothing to do
        'published': 'publish',
        'internal': 'makeinternal',
    }
    # a type-specific mapping entry wins over the generic one
    type_specific = f"{review_state}_{object_data['_type']}"
    if type_specific in state2action:
        actions = state2action[type_specific]
    elif review_state in state2action:
        actions = state2action[review_state]
    else:
        print(f'Unsupported review_state "{review_state}"')
        return
    if not actions:
        return
    for action in actions.split('|'):
        url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@workflow/{action}"
        result = requests.post(url, auth=self._auth, headers=self._json_headers)
        if result.status_code != 200:
            raise RuntimeError(f"Error setting review state: {url}: {result.text}")
def migrate_folder(self, folder_name):
    """Migrate one exported folder tree into the target Plone site.

    Phase 1 recreates the Folder hierarchy (parents before children, by
    path depth), phase 2 creates the non-folderish content, phase 3 applies
    the folder restrictions, and finally all relatedItems are wired up.
    """
    print(f"migrating folder {folder_name}")
    query = f"""
        FOR doc in {self.collection_name}
            FILTER '{folder_name}' in doc._paths_all
            LIMIT 99999999999
            RETURN {{path: doc._path,
                     portal_type: doc._type,
                     id: doc._id,
                     title: doc.title,
                     _key: doc._key
                    }}
    """
    print(query)
    result = self._query_aql(query)
    # groupby() needs the input sorted by the grouping key
    result = sorted(result, key=lambda x: x["portal_type"])
    # phase 1: recreate the folder structure
    # (BUG FIX: the original built the identical groupby twice in a row and
    # discarded the first one)
    result_by_portal_type = itertools.groupby(result, lambda x: x["portal_type"])
    all_folder_keys = list()  # remember folder keys for the final fixup
    for portal_type, items in result_by_portal_type:
        if portal_type not in FOLDERISH_PT:
            continue
        print(f"Processing portal_type: {portal_type}")
        # sort folder paths by depth so parents are created before children
        items = sorted(items, key=lambda x: x["path"].count("/"))
        num_items = len(items)
        for i, item in enumerate(items):
            print(f"{i+1}/{num_items} Folder {item['path']}")
            path_components = [
                pc
                for pc in item["path"].split("/")
                if pc and pc != self.config.site.id
            ]
            # construct all parent paths and check if they exist on the
            # target site; recreate missing parents on the fly
            # (BUG FIX: inner index renamed so it no longer shadows `i`)
            for depth in range(len(path_components)):
                parent_path = "/".join(path_components[:depth])
                if not parent_path:
                    continue
                # check if parent_path exists on the remote portal
                if parent_path in PARENT_EXISTS_CACHE:
                    parent_path_exists = True
                else:
                    parent_path_exists = self.remote_exists(parent_path)
                    # NOTE(review): cached as existing even when it does not
                    # exist yet - only correct because it is created below
                    PARENT_EXISTS_CACHE[parent_path] = True
                if parent_path_exists:
                    continue
                # create the missing folderish parent
                parent_path_full = f"/{self.config.site.id}/{parent_path}"
                parent_data = self._object_by_path(parent_path_full)
                self._create_object(parent_path, parent_data["_key"])
            # now reconstruct the item itself, its parent chain exists
            item_path = "/".join(
                item["path"].split("/")[2:]
            )  # omit empty "" and portal_id
            self._create_object(item_path, item["_key"])
            all_folder_keys.append(item["_key"])
    # phase 2: non-folderish content
    # (re-groupby because groupby() returns a one-shot iterator)
    result_by_portal_type = itertools.groupby(result, lambda x: x["portal_type"])
    for portal_type, items in result_by_portal_type:
        # folderish hierarchy is already created -> skip processing
        if portal_type in FOLDERISH_PT:
            continue
        if (
            self.config.migration.content_types
            and portal_type not in self.config.migration.content_types
        ):
            print(
                f'Skipping processing of "{portal_type}" due to `content_types` configuration'
            )
            continue
        items = list(items)
        num_items = len(items)
        for i, item in enumerate(items):
            if portal_type not in PROCESSED_TYPES:
                continue
            item_path = "/".join(
                item["path"].split("/")[2:]
            )  # omit empty "" and portal_id
            print(f"{i+1}/{num_items} {portal_type} {item_path}")
            try:
                self._create_object(item_path, item["_key"])
            except Exception as e:
                traceback.print_exc()
                print(f'ERROR: migration error for {item["path"]}: {e}')
    # phase 3: apply folder restrictions only now, because they would have
    # blocked creating content inside the folders during migration
    for key in all_folder_keys:
        object_data = self._object_by_key(key)
        self._set_allowed_and_addable_types(object_data['_relative_path'], object_data)
    # migrate all relatedItems after all content has been created
    self._update_all_related_items()
def main():
    """CLI entry point: read the YAML config, pre-check the folders, migrate."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--config", help="configuration file for migration (YAML format)", required=True
    )
    parser.add_argument(
        "--incremental",
        action="store_true",
        help="Incremental import without swipping existing site",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Verbose mode (timing)",
    )
    args = parser.parse_args()
    if args.verbose:
        # enable @timeit output globally
        global VERBOSE
        VERBOSE = True
    yaml_fn = os.path.abspath(args.config)
    print(f"Reading {yaml_fn}")
    if not os.path.exists(yaml_fn):
        raise IOError(f"Migration configuration {yaml_fn} not found")
    with open(yaml_fn) as fp:
        config = attrdict.AttrDict(yaml.load(fp, Loader=yaml.FullLoader))
    pprint.pprint(config)
    migrator = Migrator(config)
    # pre-check every configured folder before touching the target site
    for name in config.migration.folders:
        if name.endswith("/"):
            raise ValueError(
                f"migration folder path {name} must not end with a trailing slash - please remove it"
            )
        if not migrator._object_by_path(name):
            raise ValueError(f"No source information found for {name}")
        print(f"Precheck OK for {name}")
    if args.incremental:
        print("Skipping site creation (incremental mode)")
    else:
        migrator.create_plone_site()
    print(f"Content type filter: {config.migration.content_types}")
    # run real migration
    for name in config.migration.folders:
        migrator.migrate_folder(name)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment