Skip to content

Instantly share code, notes, and snippets.

@collina
Created December 7, 2015 00:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save collina/216b5576a8c68411575f to your computer and use it in GitHub Desktop.
# -*- encoding: utf-8 -*-
#
# Wikipedia Content Research Test
# **************
#
# :authors: Collin Anderson (@CDA)
# :licence: see LICENSE
import sys
import json
from twisted.internet import defer
from twisted.python import usage
from ooni.utils import randomStr
from ooni.utils import log
from ooni.templates import httpt
from urllib2 import quote
class UsageOptions(usage.Options):
    # Command-line options accepted by this test, as
    # [long name, short name, default, description] rows.
    optParameters = [
        ['backend', 'b', None,
         'URL of the test backend to use. Should be listening on port 80 '
         'and be a HTTPReturnJSONHeadersHelper'],
        ['content', 'c', None,
         'The file to read from containing the content of a block page'],
        ['localization', 'l', 'en',
         'The subdomain of wikipedia.org that articles are associated with'],
    ]
class HTTPHost(httpt.HTTPTest):
    """
    This test output should remain compatible with the 'HTTP Host Test.'
    Usually this test should be run with a list of page titles for a particular
    localization of Wikipedia.

    This Test Will:
      * ) Randomize Order of Page Titles
    Baseline Test:
      1.) Accessibility of localized Wikipedia,
      2.) Accessibility of English Wikipedia (if not same as localization)
    Testing per Article:
      1.) The localized subdomain of Wikipedia (if provided),
      1.) The Article name as Host
      2.) The English Version of Wikipedia (if not same) [not yet]
      3.) A random Host (as a baseline)
    Need to Do:
      * ) Check based on different encodings
      * ) check_for_censorship in HTTP Host is borked
    """
    name = "Wikipedia Content Test"
    description = "Tests the accessibility of wikipedia content " \
                  "based on the HTTP Host header and GET request fields."
    author = "Collin Anderson"
    version = "0.1"

    randomizeUA = False
    usageOptions = UsageOptions
    inputFile = ['file', 'f', None,
                 'List of article titles to test for censorship, generally '
                 '(locale name, e.g. `fawiki`)-(dump date YYYYMMDD)-all-titles.gz']
    requiredTestHelpers = {'backend': 'http-return-json-headers'}
    requiredOptions = ['backend']
    requiresTor = False
    requiresRoot = False

    def setUp(self):
        """Initialize the report and derive the localized Wikipedia hostname."""
        # Assume no transparent proxy until a response suggests otherwise.
        self.report['transparent_http_proxy'] = False
        self.localOptions['localized_hostname'] = \
            self.localOptions['localization'] + '.wikipedia.org'

    def check_for_censorship(self, body, test_name):
        """
        Inspect *body* (the raw HTTP response) for signs of a transparent
        HTTP proxy, and — when a known block page was supplied via the
        `content` option — compare the response against it line by line,
        recording the verdict under ``self.report[test_name]``.

        XXX this is to be filled in with either a domclass based classified or
        with a rule that will allow to detect that the body of the result is
        that of a censored site.
        """
        # If we don't see a json dict we know that something is wrong for sure:
        # the helper always answers with a JSON object.
        if not body.startswith("{"):
            log.msg("This does not appear to be JSON")
            self.report['transparent_http_proxy'] = True
        else:
            try:
                content = json.loads(body)
                # We base the determination of the presence of a transparent
                # HTTP proxy on the basis of the response containing the json
                # that is to be returned by a HTTP Request Test Helper.
                if ('request_headers' in content and
                        'request_line' in content and
                        'headers_dict' in content):
                    log.msg("Found the keys I expected in %s" % content)
                    self.report[test_name] = False
                else:
                    log.msg("Did not find the keys I expected in %s" % content)
            except ValueError:
                # json.loads raises ValueError on malformed input; anything
                # broader would hide unrelated bugs.
                log.msg("The json does not parse, this is not what we expected")
                self.report['transparent_http_proxy'] = True

        if ('transparent_http_proxy' not in self.report or
                self.report['transparent_http_proxy'] == True):
            self.report['transparent_http_proxy'] = True
            if self.localOptions['content']:
                # Compare the response against the known block page; start by
                # presuming a match and flip to False on the first difference.
                self.report[test_name] = True
                response_page = iter(body.split("\n"))
                # `with` guarantees the file is closed even if the comparison
                # raises; the original leaked the handle on error.
                with open(self.localOptions['content']) as censorship_page:
                    for censorship_line in censorship_page:
                        # A response shorter than the block page is a
                        # mismatch, not a StopIteration crash.
                        response_line = next(response_page, None)
                        if response_line != censorship_line:
                            self.report[test_name] = False
                            break

    @defer.inlineCallbacks
    def test_valid_request(self):
        """
        Stuffs the HTTP Host header field with the site to be tested for
        censorship and does an HTTP request of this kind to our backend.
        We randomize the HTTP User Agent headers.
        """
        # Derive the report key from the running method's name, e.g.
        # 'test_valid_request' -> 'valid_request'.
        test_name = sys._getframe().f_code.co_name.replace('test_', '')
        headers = {}
        headers["Host"] = [self.localOptions['localized_hostname']]
        # Percent-encode the article title so non-ASCII titles form a
        # valid request path.
        test_url = '%s/wiki/%s' % (self.localOptions['backend'],
                                   quote(self.input))
        response = yield self.doRequest(test_url, headers=headers)
        self.check_for_censorship(response.body, test_name)

    # @defer.inlineCallbacks
    # def test_filtering_via_request_string(self):
    #     test_name = sys._getframe().f_code.co_name.replace('test_', '')
    #     headers = {}
    #     headers["Host"] = [randomStr(10) + '.' + randomStr(3)]
    #     test_url = '%s/wiki/%s' % (self.localOptions['backend'], self.input)
    #     response = yield self.doRequest(test_url, headers=headers)
    #     self.check_for_censorship(response.body, test_name)
    #
    # @defer.inlineCallbacks
    # def test_filtering_via_hostname(self):
    #     test_name = sys._getframe().f_code.co_name.replace('test_', '')
    #     headers = {}
    #     headers["Host"] = [self.input]
    #     test_url = self.localOptions['backend']
    #     response = yield self.doRequest(test_url, headers=headers)
    #     self.check_for_censorship(response.body, test_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment