Skip to content

Instantly share code, notes, and snippets.

@joeface
Created June 11, 2022 16:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joeface/0ae9c0ad5241e21b441a6fa0be98a343 to your computer and use it in GitHub Desktop.
Save joeface/0ae9c0ad5241e21b441a6fa0be98a343 to your computer and use it in GitHub Desktop.
Blocked Websites Discovery based on OONI API
import difflib
import json
import os
import re
import requests
import sys
from datetime import datetime, timedelta
from pprint import pprint
css = """
<style>
* {font-family: sans-serif;}
table td{ padding: 2px 4px; font-family: sans-serif;}
.diff_sub {text-decoration: line-through; color: grey;}
.diff_add {font-weight: bold; color: red;}
</style>
"""
COUNTRIES = ("RU", "BY", "UA", "KZ", "KG", "MD", "TJ", "AZ", "AM", "GE")
REPORTS = {}
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
REPORTS_DIR = os.path.join(BASE_DIR, "reports/")
def read_file():
print("Reading data from file")
try:
json_file = open(os.path.join(BASE_DIR, "aggregation.json"), "r")
data = json.load(json_file)
return data["result"]
except:
print("! Unable to read JSON file")
return None
def fetch_json(start_date, end_date):
print(f"Fetching remote data from {start_date} till {end_date}")
r = requests.get(
f"https://api.ooni.io/api/v1/aggregation?test_name=web_connectivity&since={start_date}&until={end_date}&axis_x=probe_cc&axis_y=domain",
timeout=60,
)
if r.status_code != requests.codes.ok:
print("Error fetching stats")
return None
data = r.json()
if "result" in data:
return data["result"]
return None
def parse_json(data):
global COUNTRIES
REPORT_DATA = {}
if not data:
print("Empty data source")
return False
# {"anomaly_count":0,"confirmed_count":0,"domain":"1.0.0.1","failure_count":0,"measurement_count":2,"ok_count":2,"probe_cc":"AE"}
for rec in data:
if (
rec["probe_cc"] in COUNTRIES
and rec["measurement_count"] > 3
and (rec["anomaly_count"] > rec["ok_count"] or rec["confirmed_count"] > 0)
):
if rec["probe_cc"] not in REPORT_DATA:
REPORT_DATA[rec["probe_cc"]] = {"hosts": [], "count": 0}
REPORT_DATA[rec["probe_cc"]]["count"] += 1
REPORT_DATA[rec["probe_cc"]]["hosts"].append(rec["domain"])
return REPORT_DATA
def save_report(date, report_data):
json_file = open(os.path.join(REPORTS_DIR, f"{date}.json"), "w")
json.dump(report_data, json_file)
return True
def show_report():
DATA = {}
reports = ("2022-06-08", "2022-06-09", "2022-06-10")
for report in reports:
print(f"\nREPORT FOR {report}")
json_file = open(os.path.join(REPORTS_DIR, f"{report}.json"), "r")
daily_report = json.load(json_file)
DATA[report] = daily_report
for country in daily_report:
print(f'- {country}: {daily_report[country]["count"]}')
d = difflib.HtmlDiff()
result = d.make_table(
DATA["2022-06-08"]["AM"]["hosts"], DATA["2022-06-09"]["AM"]["hosts"]
)
pprint(result)
print("------")
def generate_diff(country, bouding_date):
start_date = (bouding_date - timedelta(days=2)).strftime("%Y-%m-%d")
end_date = (bouding_date - timedelta(days=1)).strftime("%Y-%m-%d")
print(f"Building differences table for {country} {start_date} {end_date}")
try:
json_file = open(os.path.join(REPORTS_DIR, f"{start_date}.json"), "r")
start_date_report = json.load(json_file)
json_file = open(os.path.join(REPORTS_DIR, f"{end_date}.json"), "r")
end_date_report = json.load(json_file)
except:
print(f"Unable to find reports for {start_date} {end_date}")
return None
if country in start_date_report and country in end_date_report:
d = difflib.HtmlDiff()
result = d.make_table(
start_date_report[country]["hosts"],
end_date_report[country]["hosts"],
numlines=0,
context=True,
)
return f"<h2>FROM {start_date} TO {end_date}</h2>{result}"
else:
return None
def execute(sync_remote=False):
args = sys.argv
tables = {}
if (
len(args) == 3
and re.match("[\d]{4}-[\d]{2}-[\d]{2}", args[1])
and re.match("[\d]{4}-[\d]{2}-[\d]{2}", args[2])
):
start_date = args[1]
end_date = args[2]
else:
days = 7
for date_offset in range(days, 0, -1):
bouding_date = datetime.today() - timedelta(days=date_offset)
start_date = (bouding_date - timedelta(days=1)).strftime("%Y-%m-%d")
end_date = (bouding_date).strftime("%Y-%m-%d")
if sync_remote:
json_data = fetch_json(start_date, end_date)
day_report = parse_json(json_data)
save_report(start_date, day_report)
if date_offset < days:
for c in COUNTRIES:
table_content = generate_diff(c, bouding_date)
if table_content:
if c not in tables:
tables[c] = []
tables[c].append(table_content)
for country in tables:
html_report = open(os.path.join(REPORTS_DIR, f"{country}.html"), "w")
html_report.write(
f"<h1>{country}</h1>" + "<hr>".join(tables[country]) + css
)
html_report.close()
if __name__ == "__main__":
execute(sync_remote=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment