Skip to content

Instantly share code, notes, and snippets.

@ad-inventory-fraud
Last active April 10, 2023 21:37
Show Gist options
  • Save ad-inventory-fraud/fd2a3f9467fb76addc951a5bff18ebaa to your computer and use it in GitHub Desktop.
from selenium.webdriver.common.action_chains import ActionChains
from requests.packages.urllib3.util.retry import Retry
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from requests.adapters import HTTPAdapter
from selenium.common.exceptions import (
NoSuchElementException,
InvalidSessionIdException,
TimeoutException,
WebDriverException,
JavascriptException
)
from webdriver_utils import scroll_down
# from pyvirtualdisplay import Display
from adblockparser import AdblockRules
from selenium.webdriver import Chrome
from argparse import ArgumentParser
from browsermobproxy import Server
from selenium import webdriver
import datetime
from tld import get_fld
from time import sleep
from PIL import Image
import requests
import random
import math
import json
import tld
import os
# Uncomment and use the code lines below in order to access/debug chrome instrumentation remotely while running in non-headless mode
# Also uncomment the associated commented import
'''
disp = Display(backend="xvnc", size=(1920,1080), rfbport=1212) # XXXX has to be a random port number
disp.start()
'''
# Options applied to the Chrome webdriver instance while instrumenting it.
chrome_options = Options()
# chrome_options.binary_location = "/usr/bin/google-chrome-stable"
for _flag in (
    "--headless",
    "--start-maximized",
    "--no-sandbox",
    "--disable-dev-shm-usage",
    "--ignore-ssl-errors=yes",
    "--ignore-certificate-errors",
    "--window-size=1280,720",
):
    chrome_options.add_argument(_flag)
# Registered domain of the site currently being visited; read by label_data().
global_curr_domain = ""
def download_lists(FILTERLIST_DIR):
    """
    Download the filter lists used in AdGraph.

    Args:
        FILTERLIST_DIR: Path of the output directory to which filter lists should be written.

    Returns:
        Nothing, writes one "<listname>.txt" file per list into FILTERLIST_DIR.

    This function does the following:
    1. Sends HTTP requests (with retries and backoff) for the lists used in AdGraph.
    2. Writes each successfully fetched list to the output directory.

    Downloads are best-effort: a list whose request fails is skipped entirely,
    so no empty/truncated file is left behind for it.
    """
    num_retries = 5
    session = requests.Session()
    # Retry transient connect/read failures with exponential backoff.
    retry = Retry(total=num_retries, connect=num_retries, read=num_retries, backoff_factor=0.5)
    adapter = HTTPAdapter(max_retries=retry, pool_connections=100, pool_maxsize=200)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    request_headers = {
        "Connection": "keep-alive",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br"
    }
    # "Accept-Language": "en-US,en;q=0.9"
    raw_lists = {
        'easylist': 'https://easylist.to/easylist/easylist.txt',
        'easyprivacy': 'https://easylist.to/easylist/easyprivacy.txt',
        'antiadblock': 'https://raw.github.com/reek/anti-adblock-killer/master/anti-adblock-killer-filters.txt',
        'blockzilla': 'https://raw.githubusercontent.com/annon79/Blockzilla/master/Blockzilla.txt',
        'fanboyannoyance': 'https://easylist.to/easylist/fanboy-annoyance.txt',
        'fanboysocial': 'https://easylist.to/easylist/fanboy-social.txt',
        'peterlowe': 'http://pgl.yoyo.org/adservers/serverlist.php?hostformat=adblockplus&mimetype=plaintext',
        'squid': 'http://www.squidblacklist.org/downloads/sbl-adblock.acl',
        'warning': 'https://easylist-downloads.adblockplus.org/antiadblockfilters.txt',
    }
    for listname, url in raw_lists.items():
        # Fetch BEFORE opening the output file: the original opened the file
        # first, so a failed request truncated any previously downloaded copy
        # to an empty file. Catch RequestException (not just ConnectionError)
        # so timeouts/HTTP errors are also skipped instead of aborting the run.
        try:
            response = session.get(url, timeout=45, headers=request_headers)
        except requests.exceptions.RequestException:
            continue
        with open(os.path.join(FILTERLIST_DIR, "%s.txt" % listname), 'wb') as f:
            f.write(response.content)
def read_file_newline_stripped(fname):
    """
    Read a text file and return its lines with surrounding whitespace stripped.

    Args:
        fname: Path of the file to read.

    Returns:
        List of stripped lines, or [] if the file cannot be opened/read.
    """
    try:
        with open(fname) as f:
            return [line.strip() for line in f]
    except OSError:
        # Missing/unreadable file -> empty list. The original used a bare
        # except:, which also swallowed KeyboardInterrupt and programming
        # errors; OSError covers all I/O failures open()/read can raise here.
        return []
def setup_filterlists():
    '''
    Setup and download (if not already downloaded earlier) the filter lists
    used to identify ad-related URLs.

    Returns:
        (filterlists, filterlist_rules): the list of filter-list file names and
        a dict mapping each file name to a dict of AdblockRules matchers keyed
        by resource category ('script', 'script_third', 'image', ..., 'domain').
    '''
    FILTERLIST_DIR = "filterlists"
    if not os.path.isdir(FILTERLIST_DIR):
        os.makedirs(FILTERLIST_DIR)
        download_lists(FILTERLIST_DIR)
    # Matcher key -> options specific to that matcher; every matcher also
    # supports the common ['domain', 'subdocument'] pair appended below.
    # Replaces ten near-identical AdblockRules(...) calls from the original.
    extra_options = {
        'script': ['script'],
        'script_third': ['third-party', 'script'],
        'image': ['image'],
        'image_third': ['third-party', 'image'],
        'css': ['stylesheet'],
        'css_third': ['third-party', 'stylesheet'],
        'xmlhttp': ['xmlhttprequest'],
        'xmlhttp_third': ['third-party', 'xmlhttprequest'],
        'third': ['third-party'],
        'domain': [],
    }
    filterlist_rules = {}
    filterlists = os.listdir(FILTERLIST_DIR)
    for fname in filterlists:
        rules = read_file_newline_stripped(os.path.join(FILTERLIST_DIR, fname))
        filterlist_rules[fname] = {
            key: AdblockRules(
                rules,
                use_re2=False,
                max_mem=1024 * 1024 * 1024,
                supported_options=opts + ['domain', 'subdocument'],
                skip_unsupported_rules=False,
            )
            for key, opts in extra_options.items()
        }
    return filterlists, filterlist_rules
def match_url(domain_top_level, current_domain, current_url, resource_type, rules_dict):
    '''
    Associate the URL to a particular category based on different rules.

    Args:
        domain_top_level: Registered domain of the page being visited.
        current_domain: Registered domain of the resource URL.
        current_url: The resource URL to test.
        resource_type: One of 'script', 'image', 'imageset', 'stylesheet',
            'xmlhttprequest', 'sub_frame', or anything else (generic match).
        rules_dict: Dict of AdblockRules matchers as built by setup_filterlists().

    Returns:
        True if the selected filter-list matcher blocks current_url, False
        otherwise (including on any internal error, e.g. a missing matcher key).
    '''
    # resource_type -> (rules_dict key, adblock option name). Both 'image' and
    # 'imageset' map to the image matchers, as in the original if/elif ladder.
    type_map = {
        'script': ('script', 'script'),
        'image': ('image', 'image'),
        'imageset': ('image', 'image'),
        'stylesheet': ('css', 'stylesheet'),
        'xmlhttprequest': ('xmlhttp', 'xmlhttprequest'),
    }
    try:
        third_party_check = domain_top_level != current_domain
        # Only sub-frames count as subdocuments.
        subdocument_check = resource_type == 'sub_frame'
        options = {'domain': domain_top_level, 'subdocument': subdocument_check}
        if resource_type in type_map:
            rules_key, option_name = type_map[resource_type]
            options[option_name] = True
        else:
            # Unrecognized types (incl. 'sub_frame') use the generic matchers.
            rules_key = 'domain'
        if third_party_check:
            rules_key = 'third' if rules_key == 'domain' else rules_key + '_third'
            options['third-party'] = True
        return rules_dict[rules_key].should_block(current_url, options)
    except Exception:
        # Best-effort classifier: any failure means "not blocked".
        return False
def label_data(script_url):
    '''
    Label a URL as ad-related (True) or not (False) using the filter lists.

    # top_domain = the website being visited (module global global_curr_domain)
    # script_domain = domain of iframe url
    # script_url = url of iframe
    # resource_type = sub_frame, image, script

    Returns True as soon as any filter list blocks script_url for any of the
    checked resource types; False if none match.
    '''
    top_domain = global_curr_domain
    filterlists, filterlist_rules = setup_filterlists()
    # Hoisted out of the loops: the registered domain of script_url is
    # loop-invariant (the original recomputed get_fld on every iteration).
    script_domain = get_fld(script_url)
    for fl in filterlists:
        for resource_type in ("sub_frame", "script", "image"):
            # Early return replaces the original's flag + double break.
            if match_url(top_domain, script_domain, script_url, resource_type, filterlist_rules[fl]):
                return True
    return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment