Last active
November 12, 2018 12:38
-
-
Save wzyboy/aee51ec93014bb8dd053a36fd6183261 to your computer and use it in GitHub Desktop.
Search through Telegram history dumps.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
A simple script to search thru Telegram history dumps. | |
(See also: telegram-history-dump) | |
Example usages: | |
# Print dialogs | |
./telegram.py print /path/to/files.jsonl | |
# Show recent dialogs | |
./telegram.py journal --since 2017-12-01 | |
# Search by key (default: text). | |
./telegram.py grep 'foo bar' | |
# Regular expressions are supported. | |
./telegram.py grep --key from '(?:John|Jane)_Doe' | |
''' | |
import os | |
import re | |
import json | |
import argparse | |
import subprocess | |
from datetime import datetime | |
from datetime import timedelta | |
from operator import itemgetter | |
def _parse_datetime(s):
    '''
    Parse a date/time string given on the command line into a datetime.

    When the optional ``dateutil`` package can be imported, its flexible
    parser is used. Otherwise only the strict ``YYYY-MM-DD`` format is
    accepted.

    :param s: the text to parse.
    :returns: a ``datetime`` object.
    :raises ValueError: in the fallback path, when ``s`` does not match
        ``YYYY-MM-DD``.
    '''
    try:
        from dateutil.parser import parse
    except ImportError:
        # strptime() needs both the input text and the format; passing only
        # the format raises TypeError and makes the fallback unusable.
        return datetime.strptime(s, '%Y-%m-%d')
    else:
        return parse(s)
def format_message(event):
    '''
    Render one dump event as a single human-readable line.

    Events whose ``event`` field is neither ``message`` nor ``service`` are
    returned unchanged. Otherwise the result looks like
    ``[YYYY-MM-DD HH:MM:SS] Sender Name: payload``, with the timestamp
    rendered in local time.
    '''
    if event['event'] not in ('message', 'service'):
        return event

    when = datetime.fromtimestamp(event['date'])
    stamp = when.strftime('%Y-%m-%d %H:%M:%S')

    # Prefer "first last"; if either name field is absent, fall back to the
    # numeric peer id.
    try:
        sender = event['from']
        from_name = '{} {}'.format(sender['first_name'], sender['last_name']).strip()
    except KeyError:
        from_name = 'user#{}'.format(event['from']['peer_id'])

    # The payload is the first of text / media / action that is present as a
    # key (even if its value is None); None when none of them exist.
    payload = None
    for field in ('text', 'media', 'action'):
        if field in event:
            payload = event[field]
            break

    return '[{}] {}: {}'.format(stamp, from_name, payload)
def print_dialog(filename):
    '''
    Print every event of one .jsonl dump file, one formatted line per event.

    :param filename: path to a file holding one JSON document per line.
    '''
    # Decode explicitly as UTF-8 (the JSON interchange encoding) instead of
    # relying on the locale's default encoding.
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            # A blank line is not a JSON document; skip it rather than
            # letting json.loads() fail.
            if not line.strip():
                continue
            event = json.loads(line)
            print(format_message(event))
def get_files_by_age(since):
    '''
    Return the dump files of dialogs that were active at or after ``since``.

    Dialog metadata is read from ``output/progress.json`` located next to
    this script. Each returned path is the dialog's
    ``dumper_state.outfile`` joined onto the ``output`` directory.

    :param since: naive datetime cutoff, compared against UTC timestamps;
        ``None`` means the last 24 hours.
    :returns: list of file paths.
    '''
    if since is None:
        # The cutoff is compared with utcfromtimestamp() values below, so
        # "now" has to be taken in UTC as well; using local time would shift
        # the 24-hour window by the local UTC offset.
        since = datetime.utcnow() - timedelta(days=1)  # last 24 hours

    # Find .jsonl files that were modified recently
    here = os.path.dirname(os.path.abspath(__file__))
    progress_file = os.path.join(here, 'output/progress.json')
    with open(progress_file, encoding='utf-8') as f:
        progress = json.load(f)

    return [
        os.path.join(here, 'output', d['dumper_state']['outfile'])
        for d in progress['dialogs'].values()
        if datetime.utcfromtimestamp(d['newest_date']) >= since
    ]
def get_events_by_age(files, since):
    '''
    Collect the events dated at or after ``since`` from the given files.

    The events are expected to be sorted from newest to oldest inside each
    .jsonl file, so reading of a file stops at the first event older than
    the cutoff. This speeds things up a lot.

    :param files: iterable of .jsonl file paths.
    :param since: naive datetime cutoff, compared against UTC timestamps;
        ``None`` means the last 24 hours.
    :returns: list of event dicts sorted by ``date``, oldest first.
    '''
    if since is None:
        # Callers may forward an unset --since option; comparing a datetime
        # with None would raise TypeError, so apply the 24-hour default here.
        since = datetime.utcnow() - timedelta(days=1)

    events = []
    for path in files:
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue  # blank lines carry no event
                event = json.loads(line)
                if datetime.utcfromtimestamp(event['date']) < since:
                    # Everything after this point in the file is older still.
                    break
                events.append(event)
    events.sort(key=itemgetter('date'))
    return events
def grep_events_by_key(key, pattern, files):
    '''
    Find events whose value under ``key`` matches ``pattern``.

    An external ``grep -P`` pre-filters the raw lines of ``files``
    (directories are searched recursively). Each candidate line is then
    parsed as JSON and the pattern is applied again, with Python's ``re``,
    to the value stored under ``key`` only.

    :param key: event key to inspect; dots address nested dicts, e.g.
        ``from.first_name``.
    :param pattern: regular expression, used by both grep and ``re``.
    :param files: list of files and/or directories handed to grep.
    :returns: list of ``(filename, event)`` tuples in grep output order.
    :raises subprocess.CalledProcessError: when grep exits with a status
        other than 0 (matches) or 1 (no matches).
    '''
    # -e keeps a pattern that starts with '-' from being parsed as an option,
    # '--' does the same for path arguments, and -Z terminates each file name
    # with a NUL byte so names containing ':' can be split off reliably.
    cmd = ['grep', '-RHPZ', '-e', pattern, '--']
    cmd.extend(files)
    try:
        _stdout = subprocess.check_output(cmd)
    except subprocess.CalledProcessError as e:
        if e.returncode == 1:
            # Exit status 1 only means "nothing matched".
            return []
        raise

    # Split on '\n' only: str.splitlines() would also break on characters
    # such as U+2028 that may appear inside JSON strings.
    stdout_lines = [ln for ln in _stdout.decode('utf-8').split('\n') if ln]
    if not stdout_lines:
        return []

    # Check if pattern matches
    fn_events = []
    regex = re.compile(pattern)
    keys = key.split('.')
    key_valid = False
    for line in stdout_lines:
        filename, sep, event_str = line.partition('\0')
        if not sep:
            # Not a "<file>NUL<content>" record; nothing to parse.
            continue
        event = json.loads(event_str)

        # Allow access to nested dicts
        value = event
        for k in keys:
            if not isinstance(value, dict):
                # The path runs through a non-dict value, so the key does
                # not exist in this event.
                value = None
                break
            value = value.get(k)
        if not value:
            continue

        key_valid = True
        if regex.search(str(value)):
            fn_events.append((filename, event))

    if not key_valid:
        print('KeyError: {} is not a valid key in any events.'.format(key))
    return fn_events
def main():
    '''
    Command-line entry point: parse the arguments and run one sub-command.

    Sub-commands:

    * ``print FILE...``: print every event of the given dump files.
    * ``list [--since S]``: list recently active dump files together with
      the ``print_name`` of the peer found in each file's first event.
    * ``journal [--since S]``: print recent events from all recently active
      dialogs, oldest first.
    * ``grep PATTERN``: search events by key; ``--reversed`` only has an
      effect together with ``--sorted``.

    Without a sub-command the help text is printed.
    '''
    ap = argparse.ArgumentParser()
    sap = ap.add_subparsers(dest='action')

    ap_print = sap.add_parser('print')
    ap_print.add_argument('files', nargs='+')

    ap_list = sap.add_parser('list')
    ap_list.add_argument('--since', type=_parse_datetime)
    #ap_list.add_argument('--sorted', action='store_true')
    #ap_list.add_argument('--reversed', action='store_true')

    ap_journal = sap.add_parser('journal')
    ap_journal.add_argument('--since', type=_parse_datetime)

    ap_grep = sap.add_parser('grep')
    ap_grep.add_argument('pattern')
    ap_grep.add_argument('--verbose', action='store_true')
    ap_grep.add_argument('--sorted', action='store_true')
    ap_grep.add_argument('--reversed', action='store_true')
    ap_grep.add_argument('--key', default='text')
    ap_grep.add_argument('--files', nargs='*')

    args = ap.parse_args()

    if args.action == 'print':
        for f in args.files:
            print_dialog(f)

    elif args.action == 'list':
        for rf in get_files_by_age(args.since):
            with open(rf, 'r', encoding='utf-8') as f:
                first_line = next(f, None)
            if first_line is None:
                # An empty dump file has no event to take the peer name from.
                continue
            event = json.loads(first_line)
            to_name = event['to']['print_name']
            print(f'{rf}\t{to_name}')

    elif args.action == 'journal':
        # Resolve the default once so that the file filter and the event
        # filter share the same cutoff; passing None on to the event filter
        # would end in a datetime-vs-None comparison. The cutoff is compared
        # against UTC timestamps, hence utcnow().
        since = args.since
        if since is None:
            since = datetime.utcnow() - timedelta(days=1)
        files = get_files_by_age(since)
        events = get_events_by_age(files, since)
        for event in events:
            print(format_message(event))

    elif args.action == 'grep':
        if not args.files:
            # Default to the whole dump directory next to this script.
            here = os.path.dirname(os.path.abspath(__file__))
            files = [os.path.join(here, 'output/json')]
        else:
            files = args.files
        fn_events = grep_events_by_key(args.key, args.pattern, files)
        if args.sorted:
            fn_events = sorted(fn_events, key=lambda x: x[1]['date'], reverse=args.reversed)
        for fn, event in fn_events:
            if args.verbose:
                to_type = event['to']['peer_type']
                if to_type == 'user':
                    try:
                        to_name = '{} {}'.format(event['to']['first_name'], event['to']['last_name']).strip()
                    except KeyError:
                        to_name = 'user#{}'.format(event['to']['peer_id'])
                elif to_type == 'chat':
                    to_name = event['to']['title']
                else:
                    to_name = None
                info_line = '[INFO] filename="{}" to_name="{}" id="{}"'.format(fn, to_name, event['id'])
                output = '{}\n{}'.format(info_line, format_message(event))
            else:
                output = format_message(event)
            print(output)

    else:
        ap.print_help()
if __name__ == '__main__':
    # Run the command-line interface only when this file is executed as a
    # script, not when it is imported as a module.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment