Skip to content

Instantly share code, notes, and snippets.

@KokoseiJ
Last active April 18, 2022 15:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save KokoseiJ/74250858daf092dffa8124fe2a5323db to your computer and use it in GitHub Desktop.
Save KokoseiJ/74250858daf092dffa8124fe2a5323db to your computer and use it in GitHub Desktop.
문갤문학 검색기 v3
## Copyright (C) 2022 파이썬 (KokoseiJ)
#
# mungalparser is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import requests
from bs4 import BeautifulSoup as bs
import re
import sys
import time
import datetime
import threading
from queue import Queue
from functools import cached_property
# Name of the text file the search results are written to.
FILE_NAME = "mungalsearch.txt"
# Default gallery id; substituted into the URL templates below.
GALL_ID = "dokidokilc"
# Listing URL template; takes the gallery id. A "&page=N" suffix is appended
# by the crawler when requesting a specific listing page.
GALL_URL = "https://gall.dcinside.com/mgallery/board/lists?id={}"
# Post URL template; takes the gallery id and the post number.
GALL_VIEW_URL = "https://gall.dcinside.com/mgallery/board/view/?id={}&no={}"
# Value sent as the User-Agent header on every request.
USER_AGENT = "Just_Monika"
# Default number of crawler worker threads.
THREAD_NUM = 16
# Seconds to wait before retrying a listing request that raised.
ERROR_WAIT = 5
# Title patterns that mark a post as a hit. They are applied with
# ``pattern.match``, i.e. anchored at the beginning of the title.
PATTERNS = [
re.compile(r"[\[\(]?문갤 ?(?:릴레이 ?)?문학(?:/[^\t]+)?[\]\)]"),
re.compile(r"[\[\(]?문예 ?대회[\]\)]"),
# re.compile(r"[\[\(]?시인 ?(?:대회)?[\]\)]")
]
# Banner printed once when the program starts.
SPLASH = """\
===============
= 문갤문학 검색기 v3 =
= by 파이썬 =
===============
과도한 사용은 디시인사이드 서버에 부담을 줄 수 있습니다.
"""
class Gallery:
    """Crawl the post listing of one gallery, optionally with worker threads.

    Listing pages are fetched through a shared ``requests`` session and parsed
    into ``GallEntry`` objects. ``gen_page`` walks the pages sequentially;
    ``gen_page_threaded`` distributes them over ``threads`` worker threads and
    yields the results in completion order.

    Args:
        gallid: Gallery id substituted into ``GALL_URL``.
        ua: Value sent as the ``User-Agent`` header.
        threads: Number of worker threads created for threaded crawling.
    """

    # Put on the queue by every worker when it exits, so the consumer knows
    # exactly when no more results can arrive.
    _DONE = object()

    def __init__(self, gallid=GALL_ID, ua=USER_AGENT, threads=THREAD_NUM):
        self.gallid = gallid
        self.url = GALL_URL.format(gallid)
        self.session = requests.session()
        self.session.headers.update({
            "User-Agent": ua
        })

        self.threadnum = threads
        self.queue = Queue()
        self.lock = threading.Lock()  # guards current_page and finished
        self.finished = 0             # number of workers that have exited
        self.current_page = 0         # next page number to hand out
        self.threads = [
            threading.Thread(target=self.run)
            for _ in range(threads)
        ]

    @cached_property
    def last_page(self):
        """Return the number of the last listing page (computed once).

        Requests an absurdly large page number and reads the page number out
        of the final response URL.
        NOTE(review): presumably the site redirects an out-of-range page to
        the last existing one -- confirm.

        Raises:
            ValueError: If the final URL carries no ``&page=`` parameter.
        """
        r = self.session.get(f"{self.url}&page=999999999")
        match = re.search(r"&page=([0-9]+)", r.url)
        if match is None:
            raise ValueError(
                f"could not determine the last page from URL {r.url!r}"
            )
        return int(match.group(1))

    def _get_page(self, page):
        """Fetch listing page ``page``, retrying until the request succeeds.

        After every failed attempt a warning is printed and the caller's
        thread sleeps for ``ERROR_WAIT`` seconds. The retry is a loop rather
        than recursion so a long outage cannot exhaust the call stack.
        """
        while True:
            try:
                return self.session.get(f"{self.url}&page={page}")
            except Exception as e:
                print(
                    f"\n경고! 페이지 {page} 색인 중 {type(e)} 오류가 발생하였습니다. "
                    f"{ERROR_WAIT}초 후 재시도합니다..."
                )
                time.sleep(ERROR_WAIT)

    def get_page(self, page):
        """Return the posts on listing page ``page`` as ``GallEntry`` objects.

        Rows whose ``data-type`` attribute is ``icon_notice`` are skipped.
        """
        r = self._get_page(page)
        soup = bs(r.content, features="lxml")
        td_list = soup.find_all("tr", {"class": "ub-content us-post"})

        return [
            GallEntry(self, entry) for entry in td_list
            if entry['data-type'] != "icon_notice"
        ]

    def gen_page(self, start=1):
        """Yield ``(page_number, entries)`` sequentially from page ``start``."""
        for current_page in range(start, self.last_page + 1):
            yield (current_page, self.get_page(current_page))

    def get_current_page(self):
        """Atomically claim and return the next page number to crawl."""
        with self.lock:
            page = self.current_page
            self.current_page += 1
        return page

    def report_finish(self):
        """Record that one worker thread has exited."""
        with self.lock:
            self.finished += 1

    def run(self):
        """Worker loop: claim pages and queue their entries until exhausted.

        Completion is reported in a ``finally`` block so that a worker dying
        from an unexpected exception cannot leave the consumer waiting
        forever for it.
        """
        try:
            while True:
                page = self.get_current_page()
                if page > self.last_page:
                    break
                self.queue.put((page, self.get_page(page)))
        finally:
            self.report_finish()
            self.queue.put(self._DONE)

    def start(self):
        """Start all worker threads."""
        for thread in self.threads:
            thread.start()

    def gen_page_threaded(self, start=1):
        """Yield ``(page_number, entries)`` as worker threads produce them.

        Pages arrive in completion order, not necessarily in page order. The
        generator ends once every worker has signalled completion; because a
        worker queues its marker only after all of its results, no result is
        dropped and the consumer never blocks on a queue nobody will fill.
        """
        self.current_page = start
        # Resolve the cached value before the workers exist so they do not
        # race on the first access to it.
        _ = self.last_page
        self.start()

        pending = self.threadnum
        while pending:
            item = self.queue.get()
            if item is self._DONE:
                pending -= 1
                continue
            yield item
class GallEntry:
    """A single post row of a gallery listing.

    Wraps the parsed table-row element ``entry`` and exposes its cells as
    lazily computed, cached attributes. ``gall`` is the owning gallery object;
    only its ``gallid`` is read here (to build the post URL).
    """

    def __init__(self, gall, entry):
        self.entry = entry
        self.gall = gall

    # -- low-level cell access -------------------------------------------

    def _find_class(self, classname):
        """Return the ``td`` cell whose class is ``gall_<classname>``."""
        selector = {"class": f"gall_{classname}"}
        return self.entry.find("td", selector)

    def _find_text(self, classname):
        """Return the raw text of the ``gall_<classname>`` cell."""
        cell = self._find_class(classname)
        return cell.text

    # -- post identity -----------------------------------------------------

    @cached_property
    def id(self):
        """Post number, parsed from the ``num`` cell."""
        raw_number = self._find_text("num")
        return int(raw_number)

    @cached_property
    def url(self):
        """URL of the post, built from the gallery id and the post number."""
        return GALL_VIEW_URL.format(self.gall.gallid, self.id)

    @cached_property
    def subject(self):
        """Raw text of the ``subject`` cell."""
        return self._find_text("subject")

    @cached_property
    def title(self):
        """Stripped text of the link inside the ``tit`` cell."""
        link = self._find_class("tit").find("a")
        return link.text.strip()

    # -- writer information --------------------------------------------------

    @cached_property
    def author(self):
        """Stripped text of the ``writer`` cell."""
        raw_author = self._find_text("writer")
        return raw_author.strip()

    @cached_property
    def nick(self):
        """Value of the writer cell's ``data-nick`` attribute."""
        writer_cell = self._find_class("writer")
        return writer_cell['data-nick']

    @cached_property
    def userid(self):
        """Value of the writer cell's ``data-uid`` attribute."""
        writer_cell = self._find_class("writer")
        return writer_cell['data-uid']

    @cached_property
    def ip(self):
        """Value of the writer cell's ``data-ip`` attribute."""
        writer_cell = self._find_class("writer")
        return writer_cell['data-ip']

    @cached_property
    def is_anon(self):
        """True when the writer cell carries a non-empty ``data-ip``."""
        return bool(self.ip)

    @cached_property
    def is_halfanon(self):
        """True when the writer's nick icon file name starts with ``fix``.

        False when the writer cell has no ``writer_nikcon`` link at all.
        """
        icon_link = self._find_class("writer").find(
            "a", {"class": "writer_nikcon"}
        )
        if icon_link is None:
            return False
        icon_src = icon_link.find("img")['src']
        # Only the last path component (the file name) is inspected.
        file_name = icon_src.rsplit("/", 1)[-1]
        return file_name.startswith("fix")

    @cached_property
    def is_gonik(self):
        """True when the writer is neither ``is_anon`` nor ``is_halfanon``."""
        return not (self.is_anon or self.is_halfanon)

    @cached_property
    def nick_format(self):
        """Nick followed by the IP (anonymous) or the user id in parentheses."""
        if self.is_anon:
            return f"{self.nick} ({self.ip})"
        return f"{self.nick} ({self.userid})"

    # -- date and counters -----------------------------------------------------

    @cached_property
    def date(self):
        """Posting time parsed from the date cell's ``title`` attribute."""
        iso_text = self._find_class("date")['title']
        return datetime.datetime.fromisoformat(iso_text)

    @cached_property
    def date_format(self):
        """Posting time rendered as ``yy/mm/dd HH:MM:SS``."""
        return self.date.strftime("%y/%m/%d %H:%M:%S")

    @cached_property
    def read_count(self):
        """Integer value of the ``count`` cell."""
        raw_count = self._find_text("count")
        return int(raw_count)

    @cached_property
    def recommend(self):
        """Integer value of the ``recommend`` cell."""
        raw_recommend = self._find_text("recommend")
        return int(raw_recommend)
def filter(entry):
    """Return True when ``entry`` counts as a search hit.

    An entry is a hit when its subject is exactly "문학", or when its title
    matches any regex in ``PATTERNS`` from the start of the string.
    """
    if entry.subject == "문학":
        return True
    return any(regex.match(entry.title) for regex in PATTERNS)
def run():
    """Crawl the whole gallery and collect the entries accepted by ``filter``.

    Progress is printed on a single, continuously overwritten console line.

    Returns:
        tuple: ``(total, filtered)`` where ``total`` is the number of entries
        seen across all pages (duplicates included) and ``filtered`` is the
        list of matching entries, each post id appearing at most once.
    """
    gall = Gallery()
    filtered = []
    # A set keeps the duplicate check O(1) per entry instead of scanning a
    # list that grows with every hit.
    seen_ids = set()
    total = 0

    for pagenum, entries in gall.gen_page_threaded():
        print(f"[*] {pagenum} 페이지 처리중...", end="\r")
        total += len(entries)
        for entry in entries:
            # NOTE(review): presumably the same post can show up on two pages
            # when the listing shifts during the crawl -- hence the id check.
            if filter(entry) and entry.id not in seen_ids:
                seen_ids.add(entry.id)
                filtered.append(entry)

    return total, filtered
def format(num, entry):
    """Render one result as a numbered summary line followed by its URL.

    The summary line holds the writer, title, date and recommend count; the
    record ends with a blank line.
    """
    summary = (
        f"{num}. {entry.nick_format} - {entry.title} "
        f"| {entry.date_format} | {entry.recommend}"
    )
    return f"{summary}\n{entry.url}\n\n"
def main():
    """Run the search interactively and write the results to ``FILE_NAME``.

    Prints the banner and the active settings, counts down five seconds,
    crawls the gallery via ``run``, then sorts the hits by post id and writes
    them to the output file.
    """
    print(SPLASH)

    # Show the regex source itself; interpolating the compiled object would
    # print its repr ("re.compile(...)") inside the quotes.
    patternstr = "\n".join(f"r\"{pattern.pattern}\"" for pattern in PATTERNS)

    # One single string: passing the pieces as separate print() arguments
    # would insert a stray space in front of the first pattern.
    print(
        f"갤러리 ID: {GALL_ID}\n"
        f"User-Agent: {USER_AGENT}\n"
        f"스레드 개수: {THREAD_NUM}\n\n"
        f"정규식 검색 패턴:\n"
        + patternstr + "\n"
    )

    for i in range(5, 0, -1):
        print(f"{i}초 뒤 검색을 시작합니다" + "." * (6 - i), end="\r")
        time.sleep(1)

    start_time = time.time()
    strftime = time.strftime("%y/%m/%d %H:%M:%S", time.localtime(start_time))
    print(f"\n시작 시간: {strftime}\n")

    total, filtered = run()

    elapsed_time = time.time() - start_time
    elapsed_format = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
    print("\n\n색인이 완료되었습니다.\n")
    print(f"소요시간: {elapsed_format}")
    print(f"총 게시글수: {total}, 검색된 게시글: {len(filtered)}")

    print("정렬을 시작합니다...")
    filtered.sort(key=lambda x: x.id)
    print("정렬이 완료되었습니다. 파일을 저장중입니다...")

    # The report contains Korean text; pin the encoding so the result does
    # not depend on the platform's default locale encoding.
    with open(FILE_NAME, "w", encoding="utf-8") as f:
        f.write(
            "문갤문학 검색기 v3 by 파이썬\n\n"
            f"현재 시간: {strftime}\n"
            "정규식 검색 패턴:\n" +
            patternstr + "\n\n"
        )
        for i, entry in enumerate(filtered):
            f.write(format(i+1, entry))

    print("파일을 저장했습니다. 프로그램을 종료합니다.")


# Run the search only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment