Last active
June 5, 2024 17:53
-
-
Save fishyer/638a1f492292d867fedaadd154dbda4b to your computer and use it in GitHub Desktop.
基于 FastAPI + Playwright + Redis 的极简生产者消费者模型，可通过 API 接收关键词，并返回搜索结果
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time
from multiprocessing import Process
from urllib.parse import urljoin

import arrow
import redis
import uvicorn
from fastapi import FastAPI
from playwright.sync_api import Error as PlaywrightError
from playwright.sync_api import Page, sync_playwright

# NOTE: intentionally replaces the builtin ``print`` with the loguru-backed one.
from fish_util.src.loguru_util import print
# Module-level Redis connection (local server, database 0) used by the
# message-queue helper functions in this file.
redis_client = redis.StrictRedis(
    host="127.0.0.1",
    port=6379,
    db=0,
)
def put_message_in_queue(message):
    """Push *message* onto the head of the ``message_queue`` Redis list."""
    queue_name = "message_queue"
    redis_client.lpush(queue_name, message)
def get_message_from_queue():
    """Pop one message from the tail of ``message_queue`` and return it as text.

    The raw value returned by ``rpop`` is decoded as UTF-8. ``None`` is
    returned when ``rpop`` yields a falsy value (nothing was popped).
    """
    raw = redis_client.rpop("message_queue")
    if not raw:
        return None
    return raw.decode("utf-8")
def get_queue_length():
    """Return the current number of entries in the ``message_queue`` list."""
    pending = redis_client.llen("message_queue")
    return pending
# Entry page of the Sogou WeChat search; also the base for resolving result links.
_SOGOU_WEIXIN_URL = "https://weixin.sogou.com/"


def _first_article(page: Page):
    """Return ``(title, link)`` of the first search hit on *page*, or ``None``.

    ``None`` is returned when the result list is empty, or when the first
    entry has no title anchor or the anchor has no ``href``.
    """
    articles = page.query_selector_all(".news-list li")
    print(f"len(articles): {len(articles)}")
    if not articles:
        return None
    anchor = articles[0].query_selector(".txt-box>h3>a")
    if anchor is None:
        return None
    href = anchor.get_attribute("href")
    if href is None:
        return None
    # urljoin handles both site-relative and absolute hrefs, unlike plain
    # string concatenation with the host name.
    return anchor.inner_text(), urljoin(_SOGOU_WEIXIN_URL, href)


def _search_wechat(context, search_key: str) -> None:
    """Search Sogou WeChat for *search_key* in a fresh page and log the first hit.

    Two screenshots are written under ``screenshot/`` (before and after the
    search), both prefixed with the same timestamp. The page is always
    closed, even when a step raises.
    """
    current_time_str = arrow.now().format("YYYY-MM-DD_HHmmss")
    page = context.new_page()
    try:
        page.goto(_SOGOU_WEIXIN_URL)
        page.wait_for_load_state()
        page.screenshot(path=f"screenshot/{current_time_str}-step-1.png")
        page.locator("#query").fill(search_key)
        # ``.first`` keeps the original "first matching element" semantics
        # while failing with a Playwright error instead of an
        # AttributeError on ``None`` when the button is missing.
        page.locator('input[type="submit"]').first.click()
        page.wait_for_selector(".news-list li")
        print(page.title(), page.url)
        page.screenshot(path=f"screenshot/{current_time_str}-step-2.png")
        article = _first_article(page)
        if article is None:
            print(f"No article found for: {search_key}")
        else:
            article_title, article_link = article
            print(f"{article_title} {article_link}")
    finally:
        # Without this, every failed search would leave a tab open in the
        # long-lived browser context.
        page.close()


def start_playwright():
    """Run the consumer loop: poll the Redis queue and search each keyword.

    Launches a non-headless Chromium, then loops forever: logs the queue
    size, pops at most one message per iteration, runs the search for it,
    and sleeps 10 seconds before polling again. A Playwright failure while
    handling one message is logged and the loop carries on, so a single
    timeout does not stop consumption of later messages. The failed message
    has already been popped and is not re-queued.
    """
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=False)
        context = browser.new_context()
        while True:
            queue_size = get_queue_length()
            print("Queue size:", queue_size)
            if queue_size > 0:
                message = get_message_from_queue()
                # The queue may have been drained between the length check
                # and the pop; only search when a message was actually read.
                if message is not None:
                    print(f"Processing message from queue: {message}")
                    try:
                        _search_wechat(context, message)
                    except PlaywrightError as exc:
                        print(f"Search failed for {message}: {exc}")
            time.sleep(10)
def start_fastapi():
    """Run the HTTP producer: a FastAPI app that enqueues search keywords.

    Exposes ``GET /wechat/title/{title}``, which pushes ``title`` onto the
    Redis queue and echoes it back. Serves on ``0.0.0.0:38000`` via uvicorn;
    this call blocks until the server stops.
    """
    app = FastAPI()

    # Plain ``def`` rather than ``async def``: the Redis client used by
    # put_message_in_queue is synchronous, and FastAPI runs non-async
    # handlers in a threadpool instead of blocking the event loop.
    @app.get("/wechat/title/{title}")
    def wechat_title(title: str):
        """Queue *title* as a search keyword and echo it back."""
        print(f"Generating message from web: {title}")
        put_message_in_queue(title)
        return {"title": title}

    print("http://localhost:38000/docs")
    uvicorn.run(app, host="0.0.0.0", port=38000)
if __name__ == "__main__":
    print(__file__)
    # One process per role: the Playwright consumer first, then the FastAPI
    # producer.
    workers = [
        Process(target=start_playwright, name="PlaywrightProcess"),
        Process(target=start_fastapi, name="FastAPIProcess"),
    ]
    for worker in workers:
        worker.start()
    # Wait for both processes to finish (FastAPI first, then Playwright).
    for worker in reversed(workers):
        worker.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment