A Run is a Runhouse primitive used for capturing logs, inputs, and results of a particular function call, CLI or Python command, or context manager. Runs also serve as a convenient way to trace usage and the upstream / downstream dependencies between different Runhouse artifacts.
Constructs a Run object.

Parameters:
name (Optional[str]) – Name of the Run to load.
log_dest (Optional[str]) – Whether to save the Run’s logs to a file or stream them back. (Default: file)
path (Optional[str]) – Path to the Run’s dedicated folder on the system where the Run lives.
system (Optional[str or Cluster]) – File system or cluster name where the Run lives. If providing a file system, this must be one of: [file, github, sftp, ssh, s3, gs, azure]. We are working to add additional file system support.
data_config (Optional[Dict]) – The data config to pass to the underlying fsspec handler for the folder.
load (bool) – Whether to try reloading an existing Run from configs. (Default: True)
dryrun (bool) – Whether to create the Run if it doesn’t exist, or load the Run object as a dryrun. (Default: False)
**kwargs – Optional kwargs for the Run.

Returns:
The loaded Run object.
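As a rough sketch of how the factory might be called (the run name and path here are hypothetical; the calls are wrapped in a function so nothing executes until a matching Run actually exists):

```python
def load_run_sketch():
    # Import inside the function so the sketch can be defined
    # even where runhouse is not installed or configured
    import runhouse as rh

    # Load an existing Run by its name (reloaded from configs, since load=True by default)
    by_name = rh.run(name="my_fn_run")

    # Or point directly at the Run's dedicated folder on a particular system
    by_path = rh.run(path="~/rh/my_fn_run", system="file")

    return by_name, by_path
```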
- __init__(name: str | None = None, fn_name: str | None = None, cmds: list | None = None, log_dest: str = 'file', path: str | None = None, system: str | Cluster | None = None, data_config: dict | None = None, status: RunStatus = RunStatus.NOT_STARTED, start_time: str | None = None, end_time: str | None = None, creator: str | None = None, creation_stacktrace: str | None = None, upstream_artifacts: List | None = None, downstream_artifacts: List | None = None, run_type: RunType = RunType.CMD_RUN, error: str | None = None, error_traceback: str | None = None, overwrite: bool = False, dryrun: bool = False, **kwargs)[source]
Runhouse Run object
Note
To load an existing Run, please use the factory method run().
Metadata to store in RNS for the Run.
Load the pickled function inputs saved on the system for the Run.
Reload the Run object from the system. This is useful for checking the status of a Run.
For example: my_run.refresh().status
Load the function result saved on the system for the Run. If the Run has failed, return the stderr; otherwise return the stdout.
Config to save in the Run’s dedicated folder on the system. Note: this is different from the config saved in RNS, which is the metadata for the Run.
If the Run name is being overwritten (ex: initially created with auto-generated name), update the Run config stored on the system before saving to RNS.
Read the stderr saved on the system for the Run.
Read the stdout saved on the system for the Run.
Send a Run to another system.
system (Union[str, Cluster]) – Name of the system or Cluster object to copy the Run to.
path (Optional[str]) – Path on the system where the Run will be saved. Defaults to the local path for Runs (in the rh folder of the working directory).
data_config (Optional[dict]) – Config to pass into the fsspec handler for copying the Run.
Returns: A copy of the Run on the destination system and path.
Write data (ex: function inputs or result, stdout, stderr) to the Run’s dedicated folder on the system.
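Taken together, the methods above might be used like this once a Run exists (a hedged sketch: "my_fn_run" is a hypothetical Run name, and the body only does anything against a live cluster):

```python
def inspect_run_sketch(cluster):
    # Reload the Run from the cluster and check its current status
    my_run = cluster.get_run("my_fn_run")
    status = my_run.refresh().status

    # Read the output streams captured in the Run's dedicated folder
    out = my_run.stdout()
    err = my_run.stderr()

    # result() returns the saved result on success, or the stderr on failure
    return status, out, err
```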
Runs can be useful in a number of different ways:
Caching: Using Runs, we can easily trace the usage and dependencies between various resources. For example, we may have a pipeline that produces a pre-preprocessed dataset, trains a model, and exports the model for inference. It would be useful to know which functions (or microservices) were used to produce each stage of the pipeline, and which data artifacts were created along the way. Caching also works among a team, so when one person on a team creates a Run, others can benefit from the cached result (without needing to check whether the result is present yet).
Auditability: With Runs we can easily inspect inputs, logs and outputs for a particular function call or command execution. This makes it much easier to debug or reproduce a particular workflow.
Sharing: Runs (like all other Runhouse objects) can easily be shared among team members. This is useful when we have different services that are dependent on a single output (e.g. a model).
Reusability: Runs make it much easier to reproduce or re-run previous workflows. For example, if we need to run some script on a recurring basis, we don’t have to worry about re-running each step in its entirety if we already have a cached run for that step.
A Run may contain some (or all) of these core components:
Name: A unique identifier for the Run.
Folder: Where the Run’s data lives on its associated system.
Function: A function to be executed on a cluster.
Commands: A record of Python or CLI commands executed on a cluster.
Upstream dependencies: Runhouse artifacts loaded by the Run.
Downstream dependencies: Runhouse artifacts saved by the Run.
When a Run is completed it stores the following information in its dedicated folder on the system:
Config: A JSON of the Run’s core data. This includes fields such as: name, status, start_time, end_time, run_type, fn_name, cmds, upstream_artifacts, and downstream_artifacts. Note: When saving the Run to Runhouse Den (RNS) (via the save() method), only the Run’s metadata will be stored.
Function inputs: The pickled inputs to the function that triggered the Run (where relevant).
Function result: The pickled result of the function that triggered the Run (where relevant).
Stdout: Stdout from the Run’s execution.
Stderr: Stderr from the Run’s execution.
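As a plain-Python illustration of how the pickled inputs and result round-trip (no cluster needed; the dict shape here is illustrative, not Runhouse's actual storage schema):

```python
import pickle

# Inputs and result are pickled before being written to the Run's folder...
inputs = {"args": (1, 2), "kwargs": {}}
stored_inputs = pickle.dumps(inputs)
stored_result = pickle.dumps(1 + 2)  # e.g. the result of summer(1, 2)

# ...and reading them back is a pickle.loads of the raw bytes,
# as in result = pickle.loads(fn_run.result())
assert pickle.loads(stored_inputs) == inputs
print(pickle.loads(stored_result))  # 3
```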
Note
By default, the Run’s folder sits in the ~/.rh/logs/<run_name> directory if the system is a cluster, or in the rh folder of the working directory if the Run is local. See Run Logs for more details.
There are three ways to create a Run:
We can create a Run when executing a function by providing the run_name argument, a string representing the custom name to assign the Run. By default run_name is set to None.
import runhouse as rh

def summer(a, b):
    return a + b

# Initialize the cluster object (and provision the cluster if it does not already exist)
cpu = rh.cluster(name="^rh-cpu")

# Create a function object and send it to the cpu cluster
my_func = rh.function(summer, name="my_test_func").to(cpu)

# Call the function with its input args, and provide it with a `run_name` argument
fn_res = my_func(1, 2, run_name="my_fn_run")
When this function my_func is called, Runhouse triggers the function execution on the cluster and returns the Run’s result. The Run’s config metadata, inputs, and stdout / stderr are also stored in the .rh folder of the cluster’s file system.
Note
In addition to calling the function directly, you can also use the .run() async interface.
Another way to create a Run is by executing Python or CLI command(s). When we run these commands, the stdout and stderr from the command will be saved in the Run’s dedicated folder on the cluster.
To create a Run by executing a CLI command:
# Run a CLI command on the cluster and provide the `run_name` param to trigger a run
return_codes = cpu.run(["python --version"], run_name="my_cli_run")
To create a Run by executing Python commands:
return_codes = cpu.run_python(
    [
        "import pickle",
        "import logging",
        "local_blob = rh.blob(name='local_blob', data=pickle.dumps(list(range(50))), mkdir=True).write()",
        "logging.info(f'Blob path: {local_blob.path}')",
        "local_blob.rm()",
    ],
    run_name="my_python_run",
)
There are a few ways to load a Run:
From a cluster: Load a Run that lives on a cluster by using cluster.get_run(). This method takes a run_name argument with the name of the Run to load.
From a system: Load a Run from any system by using the rh.run() method. This method takes either a path argument specifying the path to the Run’s folder, or a name argument specifying the name of the Run to load.
Note
Each of these methods will return a Run object.
Loading Results
To load the results of the Run, we can call the result() method:
# Load the run from its cluster
fn_run = cpu.get_run("my_fn_run")
# If the function for this run has finished executing, we can load the result:
result = pickle.loads(fn_run.result())
Tip
We can also call fn_run.stderr() to view the Run’s stderr output, and fn_run.stdout() to view the Run’s stdout output.
We can trace activity and capture logs within a block of code using a context manager. By default the Run’s config will be stored on the local filesystem in the rh/<run_name> folder of the current working directory, or the .rh/logs/<run_name> folder if running on a cluster.
import pickle

import runhouse as rh

with rh.run(name="ctx_run") as r:
    # Add all Runhouse objects loaded or saved in the context manager to
    # the Run's artifact registry (upstream + downstream artifacts)
    my_func = rh.function(name="my_existing_run")
    my_func.save("my_new_func")
    my_func(1, 2, run_name="my_new_run")
    current_run = my_func.system.get_run("my_new_run")
    run_res = pickle.loads(current_run.result())
    print(f"Run result: {run_res}")

r.save()
print(f"Saved Run to path: {r.path}")
Note
We can specify the path to the Run’s folder by providing the path argument to the Run object. If we do not specify a path, the folder will be created in the rh folder of the current working directory.
We can then re-load this Run from its system (in this case the local file system):
import runhouse as rh
ctx_run = rh.run(path="~/rh/runs/my_ctx_run")
Runhouse provides varying levels of control for running and caching the results of a Run.
We can invoke a run both synchronously and asynchronously, and with or without caching:
To create a Run which executes a function synchronously without any caching, we call the function and provide the run_name argument. The function will be executed on the cluster, and will return its result once completed.
For a fully synchronous run which also checks for a cached result, we can call the get_or_call() method on the function. If a result already exists with this Run name, the result will be returned. Otherwise, the function will be executed synchronously on the cluster and the result will be returned once the function execution is complete:
import runhouse as rh
my_func = rh.function(name="my_func")
res = my_func.get_or_call(1, 2, run_name="my_fn_run")
To run a function asynchronously without any caching, we can call the run() method. The function will begin executing on the cluster in the background, and in the meantime a Run object will be returned:
import runhouse as rh
my_func = rh.function(name="my_func")
run_obj = my_func.run(1, 2, run_name="my_async_run")
For a fully asynchronous run which also checks for a cached result, we can call the get_or_run() method on the function. A Run object will be returned whether the result is cached or not:
import runhouse as rh
my_func = rh.function(name="my_func")
run_obj = my_func.get_or_run(1, 2, run_name="my_async_run")
To copy the Run’s folder contents from the cluster to the local environment:
my_run = cpu.get_run("my_run")
local_run = my_run.to("here")
By default, this will be copied into the rh directory in your current project’s working directory, but you can overwrite this by passing in a path parameter.
To copy the Run’s folder contents to a remote storage bucket:
my_run = cpu.get_run("my_run")
my_run_on_s3 = my_run.to("s3", path="/s3-bucket/s3-folder")
You can view a Run’s logs in Python:
cpu.get("my_run", stream_logs=True)
Or via the command line:
runhouse logs cpu my_run
To cancel a Run:
cpu.cancel("my_run")
Or via the command line:
runhouse cancel cpu my_run