@devinat1
Created September 30, 2024 21:01
Clone generation
import os
import asyncio
import psutil
from urllib.parse import urlparse
import re

async def run_crawler(url, output_dir, workers=3, max_visited_urls=500, ignore_robots=True):
    # Launch one external "crawler" CLI process for a single site and wait for it to exit.
    command = [
        "crawler",
        f"--url={url}",
        f"--offline-export-dir={output_dir}",
        f"--workers={workers}",
        f"--max-visited-urls={max_visited_urls}",
    ]
    if ignore_robots:
        command.append("--ignore-robots-txt")
    process = await asyncio.create_subprocess_exec(*command)
    await process.communicate()

async def get_domain(url):
    # Return the hostname (netloc) portion of a URL.
    parsed_url = urlparse(url)
    return parsed_url.netloc


async def sanitize(value: str) -> str:
    # Turn a domain into a filesystem-friendly directory name: drop the scheme,
    # "www." and ".com", then replace any remaining unsafe characters with "_".
    value = re.sub(r"(https?://)|(www\.)|(\.com)", "", value)
    value = re.sub(r"[^\w\s-]", "_", value)
    return value.rstrip("_").strip()
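
# Quick sanity check (illustrative only; the URL is made up, not taken from the CSV):
#   asyncio.run(get_domain("https://www.example.com"))  -> "www.example.com"
#   asyncio.run(sanitize("www.example.com"))             -> "example"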

async def generate_crawler_parameters(source_path, output_dir):
    # Read the source CSV (skipping the header row) and build one (url, output_dir)
    # pair per site, with the per-site output directory named after the sanitized domain.
    with open(source_path, "r") as f:
        urls = [line.strip().split(",")[1] for line in f.readlines()[1:]]  # URL assumed to be in the second column
    domains = [await get_domain(url) for url in urls]
    output_dirs = [os.path.join(output_dir, await sanitize(domain)) for domain in domains]
    return list(zip(urls, output_dirs))
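
# Assumed input layout (illustrative; the real column names may differ): the CSV is
# expected to have a header row and the URL in its second column, e.g.
#
#   category,url
#   news,https://news.example.org
#   shop,https://shop.example.net
#
# Only the second column is read; any additional columns are ignored by the split above.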

def get_dynamic_workers(cpu_threshold=85.0, memory_threshold=85.0, min_workers=1, max_workers=10):
    # Sample current load (note: cpu_percent(interval=1) blocks for about one second).
    cpu_usage = psutil.cpu_percent(interval=1)
    memory_usage = psutil.virtual_memory().percent
    if cpu_usage > cpu_threshold or memory_usage > memory_threshold:
        # Reduce workers if the system is overloaded
        return min_workers
    else:
        # Maximize workers if the system is healthy
        return max_workers
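
# Illustrative behaviour with the defaults above (numbers made up): a machine at 90% CPU
# or 90% memory gets min_workers=1 for the next crawl, while one at 40% CPU and 50%
# memory gets max_workers=10.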

async def run_parallel_crawls(source_path, output_base_dir, max_concurrent_tasks=5, min_workers=1, max_workers=10):
    commands = await generate_crawler_parameters(source_path, output_base_dir)
    # The semaphore caps how many crawler subprocesses run at the same time.
    semaphore = asyncio.Semaphore(max_concurrent_tasks)

    async def wrapper(command):
        async with semaphore:
            # Choose a worker count from current system load just before launching.
            dynamic_workers = get_dynamic_workers(min_workers=min_workers, max_workers=max_workers)
            await run_crawler(*command, workers=dynamic_workers)

    tasks = [wrapper(command) for command in commands]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    script_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = os.path.join(script_dir, "../" * 2, "data/synthetic")
    source_path = os.path.join(data_dir, "categorized-synthetic-sites.csv")
    output_dir = os.path.join(data_dir, "clones")
    asyncio.run(run_parallel_crawls(source_path, output_dir, max_concurrent_tasks=5))
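
# Rough sketch of the expected result (directory names depend on the actual CSV contents):
#   data/synthetic/clones/
#     news_example_org/   <- offline export written by the "crawler" CLI for that site
#     shop_example_net/
# The "crawler" binary and its flags (--url, --offline-export-dir, --workers,
# --max-visited-urls, --ignore-robots-txt) are taken as given from the script above;
# it needs to be installed and on PATH for asyncio.create_subprocess_exec to find it.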