Proxy integrated into Selenium in a Scrapy middleware
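To wire these middlewares into a Scrapy project, enable them in settings.py. A minimal sketch, assuming the project module is named coupons; the priority numbers, proxy keys, and mail settings shown are illustrative placeholders for the keys this middleware reads:

# settings.py (sketch; module path "coupons" and all values are illustrative)
DOWNLOADER_MIDDLEWARES = {
    'coupons.middlewares.CouponsDownloaderMiddleware': 543,
    # swap the stock retry middleware for the token-aware variant below
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,
    'coupons.middlewares.CouponsRetryMiddleware': 550,
}
SELENIUM_PROXY_HOST = 'proxy.example.com'
SELENIUM_PROXY_PORT = 22225
SELENIUM_PROXY_PASSWORD = 'changeme'
SCREENSHOTS_PATH = '/tmp/screenshots/'
MAIL_ADDRESS = 'bot@example.com'
SMTP_HOST = 'smtp.example.com'
MAIL_PORT = 587
MAIL_USER = 'bot@example.com'
MAIL_PASSWORD = 'changeme'
EMAIL_LIST = ['team@example.com']
CC_LIST = []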
# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
import sys
import time
import logging
import os
import codecs
import string
import zipfile
from datetime import datetime
from shutil import which

from scrapy import signals
from scrapy.mail import MailSender
from scrapy.utils.project import get_project_settings
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.utils.response import response_status_message
# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter
from scrapy.http import HtmlResponse
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from pyvirtualdisplay import Display
import undetected_chromedriver as uc
# undetected_chromedriver only needs a pinned Chrome version off Linux
if not (sys.platform == "linux" or sys.platform == "linux2"):
    uc.TARGET_VERSION = 90

settings = get_project_settings()
class CouponsRetryMiddleware(RetryMiddleware):
    def process_response(self, request, response, spider):
        if request.meta.get('dont_retry', False):
            return response
        if response.status in self.retry_http_codes:
            reason = response_status_message(response.status)
            return self._retry(request, reason, spider) or response
        # A 200 from the myofer login flow without a "token" cookie means the
        # login did not stick, so retry the request.
        if (response.status == 200 and request.meta.get('myoferToken')
                and not any(item for item in response.meta["cookieJar"]
                            if item["name"] == "token")):
            reason = "Missing token cookie"
            spider.logger.info('Spider retrying: %s' % reason)
            return self._retry(request, reason, spider) or response
        return response
class CouponsSpiderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.
        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.
        # Must return an iterable of Request, or item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.
        # Should return either None or an iterable of Request or item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn't have a response associated.
        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)
class CouponsDownloaderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.

    def __init__(self):
        mailfrom = settings.get("MAIL_ADDRESS")
        smtpport = settings.get("MAIL_PORT")
        smtpuser = settings.get("MAIL_USER")
        smtppass = settings.get("MAIL_PASSWORD")
        smtphost = settings.get("SMTP_HOST")
        self.mailer = MailSender(mailfrom=mailfrom, smtphost=smtphost,
                                 smtpport=smtpport, smtpuser=smtpuser,
                                 smtppass=smtppass)

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        s.cookie = ""
        if sys.platform == "linux" or sys.platform == "linux2":
            s.display = Display(visible=0, size=(800, 600))
            s.display.start()
            logging.info("Virtual Display Initiated")
        chrome_options = Options()
        if crawler.spider.undetectable:
            s.driver = uc.Chrome()
            if crawler.spider.proxy:
                # Bright Data (Luminati) credential parts; the settings keys
                # below are assumptions, adjust them to your project.
                s.user = crawler.settings.get('SELENIUM_PROXY_USER')
                s.ip = crawler.settings.get('SELENIUM_PROXY_IP')
                s.zone = crawler.settings.get('SELENIUM_PROXY_ZONE')
                proxyauth_plugin_path = s.create_proxyauth_extension(
                    proxy_host=crawler.settings.get('SELENIUM_PROXY_HOST'),
                    proxy_port=crawler.settings.get('SELENIUM_PROXY_PORT'),
                    proxy_username=f"lum-customer-{s.user}-ip-{s.ip}-zone-{s.zone}",
                    proxy_password=crawler.settings.get('SELENIUM_PROXY_PASSWORD'),
                    scheme='http')
                options = uc.ChromeOptions()
                options.add_extension(proxyauth_plugin_path)
                s.driver = uc.Chrome(options=options)
        else:
            # driver_location = "/usr/bin/chromedriver"
            driver_location = which('chromedriver')
            # binary_location = "/usr/bin/google-chrome"
            userAgent = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                         "AppleWebKit/537.36 (KHTML, like Gecko) "
                         "Chrome/84.0.4147.56 Safari/537.36")
            # chrome_options.binary_location = binary_location
            chrome_options.add_argument(f'user-agent={userAgent}')
            chrome_options.add_argument("--ignore-certificate-errors")
            chrome_options.add_argument("--ignore-ssl-errors")
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            s.driver = webdriver.Chrome(executable_path=driver_location,
                                        options=chrome_options)  # your chosen driver
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
        return s
    def create_proxyauth_extension(self, proxy_host, proxy_port,
                                   proxy_username, proxy_password,
                                   scheme='http', plugin_path=None):
        """Proxy Auth Extension

        args:
            proxy_host (str): domain or ip address, ie proxy.domain.com
            proxy_port (int): port
            proxy_username (str): auth username
            proxy_password (str): auth password
        kwargs:
            scheme (str): proxy scheme, default http
            plugin_path (str): absolute path of the extension
        return str -> plugin_path
        """
        if plugin_path is None:
            file = './chrome_proxy_helper'
            if not os.path.exists(file):
                os.mkdir(file)
            plugin_path = file + '/%s_%s@%s_%s.zip' % (
                proxy_username, proxy_password, proxy_host, proxy_port)
        manifest_json = """
        {
            "version": "1.0.0",
            "manifest_version": 2,
            "name": "Chrome Proxy",
            "permissions": [
                "proxy",
                "tabs",
                "unlimitedStorage",
                "storage",
                "<all_urls>",
                "webRequest",
                "webRequestBlocking"
            ],
            "background": {
                "scripts": ["background.js"]
            },
            "minimum_chrome_version":"22.0.0"
        }
        """
        background_js = string.Template("""
        var config = {
            mode: "fixed_servers",
            rules: {
                singleProxy: {
                    scheme: "${scheme}",
                    host: "${host}",
                    port: parseInt(${port})
                },
                bypassList: ["foobar.com"]
            }
        };
        chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
        function callbackFn(details) {
            return {
                authCredentials: {
                    username: "${username}",
                    password: "${password}"
                }
            };
        }
        chrome.webRequest.onAuthRequired.addListener(
            callbackFn,
            {urls: ["<all_urls>"]},
            ['blocking']
        );
        """).substitute(
            host=proxy_host,
            port=proxy_port,
            username=proxy_username,
            password=proxy_password,
            scheme=scheme,
        )
        with zipfile.ZipFile(plugin_path, 'w') as zp:
            zp.writestr("manifest.json", manifest_json)
            zp.writestr("background.js", background_js)
        return plugin_path
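    # Hypothetical standalone usage of the builder above (host, port and
    # credentials are placeholders, not real values):
    #
    #   plugin = self.create_proxyauth_extension(
    #       proxy_host='proxy.example.com', proxy_port=22225,
    #       proxy_username='user', proxy_password='pass')
    #   opts = uc.ChromeOptions()
    #   opts.add_extension(plugin)
    #   driver = uc.Chrome(options=opts)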
    def popElement(self, interactElement):
        try:
            element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, interactElement)))
            self.driver.execute_script("arguments[0].click();", element)
        except Exception as ex:
            logging.error(ex)
            # On failure, keep a screenshot and the page source for debugging.
            self.driver.save_screenshot(
                f"{settings.get('SCREENSHOTS_PATH')}{interactElement}_click_error.png")
            n = os.path.join(settings.get('SCREENSHOTS_PATH'),
                             f"{interactElement}_PageSave.html")
            with codecs.open(n, "w", "utf-8") as f:
                f.write(self.driver.page_source)

    def xpath_pop_element(self, sel):
        try:
            element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, sel)))
            self.driver.execute_script("arguments[0].click();", element)
        except Exception as ex:
            logging.error(ex)
            # XPath strings make poor filenames, so use a fixed name here.
            self.driver.save_screenshot(
                f"{settings.get('SCREENSHOTS_PATH')}xpath_click_error.png")
            n = os.path.join(settings.get('SCREENSHOTS_PATH'), "xpath_PageSave.html")
            with codecs.open(n, "w", "utf-8") as f:
                f.write(self.driver.page_source)
    def selenium_login(self, usrEId, pwdEId, username, password, spider):
        try:
            usrElement = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, usrEId)))
            usrElement.send_keys(username)
            if spider.name == 'ashmoret':
                # The ashmoret login form needs an extra click before the
                # password field is usable.
                element = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located(
                        (By.XPATH, '//*[@id="f_login"]/div[4]/input')))
                self.driver.execute_script("arguments[0].click();", element)
            self.driver.find_element_by_id(pwdEId).send_keys(password, Keys.ENTER)
            # Keep the session cookies so later requests can replay them.
            self.cookie = self.driver.get_cookies()
        except TimeoutException as timeex:
            logging.error(timeex)
        except NoSuchElementException as noElementex:
            logging.error(noElementex)
    def process_request(self, request, spider):
        # only process tagged requests; drop this check to run everything
        # through Selenium
        if not (request.meta.get('selenium') or spider.undetectable):
            return
        if (not request.meta.get('login')) and (spider.name == 'hvr'):
            # replay the cookies captured during the earlier login
            for k in self.cookie:
                self.driver.add_cookie(k)
        self.driver.get(request.url)
        if request.meta.get('scroll'):
            self.scroll()
        if spider.wait:
            try:
                elementId = spider.elementId
                element_present = EC.presence_of_element_located((By.ID, elementId))
                if request.meta.get('elementId'):
                    elementId = request.meta.get('elementId')
                    element_present = EC.presence_of_element_located((By.ID, elementId))
                if request.meta.get('elementClass'):
                    elementId = request.meta.get('elementClass')
                    element_present = EC.presence_of_element_located(
                        (By.CLASS_NAME, elementId))
                WebDriverWait(self.driver, 2).until(element_present)
            except TimeoutException:
                spider.logger.error('Spider %s took too long to load' % spider.name)
                return
        if request.meta.get('interactElement'):
            self.popElement(request.meta.get('interactElement'))
        if request.meta.get("interact-xpath"):
            self.xpath_pop_element(request.meta.get("interact-xpath"))
        if request.meta.get('login'):
            self.selenium_login(spider.usrEId, spider.pwdEId,
                                spider.username, spider.password, spider)
        body = self.driver.page_source
        url = request.url
        response = HtmlResponse(url, body=body, encoding='utf-8', request=request)
        response.meta['cookieJar'] = self.driver.get_cookies()
        if request.meta.get("script"):
            response.meta['script_response'] = self.driver.execute_script(
                request.meta.get("script"))
        return response
    def scroll(self):
        SCROLL_PAUSE_TIME = 2
        # Get scroll height
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        main_scroll_count = 0
        while True:
            # Scroll down to bottom
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            # Wait to load page
            time.sleep(SCROLL_PAUSE_TIME)
            # Calculate new scroll height and compare with last scroll height
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            main_scroll_count = main_scroll_count + 1
            if new_height == last_height:
                break
            last_height = new_height
    def process_response(self, request, response, spider):
        # Called with the response returned from the downloader.
        # Must either:
        # - return a Response object
        # - return a Request object
        # - or raise IgnoreRequest
        return response

    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.
        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)
        return self.mailer.send(
            to=settings.get("EMAIL_LIST"), cc=settings.get("CC_LIST"),
            subject=f"IGROUP Coupon Scraping - Spider {spider.name} status",
            body=f"Spider {spider.name} started at "
                 f"{datetime.now().strftime('%m/%d/%Y, %H:%M:%S')}")

    def spider_closed(self, spider):
        spider.logger.info('Spider closed: %s' % spider.name)
        if self.driver:
            self.driver.close()
            self.driver = None
        if sys.platform == "linux" or sys.platform == "linux2":
            self.display.stop()
            spider.logger.info("Virtual Display killed")
        return self.mailer.send(
            to=settings.get("EMAIL_LIST"), cc=settings.get("CC_LIST"),
            subject=f"IGROUP Coupon Scraping - Spider {spider.name} status",
            body=f"Spider {spider.name} closed at "
                 f"{datetime.now().strftime('%m/%d/%Y, %H:%M:%S')}")
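
On the spider side, requests opt in to Selenium through request.meta flags, and the middleware reads several attributes off the spider (undetectable, proxy, wait, elementId, usrEId, pwdEId, username, password). A minimal sketch; the spider name, URL, and all attribute values below are illustrative:

# example_spider.py (sketch; names and values are illustrative)
import scrapy

class ExampleCouponSpider(scrapy.Spider):
    name = 'example'
    undetectable = False   # True -> use undetected_chromedriver
    proxy = False          # True -> route through the proxy-auth extension
    wait = True            # wait for elementId before grabbing page_source
    elementId = 'coupon-list'
    usrEId, pwdEId = 'user', 'pass'              # login form element ids
    username, password = 'me@example.com', 'secret'

    def start_requests(self):
        yield scrapy.Request(
            'https://example.com/coupons',
            meta={'selenium': True,   # route through the Selenium driver
                  'scroll': True,     # trigger the infinite-scroll helper
                  'login': False},
            callback=self.parse)

    def parse(self, response):
        # cookieJar is attached by the middleware from driver.get_cookies()
        self.logger.info('Got %d cookies', len(response.meta['cookieJar']))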