Skip to content

Instantly share code, notes, and snippets.

@larsimmisch
Created December 2, 2015 21:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save larsimmisch/397c8abaa36e390a87f2 to your computer and use it in GitHub Desktop.
Save larsimmisch/397c8abaa36e390a87f2 to your computer and use it in GitHub Desktop.
Python script to merge comments from one wordpress instance to another
#!/usr/bin/env python
# I recently had to merge the comments from an old instance of wordpress to
# a new one (long and boring story).
# The posts had already been manually merged, but not the comments.
# So I wrote this script to merge the comments
# Warning: while this has worked (once) this may not be a good way to do it.
# Also, it is slow, so for repeated runs it creates a stats.json so that it
# doesn't have to examine existing (or spam) comments twice. Make sure to
# remove stats.json when you want to start again from scratch.
from __future__ import print_function
import sqlalchemy as sa
from sqlalchemy import orm
import json
# SQLAlchemy connection URLs for the two WordPress databases: DB_SRC is the
# old instance, DB_DEST the new one.
# NOTE(review): the credentials and ports look like placeholders - adjust them
# for your own setup before running.
DB_SRC = "mysql://user:source@127.0.0.1:3307/wordpress"
DB_DEST = "mysql://user:dest@127.0.0.1/wordpress"
# Raw SQL that recomputes wp_posts.comment_count for posts and pages from the
# number of approved rows in wp_comments, touching only rows whose stored
# count disagrees with the computed one.
# from http://beerpla.net/2010/03/21/how-to-diagnose-and-fix-incorrect-post-comment-counts-in-wordpress/
fix_post_comment_count = """
UPDATE wp_posts wpp
LEFT JOIN
(SELECT comment_post_id AS c_post_id, count(*) AS cnt FROM wp_comments
WHERE comment_approved = 1 GROUP BY comment_post_id) wpc
ON wpp.id=wpc.c_post_id
SET wpp.comment_count=wpc.cnt
WHERE wpp.post_type IN ('post', 'page')
AND (wpp.comment_count!=wpc.cnt OR (wpp.comment_count != 0 AND wpc.cnt IS NULL));
"""
class WPDB:
    """Bundle of everything needed to talk to one WordPress database.

    Attributes:
        engine: SQLAlchemy engine created from the URL.
        meta: MetaData object holding the reflected schema.
        wp_comments: reflected ``wp_comments`` table.
        wp_posts: reflected ``wp_posts`` table.
        session: ORM session bound to ``engine``.
    """

    def __init__(self, url):
        """Create the engine for *url*, reflect the schema and open a session.

        Raises KeyError if the reflected schema has no ``wp_comments`` or
        ``wp_posts`` table.
        """
        engine = sa.create_engine(url)
        metadata = sa.schema.MetaData()
        # Reflection loads the table definitions from the live database.
        metadata.reflect(bind=engine)

        self.engine = engine
        self.meta = metadata

        reflected = metadata.tables
        self.wp_comments = reflected['wp_comments']
        self.wp_posts = reflected['wp_posts']

        session_factory = orm.sessionmaker(bind=engine)
        self.session = session_factory()
def _load_existing_comments(path='stats.json'):
    """Return the comment IDs recorded by an earlier run, or [] if none."""
    import errno

    try:
        with open(path, 'r') as f:
            return json.load(f).get('existing_comments', [])
    except IOError as e:
        # A missing file simply means this is the first run; anything else
        # (permissions, I/O failure) is a real error and must surface.
        if e.errno != errno.ENOENT:
            raise
        return []


