# Async

Kubetorch provides first-class support for Python's asynchronous programming model. You can use async patterns both for deployment (launching services) and for calls (invoking remote functions).
## Async Deployment with `to_async`

By default, `.to()` blocks until the service is deployed and ready. Use `.to_async()` to deploy services without blocking, which is useful when launching multiple services concurrently.
```python
import kubetorch as kt
import asyncio


async def deploy_services():
    compute = kt.Compute(cpus="0.5")

    # Deploy multiple services concurrently
    fn1, fn2, fn3 = await asyncio.gather(
        kt.fn(process_data).to_async(compute),
        kt.fn(train_model).to_async(compute),
        kt.fn(evaluate_model).to_async(compute),
    )
    return fn1, fn2, fn3


# Run deployment
fn1, fn2, fn3 = asyncio.run(deploy_services())
```
The `to_async()` method accepts the same parameters as `to()`:
```python
remote_cls = await kt.cls(MyClass, name="my-service").to_async(
    kt.Compute(cpus="1", gpus="1"),
    init_args={"config": config},
    stream_logs=True,
    get_if_exists=True,
)
```
## Async Calls with `async_=True`

By default, all remote function calls are synchronous: they block until the result is returned. Pass `async_=True` to make calls return coroutines instead.
### Per-Call Async
```python
import kubetorch as kt

remote_fn = kt.fn(my_function).to(kt.Compute(cpus="0.5"))

# Synchronous (default) - blocks until complete
result = remote_fn(arg1, arg2)

# Async - returns a coroutine
result = await remote_fn(arg1, arg2, async_=True)
```
### Setting Async Mode on the Function

You can set the `async_` property to make all calls async by default:
```python
remote_fn.async_ = True

# Now all calls return coroutines
result = await remote_fn(arg1, arg2)

# Override for a specific call
result = remote_fn(arg1, arg2, async_=False)  # Blocks
```
### Concurrent Calls

The main benefit of async calls is running multiple remote operations concurrently:
```python
import kubetorch as kt
import asyncio

remote_fn = kt.fn(slow_process).to(kt.Compute(cpus="0.5"))


async def run_concurrent():
    # Launch 3 calls concurrently
    results = await asyncio.gather(
        remote_fn(1, async_=True),
        remote_fn(2, async_=True),
        remote_fn(3, async_=True),
    )
    return results


# All 3 calls run in parallel
results = asyncio.run(run_concurrent())
```
### Using `create_task` for Fire-and-Forget
```python
async def run_workflow():
    # Start the long-running task without waiting
    training_task = asyncio.create_task(train_fn(epochs=100, async_=True))

    # Do other work while training runs
    data = await fetch_fn(url, async_=True)

    # Wait for training when needed
    model = await training_task
    return model, data
```
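The same fire-and-forget pattern can be seen with plain asyncio coroutines. The sketch below uses hypothetical stand-in coroutines (`train`, `fetch`) in place of remote Kubetorch calls; the scheduling behavior is identical.

```python
import asyncio


async def train(epochs):
    # Stand-in for a long-running remote training call
    await asyncio.sleep(0.2)
    return f"model after {epochs} epochs"


async def fetch(url):
    # Stand-in for a shorter remote data fetch
    await asyncio.sleep(0.05)
    return f"data from {url}"


async def run_workflow():
    # Start training without waiting for it
    training_task = asyncio.create_task(train(epochs=100))

    # Do other work while training runs
    data = await fetch("https://example.com/data")

    # Wait for training only when its result is needed
    model = await training_task
    return model, data


model, data = asyncio.run(run_workflow())
```

Because `create_task` schedules the coroutine immediately, the training timer and the fetch overlap: the whole workflow takes roughly as long as the slower of the two, not their sum.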
## Server-Side Async Execution

Kubetorch automatically detects whether your function is async and executes it appropriately on the server:

- `async def` functions: run directly in the server's async event loop
- `def` functions: run in a thread pool to avoid blocking the event loop
```python
# This runs in the async event loop on the server
async def fetch_data(url: str):
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()


# This runs in a thread pool on the server
def compute_heavy(data):
    import numpy as np

    return np.sum(data ** 2)
```
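The detect-and-dispatch idea can be sketched with the standard library alone. This is an illustration of the pattern, not Kubetorch's actual server code: `inspect.iscoroutinefunction` distinguishes the two cases, and `asyncio.to_thread` offloads sync functions to a worker thread.

```python
import asyncio
import inspect


async def dispatch(fn, *args, **kwargs):
    """Await `fn` in the event loop if it is a coroutine function,
    otherwise offload it to a thread pool so it can't block the loop."""
    if inspect.iscoroutinefunction(fn):
        return await fn(*args, **kwargs)
    return await asyncio.to_thread(fn, *args, **kwargs)


async def io_bound(x):
    await asyncio.sleep(0)  # yields control to the event loop
    return x * 2


def cpu_bound(x):
    return sum(i * i for i in range(x))


async def main():
    a = await dispatch(io_bound, 21)   # runs in the event loop
    b = await dispatch(cpu_bound, 10)  # runs in a worker thread
    return a, b


print(asyncio.run(main()))  # (42, 285)
```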
**Important:** This server-side behavior is independent of the `async_=True` parameter. The `async_` parameter only controls whether the client-side HTTP call is async; it doesn't affect how your function runs on the server.
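To make the separation concrete, here is a toy client stub (purely illustrative, not Kubetorch's implementation) whose `async_` flag only changes what the *caller* gets back, sync result or coroutine, regardless of how the wrapped function executes:

```python
import asyncio


class RemoteStub:
    """Toy stand-in for a remote function handle: `async_` picks the
    client-side call mode, independent of server-side execution."""

    def __init__(self, fn):
        self._fn = fn
        self.async_ = False  # default call mode, like fn.async_

    def __call__(self, *args, async_=None, **kwargs):
        use_async = self.async_ if async_ is None else async_
        if use_async:
            # Return a coroutine the caller must await
            return asyncio.to_thread(self._fn, *args, **kwargs)
        # Block until the result is available
        return self._fn(*args, **kwargs)


def add(a, b):
    return a + b


stub = RemoteStub(add)
print(stub(1, 2))                            # 3 (blocking call)
print(asyncio.run(stub(1, 2, async_=True)))  # 3 (awaited coroutine)
```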
## Client vs Server Async Behavior

| Aspect | Client side (`async_` parameter) | Server side (function definition) |
|---|---|---|
| Controls | Whether the HTTP call is async | Whether the function runs in the event loop or a thread pool |
| Default | `False` (sync) | Auto-detected from the function definition |
| Override | Per-call or via `fn.async_` | N/A |
Both sync and async functions behave identically from the client's perspective; the difference lies in server-side performance characteristics.
## Best Practices

### Use async for I/O-bound server code

If your function does network requests, file I/O, or database operations, define it with `async def` so it runs efficiently in the server's event loop:
```python
async def fetch_and_process(urls: list):
    import asyncio
    import aiohttp

    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [await r.json() for r in responses]
```
### Keep CPU-intensive work synchronous

Don't make CPU-bound functions async; they'll block the server's event loop:
```python
# Good: sync function for CPU work
def train_model(data, epochs):
    import torch

    # Heavy computation runs in a thread pool
    for epoch in range(epochs):
        ...
    return model


# Bad: async function that blocks the event loop
async def train_model(data, epochs):  # Don't do this!
    import torch

    for epoch in range(epochs):  # Blocks the event loop!
        ...
    return model
```
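The cost of blocking the loop is easy to measure with plain asyncio. This runnable sketch (stand-in timings, not a Kubetorch benchmark) gathers three coroutines that each do 0.2s of blocking "CPU work": inline, the calls serialize to roughly 0.6s because the loop is stuck; offloaded via `asyncio.to_thread`, they overlap and finish in roughly 0.2s.

```python
import asyncio
import time


def cpu_work():
    time.sleep(0.2)  # stands in for a CPU-bound computation


async def blocking_coro():
    cpu_work()  # runs inline and blocks the event loop


async def offloaded_coro():
    await asyncio.to_thread(cpu_work)  # runs in a worker thread


async def timed(factory):
    start = time.perf_counter()
    await asyncio.gather(factory(), factory(), factory())
    return time.perf_counter() - start


serial = asyncio.run(timed(blocking_coro))     # ~0.6s: calls serialize
parallel = asyncio.run(timed(offloaded_coro))  # ~0.2s: calls overlap
print(f"blocking: {serial:.2f}s, offloaded: {parallel:.2f}s")
```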
### Avoid blocking calls in async functions

Never use blocking calls inside async functions:
```python
# Bad
async def bad_example():
    import time
    import requests

    time.sleep(5)  # Blocks event loop!
    data = requests.get(url)  # Blocks event loop!
    return data


# Good
async def good_example():
    import asyncio
    import aiohttp

    await asyncio.sleep(5)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
```
### Use `async_=True` for concurrent client calls

When making multiple remote calls, use `async_=True` with `asyncio.gather()` to avoid blocking:
```python
async def process_batch(items):
    results = await asyncio.gather(*[
        remote_fn(item, async_=True) for item in items
    ])
    return results
```
## Complete Example
```python
import kubetorch as kt
import asyncio


# CPU-intensive sync function
def train_model(epochs: int):
    import time

    for i in range(epochs):
        time.sleep(0.5)  # Simulate training
    return {"loss": 0.123, "accuracy": 0.95}


# I/O-bound async function
async def fetch_training_data(dataset_url: str):
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(dataset_url) as response:
            return await response.json()


async def main():
    compute = kt.Compute(cpus="1")

    # Deploy both services concurrently
    train_fn, fetch_fn = await asyncio.gather(
        kt.fn(train_model).to_async(compute),
        kt.fn(fetch_training_data).to_async(compute),
    )

    # Run training and data fetch concurrently
    model, data = await asyncio.gather(
        train_fn(epochs=10, async_=True),
        fetch_fn("https://api.example.com/dataset", async_=True),
    )
    return {"model": model, "data": data}


result = asyncio.run(main())
```