Skip to content

Instantly share code, notes, and snippets.

@wuyue92tree
Last active June 11, 2021 03:54
Show Gist options
  • Save wuyue92tree/af423d0c4fed8465482f2c3b435461b2 to your computer and use it in GitHub Desktop.
Save wuyue92tree/af423d0c4fed8465482f2c3b435461b2 to your computer and use it in GitHub Desktop.
change-cef-proxy-when-runing.py
# base on https://gist.github.com/massimiliano-dalcero/0252a3c5dc54a2f81650d5c0eafabf99
# use urllib3 setup the proxy for every browser in running programe
# cefext.py
from cefpython3 import cefpython as cef
import urllib3
# import urllib3.contrib.pyopenssl
from threading import Thread
from urllib.parse import urlparse
# urllib3.contrib.pyopenssl.inject_into_urllib3()
urllib3.disable_warnings()
BROWSER_DEFAULT_ENCODING = "UTF-8"
class WebRequestClient:
_resourceHandler = None
_data = ""
_dataLength = -1
_response = None
def OnUploadProgress(self, web_request, current, total):
pass
def OnDownloadProgress(self, web_request, current, total):
pass
def OnDownloadData(self, web_request, data):
# #print("OnDownloadData()")
self._data += data
def OnRequestComplete(self, web_request):
# cefpython.WebRequest.Status = {"Unknown", "Success",
# "Pending", "Canceled", "Failed"}
statusText = "Unknown"
if web_request.GetRequestStatus() in cef.WebRequest.Status:
statusText = cef.WebRequest.Status[cef.webRequest.GetRequestStatus(
)]
# Emulate OnResourceResponse() in ClientHandler:
self._response = web_request.GetResponse()
# Are webRequest.GetRequest() and
# self._resourceHandler._request the same? What if
# there was a redirect, what will GetUrl() return
# for both of them?
self._data = self._resourceHandler._clientHandler._OnResourceResponse(
self._resourceHandler._browser,
self._resourceHandler._frame,
web_request.GetRequest(),
web_request.GetRequestStatus(),
web_request.GetRequestError(),
web_request.GetResponse(),
self._data)
self._dataLength = len(self._data)
# ResourceHandler.GetResponseHeaders() will get called
# after _responseHeadersReadyCallback.Continue() is called.
self._resourceHandler._responseHeadersReadyCallback.Continue()
class ClientHandler(object):
def __init__(self, proxy=None, cookie=None):
self.proxy = proxy
self.cookie = cookie
def GetResourceHandler(self, browser, frame, request):
resHandler = ResourceHandler(self.proxy, self.cookie)
resHandler._clientHandler = self
resHandler._browser = browser
resHandler._frame = frame
resHandler._request = request
#resHandler._cm = self.cm
self._AddStrongReference(resHandler)
return resHandler
# def _OnResourceResponse(self, request, response, data):
def _OnResourceResponse(self, browser, frame, request, requestStatus,
requestError, response, data):
return data
_resourceHandlers = {}
_resourceHandlerMaxId = 0
def _AddStrongReference(self, resHandler):
self._resourceHandlerMaxId += 1
resHandler._resourceHandlerId = self._resourceHandlerMaxId
self._resourceHandlers[resHandler._resourceHandlerId] = resHandler
def _ReleaseStrongReference(self, resHandler):
if resHandler._resourceHandlerId in self._resourceHandlers:
del self._resourceHandlers[resHandler._resourceHandlerId]
else:
#print("_ReleaseStrongReference() FAILED: resource handler not found, id = %s" % (resHandler._resourceHandlerId))
pass
class CookView():
def __init__(self):
self.cookies = []
def getGookieHead(self):
return "; ".join((str(i[0])+"="+str(i[1])) for i in self.cookies)
def Visit(self, cookie, count, total, delete_cookie_out):
self.cookies.append((cookie.GetName(), cookie.GetValue()))
return True
class ResourceHandler(object):
_responseHeadersReadyCallback = None
_offsetRead = 0
_data = None
_heads = None
_datalen = -1
def __init__(self, proxy, cookie):
self.proxy = proxy
self.cookie = cookie
def go(self, http, ref, heads, request, callback):
send_data = request.GetPostData()
if request.GetMethod() == "GET" or request.GetMethod() == "HEAD" or request.GetMethod() == "DELETE":
r = http.request_encode_url(request.GetMethod(), request.GetUrl(
), fields=send_data, headers=heads, redirect=False, preload_content=False)
elif (request.GetMethod() == "POST" and heads["Content-Type"] == "application/x-www-form-urlencoded") or request.GetMethod() == "PUT" or request.GetMethod() == "PATCH":
r = http.request_encode_body(request.GetMethod(), request.GetUrl(
), fields=send_data, headers=heads, redirect=False, encode_multipart=False, preload_content=False)
elif (request.GetMethod() == "POST" and "multipart" in heads["Content-Type"]):
r = http.request(request.GetMethod(), request.GetUrl(
), fields=send_data, headers=heads, redirect=False, preload_content=False)
else:
r = http.request_encode_url(request.GetMethod(), request.GetUrl(
), fields=send_data, headers=heads, redirect=False, preload_content=False)
self._response = r
self._datalen = len(r.data)
self._data = r.data
self._datastream = r.stream
request.SetFlags(
cef.Request.Flags["AllowCachedCredentials"] | cef.Request.Flags["AllowCookies"])
self._responseHeadersReadyCallback = callback
self._responseHeadersReadyCallback.Continue()
def ProcessRequest(self, request, callback):
heads = request.GetHeaderMap()
request.SetFlags(
cef.Request.Flags["AllowCachedCredentials"] | cef.Request.Flags["AllowCookies"])
if request.GetUrl().startswith("chrome-extension"):
self._requestz = request
self._responseHeadersReadyCallback = callback
self._webRequestClient = WebRequestClient()
self._webRequestClient._resourceHandler = self
#request.SetFlags(cef.Request.Flags["AllowCachedCredentials"] | cef.Request.Flags["AllowCookies"])
self._webRequest = cef.WebRequest.Create(
request, self._webRequestClient)
return True
elif not request.GetUrl().startswith("http"):
print("URL MALFORMED: " + request.GetUrl())
return False
else:
self._responseHeadersReadyCallback = callback
#cv = CookView()
#self._cm.VisitUrlCookies(request.GetUrl(), True, cv)
#cooks = cv.getGookieHead()
#print(" ** COOKS **")
# if len(cooks)>0:
# print( cooks )
# #heads["Cookie"] = cooks
if self.proxy:
http = urllib3.ProxyManager(self.proxy)
else:
http = urllib3.PoolManager()
self._requestz = request
self._heads = heads
t = Thread(target=self.go, args=(http, self, heads, request, callback))
t.start()
return True
def GetResponseHeaders(self, response, responseLengthOut, redirectUrlOut):
if self._requestz.GetUrl().startswith("chrome-extension"):
assert self._webRequestClient._response, "Response object empty"
wrcResponse = self._webRequestClient._response
response.SetStatus(wrcResponse.GetStatus())
response.SetStatusText(wrcResponse.GetStatusText())
response.SetMimeType(wrcResponse.GetMimeType())
if wrcResponse.GetHeaderMultimap():
response.SetHeaderMultimap(wrcResponse.GetHeaderMultimap())
responseLengthOut[0] = self._webRequestClient._dataLength
if not responseLengthOut[0]:
# Probably a cached page? Or a redirect?
pass
return
else:
wrcResponse = self._response
location = None
# if wrcResponse.status>=300 and wrcResponse.status<400:
try:
if "location" in wrcResponse.headers:
location = wrcResponse.headers["location"]
elif "Location" in wrcResponse.headers["Location"]:
location = wrcResponse.headers["Location"]
if location is not None:
if location == "." or location == "":
location = self._requestz.GetUrl()
else:
locres = urlparse.urlparse(location)
urlres = urlparse.urlparse(self._requestz.GetUrl())
if locres.netloc == "":
if locres.path[0] == "/":
location = urlres.scheme + "://" + urlres.netloc + location
else:
sep = "/"
if location[0] == "/" or self._requestz.GetUrl()[-1] == "/":
sep = ""
location = self._requestz.GetUrl() + sep + location
except:
pass
if location is not None:
redirectUrlOut[0] = location
response.SetStatus(wrcResponse.status)
response.SetStatusText(wrcResponse.reason)
ct = ct0 = "none"
if wrcResponse.headers.get("Content-Type") is not None:
ct0 = ct = wrcResponse.headers.get(
"Content-Type").split(";")[0]
# wrcResponse.headers.get("Content-Type"))
response.SetMimeType(ct)
response.SetHeaderMultimap(wrcResponse.headers.items())
responseLengthOut[0] = self._datalen # len(self._data)
def ReadResponse(self, data_out, bytes_to_read, bytes_read_out, callback):
if self._requestz.GetUrl().startswith("chrome-extension"):
if self._offsetRead < self._webRequestClient._dataLength:
dataChunk = self._webRequestClient._data[self._offsetRead:(
self._offsetRead + bytes_to_read)]
self._offsetRead += len(dataChunk)
data_out[0] = dataChunk
bytes_read_out[0] = len(dataChunk)
return True
self._clientHandler._ReleaseStrongReference(self)
return False
else:
if self._datalen < 0:
print("DATA LEN UNKNOW")
dataChunk = self._response.read(bytes_to_read)
if len(dataChunk) > 0:
bytes_read_out[0] = len(dataChunk)
data_out[0] = dataChunk
self._offsetRead += len(dataChunk)
return True
else:
bytes_read_out[0] = len(dataChunk)
data_out[0] = dataChunk
self._response.release_conn()
self._clientHandler._ReleaseStrongReference(self)
return False
elif self._offsetRead < self._datalen:
dataChunk = self._data[self._offsetRead:(
self._offsetRead + bytes_to_read)]
self._offsetRead += len(dataChunk)
data_out[0] = dataChunk
bytes_read_out[0] = len(dataChunk)
return True
self._clientHandler._ReleaseStrongReference(self)
return False
def CanGetCookie(self, cookie):
return True
def CanSetCookie(self, cookie):
return True
def Cancel(self):
pass
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment