PicSorter

Finds an image on danbooru, writes its tags as IPTC keywords, then places the image in the library.
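
The heavy lifting for metadata is delegated to external tools: the exiftool and magick (ImageMagick) commands must be available on the PATH, since tag writing and recompression run through subprocess. A minimal usage sketch follows; the file name picsorter.py for the main script is an assumption, as the gist does not name it. From a shell: python picsorter.py ./input, or python picsorter.py https://danbooru.donmai.us/posts/<post id>. Programmatically:

    from picsorter import PicSorter  # assumed module name for the main script below

    sorter = PicSorter('config.yml')  # loads the config, sets up logging, the library folders and the SQLite database
    sorter.process(['./input'])       # accepts folders, single image files, danbooru post URLs and bare post ids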

.gitignore

__pycache__
.idea
input
library
logs
images.db

config.py

from pathlib import Path

import yaml


class Config:
    @staticmethod
    def load(path='config.yml'):
        with open(path, 'rt', encoding='utf8') as f:
            config = yaml.load(f.read(), Loader=yaml.FullLoader)
        return Config(config)

    def __init__(self, config):
        dirs = config.get('dirs', {})
        self.dir_tmp = Path(dirs.get('tmp', '/tmp/'))
        self.dir_processed = Path(dirs.get('processed', './processed'))
        self.dir_logs = Path(dirs.get('logs', './logs'))
        self.dir_library = Path(dirs.get('library', './library'))
        self.__setup_folders()

    def __setup_folders(self):
        self.dir_tmp.mkdir(exist_ok=True)
        self.dir_logs.mkdir(exist_ok=True)
        self.dir_library.mkdir(exist_ok=True)

config.yml

dirs:
  tmp: /tmp/
  processed: ./processed
  logs: ./logs
  library: ./library

database.py

import sqlite3
from datetime import datetime


class Database:
    def __init__(self):
        self.db_name = 'images.db'
        self.__create_tables()

    def __create_tables(self):
        conn = sqlite3.connect(self.db_name)
        c = conn.cursor()
        c.executescript("""
            CREATE TABLE IF NOT EXISTS danbooru (
                id INTEGER PRIMARY KEY NOT NULL UNIQUE,
                tags TEXT NOT NULL,
                created_at TIMESTAMP
            );
        """)
        conn.commit()
        conn.close()

    def is_exists(self, _id) -> bool:
        conn = sqlite3.connect(self.db_name)
        c = conn.cursor()
        c.execute("SELECT EXISTS(SELECT 1 FROM danbooru WHERE id=?)", (_id,))
        result = c.fetchone()[0]
        conn.close()
        return bool(result)

    def add(self, _id, tags):
        conn = sqlite3.connect(self.db_name)
        c = conn.cursor()
        sql = 'INSERT INTO danbooru(id, tags, created_at) VALUES (?,?,?)'
        c.execute(sql, (_id, tags, datetime.now()))
        conn.commit()
        conn.close()

iqdb.py

import logging
from typing import Optional

import requests
from bs4 import BeautifulSoup


class Iqdb:
    @staticmethod
    def search(file: str) -> Optional[str]:
        logging.info('Searching %s', file)
        with open(file, 'rb') as f:
            resp = requests.post('https://iqdb.org/', files={'file': f}, timeout=10)
        doc = BeautifulSoup(resp.text, 'html.parser')
        for tag in doc.select(".image a"):
            url = tag.get("href")
            if "danbooru.donmai.us/posts" in url:
                if url.startswith("//"):
                    url = "https:" + url
                return url
        return None

library.py

import logging
import os
import shutil
from pathlib import Path

from tags import Tags


class Library:
    def __init__(self, dir_root: Path):
        self.dir_root = dir_root
        self.dir_orphan = Path(dir_root, '_orphan')
        self.dir_orphan.mkdir(exist_ok=True, parents=True)

    def move_to_orphan(self, p: Path) -> None:
        logging.info("%s move to orphan", p)
        shutil.move(os.fspath(p), os.fspath(self.dir_orphan))

    def move(self, p: Path, tags: Tags) -> str:
        new_path = self.__compute_path(tags)
        new_path.mkdir(exist_ok=True, parents=True)
        logging.info("%s move to %s", p.name, new_path)
        shutil.move(os.fspath(p), os.fspath(new_path))
        return str(new_path).replace("\\", "/") + "/" + p.name

    def __compute_path(self, tags: Tags) -> Path:
        p = self.dir_root
        if tags.copyrights == 'original':
            # Originals are grouped by artist
            p = p / "_originals"
            if tags.artists != "":
                artist = tags.artists.split(" ")[0]
                artist = self.__sanitize(artist)
                if (p / artist).exists():
                    p = p / artist
            return p
        # Main section: group by the first copyright tag
        if tags.copyrights != "":
            _copyright = tags.copyrights.split(" ")[0]
            p = p / self.__sanitize(_copyright)
        if tags.characters == "":
            return p
        # Characters section
        characters = tags.characters_sanitized()
        num = len(characters)
        if num == 1:
            p = p / self.__sanitize(characters[0])
        elif num == 2 and characters[0] in characters[1]:
            p = p / self.__sanitize(characters[0])
        elif num == 2 and characters[1] in characters[0]:
            p = p / self.__sanitize(characters[1])
        else:
            p = p / "_multiple"
        return p

    @staticmethod
    def __sanitize(s: str) -> str:
        s = "".join(x for x in s if x.isalnum() or x in "._-()")
        return s.replace("_", " ").strip()
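
To make Library.__compute_path easier to follow, here is the kind of layout it produces inside the library root (the copyright, character and artist names below are illustrative, not taken from the gist):

    library/
        _orphan/                  # filled by move_to_orphan() when iqdb finds nothing
        _originals/               # posts whose copyright tag is exactly "original"
            some artist/          # only entered if this artist folder already exists
        some copyright/           # first copyright tag, sanitized
            some character/       # exactly one character, or two where one name contains the other
            _multiple/            # three or more characters, or two unrelated names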

metadata.py

import logging
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional

import fluentpy as _
import requests

from tags import Tags


class Metadata:
    def __init__(self, dir_tmp: Path):
        self.dir_tmp = dir_tmp
        self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg")

    def process(self, url: str) -> Optional[tuple[Path, Tags]]:
        logging.info("Retrieving metadata for %s", url)
        meta = self.__get_metadata(url)
        status = self.__download_file(meta)
        if not status:
            logging.warning("Download failed")
            return None
        return self.__write_tags(url, meta)

    @staticmethod
    def __get_metadata(url: str) -> dict:
        return requests.get(url + ".json").json()

    def __download_file(self, r: dict) -> bool:
        ext = r.get("file_ext", "")
        w = int(r.get("image_width", "0"))
        h = int(r.get("image_height", "0"))
        if (ext not in ["jpg", "jpeg", "png", "webp"]) or w == 0 or h == 0:
            return False
        file_url = r.get("file_url")
        file_size_kb = int(r.get('file_size', "0")) / 1024
        logging.info("Downloading image")
        recompress = self.__need_recompress(ext, w, h, file_size_kb)
        return self.__download(file_url, recompress=recompress)

    @staticmethod
    def __need_recompress(ext, w, h, size_kb) -> bool:
        # Recompress only large JPEGs that the resize step won't shrink anyway
        return ext == 'jpg' and size_kb > 1400 and w < 2500 and h < 2500

    def __download(self, img_url: str, recompress: bool = False) -> bool:
        opt_args = []
        if recompress:
            opt_args = ['-quality', "80"]
        ret = subprocess.call([
            'magick', img_url,
            '-resize', '2500x2500>',
            *opt_args, self.tmp_image_file
        ], stdout=subprocess.PIPE)
        return ret == 0

    # noinspection PyCallingNonCallable
    # noinspection PyProtectedMember
    def __write_tags(self, url: str, r: dict) -> tuple[Path, Tags]:
        tag_general = r.get('tag_string_general', "")
        tag_copyrights = r.get('tag_string_copyright', "")
        tag_characters = r.get('tag_string_character', "")
        tag_artists = r.get('tag_string_artist', "")
        tags = Tags(tag_general, tag_copyrights, tag_characters, tag_artists)
        tags_file = Path(self.dir_tmp, "tags.txt")
        with open(tags_file, "w") as f:
            content = _(tags.tags) \
                .map(lambda s: "-IPTC:keywords=" + s) \
                .join("\n") \
                ._
            content += "\n-Exif:ImageDescription=" + url
            content += "\n-Iptc:Caption-Abstract=" + url
            content += "\n-Xmp:Description=" + url
            f.write(content)
        logging.info("Writing tags")
        subprocess.call([
            'exiftool', '-q', '-overwrite_original',
            '-@', tags_file,
            self.tmp_image_file
        ], stdout=subprocess.PIPE)
        filename = self.__format_filename(tags)
        result_file = Path(self.tmp_image_file.parent, filename)
        self.tmp_image_file.rename(result_file)
        return result_file, tags

    @staticmethod
    def __format_filename(tags: Tags):
        filename = '{} {} by {} at {}.jpg'.format(
            tags.copyrights.split(" ")[0] or "",
            ", ".join(tags.characters_sanitized()[:2]),
            tags.artists.split(" ")[0] or "",
            datetime.now().strftime('%Y%m%d_%H%M%S')
        )
        filename = "".join(x for x in filename if x.isalnum() or x in " ._-()")
        return re.sub(r'\s+', ' ', filename).strip()
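
For reference, the argfile handed to exiftool (tags.txt) ends up looking roughly like this; the tag values are invented for illustration, the prefixes come from Tags.__union_tags and the description fields are appended by __write_tags:

    -IPTC:keywords=1girl
    -IPTC:keywords=smile
    -IPTC:keywords=copyright_touhou
    -IPTC:keywords=character_hakurei_reimu
    -IPTC:keywords=artist_someartist
    -Exif:ImageDescription=https://danbooru.donmai.us/posts/<post id>
    -Iptc:Caption-Abstract=https://danbooru.donmai.us/posts/<post id>
    -Xmp:Description=https://danbooru.donmai.us/posts/<post id>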

picsorter.py

import argparse
import logging
import os
import re
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from config import Config
from database import Database
from iqdb import Iqdb
from library import Library
from metadata import Metadata


class PicSorter:
    @staticmethod
    def parse_args():
        parser = argparse.ArgumentParser(
            description='Finds an image on danbooru, writes tags as IPTC keywords, then places the image in the library'
        )
        parser.add_argument('-c', '--config',
                            type=Path,
                            default='config.yml',
                            help='config.yml file path')
        parser.add_argument('input', nargs=argparse.REMAINDER)
        args = parser.parse_args()
        if len(args.input) >= 1:
            PicSorter(args.config).process(args.input)

    def __init__(self, config_file='config.yml'):
        config = Config.load(config_file)
        self.config = config
        self.__setup_logging(config.dir_logs)
        self.library = Library(config.dir_library)
        self.metadata = Metadata(config.dir_tmp)
        self.db = Database()

    @staticmethod
    def __setup_logging(dir_logs: Path):
        filename = datetime.now().strftime('%Y-%m-%d.log')
        logfile = Path(dir_logs, filename)
        logging.basicConfig(
            filename=os.fspath(logfile),
            level=logging.INFO,
            format='%(asctime)s %(levelname)s %(module)s: %(message)s',
            datefmt='%H:%M:%S',
        )

    def process(self, inputs: list[str]) -> None:
        for item in inputs:
            if item.startswith("http") or re.search(r"(\d{3,})", item):
                print("Processing url", item)
                self.__process_url(item)
            else:
                p = Path(item)
                if p.is_dir():
                    self.__process_folder(p)
                elif p.is_file():
                    print("Processing file", item)
                    self.__process_file(item)

    def __process_folder(self, dir_input: Path) -> None:
        files = {p for p in dir_input.iterdir()
                 if p.suffix in [".jpg", ".png"]}
        for filename in files:
            print("Processing", filename)
            self.__process_file(filename)
            # Rate-limit requests to iqdb.org between files
            time.sleep(5)

    def __process_file(self, filename: str) -> bool:
        url = self.__search_iqdb(filename)
        if url is None:
            return False
        if self.__process_url(url):
            self.config.dir_processed.mkdir(exist_ok=True, parents=True)
            from_path = os.fspath(filename)
            to_path = os.fspath(self.config.dir_processed)
            shutil.move(from_path, to_path)
            print("Saved to", to_path)
            return True
        return False

    def __search_iqdb(self, filename: str) -> Optional[str]:
        url = Iqdb.search(filename)
        if url is None:
            logging.warning("%s not found", filename)
            self.library.move_to_orphan(Path(filename))
            return None
        return url

    def __process_url(self, url: str) -> bool:
        m = re.search(r"(?:posts/)?(\d{3,})", url)
        if not m:
            return False
        post_id = int(m.group(1))
        if self.db.is_exists(post_id):
            logging.info("Skipping existing post %d", post_id)
            return False
        meta_result = self.metadata.process("https://danbooru.donmai.us/posts/" + str(post_id))
        if meta_result is None:
            return False
        image_path, tags = meta_result
        to_path = self.library.move(image_path, tags)
        self.db.add(post_id, tags.tags_string)
        print("Saved to", to_path)
        return True


if __name__ == '__main__':
    PicSorter.parse_args()

requirements.txt

beautifulsoup4==4.9.3
fluentpy>=2.0
PyYAML==5.4.1
requests>=2.24

tags.py

from dataclasses import dataclass, field

import fluentpy as _


@dataclass
class Tags:
    general: str
    copyrights: str
    characters: str
    artists: str
    tags: list[str] = field(init=False)
    tags_string: str = field(init=False)

    def __post_init__(self):
        self.tags = self.__union_tags()
        self.tags_string = " ".join(self.tags)

    # noinspection PyCallingNonCallable
    # noinspection PyProtectedMember
    def characters_sanitized(self) -> list:
        if self.copyrights == "":
            # No need to sanitize tags
            return self.characters.split(" ")
        copyrights = self.copyrights.split(" ")
        return _(self.characters) \
            .split(" ") \
            .filter(lambda s: s != "") \
            .map(lambda s: self.__rename(s, copyrights)) \
            ._

    @staticmethod
    def __rename(s: str, substrings: list[str]) -> str:
        for substring in substrings:
            s = s.replace("_(" + substring + ")", "") \
                .replace("(" + substring + ")", "") \
                .strip()
        return s

    def __union_tags(self) -> list[str]:
        tags = self.general.split(" ")
        tags += self.__prefix_tags(self.copyrights, 'copyright_')
        tags += self.__prefix_tags(self.characters, 'character_')
        tags += self.__prefix_tags(self.artists, 'artist_')
        return tags

    # noinspection PyCallingNonCallable
    # noinspection PyProtectedMember
    @staticmethod
    def __prefix_tags(tags: str, prefix: str) -> list[str]:
        return _(tags) \
            .split(" ") \
            .filter(lambda s: s != "") \
            .map(lambda s: prefix + s.strip()) \
            ._