Async

Kubetorch provides comprehensive support for Python's asynchronous programming model. You can use async patterns both for deployment (launching services) and for calls (invoking remote functions).

Async Deployment with to_async

By default, .to() blocks until the service is deployed and ready. Use .to_async() to deploy services without blocking, which is useful when launching multiple services concurrently.

```python
import kubetorch as kt
import asyncio


async def deploy_services():
    compute = kt.Compute(cpus="0.5")

    # Deploy multiple services concurrently
    fn1, fn2, fn3 = await asyncio.gather(
        kt.fn(process_data).to_async(compute),
        kt.fn(train_model).to_async(compute),
        kt.fn(evaluate_model).to_async(compute),
    )
    return fn1, fn2, fn3


# Run deployment
fn1, fn2, fn3 = asyncio.run(deploy_services())
```

The to_async() method accepts the same parameters as to():

```python
remote_cls = await kt.cls(MyClass, name="my-service").to_async(
    kt.Compute(cpus="1", gpus="1"),
    init_args={"config": config},
    stream_logs=True,
    get_if_exists=True,
)
```

Async Calls with async_=True

By default, all remote function calls are synchronous - they block until the result is returned. Use async_=True to make calls return coroutines instead.

Per-Call Async

```python
import kubetorch as kt

remote_fn = kt.fn(my_function).to(kt.Compute(cpus="0.5"))

# Synchronous (default) - blocks until complete
result = remote_fn(arg1, arg2)

# Async - returns a coroutine
result = await remote_fn(arg1, arg2, async_=True)
```

Setting Async Mode on the Function

You can set the async_ property to make all calls async by default:

```python
remote_fn.async_ = True

# Now all calls return coroutines
result = await remote_fn(arg1, arg2)

# Override for a specific call
result = remote_fn(arg1, arg2, async_=False)  # Blocks
```

Concurrent Calls

The main benefit of async calls is running multiple remote operations concurrently:

```python
import kubetorch as kt
import asyncio

remote_fn = kt.fn(slow_process).to(kt.Compute(cpus="0.5"))


async def run_concurrent():
    # Launch 3 calls concurrently
    results = await asyncio.gather(
        remote_fn(1, async_=True),
        remote_fn(2, async_=True),
        remote_fn(3, async_=True),
    )
    return results


# All 3 calls run in parallel
results = asyncio.run(run_concurrent())
```

Using create_task for Fire-and-Forget

```python
async def run_workflow():
    # Start the long-running task without waiting
    training_task = asyncio.create_task(train_fn(epochs=100, async_=True))

    # Do other work while training runs
    data = await fetch_fn(url, async_=True)

    # Wait for training when needed
    model = await training_task
    return model, data
```

Server-Side Async Execution

Kubetorch automatically detects whether your function is async and executes it appropriately on the server:

  • async def functions: Run directly in the server's async event loop
  • def functions: Run in a thread pool to avoid blocking the event loop
```python
# This runs in the async event loop on the server
async def fetch_data(url: str):
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()


# This runs in a thread pool on the server
def compute_heavy(data):
    import numpy as np

    return np.sum(data ** 2)
```

Important: This server-side behavior is independent of the async_=True parameter. The async_ parameter only controls whether the client-side HTTP call is async - it doesn't affect how your function runs on the server.
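The dispatch logic described above can be sketched in plain asyncio. This is an illustrative sketch only, not Kubetorch's actual server code: async functions are awaited directly in the event loop, while sync functions are offloaded with asyncio.to_thread.

```python
import asyncio
import inspect


async def dispatch(fn, *args, **kwargs):
    # Async functions run directly in the event loop;
    # sync functions are offloaded to a thread pool.
    if inspect.iscoroutinefunction(fn):
        return await fn(*args, **kwargs)
    return await asyncio.to_thread(fn, *args, **kwargs)


async def fetch(x):
    await asyncio.sleep(0)  # cooperative I/O-style work
    return x * 2


def crunch(x):
    return x + 1  # plain sync work


async def main():
    return await dispatch(fetch, 21), await dispatch(crunch, 41)


print(asyncio.run(main()))  # prints (42, 42)
```

Either kind of function can be dispatched through the same code path, which is why the caller never needs to care how the function is defined.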

Client vs Server Async Behavior

| Aspect | Client Side (async_=) | Server Side (function definition) |
| --- | --- | --- |
| Controls | Whether the HTTP call is async | Whether the function runs in the event loop or a thread pool |
| Default | False (sync) | Auto-detected from the function signature |
| Override | Per-call or via fn.async_ | N/A |

Both sync and async functions behave identically from the client's perspective - the difference is in server-side performance characteristics.
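To see why the thread-pool path matters for performance, here is a standalone asyncio sketch (independent of Kubetorch): two pieces of blocking work offloaded with asyncio.to_thread overlap, whereas awaiting them inside the loop itself would run them back to back.

```python
import asyncio
import time


def blocking_work():
    time.sleep(0.2)  # stands in for a blocking, heavy call
    return "done"


async def main():
    start = time.monotonic()
    # Offloaded to the thread pool, the two calls overlap (~0.2s total,
    # rather than ~0.4s if they ran sequentially in the loop)
    results = await asyncio.gather(
        asyncio.to_thread(blocking_work),
        asyncio.to_thread(blocking_work),
    )
    return results, time.monotonic() - start


results, elapsed = asyncio.run(main())
assert elapsed < 0.35  # concurrent, not sequential
```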

Best Practices

Use async for I/O-bound server code

If your function does network requests, file I/O, or database operations, define it as async def to run efficiently in the server's event loop:

```python
async def fetch_and_process(urls: list):
    import asyncio
    import aiohttp

    async with aiohttp.ClientSession() as session:
        responses = await asyncio.gather(*[session.get(url) for url in urls])
        return [await r.json() for r in responses]
```

Keep CPU-intensive work synchronous

Don't make CPU-bound functions async - they'll block the server's event loop:

```python
# Good: sync function for CPU work
def train_model(data, epochs):
    import torch

    # Heavy computation runs in a thread pool
    for epoch in range(epochs):
        ...
    return model


# Bad: async function that blocks the event loop
async def train_model(data, epochs):  # Don't do this!
    import torch

    for epoch in range(epochs):  # Blocks the event loop!
        ...
    return model
```

Avoid blocking calls in async functions

Never use blocking calls inside async functions:

```python
# Bad
async def bad_example(url):
    import time
    import requests

    time.sleep(5)  # Blocks the event loop!
    data = requests.get(url)  # Blocks the event loop!
    return data


# Good
async def good_example(url):
    import asyncio
    import aiohttp

    await asyncio.sleep(5)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
```
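The cost of a blocking call inside a coroutine can be demonstrated with a small timing experiment (standalone, no Kubetorch required): time.sleep serializes concurrent tasks because it never yields to the event loop, while asyncio.sleep lets them overlap.

```python
import asyncio
import time


async def blocking_task():
    time.sleep(0.2)  # blocks the whole event loop


async def cooperative_task():
    await asyncio.sleep(0.2)  # yields control to other tasks


async def measure(task_factory):
    start = time.monotonic()
    await asyncio.gather(task_factory(), task_factory())
    return time.monotonic() - start


blocking_time = asyncio.run(measure(blocking_task))  # ~0.4s: serialized
cooperative_time = asyncio.run(measure(cooperative_task))  # ~0.2s: overlapped
assert cooperative_time < blocking_time
```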

Use async_=True for concurrent client calls

When making multiple remote calls, use async_=True with asyncio.gather() to avoid blocking:

```python
async def process_batch(items):
    results = await asyncio.gather(
        *[remote_fn(item, async_=True) for item in items]
    )
    return results
```

Complete Example

```python
import kubetorch as kt
import asyncio


# CPU-intensive sync function
def train_model(epochs: int):
    import time

    for i in range(epochs):
        time.sleep(0.5)  # Simulate training
    return {"loss": 0.123, "accuracy": 0.95}


# I/O-bound async function
async def fetch_training_data(dataset_url: str):
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(dataset_url) as response:
            return await response.json()


async def main():
    compute = kt.Compute(cpus="1")

    # Deploy both services concurrently
    train_fn, fetch_fn = await asyncio.gather(
        kt.fn(train_model).to_async(compute),
        kt.fn(fetch_training_data).to_async(compute),
    )

    # Run training and data fetch concurrently
    model, data = await asyncio.gather(
        train_fn(epochs=10, async_=True),
        fetch_fn("https://api.example.com/dataset", async_=True),
    )
    return {"model": model, "data": data}


result = asyncio.run(main())
```