Skip to content

Instantly share code, notes, and snippets.

@jul
Last active May 1, 2024 08:02
Show Gist options
  • Save jul/a94976f47af0581ec0f5dba27f6547ce to your computer and use it in GitHub Desktop.
Save jul/a94976f47af0581ec0f5dba27f6547ce to your computer and use it in GitHub Desktop.
making templatized sociogram
#!/usr/bin/env python3
import os
import psycopg2
from datetime import date, datetime, timedelta
from archery import mdict
def int_env_default(var, default):
    """Return the environment variable ``var`` parsed as an ``int``.

    ``default`` is used when the variable is unset or empty; it is passed
    through ``int`` as well, so the result is always an ``int``.

    Raises:
        ValueError: if the value is not a valid integer literal. The message
            names the variable so a bad setting is easy to locate.
    """
    raw = os.getenv(var) or default
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(f"invalid integer value {raw!r} for {var}") from err
def float_env_default(var, default):
    """Return the environment variable ``var`` parsed as a ``float``.

    ``default`` is used when the variable is unset or empty; it is passed
    through ``float`` as well, so the result is always a ``float``.

    Raises:
        ValueError: if the value is not a valid float literal. The message
            names the variable so a bad setting is easy to locate.
    """
    raw = os.getenv(var) or default
    try:
        return float(raw)
    except ValueError as err:
        raise ValueError(f"invalid float value {raw!r} for {var}") from err
# Run-time settings. Each one is read from the environment variable of the
# same name and falls back to the literal given here.
MIN_MAIL = int_env_default("MIN_MAIL", 6)
MAX_MAIL = int_env_default("MAX_MAIL", 100)
WL_MIN = int_env_default("WL_MIN", 3)
CUT_SIZE = int_env_default("CUT_SIZE", 20)
DATE = os.environ.get("DATE") or "2016-01-01"
END_DATE = os.environ.get("END_DATE") or "2017-05-01"
BY_DAYS = int_env_default("BY_DAYS", 4)  # 13x28 = 364 ~ 365.5
EDGE_SCALE = float_env_default("EDGE_SCALE", 1)
# Path of an f-string style template; left as None when the variable is unset.
TEMPLATE = os.environ.get("TEMPLATE")
THRESHOLD_ILOT = int_env_default("THRESHOLD_ILOT", 1)

# end_date has to be parsed first: the next statement rebinds the name `date`
# from the imported class to a date instance.
end_date = date.fromisoformat(END_DATE)
date = date.fromisoformat(DATE)

# td is the width of one queried window; td2 (half of it) is the step by which
# `date` is advanced.
td = timedelta(days=BY_DAYS)
td2 = timedelta(days=BY_DAYS / 2)
#https://stackoverflow.com/questions/47339121/how-do-i-convert-a-string-into-an-f-string
def effify(template: str):
    """Render the file at path ``template`` as the body of an f-string.

    The file content is wrapped in ``f\"\"\"...\"\"\"`` and evaluated, so every
    ``{expression}`` in it is executed with the module globals (and this
    function's locals) in scope.

    Args:
        template: path of the template file to read.

    Returns:
        The rendered text.

    Raises:
        SyntaxError: if the file content does not form a valid f-string body
            (for example an unbalanced ``{`` or an embedded triple quote).
            The message names the template file.
    """
    with open(template) as f:
        non_f_str = f.read()
    # SECURITY: eval() runs arbitrary Python found in the template file. Only
    # point TEMPLATE at files you trust as much as this script itself.
    try:
        return eval(f'f"""{non_f_str}"""')
    except SyntaxError as err:
        # The raw error refers to "<string>"; say which file is at fault.
        raise SyntaxError(
            f"template {template}: not a valid f-string body ({err.msg})"
        ) from err
def is_ilot(node: str, edge_dict: dict) -> bool:
    """Tell whether ``node`` is an "ilot" (islet).

    A node is an islet when at most two edges touch it, i.e. it has no more
    than one link back and forth with a single peer.

    Args:
        node: the address to look for.
        edge_dict: edges keyed by ``(first, second)`` pairs. Only the keys are
            inspected, so any iterable of pairs is accepted as well.

    Returns:
        ``False`` as soon as a third edge touching ``node`` is found,
        ``True`` otherwise (including when there are no edges at all).
    """
    touching = 0
    for edge in edge_dict:
        if node in (edge[0], edge[1]):
            touching += 1
            if touching > 2:
                return False
    return True
# Whitelist of address patterns and the Graphviz colour attached to each.
# A pattern matches an address that starts or ends with it. Order matters:
# lookups return the first matching pattern and the legend is emitted in this
# order.
#
# The original literal listed "iledefrance.fr", "benjamin.griveaux" and
# "ismael.emelie" twice. In a dict display the last value wins while the key
# keeps its first position, so each key is now written once, at its first
# position, with the value that was actually in effect.
patt_to_col = {
    "e2m": "red",
    "emmanuel.macron": "red",
    "emmanuelmacron": "red",
    "alexis.kohler": "midnightBlue",
    "gabriel.attal": "orange",
    "sachahoulie@": "green",
    "sejourne.stephane": "grey15",
    "stephane.sejourne": "grey15",
    "clement.beaune": "aquamarine",
    "olivia.gregoire": "darkOrange",
    "veranolivier": "green",
    "julien.denormandie": "indigo",
    "sibeth.ndiaye": "orange",
    "iledefrance.fr": "beige",
    "barbara.frugier": "green",
    "cedric.o": "purple",
    "gouv.fr": "yellow",
    "benjamin.griveaux": "SteelBlue",
    "ismael.emelie": "Olive",
    "laurent.bigorgne": "darkBlue",
    "jean.pisani-ferry": "chocolate",
    "gregoire.potton": "grey20",
    "eric.dumas": "salmon",
    "alexandre.benalla": "darkGreen",
    "pierre.person": "darkBlue",
    "pierrperson": "darkBlue",
    "quentin.lafay": "grey10",
    "jesusetgabriel.com": "crimson",
    "fm.alaintourret": "purple",
    "langannechristine": "darkgoldenrod4",
    # Disabled patterns, kept for reference:
    #"en-marche.fr" : "chocolate",
    #"paris.fr" : "yellow",
}
# Filled by set_color(): maps every address of a retained edge to the colour
# wl() gives that edge (None when the edge is not whitelisted on both ends).
#
# A `wl = lambda ...` one-liner used to follow this line. It was rebound by the
# `def wl` below before anything could call it, and it zipped the patterns
# with the *characters* of its argument, so it has been removed.
detected_edges_color = {}
def in_wl(mail : str):
    """Return the first whitelist pattern that ``mail`` starts or ends with.

    Patterns are tried in the iteration order of ``patt_to_col``. ``None`` is
    returned when no pattern matches.
    """
    matching = (
        pattern
        for pattern in patt_to_col
        if mail.startswith(pattern) or mail.endswith(pattern)
    )
    return next(matching, None)
def wl(pair: tuple):
    """Return the colour of an edge whose two ends are both whitelisted.

    Args:
        pair: ``(address, address)`` edge key.

    Returns:
        The ``patt_to_col`` colour of the pattern matched by ``pair[0]`` when
        both addresses match a pattern, ``None`` otherwise.
    """
    # The original wrapped this test in `for l in patt_to_col:` without ever
    # using `l`, repeating the same lookups once per pattern.
    first_pattern = in_wl(pair[0])
    if first_pattern and in_wl(pair[1]):
        return patt_to_col[first_pattern]
    return None
# e.g. wl(("jesusetgabriel.com", "jesusetgabriel.com")) == "crimson"
def is_vip(t):
    """Return True when every address in ``t`` matches a whitelist pattern.

    ``t`` is an iterable of addresses (an edge key in this script); an empty
    iterable yields True.
    """
    return all(map(in_wl, t))
def set_color(mails:tuple):
    """Record the colour of edge ``mails`` for each of its addresses.

    Every address in ``mails`` is stored in ``detected_edges_color`` with the
    value ``wl(mails)``, overwriting any earlier entry for that address.
    """
    if not mails:
        # Nothing to record; also keeps wl() from being called on an empty key.
        return
    # wl(mails) does not depend on the loop variable, so compute it once.
    colour = wl(mails)
    for mail in mails:
        detected_edges_color[mail] = colour
def has_more_than_n_neighbour(email: str, n :int, final : dict):
    """Tell whether ``email`` appears in more than ``n`` edge keys of ``final``.

    Defined once here instead of being re-created on every loop iteration.
    """
    count = 0
    for k in final.keys():
        if email in k:
            count+=1
            if count >n:
                return True
    return False


# Main loop. All working variables are deliberately left at module level:
# effify() evaluates the template against the module globals, so templates can
# refer to names such as `title`, `edges`, `final`, `color` or `date`.
#
# Without TEMPLATE exactly one frame is written; with TEMPLATE one frame is
# written per step until `date` reaches `end_date`.
first = True
while ( not TEMPLATE and first ) or (TEMPLATE and date < end_date):
    first=False
    direct=mdict()
    final = mdict()
    conn = psycopg2.connect("dbname=ml host=192.168.1.32 port=5432 user=jul sslmode='require' ")
    try:
        with conn.cursor() as sql:
            # Bound parameters instead of an f-string: the driver does the quoting.
            sql.execute(
                """SELECT "to", "from" from mail where DATE BETWEEN %s AND %s;""",
                (date, date + td),
            )
            while t := sql.fetchone():
                # NOTE(review): t[0] is the "to" column and t[1] the "from"
                # column, so `fr` iterates recipients and `to` senders. Both
                # directions are summed below, so only the naming is affected.
                # Presumably both columns hold lists of addresses - confirm.
                for fr in t[0]:
                    fr=fr.strip()
                    for to in t[1]:
                        to=to.strip()
                        # skip self-mails, empty addresses and addresses that
                        # start with a quote character
                        if fr != to and fr and to and not {fr[0], to[0]} & {'"', "'"}:
                            direct += mdict({ (fr,to) : 1 })
    finally:
        # Nothing below needs the database; close even when the query fails.
        conn.close()
    tk= list(direct.keys())
    for k in tk:
        # dont modify a dict you iterate hence copy of keys
        # Keep an edge once per unordered pair, when both ends are whitelisted
        # or the mails exchanged in both directions fall in [MIN_MAIL, MAX_MAIL].
        if (
            ( is_vip(k) or MAX_MAIL >= direct.get(k,0) + direct.get(k[::-1],0) >= MIN_MAIL )
            and k not in final
            and k[::-1] not in final
        ):
            final[k]=direct[k]
            final[k]+=direct.get(k[::-1],0)
        else:
            try:
                del(direct[k])
            except KeyError:
                pass
            try:
                del(direct[k[::-1]])
            except KeyError:
                pass
    tk= list(final.keys())
    for e in tk:
        # dont modify a dict you iterate hence copy of keys
        # Drop an edge when either end has no more than THRESHOLD_ILOT edges
        # left in `final`. `final` shrinks as the loop runs, so the result
        # depends on iteration order.
        if not has_more_than_n_neighbour(e[0],THRESHOLD_ILOT,final) or not has_more_than_n_neighbour(e[1],THRESHOLD_ILOT,final):
            try:
                del(final[e])
            except KeyError:
                pass
            try:
                del(final[e[::-1]])
            except KeyError:
                pass
        else:
            set_color(e)
    # Legend text: "<colour> pour <pattern>", with a line break after every
    # fourth entry.
    color = "".join([ f"""{i[1]} pour {i[0]}{[", ",chr(0x0a),][(n%4)==3]} """ for n,i in enumerate(patt_to_col.items()) ])
    final *= EDGE_SCALE
    with open(f"out/rec.{date}.dot" , "w") as f:
        # addresses that still appear on at least one retained edge
        quoted_mail = set()
        for _t,_f in final.keys():
            quoted_mail.add(_t)
            quoted_mail.add(_f)
        edges = "\n".join([ f""" "{e}" [shape=rectangle fillcolor="{c or "lightblue"};.1:white" color={c or "green"} style=striped ];""" for e,c in detected_edges_color.items() if e in quoted_mail ])
        title = f"""label="Sociogramme de {date} à {date + td} extrait des macron leaks orienté gouv.fr, personne d'intérêts (vert), victime du hacking (rouge), et président (bleu)\n
entre [ {MIN_MAIL}, {MAX_MAIL} ] échangés \n
plus gros liens au dessus de {CUT_SIZE} mails échangés entre interlocuteurs \n
couleur par priorités selon les origines \n
{color}"
"""
        if not TEMPLATE:
            # Built-in layout: header, node declarations, alias chains, edges.
            print("""
graph Sociogramme { """ + f"""
fontname="Comics sans MS"
size=120
ratio=0.588
{title}
labelloc="c"
labelloc="t";
start=1
{edges}
node [ shape=rectangle style=striped fillcolor="slateblue;0.1:white" gradientangle=90 ];
"e2m@en-marche.fr" -- "e2m@cabinets.finances.gouv.fr" -- "emmanuelmacron3@gmail.com" -- "emmanuel.macron@en-marche.fr" -- "emmanuelmacron@en-marche.fr" [label="is"]
"alexis.kohler@en-marche.fr" -- "alexis.kohler@cabinets.finances.gouv.fr" [label=is];
"stephane.sejourne@en-marche.fr" -- "stephane.sejourne@cabinets.finances.gouv.fr" -- "stephane.sejourne@gmail.com" [label=is];
"julien.denormandie@en-marche.fr" -- "julien.denormandie@cabinets.finances.gouv.fr" [label=is];
"benjamin.griveaux@en-marche.fr" -- "benjamin.griveaux@sante.gouv.fr" [label=is];
"quentin.lafay@en-marche.fr" -- "quentin.lafay@cabinets.finances.gouv.fr" -- "quentin.lafay@sante.gouv.fr" -- "quentin.lafay@gmail.com" [label=is];
"pierrperson@gmail.com" -- "pierre.person@en-marche.fr" [label=is];
"ismael.emelien@en-marche.fr" -- "ismael.emelien@yahoo.fr" [label=is];
""" ,file=f)
            for k,v in final.items():
                print(f""" "{k[0]}" -- "{k[1]}" [color={wl(k) or "lightblue"} penwidth={v} ];""", file=f)
            print("}", file=f)
        else:
            print(effify(TEMPLATE), file=f)
    # advance by half a window, so consecutive frames overlap
    date += td2
@jul
Copy link
Author

jul commented Apr 28, 2024

persistent

@jul
Copy link
Author

jul commented May 1, 2024

output2.mp4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment