Skip to content

Instantly share code, notes, and snippets.

@98hira
Last active January 9, 2019 08:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 98hira/ce8e02b0972082fd144be7d007dd6950 to your computer and use it in GitHub Desktop.
Save 98hira/ce8e02b0972082fd144be7d007dd6950 to your computer and use it in GitHub Desktop.
いずれGithubで管理する予定だが、ひとまずGistへ
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import csv
import os
import sys
import urllib.parse
from datetime import datetime, timezone
from xml.sax.saxutils import escape

from bs4 import BeautifulSoup
# Environment settings.
# ENVIRONMENT_PATH is used as a plain string prefix for the CSV files kept by
# ImageDB ("delete_list.csv" and "image_db.csv"), so it must end with "/".
ENVIRONMENT_PATH = "/xxx/"
# Credentials handed to HatenaClient.
# NOTE(review): these are placeholders; avoid committing real credentials.
HATENA_USERNAME = "xxx"
HATENA_API_KEY = "xxx"
# Settings for google-code-prettify.
# When set to True, the library loader and the style settings are appended
# to the end of the converted HTML file.
CONF_ADD_STYLE = False
#--------------------------------------------------------
#hatenaAPI.py
#↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
from datetime import datetime
import base64
import json
import requests
import sys
import random
import hashlib
from lxml import etree
class HatenaClient:
    """
    Hatena Python Client
    see http://developer.hatena.ne.jp/

    Talks to the Hatena Blog entry endpoint and the Hatena Fotolife photo
    endpoints.  Every request carries an X-WSSE header derived from the
    user name and API key given to the constructor.
    """

    # File extensions accepted by post_image().
    _SUPPORTED_IMAGE_TYPES = ("gif", "png", "jpg", "jpeg")

    def __init__(self, username: str, api_key: str) -> None:
        """Remember the credentials used to sign every request."""
        self.username = username
        self.api_key = api_key

    @staticmethod
    def _create_body(title: str, body: str, draft: bool = True) -> bytes:
        """
        Build the Atom entry document for a blog post.

        see http://developer.hatena.ne.jp/ja/documents/blog/apis/atom

        ``title`` and ``body`` are XML-escaped so that characters such as
        ``&`` and ``<`` cannot produce a malformed document.  The result is
        UTF-8 encoded bytes, matching the encoding named in the XML
        declaration.
        """
        post_date = datetime.today().strftime("%Y-%m-%d")
        template = """<?xml version="1.0" encoding="utf-8"?>
<entry xmlns="http://www.w3.org/2005/Atom"
xmlns:app="http://www.w3.org/2007/app">
<title>{title}</title>
<author><name>name</name></author>
<content type="text/plain">{body}</content>
<updated>{day}T00:00:00</updated>
<app:control>
<app:draft>{draft}</app:draft>
</app:control>
</entry>
"""
        document = template.format(
            title=escape(title),
            body=escape(body),
            day=post_date,
            draft="yes" if draft else "no",
        )
        return document.encode("utf-8")

    def _wsse(self) -> str:
        """
        Build the value of the X-WSSE authentication header.

        see http://developer.hatena.ne.jp/ja/documents/auth/apis/wsse
        """
        # The trailing "Z" declares UTC, so the timestamp must really be UTC
        # (the previous code appended "Z" to the local wall-clock time).
        created = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        b_nonce = hashlib.sha1(str(random.random()).encode()).digest()
        # PasswordDigest = SHA1(nonce + created + api_key), base64 encoded below.
        b_digest = hashlib.sha1(
            b_nonce + created.encode() + self.api_key.encode()
        ).digest()
        header = (
            'UsernameToken Username="{username}", PasswordDigest="{passwd}", '
            'Nonce="{nonce}", Created="{created}"'
        )
        return header.format(
            username=self.username,
            passwd=base64.b64encode(b_digest).decode(),
            nonce=base64.b64encode(b_nonce).decode(),
            created=created,
        )

    def post_blog(self, blogname: str, title: str, body: str) -> None:
        """
        Post a new entry to ``blogname``.

        The entry is created with ``_create_body``'s default, i.e. as a draft.

        Raises:
            RuntimeError: when the server does not answer 201; the response
                text is attached as the second exception argument.
        """
        data = self._create_body(title, body)
        headers = {"X-WSSE": self._wsse()}
        url = "http://blog.hatena.ne.jp/{user}/{blog}/atom/entry".format(
            user=self.username, blog=blogname)
        r = requests.post(url, data=data, headers=headers)
        if r.status_code != 201:
            sys.stderr.write("error")
            raise RuntimeError("faild", r.text)

    def post_image(self, image_path: str, image_title="") -> str:
        """
        Post an image file (gif / png / jpg / jpeg) to Hatena Fotolife.

        Returns the text of the first ``hatena:imageurl`` element found in
        the response.

        NOTE(review): for an unsupported extension the literal string
        "image type Not Suport" is returned instead of raising; this is kept
        for backward compatibility, so callers must not treat every return
        value as a URL.

        Raises:
            RuntimeError: when the server does not answer 201, or when the
                response contains no ``hatena:imageurl`` element.
        """
        # Check file extension
        image_type = image_path.split(".")[-1].lower()
        if image_type not in self._SUPPORTED_IMAGE_TYPES:
            return "image type Not Suport"
        # "image/jpg" is not a registered media type; ".jpg" files are JPEG.
        mime_subtype = "jpeg" if image_type == "jpg" else image_type
        with open(image_path, "rb") as image_file:
            image_content = base64.b64encode(image_file.read()).decode("utf-8")
        template = """
<entry xmlns="http://purl.org/atom/ns#">
<title>{title}</title>
<content mode="base64" type="image/{type}">
{content}
</content>
</entry>
""".format(title=escape(image_title), content=image_content, type=mime_subtype)
        url = "http://f.hatena.ne.jp/atom/post"
        headers = {"X-WSSE": self._wsse()}
        r = requests.post(url, data=template, headers=headers)
        if r.status_code != 201:
            sys.stderr.write("error")
            raise RuntimeError("faild")
        hatenafoto_rss_ns = {"rss": "http://purl.org/rss/1.0/",
                             "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
                             "dc": "http://purl.org/dc/elements/1.1/",
                             "content": "http://purl.org/rss/1.0/modules/content/",
                             "hatena": "http://www.hatena.ne.jp/info/xmlns#",
                             "taxo": "http://purl.org/rss/1.0/modules/taxonomy/",
                             "openSearch": "http://a9.com/-/spec/opensearchrss/1.0/"
                             }
        root = etree.fromstring(r.text.encode("utf-8"))
        image_urls = root.xpath("//hatena:imageurl/text()",
                                namespaces=hatenafoto_rss_ns)
        if not image_urls:
            # Fail with the response attached rather than a bare IndexError.
            raise RuntimeError("faild", r.text)
        return str(image_urls[0])

    def put_image(self, image_title, image_url=None):
        """
        Change the title of an already uploaded Fotolife image.

        Args:
            image_title: new title; XML-escaped before being sent.
            image_url: value appended to ``http://f.hatena.ne.jp/atom/edit/``
                to form the edit URI.  The original code read this from an
                undefined name, so the method could never run; it is now an
                explicit parameter.
                NOTE(review): presumably the Fotolife photo id - confirm
                against the Fotolife Atom API.

        Raises:
            ValueError: when ``image_url`` is not supplied.
            RuntimeError: when the server does not answer 200.
        """
        if not image_url:
            raise ValueError("image_url is required to build the edit URI")
        template = """
<entry xmlns="http://purl.org/atom/ns#">
<title>{title}</title>
</entry>
""".format(title=escape(image_title))
        url = "http://f.hatena.ne.jp/atom/edit/{url}".format(url=image_url)
        headers = {"X-WSSE": self._wsse()}
        r = requests.put(url, data=template, headers=headers)
        if r.status_code != 200:
            sys.stderr.write("error")
            raise RuntimeError("faild")
        return
#↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
class ImageDB:
    """
    CSV-backed table that maps local image files to their uploaded copies.

    Every record is a three element list; the class constants below are the
    column indexes.  The whole table is held in ``image_record`` and written
    back to ``db_file`` after every modification.
    """

    # Column indexes inside one record.
    LOCAL_PATH = 0
    TIME_STAMP = 1
    CLOUD_PATH = 2

    def __init__(self):
        """Set the file locations under ENVIRONMENT_PATH and load the table."""
        self.delete_list = ENVIRONMENT_PATH + "delete_list.csv"
        self.db_file = ENVIRONMENT_PATH + "image_db.csv"
        self.image_record = []
        self._read()

    def _read(self):
        """
        Load the table from ``db_file``; create an empty file if it is missing.
        """
        if os.path.isfile(self.db_file):
            # newline="" is what the csv module asks for; read-only mode is
            # enough because nothing is written through this handle.
            with open(self.db_file, "r", newline="") as fd:
                # Drop blank rows so record[...] indexing can never fail.
                rows = [row for row in csv.reader(fd) if row]
            if rows:
                self.image_record = rows
        else:
            # Create a new, empty file (and close it right away).
            with open(self.db_file, "w"):
                pass

    def _save(self):
        """Write the whole table back to ``db_file``."""
        # newline="" stops the csv writer's line terminator from being
        # translated a second time by the text layer.
        with open(self.db_file, "w", newline="") as fd:
            csv.writer(fd).writerows(self.image_record)

    def search(self, local_path):
        """
        Return the index of the first record whose local path equals
        ``local_path``, or -1 when there is none.
        """
        for record_id, record in enumerate(self.image_record):
            if record and record[self.LOCAL_PATH] == local_path:
                return record_id
        return -1

    def record_get(self, record_id):
        """Return the record stored at index ``record_id``."""
        return self.image_record[record_id]

    def record_add(self, local_path, time_stamp, cloud_path):
        """Append a new record and save the table."""
        self.image_record.append([local_path, time_stamp, cloud_path])
        self._save()

    def record_update(self, record_id, temp_record):
        """Replace the record at ``record_id`` and save the table."""
        self.image_record[record_id] = temp_record
        self._save()

    def plan_to_delete(self, cloud_path):
        """Append ``cloud_path`` as one line to the ``delete_list`` file."""
        with open(self.delete_list, "a") as fd:
            fd.write(cloud_path + "\n")
class CloudOperation:
    """Small facade over the Hatena client used for image uploads."""

    def __init__(self):
        """Create the client from the module-level Hatena credentials."""
        client = HatenaClient(HATENA_USERNAME, HATENA_API_KEY)
        self.cli = client

    def image_upload(self, local_path):
        """Upload ``local_path`` and hand back whatever ``post_image`` returns."""
        uploaded = self.cli.post_image(local_path)
        return uploaded
def image_upload(local_path, time_stamp):
    """
    Make sure ``local_path`` is uploaded and return its cloud URL.

    Uses the module-level ``idb`` (image table) and ``cop`` (uploader)
    objects, which the script entry point creates before calling ``main``.

    Args:
        local_path: path of the image file on disk.
        time_stamp: modification time formatted as "%Y/%m/%d_%H:%M:%S".

    Returns:
        The cloud path recorded for the file, or "" when the recorded
        time stamp is newer than the local file (nothing is uploaded then).
    """
    stamp_format = "%Y/%m/%d_%H:%M:%S"
    record_id = idb.search(local_path)
    if record_id < 0:
        # Unknown file: upload it and register a new record.
        cloud_path = cop.image_upload(local_path)
        idb.record_add(local_path, time_stamp, cloud_path)
        return cloud_path

    # The file is already registered: compare the recorded and local times.
    temp_record = idb.record_get(record_id)
    local_time = datetime.strptime(time_stamp, stamp_format)
    cloud_time = datetime.strptime(temp_record[idb.TIME_STAMP], stamp_format)
    print(f"local:{local_time} cloud:{cloud_time}")

    if cloud_time == local_time:
        # Local and cloud copies match; nothing changed, so nothing is saved.
        return temp_record[idb.CLOUD_PATH]

    if cloud_time < local_time:
        # The local file is newer.  Upload first, so a failed upload leaves
        # both the record and the delete list untouched.
        previous_cloud_path = temp_record[idb.CLOUD_PATH]
        print(f"test::{previous_cloud_path}")
        cloud_path = cop.image_upload(local_path)
        # Only now queue the superseded cloud copy for deletion.
        idb.plan_to_delete(previous_cloud_path)
        temp_record[idb.TIME_STAMP] = time_stamp
        temp_record[idb.CLOUD_PATH] = cloud_path
        idb.record_update(record_id, temp_record)
        return cloud_path

    # The recorded time is newer than the local file.  This should not
    # happen; the image table may have been modified unintentionally.
    sys.stderr.write(
        f"warning: recorded time stamp is newer than the file: {local_path}\n"
    )
    return ""
def src_parse(path):
    """
    Turn the ``src`` of a local image into a path, time stamp and alt text.

    Args:
        path: an absolute path or a ``file:`` URL, possibly percent-encoded.

    Returns:
        A ``(path, file_mtime, alt_name)`` tuple: the decoded path with a
        single leading "/", the file's modification time formatted as
        "%Y/%m/%d_%H:%M:%S" (local time), and the file name without its
        extension.

    Raises:
        OSError: when the file's modification time cannot be read, e.g.
            the file does not exist.
    """
    # Remove the scheme as a prefix.  str.lstrip("file:/") strips a *set* of
    # characters and would also eat the start of paths such as "/lib/...".
    scheme = "file:"
    if path.startswith(scheme):
        path = path[len(scheme):]
    # Collapse whatever slashes are left in front into exactly one.
    path = "/" + path.lstrip("/")
    # Paths containing full-width characters arrive URL-encoded; decode them.
    path = urllib.parse.unquote(path)
    file_name = path.split("/")[-1]
    # splitext keeps dots inside the name ("a.b.png" -> "a.b").
    alt_name = os.path.splitext(file_name)[0]
    modified = datetime.fromtimestamp(os.path.getmtime(path))
    file_mtime = modified.strftime("%Y/%m/%d_%H:%M:%S")
    print(f"name:{file_name}")
    print(f"file_mtime:{file_mtime}")
    return path, file_mtime, alt_name
def html_parse(file_path):
    """
    Convert an HTML file for posting to Hatena Blog.

    Local images are uploaded through ``image_upload`` and their ``src`` is
    replaced by the returned cloud path; ``<code>`` elements get the
    google-code-prettify classes.  The result is written next to the input
    as ``<name>-convert<ext>``.  When ``CONF_ADD_STYLE`` is true, the
    prettify loader and style sheet are appended to that file.

    Returns None.  A missing input file is reported on stderr and nothing
    is written.
    """
    if not os.path.isfile(file_path):
        # The HTML file was not found.
        sys.stderr.write(f"HTML file not found: {file_path}\n")
        return

    with open(file_path) as html:
        soup = BeautifulSoup(html, "html.parser")

    # Upload image files to Hatena Blog.
    already_remote = ("http://", "https://", "data:")
    for img in soup.find_all("img"):
        # Remove unneeded attributes.
        del img["title"]
        del img["referrerpolicy"]
        src = img.get("src")
        if not src or src.startswith(already_remote):
            # Nothing on the local disk to upload for this tag.
            continue
        local_path, time_stamp, alt_name = src_parse(src)
        cloud_path = image_upload(local_path, time_stamp)
        if cloud_path != "":
            img["src"] = cloud_path
            img["alt"] = alt_name
            img["class"] = "hatena-fotolife"

    # google-code-prettify support.
    add_class = "prettyprint linenums"
    for code in soup.find_all("code"):
        # Add the class attribute to the <pre> tag that encloses the <code>
        # tag.  Looking the ancestor up explicitly avoids tagging an
        # unrelated element when the <code> is not inside a <pre>.
        enclosing_pre = code.find_parent("pre")
        if enclosing_pre is not None:
            enclosing_pre["class"] = "code-paste"
        # Add the prettify classes to the <code> tag's class attribute.
        try:
            code["class"].append(add_class)
        except KeyError:
            code["class"] = add_class

    # Save the converted HTML.  splitext also copes with names that have no
    # extension, where slicing at rfind(".") == -1 produced a garbled name.
    stem, extension = os.path.splitext(file_path)
    output_file = stem + "-convert" + extension
    with open(output_file, "wb") as converted:
        converted.write(soup.encode("utf-8"))

    if CONF_ADD_STYLE:
        footer = '''
<script src="https://cdn.rawgit.com/google/code-prettify/master/loader/run_prettify.js"></script>
<style>
li.L0, li.L1, li.L2, li.L3, li.L4, li.L5, li.L6, li.L7, li.L8, li.L9 {
list-style-type: decimal; /* 行番号を表示するための設定 */
padding-left: 8px; /* 行番号とソースコード間の間隔設定 */
margin-left: 30px; /* 行番号を表示するスペースの間隔設定、狭すぎると行番号が表示されなくなる。*/
}
pre.code-paste {
border: solid 1px gray; /* 枠線の設定 */
overflow: auto; /* 枠線からはみ出たソースを隠す設定 */
padding-top: 10px; /* 枠線とソースコード間の上辺の間隔設定 */
padding-bottom: 16px; /* 枠線とソースコード間の下辺の間隔設定 */
}
</style>
'''
        # The document above was written as UTF-8, so append in UTF-8 too.
        with open(output_file, "a", encoding="utf-8") as f:
            print(footer, file=f)
def main():
    """
    Convert the HTML file named by the first command line argument.

    Exits with a usage message (non-zero status) when no argument is given,
    instead of failing with an IndexError.
    """
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <html-file>")
    html_parse(sys.argv[1])
if __name__ == "__main__":
    # image_upload() reads these two module-level objects, so they have to
    # exist before main() starts processing.
    idb = ImageDB()
    cop = CloudOperation()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment