Last active
December 30, 2022 09:00
-
-
Save zkywalker/1ff2029e17e951ec136e350cdd2cd95c to your computer and use it in GitHub Desktop.
A demo of using an IP proxy with the Selenium Chrome driver.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import zipfile | |
from selenium.webdriver.chrome.service import Service | |
from selenium import webdriver | |
class Browser:
    """
    Demo of driving Chrome through Selenium, optionally behind an IP proxy.

    created by zkywalker 2022/12/30

    Keyword arguments accepted by the constructor:

    * ``driver_path`` -- path to the chromedriver executable; when missing or
      empty, ``DEFAULT_DRIVER_PATH`` is used.
    * ``proxy`` -- proxy string handed to ``add_proxy`` (skipped when None).
    * ``headless`` -- when truthy, Chrome is started with ``--headless``,
      ``--no-sandbox`` and ``--disable-dev-shm-usage``.
    """

    # Fallback chromedriver location used when no ``driver_path`` is supplied.
    DEFAULT_DRIVER_PATH = "/usr/lib/chromium-browser/chromedriver"

    # JavaScript run inside the page for every request. All inputs arrive via
    # ``arguments`` instead of being formatted into the source text, so quotes,
    # backslashes or newlines in the URL/body cannot break or alter the script.
    #   arguments[0] -- HTTP method
    #   arguments[1] -- request URL
    #   arguments[2] -- request body (null for none)
    #   arguments[3] -- XMLHttpRequest property to resolve with
    # The promise rejects on any non-200 final status so a failure is reported
    # right away instead of waiting for the script timeout.
    _XHR_SCRIPT = '''
        var method = arguments[0];
        var url = arguments[1];
        var body = arguments[2];
        var field = arguments[3];
        return new Promise(function (resolve, reject) {
            var xhr = new XMLHttpRequest();
            xhr.open(method, url, true);
            xhr.setRequestHeader("accept", "application/json, text/plain, */*");
            xhr.setRequestHeader("content-type", "application/json;charset=UTF-8");
            xhr.onreadystatechange = function () {
                if (xhr.readyState !== 4) {
                    return;
                }
                if (xhr.status === 200) {
                    resolve(xhr[field]);
                } else {
                    reject(new Error("HTTP " + xhr.status + " for " + method + " " + url));
                }
            };
            xhr.send(body);
        });
    '''

    def __init__(self, **kwargs):
        self.is_loaded = False
        self.driver_path = kwargs.get("driver_path")
        self.proxy = kwargs.get("proxy", None)
        self.headless = kwargs.get("headless", False)

        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument('--headless')
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
        if self.proxy is not None:
            add_proxy(options, self.proxy)

        # Both branches now go through Service(...) and ``options=``: the
        # positional executable path and the ``chrome_options`` keyword were
        # deprecated in Selenium 4 and dropped in later 4.x releases.
        self.browser = webdriver.Chrome(
            service=Service(self.driver_path or self.DEFAULT_DRIVER_PATH),
            options=options,
        )
        self.browser.set_script_timeout(5)

    def load_url(self, url):
        """Navigate the browser to ``url`` and mark the page as loaded."""
        self.browser.get(url)
        self.is_loaded = True

    def load_base_url(self):
        """Hook called before a request when no page is loaded; a no-op here."""
        pass

    def _xhr(self, method, url, data=None, result_field="responseText"):
        """
        Run one XMLHttpRequest inside the current page and return its result.

        :param method: HTTP method, e.g. "GET" or "POST"
        :param url: request URL
        :param data: request body; a str is sent as is, None sends no body,
                     anything else is serialised with ``json.dumps``
        :param result_field: XMLHttpRequest property to return
        :return: whatever ``execute_script`` returns for the settled promise
        """
        import json  # local import: only the request helper needs it

        if not self.is_loaded:
            self.load_base_url()
        if data is not None and not isinstance(data, str):
            data = json.dumps(data)
        return self.browser.execute_script(
            self._XHR_SCRIPT, method, url, data, result_field)

    def request_get(self, url):
        """
        GET request issued from inside the page.

        :param url: request URL
        :return: response text of the request
        """
        return self._xhr("GET", url)

    def request_post(self, url, data):
        """
        POST request issued from inside the page.

        :param url: request URL
        :param data: post data
        :return: response text of the request
        """
        return self._xhr("POST", url, data)

    def request_post_for_url(self, url, data):
        """
        POST request that returns the final URL instead of the body.

        :param url: request URL
        :param data: post data
        :return: ``responseURL`` reported by the XMLHttpRequest
        """
        return self._xhr("POST", url, data, result_field="responseURL")

    def close(self):
        """
        Shut the browser down.

        ``quit()`` ends the whole session (all windows plus the driver), so it
        is called on its own: a failing ``close()`` can no longer prevent it.
        The success message is printed only when shutdown did not raise.
        """
        self.browser.quit()
        print('close browser success')
def build_proxy_dict(proxy_string) -> dict:
    """
    Build a proxy description dict from a proxy string.

    :param proxy_string: proxy string ``scheme://username:password@host:port``,
                         a string without credentials, or None
    :return: for a string with credentials, e.g.
             {'scheme': 'http', 'host': '1233', 'port': 111,
              'username': 'test', 'password': '123'};
             for a string without "@", ``{"server": proxy_string}``;
             for None, an empty dict
    :raises ValueError: when a string with credentials lacks ``scheme://`` or
                        ``:port``, or the port is not an integer
    """
    if proxy_string is None:
        return {}
    if "@" not in proxy_string:
        return {"server": proxy_string}

    scheme, separator, remainder = proxy_string.partition("://")
    if not separator:
        raise ValueError(
            "proxy string must look like scheme://username:password@host:port")

    # Split on the LAST "@" and the FIRST ":" of the credentials so that a
    # password containing "@" or ":" stays intact.
    credentials, _, address = remainder.rpartition("@")
    username, _, password = credentials.partition(":")

    # Split on the last ":" so only the trailing segment is read as the port.
    host, separator, port = address.rpartition(":")
    if not separator:
        raise ValueError("proxy string is missing the ':port' part")

    return {
        'scheme': scheme,
        'host': host,
        'port': int(port),
        'username': username,
        'password': password,
    }
def add_proxy(chrome_options, proxy_string):
    """
    Configure ``chrome_options`` to send traffic through ``proxy_string``.

    A proxy without credentials is passed to Chrome with the
    ``--proxy-server`` switch. A proxy with credentials is configured through
    a small generated extension (written to ``proxy_auth_plugin.zip`` in the
    current directory) that sets the fixed proxy server and answers the
    proxy's authentication challenge.

    :param chrome_options: options object offering ``add_argument`` and
                           ``add_extension``
    :param proxy_string: proxy string, see ``build_proxy_dict``
    """
    import json  # local import: used to emit safely escaped JS literals

    proxy_dict = build_proxy_dict(proxy_string)

    # No credentials parsed: the extension is unnecessary and the auth keys
    # read below would be missing, so use Chrome's own switch instead.
    if 'username' not in proxy_dict:
        server = proxy_dict.get('server')
        if server:
            chrome_options.add_argument(f'--proxy-server={server}')
        return

    # NOTE(review): this is a Manifest V2 extension; confirm that the Chrome
    # version in use still loads MV2 extensions.
    manifest_json = """
{
    "version": "1.0.0",
    "manifest_version": 2,
    "name": "Chrome Proxy",
    "permissions": [
        "proxy",
        "tabs",
        "unlimitedStorage",
        "storage",
        "<all_urls>",
        "webRequest",
        "webRequestBlocking"
    ],
    "background": {
        "scripts": ["background.js"]
    },
    "minimum_chrome_version":"22.0.0"
}
"""

    # Every value is inserted as a json.dumps literal, so quotes or
    # backslashes in the credentials cannot break the generated script.
    background_js = """
var config = {
    mode: "fixed_servers",
    rules: {
        singleProxy: {
            scheme: %s,
            host: %s,
            port: %d
        },
        bypassList: ["localhost"]
    }
};
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
function callbackFn(details) {
    return {
        authCredentials: {
            username: %s,
            password: %s
        }
    };
}
chrome.webRequest.onAuthRequired.addListener(
    callbackFn,
    {urls: ["<all_urls>"]},
    ['blocking']
);
""" % (
        json.dumps(proxy_dict['scheme']),
        json.dumps(proxy_dict['host']),
        int(proxy_dict['port']),
        json.dumps(proxy_dict['username']),
        json.dumps(proxy_dict['password']),
    )

    plugin_file = 'proxy_auth_plugin.zip'
    with zipfile.ZipFile(plugin_file, 'w') as zp:
        zp.writestr("manifest.json", manifest_json)
        zp.writestr("background.js", background_js)
    chrome_options.add_extension(plugin_file)
if __name__ == '__main__':
    # Demo entry point: fill in the placeholders below before running.
    target_url = ""
    proxy_host = ""
    proxy_port = ""
    username = ""
    password = ""
    driver_path = ""

    driver = Browser(proxy=f'http://{username}:{password}@{proxy_host}:{proxy_port}',
                     driver_path=driver_path)
    try:
        driver.load_url(target_url)
        print(driver.browser.page_source)
    finally:
        # Always shut the browser down so no Chrome/chromedriver process is
        # left running, even when loading the page fails.
        driver.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment