Simple diagram showing three dimensions: scheduling layer, infrastructure layer, and asset/lineage layer with associated companies for each

Lean Data Automation: A Principal Components Approach

Plus: A nimble, composable example with Github Actions, Hamilton, and Runhouse

Donny Greenberg

CEO @ 🏃‍♀️Runhouse🏠

Elijah ben Izzy

CTO @ DAGWorks

June 11, 2024

This post was written in collaboration with DAGWorks. They posted it here.

TL;DR

Our goal is to break data orchestration into its principal components and show the advantages of first-principles thinking about architecting and operationalizing data automation. Rather than thinking “orchestrator-first” (what tasks and workflows should I run to produce the data I need?), we propose focusing on the assets your system produces, the schedule on which they must be produced, and the infrastructure needed to produce them. With those three in place, the “workflows” click together naturally and modularly on their own. We will talk a bit about the current way data teams work with orchestration, propose a mental model for navigating the tooling landscape, and walk through an example.

This post is aimed towards AI/ML engineers, data engineers, and data scientists at organizations of any size. We particularly wrote it for those who want to start their stack off with a lean and extensible approach, or whose over-reliance on orchestration has slowed down iteration speed.

Unbundling Orchestration for Parts

Data teams have traditionally relied on orchestrators for three value drivers – we break them into layers of the stack:

Scheduling Jobs – The Cron Layer

Managing a swarm of recurring workflows with interdependencies at varying cadences is a hard problem. Data pipeline failure can often be an enterprise-level concern that requires awareness of downstream impacts and appropriate reactions. Orchestrators provide a control plane to manage the “data heartbeat” of a business. Good ones enable interventions (retry, manual parameter adjustment, fallback, etc…) when error conditions are met.
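To make the idea of interventions concrete, here is a minimal sketch of retry-then-fallback logic in plain Python. The names `run_with_retries` and `flaky_job` are hypothetical and do not come from any particular orchestrator; real schedulers expose this behavior through their own configuration.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retries(
    job: Callable[[], T],
    max_attempts: int = 3,
    backoff_seconds: float = 0.0,
    fallback: Optional[Callable[[], T]] = None,
) -> T:
    """Run `job`, retrying on failure, then use `fallback` if every attempt fails."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return job()
        except Exception as error:  # a real scheduler would filter error types
            last_error = error
            if attempt < max_attempts:
                time.sleep(backoff_seconds * attempt)  # linear backoff
    if fallback is not None:
        return fallback()
    raise RuntimeError(f"job failed after {max_attempts} attempts") from last_error


# Hypothetical flaky job: fails twice, then succeeds on the third call.
calls = {"count": 0}


def flaky_job() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("upstream not ready")
    return "fresh data"


result = run_with_retries(flaky_job, max_attempts=3)
```

The point is only that retries and fallbacks are scheduling concerns; the job itself stays an ordinary callable.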

Creating Data – The Asset Layer

The workflows above exist solely for the purpose of creating data assets. Historically, orchestrators just made sure some code ran to produce some data. Data catalogs then sprang up to capture and track the data that code produced to try to make sense of how things connected. More recently, some orchestrators (e.g. Dagster, dbt, Prefect, Flyte) have vertically integrated, adding features to record these assets (or associated metadata) and thus provide a data lineage system. By coupling your code and logic to an orchestrator, you can often automatically populate a catalog of some of the assets/metadata you care about.

A good asset layer gives insight into both materialized (intentionally saved to a database) and transient (non-materialized) assets – giving you a finer-grained view of the data you produced and code you ran. For example, you should be able to capture feature transforms, metrics definitions, and transformed datasets. It can also facilitate caching, ensuring that you don’t waste time and compute on producing the same assets when a job fails or when two jobs run in sequence.
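As a rough illustration of asset-level caching, the sketch below fingerprints an asset by the code that produces it plus its inputs, and skips recomputation when the fingerprint is already known. Everything here (`asset_key`, `materialize`, the in-memory `_cache`) is a hypothetical stand-in, not the API of any asset layer mentioned in this post, and it assumes the inputs are JSON-serializable.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List

_cache: Dict[str, Any] = {}  # stand-in for a persistent asset store


def asset_key(fn: Callable[..., Any], inputs: Dict[str, Any]) -> str:
    """Fingerprint an asset by the producing function's bytecode and its inputs."""
    code = fn.__code__
    payload = json.dumps(
        {
            "name": fn.__qualname__,
            "bytecode": code.co_code.hex(),
            "consts": repr(code.co_consts),
            "inputs": inputs,
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def materialize(fn: Callable[..., Any], **inputs: Any) -> Any:
    """Compute the asset only if this (code, inputs) pair has not been seen."""
    key = asset_key(fn, inputs)
    if key not in _cache:
        _cache[key] = fn(**inputs)
    return _cache[key]


runs: List[str] = []


def row_count(table: str, multiplier: int) -> int:
    runs.append(table)  # records each real execution
    return len(table) * multiplier


first = materialize(row_count, table="events", multiplier=2)
second = materialize(row_count, table="events", multiplier=2)  # served from cache
third = materialize(row_count, table="events", multiplier=3)  # new inputs, recomputed
```

A rerun after a failure would hit the cache for every asset that already succeeded and only recompute the rest.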

Running Code – The Infrastructure Layer

Orchestrators also handle dispatching tasks in a workflow to execute on their required infrastructure, often as pods in Kubernetes. The AI world, in particular, has grown extremely reliant on orchestrators to match stages of offline workflows to their required heterogeneous compute, ensuring that resources are only allocated while they’re in use. Due to the costliness of GPUs, this can be essential to a justifiable ROI for data investments. Workflows (and even tasks themselves) can often be extremely varied – some (e.g. model training jobs) will need large GPUs, whereas others may need significant CPU/RAM (or just a small machine to make a simple API call…).

A common strategy for handling this is leveraging inter-task boundaries to manage compute – ensuring that the execution structure is coupled to the compute required for each task so that as little as possible is wasted.

Why Revisit Orchestration?

Notice how we didn’t include “specifying sequential workflows” as part of the core value of an orchestrator. If your workflow doesn’t require one of the above services, it might as well be a bash or Python script and maybe a cron scheduler. If you treat your orchestration framework as a one-size-fits-all hammer, your team will inevitably develop using the orchestrator (and only the orchestrator), which can cause a host of problems:

  1. Iteration through an Airflow-like system can be far slower than it would be otherwise, cluttering your workflow with frustrating wait times. Such systems aren’t designed for developing the code that produces assets (i.e. creating data) or for quickly spinning up compute for research.
  2. Coupling all execution to the scheduler (e.g. testing a change to workflow by triggering a run in the orchestrator) turns your team’s list of jobs and workflows into a noisy mess. Visibility into the scheduling layer loses value when valid breakage is drowned out by frequent clutter job failures, and the team becomes used to ignoring them.
  3. When the orchestrator gates access to infrastructure, the path of least resistance to execution becomes copying some existing DAG and adapting it to run your code, which means separately allocating the same compute over and over. The first workflow through which the cloud team gives a data scientist access to an m7i.48xlarge instance almost always gets copy/pasted ad nauseam. This infrastructure waste is endemic to machine learning, where it’s not uncommon to see 10 workflows allocating separate compute for nearly identical preprocessing or evaluation tasks, which could otherwise be shared as services if they could be moved outside the orchestrator.
  4. Code coupled to an orchestrator is often nearly impossible to reuse outside of the orchestration context (e.g. in testing or in an online service).
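The last point is easier to see with a small sketch. Below, the logic lives in a plain function with no orchestrator imports, and both a scheduled entrypoint and an online handler are thin wrappers around it. All names are hypothetical and chosen purely for illustration.

```python
from typing import Dict, List


def normalize_scores(scores: Dict[str, float]) -> Dict[str, float]:
    """Scale scores so they sum to 1. No orchestrator imports, no task context."""
    total = sum(scores.values())
    if total == 0:
        return {name: 0.0 for name in scores}
    return {name: value / total for name, value in scores.items()}


def scheduled_entrypoint(raw_rows: List[Dict[str, float]]) -> List[Dict[str, float]]:
    """Thin wrapper a scheduler could call; all logic stays in the function above."""
    return [normalize_scores(row) for row in raw_rows]


def online_handler(request_body: Dict[str, float]) -> Dict[str, float]:
    """The same function reused in a hypothetical online service handler."""
    return normalize_scores(request_body)


batch = scheduled_entrypoint([{"a": 1.0, "b": 3.0}])
single = online_handler({"a": 1.0, "b": 3.0})
```

Because `normalize_scores` never touches scheduler state, a unit test can call it directly, which is exactly what becomes hard once the logic is written as an orchestrator task.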

This raises the question: even if you do need one (or more) of the value drivers above, when should you reach for an orchestrator that tries to do everything? Or can we take a first-principles view of the right tools for the job to avoid the downsides we don’t need? For example:

  • [infrastructure layer] You only need to run some code - e.g. you’re iterating on some ML experiment and need heterogeneous compute, but don’t want to clutter your scheduling systems and accidentally clobber your production assets.
  • [asset layer] You only need to create some data - A job which is triggered by some action, like a code push or write to a database, that has no workflow dependencies or infrastructure complexity but produces some assets which should be recorded.
  • [cron layer] You only need to schedule jobs - A simple recurring job which runs on local CPU and doesn’t produce any assets, but produces side-effects (sending an email or triggering some deployment).

Bundling is often a feature. Many orchestrators promise you the world if you buy into their paradigm for all your data work. Unfortunately that is not always practical. With that view, why not write every program as an Airflow DAG? Why not record every function output as an asset to preserve lineage and caching? Obviously, because tools have tradeoffs, especially as they scale. We’ve talked about these tradeoffs before in Please separate orchestration and execution and how well-structured should your data code be. Many teams see dizzying lists of workflows, jobs, failures, and assets in their orchestrator UIs, including many forks of pipelines for various experiments. Many ML researchers writing non-heterogeneous training workflows wonder why they’re adding months of busywork and wrecking their iteration and debuggability by fully adopting an orchestrator’s paradigm.

Navigating the Tooling Landscape from First Principles

The unbundling of these overlapping pieces is actually a valuable lens through which to inspect the recent evolutions in the tooling landscape. As with anything in the data world there are many tools for the job. A few of the most popular (and our favorite) implementations for each layer include:

Scheduling Jobs (Cron Layer)

Creating Data (Asset Layer)

Running Code (Infrastructure Layer)

Key Advantages of Unbundling

So, when do you choose to unbundle, and when should you use an orchestrator out of the box? There’s no one-size-fits-all solution, but unbundling can be advantageous if:

  1. You want your code to remain independent of the orchestration, to reuse it elsewhere (online, streaming, etc…)
  2. You require a rapid iteration loop, and need to be able to develop and debug locally (while still using cloud infra) before deploying into the scheduler
  3. You don’t want your workflows to be limited by the orchestrator in the choice or flexibility of the compute infrastructure, e.g. if you’ll want to orchestrate between heterogeneous instance types, regions, or clouds, or reuse compute or services across workflows
  4. Key assets are generated outside orchestration, and you want to be able to comprehensively capture lineage in those diverse environments (e.g. CI/CD)
  5. Your workflows are collaboratively updated, branched, and iterated over time, so structuring the code strictly as imperative workflows makes adding branches to existing code paths the path of least resistance for most work, putting you at risk of config explosion. You prefer to maintain a modular library separately, out of which workflows are composed.

Case in Point - Github Actions is a Scheduler!

View the full source code for this example here.

To show how unbundling can look in practice, let’s build a lean and flexible data automation system, starting with a scheduling layer most of you have access to (and are probably using already) – Github Actions! If you’re not familiar, Actions is Github’s built-in CI/CD automation system, allowing you to natively run code at various (scheduled + reactive) points in your software lifecycle.

To be clear, this is not meant to claim that Github Actions is all anyone needs for scheduling. Many orchestrators are best at the cron layer, and you will probably be happy using one for scheduling even if you totally unbundle the asset and infrastructure layers.

Our example job will be a simple crawler to traverse a web page and compute embeddings for all of its linked pages, scheduled to happen daily in Github Actions. We’ll use Hamilton (and the Hamilton UI) to implement our asset layer, so we can track, execute, and iterate on assets and dependencies. We’ll use Runhouse to implement our infrastructure layer (so we can leverage cloud infrastructure with a simple pythonic API inside our script).

First, let’s quickly review our tools:

Github Actions

Plenty has been written about Github Actions which we won’t rehash here, but the important point is that you can use an Action to trigger jobs on many kinds of schedules - e.g. recurring based on some frequency like a cron job, or triggered by an action like a code push or merge. Just like a workflow orchestrator, Actions has comprehensive monitoring and fault-tolerance controls. It comes built-in with a Github account and has a generous free tier, so it might be all you need to fulfill your scheduling needs, without any manual or costly setup.

Hamilton

Hamilton is a standardized way to build asset-tracking dataflows (any code that processes data) in Python. The core concepts are simple – you write each data transformation step as a single python function, with the following rules:

  1. The name of the function corresponds to the output variable it computes.
  2. The parameter names (and types) correspond to inputs. These can be either passed-in parameters or names of other upstream functions.

This approach allows you to represent assets in ways that correspond closely to code, are naturally self-documenting and portable across infrastructure.

Code sample showing simple functions A, B, and C that depend on an external input
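Since the original sample is an image, here is a minimal sketch of what such a module might contain. Only the names `a`, `b`, `c`, and `external_input` come from this post; the arithmetic is our illustrative assumption. The `resolve` helper is a toy that shows the name-based wiring idea and is not how Hamilton is implemented.

```python
import inspect
from typing import Any, Callable, Dict

# my_module.py (hypothetical contents; the arithmetic is illustrative only)


def a(external_input: int) -> int:
    """Asset `a`, computed from the passed-in parameter `external_input`."""
    return external_input + 1


def b(a: int) -> int:
    """Asset `b`; the parameter name `a` declares a dependency on asset `a`."""
    return a * 2


def c(a: int, b: int) -> int:
    """Asset `c`, which depends on both upstream assets."""
    return a + b


def resolve(name: str, functions: Dict[str, Callable[..., Any]], inputs: Dict[str, Any]) -> Any:
    """Toy resolver: look up each parameter by name, recursing into upstream functions."""
    if name in inputs:
        return inputs[name]
    fn = functions[name]
    kwargs = {
        param: resolve(param, functions, inputs)
        for param in inspect.signature(fn).parameters
    }
    return fn(**kwargs)


functions = {"a": a, "b": b, "c": c}
results = {
    name: resolve(name, functions, {"external_input": 10})
    for name in ["c", "a", "b"]
}
```

Note that the functions themselves import nothing from the framework, which is what keeps them portable and easy to unit test.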

After writing your functions (assets), you write a driver that executes them – in most cases this is a simple import/run (specifying the assets you want computed and letting the framework do the rest), but it provides options to customize execution:

from hamilton import driver

import my_module

dr = (
    driver
    .Builder()
    .with_modules(my_module)
    .build()
)
# inputs are upstream dependencies
dr.execute(["c", "a", "b"], inputs={"external_input": 10})  # dataframe with c, a, and b

The Hamilton UI comes with a view to track these assets, explore run history, and visualize your DAG.

Runhouse

Runhouse allows you to access and program your cloud compute in Python as if it were one giant distributed computer. It’s not using serverless or repackaged cloud compute, but rather your own local cloud credentials or existing compute (instances, Ray, or Kubernetes). It’s designed to give you back your iteration loop when you’re working with ML or data infrastructure, and give you total flexibility in how you want to utilize it. By moving the infrastructure layer inside pure Python, your powerful programs can run anywhere you can run Python, whether a notebook, orchestrator, container, or IDE, and there’s no prescriptive structure or DSL you’ll need to migrate your code to. Your code remains identical between local development and production, running on identical remote machines, giving you back the research<->production iteration loop as well. Here, we’ll highlight how you can write and iterate a powerful GPU program locally and then deploy it as-is into a Github action.

Suppose I want to run Stable Diffusion inference on a GPU. With Runhouse, my Python can simply incorporate a remote cloud GPU, either allocating a fresh one from my cloud account (using local credentials) or Kubernetes cluster, or using an existing VM or cluster. I just send my existing function to my cluster and specify the environment I want it to run in, like this:

import runhouse as rh
from diffusers import StableDiffusionPipeline

MODEL_ID = "stabilityai/stable-diffusion-2-base"


# My function
def sd_generate(prompt, **inference_kwargs):
    model = StableDiffusionPipeline.from_pretrained(MODEL_ID)
    model = model.to("cuda")
    return model(prompt, **inference_kwargs).images


if __name__ == "__main__":
    # The compute
    gpu = rh.cluster(name="rh-a10x-a", instance_type="A10G:1", provider="aws").up_if_not()

    # The environment, its own process on the cluster
    sd_env = rh.env(reqs=["torch", "transformers", "diffusers", "accelerate"])

    # Deploying my function to my env on the cluster
    remote_sd_generate = rh.function(sd_generate).to(gpu, env=sd_env)

    # Calling my function normally, but it's running remotely
    imgs = remote_sd_generate("A hot dog made out of matcha.")
    imgs[0].show()

Runhouse is Python generic, and supports data processing, training, evaluation, inference - whatever you like. It just brings your code to your compute and allows you to call it remotely as a service, and what you do there is your own business.

Defining Our Scraping and Embedding Job

At a high level, we’re going to be creating a recurring workflow in a Github Action, leveraging Hamilton to define our assets/structure our pipelines, and using Runhouse to execute the compute-intensive pieces (calculating embeddings on a GPU).

Our Github Action is simple. It runs daily at midnight, installing Hamilton and Runhouse and then executing a Python script.

name: Embed URLs

on:
  schedule:
    - cron: '0 0 * * *'

jobs:
  build_embeddings:
    runs-on: ubuntu-latest
    steps:
      - name: Check out repository code
        uses: actions/checkout@v3
      - name: setup python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install Runhouse with AWS dependencies
        run: |
          python -m pip install --upgrade pip
          # Hamilton is published on PyPI as sf-hamilton
          pip install "runhouse[aws]" sf-hamilton
      - name: Run the embedding script and save the embedding revision
        run: python batch_process.py

The steps in our workflow are quite simple, and can be represented as four “assets” which will be generated:

saved_embeddings (in yellow) is a final asset, the rest are intermediate

We load up a set of URLs by crawling + traversing the base URL, compute embeddings for them to form an embeddings_df, and then save the embeddings, returning the pointer to the file (locally or on s3) that stores them.

You can see all the code (in DAG form from the Hamilton UI) here:

Code in DAG form from the Hamilton UI

It’s a lot to look at, but the thing to note is that we have four functions – one for each “asset”:

def urls(base_url: str, max_scrape_depth: int = 1, cutoff: int = None) -> pd.DataFrame:
    """Gives all recursive URLs from the given base URL."""


def embedder() -> URLEmbedder:
    """Sets up an embedder to embed URLs on a remote GPU box."""


def embeddings_df(urls: pd.DataFrame, embedder: URLEmbedder) -> pd.DataFrame:
    """Adds embeddings to the given URL DataFrame."""


def saved_embeddings(embeddings_df: pd.DataFrame) -> str:
    """Saves the embeddings to disk + returns URL"""
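The bodies of these functions are in the linked source. To give a feel for what the first step involves, here is a standard-library-only sketch of extracting links from a single page. It is not the implementation used in the example; `LinkCollector` and `extract_links` are hypothetical names, and the sketch returns a plain list rather than a DataFrame and does no recursion or network access.

```python
from html.parser import HTMLParser
from typing import List, Optional
from urllib.parse import urldefrag, urljoin


class LinkCollector(HTMLParser):
    """Collects unique absolute http(s) links from anchor tags, in page order."""

    def __init__(self, base_url: str) -> None:
        super().__init__()
        self.base_url = base_url
        self.links: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        for name, value in attrs:
            if name == "href" and value:
                # Resolve relative links and drop any #fragment
                absolute, _fragment = urldefrag(urljoin(self.base_url, value))
                if absolute.startswith(("http://", "https://")) and absolute not in self.links:
                    self.links.append(absolute)


def extract_links(html: str, base_url: str, cutoff: Optional[int] = None) -> List[str]:
    """Return links found in `html`, truncated to `cutoff` entries if given."""
    collector = LinkCollector(base_url)
    collector.feed(html)
    return collector.links[:cutoff]


page = (
    '<a href="/docs">Docs</a>'
    '<a href="https://other.org/x#top">X</a>'
    '<a href="/docs">Docs again</a>'
    '<a href="mailto:a@b.c">Mail</a>'
)
links = extract_links(page, "https://example.com/")
```

A recursive crawler would fetch each returned link and repeat this step up to `max_scrape_depth` levels.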

The most notable piece to call out here is embedder(), where we delegate the heavier work to a GPU with Runhouse. We have a URLEmbedder class with our embedding logic. To ensure it runs remotely with the intended resources, all we need to do is create a Runhouse module (rh.module), which is essentially a remotely dispatchable Python class, pass it the appropriate environment on our cluster, and execute!

def embedder() -> URLEmbedder:
    env = rh.env(
        name="langchain_embed_env",
        reqs=["langchain", "langchain-community", "langchainhub", "sentence_transformers", "bs4"],
    )
    cluster = rh.cluster(
        "/dongreenberg/rh-hamilton-a10g",
        instance_type="A10G:1",
        auto_stop_mins=5,
        spot=True,
    ).up_if_not()
    RemoteURLEmbedder = rh.module(URLEmbedder).to(cluster, env)
    return RemoteURLEmbedder(
        model_name_or_path="BAAI/bge-large-en-v1.5",
        device="cuda",
        name="doc_embedder",
    )

Then we can call it as we normally would (this does it in sequence, but we can easily run in parallel – Runhouse is fully thread + async safe):

def embeddings_df(urls: pd.DataFrame, embedder: URLEmbedder) -> pd.DataFrame:
    """Adds embeddings to the given URL DataFrame."""
    urls["embeddings"] = [
        embedder.embed(url, normalize_embeddings=True) for url in urls["url"]
    ]
    return urls
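For the parallel variant mentioned above, one possible approach is a thread pool around the same `embed` call. The sketch below uses a local `FakeEmbedder` as a stand-in for the remote module so it runs anywhere; the class, the helper name `embed_in_parallel`, and the choice of a thread pool are all our assumptions rather than part of the example's source.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


class FakeEmbedder:
    """Local stand-in for the remote URLEmbedder; returns a toy 2-d vector."""

    def embed(self, url: str, normalize_embeddings: bool = True) -> List[float]:
        vector = [float(len(url)), float(url.count("/"))]
        if normalize_embeddings:
            norm = sum(x * x for x in vector) ** 0.5
            vector = [x / norm for x in vector]
        return vector


def embed_in_parallel(embedder, urls: List[str], max_workers: int = 4) -> List[List[float]]:
    """Embed URLs concurrently; executor.map preserves the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(
            executor.map(
                lambda url: embedder.embed(url, normalize_embeddings=True), urls
            )
        )


sample_urls = ["https://example.com/a", "https://example.com/b/c"]
parallel = embed_in_parallel(FakeEmbedder(), sample_urls)
sequential = [FakeEmbedder().embed(url) for url in sample_urls]
```

Because `executor.map` returns results in input order, the output can be assigned to the `embeddings` column exactly as in the sequential version.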

Finally, we trigger the execution through Hamilton so our assets are recorded to its lineage store.

def generate_url_embeddings(base_url):
    dr = (
        driver
        .Builder()
        .with_adapters(
            hamilton_sdk.adapters.HamiltonTracker(
                project_id=19374,
                username="elijah@dagworks.io",
                dag_name="runhouse_macro_version",
                api_key=os.environ.get("DAGWORKS_API_KEY"),
                hamilton_api_url="https://api.app.dagworks.io",
                hamilton_ui_url="https://app.dagworks.io",
            )
        )
        .with_modules(sys.modules[__name__])  # code in current module
        .build()
    )
    dr.visualize_execution(
        ["saved_embeddings"],
        bypass_validation=True,
        output_file_path="./hamilton_macro.png",
    )
    print(dr.execute(
        ["saved_embeddings"],
        inputs={
            "base_url": base_url,
            "max_scrape_depth": 1,
            "cutoff": 4,
        },
    ))

Now, we can view the execution history and asset lineage in the Hamilton UI:

execution history in the Hamilton UI
execution history and asset lineage in the Hamilton UI
asset lineage in the Hamilton UI

And it’s as easy as that! Our Github Action will run the Hamilton DAG nightly, which will in turn delegate the heavier portions of the execution to Runhouse. We’ve also operationalized this workflow so we can inspect, manage, or evolve it as needed:

  1. Delegate to any infrastructure (either fresh from the cloud or in existing clusters or VMs), leveraging Runhouse’s powerful Python API
  2. Track assets more finely in the Hamilton UI – understand performance, debug failures, and gain a feature catalog with no extra work
  3. Set up more complex schedules or triggers, such as when new code is pushed
  4. Visualize the pipeline, execution, and data lineage within a broader graph of assets
  5. Interactively debug or iterate on the job further!

Conclusion

By unbundling the principal components of orchestration, we can take advantage of a lean, cost-effective, and flexible stack, which preserves all the visibility, collaboration, and scale we need. Github Actions is a free and widely available scheduler, so using open-source dedicated asset and infrastructure layers with it is as lean as we can get!

If you want to read more, check out the following:

  1. The Github repository with all the examples
  2. Runhouse on Github
  3. Hamilton on Github
