-
-
Save owatan/3d22eb07f1406a7eee6746b6e6ffb7e9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import boto3 | |
import urllib.request | |
import io | |
from boto3.session import Session | |
session = Session() | |
s3 = session.resource("s3") | |
s3_bucket = s3.Bucket("") | |
s3_client = session.client("s3") | |
class FanboxClient: | |
headers = { | |
"Accept": "application/json", | |
"Referer": "https://www.fanbox.cc", | |
"Origin": "https://www.fanbox.cc", | |
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.80 Safari/537.36", | |
"Cookie": f"FANBOXSESSID={os.getenv('FANBOX_SESSION_ID')}" | |
} | |
def get_fanbox_json(self, post_url): | |
post_id = post_url.split('/')[-1] | |
fanbox_api_url = f"https://api.fanbox.cc/post.info?postId={post_id}" | |
req = urllib.request.Request(fanbox_api_url, headers=self.headers) | |
with urllib.request.urlopen(req) as res: | |
body = json.loads(res.read()) | |
return body | |
def get_fanbox_file(self, file_url): | |
req = urllib.request.Request(file_url, headers=self.headers) | |
with urllib.request.urlopen(req) as res: | |
body = res.read() | |
return body | |
class FanboxJson: | |
def get_fanbox_post_id(self, fanbox_json): | |
return fanbox_json["body"]["id"] | |
def get_fanbox_post_title(self, fanbox_json): | |
return fanbox_json["body"]["title"] | |
def get_creator_id(self, fanbox_json): | |
return fanbox_json["body"]["creatorId"] | |
def get_item_set(self, fanbox_json): | |
item_set = [] | |
for (key, value,) in fanbox_json["body"]["body"].items(): | |
if key == "images": | |
for (index, item,) in enumerate(value): | |
item_id = self.get_fanbox_post_id(fanbox_json) | |
item_suffix = f"{index + 1:03}" | |
item_extension = item["extension"] | |
item_filename = f"{item_id}_{item_suffix}".replace('/', '_') | |
item_url = item["originalUrl"] | |
item_set.append({"item_filename": f"{item_filename}.{item_extension}", "item_url": item_url}) | |
if key == "imageMap": | |
for (index, item,) in enumerate(value.values()): | |
item_id = self.get_fanbox_post_id(fanbox_json) | |
item_suffix = f"{index + 1:03}" | |
item_extension = item["extension"] | |
item_filename = f"{item_id}_{item_suffix}".replace('/', '_') | |
item_url = item["originalUrl"] | |
item_set.append({"item_filename": f"{item_filename}.{item_extension}", "item_url": item_url}) | |
if key == "files": | |
for item in value: | |
item_filename = item["name"].replace('/', '_') | |
item_extension = item["extension"] | |
item_url = item["url"] | |
item_set.append({"item_filename": f"{item_filename}.{item_extension}", "item_url": item_url}) | |
if key == "fileMap": | |
for item in value.values(): | |
item_filename = item["name"].replace('/', '_') | |
item_extension = item["extension"] | |
item_url = item["url"] | |
item_set.append({"item_filename": f"{item_filename}.{item_extension}", "item_url": item_url}) | |
return item_set | |
def lambda_handler(event, context): | |
fanbox_json = FanboxClient().get_fanbox_json(event["fanbox_post_url"]) | |
item_set = FanboxJson().get_item_set(fanbox_json) | |
creator_id = FanboxJson().get_creator_id(fanbox_json) | |
title = FanboxJson().get_fanbox_post_title(fanbox_json).replace('/', '_') | |
for item in item_set: | |
print(item["item_url"]) | |
item_bin = io.BytesIO( | |
FanboxClient().get_fanbox_file(item["item_url"]) | |
) | |
s3_bucket.put_object( | |
Body=item_bin, | |
Bucket=os.getenv('S3_BUCKET_NAME'), | |
Key=f"{creator_id}/{title}/{item['item_filename']}", | |
# StorageClass=os.getenv('S3_STORAGE_CLASS'), | |
) | |
return { | |
'statusCode': 200, | |
'body': json.dumps(event["fanbox_post_url"]) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
厳密に設計するなら31行目みたいな処理をFanboxクラスから取り除いてFanboxClientみたいな別のクラスを立てる、○○clientの中でjsonを取ってきてそこでFanboxを生成する、Fanboxからは外部url呼び出しの副作用を追い出してpure data classにする。