Skip to content

Instantly share code, notes, and snippets.

@d33tah
Last active July 18, 2021 15:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save d33tah/67690674d057a04613aafc1a13114107 to your computer and use it in GitHub Desktop.
Save d33tah/67690674d057a04613aafc1a13114107 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import logging
import pathlib
import textwrap

import dateutil.parser

import telegram_scraper
def _filename_safe(value):
    """Return *value* with path separators and unprintable characters replaced by ``_``."""
    return "".join(
        "_" if ch in "/\\" or not ch.isprintable() else ch for ch in value
    )


def main():
    """Write every scraped message that has text to its own Markdown file.

    Reads the required ``--outdir`` command-line option, creates that
    directory when it does not exist yet, and writes one file per message
    named ``<year>-<month>-<day>-<msgno>-<author>.md``.  Each file starts
    with a YAML front-matter header (``layout: post``, the author and the
    message number as the title) followed by the message text.

    Messages without a ``text`` entry are skipped.  Messages without a
    ``time`` entry are skipped with a warning because the file name cannot
    be dated.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--outdir", required=True)
    args = parser.parse_args()

    outdir = pathlib.Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    t = telegram_scraper.TelegramScraper()
    for msg in t.fetch_messages():
        if "text" not in msg:
            continue
        if "time" not in msg:
            logging.warning(
                "Skipping message #%s: it has no timestamp", msg.get("msgno")
            )
            continue

        timestamp = dateutil.parser.parse(msg["time"])
        author = msg.get("author", "UNKNOWN")

        # The author name comes from the scraped page, so it must not be
        # able to introduce path separators into the file name.
        fname = (
            f"{timestamp.year}-{timestamp.month:02d}-{timestamp.day:02d}-"
            f"{str(msg['msgno']).zfill(6)}-{_filename_safe(author)}.md"
        )

        # Built line by line instead of dedenting an interpolated template,
        # so interpolated values can never influence the header layout.
        # NOTE(review): the author is written unquoted, as before; a name
        # containing ':' or '#' may need YAML quoting - confirm with the
        # site generator that consumes these files.
        front_matter = (
            "---\n"
            "layout: post\n"
            f"author: {author}\n"
            f'title: "#{msg["msgno"]}"\n'
            "---\n"
        )

        with open(outdir / fname, "w", encoding="utf-8") as f:
            f.write(front_matter)
            f.write(msg["text"])


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
import argparse
import logging
import json
import pathlib
import requests
import lxml.html
class TelegramScraper:
    """Scrape the messages of a public Telegram channel via its embed pages.

    Message pages are requested one id at a time from
    ``https://t.me/<channel>/<n>?embed=1``.  Every downloaded page that
    parses to a non-empty message is stored verbatim in the cache directory
    under the file name ``<n>``, so later runs can replay it without network
    access.

    Attributes:
        n: id of the message most recently requested (0 before the first).
        to_save: mapping of cache file path to raw page text waiting to be
            written by ``save_messages_into_cache``.
        latest_cached_message: highest numeric file name found in the cache
            directory, or 0 when the cache is empty or missing.
    """

    def __init__(self, cache_dir="cache", channel="hsl_info", timeout=30):
        """Prepare a scraper.

        Args:
            cache_dir: directory holding one cached page per message id.
            channel: channel name used in the ``t.me`` URL.
            timeout: seconds to wait for each HTTP request.
        """
        self.n = 0
        self.to_save = {}
        self.cache_dir = pathlib.Path(cache_dir)
        self.channel = channel
        self.timeout = timeout
        # Ignore stray non-numeric files and tolerate an empty or missing
        # cache directory (max() of an empty sequence would raise).
        self.latest_cached_message = max(
            (
                int(entry.name)
                for entry in self.cache_dir.glob("*")
                if entry.name.isdecimal()
            ),
            default=0,
        )

    def parse(self, response):
        """Extract the message fields from the HTML text of one embed page.

        Args:
            response: HTML source of the page.

        Returns:
            A dict with whichever of ``time``, ``author``, ``text`` and
            ``photos`` could be found, plus ``msgno`` (the current
            ``self.n``) when at least one of them was found; an empty dict
            otherwise.
        """
        if not response or not response.strip():
            # lxml refuses to parse an empty document.
            return {}

        h = lxml.html.fromstring(response)
        text_div = h.xpath('//div [contains(@class, "js-message_text")]')
        photo_div = h.xpath(
            '//a [contains(@class, "tgme_widget_message_photo_wrap")]'
        )
        author_el = h.xpath(
            '//* [@class="tgme_widget_message_author_name"]/span'
        )
        timestamp = h.xpath("//time")

        ret = {}
        if timestamp:
            ret["time"] = timestamp[0].get("datetime")
        if author_el and author_el[0].text:
            ret["author"] = author_el[0].text.strip()
        if text_div:
            ret["text"] = "\n".join(text_div[0].itertext())

        # Each photo contributes the text between the last pair of single
        # quotes of its style attribute (presumably a url('...') value -
        # confirm against the page markup).  Elements without such a quoted
        # value are skipped instead of raising.
        photos = []
        for photo in photo_div:
            parts = (photo.get("style") or "").split("'")
            if len(parts) >= 3:
                photos.append(parts[-2])
        if photos:
            ret["photos"] = photos

        if ret:
            ret["msgno"] = self.n
        return ret

    def save_messages_into_cache(self):
        """Write all pending pages in ``to_save`` to disk and clear the queue."""
        for fname, response in self.to_save.items():
            path = pathlib.Path(fname)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(response, encoding="utf-8")
        self.to_save = {}

    def get_next_message(self):
        """Return the parsed message for the current id ``self.n``.

        The cached page is used when present.  An id that is missing from
        the cache but lies below ``latest_cached_message`` yields an empty
        dict without any network request; any other missing id is
        downloaded, and the page is cached when it parses to a non-empty
        message.

        Returns:
            The dict produced by ``parse`` (possibly empty).
        """
        fname = self.cache_dir / str(self.n)
        try:
            response = fname.read_text(encoding="utf-8")
            from_cache = True
        except FileNotFoundError:
            if self.n < self.latest_cached_message:
                return {}
            url = f"https://t.me/{self.channel}/{self.n}?embed=1"
            # Without a timeout a stalled connection blocks forever.
            response = requests.get(url, timeout=self.timeout).text
            from_cache = False

        logging.debug("Parsing n=%d", self.n)
        ret = self.parse(response)
        if ret and not from_cache:
            # Pages that came from the cache are already on disk.
            self.to_save[fname] = response
            self.save_messages_into_cache()
        return ret

    def fetch_messages(self, max_consecutive_errors=100):
        """Yield the interesting messages in increasing id order.

        Skipped are messages with neither text nor photos, text-only
        messages shorter than 20 characters, and messages whose text ends
        with "joined the room." or "left the room.".

        Iteration stops after more than ``max_consecutive_errors`` ids in a
        row produced no message, but never before the newest cached id has
        been passed.

        Args:
            max_consecutive_errors: number of consecutive empty ids that is
                still tolerated.

        Yields:
            Message dicts as returned by ``parse``.
        """
        errors = 0
        while True:
            self.n += 1
            msg = self.get_next_message()
            if not msg:
                errors += 1
                if (
                    errors > max_consecutive_errors
                    and self.n > self.latest_cached_message
                ):
                    break
                continue
            # Only an unbroken run of misses indicates the end of the
            # channel; isolated gaps must not add up over a long history.
            errors = 0

            has_text = "text" in msg
            has_photos = "photos" in msg
            text = msg.get("text", "")
            if not has_text and not has_photos:
                continue
            if has_text and not has_photos and len(text) < 20:
                continue
            if text.endswith(("joined the room.", "left the room.")):
                continue
            yield msg
def _run_cli():
    """Command-line entry point of the scraper module.

    With ``--print-parsed FILE`` the given HTML file is parsed and the
    resulting dict is printed; otherwise every message yielded by
    ``TelegramScraper.fetch_messages`` is printed.  ``--loglevel`` selects
    the logging level (default ``INFO``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--print-parsed",
        help="parse this saved HTML page and print the result",
    )
    parser.add_argument(
        "--loglevel", default="INFO", help="logging level name"
    )
    args = parser.parse_args()

    # Configure logging first so nothing emitted during set-up is lost.
    logging.basicConfig(level=args.loglevel.upper())
    t = TelegramScraper()

    if args.print_parsed:
        # Read explicitly as UTF-8 rather than the platform's locale
        # encoding, so non-ASCII page content decodes the same everywhere.
        with open(args.print_parsed, encoding="utf-8") as f:
            print(t.parse(f.read()))
    else:
        for msg in t.fetch_messages():
            print(msg)


if __name__ == "__main__":
    _run_cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment