Created
August 22, 2016 06:38
-
-
Save yifanlu/e7da11d0f3d2910ce0d24fc1a5ffe6b8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import codecs | |
import os | |
import re | |
import sys | |
from datetime import datetime, timedelta, tzinfo | |
from glob import glob | |
from urllib import urlretrieve | |
from urlparse import urljoin, urlparse | |
from xml.etree.ElementTree import ElementTree, XMLTreeBuilder | |
import phpserialize | |
import hashlib | |
import yaml | |
from bs4 import BeautifulSoup | |
from html2text import html2text_file | |
''' | |
exitwp - WordPress XML exports to Jekyll blog format conversion
Tested with Wordpress 3.3.1 and jekyll 0.11.2 | |
''' | |
######################################################
# Configuration
######################################################
# Load settings from config.yaml in the working directory.
# safe_load refuses arbitrary YAML object tags (yaml.load on a config
# file could execute constructors); the context manager closes the
# handle, which the old file('config.yaml', 'r') never did.
with open('config.yaml', 'r') as _config_file:
    config = yaml.safe_load(_config_file)
wp_exports = config['wp_exports']
build_dir = config['build_dir']
download_images = config['download_images']
target_format = config['target_format']
taxonomy_filter = set(config['taxonomies']['filter'])
taxonomy_entry_filter = config['taxonomies']['entry_filter']
taxonomy_name_mapping = config['taxonomies']['name_mapping']
item_type_filter = set(config['item_type_filter'])
item_field_filter = config['item_field_filter']
date_fmt = config['date_format']
body_replace = config['body_replace']
# Time definitions
ZERO = timedelta(0)
HOUR = timedelta(hours=1)
# UTC support | |
class UTC(tzinfo):
    """Concrete tzinfo representing UTC: fixed zero offset, no DST."""

    def utcoffset(self, dt):
        # UTC is the reference timezone, so the offset is always zero.
        return timedelta(0)

    def tzname(self, dt):
        return 'UTC'

    def dst(self, dt):
        # UTC never observes daylight saving time.
        return timedelta(0)
class ns_tracker_tree_builder(XMLTreeBuilder):
    """XMLTreeBuilder subclass that records XML namespace declarations.

    ElementTree expands namespaced tags to '{uri}tag' form; this
    builder captures each declared prefix -> '{uri}' mapping in
    self.namespaces so callers can build those expanded tag names
    from WXR prefixes such as 'wp' and 'dc'.
    """
    def __init__(self):
        XMLTreeBuilder.__init__(self)
        # Hook expat's namespace-declaration callback.  NOTE(review):
        # this relies on the private _parser attribute of
        # XMLTreeBuilder (Python 2 ElementTree internals).
        self._parser.StartNamespaceDeclHandler = self._start_ns
        # prefix -> '{uri}' (already wrapped in braces for find()).
        self.namespaces = {}
    def _start_ns(self, prefix, ns):
        self.namespaces[prefix] = '{' + ns + '}'
def html2fmt(html, target_format):
    """Render a chunk of post HTML in the configured output format.

    When target_format is 'html' the markup passes through untouched;
    any other value is converted via html2text (markdown-ish output).
    """
    if target_format == 'html':
        return html
    return html2text_file(html, None)
def parse_wp_xml(file): | |
parser = ns_tracker_tree_builder() | |
tree = ElementTree() | |
print 'reading: ' + wpe | |
root = tree.parse(file, parser) | |
ns = parser.namespaces | |
ns[''] = '' | |
c = root.find('channel') | |
def parse_header(): | |
return { | |
'title': unicode(c.find('title').text), | |
'link': unicode(c.find('link').text), | |
'description': unicode(c.find('description').text) | |
} | |
def parse_items(): | |
export_items = [] | |
xml_items = c.findall('item') | |
for i in xml_items: | |
taxanomies = i.findall('category') | |
export_taxanomies = {} | |
for tax in taxanomies: | |
if 'domain' not in tax.attrib: | |
continue | |
t_domain = unicode(tax.attrib['domain']) | |
t_entry = unicode(tax.text) | |
if (not (t_domain in taxonomy_filter) and | |
not (t_domain | |
in taxonomy_entry_filter and | |
taxonomy_entry_filter[t_domain] == t_entry)): | |
if t_domain not in export_taxanomies: | |
export_taxanomies[t_domain] = [] | |
export_taxanomies[t_domain].append(t_entry) | |
def gi(q, unicode_wrap=True, empty=False): | |
namespace = '' | |
tag = '' | |
if q.find(':') > 0: | |
namespace, tag = q.split(':', 1) | |
else: | |
tag = q | |
try: | |
result = i.find(ns[namespace] + tag).text | |
print result.encode('utf-8') | |
except AttributeError: | |
result = 'No Content Found' | |
if empty: | |
result = '' | |
if unicode_wrap: | |
result = unicode(result) | |
return result | |
def gimeta(title, type): | |
namespace = '' | |
tag = '' | |
result = {} | |
if type == 'feedback': | |
return result | |
try: | |
metadatas = i.findall(ns['wp'] + 'postmeta') | |
for md in metadatas: | |
key = md.find(ns['wp'] + 'meta_key').text | |
val = md.find(ns['wp'] + 'meta_value') | |
if val is not None: | |
val = val.text | |
else: | |
continue | |
try: | |
val_unserialized = phpserialize.loads(val) | |
val = val_unserialized | |
except ValueError as e: | |
asdf = md.find(ns['wp'] + 'meta_value').text | |
if str(e) == 'unexpected opcode': | |
pass | |
elif asdf is not None: | |
val = asdf.replace('\n','\r\n') | |
try: | |
val_unserialized = phpserialize.loads(val) | |
val = val_unserialized | |
except ValueError as e2: | |
pass | |
result[key] = val | |
except AttributeError as ex: | |
print 'AttributeError: ', ex | |
pass | |
return result | |
def gicomment(): | |
namespace = '' | |
tag = '' | |
result = [] | |
comments = i.findall(ns['wp'] + 'comment') | |
for cmt in comments: | |
cid = cmt.find(ns['wp'] + 'comment_id').text | |
email = cmt.find(ns['wp'] + 'comment_author_email').text | |
if email is None: | |
email = '' | |
email = hashlib.md5(email.strip().lower()).hexdigest() | |
name = cmt.find(ns['wp'] + 'comment_author').text | |
url = cmt.find(ns['wp'] + 'comment_author_url').text | |
if url is None: | |
url = '' | |
date = cmt.find(ns['wp'] + 'comment_date_gmt').text | |
dt = datetime.strptime(date, date_fmt) | |
date = dt.strftime('%Y-%m-%dT%H:%M:%S.000Z') | |
content = cmt.find(ns['wp'] + 'comment_content').text | |
result.append((cid, content, name, email, url, date)) | |
print 'wtf: ', result | |
return result | |
body = gi('content:encoded') | |
for key in body_replace: | |
# body = body.replace(key, body_replace[key]) | |
body = re.sub(key, body_replace[key], body) | |
img_srcs = [] | |
if body is not None: | |
try: | |
soup = BeautifulSoup(body) | |
img_tags = soup.find_all('img') | |
for img in img_tags: | |
img_srcs.append(img['src']) | |
except: | |
print 'could not parse html: ' + body | |
# print img_srcs | |
excerpt = gi('excerpt:encoded', empty=True) | |
export_item = { | |
'title': gi('title'), | |
'link': gi('link'), | |
'author': gi('dc:creator'), | |
'date': gi('wp:post_date_gmt'), | |
'slug': gi('wp:post_name'), | |
'status': gi('wp:status'), | |
'type': gi('wp:post_type'), | |
'wp_id': gi('wp:post_id'), | |
'parent': gi('wp:post_parent'), | |
'comments': gi('wp:comment_status') == u'open', | |
'taxanomies': export_taxanomies, | |
'body': body, | |
'excerpt': excerpt, | |
'img_srcs': img_srcs, | |
'meta': gimeta(gi('title'), gi('wp:post_type')), | |
'comments': gicomment() | |
} | |
export_items.append(export_item) | |
return export_items | |
return { | |
'header': parse_header(), | |
'items': parse_items(), | |
} | |
def write_jekyll(data, target_format): | |
sys.stdout.write('writing') | |
item_uids = {} | |
attachments = {} | |
def get_blog_path(data, path_infix='jekyll'): | |
name = data['header']['link'] | |
name = re.sub('^https?', '', name) | |
name = re.sub('[^A-Za-z0-9_.-]', '', name) | |
return os.path.normpath(build_dir + '/' + path_infix + '/' + name) | |
blog_dir = get_blog_path(data) | |
def get_full_dir(dir): | |
full_dir = os.path.normpath(blog_dir + '/' + dir) | |
if (not os.path.exists(full_dir)): | |
os.makedirs(full_dir) | |
return full_dir | |
def open_file(file): | |
f = codecs.open(file, 'w', encoding='utf-8') | |
return f | |
def get_item_uid(item, date_prefix=False, namespace=''): | |
result = None | |
if namespace not in item_uids: | |
item_uids[namespace] = {} | |
if item['wp_id'] in item_uids[namespace]: | |
result = item_uids[namespace][item['wp_id']] | |
else: | |
uid = [] | |
if (date_prefix): | |
dt = datetime.strptime(item['date'], date_fmt) | |
uid.append(dt.strftime('%Y-%m-%d')) | |
uid.append('-') | |
s_title = item['slug'] | |
if s_title is None or s_title == '': | |
s_title = item['title'] | |
if s_title is None or s_title == '': | |
s_title = 'untitled' | |
s_title = s_title.replace(' ', '_') | |
s_title = re.sub('[^a-zA-Z0-9_-]', '', s_title) | |
uid.append(s_title) | |
fn = ''.join(uid) | |
n = 1 | |
while fn in item_uids[namespace]: | |
n = n + 1 | |
fn = ''.join(uid) + '_' + str(n) | |
item_uids[namespace][i['wp_id']] = fn | |
result = fn | |
return result | |
def get_item_path(item, dir=''): | |
full_dir = get_full_dir(dir) | |
filename_parts = [full_dir, '/'] | |
filename_parts.append(item['uid']) | |
if item['type'] == 'page': | |
if (not os.path.exists(''.join(filename_parts))): | |
os.makedirs(''.join(filename_parts)) | |
filename_parts.append('/index') | |
filename_parts.append('.') | |
filename_parts.append('md') | |
return ''.join(filename_parts) | |
def write_comments(slug, comments): | |
full_dir = get_full_dir('_data/comments/' + slug) | |
for comment in comments: | |
path = full_dir + '/comment-' + comment[0] + '.yml' | |
yml = {'message': comment[1], | |
'name': comment[2], | |
'email': comment[3], | |
'url': comment[4], | |
'hidden': '', | |
'date': comment[5] | |
} | |
dat = toyaml(yml) | |
f = open(path, 'w') | |
f.write(dat.encode('UTF-8')) | |
f.close | |
def get_attachment_path(src, dir, dir_prefix='images'): | |
try: | |
files = attachments[dir] | |
except KeyError: | |
attachments[dir] = files = {} | |
try: | |
filename = files[src] | |
except KeyError: | |
file_root, file_ext = os.path.splitext(os.path.basename( | |
urlparse(src)[2])) | |
file_infix = 1 | |
if file_root == '': | |
file_root = '1' | |
current_files = files.values() | |
maybe_filename = file_root + file_ext | |
while maybe_filename in current_files: | |
maybe_filename = file_root + '-' + str(file_infix) + file_ext | |
file_infix = file_infix + 1 | |
files[src] = filename = maybe_filename | |
target_dir = os.path.normpath(blog_dir + '/' + dir_prefix + '/' + dir) | |
target_file = os.path.normpath(target_dir + '/' + filename) | |
if (not os.path.exists(target_dir)): | |
os.makedirs(target_dir) | |
# if src not in attachments[dir]: | |
# print target_name | |
return target_file | |
for i in data['items']: | |
skip_item = False | |
for field, value in item_field_filter.iteritems(): | |
if(i[field] == value): | |
skip_item = True | |
break | |
if(skip_item): | |
continue | |
sys.stdout.write('.') | |
sys.stdout.flush() | |
out = None | |
yaml_header = { | |
'title': i['title'], | |
'link': i['link'], | |
'author': i['author'], | |
'date': datetime.strptime( | |
i['date'], '%Y-%m-%d %H:%M:%S').replace(tzinfo=UTC()), | |
'slug': i['slug'], | |
'wordpress_id': int(i['wp_id']), | |
'comments': i['comments'], | |
} | |
if len(i['excerpt']) > 0: | |
yaml_header['excerpt'] = i['excerpt'] | |
if i['status'] != u'publish': | |
yaml_header['published'] = False | |
if i['type'] == 'post': | |
i['uid'] = get_item_uid(i, date_prefix=True) | |
fn = get_item_path(i, dir='_posts') | |
out = open_file(fn) | |
yaml_header['layout'] = 'post' | |
print 'NEW POST: ', i['link'] | |
write_comments(i['slug'], i['comments']) | |
elif i['type'] == 'page': | |
i['uid'] = get_item_uid(i) | |
# Chase down parent path, if any | |
parentpath = '' | |
item = i | |
while item['parent'] != '0': | |
item = next((parent for parent in data['items'] | |
if parent['wp_id'] == item['parent']), None) | |
if item: | |
parentpath = get_item_uid(item) + '/' + parentpath | |
else: | |
break | |
fn = get_item_path(i, parentpath) | |
out = open_file(fn) | |
yaml_header['layout'] = 'page' | |
elif i['type'] == 'project': | |
i['uid'] = get_item_uid(i) | |
fn = get_item_path(i, dir='_projects') | |
out = open_file(fn) | |
yaml_header['layout'] = 'project' | |
if 'project_url' in i['meta'] and i['meta']['project_url']: | |
yaml_header['site'] = i['meta']['project_url'] | |
if 'project_version' in i['meta'] and i['meta']['project_version']: | |
yaml_header['version'] = i['meta']['project_version'] | |
yaml_header['source'] = 'https://sites.google.com/a/yifanlu.com/downloads/' + i['slug'] + '_source.zip' | |
yaml_header['archive'] = 'https://sites.google.com/a/yifanlu.com/downloads/' + i['slug'] + '.zip' | |
elif i['type'] in item_type_filter: | |
pass | |
else: | |
print 'Unknown item type :: ' + i['type'] | |
if download_images: | |
for img in i['img_srcs']: | |
try: | |
urlretrieve(urljoin(data['header']['link'], | |
img.encode('utf-8')), | |
get_attachment_path(img, i['uid'])) | |
except: | |
print '\n unable to download ' + urljoin( | |
data['header']['link'], img.encode('utf-8')) | |
if out is not None: | |
def toyaml(data): | |
return yaml.safe_dump(data, allow_unicode=True, | |
default_flow_style=False).decode('utf-8') | |
tax_out = {} | |
for taxonomy in i['taxanomies']: | |
for tvalue in i['taxanomies'][taxonomy]: | |
t_name = taxonomy_name_mapping.get(taxonomy, taxonomy) | |
if t_name not in tax_out: | |
tax_out[t_name] = [] | |
if tvalue in tax_out[t_name]: | |
continue | |
tax_out[t_name].append(tvalue) | |
out.write('---\n') | |
if len(yaml_header) > 0: | |
out.write(toyaml(yaml_header)) | |
if len(tax_out) > 0: | |
out.write(toyaml(tax_out)) | |
out.write('---\n\n') | |
try: | |
out.write(html2fmt(i['body'], target_format)) | |
except: | |
print '\n Parse error on: ' + i['title'] | |
if i['type'] == 'project': | |
out.write('\n### Changes\n\n') | |
if 'project_changes' in i['meta']: | |
for k, v in i['meta']['project_changes'].iteritems(): | |
date = i['meta']['project_update_dates'][k] | |
date = datetime.fromtimestamp(int(date)).replace(tzinfo=UTC()) | |
change = html2fmt(v.replace('\r\n','\n'), target_format) | |
out.write('* **' + date.strftime('%Y-%m-%d') + '**: ' + change) | |
out.write('\n### Screenshots\n\n') | |
if 'project_image' in i['meta']: | |
for k, v in i['meta']['project_image'].iteritems(): | |
out.write('![Screen ' + str(k) + '](' + v + ')\n') | |
out.write('\n') | |
out.close() | |
print '\n' | |
# Expand the configured export directory into individual WXR files
# and convert each one.
wp_exports = glob(wp_exports + '/*.xml')
for wpe in wp_exports:
    # NOTE(review): parse_wp_xml's progress print reads the global
    # 'wpe' bound by this loop, so the variable name is load-bearing.
    data = parse_wp_xml(wpe)
    write_jekyll(data, target_format)
print 'done'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment