Skip to content

Instantly share code, notes, and snippets.

@ChenyangGao
Created June 20, 2023 06:15
Show Gist options
  • Save ChenyangGao/97fcf3b826c10339c9c7ffdf72e6c5b3 to your computer and use it in GitHub Desktop.
Save ChenyangGao/97fcf3b826c10339c9c7ffdf72e6c5b3 to your computer and use it in GitHub Desktop.
This module supports the convenient use of `pip` in code, not just on the command line.
#!/usr/bin/env python3
# coding: utf-8
# Reference:
# https://docs.python.org/3/installing/index.html
# https://packaging.python.org/tutorials/installing-packages/
# https://pip.pypa.io/en/stable/
__author__ = 'ChenyangGao <https://chenyanggao.github.io/>'
__version__ = (0, 0, 6)
__all__ = [
'check_pip', 'install_pip_by_ensurepip', 'install_pip_by_getpip',
'install_pip', 'execute_pip', 'execute_pip_in_child_process', 'install',
'uninstall', 'check_install', 'check_uninstall', 'ensure_import',
]
import platform
import subprocess
from sys import executable
from tempfile import NamedTemporaryFile
from typing import Final, Iterable, List, Sequence, Union
from types import ModuleType
from urllib.parse import urlsplit
from urllib.request import urlopen
# When using subprocess.run on Windows, you should specify shell=True
_PLATFORM_IS_WINDOWS: Final[bool] = platform.system() == 'Windows'
## The following two may be redundant
# INDEX_URL: Base URL of the Python Package Index (default https://pypi.org/simple).
# This should point to a repository compliant with PEP 503 (the simple repository API)
# or a local directory laid out in the same format.
INDEX_URL: str = 'https://mirrors.aliyun.com/pypi/simple/'
# TRUSTED_HOST: Mark this host or host:port pair as trusted,
# even though it does not have valid or any HTTPS.
TRUSTED_HOST: str = 'mirrors.aliyun.com'
def _update_index_url(val: str):
    """Make *val* the package index used by default.

    Rebinds the module-level ``INDEX_URL`` and the ``index_url`` keyword
    default of `install`. When *val* has the ``https`` scheme, its network
    location is additionally passed to `_update_trusted_host`.
    """
    module_vars = globals()
    module_vars['INDEX_URL'] = val
    install.__kwdefaults__['index_url'] = val # type: ignore
    parts = urlsplit(val)
    if parts.scheme != 'https':
        return
    _update_trusted_host(parts.netloc)
def _update_trusted_host(val: str):
    """Make *val* the trusted host used by default.

    Rebinds the module-level ``TRUSTED_HOST`` and the ``trusted_host``
    keyword default of `install`.
    """
    install.__kwdefaults__['trusted_host'] = val # type: ignore
    globals().update(TRUSTED_HOST=val)
if _PLATFORM_IS_WINDOWS:
    # On Windows, fill in attributes of the `site` module when they are
    # absent (presumably for embedded interpreters that ship a reduced
    # `site.py` -- verify against the target environment).
    import site as _site
    from os import path as _path

    if not hasattr(_site, 'PREFIXES'):
        _site.PREFIXES = [__import__('sys').prefix, __import__('sys').exec_prefix]
    _site.ENABLE_USER_SITE = True
    # Directory that contains the imported `site` module.
    _libpath = _path.dirname(_site.__file__)
    for _name, _value in (
        ('USER_BASE', _path.dirname(_libpath)),
        ('USER_SITE', _path.join(_libpath, 'site-packages')),
    ):
        if not hasattr(_site, _name):
            setattr(_site, _name, _value)
    # Keep the helper names out of the module namespace.
    del _site, _path, _libpath, _name, _value
def check_pip(ensure: bool = True) -> bool:
    '''Check if the `pip` package is installed.

    :param ensure: When `pip` cannot be imported, try to install it with
        `install_pip` (only attempted if `site` exposes `USER_BASE` and
        `USER_SITE`).

    :return: True if `pip` is importable, or was installed without an
        exception being raised; False otherwise.
    '''
    try:
        # Check whether the `pip` package can be imported
        import pip # type: ignore
    except ImportError:
        # If the `pip` package can't be imported, there may be reasons why it can't be installed
        try:
            ## Check configurations for `site-packages`
            # USER_BASE: Path to the base directory for the user site-packages.
            # [site.USER_BASE](https://docs.python.org/3/library/site.html#site.USER_BASE)
            # USER_SITE: Path to the user site-packages for the running Python.
            # [site.USER_SITE](https://docs.python.org/3/library/site.html#site.USER_SITE)
            # NOTE: I found that the following file has a function `create_site_py`, `create_site_py` creates a site.py,
            # it is the actual imported `site` module in Windows platform, but there are lot of missing things:
            # https://github.com/Sigil-Ebook/Sigil/blob/master/src/Resource_Files/python_pkg/windows_python_gather.py
            from site import USER_BASE, USER_SITE
            # TODO: I need to confirm whether the `site` built-in module exists in Windows platform,
            # if so, I can find out the values of `USER_BASE` and `USER_SITE` that are not missing,
            # otherwise, I may try to construct available values for `USER_BASE` and `USER_SITE`.
        except ImportError:
            print('''Defective Python executable detected.
Please replace current Python executable with another Python executable with `pip` package
or replace with another Python executable which can install `pip` package
(the `site` module defines available `USER_BASE` and `USER_SITE`).
Python official download address: https://www.python.org/downloads/
Tips: If you installed Python from source, with an installer from python.org,
you should already have `pip`. If you installed using your OS package manager,
`pip` may have been installed, or you can install separately by the same package manager.''')
            return False
        else:
            if not ensure:
                return False
            try:
                install_pip()
            except Exception:
                # Installation failures are reported as False, but
                # KeyboardInterrupt / SystemExit are no longer swallowed.
                return False
    return True
def install_pip_by_ensurepip(*args: str) -> None:
    '''Install `pip` package using `ensurepip` package.

    :param args: Command line arguments forwarded to `ensurepip`.

    :raise RuntimeError: If `ensurepip` reports a non-zero exit status.

    Reference:
    - https://docs.python.org/3/library/ensurepip.html
    - https://packaging.python.org/tutorials/installing-packages/#ensure-you-can-run-pip-from-the-command-line
    '''
    from ensurepip import _main # type: ignore
    # `_main` hands back pip's exit status: 0 (or None on interpreters where
    # it returns nothing) means success, anything else is a failure. The
    # previous `if not _main(...)` test had this inverted.
    status = _main(list(args))
    if status:
        raise RuntimeError('ensurepip exited with status %r' % (status,))
def install_pip_by_getpip(
    *args: str,
    check: bool = False,
    executable: str = executable,
) -> subprocess.CompletedProcess:
    '''Install `pip` package using bootstrapping script.

    Downloads `get-pip.py`, prepends a small prelude that fills in missing
    `site` attributes on Windows, writes the result to a temporary file and
    runs it with `executable`.

    :param args: Extra command line arguments passed to the script.
    :param check: Forwarded to `subprocess.run`; if true, a non-zero exit
        status raises `subprocess.CalledProcessError`.
    :param executable: Python interpreter used to run the script.

    :return: The `subprocess.CompletedProcess` of the script run.

    Reference:
    - https://bootstrap.pypa.io/get-pip.py
    - https://packaging.python.org/tutorials/installing-packages/#ensure-you-can-run-pip-from-the-command-line
    '''
    import os

    # The prelude runs in the child interpreter, so it has to import
    # everything it uses itself (`platform` was previously missing).
    prelude = b'''\
#!/usr/bin/env python
import platform
if platform.system() == 'Windows':
    import site as _site
    from os import path as _path
    if not hasattr(_site, 'PREFIXES'):
        _site.PREFIXES = [__import__('sys').prefix, __import__('sys').exec_prefix]
    _site.ENABLE_USER_SITE = True
    _libpath = _path.dirname(_site.__file__)
    if not hasattr(_site, 'USER_BASE'):
        _site.USER_BASE = _path.dirname(_libpath)
    if not hasattr(_site, 'USER_SITE'):
        _site.USER_SITE = _path.join(_libpath, 'site-packages')
    del _site, _path, _libpath
'''
    # SECURITY NOTE(review): certificate verification is disabled here, so
    # the downloaded code is executed without authenticating the server.
    # Kept as-is for backward compatibility; consider using the default
    # SSL context.
    with urlopen(
        'https://bootstrap.pypa.io/get-pip.py',
        context=__import__('ssl')._create_unverified_context()
    ) as response:
        script = response.read()
    # A NamedTemporaryFile that is still open cannot be reopened by another
    # process on Windows, so close it before launching the child and remove
    # it manually afterwards.
    f = NamedTemporaryFile(mode='wb', suffix='.py', delete=False)
    try:
        with f:
            f.write(prelude)
            f.write(script)
        return subprocess.run([executable, f.name, *args],
                              check=check, shell=_PLATFORM_IS_WINDOWS)
    finally:
        os.unlink(f.name)
def install_pip(executable: str = executable) -> None:
    '''Install `pip` package.

    First tries `install_pip_by_ensurepip('--default-pip')`; if that fails,
    falls back to `install_pip_by_getpip`, passing the module-level
    `INDEX_URL` / `TRUSTED_HOST` (when set) as `-i` / `--trusted-host`.

    :param executable: Python interpreter used by the get-pip fallback.
        The `ensurepip` attempt is made through `install_pip_by_ensurepip`,
        which does not receive this argument.

    :raise subprocess.CalledProcessError: If the get-pip fallback exits with
        a non-zero status.
    '''
    try:
        # https://docs.python.org/3/installing/index.html#pip-not-installed
        install_pip_by_ensurepip('--default-pip')
    except (Exception, SystemExit):
        # Any failure (including an exit request raised while running
        # ensurepip) triggers the fallback; KeyboardInterrupt is allowed to
        # propagate so the user can still abort.
        args: List[str] = []
        index_url = globals().get('INDEX_URL')
        trusted_host = globals().get('TRUSTED_HOST')
        if index_url:
            args.extend(('-i', index_url))
            if not trusted_host:
                trusted_host = urlsplit(index_url).netloc
        if trusted_host:
            args.extend(('--trusted-host', trusted_host))
        install_pip_by_getpip(*args, check=True, executable=executable)
def execute_pip(args: Sequence[str]):
    '''Run pip inside the current interpreter.

    :param args: pip command line arguments (without the program name).

    :return: Whatever `pip._internal.main` returns.
    '''
    from pip._internal import main as pip_main
    argv = list(args)
    return pip_main(argv)
def execute_pip_in_child_process(
    args: Union[str, Iterable[str]],
    executable: str = executable,
) -> subprocess.CompletedProcess:
    '''Run ``<executable> -m pip`` in a child process.

    :param args: Either one command line string (run through the shell) or
        an iterable of individual arguments.
    :param executable: Python interpreter used to run pip.

    :return: The `subprocess.CompletedProcess` of the run.
    '''
    if not isinstance(args, str):
        # Argument list: the shell is only used on Windows.
        return subprocess.run(
            [executable, '-m', 'pip', *args],
            shell=_PLATFORM_IS_WINDOWS,
        )
    # Single string: interpolate it into a shell command line.
    return subprocess.run('"%s" -m pip %s' % (executable, args), shell=True)
def install(
    package: str,
    /,
    *other_packages: str,
    upgrade: bool = False,
    index_url: str = INDEX_URL,
    trusted_host: str = TRUSTED_HOST,
    other_args: Iterable[str] = (),
    new_process: bool = False,
) -> None:
    '''Run ``pip install`` for one or more packages.

    :param package: First package to install.
    :param other_packages: Further packages to install.
    :param upgrade: Add ``--upgrade``.
    :param index_url: If non-empty, passed as ``-i``.
    :param trusted_host: If non-empty, passed as ``--trusted-host``; when
        empty and `index_url` is given, the network location of `index_url`
        is used instead.
    :param other_args: Extra arguments inserted before the package names.
    :param new_process: Run pip through `execute_pip_in_child_process`
        instead of `execute_pip`.
    '''
    cmd = ['install']
    if index_url:
        cmd += ['-i', index_url]
        if not trusted_host:
            trusted_host = urlsplit(index_url).netloc
    if trusted_host:
        cmd += ['--trusted-host', trusted_host]
    if upgrade:
        cmd.append('--upgrade')
    cmd += [*other_args, package, *other_packages]
    runner = execute_pip_in_child_process if new_process else execute_pip
    runner(cmd)
def uninstall(
    package: str,
    /,
    *other_packages: str,
    other_args: Iterable[str] = ('--yes',),
    new_process: bool = False,
) -> None:
    '''Run ``pip uninstall`` for one or more packages.

    :param package: First package to uninstall.
    :param other_packages: Further packages to uninstall.
    :param other_args: Extra arguments inserted before the package names.
    :param new_process: Run pip through `execute_pip_in_child_process`
        instead of `execute_pip`.
    '''
    cmd = ['uninstall']
    cmd.extend(other_args)
    cmd.append(package)
    cmd.extend(other_packages)
    runner = execute_pip_in_child_process if new_process else execute_pip
    runner(cmd)
def check_install(
    module: str,
    depencies: Union[None, str, Iterable[str]]= None,
) -> None:
    '''Try to import `module`; on `ModuleNotFoundError`, install `depencies`.

    :param module: Name of the module to import.
    :param depencies: Package name(s) handed to `install`. None means
        `module` itself; a single string is treated as one package.
    '''
    try:
        __import__(module)
    except ModuleNotFoundError:
        # Normalise to something that can be star-unpacked.
        targets = (
            (module,) if depencies is None
            else (depencies,) if isinstance(depencies, str)
            else depencies
        )
        install(*targets)
def check_uninstall(
    module: str,
    depencies: Union[None, str, Iterable[str]]= None,
) -> None:
    '''Try to import `module`; if that works, uninstall `depencies`.

    A `ModuleNotFoundError` raised by the import or by the uninstall step is
    silently ignored.

    :param module: Name of the module to import.
    :param depencies: Package name(s) handed to `uninstall`. None means
        `module` itself; a single string is treated as one package.
    '''
    try:
        __import__(module)
        targets = (
            (module,) if depencies is None
            else (depencies,) if isinstance(depencies, str)
            else depencies
        )
        uninstall(*targets)
    except ModuleNotFoundError:
        return
def ensure_import(
    module: str,
    depencies: Union[None, str, Iterable[str]]= None,
) -> ModuleType:
    '''Import the `module`, if it does not exist, try to install the `depencies`,
    and then import it again.

    :param module: Name of the module to import (may be dotted).
    :param depencies: Package name(s) to install when the module is missing;
        interpreted by `check_install`.

    :return: The module named by `module`. For a dotted name this is the
        submodule itself, not its top-level package.
    '''
    from importlib import import_module, invalidate_caches
    try:
        # `import_module('a.b')` returns `a.b`, while `__import__('a.b')`
        # would return the top-level package `a`.
        return import_module(module)
    except ModuleNotFoundError:
        check_install(module, depencies)
        # Finders cache directory listings; refresh them so that files
        # created by the installation are noticed.
        invalidate_caches()
        return import_module(module)
# Make sure `pip` is available as soon as this module is imported.
check_pip()
if __name__ == '__main__':
    # Behave like the `pip` command line and propagate the value returned by
    # `execute_pip` as the process exit status (it used to be discarded, so
    # the script always exited with 0).
    raise SystemExit(execute_pip(__import__('sys').argv[1:]))
@ChenyangGao
Copy link
Author

ChenyangGao commented Jun 20, 2023

Through recent practice, I have rewritten a similar module that is more concise in implementation and has added some new features.

#!/usr/bin/env python3
# coding: utf-8

"""This module supports the convenient use of `pip` in code, not just on the command line.
"""

__author__ = "ChenyangGao <https://chenyanggao.github.io/>"
__version__ = (0, 0, 1)
__all__ = [
    "ensurepip", "usepip", "pip_install", "pip_uninstall", "ensure_import", 
    "iter_packages", "iter_uptodate_packages", "iter_outdated_packages", 
    "iter_latest_info", "update_all_packages", 
]


def ensurepip() -> None:
    """Make sure that the `pip` is installed. If not, use the `ensurepip` module 
    for download and installation, falling back to the `get-pip.py` script.

    :raise Exception: Whatever the fallback raises if `pip` still cannot be
        installed or imported.
    """
    try:
        import pip
    except ModuleNotFoundError:
        from importlib import invalidate_caches
        try:
            _run_ensurepip()
            # The installation may have happened in another process; refresh
            # the finder caches so the new package is noticed.
            invalidate_caches()
            import pip
        except Exception:
            # Ordinary failures trigger the fallback; KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            _run_getpip()
            invalidate_caches()
            import pip

# NOTE(review): this call runs at import time, before `_run_ensurepip` and
# `_run_getpip` are defined further down in this module, so when `pip` is
# missing it fails with NameError. It has to stay ahead of the `pip._internal`
# imports; moving the helpers above this point would resolve it.
ensurepip()


from importlib import import_module
from pip._internal.cli import base_command
from pip._internal.commands import create_command
from pip._internal.commands.list import ListCommand
from pip._internal.metadata import get_environment
from pip._internal.metadata.pkg_resources import BaseDistribution
from pip._internal.models.candidate import InstallationCandidate
from pip._internal.utils.compat import stdlib_pkgs
from pip._internal.utils.temp_dir import global_tempdir_manager, tempdir_registry
from pip._vendor.packaging.utils import canonicalize_name
from platform import system
from subprocess import CompletedProcess, run as sprun
from sys import executable
from types import ModuleType
from typing import cast, Final, Iterable, Iterator, NamedTuple, Optional
from urllib.error import URLError
from urllib.parse import urlparse
from urllib.request import urlopen


_PLATFORM_IS_WINDOWS: Final[bool] = system() == 'Windows'


def _run_ensurepip(new_process: bool = True):
    """Bootstrap `pip` with the standard library `ensurepip` module.

    :param new_process: If true, run ``<executable> -m ensurepip`` in a child
        process; otherwise call `ensurepip` inside the current interpreter.

    :raise subprocess.CalledProcessError: Child process exited with a
        non-zero status.
    :raise RuntimeError: In-process run reported a non-zero status.
    """
    if new_process:
        sprun([executable, '-m', 'ensurepip'], check=True, shell=_PLATFORM_IS_WINDOWS)
    else:
        import ensurepip
        # Pass an explicit empty argument list: with no argument, `_main`
        # would parse the host program's `sys.argv[1:]`.
        status = ensurepip._main([]) # type: ignore
        if status:
            raise RuntimeError(f"ensurepip exited with status {status!r}")


def _run_getpip():
    """Install `pip` by downloading and running the `get-pip.py` script.

    The official URL is tried first; if the download fails, a mirror is used
    and the script is told to use the Tsinghua package index.

    :raise OSError: The mirror download failed as well.
    :raise subprocess.CalledProcessError: The script exited with a non-zero
        status.
    """
    # See:
    # - https://pip.pypa.io/en/stable/installation/
    # - https://github.com/pypa/get-pip
    extra_args: list = []
    try:
        with urlopen("https://bootstrap.pypa.io/get-pip.py", timeout=3) as resp:
            script = resp.read()
    except OSError:
        # `URLError` and read timeouts are both `OSError` subclasses.
        # For people living in the Chinese Mainland
        url = "https://ghproxy.com/https://raw.githubusercontent.com/pypa/get-pip/main/public/get-pip.py"
        with urlopen(url, timeout=3) as resp:
            script = resp.read()
        extra_args = ["-i", "https://pypi.tuna.tsinghua.edu.cn/simple/"]
    # Feed the script through stdin (`python -`) rather than as a `-c`
    # argument: the script is far larger than the command line length limits
    # of common operating systems.
    sprun([executable, "-", *extra_args], input=script,
          check=True, shell=_PLATFORM_IS_WINDOWS)


def usepip(command: str, *args: str, new_process: bool = True):
    """Use the `pip` command with the same arguments as the command line.

    :param command: pip sub-command, e.g. ``"install"``.
    :param args: Arguments of the sub-command.
    :param new_process: If true, run ``<executable> -m pip`` in a child
        process; otherwise run the command inside the current interpreter.

    :return: The `CompletedProcess` of the child process when `new_process`
        is true, otherwise whatever the command's `run` method returns.
        (Previously the result was dropped and None was always returned.)

    :raise subprocess.CalledProcessError: The child process exited with a
        non-zero status.
    """
    ensurepip()
    if new_process:
        return sprun([executable, '-m', 'pip', command, *args], check=True, shell=_PLATFORM_IS_WINDOWS)
    cmd = create_command(command)
    options, args_ = cmd.parse_args(list(args))
    base_command.setup_logging(
        verbosity     = options.verbose,
        no_color      = options.no_color,
        user_log_file = options.log,
    )
    with cmd.main_context():
        cmd.tempdir_registry = cmd.enter_context(tempdir_registry())
        cmd.enter_context(global_tempdir_manager())
        return cmd.run(options, args_)


def pip_install(
    *args_or_packages: str, 
    upgrade: bool = False, 
    index_url: Optional[str] = None, 
    trusted_host: Optional[str] = None, 
    new_process: bool = True, 
) -> CompletedProcess:
    """Run ``pip install`` through `usepip`.

    :param args_or_packages: Options and/or package names, as on the command line.
    :param upgrade: Add ``-U``.
    :param index_url: If given, passed as ``-i``.
    :param trusted_host: Passed as ``--trusted-host`` (only when `index_url` 
        is given). If empty and `index_url` starts with ``http://``, the 
        network location of `index_url` is used.
    :param new_process: Forwarded to `usepip`.

    :return: The return value of `usepip`.
    """
    argv = ["install"]
    if upgrade:
        argv.append("-U")
    if index_url:
        argv += ['-i', index_url]
        host = trusted_host
        if not host and index_url.startswith("http://"):
            # Plain-HTTP index: trust its host implicitly.
            host = urlparse(index_url).netloc
        if host:
            argv += ['--trusted-host', host]
    return usepip(*argv, *args_or_packages, new_process=new_process)


def pip_uninstall(*args_or_packages: str, new_process: bool = True) -> CompletedProcess:
    """Run ``pip uninstall -y`` through `usepip`.

    :param args_or_packages: Options and/or package names, as on the command line.
    :param new_process: Forwarded to `usepip`.

    :return: The return value of `usepip`.
    """
    argv = ('uninstall', '-y', *args_or_packages)
    return usepip(*argv, new_process=new_process)


def ensure_import(
    module: str, 
    package: Optional[str] = None, 
    pip_package: Optional[str] = None, 
) -> ModuleType:
    """Use `importlib.import_module` to import the `module`. If the `module` is missing, 
    use `pip` to download and install it, then try importing it again.

    :params module: Module name to import.
    :params package: The `package` argument is required when performing a relative import 
        (If the `module` starts with a dot '.'). It specifies the package to use as the 
        anchor point from which to resolve the relative import to an absolute import.
    :params pip_package: When the `module` is missing, the name of the module to be installed. 
        If it is None, it is determined based on the arguments `module` and `package`.

    :return: Return the return value of `importlib.import_module(module, package)`.

    :raise ModuleNotFoundError: The module is missing and no package name to 
        install could be determined, or it is still missing after installation.
    """
    if pip_package is None:
        if module.startswith('.'):
            if package:
                pip_package = package.partition('.')[0]
        else:
            pip_package = module.partition('.')[0]
    try:
        return import_module(module, package)
    except ModuleNotFoundError:
        if not pip_package:
            raise
        pip_install(cast(str, pip_package))
        # Finders cache directory contents; refresh them so that the freshly 
        # installed files are visible to the second import attempt.
        from importlib import invalidate_caches
        invalidate_caches()
        return import_module(module, package)


def iter_packages(
    list_command_args: list[str] = [], 
    get_latest_version: bool = False, 
) -> Iterator[BaseDistribution]:
    """Iterate over the distribution info of installed packages.

    :param list_command_args: Arguments parsed with the option parser of the 
        ``pip list`` command to select the distributions.
    :param get_latest_version: If true, the distributions are passed through 
        the list command's `iter_packages_latest_infos` before being returned.

    :return: An iterator over the selected distributions.
    """
    list_cmd = cast(ListCommand, create_command('list'))
    options, _ = list_cmd.parse_args(list_command_args)
    # Standard-library packages are always skipped, plus any excluded names.
    excluded = set(stdlib_pkgs)
    if options.excludes:
        excluded.update(canonicalize_name(name) for name in options.excludes)
    environment = get_environment(options.path)
    installed = environment.iter_installed_distributions(
        local_only        = options.local,
        user_only         = options.user,
        editables_only    = options.editable,
        include_editables = options.include_editable,
        skip              = excluded,
    )
    if not get_latest_version:
        return installed
    return list_cmd.iter_packages_latest_infos(installed, options) # type: ignore


def iter_uptodate_packages(
    list_command_args: list[str] = [], 
) -> Iterator[BaseDistribution]:
    """Iterate over the distribution info of all installed and updated packages.

    :param list_command_args: Arguments forwarded to `iter_packages` 
        (previously accepted but ignored).

    :return: Yields each distribution whose `version` is not lower than its 
        `latest_version`.
    """
    for dist in iter_packages(list_command_args, get_latest_version=True):
        if dist.version >= dist.latest_version: # type: ignore
            yield dist


def iter_outdated_packages(
    list_command_args: list[str] = [], 
) -> Iterator[BaseDistribution]:
    """Iterate over the distribution info of all installed but not updated packages.

    :param list_command_args: Arguments forwarded to `iter_packages` 
        (previously accepted but ignored).

    :return: Yields each distribution whose `version` is lower than its 
        `latest_version`.
    """
    for dist in iter_packages(list_command_args, get_latest_version=True):
        if dist.version < dist.latest_version: # type: ignore
            yield dist


class LatestInfo(NamedTuple):
    """Result record yielded by `iter_latest_info`."""
    # Name that was looked up.
    canonical_name: str
    # Candidate selected for that name (the annotation allows None).
    latest_info: Optional[InstallationCandidate]


def iter_latest_info(
    canonical_names: Iterable[str], 
    list_command_args: list[str] = [], 
) -> Iterator[LatestInfo]:
    """Look up the best available candidate for each given package name.

    :param canonical_names: Names of the packages to look up.
    :param list_command_args: Arguments parsed with the option parser of the 
        ``pip list`` command; they configure the session and package finder.

    :return: Yields one `LatestInfo` per name, in input order. Pre-release 
        candidates are dropped unless the parsed options set `pre`.
    """
    list_cmd = cast(ListCommand, create_command('list'))
    options, _ = list_cmd.parse_args(list_command_args)
    with list_cmd._build_session(options) as session:
        finder = list_cmd._build_package_finder(options, session)
        for name in canonical_names:
            candidates = finder.find_all_candidates(name)
            if not options.pre:
                # Remove prereleases
                candidates = [c for c in candidates if not c.version.is_prerelease]
            evaluator = finder.make_candidate_evaluator(project_name=name)
            yield LatestInfo(name, evaluator.sort_best_candidate(candidates))


def update_all_packages(
    check_outdated: bool = False, 
    list_command_args: list[str] = [], 
    install_command_args: list[str] = [], 
) -> None:
    """Update all installed packages.

    :param check_outdated: If true, only packages reported by 
        `iter_outdated_packages` are upgraded; otherwise every package 
        returned by `iter_packages` is.
    :param list_command_args: Arguments forwarded to the package iterator.
    :param install_command_args: Arguments placed before the package name in 
        each `pip_install` call.
    """
    if check_outdated:
        packages = iter_outdated_packages(list_command_args)
    else:
        packages = iter_packages(list_command_args)
    # Take a snapshot of the names first: upgrading changes the installed 
    # distributions, so it must not happen while they are still being 
    # iterated lazily.
    # NOTE: `dist.raw_name` is still available.
    names = [dist.canonical_name for dist in packages]
    for name in names:
        pip_install(*install_command_args, name, upgrade=True)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment