Skip to content

Instantly share code, notes, and snippets.

@yifanlu
Created August 22, 2016 06:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yifanlu/e7da11d0f3d2910ce0d24fc1a5ffe6b8 to your computer and use it in GitHub Desktop.
Save yifanlu/e7da11d0f3d2910ce0d24fc1a5ffe6b8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import codecs
import os
import re
import sys
from datetime import datetime, timedelta, tzinfo
from glob import glob
from urllib import urlretrieve
from urlparse import urljoin, urlparse
from xml.etree.ElementTree import ElementTree, XMLTreeBuilder
import phpserialize
import hashlib
import yaml
from bs4 import BeautifulSoup
from html2text import html2text_file
'''
exitwp - WordPress XML exports to Jekyll blog format conversion
Tested with Wordpress 3.3.1 and jekyll 0.11.2
'''
######################################################
# Configuration
######################################################
# All settings are read once, at import time, from ``config.yaml`` in the
# current working directory.  The file is opened in a ``with`` block so the
# handle is closed deterministically instead of being left to the garbage
# collector.
# NOTE(review): ``yaml.load`` can construct arbitrary Python objects from
# tagged YAML.  That is acceptable only while config.yaml is a trusted local
# file; consider ``yaml.safe_load`` if the config never uses Python tags.
with open('config.yaml', 'r') as config_file:
    config = yaml.load(config_file)
wp_exports = config['wp_exports']
build_dir = config['build_dir']
download_images = config['download_images']
target_format = config['target_format']
taxonomy_filter = set(config['taxonomies']['filter'])
taxonomy_entry_filter = config['taxonomies']['entry_filter']
taxonomy_name_mapping = config['taxonomies']['name_mapping']
item_type_filter = set(config['item_type_filter'])
item_field_filter = config['item_field_filter']
date_fmt = config['date_format']
body_replace = config['body_replace']
# Time definitions
ZERO = timedelta()
HOUR = timedelta(hours=1)


# UTC support
class UTC(tzinfo):
    """Concrete ``tzinfo`` for UTC: zero offset, no daylight saving."""

    def tzname(self, dt):
        """Return the zone label, which is always ``'UTC'``."""
        return 'UTC'

    def dst(self, dt):
        """Return the daylight-saving adjustment, which is always zero."""
        return ZERO

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return ZERO
class ns_tracker_tree_builder(XMLTreeBuilder):
    """Tree builder that remembers the namespace prefixes it is told about.

    After parsing, ``namespaces`` maps each declared prefix to its URI
    wrapped in braces (``'{uri}'``), ready to be prepended to a tag name in
    ElementTree ``find``/``findall`` calls.
    """

    def __init__(self):
        XMLTreeBuilder.__init__(self)
        self.namespaces = {}
        # Hook the parser's namespace-declaration callback so each
        # prefix/URI pair is reported to _start_ns.
        self._parser.StartNamespaceDeclHandler = self._start_ns

    def _start_ns(self, prefix, ns):
        """Record *prefix* as mapping to the brace-wrapped URI *ns*."""
        self.namespaces[prefix] = ''.join(('{', ns, '}'))
def html2fmt(html, target_format):
    """Convert an HTML fragment to the requested output format.

    When *target_format* is ``'html'`` the input is returned untouched; for
    any other value the fragment is handed to ``html2text_file`` and its
    result is returned.
    """
    if target_format != 'html':
        return html2text_file(html, None)
    return html
def parse_wp_xml(file):
    """Parse a WordPress XML export into plain Python data.

    Args:
        file: Path of the export (anything ``ElementTree.parse`` accepts).

    Returns:
        A dict with two keys:

        * ``'header'`` - dict with the channel's ``title``, ``link`` and
          ``description`` as unicode strings.
        * ``'items'`` - list with one dict per ``<item>``, holding the keys
          ``title``, ``link``, ``author``, ``date``, ``slug``, ``status``,
          ``type``, ``wp_id``, ``parent``, ``taxanomies``, ``body``,
          ``excerpt``, ``img_srcs``, ``meta`` and ``comments``.

    Raises:
        KeyError: if the export does not declare a namespace prefix that is
            looked up (for example ``wp``).
    """
    parser = ns_tracker_tree_builder()
    tree = ElementTree()
    # Report the file that is actually being parsed rather than relying on
    # a module-level loop variable that only exists when run as a script.
    print('reading: %s' % (file,))
    root = tree.parse(file, parser)
    ns = parser.namespaces
    ns[''] = ''
    c = root.find('channel')

    def parse_header():
        """Return the channel-level title, link and description."""
        return {
            'title': unicode(c.find('title').text),
            'link': unicode(c.find('link').text),
            'description': unicode(c.find('description').text)
        }

    def item_text(item, q, unicode_wrap=True, empty=False):
        """Return the text of child element *q* (``prefix:tag`` or ``tag``).

        A missing element, or one without text, yields ``''`` when *empty*
        is true and ``'No Content Found'`` otherwise.
        """
        if q.find(':') > 0:
            namespace, tag = q.split(':', 1)
        else:
            namespace, tag = '', q
        node = item.find(ns[namespace] + tag)
        # Compare against None explicitly: an Element without children is
        # falsy, so a plain truth test would misreport existing nodes.
        text = None if node is None else node.text
        if text is None:
            result = '' if empty else 'No Content Found'
        else:
            result = text
        if unicode_wrap:
            result = unicode(result)
        return result

    def item_meta(item, post_type):
        """Return the item's ``wp:postmeta`` entries as a key -> value dict.

        Values that are PHP-serialized are unserialized; everything else is
        kept as the raw text.  Items of type ``'feedback'`` get no metadata.
        """
        result = {}
        if post_type == 'feedback':
            return result
        try:
            for md in item.findall(ns['wp'] + 'postmeta'):
                key = md.find(ns['wp'] + 'meta_key').text
                value_node = md.find(ns['wp'] + 'meta_value')
                if value_node is None:
                    continue
                raw = value_node.text
                val = raw
                try:
                    val = phpserialize.loads(raw)
                except ValueError as e:
                    # 'unexpected opcode' means the value is not serialized
                    # data at all, so the raw text is kept.  Any other
                    # failure gets one retry with CRLF line endings,
                    # presumably because the serialized string lengths were
                    # computed before newlines were normalised.
                    if str(e) != 'unexpected opcode' and raw is not None:
                        val = raw.replace('\n', '\r\n')
                        try:
                            val = phpserialize.loads(val)
                        except ValueError:
                            pass
                result[key] = val
        except AttributeError as ex:
            # A malformed postmeta entry (e.g. no meta_key) ends the scan;
            # whatever was collected so far is still returned.
            print('AttributeError: %s' % ex)
        return result

    def item_comments(item):
        """Return the item's comments as a list of tuples.

        Each tuple is ``(id, content, author name, md5 of the author's
        e-mail, author url, date)`` with the date reformatted as
        ``YYYY-MM-DDTHH:MM:SS.000Z``.
        """
        wp = ns['wp']
        result = []
        for cmt in item.findall(wp + 'comment'):
            cid = cmt.find(wp + 'comment_id').text
            email = cmt.find(wp + 'comment_author_email').text or ''
            # md5 needs bytes; encoding explicitly keeps non-ASCII
            # addresses from raising UnicodeEncodeError.
            email = hashlib.md5(
                email.strip().lower().encode('utf-8')).hexdigest()
            name = cmt.find(wp + 'comment_author').text
            url = cmt.find(wp + 'comment_author_url').text
            if url is None:
                url = ''
            date = cmt.find(wp + 'comment_date_gmt').text
            dt = datetime.strptime(date, date_fmt)
            date = dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
            content = cmt.find(wp + 'comment_content').text
            result.append((cid, content, name, email, url, date))
        return result

    def parse_items():
        """Return one export dict per ``<item>`` of the channel."""
        export_items = []
        for i in c.findall('item'):
            # Collect taxonomy terms, dropping filtered domains and
            # filtered (domain, entry) pairs.
            export_taxanomies = {}
            for tax in i.findall('category'):
                if 'domain' not in tax.attrib:
                    continue
                t_domain = unicode(tax.attrib['domain'])
                t_entry = unicode(tax.text)
                if t_domain in taxonomy_filter:
                    continue
                if (t_domain in taxonomy_entry_filter and
                        taxonomy_entry_filter[t_domain] == t_entry):
                    continue
                export_taxanomies.setdefault(t_domain, []).append(t_entry)

            body = item_text(i, 'content:encoded')
            for key in body_replace:
                body = re.sub(key, body_replace[key], body)

            # Best effort: a parse failure keeps the image sources found
            # so far and reports the offending body.
            img_srcs = []
            try:
                soup = BeautifulSoup(body)
                for img in soup.find_all('img'):
                    img_srcs.append(img['src'])
            except Exception:
                print('could not parse html: ' + body)

            excerpt = item_text(i, 'excerpt:encoded', empty=True)
            post_type = item_text(i, 'wp:post_type')
            export_items.append({
                'title': item_text(i, 'title'),
                'link': item_text(i, 'link'),
                'author': item_text(i, 'dc:creator'),
                'date': item_text(i, 'wp:post_date_gmt'),
                'slug': item_text(i, 'wp:post_name'),
                'status': item_text(i, 'wp:status'),
                'type': post_type,
                'wp_id': item_text(i, 'wp:post_id'),
                'parent': item_text(i, 'wp:post_parent'),
                'taxanomies': export_taxanomies,
                'body': body,
                'excerpt': excerpt,
                'img_srcs': img_srcs,
                'meta': item_meta(i, post_type),
                # NOTE: 'comments' holds the parsed comment tuples, not the
                # wp:comment_status flag.
                'comments': item_comments(i),
            })
        return export_items

    return {
        'header': parse_header(),
        'items': parse_items(),
    }
def write_jekyll(data, target_format):
    """Write the parsed export *data* as a Jekyll site below ``build_dir``.

    Posts go to ``_posts``, projects to ``_projects`` and pages into a
    directory per page (nested under their parent pages).  Each file starts
    with a YAML front matter block followed by the converted body.  Comments
    of posts are written as individual YAML files under ``_data/comments``
    and, when ``download_images`` is set, referenced images are downloaded
    into ``images``.

    Args:
        data: Dict with ``'header'`` and ``'items'`` as produced by
            ``parse_wp_xml``.
        target_format: Output format handed to ``html2fmt`` for bodies.
    """
    sys.stdout.write('writing')
    item_uids = {}
    attachments = {}

    def toyaml(obj):
        """Dump *obj* as block-style YAML and return it as unicode."""
        return yaml.safe_dump(obj, allow_unicode=True,
                              default_flow_style=False).decode('utf-8')

    def get_blog_path(data, path_infix='jekyll'):
        """Return the output directory derived from the blog's link."""
        name = data['header']['link']
        name = re.sub('^https?', '', name)
        name = re.sub('[^A-Za-z0-9_.-]', '', name)
        return os.path.normpath(build_dir + '/' + path_infix + '/' + name)

    blog_dir = get_blog_path(data)

    def get_full_dir(subdir):
        """Return *subdir* below the blog directory, creating it if needed."""
        full_dir = os.path.normpath(blog_dir + '/' + subdir)
        if not os.path.exists(full_dir):
            os.makedirs(full_dir)
        return full_dir

    def open_file(path):
        """Open *path* for writing UTF-8 encoded text."""
        return codecs.open(path, 'w', encoding='utf-8')

    def get_item_uid(item, date_prefix=False, namespace=''):
        """Return a file-name-safe id for *item*, unique within *namespace*.

        The id is cached per WordPress id so repeated calls for the same
        item return the same value.
        """
        uids = item_uids.setdefault(namespace, {})
        if item['wp_id'] in uids:
            return uids[item['wp_id']]
        uid = []
        if date_prefix:
            dt = datetime.strptime(item['date'], date_fmt)
            uid.append(dt.strftime('%Y-%m-%d'))
            uid.append('-')
        s_title = item['slug']
        if s_title is None or s_title == '':
            s_title = item['title']
        if s_title is None or s_title == '':
            s_title = 'untitled'
        s_title = s_title.replace(' ', '_')
        s_title = re.sub('[^a-zA-Z0-9_-]', '', s_title)
        uid.append(s_title)
        base = ''.join(uid)
        fn = base
        n = 1
        # Uniqueness must be checked against the names already handed out
        # (the dict's values); its keys are WordPress ids.
        taken = set(uids.values())
        while fn in taken:
            n = n + 1
            fn = base + '_' + str(n)
        # Cache under the id of the item that was passed in, which is not
        # necessarily the item the main loop is currently writing.
        uids[item['wp_id']] = fn
        return fn

    def get_item_path(item, subdir=''):
        """Return the markdown file path for *item* inside *subdir*.

        Pages get their own directory containing ``index.md``.
        """
        full_dir = get_full_dir(subdir)
        filename_parts = [full_dir, '/', item['uid']]
        if item['type'] == 'page':
            page_dir = ''.join(filename_parts)
            if not os.path.exists(page_dir):
                os.makedirs(page_dir)
            filename_parts.append('/index')
        filename_parts.append('.')
        filename_parts.append('md')
        return ''.join(filename_parts)

    def write_comments(slug, comments):
        """Write every comment tuple to ``_data/comments/<slug>/``."""
        full_dir = get_full_dir('_data/comments/' + slug)
        for comment in comments:
            path = full_dir + '/comment-' + comment[0] + '.yml'
            yml = {'message': comment[1],
                   'name': comment[2],
                   'email': comment[3],
                   'url': comment[4],
                   'hidden': '',
                   'date': comment[5]
                   }
            dat = toyaml(yml)
            # The with-block guarantees the file is flushed and closed.
            with open(path, 'w') as f:
                f.write(dat.encode('UTF-8'))

    def get_attachment_path(src, subdir, dir_prefix='images'):
        """Return the local path for attachment *src*, unique per *subdir*."""
        files = attachments.setdefault(subdir, {})
        try:
            filename = files[src]
        except KeyError:
            file_root, file_ext = os.path.splitext(os.path.basename(
                urlparse(src)[2]))
            file_infix = 1
            if file_root == '':
                file_root = '1'
            current_files = files.values()
            maybe_filename = file_root + file_ext
            while maybe_filename in current_files:
                maybe_filename = file_root + '-' + str(file_infix) + file_ext
                file_infix = file_infix + 1
            files[src] = filename = maybe_filename
        target_dir = os.path.normpath(
            blog_dir + '/' + dir_prefix + '/' + subdir)
        target_file = os.path.normpath(target_dir + '/' + filename)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        return target_file

    for i in data['items']:
        # Skip items matching any configured field filter.
        if any(i[field] == value
               for field, value in item_field_filter.iteritems()):
            continue
        sys.stdout.write('.')
        sys.stdout.flush()
        out = None
        yaml_header = {
            'title': i['title'],
            'link': i['link'],
            'author': i['author'],
            'date': datetime.strptime(
                i['date'], '%Y-%m-%d %H:%M:%S').replace(tzinfo=UTC()),
            'slug': i['slug'],
            'wordpress_id': int(i['wp_id']),
            # NOTE(review): this is whatever the parser stored under
            # 'comments'; confirm that dumping it into the front matter is
            # intended.
            'comments': i['comments'],
        }
        if len(i['excerpt']) > 0:
            yaml_header['excerpt'] = i['excerpt']
        if i['status'] != u'publish':
            yaml_header['published'] = False

        if i['type'] == 'post':
            i['uid'] = get_item_uid(i, date_prefix=True)
            fn = get_item_path(i, subdir='_posts')
            out = open_file(fn)
            yaml_header['layout'] = 'post'
            print('NEW POST: ' + i['link'])
            write_comments(i['slug'], i['comments'])
        elif i['type'] == 'page':
            i['uid'] = get_item_uid(i)
            # Chase down parent path, if any
            parentpath = ''
            item = i
            while item['parent'] != '0':
                item = next((parent for parent in data['items']
                             if parent['wp_id'] == item['parent']), None)
                if item:
                    parentpath = get_item_uid(item) + '/' + parentpath
                else:
                    break
            fn = get_item_path(i, parentpath)
            out = open_file(fn)
            yaml_header['layout'] = 'page'
        elif i['type'] == 'project':
            i['uid'] = get_item_uid(i)
            fn = get_item_path(i, subdir='_projects')
            out = open_file(fn)
            yaml_header['layout'] = 'project'
            if 'project_url' in i['meta'] and i['meta']['project_url']:
                yaml_header['site'] = i['meta']['project_url']
            if ('project_version' in i['meta'] and
                    i['meta']['project_version']):
                yaml_header['version'] = i['meta']['project_version']
            yaml_header['source'] = (
                'https://sites.google.com/a/yifanlu.com/downloads/' +
                i['slug'] + '_source.zip')
            yaml_header['archive'] = (
                'https://sites.google.com/a/yifanlu.com/downloads/' +
                i['slug'] + '.zip')
        elif i['type'] in item_type_filter:
            pass
        else:
            print('Unknown item type :: ' + i['type'])

        if download_images:
            for img in i['img_srcs']:
                # Best effort: a failed download (or an item without a uid)
                # is reported and the remaining images are still tried.
                try:
                    urlretrieve(urljoin(data['header']['link'],
                                        img.encode('utf-8')),
                                get_attachment_path(img, i['uid']))
                except Exception:
                    print('\n unable to download ' + urljoin(
                        data['header']['link'], img.encode('utf-8')))

        if out is None:
            continue
        try:
            # Merge taxonomies under their mapped names, without duplicates.
            tax_out = {}
            for taxonomy in i['taxanomies']:
                t_name = taxonomy_name_mapping.get(taxonomy, taxonomy)
                for tvalue in i['taxanomies'][taxonomy]:
                    bucket = tax_out.setdefault(t_name, [])
                    if tvalue not in bucket:
                        bucket.append(tvalue)

            out.write('---\n')
            if len(yaml_header) > 0:
                out.write(toyaml(yaml_header))
            if len(tax_out) > 0:
                out.write(toyaml(tax_out))
            out.write('---\n\n')
            try:
                out.write(html2fmt(i['body'], target_format))
            except Exception:
                print('\n Parse error on: ' + i['title'])

            if i['type'] == 'project':
                out.write('\n### Changes\n\n')
                if 'project_changes' in i['meta']:
                    for k, v in i['meta']['project_changes'].iteritems():
                        date = i['meta']['project_update_dates'][k]
                        # Epoch seconds are UTC based, so convert with
                        # utcfromtimestamp before tagging the value as UTC;
                        # fromtimestamp would yield local time.
                        date = datetime.utcfromtimestamp(
                            int(date)).replace(tzinfo=UTC())
                        change = html2fmt(v.replace('\r\n', '\n'),
                                          target_format)
                        out.write('* **' + date.strftime('%Y-%m-%d') +
                                  '**: ' + change)
                out.write('\n### Screenshots\n\n')
                if 'project_image' in i['meta']:
                    for k, v in i['meta']['project_image'].iteritems():
                        out.write('![Screen ' + str(k) + '](' + v + ')\n')
                out.write('\n')
        finally:
            # Close the file even when a write above fails.
            out.close()
    print('\n')
# Script driver: convert every *.xml export found in the configured
# directory.  The loop variable stays a module-level name called ``wpe``
# because other code in this file may read it while an export is processed.
wp_exports = glob('%s/*.xml' % wp_exports)
for wpe in wp_exports:
    data = parse_wp_xml(wpe)
    write_jekyll(data, target_format)
print('done')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment