Source code for runhouse.resources.packages.package

import copy
import re
import sys
from pathlib import Path
from typing import Dict, Optional, Union

from runhouse.resources.envs.utils import install_conda, run_setup_command
from runhouse.resources.folders import Folder, folder
from runhouse.resources.hardware.cluster import Cluster
from runhouse.resources.hardware.utils import (
    _get_cluster_from,
    detect_cuda_version_or_cpu,
)
from runhouse.resources.resource import Resource
from runhouse.utils import (
    find_locally_installed_version,
    get_local_install_path,
    is_python_package_string,
    locate_working_dir,
)


# Install method identifiers recognized by Package; this set is quoted in the
# ValueError messages raised when an unknown install method is encountered.
INSTALL_METHODS = {"local", "reqs", "pip", "conda", "rh"}

from runhouse.logger import logger


class CodeSyncError(Exception):
    """Raised when a package folder is not allowed to be synced to another system."""


class Package(Resource):
    """Resource describing something installable: a pip/conda requirement string,
    a requirements folder, or a local folder to be added to the Python path.

    ``install_target`` is either a plain string (e.g. a package specifier) or a
    :class:`Folder`; only Folder-backed packages can be copied with :meth:`to`.
    """

    RESOURCE_TYPE = "package"

    # Maps a CUDA version string (or "cpu") to the matching PyTorch wheel index.
    # https://pytorch.org/get-started/locally/
    # Note: no binaries exist for 11.4 (https://github.com/pytorch/pytorch/issues/75992)
    TORCH_INDEX_URLS = {
        "11.3": "https://download.pytorch.org/whl/cu113",
        "11.5": "https://download.pytorch.org/whl/cu115",
        "11.6": "https://download.pytorch.org/whl/cu116",
        "11.7": "https://download.pytorch.org/whl/cu117",
        "11.8": "https://download.pytorch.org/whl/cu118",
        "12.1": "https://download.pytorch.org/whl/cu121",
        "cpu": "https://download.pytorch.org/whl/cpu",
    }

    def __init__(
        self,
        name: str = None,
        install_method: str = None,
        install_target: Union[str, Folder] = None,
        install_args: str = None,
        dryrun: bool = False,
        **kwargs,  # We have this here to ignore extra arguments when calling from from_config
    ):
        """
        Runhouse Package resource.

        .. note::
            To create a git package, please use the factory method :func:`package`.
        """
        super().__init__(
            name=name,
            dryrun=dryrun,
        )
        self.install_method = install_method
        self.install_target = install_target
        self.install_args = install_args

    def config(self, condensed=True):
        """Return the resource config extended with the install method, target and args."""
        config = super().config(condensed)
        config["install_method"] = self.install_method
        config["install_target"] = self._resource_string_for_subconfig(
            self.install_target, condensed
        )
        config["install_args"] = self.install_args
        return config

    def __str__(self):
        if self.name:
            return f"Package: {self.name}"
        if isinstance(self.install_target, Folder):
            return f"Package: {self.install_target.path}"
        return f"Package: {self.install_target}"

    @staticmethod
    def _prepend_python_executable(
        install_cmd: str, env: Union[str, "Env"] = None, cluster: "Cluster" = None
    ):
        """Prefix ``install_cmd`` with ``<python> -m``.

        Uses ``python3`` when a cluster or env is given, otherwise the interpreter
        running this process (``sys.executable``).
        """
        return (
            f"python3 -m {install_cmd}"
            if cluster or env
            else f"{sys.executable} -m {install_cmd}"
        )

    @staticmethod
    def _prepend_env_command(install_cmd: str, env: Union[str, "Env"] = None):
        """Wrap ``install_cmd`` with the env's full command when an env is given."""
        if env:
            from runhouse.resources.envs.utils import _get_env_from

            env = _get_env_from(env)
            install_cmd = env._full_command(install_cmd)
        return install_cmd

    def _validate_folder_path(self):
        """Raise :class:`CodeSyncError` if the install target is the home or root directory."""
        # Compare with Path and expanduser to handle both "~"-relative and absolute paths.
        if Path(self.install_target.path).expanduser() in [
            Path("~").expanduser(),
            Path("/"),
        ]:
            raise CodeSyncError(
                "Cannot sync the home directory. Please include a Python configuration file in a subdirectory."
            )

    def _pip_install_cmd(
        self, env: Union[str, "Env"] = None, cluster: "Cluster" = None
    ):
        """Build the full ``pip install`` command string for this package."""
        install_args = f" {self.install_args}" if self.install_args else ""
        if isinstance(self.install_target, Folder):
            install_cmd = (
                f"{str(Path(self.install_target.local_path).absolute())}"
                + install_args
            )
        else:
            install_target = f'"{self.install_target}"'
            install_cmd = install_target + install_args

        install_cmd = f"pip install {self._install_cmd_for_torch(install_cmd, cluster)}"
        install_cmd = self._prepend_python_executable(
            install_cmd, cluster=cluster, env=env
        )
        install_cmd = self._prepend_env_command(install_cmd, env=env)
        return install_cmd

    def _conda_install_cmd(
        self, env: Union[str, "Env"] = None, cluster: "Cluster" = None
    ):
        """Build the ``conda install -y`` command string; also calls ``install_conda``."""
        install_args = f" {self.install_args}" if self.install_args else ""
        if isinstance(self.install_target, Folder):
            install_cmd = f"{self.install_target.local_path}" + install_args
        else:
            install_cmd = self.install_target + install_args

        install_cmd = f"conda install -y {install_cmd}"
        install_cmd = self._prepend_env_command(install_cmd, env=env)
        install_conda(cluster=cluster)
        return install_cmd

    def _reqs_install_cmd(
        self, env: Union[str, "Env"] = None, cluster: "Cluster" = None
    ):
        """Build the ``pip install -r`` command for a requirements folder.

        Returns:
            The command string, or ``None`` when the target folder or its
            ``requirements.txt`` cannot be found.
        """
        install_args = f" {self.install_args}" if self.install_args else ""
        if not isinstance(self.install_target, Folder):
            install_cmd = self.install_target + install_args
        else:
            path = self.install_target.local_path

            # Read reqs_path and reqs from local if not cluster
            if not cluster:
                reqs_path = f"{path}/requirements.txt"
                # Open the same expanded path that was checked for existence, so
                # "~"-prefixed folders do not pass the check and then fail on open.
                local_reqs_file = Path(reqs_path).expanduser()
                if not local_reqs_file.exists():
                    return None
                with open(local_reqs_file) as f:
                    reqs_list = f.readlines()

            # Otherwise, make sure the target folder is on the cluster,
            # and read reqs from the cluster
            else:
                if not self.install_target.system == cluster:
                    install_target = self.to(cluster).install_target
                else:
                    install_target = self.install_target

                if not install_target.exists_in_system():
                    return None
                elif "requirements.txt" not in install_target.ls(full_paths=False):
                    return None

                reqs_list = (
                    install_target.get("requirements.txt", mode="r")
                    .strip("\n")
                    .split("\n")
                )
                reqs_path = f"{install_target.path}/requirements.txt"

            install_cmd = self._reqs_install_cmd_for_torch(
                reqs_path, reqs_list, install_args, cluster=cluster
            )

        install_cmd = f"pip install {install_cmd}"
        install_cmd = self._prepend_python_executable(
            install_cmd, env=env, cluster=cluster
        )
        install_cmd = self._prepend_env_command(install_cmd, env=env)
        return install_cmd

    def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
        """Install package.

        Args:
            env (Env or str): Environment to install package on. If left empty, defaults to base environment.
                (Default: ``None``)
            cluster (Optional[Cluster]): If provided, will install package on cluster using SSH.

        Raises:
            RuntimeError: If the pip/conda/reqs install command returns a non-zero code.
            ValueError: If the install method is unknown, or the install target is
                not usable for a ``local``/``reqs`` install.
        """
        logger.info(f"Installing {str(self)} with method {self.install_method}.")

        if isinstance(self.install_target, Folder):
            if not cluster:
                path = self.install_target.local_path
            elif self.install_target.exists_in_system():
                path = self.install_target.path
            else:
                path = self.to(cluster).install_target.path

            if not path:
                return

        if self.install_method == "pip":
            install_cmd = self._pip_install_cmd(env=env, cluster=cluster)
            logger.info(f"Running via install_method pip: {install_cmd}")
            retcode = run_setup_command(install_cmd, cluster=cluster)[0]
            if retcode != 0:
                raise RuntimeError(
                    f"Pip install {install_cmd} failed, check that the package exists and is available for your platform."
                )

        elif self.install_method == "conda":
            install_cmd = self._conda_install_cmd(env=env, cluster=cluster)
            logger.info(f"Running via install_method conda: {install_cmd}")
            retcode = run_setup_command(install_cmd, cluster=cluster)[0]
            if retcode != 0:
                raise RuntimeError(
                    f"Conda install {install_cmd} failed, check that the package exists and is "
                    "available for your platform."
                )

        elif self.install_method == "reqs":
            install_cmd = self._reqs_install_cmd(env=env, cluster=cluster)
            if install_cmd:
                logger.info(f"Running via install_method reqs: {install_cmd}")
                retcode = run_setup_command(install_cmd, cluster=cluster)[0]
                if retcode != 0:
                    raise RuntimeError(
                        f"Reqs install {install_cmd} failed, check that the package exists and is available for your platform."
                    )
            else:
                logger.info(f"{path}/requirements.txt not found, skipping reqs install")

        else:
            if self.install_method != "local":
                raise ValueError(
                    f"Unknown install method {self.install_method}. Must be one of {INSTALL_METHODS}"
                )

        # Need to append to path
        if self.install_method in ["local", "reqs"]:
            if isinstance(self.install_target, Folder):
                if not cluster:
                    sys.path.insert(0, path)
                else:
                    # PATH entries are separated by ":"; a ";" would end the export
                    # and make the shell try to execute the folder as a command.
                    run_setup_command(f"export PATH=$PATH:{path}", cluster=cluster)
            elif not cluster:
                # expanduser() must run before resolve(): resolving first turns
                # "~/x" into "<cwd>/~/x", after which the tilde is never expanded.
                local_target = Path(self.install_target).expanduser().resolve()
                if local_target.exists():
                    sys.path.insert(0, str(local_target))
                else:
                    raise ValueError(
                        f"install_target {self.install_target} must be a Folder or a path to a directory for install_method {self.install_method}"
                    )
            else:
                raise ValueError(
                    f"If cluster is provided, install_target must be a Folder for install_method {self.install_method}"
                )

    # ----------------------------------
    # Torch Install Helpers
    # ----------------------------------
    def _reqs_install_cmd_for_torch(
        self, reqs_path, reqs_list, install_args="", cluster=None
    ):
        """Return the ``-r <reqs_path>`` argument string for pip, appending a torch
        ``--extra-index-url`` when the requirements mention torch and do not
        already point at a pytorch.org index.

        Args:
            reqs_path: Path of the requirements file to pass to ``pip install -r``.
            reqs_list: Lines of the requirements file.
            install_args: Extra pip arguments (already prefixed with a space), always preserved.
            cluster: Cluster used to detect the CUDA version, if any.
        """
        base_cmd = f"-r {reqs_path}" + install_args

        # If torch isn't a req, directly pip install the reqs file
        if not any("torch" in req for req in reqs_list):
            return base_cmd

        # If a torch index url is already defined by the user, leave the reqs file untouched
        for req in reqs_list:
            if (
                "--index-url" in req or "--extra-index-url" in req
            ) and "pytorch.org" in req:
                return base_cmd

        # Only probe the CUDA version once we know an index url has to be added
        cuda_version_or_cpu = detect_cuda_version_or_cpu(cluster=cluster)
        index_url = self._torch_index_url(cuda_version_or_cpu)
        if not index_url:
            # No known wheel index for this CUDA version; avoid emitting "--extra-index-url None"
            return base_cmd

        # add extra-index-url for torch if not found
        return f"{base_cmd} --extra-index-url {index_url}"

    def _install_cmd_for_torch(self, install_cmd, cluster=None):
        """Return the correct formatted pip install command for the torch package(s) provided.

        Returns ``None`` when ``install_cmd`` starts with ``#`` or nothing is left to install;
        returns ``install_cmd`` unchanged when it mentions no torch package.
        """
        if install_cmd.startswith("#"):
            return None

        torch_source_packages = ["torch", "torchvision", "torchaudio"]
        if not any(x in install_cmd for x in torch_source_packages):
            return install_cmd

        packages_to_install: list = self._packages_to_install_from_cmd(install_cmd)
        cuda_version_or_cpu = detect_cuda_version_or_cpu(cluster=cluster)

        formatted_cmds = []
        for package_install_cmd in packages_to_install:
            formatted_cmd = self._install_url_for_torch_package(
                package_install_cmd, cuda_version_or_cpu
            )
            if formatted_cmd:
                formatted_cmds.append(formatted_cmd)

        final_install_cmd = " ".join(formatted_cmds).rstrip()
        return final_install_cmd if final_install_cmd != "" else None

    def _install_url_for_torch_package(self, install_cmd, cuda_version_or_cpu):
        """Build the full install command, adding a --index-url and --extra-index-url where applicable."""
        # Grab the relevant index url for torch based on the CUDA version provided
        if "," in install_cmd:
            # If installing a range of versions format the string to make it compatible with `pip_install` method
            install_cmd = install_cmd.replace(" ", "")

        index_url = self._torch_index_url(cuda_version_or_cpu)
        if index_url and not any(
            specifier in install_cmd for specifier in ["--index-url ", "-i "]
        ):
            install_cmd = f"{install_cmd} --index-url {index_url}"

        if "--extra-index-url" not in install_cmd:
            return f"{install_cmd} --extra-index-url https://pypi.python.org/simple/"

        return install_cmd

    def _torch_index_url(self, cuda_version_or_cpu: str):
        """Look up the torch wheel index for a CUDA version (or "cpu"); ``None`` if unknown."""
        return self.TORCH_INDEX_URLS.get(cuda_version_or_cpu)

    @staticmethod
    def _packages_to_install_from_cmd(install_cmd: str):
        """Split a string of command(s) into a list of separate commands"""
        # Remove any --extra-index-url flags from the install command (to be added later by default)
        install_cmd = re.sub(r"--extra-index-url\s+\S+", "", install_cmd)
        install_cmd = install_cmd.strip()

        if ", " in install_cmd:
            # Ex: 'torch>=1.13.0,<2.0.0'
            return [install_cmd]

        # findall returns one tuple per match because the pattern has two groups;
        # the first group is the package together with its optional index flag.
        matches = re.findall(r"(\S+(?:\s+(-i|--index-url)\s+\S+)?)", install_cmd)

        packages_to_install = [match[0] for match in matches]
        return packages_to_install

    def to(
        self,
        system: Union[str, Dict, "Cluster"],
        path: Optional[str] = None,
        mount: bool = False,
    ):
        """Copy the package onto filesystem or cluster, and return the new Package object.

        Returns ``self`` unchanged when the package already lives on ``system``.

        Raises:
            TypeError: If ``install_target`` is not a Folder.
            CodeSyncError: If the folder to copy is the home or root directory.
        """
        if not isinstance(self.install_target, Folder):
            raise TypeError(
                "`install_target` must be a Folder in order to copy the package to a system."
            )

        if (
            isinstance(self.install_target.system, str)
            and not self.install_target.system == "file"
        ):
            self.install_target.system = _get_cluster_from(self.install_target.system)
        install_system_name = (
            self.install_target.system.name
            if isinstance(self.install_target.system, Cluster)
            else self.install_target.system
        )

        system = _get_cluster_from(system)
        system_name = system.name if isinstance(system, Cluster) else system
        if system_name == install_system_name:
            return self

        if isinstance(system, Resource):
            if isinstance(self.install_target.system, Resource):
                # if these are both clusters, check if they're pointing to the same address.
                # We use endpoint instead of address here because if address is localhost, we need to port too
                if self.install_target.system.endpoint(
                    external=False
                ) == system.endpoint(external=False):
                    # If we're on the target system, just make sure the package is in the Python path
                    sys.path.insert(0, self.install_target.local_path)
                    return self
            logger.info(
                f"Copying package from {self.install_target.fsspec_url} to: {getattr(system, 'name', system)}"
            )
            self._validate_folder_path()
            new_folder = self.install_target._to_cluster(system, path=path, mount=mount)
        else:  # to fs
            self._validate_folder_path()
            new_folder = self.install_target.to(system, path=path)
        new_folder.system = system
        # Shallow copy: the new package shares everything but its install target
        new_package = copy.copy(self)
        new_package.install_target = new_folder
        return new_package

    @staticmethod
    def split_req_install_method(req_str: str):
        """Split a requirements string into a install method and the rest of the string."""
        splat = req_str.split(":", 1)
        return (splat[0], splat[1]) if len(splat) > 1 else ("", splat[0])

    @staticmethod
    def from_config(config: dict, dryrun=False, _resolve_children=True):
        """Build a Package (or GitPackage) from a config dict.

        A dict-valued ``install_target`` is first turned into a Folder (stored back
        into ``config``).
        """
        if isinstance(config.get("install_target"), dict):
            config["install_target"] = Folder.from_config(
                config["install_target"],
                dryrun=dryrun,
                _resolve_children=_resolve_children,
            )
        if config.get("resource_subtype") == "GitPackage":
            from runhouse import GitPackage

            return GitPackage.from_config(
                config, dryrun=dryrun, _resolve_children=_resolve_children
            )
        return Package(**config, dryrun=dryrun)

    @staticmethod
    def from_string(specifier: str, dryrun=False):
        """Build a Package from a specifier such as ``"numpy"``, ``"pip:torch --index-url <url>"``,
        ``"reqs:./"``, ``"local:./my_dir"``, ``"rh:<name>"`` or a GitHub URL.

        Raises:
            ValueError: If the install method prefix is not one of ``INSTALL_METHODS``.
        """
        if specifier == "requirements.txt":
            specifier = "reqs:./"

        # Use regex to check if specifier matches '<method>:https://github.com/<path>' or 'https://github.com/<path>'
        match = re.search(
            r"^(?:(?P<method>[^:]+):)?(?P<path>https://github.com/.+)", specifier
        )
        if match:
            install_method = match.group("method")
            url = match.group("path")
            from runhouse.resources.packages.git_package import git_package

            return git_package(
                git_url=url, install_method=install_method, dryrun=dryrun
            )

        install_method, target_and_args = Package.split_req_install_method(specifier)

        # Handles a case like "torch --index-url https://download.pytorch.org/whl/cu113"
        rel_target, args = (
            target_and_args.split(" ", 1)
            if " " in target_and_args
            else (target_and_args, "")
        )

        # We need to do this because relative paths are relative to the current working directory!
        abs_target = (
            Path(rel_target).expanduser()
            if Path(rel_target).expanduser().is_absolute()
            else Path(locate_working_dir()) / rel_target
        )
        if abs_target.exists():
            target = Folder(
                path=abs_target, dryrun=True
            )  # No need to create the folder here
        else:
            target = rel_target

        # If install method is not provided, we need to infer it
        if not install_method:
            if Path(specifier).resolve().exists():
                install_method = "reqs"
            else:
                install_method = "pip"

        # If we are just defaulting to pip, attempt to install the same version of the package
        # that is already installed locally
        # Check if the target is only letters, nothing else. This means it's a string like 'numpy'.
        if install_method == "pip" and is_python_package_string(target):
            locally_installed_version = find_locally_installed_version(target)
            if locally_installed_version:
                # Check if this is a package that was installed from local
                local_install_path = get_local_install_path(target)
                if local_install_path and Path(local_install_path).exists():
                    target = Folder(path=local_install_path, dryrun=True)

        # "Local" install method is a special case where we just copy a local folder and add to path
        if install_method == "local":
            return Package(
                install_target=target, install_method=install_method, dryrun=dryrun
            )

        elif install_method in ["reqs", "pip", "conda"]:
            return Package(
                install_target=target,
                install_args=args,
                install_method=install_method,
                dryrun=dryrun,
            )
        elif install_method == "rh":
            # Calling the factory method below
            return package(name=specifier[len("rh:") :], dryrun=dryrun)
        else:
            raise ValueError(
                f"Unknown install method {install_method}. Must be one of {INSTALL_METHODS}"
            )
def package(
    name: str = None,
    install_method: str = None,
    install_str: str = None,
    path: str = None,
    system: str = None,
    dryrun: bool = False,
    local_mount: bool = False,
    data_config: Optional[Dict] = None,
) -> Package:
    """
    Builds an instance of :class:`Package`.

    Args:
        name (str): Name to assign the package resource.
        install_method (str): Method for installing the package. Options: [``pip``, ``conda``, ``reqs``, ``local``]
        install_str (str): Additional arguments to install.
        path (str): URL of the package to install.
        system (str): File system or cluster on which the package lives. Currently this must be a cluster or one of:
            [``file``, ``github``, ``sftp``, ``ssh``, ``s3``, ``gs``, ``azure``].
        dryrun (bool): Whether to create the Package if it doesn't exist, or load the Package object as a dryrun.
            (Default: ``False``)
        local_mount (bool): Whether to locally mount the installed package. (Default: ``False``)
        data_config (Optional[Dict]): The data config to pass to the underlying fsspec handler.

    Returns:
        Package: The resulting package.

    Example:
        >>> import runhouse as rh
        >>> reloaded_package = rh.package(name="my-package")
        >>> local_package = rh.package(path="local/folder/path", install_method="local")
    """
    if name and not any(
        [install_method, install_str, path, system, data_config, local_mount]
    ):
        # Only the name was provided: load the existing package by name
        return Package.from_name(name, dryrun)

    install_target = None
    install_args = None
    if path is not None:
        system = system or Folder.DEFAULT_FS
        install_target = folder(
            path=path, system=system, local_mount=local_mount, data_config=data_config
        )
        install_args = install_str
    elif install_str is not None:
        # With no path, install_str is "<target>[ <args>]". partition() tolerates a
        # bare target such as "numpy", where a two-name unpack of split(" ", 1)
        # would raise ValueError.
        install_target, separator, remainder = install_str.partition(" ")
        install_args = remainder if separator else None

    return Package(
        install_method=install_method,
        install_target=install_target,
        install_args=install_args,
        name=name,
        dryrun=dryrun,
    )