Last active
January 9, 2019 08:58
-
-
Save 98hira/ce8e02b0972082fd144be7d007dd6950 to your computer and use it in GitHub Desktop.
いずれGithubで管理する予定だが、ひとまずGistへ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import os | |
import sys | |
import csv | |
from bs4 import BeautifulSoup | |
import urllib.parse | |
from datetime import datetime | |
# Environment settings.
# ENVIRONMENT_PATH is concatenated directly with "delete_list.csv" and
# "image_db.csv" by ImageDB, so it must end with a path separator.
ENVIRONMENT_PATH = "/xxx/"
# Credentials handed to HatenaClient by CloudOperation.
HATENA_USERNAME = "xxx"
HATENA_API_KEY = "xxx"
# Settings for google-code-prettify.
# When True, the library loader <script> and the style rules are appended
# to the end of the converted HTML file.
CONF_ADD_STYLE = False
#-------------------------------------------------------- | |
#hatenaAPI.py | |
#↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ | |
from datetime import datetime | |
import base64 | |
import json | |
import requests | |
import sys | |
import random | |
import hashlib | |
from lxml import etree | |
class HatenaClient:
    """
    Small client for the Hatena Blog and Hatena Fotolife Atom endpoints.

    Every request is authenticated with an ``X-WSSE`` header derived from
    the user name and API key given to the constructor.
    see http://developer.hatena.ne.jp/
    """

    def __init__(self, username: str, api_key: str) -> None:
        """Remember the credentials used to sign each request."""
        self.username = username
        self.api_key = api_key

    @staticmethod
    def _xml_text(value) -> str:
        """Return *value* with ``&``, ``<`` and ``>`` escaped for XML text."""
        # Imported locally so the class does not rely on a file-level import.
        from xml.sax.saxutils import escape
        return escape(str(value))

    @staticmethod
    def _create_body(title: str, body: str, draft: bool = True) -> bytes:
        """
        Build the Atom entry document for a blog post.

        see http://developer.hatena.ne.jp/ja/documents/blog/apis/atom

        :param title: entry title (escaped before insertion).
        :param body: entry content, sent as ``text/plain`` (escaped before
            insertion so markup characters cannot break the XML document).
        :param draft: when True the entry is flagged as a draft.
        :return: the UTF-8 encoded XML document.
        """
        post_date = datetime.today().strftime("%Y-%m-%d")
        data = """<?xml version="1.0" encoding="utf-8"?>
<entry xmlns="http://www.w3.org/2005/Atom"
       xmlns:app="http://www.w3.org/2007/app">
  <title>{title}</title>
  <author><name>name</name></author>
  <content type="text/plain">{body}</content>
  <updated>{day}T00:00:00</updated>
  <app:control>
    <app:draft>{draft}</app:draft>
  </app:control>
</entry>
""".format(title=HatenaClient._xml_text(title),
           body=HatenaClient._xml_text(body),
           day=post_date,
           draft="yes" if draft else "no")
        return data.encode("utf-8")

    def _wsse(self) -> str:
        """
        Build the value of the ``X-WSSE`` header.

        see http://developer.hatena.ne.jp/ja/documents/auth/apis/wsse

        The digest is SHA-1 over ``nonce + created + api_key``; the nonce and
        the digest are transmitted base64 encoded.
        """
        from datetime import timezone

        # The trailing "Z" means UTC, so the timestamp must really be UTC.
        created = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        # Use the OS entropy source rather than the predictable `random`.
        b_nonce = os.urandom(20)
        b_digest = hashlib.sha1(
            b_nonce + created.encode() + self.api_key.encode()).digest()
        c = "UsernameToken Username=\"{username}\", PasswordDigest=\"{passwd}\", Nonce=\"{nonce}\", Created=\"{created}\""
        return c.format(username=self.username,
                        passwd=base64.b64encode(b_digest).decode(),
                        nonce=base64.b64encode(b_nonce).decode(),
                        created=created)

    def post_blog(self, blogname: str, title: str, body: str) -> None:
        """
        Create a draft entry on the given blog.

        :raises RuntimeError: when the server does not answer 201; the
            response text is attached as the second exception argument.
        """
        data = self._create_body(title, body)
        headers = {"X-WSSE": self._wsse()}
        url = "http://blog.hatena.ne.jp/{user}/{blog}/atom/entry".format(
            user=self.username, blog=blogname)
        r = requests.post(url, data=data, headers=headers)
        if r.status_code != 201:
            sys.stderr.write("error")
            raise RuntimeError("faild", r.text)

    def post_image(self, image_path: str, image_title="") -> str:
        """
        Upload an image file (gif/png/jpg/jpeg) to Hatena Fotolife.

        :param image_path: path of the local image file.
        :param image_title: title stored with the image.
        :return: the image URL read from the ``hatena:imageurl`` element of
            the response, or the literal string ``"image type Not Suport"``
            when the file extension is not supported (kept for backward
            compatibility; callers receive it like a normal return value).
        :raises RuntimeError: when the server does not answer 201.
        """
        # Check file extension
        image_type = image_path.split(".")[-1].lower()
        suport_type = ["gif", "png", "jpg", "jpeg"]
        if image_type not in suport_type:
            return "image type Not Suport"
        # "image/jpg" is not a registered MIME type; send "image/jpeg".
        mime_subtype = "jpeg" if image_type == "jpg" else image_type

        with open(image_path, "rb") as image_file:
            image_content = base64.b64encode(image_file.read()).decode("utf-8")

        template = """
<entry xmlns="http://purl.org/atom/ns#">
  <title>{title}</title>
  <content mode="base64" type="image/{type}">
{content}
  </content>
</entry>
""".format(title=self._xml_text(image_title),
           content=image_content,
           type=mime_subtype)

        url = "http://f.hatena.ne.jp/atom/post"
        headers = {"X-WSSE": self._wsse()}
        r = requests.post(url, data=template, headers=headers)
        if r.status_code != 201:
            sys.stderr.write("error")
            raise RuntimeError("faild", r.text)

        hatenafoto_rss_ns = {
            "rss": "http://purl.org/rss/1.0/",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
            "dc": "http://purl.org/dc/elements/1.1/",
            "content": "http://purl.org/rss/1.0/modules/content/",
            "hatena": "http://www.hatena.ne.jp/info/xmlns#",
            "taxo": "http://purl.org/rss/1.0/modules/taxonomy/",
            "openSearch": "http://a9.com/-/spec/opensearchrss/1.0/",
        }
        root = etree.fromstring(r.text.encode("utf-8"))
        return str(root.xpath("//hatena:imageurl/text()",
                              namespaces=hatenafoto_rss_ns)[0])

    def put_image(self, image_title, image_url=""):
        """
        Change the title of an already uploaded Fotolife image.

        :param image_title: the new title.
        :param image_url: value appended to ``.../atom/edit/`` to address the
            image. The original code read an undefined name here, so this is
            now an explicit argument.
            NOTE(review): presumably the Fotolife image id -- confirm against
            the API documentation.
        :raises ValueError: when *image_url* is empty.
        :raises RuntimeError: when the server does not answer 200.
        """
        if not image_url:
            raise ValueError("image_url is required")

        template = """
<entry xmlns="http://purl.org/atom/ns#">
  <title>{title}</title>
</entry>
""".format(title=self._xml_text(image_title))

        url = "http://f.hatena.ne.jp/atom/edit/{url}".format(url=image_url)
        headers = {"X-WSSE": self._wsse()}
        r = requests.put(url, data=template, headers=headers)
        if r.status_code != 200:
            sys.stderr.write("error")
            raise RuntimeError("faild", r.text)
        return
#↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑ | |
class ImageDB:
    """
    CSV-backed table that remembers which local image was uploaded where.

    Each record is a list ``[local_path, time_stamp, cloud_path]``; the class
    constants below are the column indexes. Every mutation is written back
    to the CSV file immediately.
    """

    # Column indexes within one record.
    LOCAL_PATH = 0
    TIME_STAMP = 1
    CLOUD_PATH = 2

    def __init__(self):
        """Resolve the data file locations and load any existing records."""
        self.delete_list = ENVIRONMENT_PATH + "delete_list.csv"
        self.db_file = ENVIRONMENT_PATH + "image_db.csv"
        self.image_record = []
        self._read()

    def _read(self):
        """
        Load all records from the database file.

        When the file does not exist yet, an empty one is created.
        """
        if os.path.isfile(self.db_file):
            # newline="" is what the csv module expects for its file objects.
            with open(self.db_file, "r", newline="") as fd:
                # Drop blank rows so later column lookups cannot fail.
                self.image_record = [row for row in csv.reader(fd) if row]
        else:
            # Create a new, empty file (and close the handle right away).
            with open(self.db_file, "w"):
                pass

    def _save(self):
        """Rewrite the database file from the in-memory records."""
        with open(self.db_file, "w", newline="") as fd:
            csv.writer(fd).writerows(self.image_record)

    def search(self, local_path):
        """
        Find the record for *local_path*.

        :return: the index of the first matching record, or -1 if none.
        """
        for index, record in enumerate(self.image_record):
            if record and record[self.LOCAL_PATH] == local_path:
                return index
        return -1

    def record_get(self, record_id):
        """Return the record stored at index *record_id*."""
        return self.image_record[record_id]

    def record_add(self, local_path, time_stamp, cloud_path):
        """Append a new record and persist the table."""
        self.image_record.append([local_path, time_stamp, cloud_path])
        self._save()

    def record_update(self, record_id, temp_record):
        """Replace the record at *record_id* and persist the table."""
        self.image_record[record_id] = temp_record
        self._save()

    def plan_to_delete(self, cloud_path):
        """Append *cloud_path* as one line to the delete-list file."""
        with open(self.delete_list, "a") as fd:
            fd.write(cloud_path + "\n")
class CloudOperation:
    """Owns the HatenaClient instance and exposes the upload operation."""

    def __init__(self):
        """Create the client from the module-level credentials."""
        client = HatenaClient(HATENA_USERNAME, HATENA_API_KEY)
        self.cli = client

    def image_upload(self, local_path):
        """Upload *local_path* and return what the client's post_image returns."""
        uploaded = self.cli.post_image(local_path)
        return uploaded
def image_upload(local_path, time_stamp):
    """
    Upload a local image unless an up-to-date copy was already uploaded.

    Uses the module-level ``idb`` (record table) and ``cop`` (uploader).

    :param local_path: path of the local image file.
    :param time_stamp: modification time formatted as
        ``%Y/%m/%d_%H:%M:%S``.
    :return: the cloud path of the image, or ``""`` when the stored record
        is newer than the local file.
    """
    stamp_format = "%Y/%m/%d_%H:%M:%S"

    index = idb.search(local_path)
    if index < 0:
        # Unknown file: upload it and remember the result.
        uploaded = cop.image_upload(local_path)
        idb.record_add(local_path, time_stamp, uploaded)
        return uploaded

    # Known file: compare the stored time stamp with the local one.
    entry = idb.record_get(index)
    local_time = datetime.strptime(time_stamp, stamp_format)
    cloud_time = datetime.strptime(entry[idb.TIME_STAMP], stamp_format)
    print(f"local:{local_time} cloud:{cloud_time}")

    result = ""
    if cloud_time == local_time:
        # Local and uploaded copies are the same: reuse the stored path.
        result = entry[idb.CLOUD_PATH]
    elif cloud_time < local_time:
        # The local file is newer: queue the old upload for deletion and
        # upload the current file.
        print(f"test::{entry[idb.CLOUD_PATH]}")
        idb.plan_to_delete(entry[idb.CLOUD_PATH])
        result = cop.image_upload(local_path)
        entry[idb.TIME_STAMP] = time_stamp
        entry[idb.CLOUD_PATH] = result
    # Otherwise the stored record is newer than the local file, which
    # suggests the record table was modified unintentionally; nothing is
    # uploaded and "" is returned.

    # Write the (possibly updated) record back to the table.
    idb.record_update(index, entry)
    return result
def src_parse(path):
    """
    Turn the ``src`` of an ``<img>`` tag into local file information.

    :param path: a ``file:`` URL (or a plain path) of a local image; it may
        be percent-encoded.
    :return: tuple ``(path, file_mtime, alt_name)`` where *path* is the
        decoded path with a single leading ``/``, *file_mtime* is the file's
        modification time formatted as ``%Y/%m/%d_%H:%M:%S`` and *alt_name*
        is the file name up to its first ``.``.
    :raises OSError: when the file's modification time cannot be read.
    """
    # Remove the scheme as a prefix. str.lstrip("file:/") would strip a
    # *set of characters*, eating the start of directory names such as
    # "/files/..." or "/life/...".
    scheme = "file:"
    if path.startswith(scheme):
        path = path[len(scheme):]
    # Collapse the leading slashes of "file:///..." into exactly one.
    path = "/" + path.lstrip("/")
    # Paths containing non-ASCII characters arrive percent-encoded.
    path = urllib.parse.unquote(path)

    file_name = path.split("/")[-1]
    alt_name = file_name.split(".")[0]

    dt = datetime.fromtimestamp(os.path.getmtime(path))
    file_mtime = dt.strftime("%Y/%m/%d_%H:%M:%S")

    print(f"name:{file_name}")
    print(f"file_mtime:{file_mtime}")
    return path, file_mtime, alt_name
def html_parse(file_path):
    """
    Convert an exported HTML file for posting to Hatena Blog.

    Local images are uploaded through ``image_upload`` and their ``src`` is
    rewritten to the uploaded URL; ``<code>`` tags get the
    google-code-prettify classes. The result is written next to the input
    as ``<name>-convert<ext>``. When ``CONF_ADD_STYLE`` is true, the
    prettify loader and style rules are appended to the output.

    :param file_path: path of the HTML file; nothing happens when it does
        not exist.
    :return: None
    """
    if not os.path.isfile(file_path):
        # The HTML file was not found.
        return

    # Read bytes so BeautifulSoup detects the document encoding itself
    # instead of depending on the locale default; the handle is closed
    # as soon as parsing is done.
    with open(file_path, "rb") as html:
        soup = BeautifulSoup(html, "html.parser")

    # Upload image files and point the tags at the uploaded copies.
    for img in soup.find_all("img"):
        # Remove attributes that are not wanted in the output.
        del img["title"]
        del img["referrerpolicy"]

        src = img.get("src")
        # Tags without a src, or with an already remote/inline source,
        # have no local file to upload.
        if not src or src.startswith(("http://", "https://", "data:")):
            continue

        local_path, time_stamp, alt_name = src_parse(src)
        cloud_path = image_upload(local_path, time_stamp)
        if cloud_path != "":
            img["src"] = cloud_path
            img["alt"] = alt_name
            img["class"] = "hatena-fotolife"

    # google-code-prettify support
    add_class = "prettyprint linenums"
    for code in soup.find_all("code"):
        # Tag the <pre> that actually encloses this <code>. find_previous()
        # would return whatever element precedes it in the document, which
        # for inline code is not a <pre> at all.
        pre = code.find_parent("pre")
        if pre is not None:
            pre["class"] = "code-paste"

        # Add the prettify classes to the <code> tag's class attribute.
        classes = code.get("class")
        if classes is None:
            code["class"] = add_class
        elif isinstance(classes, str):
            code["class"] = classes + " " + add_class
        else:
            classes.append(add_class)

    # Save the converted HTML. splitext only looks at the file name, so a
    # dot in a directory name or a missing extension cannot misplace
    # "-convert".
    root, ext = os.path.splitext(file_path)
    output_file = root + "-convert" + ext
    with open(output_file, "wb") as out:
        out.write(soup.encode("utf-8"))

    if CONF_ADD_STYLE:
        # NOTE(review): confirm the cdn.rawgit.com loader URL below still
        # resolves before relying on this option.
        futta = '''
<script src="https://cdn.rawgit.com/google/code-prettify/master/loader/run_prettify.js"></script>
<style>
li.L0, li.L1, li.L2, li.L3, li.L4, li.L5, li.L6, li.L7, li.L8, li.L9 {
list-style-type: decimal; /* 行番号を表示するための設定 */
padding-left: 8px; /* 行番号とソースコード間の間隔設定 */
margin-left: 30px; /* 行番号を表示するスペースの間隔設定、狭すぎると行番号が表示されなくなる。*/
}
pre.code-paste {
border: solid 1px gray; /* 枠線の設定 */
overflow: auto; /* 枠線からはみ出たソースを隠す設定 */
padding-top: 10px; /* 枠線とソースコード間の上辺の間隔設定 */
padding-bottom: 16px; /* 枠線とソースコード間の下辺の間隔設定 */
}
</style>
'''
        # The body above was written as UTF-8, so the footer (which contains
        # non-ASCII text) must use the same encoding.
        with open(output_file, "a", encoding="utf-8") as f:
            print(futta, file=f)
def main():
    """
    Convert the HTML file named by the first command-line argument.

    Exits with a usage message (non-zero status) when no argument is given,
    instead of failing with a bare IndexError.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: python <this_script> <html_file>")
    html_parse(sys.argv[1])
if __name__ == "__main__":
    # image_upload() reads the module-level names `idb` and `cop`, so both
    # must be bound before main() runs.
    idb, cop = ImageDB(), CloudOperation()
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment