Skip to content

Instantly share code, notes, and snippets.

@zkywalker
Last active December 30, 2022 09:00
Show Gist options
  • Save zkywalker/1ff2029e17e951ec136e350cdd2cd95c to your computer and use it in GitHub Desktop.
Save zkywalker/1ff2029e17e951ec136e350cdd2cd95c to your computer and use it in GitHub Desktop.
A demo of using an IP proxy with the Selenium Chrome driver
import json
import time
import zipfile

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
class Browser:
    """
    Thin wrapper around a Selenium Chrome driver with optional proxy support.

    A demo of driving Chrome through an IP proxy
    (created by zkywalker, 2022/12/30).

    Recognised keyword arguments:

    :param driver_path: path to the chromedriver executable; when falsy,
        ``DEFAULT_DRIVER_PATH`` is used
    :param proxy: proxy string handed to ``add_proxy``; ``None`` disables it
    :param headless: when truthy, Chrome is started with ``--headless``
    """

    # Fallback chromedriver location used when no ``driver_path`` is given.
    DEFAULT_DRIVER_PATH = "/usr/lib/chromium-browser/chromedriver"

    # JavaScript executed inside the loaded page. The request parameters are
    # received through ``arguments`` rather than being spliced into the script
    # text, so quotes, backslashes or newlines in the URL/body cannot break
    # (or inject into) the script.
    # The promise is resolved only for a completed request with status 200;
    # for any other outcome it never settles, so the call presumably ends
    # only when the script timeout configured in __init__ fires.
    _XHR_SCRIPT = '''
        var url = arguments[0];
        var method = arguments[1];
        var body = arguments[2];
        var field = arguments[3];
        return new Promise(function (resolve, reject) {
            var h = new XMLHttpRequest();
            h.open(method, url, true);
            h.setRequestHeader("accept", "application/json, text/plain, */*");
            h.setRequestHeader("content-type", "application/json;charset=UTF-8");
            h.onreadystatechange = function () {
                if (h.readyState === 4 && h.status === 200) {
                    resolve(h[field]);
                }
            };
            h.send(body);
        });
    '''

    def __init__(self, **kwargs):
        self.is_loaded = False
        self.driver_path = kwargs.get("driver_path")
        self.proxy = kwargs.get("proxy")
        self.headless = kwargs.get("headless", False)

        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument('--headless')
        # NOTE(review): the original indentation was lost, so it is unclear
        # whether these two flags were meant to apply only in headless mode;
        # they are applied unconditionally here. Confirm the intent.
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        if self.proxy is not None:
            add_proxy(options, self.proxy)

        # Both branches use the Selenium 4 ``service=`` / ``options=`` form;
        # the positional path and ``chrome_options=`` keyword are legacy.
        service = Service(self.driver_path or self.DEFAULT_DRIVER_PATH)
        self.browser = webdriver.Chrome(service=service, options=options)
        self.browser.set_script_timeout(5)

    def load_url(self, url):
        """
        Navigate the browser to ``url`` and mark the page as loaded.

        :param url: address to open
        """
        self.browser.get(url)
        self.is_loaded = True

    def load_base_url(self):
        """
        Hook called before an in-page request when no page has been loaded.

        Does nothing by default.
        """
        pass

    def _xhr(self, method, url, body, field):
        """Run an XMLHttpRequest inside the page and return ``h[field]``."""
        if not self.is_loaded:
            self.load_base_url()
        return self.browser.execute_script(
            self._XHR_SCRIPT, url, method, body, field)

    def request_get(self, url):
        """
        Issue a GET request from inside the loaded page.

        :param url: request URL
        :return: response text of the request
        """
        return self._xhr("GET", url, None, "responseText")

    def request_post(self, url, data):
        """
        Issue a POST request from inside the loaded page.

        :param url: request URL
        :param data: POST body (sent as-is with a JSON content type)
        :return: response text of the request
        """
        return self._xhr("POST", url, data, "responseText")

    def request_post_for_url(self, url, data):
        """
        Issue a POST request and return the final response URL.

        :param url: request URL
        :param data: POST body (sent as-is with a JSON content type)
        :return: the request's ``responseURL`` as reported by the browser
        """
        return self._xhr("POST", url, data, "responseURL")

    def close(self):
        """
        Close the current window and shut the driver down.

        ``quit()`` runs even if ``close()`` raises, so the driver process is
        not leaked; the success message is printed only when both succeed.
        """
        try:
            self.browser.close()
        finally:
            self.browser.quit()
        print('close browser success')
def build_proxy_dict(proxy_string) -> dict:
    """
    Parse a proxy string into a dictionary.

    :param proxy_string: proxy string, either
        ``scheme://username:password@host:port`` or a server string without
        credentials (no ``@``); may be ``None``
    :return: for a string with credentials,
        ``{'scheme': 'http', 'host': '1233', 'port': 111,
        'username': 'test', 'password': '123'}``; for a string without ``@``,
        ``{'server': proxy_string}``; for ``None``, an empty dict
    :raises ValueError: if a string with credentials lacks ``scheme://`` or
        a numeric ``:port``
    """
    if proxy_string is None:
        return {}
    if "@" not in proxy_string:
        return {"server": proxy_string}

    scheme, scheme_sep, remainder = proxy_string.partition("://")
    if not scheme_sep:
        raise ValueError(
            "proxy string must start with 'scheme://': %r" % (proxy_string,))

    # Split on the LAST '@' so a password containing '@' stays intact, and on
    # the FIRST ':' of the credentials so a password containing ':' does too.
    credentials, _, address = remainder.rpartition("@")
    username, _, password = credentials.partition(":")

    host, port_sep, port = address.rpartition(":")
    if not port_sep:
        raise ValueError(
            "proxy string must end with 'host:port': %r" % (proxy_string,))

    return {
        'scheme': scheme,
        'host': host,
        'port': int(port),
        'username': username,
        'password': password,
    }
def add_proxy(chrome_options, proxy_string, plugin_path='proxy_auth_plugin.zip'):
    """
    Configure ``chrome_options`` to route traffic through a proxy.

    A proxy string with credentials is handled by generating a small Chrome
    extension that sets the proxy and answers the authentication challenge.
    A proxy string without credentials is passed to Chrome through the
    ``--proxy-server`` switch. If the parsed proxy is empty, nothing is done.

    :param chrome_options: Chrome options object to modify in place
    :param proxy_string: proxy string understood by ``build_proxy_dict``
    :param plugin_path: where the generated extension zip is written; the
        file must still exist when the driver is started
    """
    proxy_dict = build_proxy_dict(proxy_string)
    if not proxy_dict:
        return
    if 'server' in proxy_dict:
        # No credentials were supplied, so no extension is needed.
        chrome_options.add_argument('--proxy-server=%s' % proxy_dict['server'])
        return

    # NOTE(review): this is a Manifest V2 extension; confirm that the Chrome
    # version in use still loads Manifest V2 extensions.
    manifest_json = """
    {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Chrome Proxy",
        "permissions": [
            "proxy",
            "tabs",
            "unlimitedStorage",
            "storage",
            "<all_urls>",
            "webRequest",
            "webRequestBlocking"
        ],
        "background": {
            "scripts": ["background.js"]
        },
        "minimum_chrome_version":"22.0.0"
    }
    """

    # String values are JSON-encoded so that quotes or backslashes in the
    # credentials cannot break out of the generated JavaScript literals.
    background_js = """
    var config = {
        mode: "fixed_servers",
        rules: {
            singleProxy: {
                scheme: %s,
                host: %s,
                port: %d
            },
            bypassList: ["localhost"]
        }
    };
    chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
    function callbackFn(details) {
        return {
            authCredentials: {
                username: %s,
                password: %s
            }
        };
    }
    chrome.webRequest.onAuthRequired.addListener(
        callbackFn,
        {urls: ["<all_urls>"]},
        ['blocking']
    );
    """ % (
        json.dumps(proxy_dict['scheme']),
        json.dumps(proxy_dict['host']),
        int(proxy_dict['port']),
        json.dumps(proxy_dict['username']),
        json.dumps(proxy_dict['password']),
    )

    with zipfile.ZipFile(plugin_path, 'w') as archive:
        archive.writestr("manifest.json", manifest_json)
        archive.writestr("background.js", background_js)
    chrome_options.add_extension(plugin_path)
if __name__ == '__main__':
    # Demo entry point: fill in the placeholders below before running.
    target_url = ""
    proxy_host = ""
    proxy_port = ""
    username = ""
    password = ""
    driver_path = ""
    driver = Browser(proxy=f'http://{username}:{password}@{proxy_host}:{proxy_port}',
                     driver_path=driver_path)
    try:
        driver.load_url(target_url)
        print(driver.browser.page_source)
    finally:
        # Always shut the browser down so no driver process is left behind.
        driver.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment