Last active
July 14, 2023 09:20
-
-
Save Hansimov/a9763c1f1ea76fd014e1164f0e6a8917 to your computer and use it in GitHub Desktop.
Pip install local wheels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy | |
import math | |
from pathlib import Path | |
import re | |
import requests | |
import subprocess | |
import sys | |
import os | |
import platform | |
def shell_cmd(cmd, getoutput=False):
    """Echo a shell command together with the current directory, then run it.

    Args:
        cmd: Command line handed to the shell as a single string.
        getoutput: When true, run through ``subprocess.getoutput``, print the
            captured text and return it. When false, run the command with
            ``subprocess.run(..., shell=True)`` and return nothing.

    Returns:
        The captured output string if ``getoutput`` is true, otherwise ``None``.
    """
    print(f'\n$ [{os.getcwd()}]\n $ {cmd}\n')

    # Fire-and-forget mode: the command writes straight to our stdout/stderr.
    if not getoutput:
        subprocess.run(cmd, shell=True)
        return None

    captured = subprocess.getoutput(cmd)
    print(captured)
    return captured
class WheelDownloader:
    """Interactively choose and download built wheels of one package from PyPI.

    The workflow (see ``run``) is: scrape the project's file listing, print the
    wheels that look compatible with the running interpreter, ask the user
    which indexes to fetch, then save the chosen files into ``whl_path``.

    Attributes:
        built_whl_list: ``[idx, name, url]`` entries for every wheel link found.
        suggestted_whl_list: Subset of ``built_whl_list`` matching this
            interpreter / machine (attribute name kept as-is for callers).
        download_whl_list: Entries selected for download.
        whl_num_digits: Column width used to right-align indexes when printing.
    """

    # Anchor whose href ends in ".whl": group 1 is the URL, group 2 the link
    # text. The href is limited to non-quote characters so that one match can
    # never swallow several anchors that happen to share a line.
    _WHL_LINK_PATTERN = re.compile(r'<a href="([^"]*\.whl)">([\s\S]*?)</a>')
    # Separators accepted between the indexes typed by the user; runs of
    # separators (e.g. ", ") count as one.
    _INDEX_SEP_PATTERN = re.compile(r"[,;\s]+")

    def __init__(self, package, whl_path="."):
        """Store the package name, the target directory and request settings.

        Args:
            package: Project name as used in the PyPI project URL.
            whl_path: Directory the wheels are written to.
        """
        self.package = package
        self.whl_path = whl_path
        self.seg_str = "=" * 80
        self.sep_str = "-" * 40
        self.package_url = f"https://pypi.org/project/{self.package}/#files"
        self.req_proxies = {  # Note: Here you need to customize to your personal workable proxies.
            "https" : "http://proxy-***.<corp>.com:913",
            "http" : "http://proxy-***.<corp>.com:913",
        }
        self.req_headers = {
            "user-agent": "Chrome",
            "referer": f"http://pypi.org/project/{self.package}"
        }
        # Defaults so that each step is safe to call before the previous one
        # has populated its results.
        self.whl_num_digits = 1
        self.built_whl_list = []
        self.suggestted_whl_list = []
        self.download_whl_list = []

    def print_whl(self, whl, indent=4):
        """Print one ``[idx, name, url]`` entry as an indented ``idx - name`` line."""
        idx, name, _url = whl
        print(f"{' '*indent}{idx:>{self.whl_num_digits}} - {name}")

    def _index_built_whls(self, html):
        """Fill ``built_whl_list`` and ``whl_num_digits`` from the page source."""
        links = self._WHL_LINK_PATTERN.findall(html)
        # Width of the wheel count; unlike log10 this is also defined for 0.
        self.whl_num_digits = len(str(len(links)))
        self.built_whl_list = [
            [idx, text.strip(), url]
            for idx, (url, text) in enumerate(links)
        ]

    @staticmethod
    def _is_universal_whl(name, py_major):
        """Tell whether *name* is a ``py<major>-none-any`` (pure Python) wheel."""
        parts = name.rsplit(".", 1)[0].split("-")
        # name-version[-build]-pythontag-abitag-platformtag
        if len(parts) < 5:
            return False
        py_tags, abi_tag, platform_tag = parts[-3:]
        return (
            abi_tag == "none"
            and platform_tag == "any"
            and f"py{py_major}" in py_tags.split(".")
        )

    def fetch_built_whls(self):
        """Download the project page and list every wheel link found on it."""
        print(self.seg_str)
        print(f"[*] Fetching wheels for package [{self.package}]:")
        req = requests.get(self.package_url, headers=self.req_headers, proxies=self.req_proxies)
        self._index_built_whls(req.text)
        for whl in self.built_whl_list:
            self.print_whl(whl)

    def suggest_whls_to_download(self):
        """Collect and print the wheels that fit the running environment.

        A wheel is suggested when its name contains both the ``cpXY`` tag of
        this interpreter and the lower-cased machine name, or when it is a
        pure-Python wheel for this major version (otherwise packages that only
        ship ``py3-none-any`` wheels would never get a suggestion).
        """
        py_version_str = ".".join(map(str, sys.version_info[0:3]))
        cp_str = f"cp{sys.version_info.major}{sys.version_info.minor}"
        os_str = f"{platform.system()} {platform.release()}"
        machine_str = platform.machine().lower()
        print(self.seg_str)
        print(f"[*] Suggested wheels to donwload for you environment:"
              f" (Python {py_version_str}, {os_str}, {machine_str})"
        )
        self.suggestted_whl_list = []
        for whl in self.built_whl_list:
            name = whl[1]
            is_native = cp_str in name and machine_str in name
            if is_native or self._is_universal_whl(name, sys.version_info.major):
                self.suggestted_whl_list.append(whl)
                self.print_whl(whl)

    def select_download_whls(self):
        """Ask which wheels to download and store them in ``download_whl_list``.

        An empty answer selects the suggested wheels, a lone token containing
        ``x`` selects nothing, anything else is read as a list of indexes
        separated by commas, semicolons or whitespace. Tokens that are not a
        valid index into ``built_whl_list`` are reported and skipped.
        """
        print(self.seg_str)
        info_str = f"[*] {len(self.built_whl_list)} built wheels found for package [{self.package}], please type the indexes to download: ('Enter' to use suggestted, 'x' to exit)" + "\n"
        input_str = input(info_str).strip()
        self.download_whl_list = []

        if not input_str:
            print(self.seg_str)
            print("[*] OK. Suggestted wheels will be donwloaded:")
            for whl in self.suggestted_whl_list:
                self.print_whl(whl)
            self.download_whl_list = copy.deepcopy(self.suggestted_whl_list)
            return

        # Drop the empty strings produced by leading/trailing separators.
        tokens = [tok for tok in self._INDEX_SEP_PATTERN.split(input_str) if tok]
        if not tokens or (len(tokens) == 1 and "x" in tokens[0]):
            return

        print("[*] OK. Selected wheels will be donwloaded:")
        for tok in tokens:
            if not tok.isdigit() or int(tok) >= len(self.built_whl_list):
                print(f"[!] Ignoring invalid index: {tok!r}")
                continue
            whl = self.built_whl_list[int(tok)]
            self.download_whl_list.append(whl)
            self.print_whl(whl)

    def download_whls(self):
        """Fetch every entry of ``download_whl_list`` into ``whl_path``.

        The directory is created when missing. A response with an error status
        is reported and skipped instead of being saved as a bogus wheel.
        """
        os.makedirs(self.whl_path, exist_ok=True)
        print(self.seg_str)
        for _idx, name, url in self.download_whl_list:
            download_path = str(Path(self.whl_path) / name).replace("\\", "/")
            print(f"[*] Downloading: {download_path}")
            req = requests.get(url, headers=self.req_headers, proxies=self.req_proxies)
            if not req.ok:
                print(f"[!] Download failed (HTTP {req.status_code}): {url}")
                continue
            with open(download_path, "wb") as wf:
                wf.write(req.content)

    def run(self):
        """Run the whole fetch / suggest / select / download sequence."""
        self.fetch_built_whls()
        self.suggest_whls_to_download()
        self.select_download_whls()
        self.download_whls()
# Todo | |
class WheelInstaller:
    """Install a package from wheel files stored in a local directory.

    When pip's output shows that it tried to reach a package index for a
    dependency (a ``WARNING: ... /simple/<name>/`` line), the running install
    is stopped, that dependency is installed through a new ``WheelInstaller``
    and the original wheel is tried again.

    Attributes:
        files: Sorted names of the regular files found in ``whl_path``.
        local_whls: Names of the wheels in ``whl_path`` that belong to
            ``package``.
        local_whl: Name of the wheel most recently handed to pip.
    """

    # pip warning emitted while it retries an index lookup; group 1 is the
    # project name taken from the ".../simple/<name>/" URL.
    _MISSING_DEP_PATTERN = re.compile(r"WARNING:.*/simple/(.*)/")
    # Separators accepted between the indexes typed by the user.
    _INDEX_SEP_PATTERN = re.compile(r"[,;\s]+")

    def __init__(self, package, whl_path="."):
        """Remember the package to install and the directory holding wheels.

        Args:
            package: Project name; ``-``, ``_`` and ``.`` are treated alike
                when it is compared with wheel file names.
            whl_path: Directory that is searched for (and filled with) wheels.
        """
        self.package = package
        self.whl_path = whl_path
        self.files = []
        self.local_whls = []
        self.local_whl = None

    def check_admin(self):
        """Placeholder for a privilege check; currently does nothing."""
        pass

    @staticmethod
    def _normalize(name):
        """Lower-case *name* and fold runs of ``-``, ``_``, ``.`` into ``_``."""
        return re.sub(r"[-_.]+", "_", name).lower()

    def get_local_whls(self):
        """Scan ``whl_path`` and store this package's wheels in ``local_whls``.

        Only regular files with a ``.whl`` suffix count, and the distribution
        part of the file name (everything before the first ``-``) has to equal
        the package name after normalization. A plain prefix test would pick
        ``ipython_genutils`` for ``ipython`` and miss ``prompt_toolkit`` for
        ``prompt-toolkit``.
        """
        self.files = sorted(
            entry
            for entry in os.listdir(self.whl_path)
            # Resolve against whl_path, not the current working directory.
            if os.path.isfile(os.path.join(self.whl_path, entry))
        )
        wanted = self._normalize(self.package)
        self.local_whls = [
            entry
            for entry in self.files
            if Path(entry).suffix == ".whl"
            and self._normalize(Path(entry).stem.split("-", 1)[0]) == wanted
        ]

    def _ask_whls_to_install(self):
        """Let the user pick among several local wheels; return their names."""
        newline = "\n"
        whl_items_str = "".join(
            f" {idx} - {whl}\n" for idx, whl in enumerate(self.local_whls)
        )
        whl_num = len(self.local_whls)
        warning_str = f"{whl_num} wheels found for package [{self.package}], please type the index(es) to install ('x' to exit):{newline}{whl_items_str}"
        tokens = [tok for tok in self._INDEX_SEP_PATTERN.split(input(warning_str)) if tok]
        if len(tokens) == 1 and tokens[0] == "x":
            return []

        chosen = []
        for tok in tokens:
            if not tok.isdigit() or int(tok) >= whl_num:
                print(f"Ignoring invalid index: {tok!r}")
                continue
            chosen.append(self.local_whls[int(tok)])
        return chosen

    def _pip_install(self, whl):
        """Run ``pip install`` on one wheel, echoing its output line by line.

        Returns:
            The name of a dependency pip tried to look up on an index (the
            install is killed at that point), or ``None`` when pip's output
            ended without such a warning.
        """
        self.local_whl = whl
        print(f"Installing: [{self.local_whl}]")
        # Argument list instead of a shell string: file names with spaces or
        # shell metacharacters are passed through unchanged, and the wheel is
        # installed into the interpreter that runs this script.
        cmd_pip_install_whl = [
            sys.executable or "python", "-m", "pip", "install",
            os.path.join(self.whl_path, whl),
        ]
        # * python - Getting realtime output using subprocess - Stack Overflow
        # * https://stackoverflow.com/a/57970619
        with subprocess.Popen(
            cmd_pip_install_whl,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding='utf-8',
            errors='replace',  # never abort on undecodable tool output
        ) as process:
            for raw_line in process.stdout:
                realtime_output = raw_line.strip()
                print(f"> {realtime_output}", flush=True)
                res = self._MISSING_DEP_PATTERN.search(realtime_output)
                if res:
                    print(f"> Break installation for package: [{self.local_whl}]")
                    # Leaving the ``with`` block waits for the killed process.
                    process.kill()
                    return res.group(1)
        return None

    def _install_with_dependencies(self, whl):
        """Install *whl*, installing reported dependencies and retrying."""
        attempted = set()
        while True:
            dependency_package = self._pip_install(whl)
            if not dependency_package:
                return
            if dependency_package in attempted:
                # Installing it did not help last time; avoid looping forever.
                print(f">>> Dependency still missing, giving up: [{dependency_package}]")
                return
            attempted.add(dependency_package)
            print(f">>> Dependency need to be installed: [{dependency_package}]")
            new_wheel_installer = WheelInstaller(dependency_package, self.whl_path)
            new_wheel_installer.run()

    def install_local_whl(self):
        """Find (or download) the package's wheels and install them.

        With no local wheel a ``WheelDownloader`` is run for the package and
        the directory is scanned again; if there is still nothing, the method
        returns. With several wheels the user chooses which ones to install.
        """
        self.get_local_whls()
        if not self.local_whls:
            error_str = f"Cannot found local .whl for package: [{self.package}]"
            print(error_str)
            wheel_downloader = WheelDownloader(self.package, self.whl_path)
            wheel_downloader.run()
            self.get_local_whls()
            if not self.local_whls:
                return

        if len(self.local_whls) >= 2:
            selected_whls = self._ask_whls_to_install()
        else:
            selected_whls = list(self.local_whls)

        for whl in selected_whls:
            self._install_with_dependencies(whl)

    def run(self):
        """Entry point: privilege check followed by the local installation."""
        self.check_admin()
        self.install_local_whl()
# Script entry: install IPython from the wheels in the current directory,
# downloading them (and any dependency pip reports) when they are missing.
wheel_installer = WheelInstaller(package="ipython")
wheel_installer.run()

# Alternative: only download wheels, e.g. psutil into the "whls" directory.
# wheel_downloader = WheelDownloader("psutil", "whls")
# wheel_downloader.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment