GreenD93/naver_crawler.py
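A crawler that builds Naver blog search queries from a user's travel preferences, fetches matching posts asynchronously through the Naver Open API, and scrapes each post's body with Selenium.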
import asyncio
import random
import re
import urllib.parse

import aiohttp
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By

from src.utils.settings import *  # provides CLIENT_ID and CLIENT_SECRET
class NaverCrawler:

    def __init__(self):
        # Naver API credentials
        self.client_id = None
        self.client_secret = None

        # init
        self.set_naver_api_key(CLIENT_ID, CLIENT_SECRET)

    def set_naver_api_key(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret
    def generate_search_query(self, user_input, query_type):
        destination = user_input['destination']
        preferred_food = user_input['preferred_food']
        preferred_activity = user_input['preferred_activity']
        accommodation_type = user_input['accommodation_type']

        # the queries are Korean search phrases:
        # food -> "{destination} travel {food} restaurants", activity -> "... tourist spots",
        # course -> "... transportation", default -> "... {accommodation} BEST"
        if query_type == "food":
            query = f"{destination} 여행 {preferred_food} 맛집"
        elif query_type == "activity":
            query = f"{destination} 여행 {preferred_activity} 관광지"
        elif query_type == "course":
            query = f"{destination} 여행 교통"
        else:
            query = f"{destination} 여행 {accommodation_type} BEST"

        return query
    def _check_naver_url(self, url):
        # keep only links hosted on Naver's own blog service
        return 'blog.naver.com' in url
    def get_blog_infos(self, query_types, search_queries):
        # run the async fetches on a fresh event loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        items = loop.run_until_complete(self._get_blog_infos_async(query_types, search_queries))
        return items
    async def _get_blog_infos_async(self, query_types, search_queries):

        async def fetch(sess, query_type, query, display=5, start=1, sort='sim'):  # sort='sim': relevance order
            encoded_query = urllib.parse.quote(query)
            base_url = "https://openapi.naver.com/v1/search/blog?query="
            url = f"{base_url}{encoded_query}&display={display}&start={start}&sort={sort}"

            headers = {
                "X-Naver-Client-Id": self.client_id,
                "X-Naver-Client-Secret": self.client_secret
            }

            async with sess.get(url, headers=headers) as response:
                code = response.status
                if code == 200:
                    item = await response.json()
                    results = item['items']
                else:
                    print(f"Error Code: {code}")
                    return None

            # only naver blog
            results = [result for result in results if self._check_naver_url(result['link'])]
            random.shuffle(results)  # shuffle so the same results are not returned every time

            item = {
                'results': results,
                'query_type': query_type,
                'query': query
            }

            return item

        async def fetch_all(sess, query_types, search_queries):
            items = await asyncio.gather(*[asyncio.create_task(fetch(sess, query_type, query))
                                           for query_type, query in zip(query_types, search_queries)])
            return items

        async with aiohttp.ClientSession() as sess:
            items = await fetch_all(sess, query_types, search_queries)

        # drop failed requests so downstream parsing never sees None
        return [item for item in items if item is not None]
    def get_contents(self, items):
        # mac: brew install --cask chromedriver
        # ubuntu: https://velog.io/@codingchild/Crawling-Chromedriver-setting-ubuntu
        options = webdriver.ChromeOptions()
        options.add_argument('--disable-gpu')
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')  # run without the sandbox; required (e.g. in containers)
        options.add_argument('--disable-blink-features=AutomationControlled')

        driver = webdriver.Chrome(options=options)

        candidates = []
        for item in items:
            results = item['results']

            # among the candidates, keep only the first blog that parses successfully
            for result in results:
                try:
                    blog_url = result['link']
                    content = self.get_content_from_blog(driver, blog_url)

                    # blog full body content
                    result['content'] = content
                    candidates.append(result)
                    break

                except Exception as e:
                    print(f"Parsing Error: {e}")
                    continue

        driver.quit()
        return candidates
    def get_content_from_blog(self, driver, blog_url):

        def _get_source(driver):
            # the blog body lives inside the 'mainFrame' iframe
            iframe = driver.find_element(By.ID, "mainFrame")
            driver.switch_to.frame(iframe)
            source = driver.page_source
            return source

        def _get_content(source):
            html = BeautifulSoup(source, "html.parser")
            content = html.select("div.se-main-container")
            content = ''.join(str(tag) for tag in content)
            return content

        def _preprocess_content(content):
            pattern1 = '<[^>]*>'  # strip HTML tags
            # script boilerplate Naver injects ("function added to bypass the flash error")
            pattern2 = "\n\n\n\n\n// flash 오류를 우회하기 위한 함수 추가\nfunction _flash_removeCallback() {}"

            content = re.sub(pattern=pattern1, repl='', string=content)
            content = content.replace(pattern2, '')
            content = content.replace('\n', '')
            content = content.replace('\u200b', '')

            return content

        driver.get(blog_url)
        driver.implicitly_wait(3)

        source = _get_source(driver)
        content = _get_content(source)
        content = _preprocess_content(content)

        return content
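# Usage sketch: a minimal example of driving NaverCrawler end to end. It assumes
# src.utils.settings defines CLIENT_ID / CLIENT_SECRET, that a chromedriver
# matching the local Chrome is installed, and the user_input values below are
# purely hypothetical; the keys mirror those read in generate_search_query.
if __name__ == "__main__":
    crawler = NaverCrawler()

    user_input = {
        'destination': '제주',
        'preferred_food': '해산물',
        'preferred_activity': '등산',
        'accommodation_type': '호텔'
    }

    # 'accommodation' falls through to the else branch of generate_search_query
    query_types = ['food', 'activity', 'course', 'accommodation']
    search_queries = [crawler.generate_search_query(user_input, qt) for qt in query_types]

    items = crawler.get_blog_infos(query_types, search_queries)
    candidates = crawler.get_contents(items)

    for candidate in candidates:
        print(candidate['link'])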