-
-
Save twig/0fdbb2174cb820e01d6c to your computer and use it in GitHub Desktop.
Disqus / Blogger / Django import script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import arrow | |
from bs4 import BeautifulSoup | |
from disqus.wxr_feed import ContribCommentsWxrFeed | |
from HTMLParser import HTMLParser | |
from twigcorp.utils import Url | |
class BloggerCommentsWxrFeed(ContribCommentsWxrFeed):
    """
    Blogger comments in WXR format for Disqus.

    Parses a Blogger Atom export with BeautifulSoup, indexes its post
    entries by URL, attaches each comment entry to the post it replies to,
    and exposes the result through the ``item_*`` / ``comment_*`` hooks
    defined below.
    """

    # Atom <category term="..."> values that mark an entry as a post or as a
    # comment in the export.
    _POST_KIND = "http://schemas.google.com/blogger/2008/kind#post"
    _COMMENT_KIND = "http://schemas.google.com/blogger/2008/kind#comment"

    def get_object(self, request, requested_file):
        """
        Parse the export and reset the per-import state.

        ``requested_file`` is passed straight to BeautifulSoup (html5lib
        parser, UTF-8 input). ``request`` is not used here.

        Returns the (still empty) ``posts_by_url`` dict; ``items()`` is what
        fills it in.
        """
        self.posts_by_url = {}
        self.comment_ids = []
        self.soup = BeautifulSoup(requested_file, "html5lib", from_encoding="utf-8")
        self.site_url = self.soup.find('link', rel='alternate').get('href')
        # Data layout built by items():
        #   posts_by_url[url] = { id, url, title, pubdate, guid, comments }
        #   comment = { id, user_id, user_name, user_email, user_url, ip,
        #               submit_date, comment, approved, parent_id }
        return self.posts_by_url

    def items(self, posts_by_url_arg):
        """
        Build and return the list of post dicts, each with its comments.

        First pass indexes every post entry by its ``rel="alternate"`` URL
        (entries without such a link are skipped). Second pass attaches each
        comment entry to the post named by its ``thr:in-reply-to`` href.
        Comments whose target post is missing from the index are skipped
        with a message instead of aborting the whole import.

        ``posts_by_url_arg`` is not read; state lives on ``self``.
        """
        parser = HTMLParser()

        for category in self.soup.find_all('category', term=self._POST_KIND):
            # The <category> tag sits inside the <entry> it classifies; take
            # the nearest enclosing <entry>.
            post = next(
                (el for el in category.parents if el.name == 'entry'),
                None,
            )
            if post is None:
                print("Unable to find entry")
                continue

            link_element = post.find('link', rel="alternate")
            if link_element is None:
                continue

            post_id = post.find('id').text
            url = link_element.get('href')
            self.posts_by_url[url] = {
                # Last dash-separated chunk of the Atom id.
                'id': post_id.split('-')[-1],
                'url': url,
                'title': post.find('title').text,
                'pubdate': post.find('published').text,
                'guid': post_id,
                'comments': [],
            }

        for category in self.soup.find_all('category', term=self._COMMENT_KIND):
            comment = category.parent
            comment_id = comment.find('id').text.split('-')[-1]

            reply_to = comment.find('thr:in-reply-to')
            url = reply_to.get('href') if reply_to is not None else None
            post_entry = self.posts_by_url.get(url)
            if post_entry is None:
                # Either the comment has no in-reply-to element or it points
                # at a post that was skipped/absent in the first pass.
                print("Skipping comment %s: no matching post" % comment_id)
                continue

            author = comment.find('author')
            author_uri_el = author.find('uri')
            author_email_el = author.find('email')
            parent_comment = comment.find('link', rel='related', type="application/atom+xml")

            comment_text = comment.find('content').text.strip()
            comment_text = parser.unescape(comment_text)
            for br in ['<br />', '<BR />', '<br/>', '<BR/>']:
                comment_text = comment_text.replace(br, '\n')

            post_entry['comments'].append({
                'id': comment_id,
                'user_id': 0,
                'user_name': author.find('name').text,
                'user_email': author_email_el.text.lower() if author_email_el is not None else '',
                'user_url': author_uri_el.text if author_uri_el is not None else '',
                'ip': '',
                'submit_date': comment.find('published').text,
                'comment': comment_text,
                'approved': True,
                # Last path segment of the related link's href.
                'parent_id': parent_comment.get('href').split('/')[-1] if parent_comment is not None else '',
            })
            self.comment_ids.append(comment_id)

        return self.posts_by_url.values()

    def item_title(self, item):
        """Return the post title."""
        return item['title']

    def item_description(self, item):
        """Return an empty description for every post."""
        return u''

    def item_pubdate(self, item):
        """Return the post's published timestamp parsed by ``arrow``."""
        return arrow.get(item['pubdate']).datetime

    def item_guid(self, item):
        """
        Return the short post ID used as the thread identifier.
        """
        # The Disqus blogger plugin uses the Post ID as the Disqus identifier.
        return item['id']

    def item_comment_status(self, item):
        """Return whether the post accepts comments; always ``"open"``."""
        # Can people comment on this item? One of either 'open' or 'closed'.
        return "open"

    def item_link(self, item):
        """Return the post URL."""
        return item['url']

    def item_comments(self, item):
        """Return the list of comment dicts collected for the post."""
        return item['comments']

    # Comment information

    def comment_id(self, comment):
        """Return the comment ID."""
        return comment['id']

    def comment_user_id(self, comment):
        """Return the comment author's user ID (``items()`` stores 0)."""
        return comment['user_id']

    def comment_user_name(self, comment):
        """Return the comment author's display name."""
        return comment['user_name']

    def comment_user_email(self, comment):
        """Return the author's email, or '' for Blogger's placeholder address."""
        return comment['user_email'] if comment['user_email'] != 'noreply@blogger.com' else ''

    def comment_user_url(self, comment):
        """Return the comment author's URL ('' when none was exported)."""
        # Comment author's homepage URL - we use this as a honeypot
        return comment['user_url']

    def comment_ip_address(self, comment):
        """Return the comment's IP address (``items()`` stores '')."""
        return comment['ip']

    def comment_submit_date(self, comment):
        """Return the comment's published timestamp parsed by ``arrow``."""
        return arrow.get(comment['submit_date']).datetime

    def comment_comment(self, comment):
        """Return the comment body text."""
        return comment['comment']

    def comment_is_approved(self, comment):
        """Return '1' if the comment is approved, otherwise '0'."""
        return '1' if comment['approved'] else '0'

    def comment_parent(self, comment):
        """
        Return the parent comment's ID, or 0 when there is no usable parent.

        A parent ID is only returned if that comment was itself collected by
        ``items()``, so the value always matches some ``comment_id()``.
        """
        # Should match comment_id()
        if comment['parent_id'] and comment['parent_id'] in self.comment_ids:
            return comment['parent_id']
        return 0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment