Skip to content

Instantly share code, notes, and snippets.

@fishyer
Last active June 5, 2024 17:53
Show Gist options
  • Save fishyer/638a1f492292d867fedaadd154dbda4b to your computer and use it in GitHub Desktop.
Save fishyer/638a1f492292d867fedaadd154dbda4b to your computer and use it in GitHub Desktop.
基于FastApi+Playwright+Redis的极简生产者消费者模型,可通过api接受关键词,并返回搜索结果
from fish_util.src.loguru_util import print
from multiprocessing import Process
from fastapi import FastAPI
from playwright.sync_api import Page, sync_playwright
import time
import redis
import arrow
import uvicorn
redis_client = redis.StrictRedis(host="127.0.0.1", port=6379, db=0)
def put_message_in_queue(message):
redis_client.lpush("message_queue", message)
def get_message_from_queue():
message = redis_client.rpop("message_queue")
return message.decode("utf-8") if message else None
def get_queue_length():
return redis_client.llen("message_queue")
def start_playwright():
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False)
context = browser.new_context()
while True:
queue_size = get_queue_length()
print("Queue size:", queue_size)
if queue_size > 0:
message = get_message_from_queue()
print(f"Processing message from queue: {message}")
def search(context, search_key: str) -> None:
current_time_str = arrow.now().format("YYYY-MM-DD_HHmmss")
page = context.new_page()
page.goto("https://weixin.sogou.com/")
page.wait_for_load_state()
page.screenshot(path=f"screenshot/{current_time_str}-step-1.png")
page.locator("#query").fill(search_key)
page.query_selector('input[type="submit"]').click()
page.wait_for_selector(".news-list li")
print(page.title(), page.url)
page.screenshot(path=f"screenshot/{current_time_str}-step-2.png")
def get_article_info(page: Page):
articles = page.query_selector_all(".news-list li")
print(f"len(articles): {len(articles)}")
for i, item in enumerate(articles):
item_a = item.query_selector(".txt-box>h3>a")
item_link = (
"https://weixin.sogou.com"
+ item_a.get_attribute("href")
)
item_title = item_a.inner_text()
if i == 0:
return item_title, item_link
article_title, article_link = get_article_info(page)
print(f"{article_title} {article_link}")
page.close()
search(context, message)
time.sleep(10)
def start_fastapi():
app = FastAPI()
@app.get("/wechat/title/{title}")
async def wechat_title(title: str):
print(f"Generating message from web: {title}")
put_message_in_queue(title)
return {"title": title}
print("http://localhost:38000/docs")
uvicorn.run(app, host="0.0.0.0", port=38000)
if __name__ == "__main__":
print(__file__)
# Start the Playwright service in a separate process
playwright_process = Process(target=start_playwright, name="PlaywrightProcess")
playwright_process.start()
# Start the FastAPI server in a separate process
fastapi_process = Process(target=start_fastapi, name="FastAPIProcess")
fastapi_process.start()
# Wait for both processes to finish
fastapi_process.join()
playwright_process.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment