[python] CivitAI Scraper - Data archival utility for CivitAI
""" =======================================================================================
civitai-scraper.py v1.0.3 deitydurg
=======================================================================================
This script is used to scrape CivitAI (https://civitai.com) for models and creators.
It will save the results to a json file, which can be used to bulk-download the models.
This script is not affiliated with CivitAI in any way, and is provided as-is with some
updates when I have time. Therefore you should use it at your own risk.
---------------------------------------------------------------------------------------
Questions? Comments? Need to scream at me for writing this? OK!
Feel free to present them to the Discord username deitydurg, or comment on this Gist.
I will address them at my earliest convenience.
For help with the script, run it with the -h or --help options.
=======================================================================================
"""
import json
import os, sys
import random
from pathlib import Path
import asyncio
from argparse import ArgumentParser
from typing import Any, Dict, List, Optional, Tuple
from pydantic import BaseModel
from aiolimiter import AsyncLimiter
import aiohttp
import aiofiles
import logging
import brotli
import dotenv
from colorama import Fore, Back, Style, init
def singleton(cls):
    instances = {}
    def wrapper(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return wrapper
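# Note: @singleton replaces the decorated class with a factory that caches one instance,
# so the `AppConfig = AppConfig()` / `AppData = AppData()` calls below bind the module-level
# names to that single shared object.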
@singleton
class AppConfig(BaseModel):
    class Config:
        arbitrary_types_allowed = True
        protected_namespaces = ()
    api_max_retries: int = 3
    api_retry_delay: int = 5
    api_retry_period: int = 180
    api_retry_limit: int = 100
    threads: int = 5
    creator_limit: int = 100
    start_page: int = -1
    save_interval: int = 60
    no_skip: bool = False
    log_level: str = "info"
    log_file: str = f"{Path(__file__).parent.absolute()}{os.sep}civitai-scraper.log"
    colorize: bool = False
    db_path: str = f"{Path(__file__).parent.absolute()}{os.sep}civitai-db.bin"
AppConfig = AppConfig()
@singleton
class AppData(BaseModel):
    class Config:
        arbitrary_types_allowed = True
        protected_namespaces = ()
    creators: List[Dict] = []
    models: List[Dict] = []
    logger: Optional[logging.Logger] = None
    limiter: Optional[AsyncLimiter] = None
    creator_queue: Optional[asyncio.Queue] = None
    model_queue: Optional[asyncio.Queue] = None
    page_queue: Optional[asyncio.Queue] = None
    save_queue: Optional[asyncio.Queue] = None
    controller_complete: Optional[asyncio.Event] = None
    save_padlock: Optional[asyncio.Event] = None
AppData = AppData()
def log_builder(logger, log_file=None, log_level=logging.INFO, colorize=False):
    """
    Configures the provided logger with specified settings.
    Args:
        logger: The logging object to configure.
        log_file (str, optional): The path to the log file. No file logging if None.
        log_level (int or str, optional): The logging level to use.
        colorize (bool): If True, colorizes the log output.
    """
    class ColorizedFormatter(logging.Formatter):
        def format(self, record):
            levelname = record.levelname
            if colorize:
                if levelname == "DEBUG":
                    levelname_color = Fore.WHITE + Style.BRIGHT
                    func_color = Fore.LIGHTBLACK_EX + Style.BRIGHT
                    time_color = Fore.LIGHTBLACK_EX
                    seperator_color = Fore.WHITE
                elif levelname == "INFO":
                    levelname_color = Fore.LIGHTBLUE_EX + Style.BRIGHT
                    func_color = Fore.BLUE + Style.BRIGHT
                    time_color = Fore.BLUE
                    seperator_color = Fore.BLUE + Style.BRIGHT
                elif levelname == "WARNING":
                    levelname_color = Fore.LIGHTYELLOW_EX + Style.BRIGHT
                    func_color = Fore.YELLOW + Style.BRIGHT
                    time_color = Fore.YELLOW
                    seperator_color = Fore.YELLOW + Style.BRIGHT
                elif levelname == "ERROR":
                    levelname_color = Fore.LIGHTRED_EX + Style.BRIGHT
                    func_color = Fore.RED + Style.BRIGHT
                    time_color = Fore.RED
                    seperator_color = Fore.RED + Style.BRIGHT
                elif levelname == "CRITICAL":
                    levelname_color = Fore.WHITE + Style.BRIGHT + Back.RED
                    func_color = Fore.RED + Style.BRIGHT
                    time_color = Fore.RED
                    seperator_color = Fore.RED + Style.BRIGHT
                record.levelname = f"{seperator_color}[{Style.RESET_ALL}{levelname_color}{record.levelname}{Style.RESET_ALL}{seperator_color}]{Style.RESET_ALL}"
                record.funcName = f"{seperator_color}[{Style.RESET_ALL}{func_color}{record.funcName}{Style.RESET_ALL}{seperator_color}]{Style.RESET_ALL}"
                record.msecs = f"{seperator_color}[{Style.RESET_ALL}{time_color}{self.formatTime(record, self.datefmt)}.{int(record.msecs):03d}{Style.RESET_ALL}{seperator_color}]{Style.RESET_ALL}"
            else:
                record.levelname = f"[{record.levelname}]"
                record.funcName = f"[{record.funcName}]"
                record.msecs = f"[{self.formatTime(record, self.datefmt)}.{int(record.msecs):03d}]"
            return super().format(record)
    if colorize:
        init(autoreset=True)
    formatter = ColorizedFormatter(fmt="%(levelname)s %(msecs)s %(funcName)s: %(message)s",
                                   datefmt="%Y/%m/%d %H:%M:%S")
    logger.handlers.clear()
    console_handler = logging.StreamHandler(stream=sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_formatter = logging.Formatter("[%(levelname)s] [%(msecs)s] [%(funcName)s]: %(message)s",
                                           "%Y-%m-%d %H:%M:%S")
        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)
    logger.setLevel(log_level.upper() if isinstance(log_level, str) else log_level)
def log(loglevel: int, message: str) -> None:
    """
    log: Log a message to the logging system.
    Args:
        loglevel (int): The logging level to log the message at (e.g. logging.INFO).
        message (str): The message to log.
    Returns:
        None
    """
    log_func = {
        'INFO': AppData.logger.info,
        'WARNING': AppData.logger.warning,
        'ERROR': AppData.logger.error,
        'CRITICAL': AppData.logger.critical,
        'DEBUG': AppData.logger.debug
    }.get(str(logging.getLevelName(loglevel)).upper(), AppData.logger.info)
    log_func(msg=message, stacklevel=2)
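# api_call routes every request through the shared AsyncLimiter (at most api_retry_limit
# calls per api_retry_period seconds) and retries transient aiohttp.ClientError failures
# up to api_max_retries times, sleeping api_retry_delay seconds between attempts.
# Illustrative usage (this endpoint is the one creator_emitter() calls below):
#   text = await api_call("https://civitai.com/api/v1/creators?page=1")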
async def api_call(url: str, method: str = 'GET', data: Any = None) -> Optional[str]:
    """
    api_call: Make an API call to the given URL.
    Args:
        url (str): The URL to make the request to.
        method (str, optional): The HTTP method to use. Defaults to 'GET'.
        data (any, optional): The data to send with the request. Defaults to None.
    Retries are controlled by AppConfig.api_max_retries and AppConfig.api_retry_delay.
    Returns:
        str: The response text, or None if all retries failed.
    """
    attempt = 0
    while attempt < AppConfig.api_max_retries:
        try:
            log(logging.DEBUG, f"{method} {url}")
            async with AppData.limiter:
                async with aiohttp.ClientSession() as session:
                    async with session.request(method, url, data=data) as response:
                        response.raise_for_status()
                        return await response.text()
        except aiohttp.ClientError as e:
            log(logging.WARNING, f"Error on attempt {attempt + 1}/{AppConfig.api_max_retries} ({e})")
            await asyncio.sleep(AppConfig.api_retry_delay)
            attempt += 1
    log(logging.ERROR, "Failed to retrieve data after retries.")
    return None
async def save_db_json(creators: List[Dict], models: List[Dict], file_path: str) -> None:
    """
    save_db_json: Save the given creators and models to the given JSON file using Brotli compression.
    Args:
        creators (list of dict): The creators object to save.
        models (list of dict): The models object to save.
        file_path (str): The path to the json file.
    Returns:
        None
    """
    json_data = {'creators': creators, 'models': models}
    minified_json = json.dumps(json_data, ensure_ascii=False, separators=(',', ':'))
    compressed_data = brotli.compress(minified_json.encode('utf-8'))
    async with aiofiles.open(file_path, 'wb') as file:
        await file.write(compressed_data)
    log(logging.DEBUG, f"Saved {len(creators)} creators and {len(models)} models to {file_path}")
async def load_db_json(file_path: str) -> Tuple[List[Dict], List[Dict]]:
    """
    load_db_json: Load the JSON database file at the given path using Brotli decompression.
    Args:
        file_path (str): The path to the json file.
    Returns:
        list of dict: The creators object.
        list of dict: The models object.
    """
    if not os.path.exists(file_path):
        return [], []
    async with aiofiles.open(file_path, 'rb') as file:
        compressed_data = await file.read()
    data = json.loads(brotli.decompress(compressed_data).decode('utf-8'))
    log(logging.DEBUG, f"Loaded {len(data['creators'])} creators and {len(data['models'])} models from: {file_path}")
    return data['creators'], data['models']
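# Pipeline overview (these are the tasks started in main() below):
#   controller               -> seeds page numbers into page_queue, then waits for the queues to drain
#   creator_emitter (x N)    -> pulls pages, calls /api/v1/creators, pushes creator objects to creator_queue
#   model_emitter (x N)      -> pulls creators, walks their model pages, pushes downloadable files to model_queue
#   model_consumer (x N)     -> pulls models and appends them to the in-memory database (AppData.models)
#   save_db_emitter/consumer -> periodically snapshot AppData and write the Brotli-compressed db to disk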
async def controller() -> None:
    """
    controller: The main controller for the CivitAI scraping system.
    Args:
        None (uses AppConfig and AppData singletons)
    Returns:
        None
    """
    page = AppConfig.start_page
    while page < AppConfig.start_page + AppConfig.creator_limit:
        await AppData.page_queue.put(page)
        log(logging.DEBUG, f"Emitted page: {page}")
        page += 1
    retries = 0
    while True:
        has_work = (not AppData.page_queue.empty() or not AppData.creator_queue.empty()
                    or not AppData.model_queue.empty() or not AppData.save_queue.empty())
        if not has_work:
            retries += 1
            if retries >= 5:
                log(logging.DEBUG, "Controller thread exiting.")
                break
            log(logging.DEBUG, "Controller thread is idling.")
            await asyncio.sleep(1.0)
            continue
        else:
            retries = 0
            log(logging.DEBUG, "Controller thread detects work.")
            await asyncio.sleep(1.0)
            continue
    AppData.controller_complete.set()
async def creator_emitter() -> None:
    """
    creator_emitter: Emits creator objects to the creator queue.
    Args:
        None (uses AppConfig and AppData singletons)
    Returns:
        None
    """
    retries = 0
    while not AppData.controller_complete.is_set() or not AppData.page_queue.empty():
        try:
            await asyncio.sleep(random.uniform(0.1, 0.3))
            page = AppData.page_queue.get_nowait()
            retries = 0
        except asyncio.QueueEmpty:
            retries += 1
            if retries >= 60:
                log(logging.DEBUG, "Creator emitter thread exiting.")
                break
            await asyncio.sleep(1.0)
            log(logging.DEBUG, "Creator emitter thread is idling.")
            continue
        log(logging.DEBUG, f"Consumed page: {page}")
        creators = await api_call(method='GET', url=f"https://civitai.com/api/v1/creators?page={page}")
        if creators is None:
            log(logging.WARNING, f"Failed to fetch creators for page: {page}")
            AppData.page_queue.task_done()
            continue
        creators = json.loads(creators)
        if not creators or len(creators['items']) == 0:
            log(logging.INFO, f"No creators found on page: {page}")
            AppData.page_queue.task_done()
            break
        for creator in creators.get('items', []):
            creator_username = creator['username']
            creator_link = creator['link']
            if len(AppData.creators) > 0:
                in_creators = any(c['creator'] == creator_username for c in AppData.creators)
            else:
                in_creators = False
            if in_creators and not AppConfig.no_skip:
                log(logging.DEBUG, f"Skipping creator: {creator_username}")
                continue
            creator_object = {'creator': creator_username, 'link': creator_link, 'page': page}
            await AppData.creator_queue.put(creator_object)
            await AppData.save_padlock.wait()
            AppData.creators = list_append(AppData.creators, creator_object, 'creator', True)
            log(logging.DEBUG, f"Emitted creator: {creator_username}")
        AppData.page_queue.task_done()
    log(logging.DEBUG, "Creator emitter thread exited.")
async def model_emitter() -> None:
    """
    model_emitter: Consume creator objects from the creator queue and scrape their models.
    Args:
        None (uses AppConfig and AppData singletons)
    Returns:
        None
    """
    retries = 0
    while not AppData.controller_complete.is_set() or not AppData.creator_queue.empty():
        try:
            await asyncio.sleep(random.uniform(0.1, 0.3))
            creator_object = AppData.creator_queue.get_nowait()
            retries = 0
        except asyncio.QueueEmpty:
            retries += 1
            if retries >= 60:
                log(logging.DEBUG, "Model emitter thread exiting.")
                break
            await asyncio.sleep(1.0)
            log(logging.DEBUG, "Model emitter thread is idling.")
            continue
        page = 1
        while True:
            page_models = await api_call(method='GET', url=f"{creator_object['link']}&page={page}")
            if page_models is None:
                log(logging.WARNING, f"Failed to fetch models for creator: {creator_object['creator']} on page: {page}")
                break
            page_models = json.loads(page_models)
            if not page_models or len(page_models['items']) == 0:
                log(logging.DEBUG, f"No models found for creator: {creator_object['creator']} on page: {page}")
                break
            models_added = 0
            for model_type in ["LORA", "Checkpoint", "Controlnet"]:
                filtered_models = [m["modelVersions"] for m in page_models['items'] if m['type'] == model_type]
                if len(filtered_models) == 0:
                    log(logging.DEBUG, f"No models found for {creator_object['creator']} with type: {model_type}")
                    continue
                models_to_add = [
                    {
                        'filename': f"{creator_object['creator']}-{model_file['name']}",
                        'url': model_file['downloadUrl'],
                        'creator': creator_object['creator'],
                        'type': model_type,
                        'page': page
                    }
                    for model_versions in filtered_models
                    for model_version in model_versions
                    for model_file in model_version['files']
                    if 'name' in model_file and 'downloadUrl' in model_file and model_file['pickleScanResult'] == 'Success' and model_file['virusScanResult'] == 'Success'
                ]
                for model in models_to_add:
                    models_added += 1
                    log(logging.DEBUG, f"Emitted model: {model['filename']}")
                    await AppData.model_queue.put(model)
            if models_added == 0:
                log(logging.DEBUG, f"No models emitted for creator: {creator_object['creator']}")
                break
            page += 1
        AppData.creator_queue.task_done()
    log(logging.DEBUG, "Model emitter thread exited.")
async def model_consumer() -> None:
    """
    model_consumer: Consume model objects from the model queue and add them to the in-memory database.
    Args:
        None (uses AppConfig and AppData singletons)
    Returns:
        None
    """
    retries = 0
    while not AppData.controller_complete.is_set() or not AppData.model_queue.empty():
        try:
            await asyncio.sleep(random.uniform(0.1, 0.3))
            model_object = AppData.model_queue.get_nowait()
            retries = 0
        except asyncio.QueueEmpty:
            retries += 1
            if retries >= 60:
                log(logging.DEBUG, "Model consumer thread exiting.")
                break
            await asyncio.sleep(1.0)
            log(logging.DEBUG, "Model consumer thread is idling.")
            continue
        model_filename = model_object['filename']
        await AppData.save_padlock.wait()
        AppData.models = list_append(AppData.models, model_object, 'filename', True)
        log(logging.INFO, f"Processed model: {model_filename}")
        AppData.model_queue.task_done()
    log(logging.DEBUG, "Model consumer thread exited.")
async def save_db_emitter() -> None:
    """
    save_db_emitter: Emit save events to the save queue.
    Args:
        None (uses AppConfig and AppData singletons)
    Returns:
        None
    """
    retries = 0
    while not AppData.controller_complete.is_set():
        retries += 1
        if retries >= AppConfig.save_interval:
            retries = 0
            AppData.save_padlock.clear()
            data = {'creators': AppData.creators, 'models': AppData.models, 'file_path': AppConfig.db_path}
            await AppData.save_queue.put(data)
            log(logging.DEBUG, "Emitted save event")
            AppData.save_padlock.set()
        await asyncio.sleep(1.0)
    log(logging.DEBUG, "Emitted final save event")
    data = {'creators': AppData.creators, 'models': AppData.models, 'file_path': AppConfig.db_path}
    await AppData.save_queue.put(data)
    log(logging.DEBUG, "Save emitter thread exited.")
async def save_db_consumer() -> None:
    """
    save_db_consumer: Consume save events from the save queue and write the database to disk.
    Args:
        None (uses AppConfig and AppData singletons)
    Returns:
        None
    """
    retries = 0
    while not AppData.controller_complete.is_set() or not AppData.save_queue.empty():
        try:
            await asyncio.sleep(random.uniform(0.1, 0.3))
            data = AppData.save_queue.get_nowait()
            retries = 0
        except asyncio.QueueEmpty:
            retries += 1
            if retries >= AppConfig.save_interval + 1:
                log(logging.DEBUG, "Save consumer thread exiting.")
                break
            await asyncio.sleep(1.0)
            log(logging.DEBUG, "Save consumer thread is idling.")
            continue
        creators_json = data['creators']
        models_json = data['models']
        file_path = data['file_path']
        await save_db_json(creators_json, models_json, file_path)
        AppData.save_queue.task_done()
    log(logging.DEBUG, "Save consumer thread exited.")
def last_key(data: List[Dict], key: str, default: Any = None) -> Any:
    """
    last_key: Get the last value of the given key in the given json data.
    Args:
        data (list of dict): The json data to search.
        key (str): The key to search for.
        default (any, optional): The default value to return if the key is not found. Defaults to None.
    Returns:
        any: The value of the key, or the default.
    """
    return data[-1].get(key, default) if data else default
def list_append(data: List[Dict], new_item: Dict, unique_key: str = None, update_if_exists: bool = False) -> List[Dict]:
    """
    list_append: Append or optionally update the given item to the given list.
    Args:
        data (list of dict): The data to append to or update.
        new_item (dict): The item to append or with which to update an existing item.
        unique_key (str, optional): The key to check for uniqueness. Defaults to None.
        update_if_exists (bool, optional): If True, update an existing item based on the unique_key.
            Defaults to False.
    Returns:
        list of dict: The data with the new item appended or existing item updated.
    """
    if unique_key and update_if_exists:
        for index, item in enumerate(data):
            if item.get(unique_key) == new_item.get(unique_key):
                data[index] = new_item
                break
        else:
            data.append(new_item)
    else:
        if new_item not in data:
            data.append(new_item)
    return data
async def init_app() -> None:
    """
    init_app: Load the configuration from the .env file, if it exists, and then from the command line arguments. Command line arguments take precedence.
    """
    # Pass-through the environment variables from the .env file to the AppConfig singleton.
    dotenv.load_dotenv()
    AppConfig.api_max_retries = int(os.getenv("RETRIES", AppConfig.api_max_retries))
    AppConfig.api_retry_delay = int(os.getenv("RETRY_DELAY", AppConfig.api_retry_delay))
    AppConfig.api_retry_period = int(os.getenv("API_PERIOD", AppConfig.api_retry_period))
    AppConfig.api_retry_limit = int(os.getenv("API_RETRY_LIMIT", AppConfig.api_retry_limit))
    AppConfig.threads = int(os.getenv("THREADS", AppConfig.threads))
    AppConfig.creator_limit = int(os.getenv("CREATOR_LIMIT", AppConfig.creator_limit))
    AppConfig.start_page = int(os.getenv("START_PAGE", AppConfig.start_page))
    AppConfig.save_interval = int(os.getenv("SAVE_INTERVAL", AppConfig.save_interval))
    # bool("false") is truthy, so parse boolean environment variables explicitly.
    AppConfig.no_skip = str(os.getenv("NO_SKIP", AppConfig.no_skip)).lower() in ("1", "true", "yes")
    AppConfig.log_level = os.getenv("LOG_LEVEL", AppConfig.log_level)
    AppConfig.log_file = os.getenv("LOG_FILE", AppConfig.log_file)
    AppConfig.colorize = str(os.getenv("COLORIZE", AppConfig.colorize)).lower() in ("1", "true", "yes")
    AppConfig.db_path = os.getenv("DB", AppConfig.db_path)
    # Parse the command line arguments with argparse.
    parser = ArgumentParser(
        prog="civitai_scraper.py",
        description="Scrape CivitAI for models and creators.",
        allow_abbrev=True,
        add_help=False,
        epilog="Tool created by: deitydurg | Questions? Ask deitydurg on Discord for assistance.")
    logging_group = parser.add_argument_group("Logging")
    logging_group.add_argument('-x', '--log-level', type=str, dest="loglevel", nargs='?', default=None, choices=["info", "warning", "error", "critical", "debug", None], help="The logging level to use. If you want to see debug messages, set this to 'debug'.")
    logging_group.add_argument('-y', '--log-file', type=str, dest="logfile", nargs='?', default=None, help="The path to the log file where logs will be saved.")
    logging_group.add_argument('-z', '--no-color', action="store_false", dest="colorize", help="If specified, do not colorize the log output.")
    ratelimit_group = parser.add_argument_group("Rate Limits & Performance")
    ratelimit_group.add_argument('-p', '--api-period', type=int, nargs='?', default=None, dest="apiperiod", help="The period of time over which API calls are limited (in seconds). WARNING: Setting this value too low may result in a ban from the CivitAI API, or in you being temporarily rate-limited.")
    ratelimit_group.add_argument('-l', '--api-limit', type=int, nargs='?', default=None, dest="apilimit", help="The number of API calls to allow per period. WARNING: Setting this value too high may result in a ban from the CivitAI API, or in you being temporarily rate-limited.")
    ratelimit_group.add_argument('-t', '--threads', type=int, nargs='?', default=None, dest="threads", help="The maximum number of concurrent/asynchronous tasks to run. Lowering this reduces retries caused by making too many requests at once, at the cost of speed. If you see retry messages often, try lowering this from its default of 5.")
    ratelimit_group.add_argument('-r', '--retry-delay', type=int, nargs='?', default=None, dest="retrydelay", help="The number of seconds to wait between retries. Increasing this can help when many requests are being retried at once.")
    ratelimit_group.add_argument('-k', '--retry-limit', type=int, nargs='?', default=None, dest="retrylimit", help="The number of times to retry a request before giving up. Set this higher for reliability on a flaky connection.")
    scraping_group = parser.add_argument_group("Scraping Options")
    scraping_group.add_argument('-c', '--creator-limit', type=int, nargs='?', default=None, dest="creatorlimit", help="The maximum number of creator listing pages to scrape.")
    scraping_group.add_argument('-s', '--start-page', type=int, nargs='?', default=None, dest="startpage", help="The page of creators to start scraping from. You can use this to resume a previous scraping session. If this is set to -1, it will resume from the last page scraped.")
    scraping_group.add_argument('-n', '--no-skip', action="store_true", dest="noskip", default=None, help="Do not skip creators that are already in the database. This causes the tool to scrape every encountered creator, even those already in the database -- updating their models.")
    database_group = parser.add_argument_group("Database Options")
    database_group.add_argument('-j', '--json', type=str, nargs='?', default=None, dest="db", help="The path to the file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, nargs='?', default=None, dest="saveinterval", help="The number of seconds to wait between saves of the database to disk. Raising this can help performance, but setting it too high may result in data loss.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.3", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    argv = parser.parse_args()
    # Pass-through the command line arguments to the AppConfig singleton.
    AppConfig.api_max_retries = AppConfig.api_max_retries if argv.retrylimit is None else argv.retrylimit
    AppConfig.api_retry_delay = AppConfig.api_retry_delay if argv.retrydelay is None else argv.retrydelay
    AppConfig.api_retry_period = AppConfig.api_retry_period if argv.apiperiod is None else argv.apiperiod
    AppConfig.api_retry_limit = AppConfig.api_retry_limit if argv.apilimit is None else argv.apilimit
    AppConfig.threads = AppConfig.threads if argv.threads is None else argv.threads
    AppConfig.creator_limit = AppConfig.creator_limit if argv.creatorlimit is None else argv.creatorlimit
    AppConfig.start_page = AppConfig.start_page if argv.startpage is None else argv.startpage
    AppConfig.save_interval = AppConfig.save_interval if argv.saveinterval is None else argv.saveinterval
    AppConfig.no_skip = AppConfig.no_skip if argv.noskip is None else argv.noskip
    AppConfig.log_level = AppConfig.log_level if argv.loglevel is None else argv.loglevel
    AppConfig.log_file = AppConfig.log_file if argv.logfile is None else argv.logfile
    AppConfig.colorize = AppConfig.colorize if argv.colorize is None else argv.colorize
    AppConfig.db_path = AppConfig.db_path if argv.db is None else argv.db
    # Configure the logging system with the new settings.
    AppData.logger = logging.root
    log_builder(AppData.logger, log_file=AppConfig.log_file, log_level=AppConfig.log_level, colorize=AppConfig.colorize)
    # Adjust the AppConfig singleton to ensure that the values are valid.
    AppConfig.creator_limit = max(1, AppConfig.creator_limit)
    # AsyncLimiter takes (max_rate, time_period): allow api_retry_limit calls per api_retry_period seconds.
    AppData.limiter = AsyncLimiter(AppConfig.api_retry_limit, AppConfig.api_retry_period)
    # Load the database from disk.
    AppData.creators, AppData.models = await load_db_json(AppConfig.db_path)
    # When --start-page is -1, resume from the page after the last scraped creator.
    if AppConfig.start_page == -1:
        AppConfig.start_page = last_key(AppData.creators, 'page', 1) + 1
    # Log the configuration.
    if not AppConfig.no_skip:
        log(logging.DEBUG, "Skipping creators that are already in the database.")
    else:
        log(logging.DEBUG, "Not skipping creators that are already in the database.")
    log(logging.DEBUG, f"Starting from page: {AppConfig.start_page}")
async def main() -> None:
    """
    main: The main function.
    Returns:
        None
    """
    await init_app()
    ###############################
    # Start of the main program...#
    ###############################
    AppData.controller_complete = asyncio.Event()
    AppData.save_padlock = asyncio.Event()
    AppData.creator_queue = asyncio.Queue(AppConfig.threads)
    AppData.model_queue = asyncio.Queue(AppConfig.threads)
    AppData.page_queue = asyncio.Queue(AppConfig.threads)
    AppData.save_queue = asyncio.Queue(1)
    AppData.save_padlock.set()
    tasks = [
        asyncio.create_task(controller()),
        *[asyncio.create_task(creator_emitter()) for _ in range(AppConfig.threads)],
        *[asyncio.create_task(model_emitter()) for _ in range(AppConfig.threads)],
        *[asyncio.create_task(model_consumer()) for _ in range(AppConfig.threads)],
        asyncio.create_task(save_db_emitter()),
        asyncio.create_task(save_db_consumer()),
    ]
    await asyncio.gather(*tasks)
    for task in tasks:
        task.cancel()
    log(logging.INFO, "Scraping process completed.")
if __name__ == "__main__":
    asyncio.run(main())
loopyd commented Nov 25, 2023

Changelog

1.0.0

  • Initial release

📝 Notes:

This is the first release! Some things will definitely be buggy, so bear with me while releases continue. If you would like to contribute, feel free to suggest changes or request features by commenting below.

1.0.1

  • Add --save-interval parameter that allows the scraper to save the database periodically.
  • Add save_emitter and save_consumer threads to the thread model to control saving, plus a save-file padlock event for safe I/O writes to the database file (see the sketch after the notes below).
  • Fix some issues with output and rename the main controller thread appropriately (the thread that emits page numbers populates the thread network with work, so it is the main controller).

📝 Notes:

Benchmark: the scraper can collect the model download links for every creator on the site in under 6 hours.
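A minimal sketch of the padlock pattern described above, using simplified, hypothetical names (writer, saver, demo): the save emitter clears an asyncio.Event while it snapshots the in-memory lists, and every writer awaits that event before mutating them, so a snapshot never races a write.

import asyncio

async def writer(padlock: asyncio.Event, items: list, value: int) -> None:
    # Writers pause while a save snapshot is being taken.
    await padlock.wait()
    items.append(value)

async def saver(padlock: asyncio.Event, items: list) -> None:
    padlock.clear()          # block writers
    snapshot = list(items)   # take a consistent copy to persist
    padlock.set()            # release writers
    print(f"saved {len(snapshot)} items")

async def demo() -> None:
    padlock = asyncio.Event()
    padlock.set()
    items: list = []
    await asyncio.gather(*(writer(padlock, items, i) for i in range(5)))
    await saver(padlock, items)

asyncio.run(demo())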

1.0.2

  • Add colorized log output with colorama.
  • Add --no-color switch to turn colorized output on or off.
  • Database compression with Brotli (produces smaller database files; a decompression sketch follows the notes below).

📝 Notes:

This change adds some polish. The script is starting to get very large...
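Since the database is now Brotli-compressed JSON rather than plain text, here is a small standalone sketch for turning it back into readable data, assuming the default civitai-db.bin path and the {'creators': [...], 'models': [...]} layout written by save_db_json (adjust DB_PATH if you used --json):

import json
import brotli

DB_PATH = "civitai-db.bin"  # default path written by the scraper

with open(DB_PATH, "rb") as fh:
    data = json.loads(brotli.decompress(fh.read()).decode("utf-8"))

print(f"{len(data['creators'])} creators, {len(data['models'])} models")
for model in data["models"][:5]:
    print(model["filename"], model["url"])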

1.0.3

  • Move to pydantic, for roughly a 37% performance increase through object serialization and the singleton decorator.
  • Add .env file support: the tool can now be configured with a .env file placed next to it (a sample .env follows the notes below).

📝 Notes:

The claimed speedup comes from avoiding copies of objects during assignment, the object-reference caching that pydantic provides, and the singleton decorator, which keeps the large shared objects from being instantiated more than once.
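For reference, a sample .env using the variable names init_app() reads; the values shown simply mirror the script's built-in defaults and are illustrative, not recommendations:

RETRIES=3
RETRY_DELAY=5
API_PERIOD=180
API_RETRY_LIMIT=100
THREADS=5
CREATOR_LIMIT=100
START_PAGE=-1
SAVE_INTERVAL=60
NO_SKIP=false
LOG_LEVEL=info
LOG_FILE=civitai-scraper.log
COLORIZE=false
DB=civitai-db.bin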
