We will use BGE to embed a large amount of text. We start by defining a class that implements each replica of our inference service, using vLLM to load the model.
import kubetorch as kt


class BGEEmbedder:
    def __init__(self, model_id="BAAI/bge-large-en-v1.5", batch_size=None):
        self.model_id = model_id
        self.model = None
        self.batch_size = batch_size

    def load_model(self):
        from vllm import LLM

        self.model = LLM(model=self.model_id, task="embed")

    def load_data(self, dataset_name, text_column_name, split=None, data_files=None):
        from datasets import load_dataset
        from transformers import AutoTokenizer

        dataset = load_dataset(
            dataset_name,
            data_files=data_files,
            split=split,
        )

        # Tokenize up front so we can pass token IDs directly to vLLM.
        tokenizer = AutoTokenizer.from_pretrained(self.model_id)
        max_length = tokenizer.model_max_length

        def tokenize_function(examples):
            return tokenizer(
                examples[text_column_name],
                padding=True,
                truncation=True,
                max_length=max_length,
            )

        tokenized_dataset = dataset.map(
            tokenize_function,
            num_proc=7,
            batched=True,
            remove_columns=dataset.column_names,
            batch_size=2000,
        )
        inputs = tokenized_dataset["input_ids"]
        return [{"prompt_token_ids": inp} for inp in inputs]

    def embed_dataset(self, dataset_name, text_column_name, split, data_files):
        if self.model is None:
            self.load_model()

        data = self.load_data(
            dataset_name,
            text_column_name=text_column_name,
            split=split,
            data_files=data_files,
        )

        if not self.batch_size:
            # Encode everything in a single call and return one embedding per input.
            results = self.model.encode(data)
            return [result.outputs.embedding for result in results]

        # Process in batches to bound memory usage.
        all_embeddings = []
        for i in range(0, len(data), self.batch_size):
            batch = data[i : i + self.batch_size]
            batch_results = self.model.encode(batch)
            all_embeddings.extend(result.outputs.embedding for result in batch_results)
        return all_embeddings
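Before moving the class onto remote compute, it can help to see what it does in plain Python. The sketch below is not part of the example script; it assumes a local GPU with vllm, datasets, and transformers installed, and the batch size is an arbitrary value chosen for illustration.

# Illustrative local usage of BGEEmbedder (assumes a local GPU and the vllm,
# datasets, and transformers packages; batch_size here is an arbitrary choice).
local_embedder = BGEEmbedder(batch_size=512)
local_embeddings = local_embedder.embed_dataset(
    "wikimedia/wikipedia",
    text_column_name="text",
    split="train",
    data_files="20231101.en/train-00000-of-00041.parquet",
)
print(len(local_embeddings), len(local_embeddings[0]))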
We can use the BGEEmbedder
class to embed a dataset. We will use the wikimedia/wikipedia
dataset.
The base example runs this on a single node.
def batch_inference_single():
    img = kt.Image(image_id="vllm/vllm-openai:latest").pip_install(["datasets"])
    compute = kt.Compute(gpus="1", cpus="7", memory="24", image=img, launch_timeout=600)
    embedder = kt.cls(BGEEmbedder).to(compute)

    embeddings = embedder.embed_dataset(
        "wikimedia/wikipedia",
        text_column_name="text",
        split="train",
        data_files="20231101.en/train-00000-of-00041.parquet",
    )
    print(f"Generated {len(embeddings)} embeddings")
    print(f"First embedding dimension: {len(embeddings[0])}")
    print(embeddings)
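The call returns plain Python lists of floats, so the embeddings can be persisted however you like. As one hedged illustration (the numpy dependency, helper name, and file name below are our additions, not part of the example):

# Illustrative only: persist a shard's embeddings as a float32 matrix on disk.
# This would slot into batch_inference_single after the embed_dataset call.
import numpy as np


def save_embeddings(embeddings, path="wikipedia_shard_00000_embeddings.npy"):
    matrix = np.asarray(embeddings, dtype=np.float32)  # shape: (num_texts, embedding_dim)
    np.save(path, matrix)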
We can simply call distribute on the compute object to spread the inference across multiple nodes for embarrassingly parallel tasks. This is especially useful for large datasets or large-scale batch jobs. We handle all of the distribution and parallelism for you, and manage an efficient queue of work across the replicas.
def batch_inference_distributed():
    replicas = 4
    img = kt.Image(image_id="vllm/vllm-openai:latest").pip_install(["datasets"])
    compute = kt.Compute(
        gpus="1", cpus="7", memory="24", image=img, concurrency=1, launch_timeout=600
    ).distribute(workers=replicas)
    embedder = kt.cls(BGEEmbedder).to(compute)

    data_files_list = [
        "20231101.en/train-00000-of-00041.parquet",
        "20231101.en/train-00001-of-00041.parquet",
        "20231101.en/train-00002-of-00041.parquet",
        "20231101.en/train-00003-of-00041.parquet",
    ]  # ETC

    from concurrent.futures import ThreadPoolExecutor
    from functools import partial

    with ThreadPoolExecutor(max_workers=replicas) as executor:
        embed_file = partial(
            embedder.embed_dataset,
            "wikimedia/wikipedia",
            "text",
            "train",
        )
        results = list(executor.map(embed_file, data_files_list))

    return results


if __name__ == "__main__":
    batch_inference_single()
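Each call to embed_dataset handles one parquet shard, so results comes back as one list of embeddings per file. If you want a single flat list, a small aggregation step like the sketch below works (the flattening is our addition, not part of the example).

# Illustrative aggregation: flatten the per-shard lists returned by
# batch_inference_distributed into one list of embedding vectors.
results = batch_inference_distributed()
all_embeddings = [emb for shard_embeddings in results for emb in shard_embeddings]
print(f"Total embeddings across shards: {len(all_embeddings)}")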