Compute

The Compute class lets you specify the right resources to request for your workloads, and control how that compute behaves.

Compute Class

class kubetorch.Compute(cpus: str | int = None, memory: str = None, disk_size: str = None, gpus: str | int = None, gpu_type: str = None, priority_class_name: str = None, gpu_memory: str = None, namespace: str = None, image: Image = None, labels: Dict = None, annotations: Dict = None, volumes: List[str | Volume] = None, node_selector: Dict = None, service_template: Dict = None, tolerations: List[Dict] = None, env_vars: Dict = None, secrets: List[str | Secret] = None, freeze: bool = False, kubeconfig_path: str = None, service_account_name: str = None, image_pull_policy: str = None, inactivity_ttl: str = None, gpu_anti_affinity: bool = None, launch_timeout: int = None, working_dir: str = None, shared_memory_limit: str = None, allowed_serialization: List[str] | None = None, replicas: int = None, logging_config: LoggingConfig = None, queue_name: str = None, selector: Dict[str, str] = None, endpoint: Endpoint = None, _skip_template_init: bool = False)
__init__(cpus: str | int = None, memory: str = None, disk_size: str = None, gpus: str | int = None, gpu_type: str = None, priority_class_name: str = None, gpu_memory: str = None, namespace: str = None, image: Image = None, labels: Dict = None, annotations: Dict = None, volumes: List[str | Volume] = None, node_selector: Dict = None, service_template: Dict = None, tolerations: List[Dict] = None, env_vars: Dict = None, secrets: List[str | Secret] = None, freeze: bool = False, kubeconfig_path: str = None, service_account_name: str = None, image_pull_policy: str = None, inactivity_ttl: str = None, gpu_anti_affinity: bool = None, launch_timeout: int = None, working_dir: str = None, shared_memory_limit: str = None, allowed_serialization: List[str] | None = None, replicas: int = None, logging_config: LoggingConfig = None, queue_name: str = None, selector: Dict[str, str] = None, endpoint: Endpoint = None, _skip_template_init: bool = False)

Initialize the compute requirements for a Kubetorch service.

Parameters:
  • cpus (str, int, optional) – CPU resource request. Can be specified in cores (“1.0”) or millicores (“1000m”).

  • memory (str, optional) – Memory resource request. Can use binary (Ki, Mi, Gi) or decimal (K, M, G) units.

  • disk_size (str, optional) – Ephemeral storage request. Uses same format as memory.

  • gpus (str or int, optional) – Number of GPUs to request. Fractional GPUs are not currently supported.

  • gpu_type (str, optional) – GPU type to request. Corresponds to the “nvidia.com/gpu.product” label on the node (if GPU feature discovery is enabled); alternatively, a full label string like “nvidia.com/gpu.product: L4” can be passed, which will be used to set a nodeSelector on the service. See the notes and examples below.

  • gpu_memory (str, optional) – GPU memory request (e.g., “4Gi”). A whole GPU will still be requested, but its memory usage will be limited.

  • priority_class_name (str, optional) – Name of the Kubernetes priority class to use for the service. If not specified, the default priority class will be used.

  • namespace (str, optional) – Kubernetes namespace. Defaults to global config default, or “default”.

  • image (Image, optional) – Kubetorch image configuration. See Image for more details.

  • labels (Dict, optional) – Kubernetes labels to apply to the service.

  • annotations (Dict, optional) – Kubernetes annotations to apply to the service.

  • volumes (List[Union[str, Volume]], optional) – Volumes to attach to the service. Can be specified as a list of volume names (strings) or Volume objects. If using strings, they must be the names of existing PersistentVolumeClaims (PVCs) in the specified namespace.

  • node_selector (Dict, optional) – Kubernetes node selector to constrain pods to specific nodes. Should be a dictionary of key-value pairs, e.g. {“node.kubernetes.io/instance-type”: “g4dn.xlarge”}.

  • service_template (Dict, optional) – Nested dictionary of service template arguments to apply to the service. E.g. {"spec": {"template": {"spec": {"nodeSelector": {"node.kubernetes.io/instance-type": "g4dn.xlarge"}}}}}

  • tolerations (List[Dict], optional) – Kubernetes tolerations to apply to the service. Each toleration should be a dictionary with keys like “key”, “operator”, “value”, and “effect”. See the Kubernetes documentation on taints and tolerations.

  • env_vars (Dict, optional) – Environment variables to set in containers.

  • secrets (List[Union[str, Secret]], optional) – Secrets to mount or expose.

  • freeze (bool, optional) – Whether to freeze the compute configuration (e.g. for production).

  • kubeconfig_path (str, optional) – Path to local kubeconfig file used for cluster authentication.

  • service_account_name (str, optional) – Kubernetes service account to use.

  • image_pull_policy (str, optional) – Container image pull policy. See the Kubernetes documentation on image pull policy.

  • inactivity_ttl (str, optional) – Time-to-live after inactivity; once exceeded, the service will be destroyed. Values below 1m may cause premature deletion.

  • gpu_anti_affinity (bool, optional) – Whether to prevent the service from being scheduled on GPU nodes when no GPUs are requested. Can also be controlled globally by setting the KT_GPU_ANTI_AFFINITY environment variable. (Default: False)

  • launch_timeout (int, optional) – How long to wait (in seconds) for the service to become ready before giving up. If not specified, the default launch timeout (KT_LAUNCH_TIMEOUT) is used. Note: you can also control this timeout globally by setting the KT_LAUNCH_TIMEOUT environment variable.

  • replicas (int, optional) – Number of replicas to create for deployment-based services. Can also be set via the .distribute(workers=N) method for distributed training. If not specified, defaults to 1 for new manifests. (Default: None)

  • working_dir (str, optional) – Working directory to use inside the remote images. Must be an absolute path (e.g. /kt)

  • shared_memory_limit (str, optional) – Maximum size of the shared memory filesystem (/dev/shm) available to each pod created by the service. Value should be a Kubernetes quantity string, for example: “512Mi”, “2Gi”, “1G”, “1024Mi”, “100M”. If not provided, /dev/shm will default to the pod’s memory limit (if set) or up to half the node’s RAM.

  • logging_config (LoggingConfig, optional) – Configuration for logging behavior on this service. Controls log level, streaming options, and grace periods. See LoggingConfig for details.

  • queue_name (str, optional) – Kueue LocalQueue name for GPU scheduling. When set, adds the kueue.x-k8s.io/queue-name label to the pod template metadata. For training jobs (PyTorchJob, TFJob, etc.), also sets spec.runPolicy.suspend: true so Kueue can manage admission. Requires Kueue to be installed in the cluster.

  • selector (Dict[str, str], optional) – Label selector to identify pods that belong to this compute. Use this when you’ve already applied your own K8s manifest (e.g., via kubectl) and want to deploy functions to those pods. Example: {"app": "workers", "team": "ml"}.

  • endpoint (Endpoint, optional) – Custom endpoint configuration for routing calls to pods. Use this to specify your own URL (skip Service creation) or a custom selector (route to subset of pool). See Endpoint for details.

Note

CPUs:
  • Decimal core count: “0.5”, “1.0”, “2.0”

  • Millicores: “500m”, “1000m”, “2000m”

Memory:
  • Bytes: “1000000”

  • Binary units: “1Ki”, “1Mi”, “1Gi”, “1Ti”

  • Decimal units: “1K”, “1M”, “1G”, “1T”

GPU Specifications:
  1. gpus for whole GPUs: “1”, “2”

  2. gpu_memory for a GPU memory limit: “4Gi”, “16Gi”

Disk Size:
  • Same format as memory

Note

  • Memory/disk values are case sensitive (Mi != mi)

  • When using gpu_memory, a whole GPU is still requested but memory is limited

Examples:

import kubetorch as kt

# Basic CPU/Memory request
compute = kt.Compute(cpus="0.5", memory="2Gi")

# GPU request with memory limit
compute = kt.Compute(gpu_memory="4Gi", cpus="1.0")

# Multiple whole GPUs
compute = kt.Compute(gpus="2", memory="16Gi")
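
The following additional sketches illustrate the scheduling-related parameters described above. They assume GPU feature discovery and Kueue are installed in the cluster, and the GPU type, instance type, taint key, and queue name are placeholder values:

import kubetorch as kt

# Request a specific GPU type; this is translated into a nodeSelector on the
# "nvidia.com/gpu.product" node label (assumes GPU feature discovery is enabled)
compute = kt.Compute(gpus="1", gpu_type="L4", memory="16Gi")

# Pin to an instance type and tolerate a dedicated GPU taint
compute = kt.Compute(
    gpus="1",
    node_selector={"node.kubernetes.io/instance-type": "g4dn.xlarge"},
    tolerations=[
        {"key": "nvidia.com/gpu", "operator": "Exists", "effect": "NoSchedule"}
    ],
)

# Schedule through a Kueue LocalQueue, raise /dev/shm, and tear the service
# down after two hours of inactivity
compute = kt.Compute(
    gpus="1",
    queue_name="gpu-queue",
    shared_memory_limit="2Gi",
    inactivity_ttl="2h",
)
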
classmethod from_template(service_info: dict)

Create a Compute object from a deployed Kubernetes resource.

classmethod from_manifest(manifest: Dict | str, selector: Dict[str, str] | None = None, endpoint: Endpoint | None = None)

Create a Compute instance from a user-provided Kubernetes manifest.

Use this when you have an existing K8s deployment and want to deploy kubetorch functions to it. The manifest can be one you’ve already applied via kubectl, or one you want kubetorch to apply for you.

Parameters:
  • manifest – Kubernetes manifest dict or path to YAML file

  • selector – Label selector to identify pods belonging to this compute. If not provided, uses the manifest’s spec.selector.matchLabels. Example: {“app”: “my-workers”, “team”: “ml”}

  • endpoint – Custom endpoint configuration for routing calls to pods. Use Endpoint(url="...") for your own Service/Ingress, or Endpoint(selector={...}) to route to a subset of pool pods.

Returns:

Compute instance

Examples:

import kubetorch as kt

# Basic usage - manifest already applied via kubectl
compute = kt.Compute.from_manifest(
    manifest=my_manifest,
    selector={"app": "my-workers"}
)

# Deploy a function to the existing pods
remote_fn = kt.fn(my_func).to(compute)
result = remote_fn(1, 2)

# With custom endpoint URL (user-created service)
compute = kt.Compute.from_manifest(
    manifest=my_manifest,
    selector={"app": "my-workers"},
    endpoint=kt.Endpoint(url="my-svc.my-ns.svc.cluster.local:8080")
)

# Route to subset of pool (e.g., head node only)
compute = kt.Compute.from_manifest(
    manifest=ray_manifest,
    selector={"app": "ray"},  # Pool: all ray pods
    endpoint=kt.Endpoint(selector={"app": "ray", "role": "head"})  # Route: head only
)

property manifest

Get the current resource manifest.

property selector_only: bool

Check if this compute is in selector-only mode.

Selector-only mode is when the user provides a pod selector to identify existing running pods, without providing a manifest or resource requirements. In this mode, Kubetorch just registers the pool and routes calls to the pods.
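
A minimal sketch of selector-only mode, assuming pods labeled app=workers are already running in the target namespace and my_func is an arbitrary local function:

import kubetorch as kt

# No manifest or resource requests: kubetorch only registers the pool of pods
# matching the selector and routes calls to them
compute = kt.Compute(selector={"app": "workers"})

remote_fn = kt.fn(my_func).to(compute)
result = remote_fn(1, 2)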

property logging_config: LoggingConfig

Get the logging configuration for this compute.

property pod_spec

Get the pod spec from the manifest.

property shm_size_limit

Deprecated. Use shared_memory_limit instead.

property kind

Get the manifest kind, defaulting to ‘Deployment’ if not set.

property freeze

Get freeze setting. Set in setup script, not manifest env vars.

property allowed_serialization

Get allowed_serialization. Flows to pods via WebSocket runtime config.

property autoscaling_config

Get autoscaling config from manifest or stored value.

property distributed_config

Get the distributed config for this compute.

Distributed config flows to pods via WebSocket metadata from the controller.

property user_annotations

Get user-provided annotations (excludes kubetorch internal annotations).

property user_labels

Get user-provided labels (excludes kubetorch internal labels).

property pod_template

Get the pod template from the manifest (includes metadata and spec).

add_labels(labels: Dict)

Add or update labels in the manifest metadata.

Parameters:

labels (Dict) – Dictionary of labels to add or update.

add_pod_template_labels(labels: Dict, remove_keys: List[str] | None = None)

Add or update labels in the pod template metadata.

This is useful for labels that need to be on the pod itself, such as Kueue queue labels (kueue.x-k8s.io/queue-name).

Parameters:
  • labels (Dict) – Dictionary of labels to add or update.

  • remove_keys (List[str], optional) – List of label keys to remove.

add_annotations(annotations: Dict)

Add or update annotations in the manifest metadata.

Parameters:

annotations (Dict) – Dictionary of annotations to add or update.

add_tolerations(tolerations: List[Dict])

Add or update tolerations in the pod spec.

Parameters:

tolerations (List[Dict]) – List of toleration dictionaries to add or update. Each toleration should have keys like “key”, “operator”, “value”, and “effect”.

add_env_vars(env_vars: Dict)

Add or update environment variables in the container spec.

Parameters:

env_vars (Dict) – Dictionary of environment variables to add or update. Existing env vars with the same key will be overridden.
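
A short sketch of these manifest-mutation helpers applied before deploying; the label, annotation, toleration, and environment variable values here are illustrative:

import kubetorch as kt

compute = kt.Compute(cpus="1", memory="2Gi")

# Mutate the generated manifest before deploying
compute.add_labels({"team": "ml"})
compute.add_annotations({"owner": "data-platform"})
compute.add_pod_template_labels({"kueue.x-k8s.io/queue-name": "gpu-queue"})
compute.add_tolerations(
    [{"key": "dedicated", "operator": "Equal", "value": "ml", "effect": "NoSchedule"}]
)
compute.add_env_vars({"LOG_LEVEL": "debug"})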

pod_names()

Returns a list of pod names.

is_up()

Whether the pods are running.

pip_install(reqs: List[str] | str, node: str | None = None, override_remote_version: bool = False)

Pip install reqs onto compute pod(s).

sync_package(package: str, node: str | None = None)

Sync package (locally installed, or path to package) to compute pod(s).

run_bash(commands, node: str | List[str] | None = None, container: str | None = None)

Run bash commands on the pod(s).
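
A sketch of using these helpers against a deployed compute; the package names, local path, and command are illustrative, and my_func is an arbitrary local function:

import kubetorch as kt

compute = kt.Compute(cpus="1", memory="2Gi")
remote_fn = kt.fn(my_func).to(compute)

# Inspect and modify the running pods
print(compute.pod_names())
print(compute.is_up())
compute.pip_install(["numpy", "pandas"])
compute.sync_package("./my_local_package")
compute.run_bash("python -c 'import numpy; print(numpy.__version__)'")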

Autoscaling

Compute.autoscale(**kwargs)

Configure the service with the provided autoscaling parameters.

You can pass any of the following keyword arguments:

Parameters:
  • target (int) – The concurrency/RPS/CPU/memory target per pod.

  • window (str) – Time window for scaling decisions, e.g. “60s”.

  • metric (str) – Metric to scale on: “concurrency”, “rps”, “cpu”, “memory” or custom. Note: “cpu” and “memory” require autoscaler_class=”hpa.autoscaling.knative.dev”.

  • target_utilization (int) – Utilization % to trigger scaling (1-100).

  • min_scale (int) – Minimum number of replicas. 0 allows scale to zero.

  • max_scale (int) – Maximum number of replicas.

  • initial_scale (int) – Initial number of pods.

  • concurrency (int) – Maximum concurrent requests per pod (containerConcurrency). If not set, pods accept unlimited concurrent requests.

  • scale_to_zero_pod_retention_period (str) – Time to keep last pod before scaling to zero, e.g. “30s”, “1m5s”.

  • scale_down_delay (str) – Delay before scaling down, e.g. “15m”. Only for KPA.

  • autoscaler_class (str) – Autoscaler implementation: "kpa.autoscaling.knative.dev" (default; supports concurrency/rps) or "hpa.autoscaling.knative.dev" (supports cpu/memory/custom metrics).

  • progress_deadline (str) – Time to wait for deployment to be ready, e.g. “10m”. Must be longer than startup probe timeout.

  • **extra_annotations – Additional Knative autoscaling annotations.

Note

The service will be deployed as a Knative service.

Timing-related defaults are applied if not explicitly set (tuned for ML workloads):
  • scale_down_delay=”1m” (avoid rapid scaling cycles)

  • scale_to_zero_pod_retention_period=”10m” (keep the last pod longer before scaling to zero)

  • progress_deadline=”10m” or greater (ensures enough time for initialization; automatically adjusted based on launch_timeout)

Examples:

import kubetorch as kt

remote_fn = kt.fn(my_fn_obj).to(
    kt.Compute(
        cpus=".1",
    ).autoscale(min_scale=1)
)

remote_fn = kt.fn(summer).to(
    compute=kt.Compute(
        cpus=".01",
    ).autoscale(min_scale=3, scale_to_zero_pod_retention_period="50s"),
)
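
As an additional sketch, scaling on CPU utilization requires the HPA autoscaler class per the note above; the targets and bounds here are illustrative, and my_fn_obj is an arbitrary local function:

import kubetorch as kt

remote_fn = kt.fn(my_fn_obj).to(
    kt.Compute(cpus="1").autoscale(
        metric="cpu",
        target_utilization=70,
        min_scale=1,
        max_scale=10,
        autoscaler_class="hpa.autoscaling.knative.dev",
    )
)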

Distributed

Compute.distribute(distribution_type: str | None = None, workers: int | None = None, quorum_timeout: int | None = None, quorum_workers: int | None = None, monitor_members: bool | None = None, **kwargs)

Configure the distributed worker compute needed by each service replica.

Parameters:
  • distribution_type (str) – The type of distributed supervisor to create. Options: "spmd" (the default if not specified), "pytorch", "ray", "monarch", "jax", or "tensorflow".

  • workers (int) – Number of workers to create, each with compute resources identical to the service compute. Alternatively, a list of (int, Compute) pairs specifying the number of workers and the compute resources for each worker StatefulSet (see note below).

  • quorum_timeout (int, optional) – Timeout in seconds for workers to become ready and join the cluster. Defaults to launch_timeout if not provided, for both SPMD frameworks and for Ray. Increase this if workers need more time to start (e.g., during node autoscaling or while downloading data during initialization).

  • **kwargs – Additional framework-specific parameters (e.g., num_proc, port).

Note

A list of (int, Compute) pairs is not yet supported for workers.

Examples:

import kubetorch as kt

remote_fn = kt.fn(simple_summer, service_name).to(
    kt.Compute(
        cpus="2",
        memory="4Gi",
        image=kt.Image(image_id="rayproject/ray"),
        launch_timeout=300,
    ).distribute("ray", workers=2)
)

gpus = kt.Compute(
    gpus=1,
    image=kt.Image(image_id="nvcr.io/nvidia/pytorch:23.10-py3"),
    launch_timeout=600,
    inactivity_ttl="4h",
).distribute("pytorch", workers=4)
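
As a further sketch, quorum_timeout and framework-specific kwargs such as num_proc can be passed through distribute; the values and the train_loop function here are illustrative:

import kubetorch as kt

train = kt.fn(train_loop).to(
    kt.Compute(
        gpus=1,
        image=kt.Image(image_id="nvcr.io/nvidia/pytorch:23.10-py3"),
    ).distribute(
        "pytorch",
        workers=2,
        quorum_timeout=900,  # allow extra time, e.g. for node autoscaling
        num_proc=1,          # framework-specific kwarg (see **kwargs above)
    )
)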