Last active
June 4, 2021 00:49
-
-
Save IlluminatiFish/e0e6b46635633fc1eaeccc54adb15ba5 to your computer and use it in GitHub Desktop.
A set of functions that allows Python developers to download/retrieve content from a specified URL to their own computer.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# This program is a utility used by myself that I have released | |
# to the public under the GPLv3 license | |
# | |
# Copyright (c) 2021 IlluminatiFish. | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, version 3. | |
# | |
# This program is distributed in the hope that it will be useful, but | |
# WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
# General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see http://www.gnu.org/licenses/. | |
# | |
import socket, os, ssl | |
from urllib.parse import urlparse | |
# HTTP protocol version placed in the request line built by download_content
# and quoted in get_content's error message for a 505 response.
HTTP_VERSION = '1.1'
def get_content(raw_content):
    '''
    Split a raw HTTP response into headers and body, check the status code
    and return the body.

    NOTE: the body is returned exactly as received; a chunked transfer
    encoding is not decoded here.

    :param raw_content: The raw response (bytes) taken from the web server.
    :returns: The decoded text after the first blank line of the response, or
              an empty string when the response has no header/body separator.
    :raises Exception: If the status code in the status line is 400 or above.
    :raises IndexError: If the response has no status line, or the status
                        line has no status-code field.
    :raises ValueError: If the status-code field is not an integer.
    '''
    content = raw_content.decode()

    # partition() keeps any later blank lines inside the body and, unlike
    # split(...)[1], yields an empty body instead of raising when the
    # separator is missing (e.g. a response cut short by a timeout).
    headers, _, dom_object = content.partition('\r\n\r\n')

    # The status line looks like "HTTP/1.1 200 OK"; the code is field two.
    status_line = headers.splitlines()[0]
    status = int(status_line.split(' ')[1])

    if status >= 400:
        if status == 505:
            raise Exception(f'Web server does not support http version {HTTP_VERSION}')
        else:
            raise Exception(f'Web server hosting the content returned a {status} http error code')

    return dom_object
def _receive_all(connection, segment_size, timeout):
    '''
    Read from a connected socket until the peer closes it or a read times out.

    :param connection: A connected plain or TLS socket.
    :param segment_size: The maximum number of bytes requested per recv call.
    :param timeout: The socket timeout in seconds, only used in the timeout message.
    :returns: Everything received so far, as bytes (possibly empty).
    '''
    segments = []
    while True:
        try:
            segment = connection.recv(segment_size)
        except socket.timeout:
            # Best effort: keep whatever arrived before the timeout.
            print('[-] Socket receiver timed out after {} seconds'.format(timeout))
            break
        if not segment:
            break
        segments.append(segment)
    # Joining once avoids the quadratic cost of repeated bytes concatenation.
    return b''.join(segments)


def download_content(url, user_agent, save_path, save_name, segment_size, timeout):
    '''
    Download the content from the web server at :param url: and save it to a file.

    :param url: The url you are requesting the content from (http or https).
    :param user_agent: The user agent used in the request.
    :param save_path: The path where you are saving the content to.
    :param save_name: The name of the saved file.
    :param segment_size: The size of each segment retrieved from the web server's raw response.
    :param timeout: The timeout for the base socket used in the connection.
    :returns: None
    :raises Exception: If the url has no scheme, an unsupported scheme or no
                       host name; if the server sends no data or no body; if
                       the server answers with a status of 400 or above; or
                       if the target file already exists.
    '''
    parsed_url = urlparse(url)

    if not parsed_url.scheme:
        raise Exception('Did you enter a correct URL? Try adding a URL scheme like http or https')

    if parsed_url.scheme not in ('http', 'https'):
        raise Exception(f'Unsupported URL scheme {parsed_url.scheme}, only http and https are supported')

    # hostname excludes any "user:pass@" prefix and ":port" suffix, which
    # would otherwise break DNS resolution and the TLS server name.
    host = parsed_url.hostname
    if not host:
        raise Exception(f'Could not find a host name in URL {url}')

    # Value for the Host header: the network location without credentials.
    domain = parsed_url.netloc.rpartition('@')[2]

    # Rebuild the request target; urlparse splits params and query off the path.
    path = parsed_url.path or '/'
    if parsed_url.params:
        path += ';' + parsed_url.params
    if parsed_url.query:
        path += '?' + parsed_url.query

    ip = socket.gethostbyname(host)

    # An explicit port in the url wins over the scheme's default port.
    port = parsed_url.port
    if port is None:
        port = 80 if parsed_url.scheme == 'http' else 443

    request = 'GET {} HTTP/{}\r\nHost: {}\r\nAccept: */*\r\nConnection: close\r\nUser-Agent: {}\r\n\r\n'.format(path,
                                                                                                     HTTP_VERSION,
                                                                                                     domain,
                                                                                                     user_agent)

    with socket.socket() as base_socket:
        base_socket.settimeout(timeout)

        if parsed_url.scheme == 'https':
            # The default context verifies the certificate chain and the
            # host name; a bare ssl.SSLContext() verifies neither.
            context = ssl.create_default_context()
            connection = context.wrap_socket(base_socket, server_hostname=host)
        else:
            connection = base_socket

        with connection:
            connection.connect((ip, port))
            # sendall keeps writing until the whole request has been sent.
            connection.sendall(request.encode())
            raw_response = _receive_all(connection, segment_size, timeout)

    if not raw_response:
        raise Exception(
            f'Did not receive any raw response content, from the web server hosted on {ip}:{port} with domain {domain} and path {path}')

    content = get_content(raw_response)

    if not content:
        raise Exception('Did not find any DOM content')

    target = os.path.join(save_path, save_name)

    if os.path.isfile(target):
        raise Exception(f'File already exists with name {save_name} in path {save_path}!')

    # An empty save_path means the current directory; makedirs('') would raise.
    if save_path:
        os.makedirs(save_path, exist_ok=True)

    # The content was decoded as UTF-8, so write it back with the same
    # encoding instead of the platform default.
    with open(target, 'a', encoding='utf-8') as src:
        src.write(content)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment