Skip to content

Instantly share code, notes, and snippets.

@mh-firouzjah
Last active February 8, 2024 17:55
Show Gist options
  • Save mh-firouzjah/239fc5e99352407d693eff981ecd4a69 to your computer and use it in GitHub Desktop.
Save mh-firouzjah/239fc5e99352407d693eff981ecd4a69 to your computer and use it in GitHub Desktop.

A Python Script To Fetch Data From RSS Feeds Asynchronously

async def task(work_queue: asyncio.Queue, client: httpx.AsyncClient, result: list[tuple[Post, Tag]]) -> None:
    """Worker coroutine: drain ``work_queue`` of Source objects, fetch and
    parse each source's RSS feed, and append the pulled (Post, Tag) data
    to the shared ``result`` list.

    Any failure (network error, non-2xx response, empty/missing feed) is
    logged, recorded on the source's ``last_result`` field, and the worker
    moves on to the next queued source.
    """
    while not work_queue.empty():
        source: Source = await work_queue.get()
        url = source.feed_url
        logger.info(f"Working on {url}")
        try:
            res = await client.get(url)
        except httpx.RequestError as exc:
            msg = f"An error occurred while requesting {exc.request.url!r}."
            logger.exception(msg)
            # Append (+=) like the other error paths below, instead of
            # overwriting, so earlier result history is preserved.
            source.last_result += "\n" + msg
            await sync_to_async(source.save, thread_sensitive=True)(update_fields=["last_result"])
            continue

        if res.status_code not in range(200, 300):
            msg = f"Status code: {res.status_code}"
            logger.error(f"Source: {source}, " + msg)
            source.last_result += "\n" + msg
            await sync_to_async(source.save, thread_sensitive=True)(update_fields=["last_result"])
            continue

        logger.info(f"Parsing response from source: {source}")
        # BUG FIX: feedparser.parse() expects the document text/bytes (or a
        # URL/file-like), not an httpx.Response object — pass the body.
        feed = feedparser.parse(res.text)

        # dict.has_key() no longer exists in Python 3 — use the `in` operator.
        if "entries" not in feed or not feed.entries:
            msg = f"No feed data for {source}"
            logger.error(msg)
            source.last_result += "\n" + msg
            await sync_to_async(source.save, thread_sensitive=True)(update_fields=["last_result"])
            continue

        logger.info(f"Pulling posts from source: {source}")
        posts_and_tags = pull_posts(source, feed)
        logger.info(f"Completed pulling posts from source: {source}")
        result.append(posts_and_tags)


async def gather_tasks() -> list[tuple[Post, Tag]]:
    """Queue every active Source and fan out worker tasks to fetch them all.

    Returns:
        The list of (Post, Tag) tuples accumulated by the workers.
        (BUG FIX: the return annotation previously said ``None`` even
        though ``results`` is returned.)
    """
    # Create the queue of work
    work_queue: asyncio.Queue = asyncio.Queue()

    # A mutable object to send to each task so task can update it
    results: list[tuple[Post, Tag]] = []

    # Put some work in the queue
    async for source in Source.objects.filter(is_active=True):
        await work_queue.put(source)

    # Run the tasks with one shared HTTP client. asyncio.gather schedules
    # coroutines itself, so the extra create_task() wrapper is unnecessary.
    async with httpx.AsyncClient() as client:
        await asyncio.gather(
            *(task(work_queue, client, results) for _ in range(cpu_cores_number * 2 + 1))
        )

    return results


def refreshfeeds():
    """Synchronous entry point: run the async feed-pulling pipeline to completion."""
    logger.info("Started pulling feeds...")
    # NOTE(review): `result` is assigned but never used in the visible code —
    # this view may be truncated; confirm against the full file.
    result = asyncio.run(gather_tasks())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment