The Kubetorch Data Store is an intuitive and scalable distributed data system for Kubernetes, solving two critical gaps:
Out-of-cluster direct transfers: Sync code and data up to your cluster instantly and scalably, with no container rebuilds or round trips through blob storage
In-cluster data transfer and caching: Fast peer-to-peer data transfer between pods with automatic caching and discovery, for both filesystem and GPU data
The unified APIs handle two types of data:
- Filesystem data: Files/directories via distributed rsync (zero-copy P2P or to/from the central store)
- GPU data: CUDA tensors/state dicts via NCCL broadcast
Key capabilities include:
- External sync: Push/pull files to/from the cluster
- Zero-copy P2P: locale="local" publishes data in place; consumers fetch directly
- Scalable broadcast: Tree-based differential propagation for distributing to thousands of pods (no NFS thundering herd or "many small files" problems)
- GPU transfers: Point-to-point and coordinated broadcasts for tensors/state dicts with InfiniBand/RDMA support
- Automatic caching: Every getter can become a source for subsequent getters
- Automatic lifecycle management, with TTLs and cleanup built in
The top-level functions kt.put(), kt.get(), kt.ls(), and kt.rm() provide a simple interface
for data operations. The data type (filesystem vs GPU) is auto-detected based on the parameters you provide.
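A minimal round trip with these functions might look like the following sketch (the key and local paths are illustrative):

# Push a local directory into the cluster store under an illustrative key
>>> import kubetorch as kt
>>> kt.put(key="my-service/artifacts", src="./artifacts/")

# Later, from any pod or your workstation, pull it back down
>>> kt.get(key="my-service/artifacts", dest="./artifacts_copy/")

# Inspect and clean up
>>> kt.ls("my-service")
>>> kt.rm("my-service/artifacts", recursive=True)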
Upload data to the cluster using a key-value store interface.
Supports two data types (auto-detected from src):
- Filesystem data: Files/directories uploaded via rsync
- GPU data: GPU tensors or state dicts broadcast via NCCL
key – Storage key(s). Keys should be explicit paths like "my-service/models/v1". Can be a single key or a list of keys for batch filesystem operations.
src – Data to upload. Can be:
- Path(s) to local file(s) or directory(s) for filesystem transfer
- A GPU tensor for a single-tensor broadcast via NCCL
- A dict of GPU tensors (state dict) for a multi-tensor broadcast via NCCL
locale – Where data is stored:
- "store" (default): Copy to the central store pod. Data is persisted and accessible from any pod. (Filesystem only; GPU data always uses "local".)
- "local": Zero-copy mode. Data stays on the local pod and is only registered with the metadata server. Other pods fetch directly from this pod.
broadcast – Optional BroadcastWindow for coordinated multi-party transfers. When specified, this put() joins as a "putter" and waits for other participants before transferring data.
contents – If True, copy directory contents (adds trailing slashes for rsync). (Filesystem only)
filter_options – Additional rsync filter options. (Filesystem only)
force – Force overwrite of existing files. (Filesystem only)
verbose – Show detailed progress.
namespace – Kubernetes namespace.
kubeconfig_path – Path to kubeconfig file (for compatibility).
start_rsyncd – For locale="local": Start an rsync daemon to serve the data (default: True). (Filesystem only)
base_path – For locale="local": Root path for the rsync daemon (default: "/"). (Filesystem only)
nccl_port – Port for NCCL communication (default: 29500). (GPU only)
nccl_pg_mode – NCCL process group mode (GPU only):
- "concurrent" (default): Create a ProcessGroupNCCL directly; allows parallel transfers
- "global": Use the global process group with semaphore serialization
Examples
# Upload filesystem data to central store
>>> import kubetorch as kt
>>> kt.put(key="my-service/weights", src="./trained_model/")

# Zero-copy mode (data stays local, other pods fetch directly)
>>> kt.put(key="my-service/data", src="/app/data", locale="local")

# Coordinated filesystem broadcast with timeout
>>> kt.put(
...     key="my-service/weights",
...     src="./weights/",
...     locale="local",
...     broadcast=kt.BroadcastWindow(timeout=10.0)
... )

# GPU tensor - other pods receive via NCCL broadcast
>>> import torch
>>> tensor = torch.randn(1000, 1000, device="cuda")
>>> kt.put(key="model/layer1", src=tensor)

# GPU state dict - all tensors broadcast over a single NCCL process group
>>> state_dict = model.state_dict()  # Contains CUDA tensors
>>> kt.put(key="model/weights", src=state_dict, broadcast=kt.BroadcastWindow(world_size=4))
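There is no dedicated example above for nccl_pg_mode; a minimal sketch of opting into the serialized global group for the tensor broadcast shown earlier (values as documented in the parameter list):

# Serialize this transfer through the global process group instead of a dedicated ProcessGroupNCCL
>>> kt.put(key="model/layer1", src=tensor, nccl_pg_mode="global")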
Download data from the cluster using a key-value store interface.
Supports two data types:
- Filesystem data: Files/directories downloaded via rsync
- GPU data: GPU tensors or state dicts received via NCCL broadcast
The data type is auto-detected from the dest parameter:
- If dest is a path (str/Path) or None: filesystem data
- If dest is a GPU tensor or a dict of GPU tensors: GPU data
key – Storage key(s) to retrieve. Keys should be explicit paths like "my-service/models/v1". Can be a single key or a list of keys.
dest – Destination for the data:
- For filesystem: Local path (defaults to the current working directory)
- For GPU: Pre-allocated tensor or state_dict (dict of tensors) to receive into
broadcast – Optional BroadcastWindow for coordinated multi-party transfers. When specified, this get() joins as a "getter" and waits for putters before receiving data. Use broadcast.timeout to control the wait time.
contents – If True, copy directory contents (adds trailing slashes). (Filesystem only)
filter_options – Additional rsync filter options. (Filesystem only)
force – Force overwrite of existing files. (Filesystem only)
verbose – Show detailed progress.
namespace – Kubernetes namespace.
kubeconfig_path – Path to kubeconfig file (for compatibility).
nccl_pg_mode – NCCL process group mode (GPU only):
- "concurrent" (default): Create a ProcessGroupNCCL directly; allows parallel transfers
- "global": Use the global process group with semaphore serialization
Examples
# Download from store
>>> import kubetorch as kt
>>> import torch

# Filesystem data
>>> kt.get(key="my-service/weights")  # Downloads to current directory
>>> kt.get(key="my-service/weights", dest="./local_model/")  # Downloads to local_model/

# Coordinated broadcast (wait for putter)
>>> kt.get(
...     key="my-service/weights",
...     dest="./weights/",
...     broadcast=kt.BroadcastWindow(timeout=10.0)
... )

# GPU tensor - provide pre-allocated destination
>>> tensor = torch.empty(1000, 1000, device="cuda:0")
>>> kt.get(key="model/layer1", dest=tensor)

# GPU state dict with coordinated broadcast
>>> model = MyModel().cuda()
>>> kt.get(
...     key="model/weights",
...     dest=model.state_dict(),
...     broadcast=kt.BroadcastWindow(world_size=4)
... )
>>> model.load_state_dict(model.state_dict())  # Already updated in-place
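The key parameter also accepts a list of keys; a minimal sketch of a batch filesystem fetch (keys and destination are illustrative):

# Fetch several filesystem keys in one call
>>> kt.get(key=["my-service/weights", "my-service/config"], dest="./artifacts/")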
List files and directories under a key path in the store. Combines locally-published keys and filesystem contents from the central store.
Examples
>>> import kubetorch as kt
>>> kt.ls()                      # List root of store
>>> kt.ls("my-service")          # List contents of my-service
>>> kt.ls("my-service/models")   # List models directory
Returns a list of dicts with item information:
- name: Item name (directories have a trailing /)
- is_directory: True if the item is a directory
- locale: Where the data lives ("store" for the central store, or the pod name for local data)
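Because the return value is a plain list of dicts, results can be filtered with ordinary Python; a small sketch using the fields documented above:

>>> import kubetorch as kt
>>> for item in kt.ls("my-service"):
...     kind = "dir" if item["is_directory"] else "file"
...     print(f"{item['name']} ({kind}, locale={item['locale']})")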
Delete a file or directory from the store.
key – Storage key to delete. Trailing slashes are stripped.
recursive – If True, delete directories recursively (like rm -r)
prefix – If True, delete all keys starting with this string prefix
verbose – Show detailed progress
namespace – Kubernetes namespace
kubeconfig_path – Path to kubeconfig file (for compatibility)
Examples
>>> import kubetorch as kt
>>> kt.rm("my-service/old-model.pkl")               # Delete a file
>>> kt.rm("my-service/temp-data", recursive=True)   # Delete a directory
>>> kt.rm("gpu-test", prefix=True)                  # Delete all keys starting with "gpu-test"
Bases: object
Configuration for coordinated broadcast transfers between multiple participants.
A broadcast window allows multiple putters and getters to coordinate data transfer. The quorum closes when ANY of the specified conditions is met (OR semantics).
When a broadcast window is specified:
- put() calls join as "putters" (data sources)
- get() calls join as "getters" (data destinations)
- Once the quorum closes, putters send data to all getters
For GPU transfers:
- Both putters and getters join the broadcast group
- The quorum waits for all participants before starting NCCL
- Default fanout of 2 (binary tree)
For filesystem transfers:
- Only getters participate (the original putter is discovered via metadata)
- Rolling participation: new joiners are assigned a parent immediately
- Tree-based propagation with configurable fanout (~50 for filesystem)
- Each getter rsyncs from its parent's rsync daemon
timeout (float, optional) – Maximum time in seconds to wait for participants. The quorum closes after this timeout even if other conditions aren't met. (Default: None)
world_size (int, optional) – Wait for this many total participants (putters + getters) before closing the quorum. (Default: None)
ips (List[str], optional) – Wait for participants from these specific IP addresses before closing the quorum. (Default: None)
group_id (str, optional) – Optional name for the broadcast group. If not provided, one is auto-generated from the keys being transferred. Use the same group_id across put/get calls to ensure they join the same quorum. (Default: None)
fanout (int, optional) – Number of children each node can have in the broadcast tree. Defaults to 2 for GPU (binary tree); can be set higher for filesystem transfers, where rsync can handle many concurrent clients (~50). (Default: None)
pack (bool, optional) – For GPU state_dict transfers only. When True, concatenates all tensors into a single packed buffer before broadcasting for maximum efficiency (a single NCCL call). Requires all participants to have an identical dict structure (same keys). The default False uses async overlapped broadcasts (one per tensor, pipelined). (Default: False)
Examples
# Wait up to 10 seconds for participants
BroadcastWindow(timeout=10.0)

# Wait for exactly 4 participants (e.g., 1 putter + 3 getters)
BroadcastWindow(world_size=4)

# Wait for specific pods to join
BroadcastWindow(ips=["10.0.0.1", "10.0.0.2", "10.0.0.3"])

# Combined: wait for 4 participants OR 30 seconds, whichever comes first
BroadcastWindow(world_size=4, timeout=30.0)

# Filesystem broadcast with high fanout
BroadcastWindow(world_size=100, fanout=50, timeout=60.0)

# GPU state_dict with packed mode for maximum efficiency
BroadcastWindow(world_size=4, timeout=30.0, pack=True)
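Putting these pieces together, a large filesystem fan-out might be sketched as follows: one pod publishes a directory in place and the consumer pods join the same window and receive through the broadcast tree. The key, paths, and window sizes here are illustrative, and both sides are assumed to pass the same window parameters.

>>> import kubetorch as kt

# On the publishing pod: register the data in place and open the window
>>> kt.put(
...     key="my-service/checkpoint",
...     src="/app/checkpoint/",
...     locale="local",
...     broadcast=kt.BroadcastWindow(world_size=100, fanout=50, timeout=60.0)
... )

# On each consumer pod: join the same window and receive via the tree
>>> kt.get(
...     key="my-service/checkpoint",
...     dest="/app/checkpoint/",
...     broadcast=kt.BroadcastWindow(world_size=100, fanout=50, timeout=60.0)
... )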
Convert to dictionary for API requests.
Where data is stored/accessible from.
"store": Copy data to the central store pod (default). Data is persisted and accessible from any pod via the store.
"local": Zero-copy mode. Data stays on the local pod and is only registered with the metadata server. Other pods can rsync directly from this pod.
alias of Literal['store', 'local']
The following CLI commands provide the same functionality from the command line:
Store files or directories in the cluster.
kt put <key> --src <path> [options]

# Examples
kt put my-service/weights --src ./trained_model/
kt put datasets/train --src ./data/ --contents
kt put my-service/models --src ./model1/ --src ./model2/
Options:
--src, -s: Local file(s) or directory(s) to upload (required, can be specified multiple times)
--force, -f: Force overwrite of existing files
--exclude: Exclude patterns (rsync format, e.g., '*.pyc')
--include: Include patterns to override .gitignore exclusions
--contents, -c: Copy directory contents (adds trailing slashes for rsync)
--verbose, -v: Show detailed progress
--namespace, -n: Kubernetes namespace
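For instance, uploading a source tree's contents while skipping compiled files (the key, path, and pattern are illustrative):

kt put my-service/src --src ./src/ --contents --exclude '*.pyc' --verbose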
Retrieve files or directories from the cluster.
kt get <key> [--dest <path>] [options]

# Examples
kt get my-service/weights                   # Download to current directory
kt get my-service/weights --dest ./local/   # Download to specific path
kt get datasets/train --contents            # Download directory contents
Options:
--dest, -d: Local destination path (defaults to current directory)
--force, -f: Force overwrite of existing files
--exclude: Exclude patterns (rsync format)
--include: Include patterns
--contents, -c: Copy directory contents
--seed-data/--no-seed-data: Automatically seed the retrieved data for peer-to-peer transfer to other pods (default: enabled)
--verbose, -v: Show detailed progress
--namespace, -n: Kubernetes namespace
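For instance, fetching a key into a specific directory without seeding it for peers afterwards (the key and path are illustrative):

kt get my-service/weights --dest ./local_model/ --no-seed-data --verbose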
List files and directories in the cluster store.
kt ls [key] [options]

# Examples
kt ls                      # List root of store
kt ls my-service           # List contents of my-service
kt ls my-service/models    # List models directory
Options:
--verbose, -v: Show detailed progress
--namespace, -n: Kubernetes namespace
Delete files or directories from the cluster store.
kt rm <key> [options]

# Examples
kt rm my-service/old-model.pkl            # Delete a file
kt rm my-service/temp-data --recursive    # Delete a directory
Options:
--recursive, -r: Delete directories recursively
--verbose, -v: Show detailed progress
--namespace, -n: Kubernetes namespace