Skip to content

Instantly share code, notes, and snippets.

@dertom95
Last active September 18, 2020 11:55
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save dertom95/e51ae81991ef0137ef1d52722bd3a257 to your computer and use it in GitHub Desktop.
pebble-blog rss-format to ghost-cms-json-format converter
# pebble-rss to ghost-json converter
# 2020 by thomas trocha
# license: public domain
# use at own risk, make sure you have the right to download files. if not, don't do it
#
# usage: place blog-rss.xml in this script's folder
# modify vars below:
# input_name = name of the pebble-rss.xml-filename
# output_name= converted ghost-cms.json-filename
# download_images= try to download images
# image_folder=relative folder where to put images
#
# more info: https://thomas.trocha.com/blog/migrating-blog-pebble-ghost-cms/
import xml.etree.ElementTree as ET
from datetime import datetime
import time
import json
import re
from pathlib import Path
import requests,os
# --- user-adjustable settings ------------------------------------------------
# RSS export to read, and the Ghost import file to produce.
input_name = 'pebble_blog_rss_orig.xml'
output_name = 'blog.json'
# When enabled, images referenced by posts are fetched into image_folder.
download_images = True
image_folder = 'images'

if download_images:
    # Make sure the target folder exists before any image is written.
    Path(image_folder).mkdir(parents=True, exist_ok=True)

now = datetime.now()

# XML namespace prefixes used when querying the RSS tree.
namespaces = {
    'dc': 'http://purl.org/dc/elements/1.1/',
    'content': 'http://purl.org/rss/1.0/modules/content/',
}
def create_epoch_time(rss_date):
    """Convert an RSS ``dc:date`` timestamp to epoch milliseconds.

    Args:
        rss_date: UTC timestamp string such as ``2020-08-02T17:08:00Z``.

    Returns:
        Whole milliseconds since 1970-01-01T00:00:00Z, as an int.

    Raises:
        ValueError: if ``rss_date`` does not match ``%Y-%m-%dT%H:%M:%SZ``.
    """
    # Imported locally because the file's import header does not provide it.
    import calendar

    pattern = '%Y-%m-%dT%H:%M:%SZ'
    parsed = time.strptime(rss_date, pattern)
    # The trailing "Z" marks the value as UTC, so it has to be converted with
    # timegm(); mktime() would read the fields as local time and shift every
    # timestamp by the machine's UTC offset.
    return calendar.timegm(parsed) * 1000
# Maps an original image URL to the local file it was stored as.
downloaded_images = {}
# Running number used to build the local file names (image_<n>.<ext>).
image_counter = 0


def image_downloader(text):
    """Download the images referenced in ``text`` and rewrite their URLs.

    Every ``<img src="...">`` URL is fetched once (jpg/png/gif only) into
    ``image_folder`` and its occurrences in ``text`` are replaced by
    ``/content/images/<local file>``. URLs of an unknown type, or whose
    download fails, are reported on stdout and left untouched.

    Args:
        text: HTML fragment to scan.

    Returns:
        The HTML fragment with the successfully resolved image URLs replaced.
    """
    global image_counter

    for image in re.findall(r'<img src="(.*?)"', text, re.DOTALL):
        if image not in downloaded_images:
            image_counter += 1
            output_extension = next(
                (ext for ext in (".jpg", ".png", ".gif") if ext in image),
                None,
            )
            if output_extension is None:
                print("Unknown image type: %s! ignoring" % image)
                continue

            output_file = "%s/image_%s%s" % (
                image_folder, image_counter, output_extension)
            if os.path.exists(output_file):
                # A previous run already stored this file.
                print("FOUND IN FS IMAGE:%s => %s" % (image, output_file))
            else:
                print("DOWNLOAD IMAGE:%s => %s" % (image, output_file))
                try:
                    # The timeout keeps one dead host from stalling the run.
                    response = requests.get(
                        image, allow_redirects=True, timeout=30)
                    # Reject HTTP errors so an error page is never saved as
                    # if it were the image.
                    response.raise_for_status()
                    with open(output_file, 'wb') as image_file:
                        image_file.write(response.content)
                except (requests.RequestException, OSError):
                    # Best effort: keep the original URL and carry on.
                    print("Could not load image %s" % image)
                    continue
            downloaded_images[image] = output_file

        text = text.replace(
            image, "/content/images/%s" % downloaded_images[image])
    return text
# Parse the RSS export once, up front.
tree = ET.parse(input_name)
root = tree.getroot()

# Module-level conversion state. posts, tags, tag_map, posts_tags and users
# are rebound to real containers by rss_channel(); until then they are None.
result = posts = tags = tag_map = posts_tags = users = None

# Counters for post ids and tag ids.
item_id = tag_ids = 0
def add_tag(tag,post_id):
    """Link ``post_id`` to ``tag``, registering the tag on first use.

    A tag that is not yet in ``tag_map`` receives the next id from
    ``tag_ids`` and is appended to ``tags``; in every case a tag/post pair
    is appended to ``posts_tags``.
    """
    global tag_ids

    assigned_id = tag_map.get(tag)
    if assigned_id is None:
        # First time this tag shows up: allocate an id and record the tag.
        tag_ids += 1
        assigned_id = tag_ids
        tag_map[tag] = assigned_id
        tag_record = {
            "id" : assigned_id,
            "name" : tag,
            "slug" : tag.replace(" ","-"),
            "description" : ""
        }
        tags.append(tag_record)

    link = {"tag_id" : assigned_id, "post_id" : post_id}
    posts_tags.append(link)
def parse_rss(root):
    """Print each direct child of ``root`` and hand it to rss_channel()."""
    for channel in root:
        summary = "%s : %s" % (channel.tag, channel.attrib)
        print(summary)
        rss_channel(channel)
def rss_channel(channel):
    """Convert one RSS channel element and write it as a Ghost import file.

    Resets the module-level post/tag collections, converts every ``item``
    child through rss_item(), and writes the resulting structure as JSON to
    the file named by ``output_name``.

    Args:
        channel: ElementTree element that holds a ``dc:date`` child and the
            ``item`` children to convert.
    """
    global item_id, posts, tags, posts_tags, users, tag_map

    blog_date = channel.find("dc:date", namespaces).text
    result = {
        "meta": {
            "exported_on": create_epoch_time(blog_date),
            "version": "3.27.0",
        }
    }

    # Start from a clean slate; rss_item() and add_tag() fill these in.
    item_id = 1
    posts = []
    tags = []
    tag_map = {}
    posts_tags = []
    users = [
        {
            "id": 1,
            "name": "dertom95",
            "slug": "dertom",
            "email": "thomas.trocha@gmail.com",
            "profile_image": None,
            "cover_image": None,
            "bio": None,
            "website": None,
            "location": None,
            "accessibility": None,
            "meta_title": None,
            "meta_description": None,
            "created_at": 1283780649000,
            "created_by": 1,
            "updated_at": 1286958624000,
            "updated_by": 1,
        }
    ]
    # result["data"] shares the list objects above, so everything appended
    # while the items are processed ends up in the written file.
    result["data"] = {
        "posts": posts,
        "tags": tags,
        "posts_tags": posts_tags,
        "users": users,
    }

    for item in channel.findall("item"):
        rss_item(item)

    # Honour the configured output_name instead of a hard-coded file name,
    # and let the context manager close the file even if writing fails.
    with open(output_name, "w", encoding="utf-8") as text_file:
        text_file.write(json.dumps(result, sort_keys=True, indent=4))
def rss_item(item):
    """Convert one RSS ``item`` element into a Ghost post plus its tags.

    Appends the post to ``posts``, registers each non-empty ``category``
    through add_tag(), and then advances ``item_id``.

    Args:
        item: ElementTree element with ``title``, ``dc:date``,
            ``content:encoded`` and optional ``category`` children.
    """
    global item_id

    # findtext() returns "" for an empty element and the default for a
    # missing one, so a post without a body becomes an empty html card
    # instead of crashing the image scan or embedding the text "None".
    content = item.findtext(
        "content:encoded", default="", namespaces=namespaces)
    if download_images:
        # Download images <img src=".."> and point the html at the copies.
        content = image_downloader(content)

    # The whole post body is wrapped in a single mobiledoc html card.
    mobiledoc = {
        "version": "0.3.1",
        "markups": [],
        "atoms": [],
        "cards": [['html', {"cardName": 'html', "html": content}]],
        "sections": [[10, 0]],
    }

    title = item.find("title").text
    published = create_epoch_time(item.find("dc:date", namespaces).text)
    posts.append({
        "id": item_id,
        "title": title,
        "slug": title.replace(" ", "-"),
        "status": "published",
        "published_by": 1,
        "author_id": 1,
        "published_at": published,
        "created_at": published,
        "created_by": 1,
        "mobiledoc": json.dumps(mobiledoc),
    })

    for cat in item.findall("category"):
        tag = cat.text
        if tag:
            print("TAG:%s item_id:%s" % (tag, item_id))
            add_tag(tag, item_id)

    item_id += 1
# Script entry point: run the conversion on the root of the parsed RSS tree.
parse_rss(root)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment