Abstract image of a runner jumping over a house with a yellow and green gradient background

Supercharging Apache Airflow with Runhouse

Part 1 of a series on data pipelines for the modern ML stack

Photo of Josh Lewittes
Josh Lewittes

CTO @ 🏃‍♀️Runhouse🏠

April 21, 2023

Airflow is one of the more popular tools in the ML community for scheduling, orchestrating, and monitoring pipelines or workflows.

While an obvious choice for building data pipelines, it has its share of shortcomings when used for ML use cases.

Suppose we want to build a pipeline with the following steps: data preprocessing, data splitting, model training, model prediction, and measuring model accuracy.

With Airflow, we would construct a DAG that looks something like this:

preprocess >> split >> training >> predict >> accuracy

There are a few issues with this approach:

  • Setup: Airflow requires quite a bit of setup. We need config files, docker image(s), a database, a scheduler, and a web server. That’s a lot of time to spend on configurations!
  • DSL specifications: Airflow’s DSL requires us to break up our existing python code, which has already been tried and tested. By introducing this extra translation step we lose reproducibility and iteration speeds are much slower.
  • Hardware heterogeneity: A core appeal of Airflow is the ability to run various nodes of the DAG on different compute. This can be done in Kubernetes (commonly with Kubeflow or Airflow’s KubernetesPodOperator), but resources are limited to the existing cluster, and are probably not multi-cloud.
  • Local vs Remote execution: Debugging in Airflow is a nightmare. This shouldn’t come as a total surprise since the system was built to run static data and SQL operations, not dynamic and iterative machine learning code. By submitting to a remote execution engine, we also lose our “production to research” path.
  • Packaging: Airflow (along with other orchestration engines) requires us to package up the code for each node in the DAG. By splitting the code into discrete blocks, the “glue code” that lives in each node becomes less reusable and shareable.

Wouldn’t it be easier if we didn’t have to build docker images and config files, translate existing code into a DSL, package that code into docker containers, submit that code to an execution engine, and then wait and hope that each task in the DAG runs smoothly?

Runhouse is a unified python interface into compute and data infra that can be shared by research (notebooks, scripts), and production (pipelines, serving). It lets us send code and data to any of our compute infra, all in python, and continue to use them eagerly exactly as they were. There’s no magic YAML, DSL, or “submitting for execution.”


Let’s go back to the pipeline we want to build and highlight some of the differences in how that might look using either Airflow or Runhouse, or integrating Runhouse within an existing Airflow instance:

Setup

Airflow

Setting up Airflow roughly follows these steps:

  • Installing Airflow
  • Initializing the Airflow metadata database
  • Configuring the airflow.cfg file
  • Starting the scheduler
  • Creating a user account to access the webserver
  • Starting the webserver
  • Translating/migrating code into a DAG
  • Running the DAG

Runhouse

pip install runhouse

Runhouse requires access to a cloud provider (AWS, Azure, GCP, Lambda Labs, or an existing/on-prem cluster), which should be configured locally or wherever our python interpreter lives.

Run sky check to confirm which cloud providers are ready to use. This enables us to provision clusters within our existing cloud infra, and deploy our Runhouse functions, or microservices, to those clusters.

Airflow & Runhouse

Integrating Runhouse with Airflow is pretty seamless, as we can leave our Runhouse python code alone (as initially constructed) and simply drop it into our Airflow node. This is much easier than having to connect Airflow with some other compute system, since the only additional requirement is having the Runhouse python SDK installed.

By using Airflow and Runhouse together we get the best of both worlds: A dedicated Airflow instance for triggering, scheduling, and monitoring jobs with the benefit of keeping our python code in its initial state and running each task across heterogenous compute.

DSL / Specification Language

Airflow

With Airflow we define workflows using a series of operators and tasks, which are combined together to create pipelines. Each operator is a single task or action that needs to be performed within the workflow.

For our pipeline example we want to start by processing data with 32 CPUs. We would create an operator that resembles something like this:

dag = DAG("training_pipeline", description="ML Training Pipeline", default_args=default_args, dagrun_timeout=timedelta(minutes=60*10), catchup=False ) with dag: task1 = KubernetesPodOperator( task_id="preprocessing", name=f"airflow-{env}-preprocessing-{uuid}", namespace="default", image="docker/image/url:latest", cmds=[ "python", "training/preprocess.py", " - env", env, " - uuid", uuid, ], in_cluster=IN_CLUSTER, cluster_context=CLUSTER_CONTEXT, get_logs=True, startup_timeout_seconds=60 * 5, is_delete_operator_pod=True, resources={"request_cpu": "32000m", "request_memory": "1Gi"}, image_pull_policy="Always", node_selectors={ "cloud.google.com/gke-nodepool": "n1-standard-4-pool" }, tolerations=[ { "key": "node-pool", "operator": "Equal", "value": "n1-standard-4-pool", "effect": "NoExecute", } ], retries=1, retry_delay=timedelta(minutes=5), xcom_push=True, dag=dag,) task2 = ... task1 >> task2

The Airflow worker spins up the requested Kubernetes resources (if available) to run the operator’s work. The operator executes a Docker container, polls for its completion, and returns the result.

Runhouse

Runhouse by design has no DSL and is python native. This means we can construct our workflows with minimal changes to our existing codebase.

For the preprocessing task, we can create a 32 CPU cluster with our desired specifications, and launch the cluster with a simple one liner:

import runhouse as rh # On-Demand providers currently supported: AWS, Azure, GCP, and Lambda Labs cpu = rh.cluster(name="rh-32-cpu", instance_type="CPU:32+", provider="cheapest").up_if_not() # Existing clusters can be accessed via SSH creds and IP addresses: # cpu = rh.cluster(ips=['<ip of the cluster>'], # ssh_creds={'ssh_user': '...', # 'ssh_private_key':'<path_to_key>'}, # name='rh-32-cpu')

Next, we create our Function object, which holds the function logic itself and any other requirements needed for it to run. Calling .to(cpu) on the function ensures it gets synced over to the cluster we defined above.

# Create a Function object with the preprocessed function, # then send it to the CPU cluster with any additional reqs it needs to run preprocess_raw_data = rh.function(preprocess_raw_data).to(cpu, reqs=["scikit-learn"]) # Run the the function on the cluster, which returns a callable object or # remote reference to the dataset object saved on the cluster. # When we call the function with `raw_data`, it executes on the cluster. preprocessed_data = preprocess_raw_data(raw_data) print(f"Saved processed data on cluster to path: {preprocessed_data.path}")

When we call the function with our raw data, we get back a python callable that behaves exactly as it would when we initially defined it, except the function is now running on the remote cluster.

Airflow & Runhouse

Using Airflow creates additional overhead as we need to build operators, docker containers, write config files, etc. Using Runhouse code as a drop in replacement for the packaged DSL code removes the additional migration steps that orchestration engines typically require.

Hardware Heterogeneity

Airflow

By default, tasks are limited to the resources that exist on the Airflow worker at execution time. To prevent resource contention and to allow for general scalability required for ML use cases, it usually makes more sense to run tasks in Kubernetes, where each task runs in its own pod. This gives us more flexility and power over runtime environments, resources, and secrets used by each task.

Runhouse

Runhouse simplifies running code across heterogeneous compute within the same script, function, notebook, or any python environment.

Just as we can easily do the data preprocessing on a 32 CPU cluster in AWS, we can do the model training on an A100 instance in GCP.

# Provision a one A100 instance on GCP gpu = rh.cluster(name='rh-a100', instance_type='A100:1', provider='gcp').up_if_not() # Call the model training function with the train and test data # The function will run on the cluster, and return remote references # to the model and test predictions saved to the cluster's local file system model_training = rh.function(model_training).to(gpu) trained_model, test_predictions = model_training(train_data, test_data) print(f"Saved model on cluster to path: {trained_model.path}")

Airflow & Runhouse

Using Airflow and Runhouse together lets us achieve hardware heterogeneity in our workflows out of the box. With Runhouse we can easily assign each function to a diverse set of compute which can be on-prem or spun up on demand across various cloud providers. This saves us the trouble of creating Kubernetes like operators or worrying about resource contention within the confines of the existing cluster we are trying to use via Airflow.

Local vs Remote Execution

Airflow

Airflow works well for static workflows, but is not adept at handling ML use cases which are inherently more dynamic. MLEs and Data Scientists want to run lots of experiments, where results can be quickly measured and iterated on. By submitting our code to a remote execution engine, it becomes much harder to run unit tests, CI, or quickly debug or jump through the code that we already packaged and shipped off.

If we want to optimize for “pulling more levers” or running more experiments, then it would make more sense to use a dynamic or flexible system to manage these workflows.

Runhouse

Runhouse unbundles code execution and dispatch, allowing us to construct complex programs that traverse our own infrastructure all while staying in python. This makes it much easier to debug and iterate on our workflows.

A typical lifecycle of an ML project may include:

  • Prototyping in a notebook environment
  • Migrating code into functions / classes in an IDE
  • Translating code into the orchestrator’s DSL
  • Submitting packaged code for execution to the DAG engine

Runhouse lets us shrink this development process significantly by blurring the lines between program execution and deployment.

Airflow & Runhouse

Running Airflow locally can quickly become a bit cumbersome, as setting up a local airflow instance requires multiple steps and still does not save us the need to translate our code into a DSL. Instead of submitting our code to a remote execution engine, we can run and debug our code locally with Runhouse before dropping that same working code into Airflow for handling the scheduling, running, and monitoring of the pipeline.

Packaging

Airflow

Airflow demands quite a bit of manual work as we need to package up our code for each task. This work can be tedious, time consuming, and makes code versioning more difficult as we are moving another step away from the original code structure. This causes teams to lose their shared foundation of code, often leading to rampant copy-pasting and lost artifacts.

Runhouse

Runhouse allows us to natively program across compute resources and command data between storage and compute by providing a path of least resistance for running arbitrary code on specific hardware. Since we are leaving our code just as it was developed we no longer have to worry about the packaging and versioning of our code. We can also create a corpus of functions and services to be shared among teams and pipelines, making it much easier to have universal lineage and tracking of all ML artifacts.

Airflow & Runhouse

Using Runhouse inside of an Airflow DAG saves us the trouble of having to package up our code. We can simply have each Airflow task run our Runhouse code in the exact format it was developed, and not have to worry about migrating or translating code for each task in our pipeline. This makes it far easier to maintain automatic lineage, versioning, and governance across all pipelines.


Runhouse + Airflow: Better together

ML pipelines are living systems composed of smaller living systems that run repeatedly and undergo lots of iterations over time. Instead of slicing and packaging our pipelines into smaller scripts, we should ideally modularize them into small living microservices.

Runhouse provides a low lift and migration free alternative to building ML pipelines. By being aggressively DSL free, Runhouse requires the same activation energy to deploy a microservice on remote compute as it does to create one, effectively eliminating the division between “code” and “workflow” and creating a much more pleasant DevX.

There is certainly a lot of value in using an orchestrator like Airflow for scheduling, monitoring, and fault tolerance, but the DAG as an execution interface into heterogeneous ML infra can introduce debugging and iterability pain. Runhouse unbundles that infra interface, and by using Airflow and Runhouse together, we can get the best of both worlds. If we still want to use an orchestrator for scheduling, triggering and monitoring workflows, and Runhouse slots in nicely as a convenient and python-native way of defining the workflow proper.


🏃‍♀️Runhouse🏠 Resources

Stay up to speed 🏃‍♀️📩

Subscribe to our newsletter to receive updates about upcoming Runhouse features and announcements.