import asyncio
import random
import re
from urllib.parse import quote

import aiohttp
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By

from src.utils.settings import *


class NaverCrawler:
    def __init__(self):
        # Naver API key
        self.client_id = None
        self.client_secret = None

        # init (CLIENT_ID / CLIENT_SECRET come from src.utils.settings)
        self.set_naver_api_key(CLIENT_ID, CLIENT_SECRET)

    def set_naver_api_key(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret

    def generate_search_query(self, user_input, query_type):
        destination = user_input['destination']
        preferred_food = user_input['preferred_food']
        preferred_activity = user_input['preferred_activity']
        accommodation_type = user_input['accommodation_type']

        if query_type == "food":
            query = f"{destination} 여행 {preferred_food} 맛집"
        elif query_type == "activity":
            query = f"{destination} 여행 {preferred_activity} 관광지"
        elif query_type == "course":
            query = f"{destination} 여행 교통"
        else:
            query = f"{destination} 여행 {accommodation_type} BEST"
        return query

    def _check_naver_url(self, url):
        return 'blog.naver.com' in url

    def get_blog_infos(self, query_types, search_queries):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        items = loop.run_until_complete(self._get_blog_infos_async(query_types, search_queries))
        return items

    async def _get_blog_infos_async(self, query_types, search_queries):
        async def fetch(sess, query_type, query, display=5, start=1, sort='sim'):
            encoded_query = quote(query)
            base_url = "https://openapi.naver.com/v1/search/blog?query="
            url = f"{base_url}{encoded_query}&display={display}&start={start}&sort={sort}"
            headers = {
                "X-Naver-Client-Id": self.client_id,
                "X-Naver-Client-Secret": self.client_secret
            }
            async with sess.get(url, headers=headers) as response:
                code = response.status
                if code == 200:
                    item = await response.json()
                    results = item['items']
                else:
                    print(f"Error Code: {code}")
                    return None

            # keep Naver blog results only
            results = [result for result in results if self._check_naver_url(result['link'])]
            random.shuffle(results)  # shuffle so repeated queries do not return identical results

            item = {
                'results': results,
                'query_type': query_type,
                'query': query
            }
            return item

        async def fetch_all(sess, query_types, search_queries):
            items = await asyncio.gather(
                *[fetch(sess, query_type, query)
                  for query_type, query in zip(query_types, search_queries)]
            )
            return items

        async with aiohttp.ClientSession() as sess:
            items = await fetch_all(sess, query_types, search_queries)
        return items

    def get_contents(self, items):
        # mac
        # brew install --cask chromedriver
        # driver = webdriver.Chrome()

        # ubuntu
        # https://velog.io/@codingchild/Crawling-Chromedriver-setting-ubuntu
        options = webdriver.ChromeOptions()
        options.add_argument('--disable-gpu')
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')  # disables the Chrome sandbox; required in many server environments
        options.add_argument('--disable-blink-features=AutomationControlled')
        driver = webdriver.Chrome(options=options)

        candidates = []
        for item in items:
            results = item['results']
            # among the candidates, use only blogs that can be parsed
            for result in results:
                try:
                    blog_url = result['link']
                    content = self.get_content_from_blog(driver, blog_url)  # full blog body text
                    result['content'] = content
                    candidates.append(result)
                    break  # stop at the first blog per query that parses successfully
                except Exception as e:
                    print(f"Parsing Error : {e}")
                    continue
        driver.quit()  # quit() ends the whole driver session, not just the current window
        return candidates

    def get_content_from_blog(self, driver, blog_url):
        def _get_source(driver):
            # Naver blog posts are rendered inside the "mainFrame" iframe
            iframe = driver.find_element(By.ID, "mainFrame")
            driver.switch_to.frame(iframe)
            source = driver.page_source
            return source

        def _get_content(source):
            html = BeautifulSoup(source, "html.parser")
            tags = html.select("div.se-main-container")
            content = ''.join(str(tag) for tag in tags)
            return content

        def _preprocess_content(content):
            pattern1 = '<[^>]*>'  # strip remaining HTML tags
            # boilerplate script Naver injects to work around a flash error
            pattern2 = "\n\n\n\n\n// flash 오류를 우회하기 위한 함수 추가\nfunction _flash_removeCallback() {}"
            content = re.sub(pattern=pattern1, repl='', string=content)
            content = content.replace(pattern2, '')
            content = content.replace('\n', '')
            content = content.replace('\u200b', '')  # drop zero-width spaces
            return content

        driver.get(blog_url)
        driver.implicitly_wait(3)
        source = _get_source(driver)
        content = _get_content(source)
        content = _preprocess_content(content)
        return content
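

# Minimal usage sketch, assuming CLIENT_ID / CLIENT_SECRET in src.utils.settings
# hold valid Naver API credentials and chromedriver is installed. The user_input
# values and the "accommodation" query_type below are hypothetical examples;
# any query_type not handled explicitly falls through to the accommodation query.
if __name__ == "__main__":
    crawler = NaverCrawler()

    user_input = {
        'destination': '부산',
        'preferred_food': '해산물',
        'preferred_activity': '바다',
        'accommodation_type': '호텔',
    }

    query_types = ["food", "activity", "course", "accommodation"]
    search_queries = [crawler.generate_search_query(user_input, qt) for qt in query_types]

    # fetch blog search results via the Naver API, then crawl each blog body
    items = crawler.get_blog_infos(query_types, search_queries)
    candidates = crawler.get_contents(items)
    for candidate in candidates:
        print(candidate['link'], len(candidate['content']))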