# -*- coding: utf-8 -*-

# This script imports a JSON export made with `collective.jsonify` into
# ArangoDB (database: ugent, collection: portal by default)
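
# A minimal sketch of the YAML configuration this script expects, inferred
# from the attribute lookups below (config.arango.*, config.plone.*,
# config.site.*, config.migration.*). All concrete values are illustrative
# assumptions, not taken from this gist:
#
#   arango:
#     url: http://localhost:8529
#     database: ugent
#     collection: portal
#     username: root
#     password: secret
#   plone:
#     url: http://localhost:8080
#     username: admin
#     password: secret
#   site:
#     id: ugent
#     extension_ids: []
#   migration:
#     folders:
#       - /ugent/some/folder      # source paths, no trailing slash
#     content_types: []           # empty = migrate all PROCESSED_TYPES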

import os
import json
import time
import pprint
import traceback
import datetime
import itertools
import argparse

import dateparser
import tqdm
import yaml
import furl
import attrdict
import requests
from requests.auth import HTTPBasicAuth
from arango import ArangoClient

from .pfg import PFGMigrator
from .topic import TopicMigrator

# folderish portal types for which we reconstruct the initial hierarchy in
# phase 1
FOLDERISH_PT = ["Folder"]

# portal types that are not processed or just ignored because they are
# subobjects e.g. of FormFolder or Collection
IGNORED_TYPES = [
    "FormMailerAdapter",
    "FormRichLabelField",
    "FormSaveDataAdapter",
    "FormStringField",
    "FormTextField",
    "FormThanksPage",
]

# list of ignored or obsolete permissions
IGNORED_PERMISSIONS = [
    'Change portal events'
]

# portal types for which we actually have a migration as primary objects
PROCESSED_TYPES = [
    "Document",
    "News Item",
    "Link",
    "File",
    "Image",
    "FormFolder",
    "Topic",
    "Event",
    "LibraryDocument",
    "Vacancy",
]

# marker interfaces directly provided by content objects (_directly_provided JSON key)
# and supported by the migration
SUPPORTED_MARKER_INTERFACES = [
    'plone.app.layout.navigation.interfaces.INavigationRoot',
]

PARENT_EXISTS_CACHE = dict()
VERBOSE = False


class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()
        return json.JSONEncoder.default(self, obj)


class MigrationError(Exception):
    def __init__(self, message, response):
        self.message = message
        self.response = response

    def __str__(self):
        return f"{self.__class__.__name__} {self.response}"


def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        if "log_time" in kw:
            name = kw.get("log_name", method.__name__.upper())
            kw["log_time"][name] = int((te - ts) * 1000)
        else:
            if VERBOSE:
                print("%r %2.2f ms" % (method.__name__, (te - ts) * 1000))
        return result
    return timed


class Migrator:
    """ Migration wrapper """

    def __init__(self, config):
        self.config = config
        # ArangoDB connection
        arango = self.config.arango
        f = furl.furl(arango.url)
        self.client = ArangoClient(protocol=f.scheme, host=f.host, port=f.port)
        self.db = self.client.db(
            arango.database, username=arango.username, password=arango.password
        )
        if not self.db.has_collection(arango.collection):
            raise RuntimeError(f'collection "{arango.collection}" does not exist')
        self.collection_name = arango.collection
        self.collection = self.db[arango.collection]
        # maps source UID -> list of related item UIDs of migrated content
        self._all_related_items = dict()

    @timeit
    def _query_aql(self, query):
        result = self.db.aql.execute(query)
        return result

    @timeit
    def _object_by_key(self, key):
        result = self.collection.get(dict(_key=key))
        return result

    @timeit
    def _object_by_path(self, path):
        query = f"""
            FOR doc in {self.collection_name}
                FILTER doc._path == '{path}'
                RETURN {{path: doc._path,
                        portal_type: doc._type,
                        id: doc._id,
                        title: doc.title,
                        _key: doc._key
                }}
        """
        result = self._query_aql(query)
        result = [r for r in result]
        if len(result) == 1:
            return result[0]
        elif len(result) > 1:
            raise ValueError(f'More than one object returned for search by path "{path}"')
        else:
            raise ValueError(f'No object returned for search by path "{path}"')

    def _to_iso8601(self, s):
        """ Convert a date string from collective.jsonify export format to ISO8601 """
        if s in (None, "None"):
            return None
        dt = dateparser.parse(s)
        return dt.isoformat()

    @property
    def _json_headers(self):
        """ Standard JSON headers """
        return {"accept": "application/json", "content-type": "application/json"}

    @property
    def _auth(self):
        """ Credentials for Plone target site """
        return HTTPBasicAuth(self.config.plone.username, self.config.plone.password)

    @timeit
    def create_plone_site(self):
        print("creating new plone site")
        url = "{0}/@@recreate-plone-site".format(self.config.plone.url)
        data = {
            "site_id": self.config.site.id,
            "extension_ids": self.config.site.extension_ids,
        }
        response = requests.post(url, auth=self._auth, json=data)
        if response.status_code != 201:
            raise MigrationError("Site could not be created", response=response)

    @timeit
    def remote_exists(self, path):
        """ Check if the given `path` exists on the remote Plone site """
        url = f"{self.config.plone.url}/{self.config.site.id}/{path}"
        response = requests.head(url, auth=self._auth)
        if response.status_code == 200:
            return True
        elif response.status_code == 404:
            return False
        raise MigrationError(
            f"HEAD {path} returned error code {response.status_code} (expected 200 or 404)",
            response=response,
        )

    @timeit
    def _create_object(self, path, _key):
        """ Create remote content for the given path and the _key data """
        object_data = self._object_by_key(_key)
        if object_data["_type"] in IGNORED_TYPES:
            print(f"IGNORED: {object_data['_type']}")
            return
        data = {
            "@type": object_data["_type"],
            "id": object_data["_object_id"],
            "title": object_data.get("title", object_data["_object_id"]),
            "description": object_data.get("description", ""),
            "contributors": object_data.get("contributors", ()),
            "creators": object_data.get("creators", ()),
            "subject": object_data.get("subject", ()),
            "language": object_data.get("language", ""),
            "location": object_data.get("location", ""),
            "excludeFromNav": object_data.get("excludeFromNav", True),
        }
        related_items = object_data.get('relatedItems', ())
        if related_items:
            self._all_related_items[object_data['_uid']] = related_items
        effective = self._to_iso8601(object_data.get("effectiveDate"))
        if effective:
            data["effective"] = effective
        expires = self._to_iso8601(object_data.get("expirationDate"))
        if expires:
            data["expires"] = expires
        table_of_contents = object_data.get("tableContents")
        if table_of_contents:
            data["table_of_contents"] = table_of_contents
if object_data["_type"] == "Document": | |
data["text"] = object_data["text"] | |
elif object_data["_type"] == "LibraryDocument": | |
data["faculty_code"] = object_data.get("faculty_code") | |
data["id_socialmedia"] = object_data.get("id_socialmedia") | |
data["id_news"] = object_data.get("id_news") | |
data["id_highlights"] = object_data.get("id_highlights") | |
data["highlights_url"] = object_data.get("highlights_url") | |
data["id_dynamic_highlights"] = object_data.get("id_dynamic_highlights") | |
data["dynamic_highlights_pagelength"] = object_data.get("dynamic_highlights_pagelength") | |
data["toptext"] = object_data.get("toptext") | |
data["bottomtext"] = object_data.get("bottomtext") | |
elif object_data["_type"] == "Vacancy": | |
data["vacancy_type"] = object_data["vacancyType"] | |
data["contract"] = object_data["contract"] | |
data["occupancy_rate"] = object_data["occupancyRate"] | |
data["grade"] = object_data["grade"] | |
data["function_class"] = object_data["functionClass"] | |
data["last_application_date"] = object_data["lastApplicationDate"] | |
data["degree"] = object_data["diploma"] | |
data["text"] = object_data["text"] | |
# zExceptions.BadRequest: [{'message': 'Constraint not satisfied', | |
# 'field': 'department', 'error': ConstraintNotSatisfied('CA10', | |
# 'department')}] | |
# data["department"] = object_data["department"] | |
elif object_data["_type"] == "Event": | |
data["startDate"] = object_data["startDate"] | |
data["endDate"] = object_data["endDate"] | |
data["eventUrl"] = object_data["eventUrl"] | |
data["contactEmail"] = object_data["contactEmail"] | |
data["contactName"] = object_data["contactName"] | |
data["contactPhone"] = object_data["contactPhone"] | |
data["attendees"] = object_data["contactPhone"] | |
data["categories"] = object_data["categories"] | |
elif object_data["_type"] == "News Item": | |
data["text"] = object_data["text"] | |
elif object_data["_type"] == "File": | |
try: | |
file_data = object_data["_datafield_file"] | |
except KeyError: | |
print( | |
f"ERROR: JSON export has no _datafield_file for {path} - SKIPPING" | |
) | |
return | |
data["file"] = { | |
"data": file_data["data"], | |
"encoding": "base64", | |
"content-type": file_data["content_type"], | |
"filename": file_data["filename"], | |
} | |
elif object_data["_type"] == "Image": | |
file_data = object_data["_datafield_image"] | |
data["image"] = { | |
"data": file_data["data"], | |
"encoding": "base64", | |
"content-type": file_data["content_type"], | |
"filename": file_data["filename"], | |
} | |
elif object_data["_type"] == "Link": | |
data["remoteUrl"] = object_data["remoteUrl"] | |
elif object_data["_type"] == "Folder": | |
feedbackAddress = object_data.get("feedbackAddress") | |
if feedbackAddress: | |
data["feedbackAddress"] = feedbackAddress | |
elif object_data["_type"] == "FormFolder": | |
self._migrate_FormFolder(data, object_data) | |
elif object_data["_type"] == "Topic": | |
self._migrate_Topic(data, object_data) | |
# ugent specific `show_description` flag | |
show_description = object_data.get("show_description", False) | |
if show_description: | |
data["show_description"] = show_description | |
resource_path = "/".join(path.split("/")[:-1]) | |
# print('Creating', resource_path, data) | |
url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}" | |
result = requests.post( | |
url, | |
auth=self._auth, | |
headers=self._json_headers, | |
data=json.dumps(data, cls=CustomJSONEncoder), | |
) | |
if result.status_code not in (200, 201): | |
raise RuntimeError(result.text) | |
self._set_review_state(path, object_data) | |
self._set_related_items(path, object_data) | |
self._set_local_roles(path, object_data) | |
self._set_uid(path, object_data) | |
self._set_created_modified(path, object_data) | |
# apply folder restrictions after migration because otherwise we can not migrate properly | |
# self._set_allowed_and_addable_types(path, object_data) | |
self._set_position_in_parent(path, object_data) | |
self._set_marker_interfaces(path, object_data) | |
self._set_permissions(path, object_data) | |

    @timeit
    def _migrate_FormFolder(self, data, object_data):
        data["@type"] = "EasyForm"
        data["formPrologue"] = object_data["formPrologue"]
        data["formEpilogue"] = object_data["formEpilogue"]
        # query for subobjects by path
        form_path = object_data["_path"]
        query = f"""
            FOR doc in {self.collection_name}
                FILTER '{form_path}' in doc._paths_all
                SORT doc._gopip
                RETURN {{path: doc._path,
                        portal_type: doc._type,
                        id: doc._id,
                        title: doc.title,
                        _key: doc._key,
                        position_in_parent: doc._gopip
                }}
        """
        result = self._query_aql(query)
        _keys = [r["_key"] for r in result]
        pfg_migrator = PFGMigrator(
            migration_data=data,  # data for plone.restapi (by reference)
            object_data=object_data,  # exported data of old FormFolder
            child_keys=_keys,  # _key of all child objects
            migrator=self,  # main migrator
        )
        pfg_migrator.migrate()  # updates `data` by reference inside the migrator

    @timeit
    def _migrate_Topic(self, data, object_data):
        data["@type"] = "Collection"
        # query for subobjects by path
        form_path = object_data["_path"]
        query = f"""
            FOR doc in {self.collection_name}
                FILTER '{form_path}' in doc._paths_all
                SORT doc._gopip
                RETURN {{path: doc._path,
                        portal_type: doc._type,
                        id: doc._id,
                        title: doc.title,
                        _key: doc._key,
                        position_in_parent: doc._gopip
                }}
        """
        result = self._query_aql(query)
        _keys = [r["_key"] for r in result]
        topic_migrator = TopicMigrator(
            migration_data=data,  # data for plone.restapi (by reference)
            object_data=object_data,  # exported data of old Topic
            child_keys=_keys,  # _key of all child objects
            migrator=self,  # main migrator
        )
        topic_migrator.migrate()  # updates `data` by reference inside the migrator

    @timeit
    def _set_related_items(self, resource_path, object_data):
        """ Intentionally a no-op: related items are collected per object in
            `_create_object()` and applied in a single pass at the end of the
            migration via `_update_all_related_items()`.
        """
        return

    @timeit
    def _set_uid(self, resource_path, object_data):
        """ Set UID to original UID """
        uid = object_data["_uid"]
        url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@setuid"
        result = requests.post(
            url,
            auth=self._auth,
            headers=self._json_headers,
            data=json.dumps(dict(uid=uid), cls=CustomJSONEncoder),
        )
        if result.status_code != 204:
            raise RuntimeError(f"Error setting UID: {url}: {result.text}")

    @timeit
    def _set_position_in_parent(self, resource_path, object_data):
        """ Set position of object in parent """
        position = object_data["_gopip"]
        url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-position-in-parent"
        result = requests.post(
            url,
            auth=self._auth,
            headers=self._json_headers,
            data=json.dumps(dict(position=position), cls=CustomJSONEncoder),
        )
        if result.status_code != 204:
            raise RuntimeError(f"Error setting position: {url}: {result.text}")

    @timeit
    def _set_allowed_and_addable_types(self, resource_path, object_data):
        """ Folder restrictions and addable types """
        constrain_types_mode = object_data.get('constrainTypesMode', -1)  # -1 = ACQUIRE
        addable_types = object_data.get('immediatelyAddableTypes', ())
        allowed_types = object_data.get('locallyAllowedTypes', ())
        if constrain_types_mode != -1 and (allowed_types or addable_types):
            url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-allowed-and-addable-types"
            result = requests.post(
                url,
                auth=self._auth,
                headers=self._json_headers,
                data=json.dumps(dict(
                    allowed_types=allowed_types,
                    addable_types=addable_types,
                    constrain_types_mode=constrain_types_mode,
                )),
            )
            if result.status_code != 204:
                raise RuntimeError(f"Error setting allowed/addable types: {url}: {result.text}")

    @timeit
    def _update_all_related_items(self):
        """ Update all relatedItems after the migration.
            `self._all_related_items` maps uid -> list of referenced uids.
        """
        url = f"{self.config.plone.url}/{self.config.site.id}/@@update-all-related-items"
        result = requests.post(
            url,
            auth=self._auth,
            headers=self._json_headers,
            data=json.dumps(self._all_related_items),
        )
        if result.status_code != 204:
            raise RuntimeError(f"Error updating related items: {url}: {result.text}")

    @timeit
    def _set_created_modified(self, resource_path, object_data):
        """ Set created + modified timestamps """
        created = object_data.get("creation_date")
        modified = object_data.get("modification_date")
        if modified or created:
            url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-created-modified"
            result = requests.post(
                url,
                auth=self._auth,
                headers=self._json_headers,
                data=json.dumps(
                    dict(created=created, modified=modified), cls=CustomJSONEncoder
                ),
            )
            if result.status_code != 204:
                raise RuntimeError(f"Error setting created+modified: {url}: {result.text}")

    @timeit
    def _set_permissions(self, resource_path, object_data):
        """ Set permissions """
        permissions = object_data.get("_permissions", {})
        permissions = dict([(k, v) for k, v in permissions.items() if k not in IGNORED_PERMISSIONS])
        if permissions:
            url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-permissions"
            result = requests.post(
                url,
                auth=self._auth,
                headers=self._json_headers,
                data=json.dumps(dict(permissions=permissions), cls=CustomJSONEncoder),
            )
            if result.status_code != 204:
                raise RuntimeError(f"Error setting permissions: {url}: {result.text}")

    @timeit
    def _set_marker_interfaces(self, resource_path, object_data):
        """ Set marker interfaces """
        directly_provided = object_data.get("_directly_provided", ())
        directly_provided = [
            iface
            for iface in directly_provided
            if iface in SUPPORTED_MARKER_INTERFACES
        ]
        if directly_provided:
            url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@@set-marker-interfaces"
            result = requests.post(
                url,
                auth=self._auth,
                headers=self._json_headers,
                data=json.dumps(dict(interfaces=directly_provided), cls=CustomJSONEncoder),
            )
            if result.status_code != 204:
                raise RuntimeError(f"Error setting marker interfaces: {url}: {result.text}")

    @timeit
    def _set_local_roles(self, resource_path, object_data):
        """ Set local roles
            https://plonerestapi.readthedocs.io/en/latest/sharing.html
        """
        local_roles = object_data["_ac_local_roles"]
        if not local_roles:
            return
        entries = list()
        for username, roles in local_roles.items():
            entries.append(
                dict(
                    id=username,
                    type="user",
                    roles=dict([(role, True) for role in roles]),
                )
            )
        if entries:
            url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@sharing"
            result = requests.post(
                url,
                auth=self._auth,
                headers=self._json_headers,
                data=json.dumps(dict(entries=entries), cls=CustomJSONEncoder),
            )
            if result.status_code != 204:
                raise RuntimeError(f"Error setting local roles: {url}: {result.text}")

    def _set_review_state(self, resource_path, object_data):
        """ Set review state based on workflow history """
        review_state = object_data['review_state']
        # images are published by default (see ugent_image_workflow)
        if object_data['_type'] == 'Image':
            return
        if review_state in ["private"]:  # nothing to do
            return
        # map review state to transition name(s) for moving an object from the
        # default "visible" state into the target state (visible is the default
        # state of the ugent* workflows)
        state2action = {
            # default state -> nothing to do
            'visible': '',
            'published': 'publish',
            'internal': 'makeinternal',
        }
        actions = None
        if f"{review_state}_{object_data['_type']}" in state2action:
            actions = state2action[f"{review_state}_{object_data['_type']}"]
        elif review_state in state2action:
            actions = state2action[review_state]
        else:
            print(f'Unsupported review_state "{review_state}"')
            return
        if actions:
            for action in actions.split('|'):
                url = f"{self.config.plone.url}/{self.config.site.id}/{resource_path}/@workflow/{action}"
                result = requests.post(url, auth=self._auth, headers=self._json_headers)
                if result.status_code != 200:
                    raise RuntimeError(f"Error setting review state: {url}: {result.text}")

    def migrate_folder(self, folder_name):
        print(f"migrating folder {folder_name}")
        query = f"""
            FOR doc in {self.collection_name}
                FILTER '{folder_name}' in doc._paths_all
                LIMIT 99999999999
                RETURN {{path: doc._path,
                        portal_type: doc._type,
                        id: doc._id,
                        title: doc.title,
                        _key: doc._key
                }}
        """
        print(query)
        result = self._query_aql(query)
        result = sorted(result, key=lambda x: x["portal_type"])
        # group exportable items by portal_type and recreate the folder
        # structure first
        result_by_portal_type = itertools.groupby(result, lambda x: x["portal_type"])
        all_folder_keys = list()  # remember folder keys for later
        for portal_type, items in result_by_portal_type:
            if portal_type not in FOLDERISH_PT:
                continue
            print(f"Processing portal_type: {portal_type}")
            # sort folder paths by depth
            items = sorted(items, key=lambda x: x["path"].count("/"))
            num_items = len(items)
            for i, item in enumerate(items):
                print(f"{i+1}/{num_items} Folder {item['path']}")
                path_components = item["path"].split("/")
                path_components = [
                    pc for pc in path_components if pc and pc != self.config.site.id
                ]
                # construct all parent paths and check if they exist on the target site
                # and if necessary recreate the related content
                for depth in range(len(path_components)):
                    parent_path = "/".join(path_components[:depth])
                    if not parent_path:
                        continue
                    # check if parent_path exists on remote_portal
                    if parent_path in PARENT_EXISTS_CACHE:
                        parent_path_exists = True
                    else:
                        parent_path_exists = self.remote_exists(parent_path)
                        PARENT_EXISTS_CACHE[parent_path] = True
                    # parent path exists -> nothing to do
                    if parent_path_exists:
                        continue
                    # create new folderish object
                    parent_path_full = f"/{self.config.site.id}/{parent_path}"
                    parent_data = self._object_by_path(parent_path_full)
                    self._create_object(parent_path, parent_data["_key"])
                # now reconstruct item content after we are sure that the parent
                # structure exists
                item_path = "/".join(
                    item["path"].split("/")[2:]
                )  # omit empty "" and portal_id
                self._create_object(item_path, item["_key"])
                all_folder_keys.append(item["_key"])
        # now content
        # we need to re-groupby because groupby() returns an iterator
        result_by_portal_type = itertools.groupby(result, lambda x: x["portal_type"])
        for portal_type, items in result_by_portal_type:
            # folderish hierarchy is already created -> skip processing
            if portal_type in FOLDERISH_PT:
                continue
            if (
                self.config.migration.content_types
                and portal_type not in self.config.migration.content_types
            ):
                print(
                    f'Skipping processing of "{portal_type}" due to `content_types` configuration'
                )
                continue
            items = [x for x in items]
            num_items = len(items)
            for i, item in enumerate(items):
                if portal_type in PROCESSED_TYPES:
                    item_path = "/".join(
                        item["path"].split("/")[2:]
                    )  # omit empty "" and portal_id
                    print(f"{i+1}/{num_items} {portal_type} {item_path}")
                    try:
                        self._create_object(item_path, item["_key"])
                    except Exception as e:
                        traceback.print_exc()
                        print(f'ERROR: migration error for {item["path"]}: {e}')
        # final fixup for folders
        for key in all_folder_keys:
            object_data = self._object_by_key(key)
            # add folder restrictions to folderish objects
            self._set_allowed_and_addable_types(object_data['_relative_path'], object_data)
        # migrate all relatedItems after all content has been created
        self._update_all_related_items()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--config", help="configuration file for migration (YAML format)", required=True
    )
    parser.add_argument(
        "--incremental",
        action="store_true",
        help="Incremental import without wiping the existing site",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Verbose mode (timing)",
    )
    args = parser.parse_args()
    if args.verbose:
        global VERBOSE
        VERBOSE = True
    yaml_fn = os.path.abspath(args.config)
    print(f"Reading {yaml_fn}")
    if not os.path.exists(yaml_fn):
        raise IOError(f"Migration configuration {yaml_fn} not found")
    with open(yaml_fn) as fp:
        config = attrdict.AttrDict(yaml.load(fp, Loader=yaml.FullLoader))
    pprint.pprint(config)
    migrator = Migrator(config)
    # pre-check
    for name in config.migration.folders:
        if name.endswith("/"):
            raise ValueError(
                f"migration folder path {name} must not end with a trailing slash - please remove it"
            )
        data = migrator._object_by_path(name)
        if not data:
            raise ValueError(f"No source information found for {name}")
        print(f"Precheck OK for {name}")
    if not args.incremental:
        migrator.create_plone_site()
    else:
        print("Skipping site creation (incremental mode)")
    print(f"Content type filter: {config.migration.content_types}")
    # run real migration
    for name in config.migration.folders:
        migrator.migrate_folder(name)


if __name__ == "__main__":
    main()
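
# Example invocation (illustrative; because of the relative imports above the
# script is assumed to live in a package, and the module and config file names
# here are assumptions, not taken from this gist):
#
#   python -m migration.migrate --config migration.yaml                   # full run, recreates the Plone site
#   python -m migration.migrate --config migration.yaml --incremental -v  # keep the existing site, verbose timing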