def merge_comments():
    """Copy comments missing from the destination WordPress DB into it.

    Every comment in the source database (DB_SRC) that is not a pingback or
    spam is looked up in the destination (DB_DEST) by author, author e-mail,
    author URL and content. A match only counts if both comments hang off
    posts with the same ``post_name`` (post IDs differ between instances).
    Comments without a match are inserted under the single destination post
    with that name; zero or several candidate posts are reported and skipped.
    Finally the per-post comment counts in the destination are recomputed.

    Source comment IDs that need no further examination are collected in the
    module-level ``existing_comments`` list, seeded from ``stats.json`` when
    that file exists, so that a later run can skip them.

    Returns:
        The ``existing_comments`` list.
    """
    global existing_comments
    # Load the cache before connecting: if a connection fails, the caller's
    # cleanup must not overwrite a good stats.json with an empty list.
    existing_comments = _load_existing_comments()
    # Set mirror of the list so the per-comment membership test is O(1);
    # the list itself is kept because it is what gets serialized to JSON.
    seen = set(existing_comments)
    found = 0
    count = 0
    source = WPDB(DB_SRC)
    dest = WPDB(DB_DEST)
    for c_src in source.session.query(source.wp_comments):
        if c_src.comment_ID in seen:
            continue
        # ignore pingbacks and spam
        if (c_src.comment_type == 'pingback'
                or c_src.comment_approved == 'spam'):
            existing_comments.append(c_src.comment_ID)
            seen.add(c_src.comment_ID)
            continue
        count += 1
        p_src = (source.session.query(source.wp_posts)
                 .filter(source.wp_posts.c.ID == c_src.comment_post_ID)
                 .first())
        if p_src is None:
            # Orphaned comment: without a source post there is no post name
            # to match against, so it cannot be placed in the destination.
            print("error: comment %s refers to missing post %s" %
                  (c_src.comment_ID, c_src.comment_post_ID))
            continue
        c_dest = (dest.session.query(dest.wp_comments)
                  .filter(dest.wp_comments.c.comment_author_email
                          == c_src.comment_author_email)
                  .filter(dest.wp_comments.c.comment_author
                          == c_src.comment_author)
                  .filter(dest.wp_comments.c.comment_author_url
                          == c_src.comment_author_url)
                  .filter(dest.wp_comments.c.comment_content
                          == c_src.comment_content)
                  .first())
        dest_exists = False
        if c_dest is not None:
            # sanity check that the post name is identical - we can't
            # go by post IDs
            p_dest = (dest.session.query(dest.wp_posts)
                      .filter(dest.wp_posts.c.ID == c_dest.comment_post_ID)
                      .first())
            if p_dest is not None and p_src.post_name == p_dest.post_name:
                dest_exists = True
                existing_comments.append(c_src.comment_ID)
                seen.add(c_src.comment_ID)
                found += 1
                print("comment %d exists in dest" % c_dest.comment_ID)
        if dest_exists:
            continue
        p_dest_all = (dest.session.query(dest.wp_posts)
                      .filter(dest.wp_posts.c.post_name == p_src.post_name)
                      .all())
        if not p_dest_all:
            print("error: %d no post with title %s" %
                  (c_src.comment_ID, p_src.post_name))
        elif len(p_dest_all) > 1:
            print("error: comment %d more than one post (%d) with name %s" %
                  (c_src.comment_ID, len(p_dest_all), p_src.post_name))
        else:
            print("adding comment %s" % str(c_src))
            c_new = dest.wp_comments.insert(c_src)
            # The post name lives on the post row, not on the comment row.
            print("Adding to post %s: %s" % (p_src.post_name, str(c_new)))
            # Override the copied values: let the destination assign a fresh
            # comment_ID and attach the comment to the destination post.
            dest.engine.execute(c_new, comment_ID=None,
                                comment_post_ID=p_dest_all[0].ID)
    print("found %d of %d comments" % (found, count))
    print("fixing post comment count")
    dest.engine.execute(fix_post_comment_count)
    return existing_comments
if __name__ == '__main__':
    # Whether the merge finishes or blows up part-way, write the list of
    # already-examined comment IDs to stats.json so the next run can skip them.
    try:
        merge_comments()
    finally:
        with open('stats.json', 'w') as stats_file:
            payload = {'existing_comments': existing_comments}
            json.dump(payload, stats_file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment