-
-
Save rafajafar/7eafd987bf1be4524c1dfe88ef8e0fcd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyvirtualdisplay import Display | |
from selenium import webdriver | |
from selenium.webdriver.support.events import EventFiringWebDriver, AbstractEventListener | |
from selenium.common.exceptions import TimeoutException | |
from selenium.webdriver.common.action_chains import ActionChains | |
from selenium.webdriver.common.alert import Alert | |
from selenium.webdriver.chrome.options import Options as ChromeOptions | |
from selenium.webdriver.remote.remote_connection import LOGGER | |
from utils.fullpage_screenshot import fullpageScreenshot | |
from utils.pageload import renderedPageSource, waitDocumentReady | |
import os | |
import sys | |
import errno | |
import time | |
import traceback | |
import codecs | |
import re | |
import json | |
import logging | |
from urlparse import urlparse | |
import argparse | |
def filename_replace(mystr):
    """Return *mystr* with every filename-unsafe character replaced by ``_``.

    Only ASCII letters, digits, ``_``, ``.`` and ``-`` are kept; any other
    character (including ``/``, whitespace and non-ASCII) becomes a single
    underscore, so the result has the same length as the input.
    """
    # One substitution pass instead of matching a regex per character and
    # growing the result with repeated string concatenation.
    return re.sub(r'[^A-Za-z0-9_.-]', '_', mystr)
def make_sure_path_exists(path, mode=0o755):
    """Create directory *path* (including parents) unless it already exists.

    :param path: directory to create.
    :param mode: permission bits passed to ``os.makedirs``.
    :raises OSError: if creation fails for any reason other than the
        directory already being there -- including the case where *path*
        exists but is not a directory.
    """
    try:
        os.makedirs(path, mode)
    except OSError as exception:
        # EEXIST is also reported when a regular file occupies the path;
        # only an existing *directory* satisfies the caller.
        if exception.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def get_url(driver, pg_url, retries=3):
    """Load *pg_url* in *driver*, retrying when the page load times out.

    :param driver: object exposing ``get(url)`` (a Selenium WebDriver).
    :param pg_url: URL to load.
    :param retries: maximum number of load attempts.
    :returns: ``True`` once a load completes without ``TimeoutException``,
        ``False`` if every attempt timed out. Earlier versions returned
        ``None`` in both cases, so callers that ignore the result are
        unaffected.
    """
    print("getting: %s" % pg_url.strip())
    for _ in range(retries):
        try:
            driver.get(pg_url)
        except TimeoutException:
            print("Timeout, retrying - ( %s )" % pg_url)
        else:
            return True
    return False
def _discard_partial_screenshot(path):
    """Best-effort removal of *path*; a missing or undeletable file is ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


def get_screen(driver, screenshot_file, retries=3):
    """Write a full-page screenshot of *driver* to *screenshot_file*.

    The capture is retried when it raises ``TimeoutException``.

    :param driver: Selenium WebDriver handed to ``fullpageScreenshot``.
    :param screenshot_file: destination path of the image.
    :param retries: maximum number of capture attempts.
    :returns: ``True`` if a capture succeeded, ``False`` if every attempt
        timed out (previously both cases returned ``None``).
    :raises: any non-timeout error from the capture is re-raised after the
        traceback is printed and a partially written file is removed.
    """
    try:
        for _ in range(retries):
            try:
                fullpageScreenshot(driver, screenshot_file)
            except TimeoutException:
                print("Timeout, retrying - ( %s )" % screenshot_file)
            else:
                return True
        return False
    except BaseException:
        traceback.print_exc()
        # Cleanup runs in its own function so that an OSError it swallows
        # cannot replace the exception re-raised below (Python 2 would
        # otherwise re-raise the most recently handled one).
        _discard_partial_screenshot(screenshot_file)
        raise
def _discard_partial_source(path):
    """Best-effort removal of *path*; a missing or undeletable file is ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


def get_source(driver, source_file, sleep_time=10):
    """Save the page source obtained from *driver* to *source_file*.

    :param driver: Selenium WebDriver handed to ``renderedPageSource``.
    :param source_file: destination path, written as UTF-8.
    :param sleep_time: forwarded unchanged to ``renderedPageSource``.
    :returns: the source text that was written.
    :raises: any error is re-raised after its traceback is printed and the
        (possibly partial) output file is removed.
    """
    try:
        markup = renderedPageSource(driver, sleep_time)
        with codecs.open(source_file, "w", 'utf-8') as out:
            out.write(markup)
    except:
        traceback.print_exc()
        # Cleanup is delegated to a helper so the bare ``raise`` below
        # re-raises the original error with its original traceback.
        _discard_partial_source(source_file)
        raise
    return markup
def read_js_file(js_fname):
    """Return the entire contents of *js_fname*, decoded as UTF-8."""
    with codecs.open(js_fname, "r", 'utf-8') as script:
        return script.read()
class ClickRecorder(object):
    """Associate simulated clicks with the URLs reported after them.

    ``data`` maps ``url -> selector -> [coordinate, ...]``. A click is first
    registered as *pending* with :meth:`addClick`; a following
    :meth:`addURL` files the pending click under that URL.
    """

    def __init__(self):
        # Per-instance state: a class-level dict would be shared (and
        # accumulate) across every recorder ever created.
        self.data = {}
        self.current_selector = None
        self.current_coord = None
        self.current_url = None

    def addClick(self, sel, coordpair):
        """Remember *sel* / *coordpair* as the pending click and clear the last URL."""
        self.current_selector = sel
        self.current_coord = coordpair
        self.current_url = None

    def addURL(self, url):
        """File the pending click under *url*, then clear the pending click.

        NOTE(review): when no click is pending, the entry is recorded under
        selector ``None`` with coordinate ``None``, as before -- confirm
        whether callers rely on that.
        """
        self.current_url = url
        clicks = self.data.setdefault(url, {}).setdefault(self.current_selector, [])
        clicks.append(self.current_coord)
        self.current_selector = None
        self.current_coord = None
class NavListener(AbstractEventListener):
    """Event listener that records every URL passed to ``before_navigate_to``.

    Each URL is handed to the instance's ``ClickRecorder``. While
    ``prevent_navigation`` is true, ``window.stop();`` is executed in the
    page first.
    """

    # Class-level defaults; ``click_recorder`` is replaced per instance.
    prevent_navigation = True
    click_recorder = None

    def __init__(self):
        self.click_recorder = ClickRecorder()

    def before_navigate_to(self, url, driver):
        """Hook called with the target *url* and the *driver* about to navigate.

        Presumably invoked by ``EventFiringWebDriver`` ahead of
        ``driver.get`` -- confirm against the Selenium documentation.
        """
        print("BEFORE")
        self.nav_attempted = False
        if self.prevent_navigation:
            driver.execute_script("window.stop();")
        self.click_recorder.addURL(url)
        print("DONE")
def _site_dirpath(pg_url):
    """Return the ``sites/<domain>/<path>/<query>`` output directory for *pg_url*."""
    # urlparse only fills in the domain when the URL contains '//', so a
    # bare "example.com/page" gets a scheme-relative prefix first.
    prs_url = pg_url if '//' in pg_url else '//' + pg_url
    prs_url = prs_url.replace("://www.", "://")
    parsedurl = urlparse(prs_url)
    domain = filename_replace(parsedurl.netloc.strip().strip("/"))
    path = filename_replace(parsedurl.path.strip().strip("/")) or '_'
    qry = filename_replace(parsedurl.query.strip().strip("/")) or '_'
    return "sites/%s/%s/%s" % (domain, path, qry)


def _quit_quietly(driver):
    """Quit *driver* if one was created, ignoring errors from a dead browser."""
    if driver is None:
        return
    try:
        driver.quit()
    except Exception:
        pass


def detect_advertisements(pg_url):
    """Load *pg_url* with and without uBlock and highlight click targets.

    Steps, all performed inside a virtual display:

    1. Create ``sites/<domain>/<path>/<query>/reports/<timestamp>/`` with
       ``screen`` and ``source`` sub-directories.
    2. Start Chrome with the uBlock extension, save a screenshot and the
       page source, run the "blocked parse" script and keep the resulting
       ``document.body.innerHTML``.
    3. Start Chrome without the extension, save a screenshot and source,
       and run the "find elements" script with the blocked body as its
       argument; the script must return ``"success"``.
    4. Click every coordinate reported by the "check clicks" script, then
       outline each selector held by the navigation recorder and save
       ``ads_highlighted.png``.

    Errors during the browser phases (2-4) are printed with their traceback
    and swallowed; errors while preparing directories or reading the helper
    scripts propagate. Both browsers and the display are always shut down.

    :param pg_url: URL of the page to analyse.
    """
    display = Display(visible=0, size=(1366, 768))
    display.start()
    try:
        time_marker = int(time.time())
        report_dir = "%s/reports/%s" % (_site_dirpath(pg_url), time_marker)
        make_sure_path_exists(report_dir)

        # Blocked and unblocked artefacts share the same two directories.
        screen_dir = "%s/screen" % report_dir
        source_dir = "%s/source" % report_dir
        make_sure_path_exists(screen_dir)
        make_sure_path_exists(source_dir)

        ub_screenshot_file = "%s/unblocked.png" % screen_dir
        bl_screenshot_file = "%s/blocked.png" % screen_dir
        ub_source_file = "%s/unblocked.html" % source_dir
        bl_source_file = "%s/blocked.html" % source_dir
        ub_highlighted_file = "%s/ads_highlighted.png" % screen_dir

        jqf = read_js_file("utils/jquery-2.2.1.min.js")
        jqsl = read_js_file("utils/jquery.selectorator.js")
        sjbp = read_js_file("utils/selenium_jquery_blocked_parse.js")
        sjfe = read_js_file("utils/selenium_jquery_find_elements.js")
        sjcc = read_js_file("utils/selenium_check_clicks.js")
        jspn = read_js_file("utils/prevent_navigation.js")

        # Bound up front so the cleanup below never sees an unbound name.
        bl_driver = None
        ub_driver = None
        try:
            # Having two drivers open at the same time apparently doesn't
            # work, so the blocked pass runs to completion first.
            chromedrvr = "/usr/local/bin/chromedriver"
            os.environ["webdriver.chrome.driver"] = chromedrvr

            # --- pass 1: uBlock enabled ---------------------------------
            chrome_options = ChromeOptions()
            chrome_options.add_argument("load-extension=./extensions/ublock")
            bl_driver = webdriver.Chrome(executable_path=chromedrvr,
                                         chrome_options=chrome_options)
            bl_driver.set_page_load_timeout(30)
            get_url(bl_driver, pg_url)
            get_screen(bl_driver, bl_screenshot_file)
            get_source(bl_driver, bl_source_file)

            bl_driver.execute_script(jqf)
            bl_driver.execute_script("window.mtJQ = $.noConflict(true);")
            bl_driver.execute_script(sjbp)
            # Result unused; the call is kept because it presumably waits
            # for the page to settle -- confirm in utils.pageload.
            renderedPageSource(bl_driver)
            bl_source_body = bl_driver.execute_script("return document.body.innerHTML;")
            get_source(bl_driver, "%s/blocked_post.html" % source_dir)
            bl_driver.quit()
            bl_driver = None

            # --- pass 2: no extension -----------------------------------
            chrome_options = ChromeOptions()
            selenium_logger = logging.getLogger('selenium.webdriver.remote.remote_connection')
            selenium_logger.setLevel(logging.ERROR)
            ub_driver = webdriver.Chrome(executable_path=chromedrvr,
                                         chrome_options=chrome_options,
                                         service_args=["--log-path=./chrome.log"])
            nav_listener = NavListener()
            # Rebinding the same name keeps the raw driver reachable for
            # cleanup should the wrapper construction fail.
            ub_driver = EventFiringWebDriver(ub_driver, nav_listener)
            get_url(ub_driver, pg_url)
            get_screen(ub_driver, ub_screenshot_file)
            get_source(ub_driver, ub_source_file)

            ub_driver.execute_script(jqf)
            ub_driver.execute_script("window.mtJQ = $.noConflict(true);")
            ub_driver.execute_script(jqsl)
            ub_results = ub_driver.execute_script(sjfe, bl_source_body)
            renderedPageSource(ub_driver)
            if ub_results != "success":
                raise RuntimeError(
                    "find-elements script returned %r instead of 'success'" % (ub_results,))
            get_source(ub_driver, "%s/unblocked_post.html" % source_dir)

            # --- click checks -------------------------------------------
            ub_driver.execute_script(jspn)
            clicks_relevant = json.loads(ub_driver.execute_script(sjcc))
            print(json.dumps(clicks_relevant))
            # Tiny page-load timeout as in the original; presumably so a
            # click that starts a navigation returns at once -- confirm.
            ub_driver.set_page_load_timeout(0.01)
            for dompath in clicks_relevant:
                print("# %s" % dompath)
                el = ub_driver.find_element_by_id(dompath)
                for cr in clicks_relevant[dompath]:
                    nav_listener.click_recorder.addClick(dompath, cr)
                    try:
                        # A fresh chain per click: a reused ActionChains
                        # keeps its queue, so every perform() would replay
                        # all earlier clicks as well.
                        ActionChains(ub_driver).move_to_element_with_offset(
                            el, cr["clickX"], cr["clickY"]).click().perform()
                    except Exception:
                        # Best effort: a failing click must not abort the scan.
                        pass
                    try:
                        ub_driver.switch_to_alert().dismiss()
                    except Exception:
                        # Usually just means no alert was open.
                        pass

            recorded = nav_listener.click_recorder.data
            for url_fnd in recorded:
                for dompath in recorded[url_fnd]:
                    # NOTE(review): dompath is interpolated into the script
                    # unescaped; ids containing quotes or selector
                    # metacharacters will not match.
                    ub_driver.execute_script(
                        "mtJQ('#%s').css('box-shadow', 'inset 0px 0px 0px 10px #f00');" % dompath)
            get_screen(ub_driver, ub_highlighted_file)
            # The original printed an undefined name here; the recorded
            # click/URL map is assumed to be the intended output.
            print(json.dumps(recorded))
        except Exception:
            print("error")
            traceback.print_exc()
        finally:
            _quit_quietly(bl_driver)
            _quit_quietly(ub_driver)
    finally:
        display.stop()
if __name__ == "__main__":
    # Command-line entry point: one optional positional URL.
    cli = argparse.ArgumentParser(description='Run uBlock Test and Generate Screenshots.')
    cli.add_argument('pg_url', nargs='?', default="http://reddit.com",
                     help="URL to test against")
    detect_advertisements(cli.parse_args().pg_url)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment