Skip to content

Instantly share code, notes, and snippets.

@GreenD93
Last active March 12, 2025 05:58
Show Gist options
  • Save GreenD93/957ac66e0ef6433be6eb633febcf8492 to your computer and use it in GitHub Desktop.
Save GreenD93/957ac66e0ef6433be6eb633febcf8492 to your computer and use it in GitHub Desktop.
import aiohttp
import asyncio
import urllib
from urllib.parse import urlparse
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
import re
import random
from src.utils.settings import *
class NaverCrawler:
    """Search Naver blogs via the Naver Open API and scrape post bodies.

    Uses aiohttp for concurrent Open API search calls and a headless
    Selenium Chrome driver + BeautifulSoup to extract blog post text.
    """

    def __init__(self):
        # Naver Open API credentials; populated from the project-level
        # CLIENT_ID / CLIENT_SECRET constants (src.utils.settings).
        self.client_id = None
        self.client_secret = None
        self.set_naver_api_key(CLIENT_ID, CLIENT_SECRET)

    def set_naver_api_key(self, client_id, client_secret):
        """Store the Naver Open API credentials used in request headers."""
        self.client_id = client_id
        self.client_secret = client_secret

    def generate_search_query(self, user_input, query_type):
        """Build a Korean blog-search query string.

        Args:
            user_input: dict with keys 'destination', 'preferred_food',
                'preferred_activity', 'accommodation_type'.
            query_type: 'food' | 'activity' | 'course'; any other value
                falls back to an accommodation query.

        Returns:
            The query string to send to the Naver blog search API.
        """
        destination = user_input['destination']
        if query_type == "food":
            return f"{destination} 여행 {user_input['preferred_food']} 맛집"
        if query_type == "activity":
            return f"{destination} 여행 {user_input['preferred_activity']} 관광지"
        if query_type == "course":
            return f"{destination} 여행 교통"
        return f"{destination} 여행 {user_input['accommodation_type']} BEST"

    def _check_naver_url(self, url):
        """Return True only for links hosted on blog.naver.com."""
        return 'blog.naver.com' in url

    def get_blog_infos(self, query_types, search_quries):
        """Synchronous wrapper around the async multi-query search.

        Creates a private event loop, runs the searches, and always
        closes the loop afterwards (the original leaked it).
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            items = loop.run_until_complete(
                self._get_blog_infos_async(query_types, search_quries))
        finally:
            loop.close()  # don't leak the event loop between calls
        return items

    async def _get_blog_infos_async(self, query_types, search_quries):
        """Run one blog-search API call per (query_type, query) pair.

        Returns a list of dicts {'results', 'query_type', 'query'};
        failed requests are dropped instead of yielding None entries.
        """

        async def fetch(sess, query_type, query, display=5, start=1, sort='sim'):
            # The query must be URL-encoded before insertion into the URL.
            encoded_query = urllib.parse.quote(query)
            url = (
                "https://openapi.naver.com/v1/search/blog?query="
                f"{encoded_query}&display={display}&start={start}&sort={sort}"
            )
            headers = {
                "X-Naver-Client-Id": self.client_id,
                "X-Naver-Client-Secret": self.client_secret,
            }
            async with sess.get(url, headers=headers) as response:
                code = response.status
                if code != 200:
                    # BUG FIX: original did "Error Code:" + code with an
                    # int, raising TypeError on the error path.
                    print("Error Code:" + str(code))
                    return None
                results = (await response.json())['items']
            # Keep only posts hosted on Naver's own blog platform.
            results = [r for r in results if self._check_naver_url(r['link'])]
            random.shuffle(results)  # shuffle so repeated calls vary
            return {
                'results': results,
                'query_type': query_type,
                'query': query,
            }

        async def fetch_all(sess, query_types, search_quries):
            items = await asyncio.gather(
                *[fetch(sess, query_type, query)
                  for query_type, query in zip(query_types, search_quries)])
            # Drop failed fetches (None) so downstream code can assume
            # every item is a dict with a 'results' key.
            return [item for item in items if item is not None]

        async with aiohttp.ClientSession() as sess:
            return await fetch_all(sess, query_types, search_quries)

    def get_contents(self, items):
        """Scrape the first parseable blog body for each search item.

        Setup notes:
          mac:    brew install --cask chromedriver
          ubuntu: https://velog.io/@codingchild/Crawling-Chromedriver-setting-ubuntu

        Returns the list of result dicts that parsed, each augmented
        with a 'content' key holding the blog's plain-text body.
        """
        options = webdriver.ChromeOptions()
        options.add_argument('--disable-gpu')
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')  # required when running in a container
        options.add_argument('--disable-blink-features=AutomationControlled')
        driver = webdriver.Chrome(options=options)
        candidates = []
        try:
            for item in items:
                # Use the first candidate blog in each result set that
                # parses cleanly; skip the rest.
                for result in item['results']:
                    try:
                        result['content'] = self.get_content_from_blog(
                            driver, result['link'])
                        candidates.append(result)
                        break
                    # BUG FIX: original "except as e" is a SyntaxError.
                    except Exception as e:
                        print(f"Parsing Error : {e}")
                        continue
        finally:
            # BUG FIX: quit() tears down the whole browser process;
            # close() only closes one window and leaked Chrome on error.
            driver.quit()
        return candidates

    def get_content_from_blog(self, driver, blog_url):
        """Load a Naver blog post and return its plain-text body."""

        def _get_source(driver):
            # Naver blogs render the post inside an iframe (#mainFrame);
            # switch into it before reading the page source.
            iframe = driver.find_element(By.ID, "mainFrame")
            driver.switch_to.frame(iframe)
            return driver.page_source

        def _get_content(source):
            html = BeautifulSoup(source, "html.parser")
            content = html.select("div.se-main-container")
            return ''.join(str(content))

        def _preprocess_content(content):
            # Strip HTML tags, a known Flash-workaround script blob,
            # newlines and zero-width spaces.
            pattern1 = '<[^>]*>'
            pattern2 = """[\n\n\n\n\n// flash 오류를 우회하기 위한 함수 추가\nfunction _flash_removeCallback() {}"""
            content = re.sub(pattern=pattern1, repl='', string=content)
            content = content.replace(pattern2, '')
            content = content.replace('\n', '')
            content = content.replace('\u200b', '')
            return content

        driver.get(blog_url)
        driver.implicitly_wait(3)
        source = _get_source(driver)
        content = _get_content(source)
        return _preprocess_content(content)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment