Skip to content

Instantly share code, notes, and snippets.

@zhongfu
Last active October 21, 2021 00:44
Show Gist options
  • Save zhongfu/fa2fe44634d7e8ff4f52f5a59de3c745 to your computer and use it in GitHub Desktop.
Save zhongfu/fa2fe44634d7e8ff4f52f5a59de3c745 to your computer and use it in GitHub Desktop.
lobsterdao chat stats

how do I use this crap?

  • open up tdesktop (i.e. the Telegram desktop client)
  • go to your favorite group
  • export all messages as json -- you can skip all media
  • head to wherever tdesktop downloaded it to -- maybe Downloads/ChatExport_yyyy-mm-dd/
  • run generate.py there
  • optionally: weep a little
#!/usr/bin/env python3
# for each RANK given, prints how many rows have a COL_TO_READ value
# of at least RANK (expects rows sorted by COL_TO_READ, highest first)
import csv
import sys
def count_rows_at_least(rows, col, ranks):
    """Return, for each threshold in *ranks*, how many rows reach it.

    Args:
        rows: iterable of mappings (e.g. a ``csv.DictReader``); ``row[col]``
            must be convertible with ``int()``.
        col: name of the column to read.
        ranks: iterable of integer thresholds.

    Returns:
        A list with one count per threshold, in the order of *ranks*; each
        count is the number of rows whose *col* value is >= that threshold.

    Raises:
        KeyError: if a row has no *col* entry.
        ValueError: if a *col* value is not an integer.
    """
    # Read the column once so every threshold is checked against every row.
    # This keeps the result correct for an empty file, for thresholds that
    # exceed every value, and for thresholds lying between the same two rows.
    values = [int(row[col]) for row in rows]
    return [sum(1 for value in values if value >= rank) for rank in ranks]


def main(argv):
    """Run the command line tool; return the process exit status."""
    if len(argv) < 4:
        print(f"{argv[0]} FILENAME COL_TO_READ RANKS...")
        return 1

    fn = argv[1]
    col = argv[2]
    ranks_raw = argv[3:]

    try:
        ranks = [int(r) for r in ranks_raw]
    except ValueError:
        print("ranks should be numbers")
        return 2

    # Only the numeric column is read, so undecodable bytes in other columns
    # (e.g. display names written with a different encoding) are replaced
    # instead of aborting the run.
    with open(fn, 'r', newline='', encoding='utf-8', errors='replace') as f:
        counts = count_rows_at_least(csv.DictReader(f), col, ranks)

    for rank, count in zip(ranks, counts):
        print(f"{count} users with {rank} {col}")
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
#!/usr/bin/env python3
import csv
import json
import pytz
from datetime import datetime
# tdesktop dumps messages in local tz
# so we change the tz from utc+8 to utc+1.. utc+2..? why is DST a thing
source_tz = pytz.timezone('Asia/Singapore')
target_tz = pytz.timezone('CET')

# change this to the message id that you'd like to stop at
id_limit = None

# Every exported message keyed by its id, in the order the export lists them.
messages = dict()

# JSON is UTF-8 by specification; name the encoding explicitly so the load
# does not depend on the platform's locale encoding.
with open('result.json', 'r', encoding='utf-8') as f:
    messages_list = json.load(f)['messages']

for msg in messages_list:
    msgid = msg['id']
    # Stops at the first id at or past the limit; presumably the export lists
    # messages in ascending id order -- verify if id_limit is used.
    if id_limit and msgid >= id_limit:
        break
    messages[msgid] = msg
    # NOTE: if the timestamp string carries no UTC offset, fromisoformat()
    # returns a naive datetime and astimezone() interprets it as system-local
    # time, so source_tz is only an intermediate step in this conversion.
    msg['date'] = datetime.fromisoformat(msg['date']).astimezone(source_tz).astimezone(target_tz)
def get_uid(msg):
    """Return the id of whoever sent or triggered *msg*.

    Reads ``from_id`` and falls back to ``actor_id`` when the former is
    missing or falsy.

    Raises:
        AssertionError: if neither key yields a string.
    """
    uid = msg.get('from_id') or msg.get('actor_id')
    # Raised explicitly (same exception type as before) so the check is not
    # stripped when Python runs with -O.
    if not isinstance(uid, str):
        raise AssertionError("uid is not str")
    return uid
# Display name for every sender/actor id found in the export. When an id
# appears on several messages, the name from the last one is kept; an id with
# neither 'from' nor 'actor' set maps to the empty string.
uid_map = {
    get_uid(msg): msg.get('from') or msg.get('actor') or ''
    for msg in messages.values()
}
def most_replies():
    """Write ``largest_threads.csv``, one row per thread, largest thread first.

    Reads the module-level ``messages`` mapping. Every message that carries
    ``reply_to_message_id`` is counted towards the root of its reply chain.
    Each row holds the reply count, the root message id, a t.me link and,
    when the root message is present in ``messages``, its sender, sender id,
    date and text.

    Raises:
        ValueError: for a message text or message type of an unexpected kind.
        AssertionError: for a service message without a string ``action``.
    """
    # Maps a reply's id to the root message of the thread it was counted in.
    parents = dict()
    # Maps a root message id to the number of replies counted towards it.
    msgs_with_replies = dict()
    for msgid, msg in messages.items():
        if 'reply_to_message_id' not in msg:
            continue
        parent = msg['reply_to_message_id']
        # A reply to a reply counts towards the original root. This relies on
        # the replied-to message having been processed earlier in the loop.
        while parent in parents:
            parent = parents[parent]
        parents[msgid] = parent
        msgs_with_replies[parent] = msgs_with_replies.get(parent, 0) + 1

    msgs_with_replies = dict(sorted(msgs_with_replies.items(), key=lambda tup: tup[1], reverse=True))

    def parse_text(text):
        """Flatten a message text (str, None, or list of str/dict parts) to str."""
        if isinstance(text, str) or text is None:
            return text
        elif isinstance(text, list):
            bits = list()
            for bit in text:
                if isinstance(bit, str):
                    bits.append(bit)
                elif isinstance(bit, dict):
                    bits.append(bit['text'])
                else:
                    # Report the type of the offending component, not of the
                    # enclosing list.
                    raise ValueError(f"Unknown msg text component with type {type(bit)}")
            return ''.join(bits)
        else:
            raise ValueError(f"Got msg with weird type {type(text)}")

    # Names and message text routinely contain non-ASCII characters, so the
    # encoding is fixed rather than left to the platform's locale default.
    with open('largest_threads.csv', 'w', newline='', encoding='utf-8') as f:
        fields = ["replies", "msg_id", "msg_link", "sender", "sender_id", "date", "message"]
        writer = csv.writer(f)
        writer.writerow(fields)

        for msgid, replies in msgs_with_replies.items():
            parts = [str(replies), str(msgid), f"https://t.me/lobsters_chat/{msgid}"]

            if msgid in messages:
                msg = messages[msgid]
                parts.append(msg.get('from') or msg.get('actor') or '')
                parts.append(get_uid(msg))
                parts.append(msg['date'].strftime("%Y-%m-%dT%H:%M:%S"))

                if msg['type'] == 'message':
                    parts.append(parse_text(msg.get('text')) or '(media?)')
                elif msg['type'] == 'service':
                    action = msg.get('action')
                    # Explicit raise so the check survives `python -O`.
                    if not isinstance(action, str):
                        raise AssertionError(f"service message with no action? {msgid}")
                    parts.append(f"(action: {action})")
                else:
                    raise ValueError(f"Message with weird type {msg['type']}!")
            else:
                # The replied-to message is not part of the loaded export.
                parts.extend(['', '', '', '(deleted?)'])

            writer.writerow(parts)
def _write_ranking_csv(path, header, counts):
    """Write *counts* (uid -> number) to *path* as a ranked CSV, highest first.

    Users with equal numbers share the rank of the first of them, and the
    following rank is skipped accordingly (1, 2, 2, 4, ...). Each row holds
    the rank, the number, the name from the module-level ``uid_map`` and the
    uid.
    """
    ordered = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
    # Display names routinely contain non-ASCII characters, so the encoding
    # is fixed rather than left to the platform's locale default.
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        rank = None
        previous = None
        for position, (uid, cnt) in enumerate(ordered, start=1):
            # Only move to a new rank when the number actually drops.
            if rank is None or cnt < previous:
                rank = position
                previous = cnt
            writer.writerow([rank, cnt, uid_map[uid], uid])


def most_active():
    """Write four per-user activity rankings as CSV files.

    Reads the module-level ``messages`` mapping, skipping entries whose type
    is not ``'message'``, and writes:

    * ``user_msg_count.csv`` -- messages sent per user
    * ``user_msg_count_replies.csv`` -- messages that are replies, per user
    * ``user_msg_count_days.csv`` -- distinct dates with a message, per user
    * ``user_msg_count_replies_days.csv`` -- distinct dates with a reply,
      per user
    """
    user_msg_count = dict()
    user_msg_count_replies = dict()
    user_msg_count_days = dict()
    user_msg_count_replies_days = dict()

    for msg in messages.values():
        if msg['type'] != 'message':
            continue

        uid = get_uid(msg)
        date = msg['date'].date()

        user_msg_count[uid] = user_msg_count.get(uid, 0) + 1
        user_msg_count_days.setdefault(uid, set()).add(date)

        if 'reply_to_message_id' in msg:
            user_msg_count_replies[uid] = user_msg_count_replies.get(uid, 0) + 1
            user_msg_count_replies_days.setdefault(uid, set()).add(date)

    fields_msgs = ["rank", "messages", "name", "uid"]
    fields_days = ["rank", "days", "name", "uid"]

    def day_totals(days_by_uid):
        # Collapse each user's set of dates into the number of distinct dates.
        return {uid: len(days) for uid, days in days_by_uid.items()}

    _write_ranking_csv('user_msg_count.csv', fields_msgs, user_msg_count)
    _write_ranking_csv('user_msg_count_replies.csv', fields_msgs, user_msg_count_replies)
    _write_ranking_csv('user_msg_count_days.csv', fields_days, day_totals(user_msg_count_days))
    _write_ranking_csv('user_msg_count_replies_days.csv', fields_days, day_totals(user_msg_count_replies_days))
if __name__ == '__main__':
    # Generate the thread report first, then the per-user activity reports.
    for report in (most_replies, most_active):
        report()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment