Skip to content

Instantly share code, notes, and snippets.

@frisi
Created April 5, 2017 07:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frisi/56b64fa6630a6ec6a69fbff9e571ac02 to your computer and use it in GitHub Desktop.
Save frisi/56b64fa6630a6ec6a69fbff9e571ac02 to your computer and use it in GitHub Desktop.
Create a copy of a LinguaPlone translation folder, then fix the internal links and embedded images in the copy (see https://community.plone.org/t/translating-website-for-different-regions/3792).
# -*- coding: utf-8 -*-
from OFS.CopySupport import CopyError
from Products.Archetypes.Field import ReferenceField
from Products.Archetypes.Field import TextField
from Products.Archetypes.exceptions import ReferenceException
from Products.CMFCore.WorkflowCore import WorkflowException
from Products.statusmessages.interfaces import IStatusMessage
from plone import api
from z3c.form import field, button
from z3c.form.form import Form
from customer.policy import _
from zope import schema
from zope.annotation.interfaces import IAnnotations
from zope.interface import Interface
import logging
import re
import transaction
# Module-level logger; every message emitted by this module goes to the
# 'customer.translate' logging channel.
logger = logging.getLogger('customer.translate')
class ICopyContentToLanguage(Interface):
    """Form schema: the languages the content should be copied into."""

    target_languages = schema.List(
        title=_(u'Target languages'),
        description=_(
            u'Select into which languages the translation will be made'
        ),
        # Allowed entries come from the
        # 'plone.app.vocabularies.SupportedContentLanguages' vocabulary.
        value_type=schema.Choice(
            title=_(u'Target languages'),
            vocabulary='plone.app.vocabularies.SupportedContentLanguages',
        ),
        default=[],
    )
class CopyContentToLanguage(Form):
    """Copy everything below the context into the chosen languages.

    For every selected language the form

    * creates a translation of each item found below the context and
      thereby registers the translation properly (translation references),
    * copies field values, properties, ``pSEO_*`` annotations and the
      workflow state from the original to the translation, and
    * afterwards rewrites reference fields and ``resolveuid/<uid>`` links
      (images/links within text) so they point to the translated content.

    Based on code erral kindly supplied in this thread:
    https://community.plone.org/t/translating-website-for-different-regions/3792/
    """

    fields = field.Fields(ICopyContentToLanguage)
    label = _(u'Copy the contents of this objects and its subobjects '
              u'to the selected language/country')
    ignoreContext = True

    @staticmethod
    def _path(obj):
        """Return the physical path of ``obj`` as a '/'-joined string."""
        return '/'.join(obj.getPhysicalPath())

    @button.buttonAndHandler(u'Copy content')
    def copy_content_to(self, action):
        """Button handler: translate all objects below the context.

        Resets the per-run bookkeeping, validates the form input,
        translates each object found below the context (parents before
        children), fixes references and ``resolveuid`` links, and finally
        reports the outcome through status messages and the log.
        """
        self.skipped_items = []
        self.problem_items = []
        # {'uid-of-translation': {'fieldname': ['uid1', 'uid2']}}
        self.references_to_fix = {}
        # dictionary of uids of articles with list of textfields
        # containing resolveuid links
        # {'uid-of-translation': ['text', 'otherfieldname']}
        self.text_contains_uids = {}

        data, errors = self.extractData()
        if errors:
            self.status = self.formErrorsMessage
            return

        # fall back to an empty list if the widget extracted ``None``
        target_languages = data.get('target_languages') or []
        logger.info('copy content of {} to language(s): {}'.format(
            self._path(self.context), ', '.join(target_languages)))

        # Sort by path depth to make sure we never translate an item whose
        # parent is still untranslated. The sort is stable, so items on the
        # same level keep the catalog's 'getObjPositionInParent' order.
        brains = sorted(
            api.content.find(context=self.context,
                             sort_on='getObjPositionInParent'),
            key=lambda brain: len(brain.getPath().split('/')))

        count_translated = 0
        for brain in brains:
            obj = brain.getObject()
            # the context itself is not copied, only what lives below it
            if obj != self.context:
                count_translated += self.copy_content_of(
                    obj, target_languages)
        logger.info('translated {} items'.format(count_translated))

        # these two passes need *all* translations to exist already
        self.fix_references()
        self.fix_resolve_uid_links()
        logger.info('done')

        messages = IStatusMessage(self.request)
        msg = _(u'Contents copied successfully. Created ${num} translations',
                mapping={'num': count_translated})
        messages.add(msg, type='info')
        if self.skipped_items:
            msg = u'Skipped {} items: {}'.format(
                len(self.skipped_items),
                ', '.join(self.skipped_items))
            messages.add(msg, type='warning')
            logger.warning(msg)
        if self.problem_items:
            logger.warning('problems for {} items:\n {}'.format(
                len(self.problem_items),
                '\n'.join(self.problem_items)))

    def copy_content_of(self, item, target_languages):
        """Create and fill a translation of ``item`` per target language.

        FormFolders are skipped (and recorded in ``self.skipped_items``);
        their direct children are skipped silently. Languages for which
        ``item`` already has a translation are left alone.

        Returns the number of translations that were created.
        """
        item_path = self._path(item)
        if item.portal_type == 'FormFolder':
            logger.warning('Skipping FormFolder ' + item_path)
            self.skipped_items.append(item_path)
            return 0
        if item.aq_parent.portal_type == 'FormFolder':
            logger.info('Skipping FormFolder contents ' + item_path)
            return 0

        count = 0
        logger.info('translating ' + item_path)
        for language in target_languages:
            translation = item.getTranslation(language)
            if translation:
                logger.info('already translated: {}'.format(
                    self._path(translation)))
                continue
            try:
                translation = item.addTranslation(language)
            except CopyError as e:
                logger.error(
                    'problem: translation could not be created for ' +
                    item_path)
                logger.error(str(e))
                self.problem_items.append(item_path)
                continue

            count += 1
            translation_path = self._path(translation)
            logger.info('created translation for {}: {}'.format(
                language, translation_path))
            self.copy_fields(item, translation)
            self.copy_properties(item, translation)
            self.copy_seo_properties(item, translation)
            self.copy_workflow(item, translation)

            if item.id != translation.id:
                logger.info('correct different id {} and {}'.format(
                    item_path, translation_path))
                try:
                    # Make sure all persistent objects have _p_jar attribute
                    # otherwise we can get the CopyError
                    # https://docs.plone.org/4/en/develop/plone/content/rename.html
                    transaction.savepoint(optimistic=True)
                    api.content.rename(translation, item.id, True)
                except CopyError:
                    # usually happens if obj.cb_isMoveable()
                    self.problem_items.append(translation_path)
                    logger.warning(
                        'problem: could not rename {} {} to {}'.format(
                            translation.portal_type, translation_path,
                            item.id))
            translation.reindexObject()
        return count

    def copy_fields(self, source, target):
        """Copy all writeable, non-empty schema field values to ``target``.

        ``language`` and ``id`` are never copied. Text fields are copied
        raw (untransformed); when the raw text contains ``resolveuid`` the
        field is remembered in ``self.text_contains_uids``. Reference
        field targets are remembered in ``self.references_to_fix``. Both
        are resolved later, once every item has its translation.

        NOTE(review): falsy values (``False``, ``0``, empty) are not
        copied, so the target keeps its own default for them.
        """
        target_path = self._path(target)
        logger.info('copying fields...')
        for source_field in source.Schema().fields():
            fieldname = source_field.__name__
            if fieldname.lower() in ('language', 'id'):
                # skip language
                # skip id (setting it makes it unicode which breaks catalogs)
                logger.debug('Skipped %s' % fieldname)
                continue

            target_field = target.getField(fieldname, target)
            if target_field is None:
                logger.warning(
                    ('problem: field {} not available on '
                     'target object {}').format(fieldname, target_path))
                self.problem_items.append(target_path)
                continue

            if not target_field.writeable(target):
                logger.info(
                    ('Not writeable. Can not set value for '
                     'field {} in {}.').format(fieldname, target_path))
                continue

            value = source_field.get(source)
            if isinstance(value, unicode):
                logger.info(u'unicode value! {}: {}'.format(
                    source_field.getName(), value))
                value = value.encode('utf-8')
            if not value:
                continue

            logger.debug('Set attribute {} in {}'.format(
                fieldname, target_path))
            # Exact type matches (not isinstance) are kept on purpose so
            # subclasses of these field types are treated as before.
            if type(source_field) is TextField:
                # do not run transforms on text
                # keep resolveuid/adfadf/@@images/mini instead of
                # /path/to/image/@@images/45245345435.jpg
                value = source_field.getRaw(source)
                if 'resolveuid' in value:
                    # remember this object to later change the uids to
                    # the translated objects
                    self.text_contains_uids.setdefault(
                        target.UID(), []).append(fieldname)
            if type(source_field) is ReferenceField:
                # store references to fix them later when every content
                # item has been translated
                self.references_to_fix.setdefault(
                    target.UID(), {})[fieldname] = [
                        ref.UID() for ref in value]
            try:
                target_field.set(target, value)
            except ReferenceException as e:
                logger.error('invalid references: ' + str(e))

    def copy_workflow(self, source, target):
        """Move ``target`` into the same workflow state as ``source``.

        Does nothing when getting the state raises WorkflowException
        (no workflow used for this type).
        """
        try:
            source_state = api.content.get_state(source)
            current_state = api.content.get_state(target)
        except WorkflowException:
            # no workflow used for this type, skip this step
            return
        if current_state != source_state:
            logger.info('change workflow state to ' + source_state)
            api.content.transition(target, to_state=source_state)

    def copy_seo_properties(self, source, target):
        """Copy every annotation whose key starts with ``pSEO_``."""
        source_anno = IAnnotations(source)
        target_anno = IAnnotations(target)
        seo_keys = [key for key in source_anno.keys()
                    if key.startswith('pSEO_')]
        for key in seo_keys:
            val = source_anno.get(key)
            target_anno[key] = val
            logger.debug('seo perseo setting {}={}'.format(key, val))

    def copy_properties(self, source, target):
        """Copy all properties except ``title`` from source to target.

        A property the target already has is updated in place instead of
        being added again, because adding an existing property id fails.
        """
        for prop in source.propertyMap():
            _id = prop['id']
            if _id == 'title':
                continue
            val = source.getProperty(_id)
            if target.hasProperty(_id):
                target._updateProperty(_id, val)
            else:
                target.manage_addProperty(_id, val, prop['type'])
            logger.debug('set property {}: {}'.format(_id, val))

    def fix_references(self):
        """Point remembered reference fields at the translated targets.

        We stored all items that reference others (see ``copy_fields``)
        and now set each of those fields to the UIDs of the referenced
        items' translations. References without a translation in the
        item's language are dropped and logged.
        """
        logger.info('fixing reference fields')
        count = 0
        not_translated_count = 0
        for item_uid, refs in self.references_to_fix.items():
            obj = api.content.get(UID=item_uid)
            if obj is None:
                logger.warning(
                    'can not fix references, no object for uid {}'.format(
                        item_uid))
                continue
            language = obj.getLanguage()
            path = self._path(obj)
            for fieldname, uids in refs.items():
                translated_uids, not_translated = self._translated_uids(
                    uids, language)
                if not_translated:
                    not_translated_count += len(not_translated)
                    logger.warning(
                        ('{}: no {} translations for field {} for these uids: '
                         '{}').format(path, language, fieldname,
                                      ','.join(not_translated)))
                count += len(translated_uids)
                obj.getField(fieldname).set(obj, translated_uids)
            obj.reindexObject()

        logger.info('fixed {} references for {} items '.format(
            count, len(self.references_to_fix)))
        if not_translated_count:
            logger.warning(
                'translation missing for {} items'.format(
                    not_translated_count))

    def _translated_uids(self, uids, language):
        """Look up translations for ``uids`` for the given ``language``.

        Returns a tuple of two lists ``(translated_uids, not_translated)``:
        the UIDs of the translations found, and the paths of originals
        that have no translation. UIDs that resolve to no object are
        ignored.
        """
        not_translated = []
        translated_uids = []
        for uid in uids:
            original = api.content.get(UID=uid)
            if original is None:
                # we silently ignore these, as they simply don't appear
                # on the portal and don't lead to errors
                continue
            translation = original.getTranslation(language)
            if translation is None:
                not_translated.append(self._path(original))
                continue
            translated_uids.append(translation.UID())
        return (translated_uids, not_translated)

    def _uid_replacer(self, language, path, fieldname, counts):
        """Build the ``re.sub`` callback for one text field.

        The callback maps a matched UID to the UID of its ``language``
        translation. When the UID resolves to no object, or the object has
        no translation, the UID is returned unchanged and a warning is
        logged. ``counts`` (keys 'replaced', 'broken', 'not_translated')
        is updated in place; ``path`` and ``fieldname`` are only used for
        log messages.
        """
        def replace(match):
            uid = match.group(0)
            linked = api.content.get(UID=uid)
            if linked is None:
                logger.warning(
                    'broken link for {} field {} uid {}'.format(
                        path, fieldname, uid))
                counts['broken'] += 1
                return uid
            translation = linked.getTranslation(language)
            if translation is None:
                logger.warning(
                    ('{}: field "{}" no translation for '
                     'uid {} ({})').format(
                        path, fieldname, uid, self._path(linked)))
                counts['not_translated'] += 1
                return uid
            counts['replaced'] += 1
            return translation.UID()
        return replace

    def fix_resolve_uid_links(self):
        """Rewrite ``resolveuid/<uid>`` links in remembered text fields.

        For every field recorded in ``self.text_contains_uids`` each UID
        following ``resolveuid/`` is replaced by the UID of the linked
        object's translation in the containing item's language.
        """
        logger.info('fixing resolveuid references in textfields')
        # hex digits directly following 'resolveuid/'
        uid_pattern = re.compile(r'(?<=resolveuid/)[0-9a-f]*')
        counts = {'replaced': 0, 'broken': 0, 'not_translated': 0}
        count_articles = 0
        for item_uid, fieldnames in self.text_contains_uids.items():
            obj = api.content.get(UID=item_uid)
            if obj is None:
                logger.warning(
                    'can not fix resolveuid links, no object for '
                    'uid {}'.format(item_uid))
                continue
            language = obj.getLanguage()
            path = self._path(obj)
            for fieldname in fieldnames:
                text_field = obj.getField(fieldname)
                replace = self._uid_replacer(
                    language, path, fieldname, counts)
                new_text = uid_pattern.sub(replace, text_field.getRaw(obj))
                text_field.set(obj, new_text)
            count_articles += 1
            obj.reindexObject()

        logger.info('fixed {} resolveuid references for {} items '.format(
            counts['replaced'], count_articles))
        if counts['broken']:
            logger.warning(
                'found {} broken links (see log above)'.format(
                    counts['broken']))
        if counts['not_translated']:
            logger.warning(
                '{} objects have no translation (see log above)'.format(
                    counts['not_translated']))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment