Skip to content

Instantly share code, notes, and snippets.

@hellais
Created February 19, 2018 12:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hellais/c520929b9ed6120587860efcf069e44d to your computer and use it in GitHub Desktop.
Save hellais/c520929b9ed6120587860efcf069e44d to your computer and use it in GitHub Desktop.
Mining OONI Data Notebook
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Mining OONI Data\n",
"\n",
"The goal of this jupyter notebook is to showcase some of the common workflows for mining OONI data.\n",
"\n",
"You can use the various code snippets in this notebook as building blocks for your own data analysis workflows.\n",
"\n",
"The code in here is written for python3, but should be easily adapted to python2 if you need to.\n",
"\n",
"It is recommended that you use jupyter notebook for running it, but you can also run the code in here as part of a standard python script."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Standard lib imports\n",
"import os\n",
"import re\n",
"import json\n",
"import gzip\n",
"import ipaddress\n",
"import datetime as dt # It's useful to have dt as a shorthand for `datetime`\n",
"\n",
"from urllib.parse import urlparse"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# (optional) \n",
"# Using ujson instead of the standard json module, leads to significant performance boosts\n",
"# You can install usjon with:\n",
"# pip install ujson\n",
"import ujson as json"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Useful modules\n",
"# Install with:\n",
"# pip install requests dateutil progressbar\n",
"\n",
"import requests # Make doing HTTP(s) requests more pleasant\n",
"from dateutil.parser import parse as parse_date # Dead simple date parsing\n",
"import progressbar # Get a nice progress bar"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[Pandas](https://pandas.pydata.org/) and [Numpy](http://www.numpy.org) are to very common (and powerful) tools for doing data analysis in python.\n",
"\n",
"You can find installation instructions for them here:\n",
"* https://pandas.pydata.org/pandas-docs/stable/install.html\n",
"* https://scipy.org/install.html\n",
"\n",
"We suggest you check out some of the tutorials and code examples, especially for pandas, to familiarise yourself with these very powerful tools as the links below:\n",
"\n",
"* https://pandas.pydata.org/pandas-docs/stable/tutorials.html\n",
"\n",
"You will also find a bunch of code examples and solutions to common data analysis problems on [stackoverflow](https://stackoverflow.com)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*optional*\n",
"\n",
"When you are dealing with datasets that don't easily fit inside of RAM (10s of GBs), it's useful to have concurrency and stream processing abstractions.\n",
"\n",
"[Dask](https://dask.pydata.org/) allows you to use familiar `pandas` abstractions and makes it easy to write concurrent streaming code.\n",
"\n",
"If you think this is your use case you should also import the following modules:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import dask.bag as db\n",
"import dask.dataframe as dd\n",
"from dask.delayed import delayed\n",
"import dask.multiprocessing\n",
"from dask.diagnostics import ProgressBar"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*optional*\n",
"\n",
"If you need to do a lot of IP and network related lookup, the following modules are very useful.\n",
"\n",
"In particular `radix` can be used to build a [radix tree](https://en.wikipedia.org/wiki/Radix_tree) for storing IP->ASN mappings and make the lookup much faster.\n",
"\n",
"`censys` is a public API that allows you to lookup information about particular hosts or IPs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pyasn\n",
"import radix\n",
"import censys.ipv4 as censsysipv4"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We assume that you have downloaded all the OONI metrics you are interested in processing and placed them inside of the directory pointed to by the `MEASUREMENT_DIR` variable.\n",
"\n",
"The OONI measurements can be downloaded using [ooni-sync](https://github.com/thetorproject/ooni-sync) (useful when you care about a small subset of ALL the OONI data, for example only that of a particular country):\n",
"\n",
"```\n",
"ooni-sync -directory data/ZA probe_cc=ZA test_name=web_connectivity\n",
"```\n",
"\n",
"Or the [AWS CLI](https://aws.amazon.com/cli/) when you need to download most of the OONI data:\n",
"\n",
"```\n",
"aws s3 cp --recursive s3://ooni-data/autoclaved/jsonl/ data/\n",
"```\n",
"\n",
"Note: the above command will download serveral TBs of OONI data into the `data/` directory. Only use it if you have enough space."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"MEASUREMENT_DIR = 'data/'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For analysing `web_connectivity` measurements it's useful to have a database of how blockpages look like. We use the following `list` to store a fingerprint DB.\n",
"\n",
"As you mine through the data and uncover more blockpages you should add them to this list and re-process the data. It's recommended that you also add a reference to where the blockpage came from, so that you can tighten the fingerprint if you need to.\n",
"\n",
"If you discover a new blockpage [that OONI doesn't have](https://github.com/TheTorProject/ooni-pipeline/blob/master/af/oometa/003-fingerprints.install.sql#L31), please tell us by filing a [github issue](https://github.com/thetorproject/ooni-pipeline/issues/new)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import re\n",
"BLOCKPAGES = [\n",
" {\n",
" 'match': [\n",
" (\n",
" 'body_s_re',\n",
" re.compile(b'prohibited for viewership from within Pakistan.')\n",
" )\n",
" ],\n",
" 'block_id': 'id-surf-safe-0',\n",
" 'measurement_id': [\n",
" 'https://explorer.ooni.torproject.org/measurement/20160822T062512Z_AS23674_hWMuxtANu8ujfyxCXoww1arsj3gFrfJbXvxd1rkZjuF6f1LD05?input=http:%2F%2Fen.wikipedia.org%2Fwiki%2FLars_Vilks_Muhammad_drawings_controversy'\n",
" ]\n",
" }\n",
"]\n",
"CENSOR_IPS = [\n",
" #lambda ip: ip.startswith('10.10')\n",
"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The functions below are \"extractors\". You can think of them as the `reduce` step in a map-reduce workflow. Generally these functions take as input and individual measurement and output a subset of the keys of the measurement JSON.\n",
"\n",
"A common pattern to all extractors is that of annotating the result row with a quaternary key called `anomaly_color`:\n",
"\n",
"* `red` means that the measurement in question is for sure an instance of internet censorship or a confirmed case of network anomaly (ex. a middlebox was found)\n",
"* `orange` means the measurement presents an anomaly, but is not confirmed. These measurements are worth looking into to potentially convert them to `red`\n",
"* `yellow` means there was an error in the measurement or the result keys are inconsistent\n",
"* `green` means this measurement is for sure normal and can be considered as no evidence of censorship or no network anomaly."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from base64 import b64decode\n",
"def check_for_match(resp, rule):\n",
" rule_type, r = rule\n",
" if rule_type == 'body_s_re':\n",
" resp_body = resp.get('body', None)\n",
" if isinstance(resp_body, dict):\n",
" resp_body = b64decode(resp_body['data'])\n",
" elif resp_body is not None:\n",
" # XXX is this sane?\n",
" resp_body = resp_body.encode('utf-8')\n",
" else:\n",
" return False\n",
" return r.search(resp_body) != None\n",
"\n",
"def get_blockpage(test_keys):\n",
" dns_addrs = []\n",
" for query in test_keys.get('queries', []):\n",
" for answer in query['answers']:\n",
" if answer['answer_type'] == 'A':\n",
" try:\n",
" for idx, f in enumerate(CENSOR_IPS):\n",
" if f(answer['ipv4']):\n",
" return 'dns-blk-%s' % idx\n",
" except ValueError:\n",
" pass\n",
"\n",
" for i in range(len(test_keys['requests'])):\n",
" if not isinstance(test_keys['requests'][i].get('response', None), dict):\n",
" continue\n",
" resp = test_keys['requests'][i]['response']\n",
" for fp in BLOCKPAGES:\n",
" for rule in fp['match']:\n",
" if check_for_match(resp, rule):\n",
" return fp['block_id']\n",
" return\n",
"\n",
"import re\n",
"from base64 import b64decode\n",
"TITLE_REGEXP = re.compile(\"<title>(.*?)</title>\", re.IGNORECASE | re.DOTALL)\n",
"def extract_title(body):\n",
" m = TITLE_REGEXP.search(body, re.IGNORECASE | re.DOTALL)\n",
" if m:\n",
" return m.group(1)\n",
" return ''\n",
"\n",
"def extract_wc_ctrl(test_keys):\n",
" ctrl = test_keys.get('control', {})\n",
" if ctrl is None:\n",
" ctrl = {}\n",
" return {\n",
" 'ctrl_failure': test_keys.get('control_failure', None),\n",
" 'ctrl_http_body_len': ctrl.get('http_request', {}).get('body_length', None),\n",
" 'ctrl_http_failure': ctrl.get('http_request', {}).get('failure', None),\n",
" 'ctrl_http_headers': ctrl.get('http_request', {}).get('headers', None),\n",
" 'ctrl_http_title': ctrl.get('http_request', {}).get('title', None),\n",
" 'ctrl_http_status_code': ctrl.get('http_request', {}).get('status_code', None),\n",
" 'ctrl_tcp_info': ctrl.get('tcp_connect', None),\n",
" 'ctrl_dns_failure': ctrl.get('dns', {}).get('failure', None),\n",
" 'ctrl_dns_addrs': ctrl.get('dns', {}).get('addrs', None)\n",
" }\n",
"\n",
"def extract_wc_exp_http(test_keys):\n",
" body_len = None\n",
" title = None\n",
"\n",
" if len(test_keys['requests']) == 0:\n",
" return {\n",
" 'exp_http_failure': test_keys.get('http_experiment_failure', 'empty_req_no_fail'),\n",
" 'exp_http_body_len': None,\n",
" 'exp_http_headers': None,\n",
" 'exp_http_title': None,\n",
" 'exp_http_status_code': None\n",
" }\n",
"\n",
" last_session = test_keys['requests'][0]\n",
" last_resp = last_session.get('response', {})\n",
" if last_resp is None:\n",
" last_resp = {}\n",
" resp_body = last_resp.get('body', None)\n",
"\n",
" if isinstance(resp_body, dict):\n",
" resp_body = b64decode(resp_body['data'])\n",
" body_len = len(resp_body)\n",
"\n",
" elif resp_body is not None:\n",
" title = extract_title(resp_body)\n",
" body_len = len(resp_body)\n",
"\n",
" return {\n",
" 'exp_http_failure': test_keys.get('http_experiment_failure', 'unknown_failure_exp'),\n",
" 'exp_http_body_len': body_len,\n",
" 'exp_http_headers': last_resp.get('headers', None),\n",
" 'exp_http_title': title,\n",
" 'exp_http_status_code': last_resp.get('code', None)\n",
" }\n",
"\n",
"def extract_wc_exp_dns(test_keys):\n",
" dns_addrs = []\n",
" for query in test_keys.get('queries', []):\n",
" for answer in query['answers']:\n",
" if answer['answer_type'] == 'A':\n",
" dns_addrs.append(answer['ipv4'])\n",
" return {\n",
" 'exp_dns_failure': test_keys.get('dns_experiment_failure', None),\n",
" 'exp_dns_addrs': dns_addrs\n",
" }\n",
"\n",
"def extract_wc_exp_tcp(test_keys):\n",
" tcp_info = {}\n",
" for tcp_connect in test_keys['tcp_connect']:\n",
" key = \"%s:%d\" % (tcp_connect['ip'], tcp_connect['port'])\n",
" if key in tcp_info:\n",
" raise RuntimeError(\"Duplicate tcp key %s\" % (key))\n",
"\n",
" tcp_info[key] = {\n",
" 'failure': tcp_connect['status']['failure'],\n",
" 'status': tcp_connect['status']['success']\n",
" }\n",
"\n",
" return {\n",
" 'exp_tcp_info': tcp_info\n",
" }\n",
"\n",
"def extract_wc_probe_calculations(test_keys):\n",
" calcs = {}\n",
"\n",
" probe_calculations = [\n",
" 'dns_consistency',\n",
" 'blocking',\n",
" 'accessible',\n",
" 'headers_match',\n",
" 'body_proportion',\n",
" 'title_match',\n",
" 'headers_match',\n",
" 'status_code_match',\n",
" ]\n",
" # XXX also calculate these in pipeline\n",
" for calc_key in probe_calculations:\n",
" calcs[\"calc_%s\" % calc_key] = test_keys.get(calc_key, None)\n",
"\n",
" calcs['calc_blockpage'] = get_blockpage(test_keys)\n",
" return calcs\n",
"\n",
"def compute_wc_anomaly(final):\n",
" color = 'green'\n",
"\n",
" # We map to green all things related to server side blocking\n",
" if final['exp_http_title'] is not None:\n",
" if final['exp_http_title'].startswith('Attention Required! | CloudFlare'):\n",
" return color\n",
"\n",
" if final['exp_http_title'].startswith('Sucuri WebSite Firewall -'):\n",
" return color\n",
" if final['exp_http_title'].startswith('Sucuri CloudProxy Website Firewall'):\n",
" return color\n",
"\n",
" if final['calc_blocking'] == None:\n",
" color = 'yellow'\n",
" elif final['calc_blocking'] != False:\n",
" color = 'orange'\n",
"\n",
" if final['calc_blockpage'] != None:\n",
" color = 'red'\n",
" return color\n",
"\n",
"def extract_web_connectivity(m):\n",
" #'client_resolver'?\n",
" final = {\n",
" 'input': m['input']\n",
" }\n",
" final.update(extract_wc_ctrl(m['test_keys']))\n",
" final.update(extract_wc_exp_http(m['test_keys']))\n",
" final.update(extract_wc_exp_dns(m['test_keys']))\n",
" final.update(extract_wc_probe_calculations(m['test_keys']))\n",
" final['anmly_color'] = compute_wc_anomaly(final)\n",
" return final\n",
"\n",
"def extract_fm_probe_calculations(test_keys):\n",
" calcs = {}\n",
" calc_keys = [\n",
" 'facebook_b_api_dns_consistent',\n",
" 'facebook_b_api_reachable',\n",
" 'facebook_b_graph_dns_consistent',\n",
" 'facebook_b_graph_reachable',\n",
" 'facebook_dns_blocking',\n",
" 'facebook_edge_dns_consistent',\n",
" 'facebook_edge_reachable',\n",
" 'facebook_external_cdn_dns_consistent',\n",
" 'facebook_external_cdn_reachable',\n",
" 'facebook_scontent_cdn_dns_consistent',\n",
" 'facebook_scontent_cdn_reachable',\n",
" 'facebook_star_dns_consistent',\n",
" 'facebook_star_reachable',\n",
" 'facebook_stun_dns_consistent',\n",
" 'facebook_stun_reachable',\n",
" 'facebook_tcp_blocking'\n",
" ]\n",
" for key in calc_keys:\n",
" try:\n",
" calcs[\"calc_%s\" % key] = test_keys[key]\n",
" except KeyError:\n",
" calcs[\"calc_%s\" % key] = None\n",
" return calcs\n",
"\n",
"# anomaly_color can be one of:\n",
"# * green\n",
"# * yellow\n",
"# * orange\n",
"# * red\n",
"def compute_fm_anomaly(final):\n",
" true_calc_keys = [\n",
" 'facebook_b_api_dns_consistent',\n",
" 'facebook_b_api_reachable',\n",
" 'facebook_b_graph_dns_consistent',\n",
" 'facebook_b_graph_reachable',\n",
" 'facebook_edge_dns_consistent',\n",
" 'facebook_edge_reachable',\n",
" 'facebook_external_cdn_dns_consistent',\n",
" 'facebook_external_cdn_reachable',\n",
" 'facebook_scontent_cdn_dns_consistent',\n",
" 'facebook_scontent_cdn_reachable',\n",
" 'facebook_star_dns_consistent',\n",
" 'facebook_star_reachable',\n",
" 'facebook_stun_dns_consistent',\n",
" # facebook_stun_reachable',\n",
" ]\n",
" false_calc_keys = [\n",
" 'facebook_tcp_blocking',\n",
" 'facebook_dns_blocking'\n",
" ]\n",
" color = 'green'\n",
" for key in false_calc_keys:\n",
" if final['calc_%s' % key] == True:\n",
" color = 'red'\n",
" elif final['calc_%s' % key] == None and color != 'red':\n",
" color = 'yellow'\n",
" for key in true_calc_keys:\n",
" if final['calc_%s' % key] == False:\n",
" color = 'red'\n",
" elif final['calc_%s' % key] == None and color != 'red':\n",
" color = 'yellow'\n",
" return color\n",
"\n",
"def extract_facebook_messenger(m):\n",
" final = {}\n",
" # XXX compute these pipeline side too\n",
" final.update(extract_fm_probe_calculations(m['test_keys']))\n",
" final['anmly_color'] = compute_fm_anomaly(final)\n",
" return final\n",
"\n",
"def extract_wa_probe_calculations(test_keys):\n",
" # registration_server_failure: null,\n",
" # registration_server_status: 'ok',\n",
" # whatsapp_web_failure: null\n",
" # whatsapp_endpoints_status: 'ok'\n",
" # whatsapp_web_status: 'ok'\n",
" # whatsapp_endpoints_dns_inconsistent: []\n",
" # whatsapp_endpoints_blocked: []\n",
" calc = {}\n",
" calc_keys = [\n",
" 'registration_server_status',\n",
" 'whatsapp_endpoints_status',\n",
" 'whatsapp_web_status',\n",
" 'whatsapp_endpoints_dns_inconsistent',\n",
" 'whatsapp_endpoints_blocked'\n",
" ]\n",
" for key in calc_keys:\n",
" calc['calc_%s' % key] = test_keys[key]\n",
" return calc\n",
"\n",
"def extract_wa_failures(test_keys):\n",
" # XXX maybe we want to do this from the raw data directly?\n",
" fail = {}\n",
" failure_keys = [\n",
" 'registration_server_failure',\n",
" 'whatsapp_web_failure'\n",
" ]\n",
" for key in failure_keys:\n",
" try:\n",
" fail[key] = test_keys[key]\n",
" except KeyError:\n",
" fail[key] = None\n",
" return fail\n",
"\n",
"def compute_wa_anomaly(final):\n",
" color = 'green'\n",
" for key in ['registration_server_status', 'whatsapp_endpoints_status', 'whatsapp_web_status']:\n",
" if final[\"calc_%s\" % key] != 'ok':\n",
" color = 'red'\n",
" return color\n",
"\n",
"def extract_whatsapp(m):\n",
" final = {}\n",
" # XXX compute these pipeline side too\n",
" final.update(extract_wa_probe_calculations(m['test_keys']))\n",
" final.update(extract_wa_failures(m['test_keys']))\n",
" final['anmly_color'] = compute_wa_anomaly(final)\n",
" return final\n",
"\n",
"def extract_telegram_probe_calculations(test_keys):\n",
" # telegram_web_failure: null,\n",
" # telegram_http_blocking: false\n",
" # telegram_web_status: 'ok'\n",
" # telegram_tcp_blocking: false\n",
"\n",
" calc = {}\n",
" calc_keys = [\n",
" 'telegram_web_failure',\n",
" 'telegram_http_blocking',\n",
" 'telegram_web_status',\n",
" 'telegram_tcp_blocking'\n",
" ]\n",
" for key in calc_keys:\n",
" calc['calc_%s' % key] = test_keys[key]\n",
" return calc\n",
"\n",
"def extract_telegram_failures(test_keys):\n",
" # XXX maybe we want to do this from the raw data directly?\n",
" fail = {}\n",
" failure_keys = [\n",
" 'telegram_web_failure'\n",
" ]\n",
" for key in failure_keys:\n",
" try:\n",
" fail[key] = test_keys[key]\n",
" except KeyError:\n",
" fail[key] = None\n",
" return fail\n",
"\n",
"def compute_telegram_anomaly(final):\n",
" color = 'green'\n",
" for key in ['telegram_tcp_blocking', 'telegram_http_blocking']:\n",
" if final[\"calc_%s\" % key] == True:\n",
" color = 'red'\n",
" for key in ['telegram_web_status']:\n",
" if final[\"calc_%s\" % key] != 'ok':\n",
" color = 'red'\n",
" return color\n",
"\n",
"def extract_telegram(m):\n",
" final = {}\n",
" # XXX compute these pipeline side too\n",
" final.update(extract_telegram_probe_calculations(m['test_keys']))\n",
" final.update(extract_telegram_failures(m['test_keys']))\n",
" final['anmly_color'] = compute_telegram_anomaly(final)\n",
" return final\n",
"\n",
"def extract_vanilla_tor(m):\n",
" final = {}\n",
" final['calc_success'] = m['test_keys'].get('success', None)\n",
" final['anmly_color'] = 'green'\n",
" if final['calc_success'] == False:\n",
" final['anmly_color'] = 'red'\n",
" elif final['calc_success'] == None:\n",
" final['anmly_color'] = 'yellow'\n",
" return final\n",
"\n",
"def extract_hhfm_exp(test_keys):\n",
" exp = {}\n",
" exp['exp_req_headers'] = test_keys['requests'][0].get('request', {}).get('headers', {})\n",
" exp['exp_req_headers']['Connection'] = 'close'\n",
" exp['exp_req_failure'] = test_keys['requests'][0].get('failure', None)\n",
" if exp['exp_req_failure'] is None:\n",
" resp_body = test_keys['requests'][0]['response'].get('body', None)\n",
" else:\n",
" resp_body = None\n",
" exp['exp_resp_body'] = resp_body\n",
" return exp\n",
"\n",
"def compute_hhfm_result(final):\n",
" calc = {}\n",
" calc['calc_headers_modified'] = None\n",
" calc['calc_total_tampering'] = None\n",
" try:\n",
" # {\"headers_dict\": {\"acCePT-languagE\": [\"en-US,en;q=0.8\"], ...},\n",
" # \"request_line\": \"geT / HTTP/1.1\",\n",
" # \"request_headers\": [ [\"Connection\", \"close\"], ... ]\n",
" # }\n",
" ctrl_headers = json.loads(final['exp_resp_body'])\n",
" except:\n",
" calc['calc_total_tampering'] = True\n",
" return calc\n",
" calc['calc_headers_modified'] = False\n",
" calc['calc_total_tampering'] = False\n",
" if len(final['exp_req_headers']) != len(ctrl_headers['headers_dict']):\n",
" calc['calc_headers_modified'] = True\n",
" return calc\n",
" for k, v in final['exp_req_headers'].items():\n",
" try:\n",
" if v != ctrl_headers['headers_dict'][k][0]:\n",
" calc['calc_headers_modified'] = True\n",
" return calc\n",
" except KeyError:\n",
" calc['calc_headers_modified'] = True\n",
" return calc\n",
" return calc\n",
"\n",
"def compute_hhfm_anomaly(final):\n",
" color = 'green'\n",
" for key in ['calc_headers_modified', 'calc_total_tampering']:\n",
" if final[key] == True:\n",
" color = 'red'\n",
" elif final[key] == None and color != 'red':\n",
" color = 'yellow'\n",
" return color\n",
"\n",
"def extract_http_header_field_manipulation(m):\n",
" final = {}\n",
" final.update(extract_hhfm_exp(m['test_keys']))\n",
" final.update(compute_hhfm_result(final))\n",
" final['anmly_color'] = compute_hhfm_anomaly(final)\n",
" return final\n",
"\n",
"def compute_hirl_anomaly(final):\n",
" color = 'green'\n",
" for key in ['calc_tampering']:\n",
" if final[key] == True:\n",
" color = 'red'\n",
" elif final[key] == None and color != 'red':\n",
" color = 'yellow'\n",
" return color\n",
"\n",
"def extract_http_invalid_request_line(m):\n",
" final = {\n",
" 'calc_tampering': None,\n",
" 'exp_received': m['test_keys']['received'],\n",
" 'exp_sent': m['test_keys']['sent']\n",
" }\n",
" if len(m['test_keys']['received']) != len(m['test_keys']['sent']):\n",
" final['calc_tampering'] = True\n",
" else:\n",
" final['calc_tampering'] = False\n",
" for i, v in enumerate(m['test_keys']['received']):\n",
" if v != m['test_keys']['sent'][i]:\n",
" final['calc_tampering'] = True\n",
" break\n",
" final['anmly_color'] = compute_hirl_anomaly(final)\n",
" return final\n",
"\n",
"\n",
"def extract_common(m):\n",
" common = {}\n",
" common_fields = [\n",
" 'probe_cc',\n",
" 'probe_asn',\n",
" 'test_start_time',\n",
" 'report_id',\n",
" 'bucket_date'\n",
" ]\n",
" for field in common_fields:\n",
" common['a_%s' % field] = m[field]\n",
" platform = common.get('annotations', {}).get('platform', 'uknown')\n",
" common['a_software_string'] = '%s/%s/%s' % (platform, m['software_name'], m['software_version'])\n",
"\n",
" return common"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following `dict` maps the test names to their relative extractor."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"extractors = {\n",
" 'web_connectivity': extract_web_connectivity,\n",
" 'vanilla_tor': extract_vanilla_tor,\n",
" 'facebook_messenger': extract_facebook_messenger,\n",
" 'whatsapp': extract_whatsapp,\n",
" 'http_header_field_manipulation': extract_http_header_field_manipulation,\n",
" 'http_invalid_request_line': extract_http_invalid_request_line,\n",
" 'telegram': extract_telegram\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following functions are used to convert measurements to `pandas.DataFrame`s."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from glob import glob\n",
"failed_report_files = []\n",
"def msmts_from_report(report_path, extract_test=extract_web_connectivity):\n",
" ds = []\n",
" with open(report_path) as in_file:\n",
" for line in in_file:\n",
" entry = json.loads(line)\n",
" d = extract_common(entry)\n",
" d.update(extract_test(entry))\n",
" ds.append(d)\n",
" return ds\n",
"\n",
"def get_all_dfs(probe_cc):\n",
" bar = progressbar.ProgressBar()\n",
" msmts = {}\n",
" for report_file in bar(glob('{}/{}/*'.format(MEASUREMENT_DIR, probe_cc))):\n",
" tst, probe_cc, probe_asn, test_name, report_id = os.path.basename(report_file).split('-')[:5]\n",
" extract_test = extractors.get(test_name, None)\n",
" if extract_test is None:\n",
" continue\n",
" msmts[test_name] = msmts.get(test_name, [])\n",
" try:\n",
" msmts[test_name] += msmts_from_report(report_file, extract_test=extract_test)\n",
" except Exception as exc:\n",
" failed_report_files.append((exc, report_file))\n",
" all_dfs = {}\n",
" for key in msmts.keys():\n",
" all_dfs[key] = pd.DataFrame(msmts[key])\n",
" return all_dfs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`all_dfs` will be a dictionary keyed on the `test_name` containing the `pandas.DataFrame` for that particular test type."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"all_dfs = get_all_dfs(probe_cc='ZA')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A common pattern is then to look at the `anmly_color` key and filter by the keys that are not `green` (normal), for each test type.\n",
"\n",
"Rinse and repeat until you get interesting results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df_wa = all_dfs['whatsapp']\n",
"df_wa[df_wa['anmly_color'] != 'green']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df_hirl = all_dfs['http_invalid_request_line']\n",
"df_hirl[df_hirl['anmly_color'] != 'green']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df_fm = all_dfs['facebook_messenger']\n",
"df_fm[df_fm['anmly_color'] == 'red']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df_hhfm = all_dfs['http_header_field_manipulation']\n",
"df_hhfm[df_hhfm['anmly_color'] != 'green']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df_vt = all_dfs['vanilla_tor']\n",
"df_vt[df_vt['anmly_color'] != 'green']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Web Connectivity measurements are a bit more complex to work with (as they are much more susceptible to false positives). Generally it's useful to plot measurements over time and use the full power of `pandas` to get useful \"over time\" insight."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df_wc = all_dfs['web_connectivity']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"only_blockpages = df_wc[df_wc['anmly_color'] == 'red'][[\n",
" 'calc_blocking',\n",
" 'calc_blockpage',\n",
" 'anmly_color',\n",
" 'a_probe_asn',\n",
" 'a_test_start_time',\n",
" 'a_report_id',\n",
" 'input'\n",
"]]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"unique_blocked_sites = only_blockpages[['input']].drop_duplicates()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment