Skip to content

Instantly share code, notes, and snippets.

@simbafs
Created August 3, 2023 07:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save simbafs/f09e93996aabae758996a858b6ab0492 to your computer and use it in GitHub Desktop.
Save simbafs/f09e93996aabae758996a858b6ab0492 to your computer and use it in GitHub Desktop.
Crawler and Telegram bot to monitor NCKU course system

程式解說

如何開發:

整個程式分成兩個部份,首先是主要部份,網路爬蟲,會根據儲存的目標定期去選課系統抓指定課程剩餘人數,如果有變化就會通知使用者
第二部份是使用者界面,telegram 聊天機器人,透過指令的方式可以新增、刪除、列出要監聽的頁面跟指定欄位
這個聊天機器人支援多使用者,各自的資料不會互相影響,是分開儲存的

如何使用:

打開 Telegram,找到機器人 https://t.me/nckubot ,使用指令 /listen <url> [selector] 新增一筆紀錄,接著機器人就會每隔一段時間去抓看看有沒有變更

# ChatGPT link: https://chat.openai.com/share/4c2ffc97-0469-4ec5-a1ff-2277e75ec882
import os
import sys
import time
from threading import Thread
import requests
import telebot
from bs4 import BeautifulSoup
from dotenv import load_dotenv
load_dotenv()
def parseHeader(headerText):
headers = {}
# Split the input string into lines
lines = headerText.strip().split("\n")
for line in lines:
# Split each line by the first colon to separate key and value
key, value = line.split(":", 1)
headers[key.strip()] = value.strip()
return headers
magicN = 2
def get_text_by_css_selector(url, selector, printStatus=False):
# global magicN
# print(magicN)
# if magicN == 0:
# return {
# "enroll": 1,
# "available": 38,
# }
# magicN -= 1
try:
# Send an HTTP GET request to the URL
headers = parseHeader(
"""
Host: course.ncku.edu.tw
User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/116.0
Accept: */*
Accept-Language: en-US,en;q=0.8,zh-TW;q=0.5,zh;q=0.3
Accept-Encoding: gzip, deflate, br
DNT: 1
Connection: keep-alive
Referer: https://course.ncku.edu.tw/index.php?c=qry11215&m=en_query&i=U2RUawA2BjULclF3VzsBP1ZiU3RebAd2CDsHZwU9CzBVblYtVjxWPVM7UzsDPA02WmVVbVJmB2sFfF99Bj8GNgpmUHcIegFjBCQLZQInAjcCPVNsUiFXNV91BjwHeFwrAHJRawAyC2AGdQYlVHANNg0yUGgANQJuUSgHK1MjAztTNFRrACcGaAsrUT9XYwE0VmlTbV5y
Cookie: _ga=GA1.3.381219810.1673964398; _ga_V5263G30EE=GS1.1.1690885343.325.0.1690885343.0.0.0; _ga_9VR7HBB4SG=GS1.1.1687957937.7.0.1687957937.60.0.0; AMCV_4D6368F454EC41940A4C98A6%40AdobeOrg=-2121179033%7CMCIDTS%7C19523%7CMCMID%7C12671621494609851332063067034236606133%7CMCAID%7CNONE%7CMCOPTOUT-1686749625s%7CNONE%7CMCAAMLH-1687347225%7C11%7CMCAAMB-1687347225%7Cj8Odv6LonN4r3an7LhD3WZrU1bUpAkFkkiY1ncBR96t2PTI%7CMCSYNCSOP%7C411-19530%7CvVersion%7C5.3.0; s_pers=%20v8%3D1686742425397%7C1781350425397%3B%20v8_s%3DMore%2520than%252030%2520days%7C1686744225397%3B%20c19%3Dpr%253Apure%2520portal%253Apersons%253Aview%7C1686744225399%3B%20v68%3D1686742421172%7C1686744225400%3B; _ga_8RY35M6ZMQ=GS1.1.1690853311.10.0.1690853311.0.0.0; _ga_WHBRRJE1JV=GS1.1.1690298802.14.1.1690298834.0.0.0; _ga_YTL27NBKJ1=GS1.1.1686673400.7.1.1686673853.0.0.0; _ga_TKEHRTBPD6=GS1.1.1690760071.15.1.1690760097.0.0.0; _ga_BSPNZQ67NJ=GS1.1.1690760097.14.1.1690760238.0.0.0; _ga_8LGPXZ8D11=GS1.1.1690885960.5.0.1690885963.0.0.0; __dtsu=6D001683526930C7219E3255D732F50D; _ga_7TZLNDHBYS=GS1.1.1683905548.1.0.1683905556.0.0.0; _ga_H1MCL60QVP=GS1.1.1684898230.1.0.1684898236.0.0.0; _ga_FHTS9H202Z=GS1.1.1684918248.2.1.1684921479.0.0.0; _ga_H6F5F49WXK=GS1.1.1687957937.3.0.1687957937.60.0.0; cos_lang=cht; COURSE_WEB=ffffffff8f7cbb1d45525d5f4f58455e445a4a423660; COURSE_CDN=ffffffff8f7ce72345525d5f4f58455e445a4a42cbd9; _gid=GA1.3.1739964840.1690853312; SSO=adc313fd46be8056305dc9a4aa926d71; PHPSESSID=C24106082ff661c220ea6059c78ece6371aa19c20; AMCVS_4D6368F454EC41940A4C98A6%40AdobeOrg=1; s_sess=%20s_cpc%3D0%3B%20s_sq%3D%3B%20s_ppvl%3Dpr%25253Apure%252520portal%25253Apersons%25253Asimilar%252C64%252C64%252C889%252C1920%252C889%252C1920%252C1080%252C1%252CP%3B%20s_ppv%3Dpr%25253Apure%252520portal%25253Apersons%25253Aview%252C77%252C40%252C3913%252C1920%252C889%252C1920%252C1080%252C1%252CP%3B%20e41%3D1%3B%20s_cc%3Dtrue%3B
Sec-Fetch-Dest: script
Sec-Fetch-Mode: no-cors
Sec-Fetch-Site: same-origin
"""
)
print("Sending HTTP GET request...") if printStatus else None
response = requests.get(
url,
headers=headers,
timeout=10,
)
print("Response received!") if printStatus else None
# Check if the request was successful (status code 200)
if response.status_code == 200:
print("BeautifulSoup parsing...") if printStatus else None
# calculate parse time
start_time = time.time()
# Parse the HTML content using BeautifulSoup
soup = BeautifulSoup(response.content, "lxml")
print("BeautifulSoup parsed!") if printStatus else None
print(
"--- {:.2f} seconds ---".format(time.time() - start_time)
) if printStatus else None
# Find all elements matching the CSS selector
elements = soup.select(selector)
if len(elements) == 0:
print(
f"Error: No element found with the CSS selector '{selector}'"
) if printStatus else None
print(soup) if printStatus else None
return None
# Extract and return the text content of the elements
text_content = [element.text.strip() for element in elements]
enrollment = text_content[0].split("/")
return {
"enrolled": int(enrollment[0]),
"available": int(enrollment[1]),
}
else:
print(
f"Error: Failed to fetch data from {url}. Status code: {response.status_code}"
)
return None
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return None
token = os.getenv("BOTTOKEN")
if token is None:
print("Error: No token provided")
sys.exit(1)
bot = telebot.TeleBot(token)
url = "https://course.ncku.edu.tw/index.php?c=qry11215&m=en_query&i=UGQNOgEzDj8DdgEhVmwEM1I6USEMalMhWG9SPFQ4AzIAZwMnDGcGNldgX3oAMFN0Bm5fNAU8UWcBMwIoAz1WOQJuVGAMNFlpUzxbMlRlADxQeVV0W25XNVxqBSAEJQZsCXcEaFd1Vm1Wal5jDH0FMVcmDj9SLAB6AXpSOVE3BmlQJw1nASIOPgNvAWhWZwQ5Un0="
print(url)
default_selector = "#A9-table > tbody:nth-child(2) > tr:nth-child(1) > td:nth-child(8)"
targetChat = "901756183"
delay = 30
printStatus = True
all_listeners = {}
@bot.message_handler(commands=["start", "help"])
def cmd_start(message):
id = message.chat.id
bot.send_message(
id,
"""
bot to moniter course enrollment
/listen <url> <selector>
/list
/remove <url>
""",
)
@bot.message_handler(commands=["listen"])
def cmd_listen(message):
id = message.chat.id
my_listeners = dict()
if id in all_listeners:
my_listeners = all_listeners[id]
args = message.text.split(" ", 2)
url = args[1]
selector = (
args[2]
if len(args) > 2
else "#A9-table > tbody:nth-child(2) > tr:nth-child(1) > td:nth-child(8)"
)
# todo check if duplicated
my_listeners[url] = [selector, 0]
all_listeners[id] = my_listeners
bot.send_message(id, f"added\nurl: {url}\nselector: {selector}")
print(all_listeners)
@bot.message_handler(commands=["list"])
def cmd_list(message):
id = message.chat.id
my_listeners = dict()
if id in all_listeners:
my_listeners = all_listeners[id]
bot.send_message(id, f"all listeners: {my_listeners}")
@bot.message_handler(commands=["remove"])
def cmd_remove(message):
id = message.chat.id
if id not in all_listeners:
bot.send_message(id, "you have no listeners")
return
my_listeners = all_listeners[id]
args = message.text.split(" ", 2)
url = args[1]
if url in my_listeners:
my_listeners.pop(url)
bot.reply_to(message, f"removed {url}")
else:
bot.reply_to(message, f"no such url {url}")
@bot.message_handler(func=lambda message: True)
def all_message(message):
bot.reply_to(message, "hello")
def crawler():
while True:
print("do crawler")
for id in all_listeners:
my_listeners = all_listeners[id]
for url in my_listeners:
selector = my_listeners[url][0]
# print(url, selector)
data = get_text_by_css_selector(url, selector)
if data is None:
print("data is none")
elif data["available"] != my_listeners[url][1]:
bot.send_message(
id,
f"data change! {url} now have {data['available']} available",
)
my_listeners[url][1] = int(data["available"])
else:
print("data not change", data["available"], my_listeners[url][1])
time.sleep(delay)
t1 = Thread(target=crawler)
t2 = Thread(target=bot.infinity_polling)
t1.start()
t2.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment