The Compute class lets you specify the right resources to request for your workloads, and control how that compute
behaves.
- __init__(cpus: str | int = None, memory: str = None, disk_size: str = None, gpus: str | int = None, gpu_type: str = None, priority_class_name: str = None, gpu_memory: str = None, namespace: str = None, image: Image = None, labels: Dict = None, annotations: Dict = None, volumes: List[str | Volume] = None, node_selector: Dict = None, service_template: Dict = None, tolerations: List[Dict] = None, env_vars: Dict = None, secrets: List[str | Secret] = None, freeze: bool = False, kubeconfig_path: str = None, service_account_name: str = None, image_pull_policy: str = None, inactivity_ttl: str = None, gpu_anti_affinity: bool = None, launch_timeout: int = None, working_dir: str = None, shared_memory_limit: str = None, allowed_serialization: List[str] | None = None, replicas: int = None, logging_config: LoggingConfig = None, queue_name: str = None, selector: Dict[str, str] = None, endpoint: Endpoint = None, _skip_template_init: bool = False)
Initialize the compute requirements for a Kubetorch service.
- Parameters:
cpus (str, int, optional) – CPU resource request. Can be specified in cores (“1.0”) or millicores (“1000m”).
memory (str, optional) – Memory resource request. Can use binary (Ki, Mi, Gi) or decimal (K, M, G) units.
disk_size (str, optional) – Ephemeral storage request. Uses same format as memory.
gpus (str or int, optional) – Number of GPUs to request. Fractional GPUs are not currently supported.
gpu_type (str, optional) – GPU type to request. Corresponds to the “nvidia.com/gpu.product” label on the node (if GPU feature discovery is enabled), or a full string like “nvidia.com/gpu.product: L4” can be passed, which will be used to set a nodeSelector on the service. More info below.
gpu_memory (str, optional) – GPU memory request (e.g., “4Gi”). Will still request whole GPU but limit memory usage.
priority_class_name (str, optional) – Name of the Kubernetes priority class to use for the service. If not specified, the default priority class will be used.
namespace (str, optional) – Kubernetes namespace. Defaults to global config default, or “default”.
image (Image, optional) – Kubetorch image configuration. See Image for more details.
labels (Dict, optional) – Kubernetes labels to apply to the service.
annotations (Dict, optional) – Kubernetes annotations to apply to the service.
volumes (List[Union[str, Volume]], optional) – Volumes to attach to the service. Can be specified as a list of volume names (strings) or Volume objects. If using strings, they must be the names of existing PersistentVolumeClaims (PVCs) in the specified namespace.
node_selector (Dict, optional) – Kubernetes node selector to constrain pods to specific nodes. Should be a dictionary of key-value pairs, e.g. {“node.kubernetes.io/instance-type”: “g4dn.xlarge”}.
service_template (Dict, optional) – Nested dictionary of service template arguments to apply to the service. E.g. {"spec": {"template": {"spec": {"nodeSelector": {"node.kubernetes.io/instance-type": "g4dn.xlarge"}}}}}
tolerations (List[Dict], optional) – Kubernetes tolerations to apply to the service. Each toleration should be a dictionary with keys like “key”, “operator”, “value”, and “effect”. More info here.
env_vars (Dict, optional) – Environment variables to set in containers.
secrets (List[Union[str, Secret]], optional) – Secrets to mount or expose.
freeze (bool, optional) – Whether to freeze the compute configuration (e.g. for production).
kubeconfig_path (str, optional) – Path to local kubeconfig file used for cluster authentication.
service_account_name (str, optional) – Kubernetes service account to use.
image_pull_policy (str, optional) – Container image pull policy. More info here.
inactivity_ttl (str, optional) – Time-to-live after inactivity. Once hit, the service will be destroyed. Values below 1m may cause premature deletion.
gpu_anti_affinity (bool, optional) – Whether to prevent the service from being scheduled on a GPU node when no GPUs are requested. Can also be controlled globally by setting the KT_GPU_ANTI_AFFINITY environment variable. (Default: False)
launch_timeout (int, optional) – How long to wait, in seconds, for the service to become ready before giving up. If not specified, waits the default launch timeout. Note: you can also control this timeout globally by setting the KT_LAUNCH_TIMEOUT environment variable.
replicas (int, optional) – Number of replicas to create for deployment-based services. Can also be set via the .distribute(workers=N) method for distributed training. If not specified, defaults to 1 for new manifests. (Default: None)
working_dir (str, optional) – Working directory to use inside the remote images. Must be an absolute path (e.g. /kt)
shared_memory_limit (str, optional) – Maximum size of the shared memory filesystem (/dev/shm) available to each pod created by the service. Value should be a Kubernetes quantity string, for example: “512Mi”, “2Gi”, “1G”, “1024Mi”, “100M”. If not provided, /dev/shm will default to the pod’s memory limit (if set) or up to half the node’s RAM.
logging_config (LoggingConfig, optional) – Configuration for logging behavior on this service. Controls log level, streaming options, and grace periods. See LoggingConfig for details.
queue_name (str, optional) – Kueue LocalQueue name for GPU scheduling. When set, adds the kueue.x-k8s.io/queue-name label to the pod template metadata. For training jobs (PyTorchJob, TFJob, etc.), also sets spec.runPolicy.suspend: true so Kueue can manage admission. Requires Kueue to be installed in the cluster.
selector (Dict[str, str], optional) – Label selector to identify pods that belong to this compute. Use this when you’ve already applied your own K8s manifest (e.g., via kubectl) and want to deploy functions to those pods. Example: {"app": "workers", "team": "ml"}.
endpoint (Endpoint, optional) – Custom endpoint configuration for routing calls to pods. Use this to specify your own URL (skip Service creation) or a custom selector (route to a subset of the pool). See Endpoint for details.
Note
- CPUs:
Decimal core count: “0.5”, “1.0”, “2.0”
Millicores: “500m”, “1000m”, “2000m”
- Memory:
Bytes: “1000000”
Binary units: “1Ki”, “1Mi”, “1Gi”, “1Ti”
Decimal units: “1K”, “1M”, “1G”, “1T”
- GPU Specifications:
gpus for whole GPUs: “1”, “2”
gpu_memory: “4Gi”, “16Gi”
- Disk Size:
Same format as memory
Note
Memory/disk values are case sensitive (Mi != mi)
When using gpu_memory, a whole GPU is still requested but memory usage is limited.
Examples:
import kubetorch as kt

# Basic CPU/Memory request
compute = kt.Compute(cpus="0.5", memory="2Gi")

# GPU request with memory limit
compute = kt.Compute(gpu_memory="4Gi", cpus="1.0")

# Multiple whole GPUs
compute = kt.Compute(gpus="2", memory="16Gi")
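A further sketch combining placement-related parameters (gpu_type, node_selector, tolerations, inactivity_ttl); the label values and toleration below are placeholders for your own cluster:

# Sketch: GPU placement constraints (label values and toleration are placeholders)
compute = kt.Compute(
    gpus="1",
    gpu_type="L4",  # matches the "nvidia.com/gpu.product" node label
    node_selector={"node.kubernetes.io/instance-type": "g6.xlarge"},
    tolerations=[
        {"key": "nvidia.com/gpu", "operator": "Exists", "effect": "NoSchedule"}
    ],
    inactivity_ttl="2h",  # tear the service down after 2h of inactivity
)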
Create a Compute object from a deployed Kubernetes resource.
Create a Compute instance from a user-provided Kubernetes manifest.
Use this when you have an existing K8s deployment and want to deploy kubetorch functions to it. The manifest can be one you’ve already applied via kubectl, or one you want kubetorch to apply for you.
manifest – Kubernetes manifest dict or path to YAML file
selector – Label selector to identify pods belonging to this compute. If not provided, uses the manifest’s spec.selector.matchLabels. Example: {“app”: “my-workers”, “team”: “ml”}
endpoint – Custom endpoint configuration for routing calls to pods.
Use Endpoint(url="...") for your own Service/Ingress, or
Endpoint(selector={...}) to route to a subset of pool pods.
Returns a Compute instance.
Examples:
import kubetorch as kt

# Basic usage - manifest already applied via kubectl
compute = kt.Compute.from_manifest(
    manifest=my_manifest,
    selector={"app": "my-workers"}
)

# Deploy a function to the existing pods
remote_fn = kt.fn(my_func).to(compute)
result = remote_fn(1, 2)

# With custom endpoint URL (user-created service)
compute = kt.Compute.from_manifest(
    manifest=my_manifest,
    selector={"app": "my-workers"},
    endpoint=kt.Endpoint(url="my-svc.my-ns.svc.cluster.local:8080")
)

# Route to subset of pool (e.g., head node only)
compute = kt.Compute.from_manifest(
    manifest=ray_manifest,
    selector={"app": "ray"},  # Pool: all ray pods
    endpoint=kt.Endpoint(selector={"app": "ray", "role": "head"})  # Route: head only
)
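The manifest can also be passed as a path to a YAML file (see the manifest parameter above); a minimal sketch, with a placeholder path and labels:

# Sketch: load the manifest from a YAML file on disk (path and labels are placeholders)
compute = kt.Compute.from_manifest(
    manifest="./k8s/workers-deployment.yaml",
    selector={"app": "my-workers"},
)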
Get the current resource manifest.
Check if this compute is in selector-only mode.
Selector-only mode is when the user provides a pod selector to identify existing running pods, without providing a manifest or resource requirements. In this mode, Kubetorch just registers the pool and routes calls to the pods.
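For illustration, a minimal sketch of selector-only mode, where only a pod selector is passed to the constructor (the label value is a placeholder):

# Selector-only mode: no manifest or resource requests, just route calls to existing pods
compute = kt.Compute(selector={"app": "my-existing-workers"})
remote_fn = kt.fn(my_func).to(compute)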
Get the logging configuration for this compute.
Get the pod spec from the manifest.
Deprecated: use shared_memory_limit instead.
Get the manifest kind, defaulting to ‘Deployment’ if not set.
Get freeze setting. Set in setup script, not manifest env vars.
Get allowed_serialization. Flows to pods via WebSocket runtime config.
Get autoscaling config from manifest or stored value.
Get the distributed config for this compute.
Distributed config flows to pods via WebSocket metadata from the controller.
Get user-provided annotations (excludes kubetorch internal annotations).
Get user-provided labels (excludes kubetorch internal labels).
Get the pod template from the manifest (includes metadata and spec).
Add or update labels in the manifest metadata.
labels (Dict) – Dictionary of labels to add or update.
Add or update labels in the pod template metadata.
This is useful for labels that need to be on the pod itself, such as Kueue queue labels (kueue.x-k8s.io/queue-name).
labels (Dict) – Dictionary of labels to add or update.
remove_keys (List[str], optional) – List of label keys to remove.
Add or update annotations in the manifest metadata.
annotations (Dict) – Dictionary of annotations to add or update.
Add or update tolerations in the pod spec.
tolerations (List[Dict]) – List of toleration dictionaries to add or update. Each toleration should have keys like “key”, “operator”, “value”, and “effect”.
Add or update environment variables in the container spec.
env_vars (Dict) – Dictionary of environment variables to add or update. Existing env vars with the same key will be overridden.
Returns a list of pod names.
Whether the pods are running.
Pip install reqs onto compute pod(s).
Sync package (locally installed, or path to package) to compute pod(s).
Run bash commands on the pod(s).
Configure the service with the provided autoscaling parameters.
You can pass any of the following keyword arguments:
target (int) – The concurrency/RPS/CPU/memory target per pod.
window (str) – Time window for scaling decisions, e.g. “60s”.
metric (str) – Metric to scale on: “concurrency”, “rps”, “cpu”, “memory” or custom. Note: “cpu” and “memory” require autoscaler_class=”hpa.autoscaling.knative.dev”.
target_utilization (int) – Utilization % to trigger scaling (1-100).
min_scale (int) – Minimum number of replicas. 0 allows scale to zero.
max_scale (int) – Maximum number of replicas.
initial_scale (int) – Initial number of pods.
concurrency (int) – Maximum concurrent requests per pod (containerConcurrency). If not set, pods accept unlimited concurrent requests.
scale_to_zero_pod_retention_period (str) – Time to keep last pod before scaling to zero, e.g. “30s”, “1m5s”.
scale_down_delay (str) – Delay before scaling down, e.g. “15m”. Only for KPA.
autoscaler_class (str) – Autoscaler implementation: “kpa.autoscaling.knative.dev” (default, supports concurrency/rps) or “hpa.autoscaling.knative.dev” (supports cpu/memory/custom metrics).
progress_deadline (str) – Time to wait for deployment to be ready, e.g. “10m”. Must be longer than startup probe timeout.
**extra_annotations – Additional Knative autoscaling annotations.
Note
The service will be deployed as a Knative service.
Timing-related defaults are applied if not explicitly set (tuned for ML workloads):
scale_down_delay=”1m” (avoid rapid scaling cycles)
scale_to_zero_pod_retention_period=”10m” (keep the last pod longer before scaling to zero)
progress_deadline=”10m” or greater (ensures enough time for initialization; automatically adjusted based on launch_timeout)
Examples:
import kubetorch as kt

remote_fn = kt.fn(my_fn_obj).to(
    kt.Compute(
        cpus=".1",
    ).autoscale(min_scale=1)
)

remote_fn = kt.fn(summer).to(
    compute=kt.Compute(
        cpus=".01",
    ).autoscale(min_scale=3, scale_to_zero_pod_retention_period="50s"),
)
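For CPU- or memory-based scaling, which per the metric parameter above requires the HPA autoscaler class, a sketch using only the documented keyword arguments; the numeric values are illustrative:

# Sketch: scale on CPU utilization via the HPA autoscaler class (values are illustrative)
compute = kt.Compute(cpus="1", memory="2Gi").autoscale(
    metric="cpu",
    target_utilization=70,  # scale when average CPU utilization passes 70%
    min_scale=1,
    max_scale=10,
    autoscaler_class="hpa.autoscaling.knative.dev",
)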
Configure the distributed worker compute needed by each service replica.
distribution_type (str) – The type of distributed supervisor to create.
Options: "spmd" (the default if empty), "pytorch", "ray", "monarch", "jax", or "tensorflow".
workers (int or List) – Number of workers to create, each with compute resources identical to the service compute. Alternatively, a list of <int, Compute> pairs specifying the number of workers and the compute resources for each worker StatefulSet.
quorum_timeout (int, optional) – Timeout in seconds for workers to become ready and join the cluster. Defaults to launch_timeout if not provided, for both SPMD frameworks and Ray. Increase this if workers need more time to start (e.g., during node autoscaling or when downloading data during initialization).
**kwargs – Additional framework-specific parameters (e.g., num_proc, port).
Note
List of <int, Compute> pairs is not yet supported for workers.
Examples:
import kubetorch as kt

remote_fn = kt.fn(simple_summer, service_name).to(
    kt.Compute(
        cpus="2",
        memory="4Gi",
        image=kt.Image(image_id="rayproject/ray"),
        launch_timeout=300,
    ).distribute("ray", workers=2)
)

gpus = kt.Compute(
    gpus=1,
    image=kt.Image(image_id="nvcr.io/nvidia/pytorch:23.10-py3"),
    launch_timeout=600,
    inactivity_ttl="4h",
).distribute("pytorch", workers=4)
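Building on the PyTorch example, a sketch that also sets quorum_timeout and a framework-specific kwarg (num_proc, mentioned under **kwargs above); the values and the train_loop function are illustrative placeholders:

# Sketch: PyTorch distributed training across 4 workers (values are illustrative)
compute = kt.Compute(
    gpus=1,
    image=kt.Image(image_id="nvcr.io/nvidia/pytorch:23.10-py3"),
).distribute(
    "pytorch",
    workers=4,
    quorum_timeout=900,  # allow extra time for node autoscaling
    num_proc=1,          # framework-specific kwarg passed through **kwargs
)
remote_train = kt.fn(train_loop).to(compute)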