Skip to content

Instantly share code, notes, and snippets.

@wzyboy
Last active November 12, 2018 12:38
Show Gist options
  • Save wzyboy/aee51ec93014bb8dd053a36fd6183261 to your computer and use it in GitHub Desktop.
Save wzyboy/aee51ec93014bb8dd053a36fd6183261 to your computer and use it in GitHub Desktop.
Search thru Telegram history dumps.
#!/usr/bin/env python
'''
A simple script to search through Telegram history dumps.
(See also: telegram-history-dump)
Example usages:
# Print dialogs
./telegram.py print /path/to/files.jsonl
# Show recent dialogs
./telegram.py journal --since 2017-12-01
# Search by key (default: text).
./telegram.py grep 'foo bar'
# Regular expressions are supported.
./telegram.py grep --key from '(?:John|Jane)_Doe'
'''
import os
import re
import json
import argparse
import subprocess
from datetime import datetime
from datetime import timedelta
from operator import itemgetter
def _parse_datetime(s):
    """Parse a user-supplied date/time string into a ``datetime``.

    ``dateutil.parser.parse`` is used when python-dateutil is importable.
    Otherwise only the ``YYYY-MM-DD`` form is understood.

    Args:
        s: The string to parse (used as the ``--since`` argparse type).

    Returns:
        The parsed ``datetime``.

    Raises:
        ValueError: If the fallback parser is used and ``s`` is not in
            ``YYYY-MM-DD`` form.
    """
    try:
        from dateutil.parser import parse
    except ImportError:
        # Fallback without python-dateutil. strptime() needs the input string
        # as its first argument; the format alone raises TypeError.
        return datetime.strptime(s, '%Y-%m-%d')
    return parse(s)
def format_message(event):
    """Render one dump event as ``[timestamp] sender: payload``.

    Events whose ``event`` field is neither ``message`` nor ``service`` are
    returned unchanged (i.e. as the original dict).

    The sender is shown as "first_name last_name" when both keys exist and as
    ``user#<peer_id>`` otherwise. The payload is the first of ``text``,
    ``media`` or ``action`` that is present in the event, else ``None``.
    The timestamp is rendered in local time.
    """
    if event['event'] not in ('message', 'service'):
        return event

    moment = datetime.fromtimestamp(event['date'])
    timestamp = moment.strftime('%Y-%m-%d %H:%M:%S')

    try:
        sender = event['from']
        from_name = '{} {}'.format(sender['first_name'], sender['last_name']).strip()
    except KeyError:
        from_name = 'user#{}'.format(event['from']['peer_id'])

    # First available field wins: text, then media, then action.
    payload = None
    for field in ('text', 'media', 'action'):
        if field in event:
            payload = event[field]
            break

    tpl = '[{timestamp}] {from_name}: {payload}'
    return tpl.format(timestamp=timestamp, from_name=from_name, payload=payload)
def print_dialog(filename):
    """Print every event of one ``.jsonl`` dump file, one line per event.

    Each line of the file is parsed as a JSON document and passed through
    ``format_message`` before being printed.
    """
    with open(filename, 'r') as dump:
        for raw_line in dump:
            print(format_message(json.loads(raw_line)))
def get_files_by_age(since):
    """Return the dump files of dialogs that were active at or after ``since``.

    Reads ``output/progress.json`` located next to this script and keeps
    every dialog whose ``newest_date`` is not older than ``since``.

    Args:
        since: Naive ``datetime`` that is compared against UTC timestamps,
            or ``None`` to mean "the last 24 hours".

    Returns:
        A list of file paths under the ``output`` directory beside this
        script, one per matching dialog.
    """
    if since is None:
        # Dialog dates are converted with utcfromtimestamp() below, so the
        # default cut-off has to be naive UTC too. Using local time here
        # would shift the 24-hour window by the local UTC offset.
        since = datetime.utcnow() - timedelta(days=1)

    # Find .jsonl files that were modified recently
    here = os.path.dirname(os.path.abspath(__file__))
    progress_file = os.path.join(here, 'output/progress.json')
    with open(progress_file) as f:
        progress = json.load(f)

    return [
        os.path.join(here, 'output', d['dumper_state']['outfile'])
        for d in progress['dialogs'].values()
        if datetime.utcfromtimestamp(d['newest_date']) >= since
    ]
def get_events_by_age(files, since):
    """Collect events dated at or after ``since`` from the given dump files.

    Args:
        files: Iterable of ``.jsonl`` file paths, one JSON event per line.
        since: Naive ``datetime`` that is compared against UTC timestamps,
            or ``None`` to mean "the last 24 hours".

    Returns:
        A list of event dicts from all files, sorted by ``date`` from oldest
        to newest.
    """
    if since is None:
        # Without this default, comparing a datetime with None below raises
        # TypeError (e.g. ``journal`` run without ``--since``).
        since = datetime.utcnow() - timedelta(days=1)

    # Filter events by age. The events are sorted from newest to oldest in a
    # .jsonl file. Once we reach an old event, we are done with this file.
    # This speeds things up A LOT.
    events = []
    for rf in files:
        with open(rf, 'r') as f:
            for line in f:
                event = json.loads(line)
                if datetime.utcfromtimestamp(event['date']) < since:
                    break
                events.append(event)
    events.sort(key=itemgetter('date'))
    return events
def grep_events_by_key(key, pattern, files):
    """Find events whose value under ``key`` matches ``pattern``.

    The files are pre-filtered with the external ``grep`` program. Each
    matching line is then parsed as JSON and the value addressed by ``key``
    is checked again with Python's ``re`` module.

    Args:
        key: Event field to test. Dots address nested dicts, e.g.
            ``from.first_name``.
        pattern: Regular expression, handed both to ``grep -P`` and to
            ``re.compile``.
        files: List of files and/or directories to search (recursively).

    Returns:
        A list of ``(filename, event)`` tuples in the order grep reported
        them. Prints a notice when no grepped event had a non-empty value
        under ``key``.

    Raises:
        subprocess.CalledProcessError: If grep exits with a status other than
            0 (matches) or 1 (no matches).
    """
    # -R: recurse, -H: always print the file name, -P: Perl-style regex.
    # --null puts a NUL byte (instead of ':') after the file name, so paths
    # that contain a colon are still split correctly. "-e" and "--" keep a
    # pattern or path starting with '-' from being parsed as an option.
    cmd = ['grep', '-RHP', '--null', '-e', pattern, '--']
    cmd.extend(files)
    try:
        raw = subprocess.check_output(cmd)
    except subprocess.CalledProcessError as e:
        if e.returncode != 1:
            raise
        # Exit status 1 only means that nothing matched.
        return []

    # Split on '\n' only: str.splitlines() would also break on characters
    # such as U+2028, which may appear unescaped inside JSON strings.
    stdout_lines = [ln for ln in raw.decode('utf-8').split('\n') if ln]
    if not stdout_lines:
        return []

    # Check if pattern matches the requested key (grep matched the whole line)
    regex = re.compile(pattern)
    keys = key.split('.')
    fn_events = []
    key_valid = False
    for line in stdout_lines:
        if '\0' not in line:
            # Not a "file NUL content" record (e.g. a binary-file notice).
            continue
        filename, event_str = line.split('\0', 1)
        event = json.loads(event_str)

        # Allow access to nested dicts; stop as soon as the path leaves
        # dict territory instead of calling .get() on a str/list/None.
        value = event
        for k in keys:
            if not isinstance(value, dict):
                value = None
                break
            value = value.get(k)
        if not value:
            continue

        key_valid = True
        if regex.search(str(value)):
            fn_events.append((filename, event))

    if not key_valid:
        print('KeyError: {} is not a valid key in any events.'.format(key))
    return fn_events
def main():
    """Command-line entry point.

    Sub-commands:
        print    Print every event of the given dump files.
        list     List recently active dump files with the dialog's print name.
        journal  Print recent events of all recently active dialogs, oldest
                 first.
        grep     Search events by key and regular expression.

    Without a sub-command the help text is printed.
    """
    ap = argparse.ArgumentParser()
    sap = ap.add_subparsers(dest='action')

    ap_print = sap.add_parser('print')
    ap_print.add_argument('files', nargs='+')

    ap_list = sap.add_parser('list')
    ap_list.add_argument('--since', type=_parse_datetime)
    #ap_list.add_argument('--sorted', action='store_true')
    #ap_list.add_argument('--reversed', action='store_true')

    ap_journal = sap.add_parser('journal')
    ap_journal.add_argument('--since', type=_parse_datetime)

    ap_grep = sap.add_parser('grep')
    ap_grep.add_argument('pattern')
    ap_grep.add_argument('--verbose', action='store_true')
    ap_grep.add_argument('--sorted', action='store_true')
    ap_grep.add_argument('--reversed', action='store_true')
    ap_grep.add_argument('--key', default='text')
    ap_grep.add_argument('--files', nargs='*')

    args = ap.parse_args()

    if args.action == 'print':
        for f in args.files:
            print_dialog(f)

    elif args.action == 'list':
        recent_files = get_files_by_age(args.since)
        for rf in recent_files:
            with open(rf, 'r') as f:
                # An empty dump has no first event; skip it instead of
                # letting next() raise StopIteration.
                first_line = next(f, None)
            if first_line is None:
                continue
            event = json.loads(first_line)
            to_name = event['to']['print_name']
            info = f'{rf}\t{to_name}'
            print(info)

    elif args.action == 'journal':
        # Resolve the default once so that file selection and event filtering
        # use the same cut-off. The dump timestamps are compared as naive
        # UTC, hence utcnow(). Passing None on would break the comparison.
        since = args.since
        if since is None:
            since = datetime.utcnow() - timedelta(days=1)
        files = get_files_by_age(since)
        events = get_events_by_age(files, since)
        for event in events:
            print(format_message(event))

    elif args.action == 'grep':
        if not args.files:
            # Default to the dump directory next to this script.
            here = os.path.dirname(os.path.abspath(__file__))
            files = [os.path.join(here, 'output/json')]
        else:
            files = args.files
        fn_events = grep_events_by_key(args.key, args.pattern, files)
        if args.sorted:
            # --reversed only has an effect together with --sorted.
            fn_events = sorted(fn_events, key=lambda x: x[1]['date'], reverse=args.reversed)
        for fn, event in fn_events:
            if args.verbose:
                to_type = event['to']['peer_type']
                if to_type == 'user':
                    try:
                        to_name = '{} {}'.format(event['to']['first_name'], event['to']['last_name']).strip()
                    except KeyError:
                        to_name = 'user#{}'.format(event['to']['peer_id'])
                elif to_type == 'chat':
                    to_name = event['to']['title']
                else:
                    to_name = None
                info_line = '[INFO] filename="{}" to_name="{}" id="{}"'.format(fn, to_name, event['id'])
                output = '{}\n{}'.format(info_line, format_message(event))
            else:
                output = format_message(event)
            print(output)

    else:
        ap.print_help()
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment