Data Store
The Kubetorch Data Store is an in-cluster distributed data system for Kubernetes that closes two critical gaps:
- Out-of-cluster direct transfers: Sync code and data up to your cluster instantly and scalably - no need for container rebuilds or bouncing off blob storage
- In-cluster data transfer and caching: fast peer-to-peer data transfer between pods with automatic caching and discovery, for filesystem and GPU data
The data store supports both filesystem data (files/directories via distributed rsync) and GPU data (tensors/state dicts via NCCL), with the same simple kt.put()/kt.get() APIs.
Why This Matters
Kubernetes has no built-in way to efficiently move data to or from the cluster or between pods. Without the data store:
- Distributing model weights to 100 inference pods (e.g. RL rollouts) requires bouncing off slow shared storage
- Hot-reloading code requires rebuilding and redeploying containers
- ML practitioners have no intuitive way to take advantage of powerful technologies like RDMA, GPUDirect, and InfiniBand
- Existing data sharing systems like NFS or Dragonfly are expensive and suffer from thundering herd and "lots of small files" problems
- Lower-level data systems require ongoing cleanup and management
With the data store:
- Easily move data to and from the cluster during development and debugging
- One pod publishes data, others fetch directly from it via P2P
- Tree-based broadcasts scale to thousands of pods without thundering herds
- Move GPU data directly without bouncing off the filesystem
- Code syncs in seconds, not minutes
- Cache ephemeral data locally to the cluster instead of turning blob storage into a dumping ground
- Enjoy automated lifecycle management, with TTLs and cleanup built-in
Quick Start
Sync Files to/from Cluster
```python
import kubetorch as kt

# Upload files to the cluster (from your laptop/CI)
kt.put(key="my-service/weights", src="./trained_model/")

# Download files from the cluster
kt.get(key="my-service/weights", dest="./local_model/")

# List and delete
items = kt.ls("my-service")
kt.rm("my-service/old-checkpoint", recursive=True)
```
Share Data Between Pods (P2P)
```python
# On producer pod - publish data locally (zero-copy)
kt.put(key="features/batch-1", src="/data/features/", locale="local")

# On consumer pods - fetch directly from producer
kt.get(key="features/batch-1", dest="/local/features/")
```
Transfer GPU Tensors
```python
import torch

# On producer pod
tensor = torch.randn(1000, 1000, device="cuda")
kt.put(key="layer1/weights", src=tensor)

# On consumer pod
dest = torch.empty(1000, 1000, device="cuda")
kt.get(key="layer1/weights", dest=dest)
```
Use Cases
1. Quick file transfers to and from the cluster
The data store makes it easy to transfer files to or from the cluster during debugging and development, like toy datasets or profiles.
From your laptop:
```bash
kt put /my_train_svc/my_toy_dataset --src dataset.csv
kt ls /my_train_svc
```
From inside the training worker:
kt.get(key="/my_train_svc/my_toy_dataset", dest="dataset.csv") kt.put(key="/my_train_svc/train_profile", src="training.profile")
From your laptop:
```bash
kt get /my_train_svc/train_profile

# Optional, but also done automatically when my_train_svc is terminated
kt rm -r /my_train_svc
```
2. Point-to-point in-cluster data transfers
Sync data directly from data processing pods to your training pods:
```python
# Data processor publishes a shard (stays local, no copy to the store)
kt.put(
    key=f"/data_proc_svc/train/data_shard_{rank}",
    src="./data_out.parquet",
    locale="local",  # Zero-copy: data stays on the processor pod
)

# Training pods fetch all shards directly from data processors (P2P)
kt.get(key="/data_proc_svc/train/", dest=".")
```
3. Scalable Broadcast to Many Pods
Distribute data to hundreds of pods efficiently using tree-based broadcast:
```python
# Producer publishes with broadcast coordination
kt.put(
    key="dataset/shard-1",
    src="/data/shard/",
    locale="local",
    broadcast=kt.BroadcastWindow(timeout=30.0),
)

# Each consumer joins the broadcast tree
# Pods download from their parent in the tree, not all from the source
kt.get(
    key="dataset/shard-1",
    dest="/local/shard/",
    broadcast=kt.BroadcastWindow(timeout=30.0),
)
```
With tree topology (default fanout=50), broadcasting to 1000 pods takes only 2 hops instead of 1000 sequential transfers.
4. GPU State Dict Transfer
Transfer entire model state dicts between pods via NCCL:
```python
# On trainer pod
model = MyModel().cuda()
# ... training ...
kt.put(key="checkpoint/epoch-10", src=model.state_dict())

# On inference pod
model = MyModel().cuda()
dest = model.state_dict()  # Pre-allocated tensors with the same structure
kt.get(key="checkpoint/epoch-10", dest=dest)
model.load_state_dict(dest)  # Tensors updated in-place
```
5. Coordinated Multi-Party GPU Broadcast
For synchronized data distribution (e.g., all-reduce patterns):
```python
# All pods join the same broadcast group
broadcast = kt.BroadcastWindow(
    group_id="training-sync",
    world_size=8,  # Wait for 8 participants
    timeout=30.0,
)

# On putter pods
kt.put(key="gradients", src=gradient_tensor, broadcast=broadcast)

# On getter pods
kt.get(key="gradients", dest=dest_tensor, broadcast=broadcast)
```
Peer-to-Peer Architecture
The data store uses a central metadata server (one per namespace) for discovery and peer tracking:
```
Producer Pod                 Metadata Server               Consumer Pods
────────────                 ───────────────               ─────────────
kt.put(locale="local")       Tracks source locations       kt.get()
      │                               │                          │
      ├── Register key ──────────────►│                          │
      │                               │◄────── Query source ─────┤
      │                               ├────── Return peer IP ───►│
      │◄────────────── Direct P2P transfer (rsync or NCCL) ──────┤
```
Key benefits:
- No central bottleneck: Data flows directly between pods
- Automatic caching: Every getter can become a source for subsequent getters
- Failover: Falls back to another seeder pod if a peer is unavailable
- Load balancing: The metadata server tracks concurrent requests per source (see the sketch below)
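As a rough illustration of the load-balancing behavior described above (an assumption for clarity, not the metadata server's actual code), source selection could look like picking the least-loaded seeder:

```python
# Illustrative sketch only - not the actual metadata server implementation
def pick_source(peers: dict[str, int]) -> str:
    """Given {peer_ip: concurrent_request_count}, return the least-loaded seeder."""
    return min(peers, key=peers.get)

# A new getter would be pointed at 10.0.0.3 here
print(pick_source({"10.0.0.2": 4, "10.0.0.3": 1, "10.0.0.5": 2}))
```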
Storage Locations (Locale)
locale="store" (default)
Data is copied to the central store pod. Best for:
- Persistent data that survives pod restarts
- Data accessed from outside the cluster
- Small/medium datasets that benefit from centralized storage
kt.put(key="datasets/train.csv", src="./data/", locale="store")
locale="local" (zero-copy P2P)
Data stays on the source pod. Best for:
- Large datasets already on a pod (no copy overhead)
- High-throughput P2P distribution
- Ephemeral data during training/inference
```python
# Producer: register local data (zero-copy, instant)
kt.put(key="embeddings/cache", src="/data/embeddings/", locale="local")

# Consumers: fetch directly from producer via P2P
kt.get(key="embeddings/cache", dest="/local/embeddings/")
```
When using locale="local":
- An rsync daemon starts on the producer pod
- Consumer pods fetch directly via peer-to-peer
- The metadata server coordinates discovery
Filesystem Broadcast (Tree Topology)
For distributing data to many pods, use BroadcastWindow with tree-based propagation:
```python
# Producer (rank 0 - root of the tree)
kt.put(
    key="model/weights",
    src="/checkpoints/",
    locale="local",
    broadcast=kt.BroadcastWindow(
        group_id="weight-distribution",
        timeout=60.0,
        fanout=50,  # Each node serves up to 50 children
    ),
)

# Consumers join the tree as they arrive, and initiate the
# transfer from their parent immediately, no need to wait for quorum
kt.get(
    key="model/weights",
    dest="/local/model/",
    broadcast=kt.BroadcastWindow(
        group_id="weight-distribution",
        timeout=60.0,
        fanout=50,
    ),
)
```
How it works:
- The first fanout getters (ranks 1 to fanout) download directly from the source (rank 0)
- Subsequent getters (rank > fanout) download from their computed parent in the tree (see the sketch below)
- Each completed getter becomes a potential parent for new getters
- Tree grows dynamically as pods join
Scaling:
- fanout=50: 50 pods at depth 1, 2500 at depth 2, 125K at depth 3
- fanout=1: Linear chain (useful for testing/debugging)
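The exact parent-assignment rule is internal to Kubetorch, but as a rough sketch (an assumption for illustration, not the library's actual implementation), a rank's parent and depth in a fanout-ary tree rooted at rank 0 could be computed like this:

```python
# Illustrative sketch only - not Kubetorch's internal implementation.
def tree_parent(rank: int, fanout: int = 50) -> int | None:
    """Parent rank in a fanout-ary broadcast tree rooted at rank 0."""
    if rank == 0:
        return None  # Rank 0 is the source (root); it has no parent
    return (rank - 1) // fanout  # Ranks 1..fanout attach to the root, and so on


def tree_depth(rank: int, fanout: int = 50) -> int:
    """Number of hops from the source for a given rank."""
    depth = 0
    while rank != 0:
        rank = (rank - 1) // fanout
        depth += 1
    return depth


# With the default fanout of 50: ranks 1-50 are 1 hop from the source,
# ranks 51-2550 are 2 hops, so 1000 pods are all reachable within 2 hops.
assert tree_parent(1000) == 19
assert tree_depth(1000) == 2
```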
GPU Data Transfer
GPU tensors are transferred via NCCL broadcast, coordinated by an in-pod GPU server daemon. This lets us take advantage of technologies like InfiniBand / RDMA while insulating your processes from NCCL / process-group failures. We use NCCL because it's already installed with PyTorch, but we may consider lower-level RDMA solutions in the future to avoid quorum overhead and rigidity, if we can package them sanely.
Single Tensor
```python
import torch
import kubetorch as kt

# Producer
tensor = torch.randn(1000, 1000, device="cuda")
kt.put(key="activations/layer1", src=tensor)

# Consumer (must pre-allocate destination)
dest = torch.empty(1000, 1000, device="cuda")
kt.get(key="activations/layer1", dest=dest)
```
State Dict (Model Weights)
State dicts are unrolled into lists of elements and transferred efficiently:
```python
# Producer (trainer)
model = LargeModel().cuda()
kt.put(key="checkpoint/final", src=model.state_dict())

# Consumer (inference)
model = LargeModel().cuda()
dest = model.state_dict()  # Pre-allocated with the same structure
kt.get(key="checkpoint/final", dest=dest)
model.load_state_dict(dest)
```
Point-to-point mode (default):
- For quick point-to-point transfers between one putter and one getter
- No process group quorum wait or coordination overhead
- Batching groups transfers by source
Broadcast mode:
- All putters and getters join a quorum for one-to-many, many-to-one, or many-to-many transfers
- All tensors transferred in single coordinated NCCL session
- Best when all consumers need all tensors simultaneously
```python
broadcast = kt.BroadcastWindow(world_size=4, timeout=30.0)

# Producer
kt.put(key="sync/weights", src=state_dict, broadcast=broadcast)

# Consumers (3 pods, all join the same broadcast)
kt.get(key="sync/weights", dest=dest_state_dict, broadcast=broadcast)
```
Packed Mode (Maximum Throughput)
When the putter's and getter's tensors or state dicts have matching shapes and dtypes, all tensors can be packed into a single buffer and transferred at once:
```python
broadcast = kt.BroadcastWindow(world_size=4, pack=True)

# Single NCCL broadcast for the entire state dict
kt.put(key="packed/weights", src=state_dict, broadcast=broadcast)
kt.get(key="packed/weights", dest=dest_state_dict, broadcast=broadcast)
```
BroadcastWindow Options
```python
kt.BroadcastWindow(
    # Quorum conditions (OR semantics - first condition met closes the quorum)
    timeout=30.0,             # Max seconds to wait for participants
    world_size=4,             # Close when N participants join
    ips=["10.0.0.1"],         # Close when specific IPs join

    # Group identification
    group_id="my-broadcast",  # Explicit group identifier

    # Tree topology
    fanout=50,                # Branching factor (default: 50 for filesystem, 2 for GPU)

    # GPU state dict optimization
    pack=True,                # Pack all tensors into a single buffer (requires same dtype)
)
```
Lifespan Management
When a service is terminated, its base path in the data store is deleted (key="/my_service/..."), and any zero-copy (locale="local") data it published is treated as stale. We provide shorthand flags to control this lifecycle behavior:

lifespan="cluster" (default): Data persists until explicitly deleted. This is synonymous with making an explicit copy of the data in the data store (locale="store") without a service name in the key base path ("/my_model", not "my_service/my_model").

lifespan="resource": Data is auto-deleted when the associated service is torn down. This is synonymous with zero-copy data sharing (locale="local") and/or assigning a key with the service name as the base path ("my_training/my_model").
```python
# Persistent checkpoint
kt.put(key="models/final", src="./checkpoint.pt", lifespan="cluster")

# Temporary training artifacts
kt.put(key="my-service/temp", src="./logs/", lifespan="resource")
```
CLI Commands
```bash
# Upload files
kt put my-service/weights --src ./trained_model/

# Download files
kt get my-service/weights --dest ./local_model/

# List contents
kt ls my-service

# Delete data
kt rm my-service/old-checkpoint --recursive
```
Listing and Discovery
```python
# List contents
items = kt.ls("my-service")
items = kt.ls("my-service/models")

# Each item contains:
# - name: Item name
# - is_directory: True if directory
# - locale: "store" or pod name where data lives
```
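For example, a quick way to eyeball what a service has published (a hypothetical sketch; it assumes the returned items expose the fields above as attributes, which may not match the exact return type):

```python
# Hypothetical sketch: assumes kt.ls() items expose the fields listed above
for item in kt.ls("my-service"):
    kind = "dir" if item.is_directory else "file"
    print(f"[{kind}] {item.name} (locale={item.locale})")
```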
Best Practices
- Use locale="local" for large data: Avoid copying large datasets to the store pod. Publish locally and let consumers fetch via P2P.
- Use broadcasts for multi-pod distribution: Tree topology scales efficiently vs. sequential transfers.
- Use unique broadcast names: Better to add a random hash than to reuse a broadcast name across runs (see the sketch below).
- Organize keys hierarchically: Use paths like team/project/artifact-type/version for easy discovery or data aggregation/resharding.
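For the unique-broadcast-names tip, one simple scheme (a sketch; any collision-free naming works) is to append a random suffix to the group_id and share it with all participants:

```python
import uuid

import kubetorch as kt

# Derive a fresh group_id per run so stale broadcast state can't collide
group_id = f"weight-distribution-{uuid.uuid4().hex[:8]}"
broadcast = kt.BroadcastWindow(group_id=group_id, timeout=60.0, fanout=50)

# Producer side; consumers must construct a BroadcastWindow with the same group_id,
# e.g. passed as an argument or derived from a shared run ID
kt.put(key="model/weights", src="/checkpoints/", locale="local", broadcast=broadcast)
```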
Comparison: Store vs Local
| Aspect | locale="store" | locale="local" |
|---|---|---|
| Data location | Central store pod | Source pod |
| Persistence | Survives pod restarts | Lost if source pod dies |
| External access | Yes (from outside cluster) | No (in-cluster only) |
| Copy overhead | Yes (rsync to store) | No (zero-copy) |
| Best for | Persistent, widely-accessed data | Large, ephemeral P2P data |
API Reference
For detailed API documentation, see the Python API Reference and CLI Reference.