Created
June 11, 2022 16:05
-
-
Save joeface/0ae9c0ad5241e21b441a6fa0be98a343 to your computer and use it in GitHub Desktop.
Blocked Websites Discovery based on OONI API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import difflib | |
import json | |
import os | |
import re | |
import requests | |
import sys | |
from datetime import datetime, timedelta | |
from pprint import pprint | |
# Inline CSS appended to each generated HTML report; styles difflib's
# diff table (grey strike-through for removed hosts, bold red for added).
css = """
<style>
* {font-family: sans-serif;}
table td{ padding: 2px 4px; font-family: sans-serif;}
.diff_sub {text-decoration: line-through; color: grey;}
.diff_add {font-weight: bold; color: red;}
</style>
"""

# ISO 3166-1 alpha-2 codes of the countries whose OONI probes we monitor.
COUNTRIES = ("RU", "BY", "UA", "KZ", "KG", "MD", "TJ", "AZ", "AM", "GE")

# NOTE(review): REPORTS is never read or written in this file — possibly dead.
REPORTS = {}

# Directory containing this script; daily JSON and HTML reports are stored
# in a reports/ subdirectory next to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
REPORTS_DIR = os.path.join(BASE_DIR, "reports/")
def read_file():
    """Load cached OONI aggregation results from aggregation.json.

    Returns:
        The "result" list from the cached JSON dump, or None when the file
        is missing, unreadable, not valid JSON, or lacks the expected key.
    """
    print("Reading data from file")
    try:
        # "with" guarantees the handle is closed (original leaked it).
        with open(os.path.join(BASE_DIR, "aggregation.json"), "r") as json_file:
            data = json.load(json_file)
        return data["result"]
    except (OSError, ValueError, KeyError):
        # Narrowed from a bare except: only file, JSON-parse
        # (json.JSONDecodeError subclasses ValueError) and shape errors.
        print("! Unable to read JSON file")
        return None
def fetch_json(start_date, end_date):
    """Query the OONI aggregation API for web-connectivity measurements.

    Asks for counts aggregated by probe country (axis_x) and domain (axis_y)
    between the two ISO dates. Returns the "result" list from the API
    response, or None on a non-OK HTTP status or a response without it.
    """
    print(f"Fetching remote data from {start_date} till {end_date}")
    url = (
        "https://api.ooni.io/api/v1/aggregation"
        f"?test_name=web_connectivity&since={start_date}&until={end_date}"
        "&axis_x=probe_cc&axis_y=domain"
    )
    response = requests.get(url, timeout=60)
    if response.status_code != requests.codes.ok:
        print("Error fetching stats")
        return None
    payload = response.json()
    return payload["result"] if "result" in payload else None
def parse_json(data, countries=None, min_measurements=3):
    """Aggregate OONI records into per-country lists of likely-blocked hosts.

    A domain counts as blocked for a country when it has more than
    `min_measurements` measurements and either more anomalies than OK
    results or at least one confirmed block.

    Args:
        data: List of OONI aggregation records, each shaped like
            {"anomaly_count": 0, "confirmed_count": 0, "domain": "...",
             "failure_count": 0, "measurement_count": 2, "ok_count": 2,
             "probe_cc": "AE"}.
        countries: Country codes to include; defaults to the module-level
            COUNTRIES tuple (backward compatible with the original).
        min_measurements: Minimum measurement count (exclusive) for a record
            to be considered; defaults to the original hard-coded 3.

    Returns:
        Dict mapping country code -> {"hosts": [domains], "count": n},
        or False when `data` is empty/None (preserved original behavior).
    """
    if countries is None:
        countries = COUNTRIES  # removed useless `global` — read-only access
    if not data:
        print("Empty data source")
        return False
    report_data = {}
    for rec in data:
        if (
            rec["probe_cc"] in countries
            and rec["measurement_count"] > min_measurements
            and (rec["anomaly_count"] > rec["ok_count"] or rec["confirmed_count"] > 0)
        ):
            entry = report_data.setdefault(rec["probe_cc"], {"hosts": [], "count": 0})
            entry["count"] += 1
            entry["hosts"].append(rec["domain"])
    return report_data
def save_report(date, report_data):
    """Persist one day's parsed report to reports/<date>.json.

    Args:
        date: ISO date string used as the file name stem.
        report_data: JSON-serializable report (as produced by parse_json).

    Returns:
        True on success (raises OSError on write failure, as before).
    """
    # "with" flushes and closes the file deterministically (original leaked
    # the handle, risking an unflushed/partial file on interpreter exit).
    with open(os.path.join(REPORTS_DIR, f"{date}.json"), "w") as json_file:
        json.dump(report_data, json_file)
    return True
def show_report(reports=("2022-06-08", "2022-06-09", "2022-06-10")):
    """Print per-country blocked-host counts for the given report dates and
    dump an HTML diff of the "AM" host lists between the first two dates.

    Args:
        reports: Tuple of ISO date strings whose reports/<date>.json files
            are read; defaults to the dates originally hard-coded, keeping
            the no-argument call backward compatible.
    """
    data = {}
    for report in reports:
        print(f"\nREPORT FOR {report}")
        # "with" closes each report file (original leaked every handle).
        with open(os.path.join(REPORTS_DIR, f"{report}.json"), "r") as json_file:
            daily_report = json.load(json_file)
        data[report] = daily_report
        for country in daily_report:
            print(f'- {country}: {daily_report[country]["count"]}')
    # NOTE(review): assumes "AM" is present in the first two reports —
    # raises KeyError otherwise (unchanged from the original).
    differ = difflib.HtmlDiff()
    result = differ.make_table(
        data[reports[0]]["AM"]["hosts"], data[reports[1]]["AM"]["hosts"]
    )
    pprint(result)
    print("------")
def generate_diff(country, bouding_date):
    """Build an HTML table diffing a country's blocked hosts day-over-day.

    Compares the reports for the two days preceding `bouding_date`
    (bouding_date - 2 days vs bouding_date - 1 day).

    Args:
        country: ISO country code to diff.
        bouding_date: datetime anchoring the comparison window. The name
            keeps the original misspelling ("bounding") so keyword callers
            are not broken.

    Returns:
        HTML fragment ("<h2>...</h2>" + diff table), or None when a report
        file is missing/unreadable or the country is absent from either day.
    """
    start_date = (bouding_date - timedelta(days=2)).strftime("%Y-%m-%d")
    end_date = (bouding_date - timedelta(days=1)).strftime("%Y-%m-%d")
    print(f"Building differences table for {country} {start_date} {end_date}")
    try:
        # "with" closes both report files (original leaked both handles);
        # except narrowed from bare to file/JSON errors only.
        with open(os.path.join(REPORTS_DIR, f"{start_date}.json"), "r") as f:
            start_date_report = json.load(f)
        with open(os.path.join(REPORTS_DIR, f"{end_date}.json"), "r") as f:
            end_date_report = json.load(f)
    except (OSError, ValueError):
        print(f"Unable to find reports for {start_date} {end_date}")
        return None
    if country in start_date_report and country in end_date_report:
        differ = difflib.HtmlDiff()
        result = differ.make_table(
            start_date_report[country]["hosts"],
            end_date_report[country]["hosts"],
            numlines=0,
            context=True,
        )
        return f"<h2>FROM {start_date} TO {end_date}</h2>{result}"
    return None
def execute(sync_remote=False):
    """Entry point: build per-country HTML diff reports for the last week.

    For each of the previous 7 days, optionally fetches and saves that day's
    OONI report (when `sync_remote` is True), then diffs consecutive days per
    country and writes reports/<country>.html with all diff tables.

    Args:
        sync_remote: When True, re-fetch daily data from the OONI API before
            diffing; when False, rely on previously saved reports.
    """
    args = sys.argv
    tables = {}
    # Raw string for the regex (avoids fragile escaping); compiled once and
    # reused for both arguments. Kept as match() (prefix match), as before.
    date_pattern = re.compile(r"\d{4}-\d{2}-\d{2}")
    if (
        len(args) == 3
        and date_pattern.match(args[1])
        and date_pattern.match(args[2])
    ):
        # NOTE(review): these CLI-supplied dates are set but never used —
        # no fetch or diff happens in this branch. Looks like an unfinished
        # feature; behavior preserved as-is.
        start_date = args[1]
        end_date = args[2]
    else:
        days = 7
        for date_offset in range(days, 0, -1):
            bouding_date = datetime.today() - timedelta(days=date_offset)
            start_date = (bouding_date - timedelta(days=1)).strftime("%Y-%m-%d")
            end_date = bouding_date.strftime("%Y-%m-%d")
            if sync_remote:
                json_data = fetch_json(start_date, end_date)
                # NOTE(review): parse_json returns False on empty data, so
                # a failed fetch saves the literal JSON `false` (unchanged).
                day_report = parse_json(json_data)
                save_report(start_date, day_report)
            # A diff needs two prior daily reports, so skip the oldest day.
            if date_offset < days:
                for c in COUNTRIES:
                    table_content = generate_diff(c, bouding_date)
                    if table_content:
                        tables.setdefault(c, []).append(table_content)
    for country in tables:
        # "with" guarantees the HTML file is flushed and closed (the
        # original closed it manually but leaked on a write exception).
        with open(os.path.join(REPORTS_DIR, f"{country}.html"), "w") as html_report:
            html_report.write(
                f"<h1>{country}</h1>" + "<hr>".join(tables[country]) + css
            )
# Script entry point; pass sync_remote=True to re-fetch data from the OONI
# API instead of reusing previously saved daily reports.
if __name__ == "__main__":
    execute(sync_remote=False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment