Skip to content

Instantly share code, notes, and snippets.

@massimiliano-dalcero
Last active February 22, 2022 07:53
Show Gist options
  • Save massimiliano-dalcero/0252a3c5dc54a2f81650d5c0eafabf99 to your computer and use it in GitHub Desktop.
Save massimiliano-dalcero/0252a3c5dc54a2f81650d5c0eafabf99 to your computer and use it in GitHub Desktop.
# CEFPYTHON
# Example of how to override the "connection" layer by using an
# external library (e.g. urllib3) instead of Chromium's internal
# network library.
#
# Massimiliano Dal Cero - max<A_T>yatta<D_O_T>it
from cefpython3 import cefpython as cef
import platform
import sys
import OpenSSL
import urllib3
import urllib3.contrib.pyopenssl
import re
import urlparse
from threading import Thread
# Module-level setup; the order matters: pyOpenSSL support is injected into
# urllib3 before the shared pool manager is created.
urllib3.contrib.pyopenssl.inject_into_urllib3()
# NOTE(review): this silences urllib3 warnings process-wide (presumably
# including TLS-related ones) - confirm this is intended outside of a demo.
urllib3.disable_warnings()
# Shared urllib3 pool manager used by ResourceHandler.go() for every
# outgoing http/https request.
http = urllib3.PoolManager()
# Not referenced anywhere in the visible part of this file.
BROWSER_DEFAULT_ENCODING = "UTF-8"
class WebRequestClient:
    """Client object handed to ``cef.WebRequest.Create()``.

    ResourceHandler uses it for ``chrome-extension`` URLs: the downloaded
    payload is accumulated in ``_data`` and, once the request completes,
    the owning ResourceHandler is told that the response headers are ready.
    """

    # ResourceHandler that created this client (set by ProcessRequest()).
    _resourceHandler = None
    # Payload received so far; replaced by the post-processed payload in
    # OnRequestComplete().
    _data = ""
    # Length of the final payload, -1 until the request has completed.
    _dataLength = -1
    # Response object of the completed web request.
    _response = None

    def OnUploadProgress(self, web_request, current, total):
        """Upload progress notification; intentionally ignored."""
        pass

    def OnDownloadProgress(self, web_request, current, total):
        """Download progress notification; intentionally ignored."""
        pass

    def OnDownloadData(self, web_request, data):
        """Append a downloaded chunk to the buffered payload."""
        # Start from the first chunk instead of concatenating onto the ""
        # class default, so that bytes chunks (Python 3) do not raise
        # TypeError on ``str + bytes``.
        self._data = self._data + data if self._data else data

    def OnRequestComplete(self, web_request):
        """Post-process the payload and resume the waiting ResourceHandler.

        The buffered data is passed through
        ``ClientHandler._OnResourceResponse()`` (emulating an
        OnResourceResponse() hook in the client handler); whatever that hook
        returns becomes the payload served by ResourceHandler.ReadResponse().
        """
        # The original code looked up a status text here through the
        # undefined name ``webRequest`` (NameError); the value was never
        # used, so the lookup has been removed.
        handler = self._resourceHandler
        self._response = web_request.GetResponse()
        # Are web_request.GetRequest() and handler._request the same? What
        # if there was a redirect, what will GetUrl() return for both of
        # them?
        self._data = handler._clientHandler._OnResourceResponse(
            handler._browser,
            handler._frame,
            web_request.GetRequest(),
            web_request.GetRequestStatus(),
            web_request.GetRequestError(),
            self._response,
            self._data)
        self._dataLength = len(self._data)
        # ResourceHandler.GetResponseHeaders() will get called
        # after _responseHeadersReadyCallback.Continue() is called.
        handler._responseHeadersReadyCallback.Continue()
class ClientHandler:
    """Client handler that routes every resource request through a
    ResourceHandler and keeps those handlers alive while they are in use.
    """

    # Registry of live resource handlers, keyed by handler id. It is a class
    # attribute and therefore shared by all ClientHandler instances.
    _resourceHandlers = {}
    # Last id handed out. Kept at class level as well, so that ids stay
    # unique across every instance writing into the shared registry.
    _resourceHandlerMaxId = 0

    def GetResourceHandler(self, browser, frame, request):
        """Create, wire up and register a ResourceHandler for ``request``."""
        resHandler = ResourceHandler()
        resHandler._clientHandler = self
        resHandler._browser = browser
        resHandler._frame = frame
        resHandler._request = request
        self._AddStrongReference(resHandler)
        return resHandler

    def _OnResourceResponse(self, browser, frame, request, requestStatus,
                            requestError, response, data):
        """Hook for post-processing a downloaded payload.

        The default implementation returns ``data`` unchanged.
        """
        return data

    def _AddStrongReference(self, resHandler):
        """Assign a fresh id to ``resHandler`` and store it in the registry."""
        # Increment the class-level counter explicitly. ``self.x += 1`` would
        # create a per-instance counter, and two ClientHandler instances
        # would then issue colliding ids into the shared registry.
        ClientHandler._resourceHandlerMaxId += 1
        resHandler._resourceHandlerId = ClientHandler._resourceHandlerMaxId
        self._resourceHandlers[resHandler._resourceHandlerId] = resHandler

    def _ReleaseStrongReference(self, resHandler):
        """Drop ``resHandler`` from the registry; unknown ids are ignored."""
        self._resourceHandlers.pop(resHandler._resourceHandlerId, None)
class CookView():
    """Cookie visitor that records (name, value) pairs of visited cookies."""

    def __init__(self):
        # (name, value) tuples in the order in which they were visited.
        self.cookies = []

    def getGookieHead(self):
        """Return the collected cookies as one ``name=value; ...`` string."""
        rendered = []
        for entry in self.cookies:
            rendered.append(str(entry[0]) + "=" + str(entry[1]))
        return "; ".join(rendered)

    def Visit(self, cookie, count, total, delete_cookie_out):
        """Record one cookie; always returns True."""
        entry = (cookie.GetName(), cookie.GetValue())
        self.cookies.append(entry)
        return True
class ResourceHandler:
    """Resource handler that fetches http(s) resources with urllib3.

    ``chrome-extension`` URLs are delegated to a ``cef.WebRequest``
    (see WebRequestClient); every other http/https URL is downloaded on a
    worker thread through the module-level urllib3 pool ``http``.
    Instances are created and wired up by ClientHandler.GetResourceHandler().
    """

    # Callback used to signal that the response headers are available.
    _responseHeadersReadyCallback = None
    # Number of payload bytes already handed out by ReadResponse().
    _offsetRead = 0
    # Fully downloaded response body (urllib3 path).
    _data = None
    # Request headers captured in ProcessRequest().
    _heads = None
    # Length of ``_data``; -1 while it is not known.
    _datalen = -1

    def go(self, ref, heads, request, callback):
        """Perform the HTTP request with urllib3 (runs on a worker thread).

        ``ref`` is unused and kept only for signature compatibility.
        On success the response is stored on the handler and
        ``callback.Continue()`` is called; on any failure the error is
        printed and ``callback.Cancel()`` is called, so that the request
        does not stay pending forever.
        """
        method = request.GetMethod()
        url = request.GetUrl()
        send_data = request.GetPostData()
        # A request without a Content-Type header must not raise KeyError.
        content_type = heads.get("Content-Type", "")
        try:
            if method == "POST" and "multipart" in content_type:
                r = http.request(method, url, fields=send_data,
                                 headers=heads, redirect=False,
                                 preload_content=False)
            elif ((method == "POST"
                   and content_type == "application/x-www-form-urlencoded")
                  or method in ("PUT", "PATCH")):
                r = http.request_encode_body(method, url, fields=send_data,
                                             headers=heads, redirect=False,
                                             encode_multipart=False,
                                             preload_content=False)
            else:
                # GET / HEAD / DELETE and every remaining combination.
                r = http.request_encode_url(method, url, fields=send_data,
                                            headers=heads, redirect=False,
                                            preload_content=False)
            # Accessing ``.data`` reads the whole body, so the length is
            # known before the response headers are reported.
            body = r.data
        except Exception as exc:
            # Thread boundary: an uncaught exception here would only kill
            # the worker thread and leave the callback unresolved.
            print("REQUEST FAILED: %s (%s)" % (url, exc))
            callback.Cancel()
            return
        self._response = r
        self._data = body
        self._datalen = len(body)
        self._datastream = r.stream
        request.SetFlags(cef.Request.Flags["AllowCachedCredentials"] | cef.Request.Flags["AllowCookies"])
        self._responseHeadersReadyCallback = callback
        self._responseHeadersReadyCallback.Continue()

    def ProcessRequest(self, request, callback):
        """Start handling ``request``.

        Returns True when the request is being handled (``callback`` is
        resolved later) and False for URLs that are neither
        ``chrome-extension`` nor http(s).
        """
        heads = request.GetHeaderMap()
        request.SetFlags(cef.Request.Flags["AllowCachedCredentials"] | cef.Request.Flags["AllowCookies"])
        url = request.GetUrl()
        if url.startswith("chrome-extension"):
            self._requestz = request
            self._responseHeadersReadyCallback = callback
            self._webRequestClient = WebRequestClient()
            self._webRequestClient._resourceHandler = self
            self._webRequest = cef.WebRequest.Create(request, self._webRequestClient)
            return True
        if not url.startswith("http"):
            print("URL MALFORMED: " + url)
            return False
        # Forwarding cookies through CookView is currently not enabled.
        self._responseHeadersReadyCallback = callback
        self._requestz = request
        self._heads = heads
        # self.go is already bound; the extra ``self`` fills its ``ref``
        # parameter.
        t = Thread(target=self.go, args=(self, heads, request, callback))
        t.start()
        return True

    @staticmethod
    def _resolveRedirect(request_url, location):
        """Return ``location`` as an absolute URL relative to ``request_url``."""
        # An empty or "." location is treated as a redirect to the request
        # URL itself.
        if location in (".", ""):
            return request_url
        try:
            from urllib.parse import urljoin  # Python 3
        except ImportError:
            from urlparse import urljoin  # Python 2
        return urljoin(request_url, location)

    def GetResponseHeaders(self, response, responseLengthOut, redirectUrlOut):
        """Copy status, MIME type and headers of the fetched resource into
        ``response`` and report its length through ``responseLengthOut[0]``.

        For the urllib3 path, a ``Location`` header (regardless of the
        status code) is resolved against the request URL and written to
        ``redirectUrlOut[0]``.
        """
        if self._requestz.GetUrl().startswith("chrome-extension"):
            assert self._webRequestClient._response, "Response object empty"
            wrcResponse = self._webRequestClient._response
            response.SetStatus(wrcResponse.GetStatus())
            response.SetStatusText(wrcResponse.GetStatusText())
            response.SetMimeType(wrcResponse.GetMimeType())
            if wrcResponse.GetHeaderMultimap():
                response.SetHeaderMultimap(wrcResponse.GetHeaderMultimap())
            # A length of 0 is probably a cached page or a redirect.
            responseLengthOut[0] = self._webRequestClient._dataLength
            return
        upstream = self._response
        headers = upstream.headers
        # Try both spellings so that plain, case-sensitive mappings work too.
        location = headers.get("location")
        if location is None:
            location = headers.get("Location")
        if location is not None:
            redirectUrlOut[0] = self._resolveRedirect(
                self._requestz.GetUrl(), location)
        response.SetStatus(upstream.status)
        response.SetStatusText(upstream.reason)
        content_type = headers.get("Content-Type")
        if content_type is not None:
            # Strip parameters such as "; charset=utf-8".
            mime = content_type.split(";")[0]
        else:
            mime = "none"
        response.SetMimeType(mime)
        response.SetHeaderMultimap(list(headers.items()))
        responseLengthOut[0] = self._datalen

    def _serveBuffered(self, payload, total, data_out, bytes_to_read,
                       bytes_read_out):
        """Hand out the next slice of an in-memory payload.

        Returns True while data is left; otherwise releases the strong
        reference held by the client handler and returns False.
        """
        if self._offsetRead < total:
            dataChunk = payload[self._offsetRead:(self._offsetRead + bytes_to_read)]
            self._offsetRead += len(dataChunk)
            data_out[0] = dataChunk
            bytes_read_out[0] = len(dataChunk)
            return True
        self._clientHandler._ReleaseStrongReference(self)
        return False

    def ReadResponse(self, data_out, bytes_to_read, bytes_read_out, callback):
        """Write up to ``bytes_to_read`` bytes of the body into ``data_out[0]``.

        ``bytes_read_out[0]`` receives the number of bytes written. Returns
        True when a chunk was produced and False when the body is exhausted.
        """
        if self._requestz.GetUrl().startswith("chrome-extension"):
            return self._serveBuffered(self._webRequestClient._data,
                                       self._webRequestClient._dataLength,
                                       data_out, bytes_to_read,
                                       bytes_read_out)
        if self._datalen < 0:
            # Length unknown: read straight from the urllib3 response.
            print("DATA LEN UNKNOW")
            dataChunk = self._response.read(bytes_to_read)
            bytes_read_out[0] = len(dataChunk)
            data_out[0] = dataChunk
            if len(dataChunk) > 0:
                self._offsetRead += len(dataChunk)
                return True
            self._response.release_conn()
            self._clientHandler._ReleaseStrongReference(self)
            return False
        return self._serveBuffered(self._data, self._datalen, data_out,
                                   bytes_to_read, bytes_read_out)

    def CanGetCookie(self, cookie):
        """Allow every cookie to be sent."""
        return True

    def CanSetCookie(self, cookie):
        """Allow every cookie to be stored."""
        return True

    def Cancel(self):
        """Cancellation notification; nothing to clean up here."""
        pass
def main(url="https://www.facebook.com", cache_path="/Users/max/cache/"):
    """Start CEF, open ``url`` in a browser window and run the message loop.

    Every resource request of the browser is routed through ClientHandler,
    and therefore through urllib3 instead of Chromium's network stack.

    Args:
        url: Page to load on start-up. Defaults to the original demo URL.
        cache_path: Directory passed to CEF as ``cache_path``. Defaults to
            the path that used to be hard-coded, so ``main()`` without
            arguments behaves as before.
    """
    check_versions()
    sys.excepthook = cef.ExceptHook  # To shutdown all CEF processes on error
    settings = {
        "context_menu": {
            "enabled": False,
        },
        "cache_path": cache_path,
        "persist_session_cookies": True,
    }
    cef.Initialize(settings=settings)
    browser = cef.CreateBrowserSync(url=url, window_title="UrlLib3")
    # Keep the handler in a local variable so that it stays referenced for
    # the whole lifetime of the message loop.
    clientHandler = ClientHandler()
    browser.SetClientHandler(clientHandler)
    cef.MessageLoop()
    cef.Shutdown()
def check_versions():
    """Assert that the installed CEF Python is version 55.3 or newer.

    Raises:
        AssertionError: if ``cef.__version__`` is older than 55.3.
    """
    # Compare numerically. A plain string comparison is lexicographic and
    # would reject "100.0" ("1" < "5") while accepting "6.0".
    found = tuple(int(part) for part in re.findall(r"\d+", cef.__version__)[:2])
    assert found >= (55, 3), "CEF Python v55.3+ required to run this"
# Run the demo browser only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment