Call Modes

Kubetorch provides flexible options for how your remote function and class method calls are executed and how data is transferred between your client and remote compute environments. This includes serialization methods for data transfer and async execution support.

Serialization Methods

When calling remote functions, Kubetorch needs to serialize function arguments. You can control this serialization method using the serialization parameter. The supported serialization formats are "json" and "pickle".

JSON (Default)

JSON is the default serialization format and works well for most common Python types.

```python
import kubetorch as kt

remote_fn = kt.fn(my_function).to(compute=kt.Compute(cpus="0.5"))

# Default JSON serialization
result = remote_fn(*args, **kwargs)

# Explicitly specify JSON
result = remote_fn(*args, **kwargs, serialization="json")
```

Supported types:

  • Basic types: str, int, float, bool, None
  • Collections: list, dict, tuple (converted to list)
  • Nested combinations of the above

Limitations:

  • Cannot serialize custom Python objects
  • Cannot serialize functions, classes, or complex types
  • No support for circular references
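The standard library's json module enforces the same constraints, so a quick local check (a sketch, not a Kubetorch API) shows what will and won't make it across the wire:

```python
import json

# Tuples survive serialization but come back as lists
payload = {"coords": (1, 2), "name": "run-42"}
restored = json.loads(json.dumps(payload))
print(restored["coords"])  # [1, 2] -- no longer a tuple

# Custom objects are not JSON-serializable
class Checkpoint:
    def __init__(self, step):
        self.step = step

try:
    json.dumps({"ckpt": Checkpoint(100)})
except TypeError as e:
    print(f"Cannot serialize: {e}")
```

If you hit the TypeError above in practice, either convert the object to plain dicts/lists before calling, or switch to pickle serialization.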

Pickle

Pickle serialization supports arbitrary Python objects, making it ideal for more complex data structures or custom classes. When using pickle, data is pickled and then base64-encoded for safe HTTP transport.

```python
import kubetorch as kt

remote_fn = kt.fn(my_function).to(compute=kt.Compute(cpus="0.5"))

# Use pickle serialization
result = remote_fn(*args, **kwargs, serialization="pickle")
```

Supported types:

  • All JSON-supported types
  • Custom Python classes and objects
  • NumPy arrays, Pandas DataFrames
  • Most Python built-in types
  • Functions and lambda expressions (with limitations)

Limitations:

  • Larger payload size due to base64 encoding
  • Security considerations (see below)
  • Some objects may not be picklable (e.g., open file handles, database connections)
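The encode/decode round trip and the base64 size overhead can be sketched with the standard library alone (plain pickle plus base64 as described above, not Kubetorch's exact wire format):

```python
import base64
import pickle

class TrainingConfig:
    def __init__(self, lr, layers):
        self.lr = lr
        self.layers = layers

config = TrainingConfig(lr=3e-4, layers=[512, 256, 128])

# Pickle the object, then base64-encode it for safe HTTP transport
raw = pickle.dumps(config)
encoded = base64.b64encode(raw)

# Base64 inflates the payload by roughly a third (4 output bytes per 3 input bytes)
print(len(raw), len(encoded))

# The receiving side reverses both steps
restored = pickle.loads(base64.b64decode(encoded))
print(restored.layers)  # [512, 256, 128]
```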

Pickle Warning: Pickle serialization can execute arbitrary code during deserialization.

Only use pickle serialization when:

  • You trust the source of the serialized data
  • You're working in a controlled environment
  • You need to transfer complex Python objects

For security-sensitive applications, you can restrict which serialization methods are allowed using the KT_ALLOWED_SERIALIZATION environment variable.

Configuration

Restricting Serialization Methods

You can control which serialization methods are allowed on your Kubetorch server using an environment variable:

```bash
# Allow both JSON and pickle (default)
export KT_ALLOWED_SERIALIZATION="json,pickle"

# Allow only JSON (more secure)
export KT_ALLOWED_SERIALIZATION="json"

# Allow only pickle
export KT_ALLOWED_SERIALIZATION="pickle"
```

If a client attempts to use a serialization method that's not allowed, the server will return a 400 Bad Request error.

Example error:

```
HTTPException: Serialization format 'pickle' not allowed. Allowed formats: ['json']
```

How It Works

When you call a remote function with a specific serialization method:

  1. Client side: Arguments are serialized using the specified format
  2. Transport: Data is sent via HTTP POST request
  3. Server side:
    • Server validates the serialization method against KT_ALLOWED_SERIALIZATION
    • If allowed, deserializes the arguments
    • Executes your function
    • Serializes the result using the same format
  4. Client side: Deserializes and returns the result
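The lifecycle above can be sketched in a few lines of standard-library Python. The serialize and handle_request helpers here are illustrative stand-ins for the client and server halves, not Kubetorch internals:

```python
import base64
import json
import pickle

ALLOWED = {"json", "pickle"}  # stand-in for KT_ALLOWED_SERIALIZATION

def serialize(args, kwargs, fmt):
    # Client side: encode arguments in the requested format
    if fmt == "json":
        return json.dumps({"args": args, "kwargs": kwargs})
    return base64.b64encode(pickle.dumps((args, kwargs))).decode()

def handle_request(body, fmt, fn):
    # Server side: validate the format before touching the payload
    if fmt not in ALLOWED:
        raise ValueError(
            f"Serialization format '{fmt}' not allowed. Allowed formats: {sorted(ALLOWED)}"
        )
    if fmt == "json":
        payload = json.loads(body)
        args, kwargs = payload["args"], payload["kwargs"]
    else:
        args, kwargs = pickle.loads(base64.b64decode(body))
    result = fn(*args, **kwargs)
    # Serialize the result with the same format
    if fmt == "json":
        return json.dumps(result)
    return base64.b64encode(pickle.dumps(result)).decode()

def add(a, b):
    return a + b

body = serialize([2, 3], {}, "json")
print(json.loads(handle_request(body, "json", add)))  # 5
```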

Async Support

Kubetorch provides rich support for Python's asynchronous programming capabilities. Whether you're working with native async functions or want to control how synchronous functions are executed, Kubetorch handles it seamlessly.

Native Async Functions

Kubetorch automatically detects functions defined with Python's async def syntax. When you send an async function to your remote cluster, it executes within the environment process's async event loop, not in a separate thread.

Warning: Be careful not to run costly, synchronous code within an async function. Blocking operations can tie up the event loop within your environment. Poorly written async functions won't block the entire Kubetorch daemon, but will block other functions within the same environment.

Example: Basic Async Function

```python
import kubetorch as kt

async def async_process(time_to_sleep: int):
    import asyncio

    # Simulate async I/O operation
    await asyncio.sleep(time_to_sleep)
    return f"Processed after {time_to_sleep} seconds"

# Send to Kubetorch
remote_fn = kt.fn(async_process).to(compute=kt.Compute(cpus="0.5"))

# Must await the call, just like a local async function
result = await remote_fn(2)
```

Client-side, you must await calls to async functions the same way you would if the function were running locally. The network call to the remote cluster executes asynchronously within your local event loop, and the async function itself executes within the async event loop on the remote server.

Example: Async with Real I/O

```python
import kubetorch as kt

async def fetch_and_process(url: str):
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
            # Process data
            return {"status": response.status, "items": len(data)}

remote_fn = kt.fn(fetch_and_process).to(compute=kt.Compute(cpus="1"))

# Await the async remote call
result = await remote_fn(url="https://api.example.com/data")
```

Running Sync Functions as Async Locally

Sometimes you have synchronous functions running on a remote Kubetorch cluster, but you don't want your local code to block on the network I/O involved in communicating with the cluster. This is especially useful when the function takes a long time to execute.

You can make synchronous remote functions return coroutines locally by using the run_async=True parameter. Note that this means your local code must use async primitives, even though the function itself is defined as synchronous.

Example: Sync Function with Async Local Execution

```python
import kubetorch as kt

def synchronous_process(time_to_process: int):
    import time

    # This is a synchronous function
    time.sleep(time_to_process)
    return f"Processed for {time_to_process} seconds"

remote_fn = kt.fn(synchronous_process).to(compute=kt.Compute(cpus="0.5"))

# Call with run_async=True to get a coroutine
result = await remote_fn(5, run_async=True)
```

On the remote cluster, the function still runs in a thread (since it's synchronous), but the call to it locally is asynchronous and uses an async HTTP client.

Example: Non-blocking with create_task

You can use asyncio.create_task() to avoid blocking your code on execution:

```python
import kubetorch as kt
import asyncio

remote_fn = kt.fn(synchronous_process).to(compute=kt.Compute(cpus="0.5"))

# Create tasks without blocking
task1 = asyncio.create_task(remote_fn(3, run_async=True))
task2 = asyncio.create_task(remote_fn(5, run_async=True))

# Do other work...

# Await when you need the results
result1 = await task1
result2 = await task2
```
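If you simply want all the results at once, asyncio.gather is a common alternative to juggling individual tasks. This sketch simulates the remote calls with a local coroutine so it runs standalone; remote_call here is a hypothetical stand-in for remote_fn(..., run_async=True):

```python
import asyncio

# Stand-in for a remote call made with run_async=True: a coroutine
# that resolves to the function's result (illustrative, not Kubetorch's API)
async def remote_call(seconds):
    await asyncio.sleep(seconds)
    return f"Processed for {seconds} seconds"

async def main():
    # gather schedules both calls concurrently and returns results in call order
    return await asyncio.gather(remote_call(0.2), remote_call(0.3))

print(asyncio.run(main()))
# ['Processed for 0.2 seconds', 'Processed for 0.3 seconds']
```

Because the calls run concurrently, the total wall time is roughly the longest single call rather than the sum of all of them.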

You can still call the same function synchronously if needed:

```python
# Synchronous call (blocks until complete)
result = remote_fn(2)
```

Running Async Functions as Sync Locally

Passing run_async=False is useful when you're writing async code specifically to take advantage of Kubetorch's async server performance, but want to make synchronous calls from your client. Your async function runs efficiently on the server's event loop, while you call it like a normal synchronous function locally.

Example: Async Function with Sync Local Execution

```python
import kubetorch as kt

async def async_process(time_to_sleep: int):
    import asyncio

    await asyncio.sleep(time_to_sleep)
    return time_to_sleep

remote_fn = kt.fn(async_process).to(compute=kt.Compute(cpus="0.5"))

# Call async function synchronously (no await needed)
result = remote_fn(2, run_async=False)
```

This allows you to write high-performance async code for the server while keeping your client code simple and synchronous.

How Kubetorch Detects Async Functions

Kubetorch uses Python's inspect.iscoroutinefunction() to automatically detect whether your function is async:

```python
import inspect

# Kubetorch checks this internally
is_async = inspect.iscoroutinefunction(user_method)

if is_async:
    result = await user_method(*args, **kwargs)
else:
    # Run sync method in a thread pool to avoid blocking
    result = await run_in_executor_with_context(
        None, lambda: user_method(*args, **kwargs)
    )
```

Call Mode Summary

| Function Type | Default Behavior | With run_async=True | With run_async=False |
| --- | --- | --- | --- |
| def (sync) | Returns result directly | Returns coroutine to await | Returns result directly |
| async def (async) | Returns coroutine to await | Returns coroutine to await | Returns result directly |

Best Practices

  1. Use native async for I/O-bound operations: If your function does network requests, file I/O, or database operations, define it as async for best performance
  2. Keep CPU-intensive work synchronous: Don't make CPU-bound functions async; they'll block the event loop
  3. Avoid blocking calls in async functions: Never use time.sleep(), requests.get(), or other blocking calls inside async functions. Use async equivalents like asyncio.sleep() and aiohttp
  4. Use run_async=True for long-running functions: When calling long-running sync functions remotely, use run_async=True to avoid blocking your local code
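Point 3 is worth a concrete sketch: when an async function must call blocking code you can't rewrite, push it onto a thread with run_in_executor so the event loop stays responsive. This is standard asyncio, not a Kubetorch API:

```python
import asyncio
import time

def blocking_io():
    # Stands in for a legacy blocking call you can't rewrite
    time.sleep(0.2)
    return "done"

async def safe_wrapper():
    # Offload the blocking call to a thread pool; other coroutines
    # on the event loop keep making progress in the meantime
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_io)

result = asyncio.run(safe_wrapper())
print(result)  # done
```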

Complete Example: Mixing Sync and Async

```python
import kubetorch as kt
import asyncio

# CPU-intensive synchronous function
def train_model(epochs: int):
    import time

    for i in range(epochs):
        time.sleep(0.5)  # Simulate training
    return {"loss": 0.123, "accuracy": 0.95}

# I/O-bound async function
async def fetch_training_data(dataset_url: str):
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(dataset_url) as response:
            return await response.json()

# Send both to Kubetorch
train_fn = kt.fn(train_model).to(compute=kt.Compute(gpus="1"))
fetch_fn = kt.fn(fetch_training_data).to(compute=kt.Compute(cpus="0.5"))

# Fetch data asynchronously while training runs
async def run_workflow():
    # Start training (sync function, but don't block locally)
    training_task = asyncio.create_task(train_fn(10, run_async=True))

    # Fetch data asynchronously
    data = await fetch_fn("https://api.example.com/dataset")

    # Wait for training to complete
    model = await training_task

    return {"model": model, "data": data}

# Run the workflow
result = await run_workflow()
```