Created
August 1, 2021 16:50
-
-
Save Bruno02468/827df21cc05e1f3cb91ded281d077d12 to your computer and use it in GitHub Desktop.
script pra contar quem manda msg mais/primeiro no zap
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# contazap.py
# bruno borges paschoalinoto -- unlicense'd
import re, csv, argparse, sys, typing | |
from datetime import date, datetime, timedelta | |
from typing import Callable, Iterator, Optional | |
from copy import deepcopy | |
from os import path | |
class Message(object):
    """A single decoded chat message.

    Attributes:
        when: Timestamp parsed from the exported line.
        who: Sender name captured from the line.
        what: Message text captured from the line.
        has_media: Flag reported by the decoder's media check for this line.
    """

    def __init__(self, when: datetime, who: str, what: str, has_media: bool):
        self.when = when
        self.who = who
        self.what = what
        self.has_media = has_media

    def __repr__(self) -> str:
        # The default object repr hides every field, which makes debugging
        # decoded conversations painful; show them all instead.
        return (
            f"{type(self).__name__}(when={self.when!r}, who={self.who!r}, "
            f"what={self.what!r}, has_media={self.has_media!r})"
        )
class MessageDecoder(object):
    """Parses one exported chat line into a Message using a regex.

    The regex must capture either 7 groups
    (day, month, year, hour, minute, sender, message) or 8 groups
    (day, month, year, hour, minute, second, sender, message).

    Args:
        regex: Pattern matched against the start of each (cleaned) line.
        check_media: Callable that receives the raw line and returns a
            ``(has_media, cleaned_line)`` pair.
        flip_date: When true, the first two captured date fields are swapped
            before building the timestamp (month/day order instead of
            day/month).
    """

    def __init__(
        self,
        regex: str,
        check_media: Callable[[str], tuple[bool, str]],
        flip_date: bool = False,
    ):
        self.pattern = re.compile(regex)
        self.check_media = check_media
        self.flip_date = flip_date

    def parse(self, line: str) -> Optional["Message"]:
        """Decode ``line``; return None when it is not a valid message line.

        None is returned when the regex does not match, when the regex has an
        unexpected number of groups, or when the captured fields do not form
        a valid date/time.
        """
        had_media, line = self.check_media(line)
        ma = self.pattern.match(line)
        if ma is None:
            return None

        gs = ma.groups()
        if len(gs) == 7:
            # Format without seconds: assume the message was sent at :00.
            day, month, year, hour, minute, sender, message = gs
            second = 0
        elif len(gs) == 8:
            day, month, year, hour, minute, second, sender, message = gs
        else:
            return None

        try:
            day, month, year, hour, minute, second = map(
                int, (day, month, year, hour, minute, second)
            )
            if self.flip_date:
                day, month = month, day
            dt = datetime(year, month, day, hour, minute, second)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric/missing groups or an impossible calendar date.
            # Only conversion errors are swallowed, so KeyboardInterrupt and
            # genuine bugs are no longer hidden by a bare except.
            return None

        return Message(dt, sender, message, had_media)
# U+200E (left-to-right mark); its presence in a line is what this script
# treats as "this message carries media".
_MEDIA_MAGIC = "\u200e"


def _strip_media_marker(line):
    """Return ``(marker_present, line_without_marker)`` for one raw line."""
    found = _MEDIA_MAGIC in line
    cleaned = line.replace(_MEDIA_MAGIC, "")
    return found, cleaned


# Kept under its original name because the decoders below reference it.
_MEDIA_CHECK = _strip_media_marker

# "dd/mm/yyyy hh:mm <sep> sender: text" -- 7 groups, no seconds.
_ANDROID_REGEX = r"^(\d\d)\/(\d\d)\/(\d{4}) (\d\d):(\d\d) . ([^:]+): (.+)$"
# "[dd/mm/yyyy hh:mm:ss] sender: text" -- 8 groups, with seconds.
_IOS_REGEX = r"^\[(\d\d)\/(\d\d)\/(\d{4}) (\d\d):(\d\d):(\d\d)\] ([^:]+): (.+)$"


def _build_decoder(regex, flip_date):
    """Create a MessageDecoder that uses the shared media check."""
    return MessageDecoder(
        regex=regex,
        check_media=_MEDIA_CHECK,
        flip_date=flip_date,
    )


# Built-in formats, keyed by the names accepted on the command line. The
# "*_stupid_dates" variants read the date as month/day instead of day/month.
_DEFAULT_DECODERS = {
    "android": _build_decoder(_ANDROID_REGEX, False),
    "android_stupid_dates": _build_decoder(_ANDROID_REGEX, True),
    "iphone": _build_decoder(_IOS_REGEX, False),
    "iphone_stupid_dates": _build_decoder(_IOS_REGEX, True),
}
class ConvoDecoder(object):
    """Decodes a whole exported conversation and writes per-day statistics.

    Args:
        lines: Raw lines of the exported chat.
        skip_media: When true, messages flagged as media are dropped.
        samples: How many leading lines to use for format autodetection; a
            negative value (the default) means "use every line".
        min_hits: Minimum fraction of sampled lines a decoder must parse to
            be accepted by autodetection.
        decoder: Decoder to use; when None the format is autodetected.
    """

    def __init__(
        self,
        lines: list[str],
        skip_media: bool = False,
        samples=-1,
        min_hits=0.5,
        decoder: Optional["MessageDecoder"] = None,
    ):
        self.lines = lines
        self.skip_media = skip_media
        self.samples = samples
        self.min_hits = min_hits
        self.decoder = decoder
        self.msgs = None  # cache filled by decode_all()

    def detect_decoder(self) -> Optional["MessageDecoder"]:
        """Pick the default decoder that parses the most sampled lines.

        A decoder wins only if it parses at least ``min_hits`` of the sample
        and strictly more lines than the runner-up. Returns None (after
        printing a diagnostic) when there is no clear winner or nothing to
        sample.
        """
        # Slicing with the default -1 would silently drop the last line, so a
        # negative sample size is treated explicitly as "all lines".
        if self.samples is None or self.samples < 0:
            sample = self.lines
        else:
            sample = self.lines[:self.samples]

        totals = len(sample)
        if totals == 0:
            # Avoids a ZeroDivisionError in the hit-ratio computations below.
            print("Could not autodetect message format: no lines to sample.")
            return None

        counts = {
            dname: sum(1 for s in sample if decoder.parse(s) is not None)
            for dname, decoder in _DEFAULT_DECODERS.items()
        }
        bests = sorted(counts.items(), key=lambda t: t[1])[::-1]
        best, cnt = bests[0]
        _, cnt2 = bests[1]

        if cnt / totals >= self.min_hits and cnt > cnt2:
            print(f"Autodetected format: {best}.")
            return _DEFAULT_DECODERS[best]

        print("Could not autodetect message format. I tried:\n")
        for n, c in bests:
            print(f" - {n}: decoded {c}/{totals} (~{100*c/totals:.1f}%)")
        print("\nSpecify one manually, or report this as a bug.")
        return None

    def decode_all(self) -> "list[Message]":
        """Decode every line and return the messages sorted by timestamp.

        The result is cached on the instance. Lines the decoder rejects are
        skipped, as are media messages when ``skip_media`` is set.

        Raises:
            Exception: If no decoder was given and none could be detected.
        """
        if self.msgs is not None:
            return self.msgs

        use = self.decoder if self.decoder is not None else self.detect_decoder()
        if use is None:
            raise Exception("Couldn't find a suitable decoder. This might be a bug!")

        parsed = (use.parse(l) for l in self.lines)
        msgs = [
            m for m in parsed
            if m is not None and not (self.skip_media and m.has_media)
        ]
        msgs.sort(key=lambda m: m.when)
        self.msgs = msgs
        return self.msgs

    def write_stats(self, fd: typing.IO, skip_boring_days=False):
        """Write one tab-separated row of statistics per calendar day to ``fd``.

        Columns: date, first sender of the day, total messages, then one
        "#from <sender>" count per sender (sorted by name). Days between the
        first and last message that have no messages are written with zeros
        unless ``skip_boring_days`` is true.

        Raises:
            Exception: If there are no decoded messages.
        """
        msgs = self.decode_all()
        if not msgs:
            raise Exception("No messages!")

        senders = sorted({m.who for m in msgs})

        # msgs is sorted by time, so the first message seen for a day is that
        # day's "first" sender.
        day_stats = {}
        for msg in msgs:
            day = msg.when.date()
            stats = day_stats.get(day)
            if stats is None:
                stats = day_stats[day] = {
                    "date": day.isoformat(),
                    "first": msg.who,
                    "#total msgs": 0,
                }
            stats["#total msgs"] += 1
            k = f"#from {msg.who}"
            stats[k] = stats.get(k, 0) + 1

        field_names = ["date", "first", "#total msgs"] + [f"#from {s}" for s in senders]
        # restval=0 fills in senders that did not write on a given day.
        writer = csv.DictWriter(fd, field_names, restval=0, delimiter="\t")
        writer.writeheader()

        day = msgs[0].when.date()
        last_day = msgs[-1].when.date()
        one_day = timedelta(days=1)
        while day <= last_day:
            stats = day_stats.get(day)
            if stats is not None:
                writer.writerow(stats)
            elif not skip_boring_days:
                writer.writerow({
                    "date": day.isoformat(),
                    "first": "N/A",
                    "#total msgs": 0,
                })
            day += one_day
def main(raw_args: Optional[list[str]] = None):
    """Command-line entry point: read chat dumps, decode them, write stats.

    Args:
        raw_args: Argument list to parse; None makes argparse use sys.argv.

    All given text files are concatenated into one conversation. When --tsv
    is enabled (the default), stats are written to
    "<basename of first input>.stats.tsv" in the current directory. The
    process exits with a non-zero status when a file cannot be read or when
    writing the stats fails.
    """
    parser = argparse.ArgumentParser(description="Derive WhatsApp convo stats.")
    parser.add_argument(
        "--fmt",
        type=str,
        choices=["auto"] + list(_DEFAULT_DECODERS.keys()),
        default="auto",
        help=(
            "The .txt format. Depends on the device used to export convo. "
            "Pass auto (or just don't pass it) if you want me to autodetect."
        )
    )
    # NOTE: no type= on the boolean flags. BooleanOptionalAction never used
    # it, and passing it is deprecated in newer argparse versions.
    parser.add_argument(
        "--skip-media",
        default=False,
        action=argparse.BooleanOptionalAction,
        help="Skip messages that contain media, such as images and videos."
    )
    parser.add_argument(
        "--tsv",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Generate a .tsv file with message counts and some stats."
    )
    parser.add_argument(
        "--skip-boring-days",
        default=False,
        action=argparse.BooleanOptionalAction,
        help="Skip days with no messages."
    )
    parser.add_argument(
        "txt_file",
        type=str,
        nargs="+"
    )
    args = parser.parse_args(raw_args)

    print("Reading chat dumps...")
    lines = []
    for fn in args.txt_file:
        try:
            # Explicit UTF-8: media detection relies on U+200E being decoded
            # correctly, which the platform default encoding may not do.
            # NOTE(review): assumes exports are UTF-8 -- confirm.
            with open(fn, "r", encoding="utf-8") as fd:
                lines += fd.readlines()
        except (OSError, UnicodeError) as e:
            print(f"Could not read {fn}:", e)
            sys.exit(-1)

    dec = None if args.fmt == "auto" else _DEFAULT_DECODERS[args.fmt]

    print("Decoding...")
    cd = ConvoDecoder(lines, skip_media=args.skip_media, decoder=dec)
    msgs = cd.decode_all()
    if not msgs:
        print("No messages decoded!")

    try:
        if args.tsv:
            tfn = f"{path.basename(args.txt_file[0])}.stats.tsv"
            # newline="" is what the csv module expects for output files; the
            # context manager guarantees the file is flushed and closed.
            with open(tfn, "w", newline="", encoding="utf-8") as tfd:
                print(f"Writing stats to {tfn}...")
                cd.write_stats(tfd, args.skip_boring_days)
        print("Done.")
    except IndexError:
        # Deliberately propagated with a full traceback, as in the original.
        raise
    except Exception as e:
        # Any other failure: print the message and exit with status 1.
        sys.exit(e)
# Run the command-line interface only when executed as a script, so that
# importing this module has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment