Last active
May 1, 2024 08:02
-
-
Save jul/a94976f47af0581ec0f5dba27f6547ce to your computer and use it in GitHub Desktop.
making templatized sociogram
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import psycopg2 | |
from datetime import date, datetime, timedelta | |
from archery import mdict | |
def int_env_default(var, default):
    """Return environment variable `var` converted to int.

    `default` is converted instead when the variable is unset or empty.
    """
    raw = os.environ.get(var)
    if not raw:
        raw = default
    return int(raw)
def float_env_default(var, default):
    """Return environment variable `var` converted to float.

    `default` is converted instead when the variable is unset or empty.
    """
    raw = os.environ.get(var)
    if not raw:
        raw = default
    return float(raw)
# --- Settings, each overridable through an environment variable ------------

# A pair of addresses is kept as an edge when the number of mails exchanged
# lies in [MIN_MAIL, MAX_MAIL] (pairs where both ends are whitelisted bypass
# this range in the main loop).
MIN_MAIL = int_env_default("MIN_MAIL", 6)
MAX_MAIL = int_env_default("MAX_MAIL", 100)
WL_MIN = int_env_default("WL_MIN", 3)
# Only quoted in the graph title.
CUT_SIZE = int_env_default("CUT_SIZE", 20)

# First day of the first window and the day at which windowing stops
# (ISO format, YYYY-MM-DD).
DATE = os.environ.get("DATE") or "2016-01-01"
END_DATE = os.environ.get("END_DATE") or "2017-05-01"
# Width of one window in days.
# NOTE(review): the original remark here was "13x28 = 364 ~ 365.5", which
# looks tied to a 28-day window rather than the default of 4 -- confirm.
BY_DAYS = int_env_default("BY_DAYS", 4)
# Factor applied to every edge weight before it is written as a pen width.
EDGE_SCALE = float_env_default("EDGE_SCALE", 1)

# Path of an f-string template file; when unset a single built-in graph is
# written instead of one file per window.
TEMPLATE = os.environ.get("TEMPLATE")
# An address must appear in more than THRESHOLD_ILOT edges to stay in the graph.
THRESHOLD_ILOT = int_env_default("THRESHOLD_ILOT", 1)

# Order matters: `date` is rebound from the imported class to a date
# instance on the second line, so END_DATE has to be parsed first.
end_date = date.fromisoformat(END_DATE)
date = date.fromisoformat(DATE)

# td is the window width, td2 the step between two successive windows.
td = timedelta(days=BY_DAYS)
td2 = td / 2
# https://stackoverflow.com/questions/47339121/how-do-i-convert-a-string-into-an-f-string
def effify(template: str):
    """Render the file at path `template` as if its content were an f-string.

    The file is read as UTF-8, wrapped in a triple-double-quoted f-string
    literal and evaluated, so `{expression}` placeholders are resolved
    against this function's locals and the module globals.

    A template containing three consecutive double quotes terminates the
    wrapping literal early and makes the evaluation fail.

    Returns the rendered text.
    """
    # Explicit encoding: the result must not depend on the platform locale.
    with open(template, encoding="utf-8") as f:
        non_f_str = f.read()
    # SECURITY: eval() executes arbitrary Python embedded in the template.
    # Only ever point this at template files you wrote or fully trust.
    return eval(f'f"""{non_f_str}"""')
def is_ilot(node: str, edge_dict: dict) -> bool:
    """Tell whether `node` is an "ilot" (islet) of the graph.

    ilot == has only 1 link back and forth either in (from,) or (,to).

    node: address to look for.
    edge_dict: mapping whose keys are 2-item edges; only the keys are read.

    Returns False as soon as `node` is found in a third edge key, True
    otherwise (including when it appears in none).
    """
    touching = 0
    for edge in edge_dict:
        if node in (edge[0], edge[1]):
            touching += 1
            if touching > 2:
                return False
    return True
# Whitelist: address pattern -> Graphviz colour.
# A pattern matches an address that starts OR ends with it (see in_wl).
# Insertion order is significant: the first matching pattern wins, and the
# legend of the graph lists the entries in this order.
# Each key appears exactly once; a repeated key in a dict literal silently
# keeps only its last value.
patt_to_col = {
    "e2m": "red",
    "emmanuel.macron": "red",
    "emmanuelmacron": "red",
    "alexis.kohler": "midnightBlue",
    "gabriel.attal": "orange",
    "sachahoulie@": "green",
    "sejourne.stephane": "grey15",
    "stephane.sejourne": "grey15",
    "clement.beaune": "aquamarine",
    "olivia.gregoire": "darkOrange",
    "veranolivier": "green",
    "julien.denormandie": "indigo",
    "sibeth.ndiaye": "orange",
    "iledefrance.fr": "beige",
    "barbara.frugier": "green",
    "cedric.o": "purple",
    "gouv.fr": "yellow",
    "benjamin.griveaux": "SteelBlue",
    "ismael.emelie": "Olive",
    "laurent.bigorgne": "darkBlue",
    "jean.pisani-ferry": "chocolate",
    "gregoire.potton": "grey20",
    "eric.dumas": "salmon",
    "alexandre.benalla": "darkGreen",
    "pierre.person": "darkBlue",
    "pierrperson": "darkBlue",
    "quentin.lafay": "grey10",
    "jesusetgabriel.com": "crimson",
    "fm.alaintourret": "purple",
    "langannechristine": "darkgoldenrod4",
    # Disabled patterns:
    # "en-marche.fr": "chocolate",
    # "paris.fr": "yellow",
}
# Address -> colour (or None) for every address of an edge kept in the graph;
# filled by set_color() and read when the node declarations are written.
detected_edges_color = {}
def in_wl(mail : str):
    """Return the first pattern of patt_to_col that `mail` starts or ends with.

    Patterns are tried in the insertion order of patt_to_col.
    Returns None when no pattern matches.
    """
    hits = (
        pattern
        for pattern in patt_to_col
        if mail.startswith(pattern) or mail.endswith(pattern)
    )
    return next(hits, None)
def wl(pair: tuple):
    """Return the colour of an edge whose two ends are both whitelisted.

    pair: 2-item edge (address, address).

    The colour is the one attached to the pattern matched by the FIRST
    address of the pair. Returns None when either address matches no
    pattern.
    """
    # The result does not depend on any particular pattern, so there is no
    # need to iterate over patt_to_col here: one lookup per address suffices.
    first_pattern = in_wl(pair[0])
    if first_pattern and in_wl(pair[1]):
        return patt_to_col[first_pattern]
    return None
#assert wl(("jesusetgabriel.com", "jesusetgabriel.com")) == "crimson" | |
def is_vip(t):
    """Return True when every address in `t` matches a whitelist pattern."""
    for address in t:
        if not in_wl(address):
            return False
    return True
def set_color(mails:tuple):
    """Record the colour of edge `mails` for each address it contains.

    Writes wl(mails) -- a colour name, or None when the edge is not fully
    whitelisted -- into detected_edges_color under every address of `mails`,
    overwriting any colour stored earlier for that address.
    """
    if not mails:
        # Nothing to record; also avoids calling wl() on an empty edge.
        return
    # Same value for every address of the edge: compute it once.
    edge_color = wl(mails)
    for mail in mails:
        detected_edges_color[mail] = edge_color
# Main script: for each time window, read the mails of the window, build an
# undirected weighted graph of who exchanged mail with whom, prune it, and
# write it as a Graphviz file in out/rec.<window start>.dot.
#
# Everything below stays at module level on purpose: a template rendered by
# effify() resolves its placeholders against the module globals, so names such
# as `title`, `edges`, `color`, `final`, `date` and `td` must remain globals.


def _discard(mapping, key):
    """Delete `key` from `mapping`; a missing key is not an error."""
    try:
        del mapping[key]
    except KeyError:
        pass


def has_more_than_n_neighbour(email: str, n: int, final: dict):
    """Return True when `email` is an end of more than `n` edge keys of `final`."""
    count = 0
    for k in final.keys():
        if email in k:
            count += 1
            if count > n:
                return True
    return False


# Legend "colour pour pattern", four entries per line. It only depends on
# patt_to_col, so it is built once rather than once per window.
color = "".join(
    f"{col} pour {pattern}{chr(0x0A) if n % 4 == 3 else ', '} "
    for n, (pattern, col) in enumerate(patt_to_col.items())
)

first = True
# Without TEMPLATE exactly one window is processed; with TEMPLATE windows are
# produced until the window start reaches end_date.
while (not TEMPLATE and first) or (TEMPLATE and date < end_date):
    first = False
    direct = mdict()  # (address, address) -> number of mails, per direction
    final = mdict()   # one key per unordered pair -> mails in both directions

    conn = psycopg2.connect("dbname=ml host=192.168.1.32 port=5432 user=jul sslmode='require' ")
    try:
        with conn.cursor() as sql:
            # Bound parameters instead of string interpolation; the values are
            # sent as the same ISO date strings the query used to embed.
            sql.execute(
                """SELECT "to", "from" from mail where DATE BETWEEN %s AND %s;""",
                (date.isoformat(), (date + td).isoformat()),
            )
            while t := sql.fetchone():
                # NOTE: t[0] is the "to" column and t[1] the "from" column, so
                # `fr`/`to` are swapped with respect to the SELECT order. Both
                # directions of a pair are merged below.
                for fr in t[0]:
                    fr = fr.strip()
                    for to in t[1]:
                        to = to.strip()
                        # Skip self mails, empty addresses and addresses that
                        # start with a quote character.
                        if fr != to and fr and to and not {fr[0], to[0]} & {'"', "'"}:
                            direct += mdict({(fr, to): 1})
    finally:
        # The connection is only needed for the query; release it even when
        # reading fails.
        conn.close()

    # Pass 1: merge both directions of each pair into `final`, keeping pairs
    # that are fully whitelisted or whose mail count is within range.
    # Iterate over a copy of the keys because `direct` is modified on the way.
    tk = list(direct.keys())
    for k in tk:
        rev = k[::-1]
        if (
            (is_vip(k) or MAX_MAIL >= direct.get(k, 0) + direct.get(rev, 0) >= MIN_MAIL)
            and k not in final
            and rev not in final
        ):
            final[k] = direct[k]
            final[k] += direct.get(rev, 0)
        else:
            _discard(direct, k)
            _discard(direct, rev)

    # Pass 2: drop edges having an end that appears in THRESHOLD_ILOT edges or
    # fewer; record a colour for the ends of the edges that stay. Removals
    # take effect immediately, so the outcome depends on the iteration order.
    tk = list(final.keys())
    for e in tk:
        if (
            not has_more_than_n_neighbour(e[0], THRESHOLD_ILOT, final)
            or not has_more_than_n_neighbour(e[1], THRESHOLD_ILOT, final)
        ):
            _discard(final, e)
            _discard(final, e[::-1])
        else:
            set_color(e)

    final *= EDGE_SCALE

    # Addresses that are an end of at least one remaining edge.
    quoted_mail = {mail for edge in final.keys() for mail in edge}
    # Node declarations for the coloured addresses still present in the graph.
    edges = "\n".join(
        f""" "{e}" [shape=rectangle fillcolor="{c or 'lightblue'};.1:white" color={c or 'green'} style=striped ];"""
        for e, c in detected_edges_color.items()
        if e in quoted_mail
    )
    title = f"""label="Sociogramme de {date} à {date + td} extrait des macron leaks orienté gouv.fr, personne d'intérêts (vert), victime du hacking (rouge), et président (bleu)\n
entre [ {MIN_MAIL}, {MAX_MAIL} ] échangés \n
plus gros liens au dessus de {CUT_SIZE} mails échangés entre interlocuteurs \n
couleur par priorités selon les origines \n
{color}"
"""

    # Create the output directory if needed, and write UTF-8 regardless of the
    # platform locale since the title contains accented characters.
    os.makedirs("out", exist_ok=True)
    with open(f"out/rec.{date}.dot", "w", encoding="utf-8") as f:
        if not TEMPLATE:
            # The first literal is a plain string because it holds a bare "{".
            print("""
graph Sociogramme { """ + f"""
fontname="Comics sans MS"
size=120
ratio=0.588
{title}
labelloc="c"
labelloc="t";
start=1
{edges}
node [ shape=rectangle style=striped fillcolor="slateblue;0.1:white" gradientangle=90 ];
"e2m@en-marche.fr" -- "e2m@cabinets.finances.gouv.fr" -- "emmanuelmacron3@gmail.com" -- "emmanuel.macron@en-marche.fr" -- "emmanuelmacron@en-marche.fr" [label="is"]
"alexis.kohler@en-marche.fr" -- "alexis.kohler@cabinets.finances.gouv.fr" [label=is];
"stephane.sejourne@en-marche.fr" -- "stephane.sejourne@cabinets.finances.gouv.fr" -- "stephane.sejourne@gmail.com" [label=is];
"julien.denormandie@en-marche.fr" -- "julien.denormandie@cabinets.finances.gouv.fr" [label=is];
"benjamin.griveaux@en-marche.fr" -- "benjamin.griveaux@sante.gouv.fr" [label=is];
"quentin.lafay@en-marche.fr" -- "quentin.lafay@cabinets.finances.gouv.fr" -- "quentin.lafay@sante.gouv.fr" -- "quentin.lafay@gmail.com" [label=is];
"pierrperson@gmail.com" -- "pierre.person@en-marche.fr" [label=is];
"ismael.emelien@en-marche.fr" -- "ismael.emelien@yahoo.fr" [label=is];
""", file=f)
            for k, v in final.items():
                print(f""" "{k[0]}" -- "{k[1]}" [color={wl(k) or 'lightblue'} penwidth={v} ];""", file=f)
            print("}", file=f)
        else:
            print(effify(TEMPLATE), file=f)

    # Successive windows overlap: the start advances by td2 while each window
    # spans td.
    date += td2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment