Skip to content

Instantly share code, notes, and snippets.

@Bruno02468
Created August 1, 2021 16:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Bruno02468/827df21cc05e1f3cb91ded281d077d12 to your computer and use it in GitHub Desktop.
Save Bruno02468/827df21cc05e1f3cb91ded281d077d12 to your computer and use it in GitHub Desktop.
script pra contar quem manda msg mais/primeiro no zap
#!/usr/bin/env python3
# contazap.py
# bruno borges paschoalinoto -- unlicense'd
import re, csv, argparse, sys, typing
from datetime import date, datetime, timedelta
from typing import Callable, Iterator, Optional
from copy import deepcopy
from os import path
class Message:
    """A single decoded chat message.

    Attributes:
        when: Timestamp of the message.
        who: Name of the sender.
        what: Text of the message.
        has_media: Flag telling whether the message carried media.
    """

    def __init__(self, when: datetime, who: str, what: str, has_media: bool):
        self.when = when
        self.who = who
        self.what = what
        self.has_media = has_media

    def __repr__(self) -> str:
        # Purely a debugging aid; nothing in the script depends on this format.
        return (
            f"{type(self).__name__}(when={self.when!r}, who={self.who!r}, "
            f"what={self.what!r}, has_media={self.has_media!r})"
        )
class MessageDecoder:
    """Decodes one line of an exported chat into a ``Message``.

    Args:
        regex: Pattern matched against the start of each line. It must capture
            either 7 groups (day, month, year, hour, minute, sender, text) or
            8 groups (the same, with seconds inserted after the minute).
        check_media: Callable that receives the raw line and returns a
            ``(had_media, cleaned_line)`` pair; the cleaned line is what gets
            matched against ``regex``.
        flip_date: When true, the first two captured numbers are read as
            month/day instead of day/month.
    """

    def __init__(
        self,
        regex: str,
        check_media: Callable[[str], tuple[bool, str]],
        flip_date: bool = False,
    ):
        self.pattern = re.compile(regex)
        self.check_media = check_media
        self.flip_date = flip_date

    def parse(self, line: str) -> "Optional[Message]":
        """Decode *line*, returning ``None`` when it is not a valid message.

        ``None`` is returned when the pattern does not match, when the pattern
        captures an unexpected number of groups, or when the captured numbers
        do not form a valid date/time.
        """
        had_media, line = self.check_media(line)
        match = self.pattern.match(line)
        if match is None:
            return None

        groups = match.groups()
        if len(groups) == 7:
            day, month, year, hour, minute, sender, message = groups
            second = 0  # this format carries no seconds field
        elif len(groups) == 8:
            day, month, year, hour, minute, second, sender, message = groups
        else:
            return None

        try:
            day, month, year, hour, minute, second = map(
                int, (day, month, year, hour, minute, second)
            )
            if self.flip_date:
                day, month = month, day
            when = datetime(year, month, day, hour, minute, second)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric / missing groups or an impossible calendar date:
            # treat the line as undecodable instead of crashing.
            return None
        return Message(when, sender, message, had_media)
# U+200E (LEFT-TO-RIGHT MARK). Its presence on a line is what this script
# treats as "this message has media" -- presumably the export tool emits it on
# attachment lines; verify against a real export.
_MEDIA_MAGIC = "\u200e"


def _MEDIA_CHECK(line):
    """Return ``(marker_found, line_with_marker_removed)`` for *line*."""
    marker_found = _MEDIA_MAGIC in line
    cleaned = line.replace(_MEDIA_MAGIC, "")
    return marker_found, cleaned
# Line shape: "NN/NN/NNNN HH:MM <any single char> sender: text".
# Captures 7 groups: two 2-digit date parts, 4-digit year, hour, minute,
# sender (everything up to the first colon) and the message text.
_ANDROID_REGEX = r"^(\d\d)\/(\d\d)\/(\d{4}) (\d\d):(\d\d) . ([^:]+): (.+)$"
# Line shape: "[NN/NN/NNNN HH:MM:SS] sender: text".
# Captures 8 groups: as above, with seconds inserted after the minute.
_IOS_REGEX = r"^\[(\d\d)\/(\d\d)\/(\d{4}) (\d\d):(\d\d):(\d\d)\] ([^:]+): (.+)$"
# Built-in decoders, keyed by the name accepted on the command line.
# The "*_stupid_dates" variants use the same pattern as their base format but
# swap the first two date fields (flip_date=True).
_DEFAULT_DECODERS = {
    name: MessageDecoder(
        regex=pattern,
        check_media=_MEDIA_CHECK,
        flip_date=flipped,
    )
    for name, pattern, flipped in (
        ("android", _ANDROID_REGEX, False),
        ("android_stupid_dates", _ANDROID_REGEX, True),
        ("iphone", _IOS_REGEX, False),
        ("iphone_stupid_dates", _IOS_REGEX, True),
    )
}
class ConvoDecoder:
    """Decodes a whole exported conversation and derives per-day statistics.

    Args:
        lines: Raw lines of the exported chat.
        skip_media: When true, messages flagged as having media are dropped.
        samples: How many leading lines to use for format autodetection.
            A negative value (the default) or ``None`` means "all lines".
        min_hits: Minimum fraction of sampled lines the best decoder must
            parse for autodetection to succeed.
        decoder: Decoder to use. When ``None`` the format is autodetected.
    """

    def __init__(
        self,
        lines: list[str],
        skip_media: bool = False,
        samples=-1,
        min_hits=0.5,
        decoder: "Optional[MessageDecoder]" = None,
    ):
        self.lines = lines
        self.skip_media = skip_media
        self.samples = samples
        self.min_hits = min_hits
        self.decoder = decoder
        self.msgs = None  # cache filled by decode_all()

    def detect_decoder(
        self, candidates: "Optional[dict[str, MessageDecoder]]" = None
    ) -> "Optional[MessageDecoder]":
        """Pick the decoder that parses the most sampled lines.

        Args:
            candidates: Mapping of name -> decoder to try. Defaults to the
                module's built-in decoders.

        Returns:
            The winning decoder, or ``None`` (after printing a report) when no
            candidate reaches ``min_hits`` or the best result is tied.
        """
        if candidates is None:
            candidates = _DEFAULT_DECODERS

        # Slicing with a negative bound would silently drop trailing lines
        # (and leave nothing at all for a one-line input), so "negative"
        # is handled explicitly as "use everything".
        use_all = self.samples is None or self.samples < 0
        sample = self.lines if use_all else self.lines[:self.samples]
        totals = len(sample)

        counts = {
            name: sum(1 for line in sample if dec.parse(line) is not None)
            for name, dec in candidates.items()
        }
        # Ascending stable sort, then reversed: best first.
        bests = sorted(counts.items(), key=lambda t: t[1])[::-1]

        if totals > 0 and bests:
            best, cnt = bests[0]
            # With a single candidate there is no runner-up to tie with.
            cnt2 = bests[1][1] if len(bests) > 1 else -1
            if cnt / totals >= self.min_hits and cnt > cnt2:
                print(f"Autodetected format: {best}.")
                return candidates[best]

        print("Could not autodetect message format. I tried:\n")
        for n, c in bests:
            pct = 100 * c / totals if totals else 0.0
            print(f" - {n}: decoded {c}/{totals} (~{pct:.1f}%)")
        print("\nSpecify one manually, or report this as a bug.")
        return None

    def decode_all(self) -> "list[Message]":
        """Decode every line and return the messages sorted by timestamp.

        Lines the decoder cannot parse are dropped. The result is cached, so
        repeated calls do not decode again.

        Raises:
            Exception: If no decoder was given and none could be autodetected.
        """
        if self.msgs is not None:
            return self.msgs
        use = self.detect_decoder() if self.decoder is None else self.decoder
        if use is None:
            raise Exception("Couldn't find a suitable decoder. This might be a bug!")

        parsed = (use.parse(line) for line in self.lines)
        msgs = [
            m for m in parsed
            if m is not None and not (self.skip_media and m.has_media)
        ]
        msgs.sort(key=lambda m: m.when)
        self.msgs = msgs
        return self.msgs

    def write_stats(self, fd: typing.IO, skip_boring_days=False):
        """Write one tab-separated row of statistics per calendar day to *fd*.

        Columns: ``date``, ``first`` (sender of the day's earliest message),
        ``#total msgs`` and one ``#from <sender>`` column per sender.

        Args:
            fd: Writable text stream.
            skip_boring_days: When true, days with no messages are omitted;
                otherwise they are written with zero counts and ``N/A``.

        Raises:
            Exception: If there are no decoded messages.
        """
        msgs = self.decode_all()
        if not msgs:
            raise Exception("No messages!")

        senders = sorted({m.who for m in msgs})
        day_stats = {}
        for msg in msgs:
            day = msg.when.date()
            stats = day_stats.get(day)
            if stats is None:
                # msgs is time-sorted, so the first message seen for a day
                # is that day's earliest one.
                stats = day_stats[day] = {
                    "date": day.isoformat(),
                    "first": msg.who,
                    "#total msgs": 0,
                }
            stats["#total msgs"] += 1
            key = f"#from {msg.who}"
            stats[key] = stats.get(key, 0) + 1

        field_names = ["date", "first", "#total msgs"] + [f"#from {s}" for s in senders]
        # restval=0 fills the columns of senders who were silent on a given day.
        writer = csv.DictWriter(fd, field_names, restval=0, delimiter="\t")
        writer.writeheader()

        day = msgs[0].when.date()
        last_day = msgs[-1].when.date()
        one_day = timedelta(days=1)
        while day <= last_day:
            stats = day_stats.get(day)
            if stats is not None:
                writer.writerow(stats)
            elif not skip_boring_days:
                writer.writerow({
                    "date": day.isoformat(),
                    "first": "N/A",
                    "#total msgs": 0,
                })
            day += one_day
def main(raw_args: Optional[list[str]] = None):
    """Command-line entry point: read chat exports and write a stats file.

    Args:
        raw_args: Argument list to parse; ``None`` makes argparse read
            ``sys.argv``.

    All given text files are concatenated and decoded as one conversation.
    Unless ``--no-tsv`` is passed, the stats are written to
    ``<basename of first input>.stats.tsv`` in the current directory.
    Exits with a non-zero status if an input cannot be read or the stats
    cannot be written.
    """
    parser = argparse.ArgumentParser(description="Derive WhatsApp convo stats.")
    parser.add_argument(
        "--fmt",
        type=str,
        choices=["auto"] + list(_DEFAULT_DECODERS.keys()),
        default="auto",
        help=(
            "The .txt format. Depends on the device used to export convo. "
            "Pass auto (or just don't pass it) if you want me to autodetect."
        )
    )
    # NOTE: BooleanOptionalAction takes no value from the command line, so no
    # `type=` is passed (that combination is deprecated and later rejected).
    parser.add_argument(
        "--skip-media",
        default=False,
        action=argparse.BooleanOptionalAction,
        help="Skip messages that contain media, such as images and videos."
    )
    parser.add_argument(
        "--tsv",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Generate a .tsv file with message counts and some stats."
    )
    parser.add_argument(
        "--skip-boring-days",
        default=False,
        action=argparse.BooleanOptionalAction,
        help="Skip days with no messages."
    )
    parser.add_argument(
        "txt_file",
        type=str,
        nargs="+"
    )
    args = parser.parse_args(raw_args)

    print("Reading chat dumps...")
    lines = []
    for fn in args.txt_file:
        try:
            # Explicit encoding so the media marker decodes the same on every
            # platform; utf-8-sig also tolerates a leading byte-order mark.
            with open(fn, "r", encoding="utf-8-sig") as fd:
                lines += fd.readlines()
        except Exception as e:
            print(f"Could not read {fn}:", e)
            sys.exit(-1)

    dec = None if args.fmt == "auto" else _DEFAULT_DECODERS[args.fmt]

    print("Decoding...")
    cd = ConvoDecoder(lines, skip_media=args.skip_media, decoder=dec)
    msgs = cd.decode_all()
    if not msgs:
        print("No messages decoded!")

    try:
        if args.tsv:
            tfn = f"{path.basename(args.txt_file[0])}.stats.tsv"
            # newline="" lets the csv writer control line endings itself.
            with open(tfn, "w", encoding="utf-8", newline="") as tfd:
                print(f"Writing stats to {tfn}...")
                cd.write_stats(tfd, args.skip_boring_days)
        print("Done.")
    except IndexError:
        # Deliberately left to propagate with a traceback.
        raise
    except Exception as e:
        # Message goes to stderr, exit status is 1.
        sys.exit(str(e))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment