Scaling Batch Processing on Cloud Compute with Runhouse and SkyPilot

Parallelizing work across cloud compute is clunky and manual. It doesn't have to be.

Donny Greenberg

CEO @ 🏃‍♀️Runhouse🏠

Published June 7, 2024

You have a Python function that works on one sample of data, and now you need to parallelize it over thousands, millions, or billions of samples across cloud VMs. You’d think this would be the “hello world” killer app of having a cloud account, and yet it’s not trivial. You’d need to either set up dedicated batch infrastructure (e.g. Ray, Spark, Dask) or learn your way around some cloud-specific batch processing system. Your job has suddenly grown from a Python function to include manual setup steps, config files, and DSLs, and soon you’ll be documenting the steps for others to reproduce it. Your flexibility to cost-optimize is also now limited by the batch system, so spot compute, multiple cloud accounts, and idle existing compute (e.g. sitting in Kubernetes clusters) are often off the table.

Cloud Compute at your Fingertips

The Runhouse team has been working with the SkyPilot team since late 2022 to provide a magical cloud programming experience which makes your compute natively programmable. It allows you to fully specify your cloud program in Python, from optimizing and allocating the compute (via SkyPilot) to dispatching and executing your Python functions and classes on it (via Runhouse), taking advantage of whatever heterogeneous infrastructure you like. With Runhouse and SkyPilot, horizontally or vertically scaling your Python code is easy: there is no manual setup or config to keep track of alongside your code, no cloud-specific system to learn or migrate your code to, and total flexibility to optimize cost.

A simple example

Below is a function I want to parallelize - it takes in a URL and returns a dictionary with counts for each word on the page. Let's walk through the code to parallelize it on cloud VMs with Runhouse and SkyPilot. You can also view and run the full code here. It’s exactly what you’d hope “hello world” cloud parallelism would look like.

    import runhouse as rh
    from bs4 import BeautifulSoup
    import requests

    def word_counts(url):
        response = requests.get(url)
        soup = BeautifulSoup(response.text, "html.parser")
        words = soup.get_text().split()
        return {word: words.count(word) for word in words}
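The counting step can be checked locally without any network access. The sketch below is an illustrative variant, not the code from the full example: `count_words_in_text` is a hypothetical helper that takes already-extracted page text and uses `collections.Counter`, which tallies in one pass instead of calling `words.count` once per word.

```python
from collections import Counter


def count_words_in_text(text):
    """Return a plain dict mapping each whitespace-separated word to its count."""
    # Counter tallies in a single pass over the words; dict() keeps the
    # return type the same as the original word_counts function.
    return dict(Counter(text.split()))


sample_counts = count_words_in_text("spam eggs spam")
```

Inside `word_counts`, the same call could be applied to `soup.get_text()` if the quadratic counting ever becomes a bottleneck on long pages.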

Next, we specify the desired compute. SkyPilot uses your local cloud credentials to launch the desired VMs, and finds the cheapest available compute across your accounts.

    if __name__ == "__main__":
        cluster = rh.cluster(
            name="rh-4x2CPU",
            instance_type="CPU:2",
            num_instances=4,
            spot=True,
        ).up_if_not()

SkyPilot’s rich launcher allows you to allocate from any provider, region, instance type, or existing compute (e.g. K8s) you like, and you can easily create multiple clusters to utilize a mix. Each is a separate Ray cluster, so you don’t need to worry about the complexity of coupling them all together (networking, auth, failures, etc.). For example, in the code above we’ve launched a four-node spot cluster for simplicity, but we could also bring up four separate one-node spot clusters in a for loop, so that a preemption of one node doesn’t bring down the others.
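The one-node-per-cluster variant might look like the sketch below. `launch_single_node_clusters` and its `make_cluster` parameter are hypothetical names introduced here; the factory is passed in so the loop can be exercised without cloud credentials. The keyword arguments mirror the `rh.cluster` call shown earlier, and the cluster names are made up for illustration.

```python
def launch_single_node_clusters(make_cluster, count=4):
    """Bring up `count` independent one-node spot clusters.

    `make_cluster` is any callable that accepts the keyword arguments below
    and returns a cluster handle (a hypothetical hook for this sketch).
    """
    clusters = []
    for i in range(count):
        clusters.append(
            make_cluster(
                name=f"rh-2CPU-{i}",  # one distinct name per cluster
                instance_type="CPU:2",
                num_instances=1,
                spot=True,
            )
        )
    return clusters


# Dry run with a stand-in factory that only records the requested spec.
specs = launch_single_node_clusters(lambda **kwargs: kwargs)
```

With Runhouse installed, the factory could be `lambda **kw: rh.cluster(**kw).up_if_not()`, reusing the call from the example above.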

We’ll send the word_counts function to our remote cluster(s) multiple times to create worker replicas of the function which execute in parallel. Note that Runhouse does not serialize the function to send it to the cluster (which creates a lot of versioning mayhem), but rather syncs the code over and imports from within a webserver on the cluster, just as you would if you deployed it manually through a serving framework like Flask or FastAPI.

We’re also specifying the environment to send each function into, including dependencies and local code. Each env in Runhouse runs in its own process on the cluster, so sending our replicas into separate envs parallelizes them. By default, env processes are created on the head node, so we also specify compute resources here to spread the processes across multiple nodes in the cluster (we could also send each to a specified node for finer control).

    NUM_REPLICAS = 8
    workers = []
    for i in range(NUM_REPLICAS):
        env = rh.env(
            name=f"count_env_{i}",
            reqs=["bs4", "requests"],
            compute={"CPU": 1},
        )
        worker_fn = rh.function(word_counts).to(cluster, env=env, name=f"word_counts_{i}")
        workers.append(worker_fn)

We now have a list of local Python callables which behave just like our original function, but forward calls to the remote replica over HTTP. All that’s left to do is call them in parallel over a list of URLs.

    from concurrent.futures import ThreadPoolExecutor
    import time

    urls = [
        "https://en.wikipedia.org/wiki/Python_(programming_language)",
        "https://en.wikipedia.org/wiki/Python_(genus)",
        "https://en.wikipedia.org/wiki/Python_(mythology)",
        "https://en.wikipedia.org/wiki/Python_(painter)",
        "https://en.wikipedia.org/wiki/Python_(Efteling)",
        "https://en.wikipedia.org/wiki/Python_(automobile_maker)",
        "https://en.wikipedia.org/wiki/Python_(nuclear_primary)",
        "https://en.wikipedia.org/wiki/Python_(missile)",
        "https://en.wikipedia.org/wiki/Python_(codename)",
        "https://en.wikipedia.org/wiki/Python_(film)",
    ]

    with ThreadPoolExecutor(max_workers=len(workers)) as executor:
        def call_with_round_robin(*args):
            # Wait for a free replica, take it, and put it back when done
            while not workers:
                time.sleep(.25)
            worker = workers.pop(0)
            result = worker(*args)
            workers.append(worker)
            return result

        all_counts = executor.map(call_with_round_robin, urls)
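One possible variation on the dispatch loop uses `queue.Queue`, which blocks until a worker is free instead of polling with `time.sleep`, and returns the worker to the pool even if a call raises. This is a sketch under stated assumptions rather than the code from the full example: `map_over_workers` and `fake_word_counts` are hypothetical names, and the workers are local stand-ins so the snippet runs without a cluster. In the real job, the list would hold the remote callables created above. The final loop merges the per-page dictionaries into one total, a simple reduce step.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from queue import Queue


def fake_word_counts(text):
    """Local stand-in for a remote replica (hypothetical, for illustration)."""
    return dict(Counter(text.split()))


def map_over_workers(workers, inputs):
    """Run each input on whichever worker is free; results keep input order."""
    pool = Queue()
    for worker in workers:
        pool.put(worker)

    def call_with_free_worker(item):
        worker = pool.get()  # blocks until some worker is idle
        try:
            return worker(item)
        finally:
            pool.put(worker)  # hand the worker back even if the call raised

    with ThreadPoolExecutor(max_workers=len(workers)) as executor:
        # list() forces evaluation, so any worker exception surfaces here
        return list(executor.map(call_with_free_worker, inputs))


fake_workers = [fake_word_counts for _ in range(2)]
pages = ["a b a", "b c", "a"]
per_page = map_over_workers(fake_workers, pages)

# Reduce step: merge the per-page counts into a single tally.
total = Counter()
for counts in per_page:
    total.update(counts)
```

Because `executor.map` preserves input order, `per_page[i]` corresponds to `pages[i]` regardless of which worker handled it.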

Two Crucial Advantages: Flexibility and Reproducibility

Runhouse+SkyPilot allows you to write any hyperscale cloud program you can imagine in Python, realizing the vision of Sky-native application development. It’s easy to extend this basic example to scale and cost optimize any which way:

  1. Vertically - bigger nodes and accelerators; greedy mixes of VM types for different size batches; using vectorized or distributed libraries from within your dispatched code (PyTorch, Jax, Ray, Spark, etc.)
  2. Horizontally - more nodes or clusters; using a mix of clouds or Kubernetes; various price models (spot, on-demand, reserved)
  3. Concurrently - more replicas per node; interleaving IO with computation; using map/reduce steps or blob storage to reduce roundtrips
  4. Fault-tolerantly - failure handling, preemption, caching, retries
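For the fault-tolerance item, a retry wrapper is one small building block. The sketch below is generic Python, not a Runhouse or SkyPilot feature; `call_with_retries` and its parameters are hypothetical names. It retries a callable a fixed number of times with an optional delay, which could wrap a worker call so that a request interrupted by a spot preemption gets another attempt.

```python
import time


def call_with_retries(fn, *args, attempts=3, delay_seconds=0.0, **kwargs):
    """Call fn(*args, **kwargs), retrying on any Exception up to `attempts` times."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return fn(*args, **kwargs)
        except Exception as error:
            last_error = error
            # Sleep only if another attempt is still to come.
            if attempt < attempts - 1:
                time.sleep(delay_seconds)
    raise last_error


# Demo: a flaky callable that fails twice, then succeeds on the third call.
calls = {"n": 0}


def flaky(x):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated preemption")
    return x * 2


result = call_with_retries(flaky, 21)
```

In a real job you would likely narrow the `except` clause to the errors that are actually transient, and pair retries with caching so completed inputs are not recomputed.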

Our users have seen cost reductions of ~50% out of the gate due to this greater flexibility, while unlocking new use cases through the improved local iteration experience of their workflows.

Using SkyPilot with Runhouse also greatly improves reproducibility by keeping the entire job inside Python. Normally, scaling and cost-optimization add operational complexity, meaning that others may struggle to recreate the setup and launch sequence, or it may be incompatible with certain execution or DevOps flows. Everyone has at some point inherited a “here’s how you run our code” doc or, worse, been on a team where only certain people can run the jobs. Reproducibility across execution systems is an even thornier problem: the existing orchestration or DevOps often can’t accommodate new steps outside the code proper, or can’t be cleanly extended to new infrastructure (e.g. distributed systems, multi-region, multi-cloud, specific instance types).

Cloud programming through Runhouse+SkyPilot takes advantage of the one invariant which is true across all these execution systems and users: running Python. Because the job fully fits inside Python, it can go anywhere Python can go - containers, notebooks, Airflow, serverless, new laptops, old laptops, IDEs, CI/CD, etc. When a new hire joins the team, running your batch job is just a matter of setting up cloud credentials and running your Python script. When you want to cost-optimize the evaluation step in your ML pipeline by using a different cloud, you pull down the code, send the evaluation to the other cloud, test locally, and then push your changes to prod normally.

Conclusion

Sometimes you don’t need to adopt complex new infrastructure to do your batch processing; you just need native access to bigger compute. Whether you’re processing genomes, images, transactions, or clicks, your cloud parallel processing should be fast, cost-effective, and straightforward. Using SkyPilot with Runhouse is a powerful way to cost-optimize, scale, or simplify your existing batch jobs, or unblock new ones in only a few days.
