Compute

The Compute class lets you specify the right resources to request for your workloads, and control how that compute behaves.

Compute Class

class kubetorch.Compute(cpus: str | int = None, memory: str = None, disk_size: str = None, gpus: str | int = None, queue: str = None, priority_class_name: str = None, gpu_type: str = None, gpu_memory: str = None, concurrency: int = None, namespace: str = None, image: Image = None, labels: Dict = None, annotations: Dict = None, service_template: Dict = None, tolerations: List[Dict] = None, env_vars: Dict = None, secrets: List[str | Secret] = None, freeze: bool = False, kubeconfig_path: str = None, service_account_name: str = None, image_pull_policy: str = None, inactivity_ttl: str = None, gpu_anti_affinity: bool = None, launch_timeout: int = None, working_dir: str = None, shared_memory_limit: str = None, allowed_serialization: List[str] | None = None, replicas: int = 1)
__init__(cpus: str | int = None, memory: str = None, disk_size: str = None, gpus: str | int = None, queue: str = None, priority_class_name: str = None, gpu_type: str = None, gpu_memory: str = None, concurrency: int = None, namespace: str = None, image: Image = None, labels: Dict = None, annotations: Dict = None, service_template: Dict = None, tolerations: List[Dict] = None, env_vars: Dict = None, secrets: List[str | Secret] = None, freeze: bool = False, kubeconfig_path: str = None, service_account_name: str = None, image_pull_policy: str = None, inactivity_ttl: str = None, gpu_anti_affinity: bool = None, launch_timeout: int = None, working_dir: str = None, shared_memory_limit: str = None, allowed_serialization: List[str] | None = None, replicas: int = 1)

Initialize the compute requirements for a Kubetorch service.

Parameters:
  • cpus (str, int, optional) – CPU resource request. Can be specified in cores (“1.0”) or millicores (“1000m”).

  • memory (str, optional) – Memory resource request. Can use binary (Ki, Mi, Gi) or decimal (K, M, G) units.

  • disk_size (str, optional) – Ephemeral storage request. Uses same format as memory.

  • gpus (str or int, optional) – Number of GPUs to request. Fractional GPUs not currently supported.

  • gpu_type (str, optional) – GPU type to request. Corresponds to the “nvidia.com/gpu.product” label on the node (if GPU feature discovery is enabled). A full label string like “nvidia.com/gpu.product: L4” can also be passed; it is used to set a nodeSelector on the service. More info below.

  • gpu_memory (str, optional) – GPU memory request (e.g., “4Gi”). Will still request whole GPU but limit memory usage.

  • queue (str, optional) – Name of the Kubernetes queue that will be responsible for placing the service’s pods onto nodes. Controls how cluster resources are allocated and prioritized for this service. Pods will be scheduled according to the quota, priority, and limits configured for the queue.

  • priority_class_name (str, optional) – Name of the Kubernetes priority class to use for the service. If not specified, the default priority class will be used.

  • concurrency (int, optional) – Number of concurrent requests to handle per pod at once before queueing up and/or autoscaling.

  • namespace (str, optional) – Kubernetes namespace. Defaults to global config default, or “default”.

  • image (Image, optional) – Kubetorch image configuration. See Image for more details.

  • labels (Dict, optional) – Kubernetes labels to apply to the service.

  • annotations (Dict, optional) – Kubernetes annotations to apply to the service.

  • service_template (Dict, optional) – Nested dictionary of service template arguments to apply to the service. E.g. {"spec": {"template": {"spec": {"nodeSelector": {"node.kubernetes.io/instance-type": "g4dn.xlarge"}}}}}

  • tolerations (List[Dict], optional) – Kubernetes tolerations to apply to the service. Each toleration should be a dictionary with keys like “key”, “operator”, “value”, and “effect”. More info here.

  • env_vars (Dict, optional) – Environment variables to set in containers.

  • secrets (List[Union[str, Secret]], optional) – Secrets to mount or expose.

  • freeze (bool, optional) – Whether to freeze the compute configuration (e.g. for production).

  • kubeconfig_path (str, optional) – Path to local kubeconfig file used for cluster authentication.

  • service_account_name (str, optional) – Kubernetes service account to use.

  • image_pull_policy (str, optional) – Container image pull policy. More info here.

  • inactivity_ttl (str, optional) – Time-to-live after inactivity. Once hit, the service will be destroyed. Values below 1m may cause premature deletion.

  • gpu_anti_affinity (bool, optional) – Whether to prevent scheduling the service on a GPU node when no GPUs are requested. Can also be controlled globally by setting the KT_GPU_ANTI_AFFINITY environment variable. (Default: False)

  • launch_timeout (int, optional) – How long to wait, in seconds, for the service to become ready before giving up. If not specified, the default defined by serving_constants.KT_LAUNCH_TIMEOUT is used. Note: you can also control this timeout globally by setting the KT_LAUNCH_TIMEOUT environment variable.

  • replicas (int, optional) – Number of replicas to create for deployment-based services. Can also be set via the .distribute(workers=N) method for distributed training. (Default: 1)

  • working_dir (str, optional) – Working directory to use inside the remote images. Must be an absolute path (e.g. /kt)

  • shared_memory_limit (str, optional) – Maximum size of the shared memory filesystem (/dev/shm) available to each pod created by the service. Value should be a Kubernetes quantity string, for example: “512Mi”, “2Gi”, “1G”, “1024Mi”, “100M”. If not provided, /dev/shm will default to the pod’s memory limit (if set) or up to half the node’s RAM.

Note

Resource Specification Formats:

CPUs:
  • Decimal core count: “0.5”, “1.0”, “2.0”

  • Millicores: “500m”, “1000m”, “2000m”

Memory:
  • Bytes: “1000000”

  • Binary units: “1Ki”, “1Mi”, “1Gi”, “1Ti”

  • Decimal units: “1K”, “1M”, “1G”, “1T”

GPU Specifications:
  1. gpus for whole GPUs: “1”, “2”

  2. gpu_memory for a GPU memory limit: “4Gi”, “16Gi”

Disk Size:
  • Same format as memory

Note

  • Memory/disk values are case sensitive (Mi != mi)

  • When using gpu_memory, a whole GPU is still requested but memory is limited

Examples:

import kubetorch as kt

# Basic CPU/Memory request
compute = kt.Compute(cpus="0.5", memory="2Gi")

# GPU request with memory limit
compute = kt.Compute(gpu_memory="4Gi", cpus="1.0")

# Multiple whole GPUs
compute = kt.Compute(gpus="2", memory="16Gi")
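The scheduling-related parameters described above (gpu_type, tolerations, inactivity_ttl) can be combined in the same constructor. A minimal sketch, with the GPU product name, toleration values, and TTL chosen purely for illustration:

import kubetorch as kt

# Request one L4 GPU by product label, tolerate a (hypothetical) GPU node taint,
# and tear the service down after two hours of inactivity.
compute = kt.Compute(
    gpus="1",
    gpu_type="L4",  # matched against the node's nvidia.com/gpu.product label
    tolerations=[
        {"key": "nvidia.com/gpu", "operator": "Exists", "effect": "NoSchedule"}
    ],
    inactivity_ttl="2h",
)
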
property client_port: int

Client port.

Returns the existing port if this service already has a port forward, otherwise creates a new unique client port number.

property requested_resources

Requested resources.

property autoscaling_config

Autoscaling config.

property deployment_mode

Determine deployment mode based on distributed config and autoscaling.

property server_image

Base server image.

property install_otel_dependencies: bool

Whether to install OTEL dependencies at runtime; true only if tracing is enabled and a custom image is specified.

property gpu_annotations

GPU annotations.

property tolerations

Tolerations added to the pod template for workloads that require a GPU.

pod_names()

Returns a list of pod names.

is_up()

Whether the pods are running.

pip_install(reqs: List[str] | str, node: str | None = None, override_remote_version: bool = False)

Pip install reqs onto compute pod(s).

sync_package(package: str, node: str | None = None)

Sync package (locally installed, or path to package) to compute pod(s).

run_bash(commands, node: str | List[str] | None = None, container: str | None = None)

Run bash commands on the pod(s).

rsync(source: str | List[str], dest: str | None = None, local_port: int | None = None, contents: bool = False, filter_options: str | None = None, force: bool = False)

Rsync from local to the rsync pod.

async rsync_async(source: str | List[str], dest: str | None = None, local_port: int | None = None, contents: bool = False, filter_options: str | None = None, force: bool = False)

Async version of rsync. Rsync from local to the rsync pod.

rsync_in_cluster(source: str | List[str], dest: str | None = None, contents: bool = False, filter_options: str | None = None, force: bool = False)

Rsync from inside the cluster to the rsync pod.

async rsync_in_cluster_async(source: str | List[str], dest: str | None = None, contents: bool = False, filter_options: str | None = None, force: bool = False)

Async version of rsync_in_cluster. Rsync from inside the cluster to the rsync pod.

config()

Config for the compute. Pods must be up to run this.
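
A rough sketch of how the helper methods above might be used together once the service’s pods are up (the package names and paths are illustrative):

import kubetorch as kt

compute = kt.Compute(cpus="1", memory="2Gi")

if compute.is_up():
    # Install extra Python requirements onto the running pod(s)
    compute.pip_install(["numpy", "pandas"])

    # Copy a local directory to the rsync pod
    compute.rsync(source="./data", dest="/kt/data")

    # Run an ad-hoc shell command on the pod(s)
    compute.run_bash("ls /kt/data")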

Autoscaling

Compute.autoscale(**kwargs)

Configure the service with the provided autoscaling parameters.

You can pass any of the following keyword arguments:

Parameters:
  • target (int) – The concurrency/RPS target per pod (default: 100).

  • window (str) – Time window for scaling decisions, e.g. “60s” (default: “60s”).

  • metric (str) – “concurrency” or “rps” (default: “concurrency”).

  • target_utilization (int) – Utilization % to trigger scaling (default: 70).

  • min_scale (int) – Minimum number of replicas (default: None).

  • max_scale (int) – Maximum number of replicas (default: 0 = unlimited).

  • initial_scale (int) – Initial number of pods (default: 1).

  • scale_to_zero_grace_period (int) – Minutes before scaling to zero if no activity is registered (default: None).

  • **extra_annotations – Additional Knative autoscaling annotations.

Note

The service will be deployed as a Knative service.

Examples:

import kubetorch as kt

remote_fn = kt.fn(my_fn_obj).to(
    kt.Compute(
        cpus=".1",
    ).autoscale(min_scale=1)
)

remote_fn = kt.fn(summer).to(
    compute=kt.Compute(
        cpus=".01",
    ).autoscale(min_scale=3, scale_to_zero_grace_period=50),
)
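
For completeness, a sketch of an RPS-based configuration built from the parameters listed above (the function and values are illustrative):

import kubetorch as kt

remote_fn = kt.fn(my_fn_obj).to(
    kt.Compute(cpus="1").autoscale(
        metric="rps",   # scale on requests per second instead of concurrency
        target=50,      # target 50 requests per second per pod
        min_scale=1,
        max_scale=10,
        window="120s",
    )
)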

Distributed

Compute.distribute(distribution_type: str | None = None, workers: int | None = None, quorum_timeout: int | None = None, **kwargs)

Configure the distributed worker compute needed by each service replica.

Parameters:
  • distribution_type (str) – The type of distributed supervisor to create. Options: "ray", "pytorch", "jax", "tensorflow", or None for generic SPMD.

  • workers (int) – Number of workers to create, each with compute resources identical to the service compute. Alternatively, a List of <int, Compute> pairs specifying the number of workers and the compute resources for each worker StatefulSet (see the note below).

  • quorum_timeout (int, optional) – Timeout in seconds for workers to become ready and join the cluster. Defaults to launch_timeout if not provided, for both SPMD frameworks and for Ray. Increase this if workers need more time to start (e.g., during node autoscaling or while downloading data during initialization).

  • **kwargs – Additional framework-specific parameters (e.g., num_proc, port).

Note

List of <int, Compute> pairs is not yet supported for workers.

Examples:

import kubetorch as kt

remote_fn = kt.fn(simple_summer, service_name).to(
    kt.Compute(
        cpus="2",
        memory="4Gi",
        image=kt.Image(image_id="rayproject/ray"),
        launch_timeout=300,
    ).distribute("ray", workers=2)
)

gpus = kt.Compute(
    gpus=1,
    image=kt.Image(image_id="nvcr.io/nvidia/pytorch:23.10-py3"),
    launch_timeout=600,
    inactivity_ttl="4h",
).distribute("pytorch", workers=4)
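
For a generic SPMD layout with no framework-specific supervisor, distribution_type can be left unset. A sketch, with the function name and values purely illustrative:

import kubetorch as kt

remote_fn = kt.fn(train_step).to(
    kt.Compute(
        cpus="4",
        memory="8Gi",
    ).distribute(workers=8, quorum_timeout=900)  # generic SPMD; allow up to 15 minutes for workers to join
)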