跳到主要内容

使用 Milvus 和 DeepSeek 构建 RAG

Open In Colab GitHub Repository

DeepSeek 使开发者能够使用高性能语言模型构建和扩展 AI 应用程序。它提供高效的推理、灵活的 API 和先进的专家混合(MoE)架构,用于强大的推理和检索任务。

在本教程中,我们将向您展示如何使用 Milvus 和 DeepSeek 构建检索增强生成(RAG)管道。

准备工作

依赖项和环境

! pip install --upgrade pymilvus[model] openai requests tqdm

如果您使用的是 Google Colab,为了启用刚安装的依赖项,您可能需要重启运行时(点击屏幕顶部的"Runtime"菜单,然后从下拉菜单中选择"Restart session")。

DeepSeek 支持 OpenAI 风格的 API。您可以登录其官方网站并准备 api key DEEPSEEK_API_KEY 作为环境变量。

import os

os.environ["DEEPSEEK_API_KEY"] = "***********"

准备数据

我们使用 Milvus Documentation 2.4.x 的 FAQ 页面作为我们 RAG 中的私有知识,这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档提取到文件夹 milvus_docs

! wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
! unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs

我们从文件夹 milvus_docs/en/faq 加载所有 markdown 文件。对于每个文档,我们只是简单地使用 "# " 来分离文件中的内容,这可以大致分离 markdown 文件每个主要部分的内容。

from glob import glob

text_lines = []

for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()

text_lines += file_text.split("# ")

准备 LLM 和嵌入模型

DeepSeek 支持 OpenAI 风格的 API,您可以使用相同的 API 进行少量调整来调用 LLM。

from openai import OpenAI

deepseek_client = OpenAI(
api_key=os.environ["DEEPSEEK_API_KEY"],
base_url="https://api.deepseek.com",
)

定义一个嵌入模型,使用 milvus_model 生成文本嵌入。我们使用 DefaultEmbeddingFunction 模型作为示例,这是一个预训练的轻量级嵌入模型。

from pymilvus import model as milvus_model

embedding_model = milvus_model.DefaultEmbeddingFunction()

生成一个测试嵌入并打印其维度和前几个元素。

test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

768 [-0.04836066 0.07163023 -0.01130064 -0.03789345 -0.03320649 -0.01318448 -0.03041712 -0.02269499 -0.02317863 -0.00426028]

将数据加载到 Milvus

创建 Collection

from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")

collection_name = "my_rag_collection"

关于 MilvusClient 的参数:

  • uri 设置为本地文件,例如 ./milvus.db,是最方便的方法,因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
  • 如果您有大规模数据,可以在 docker 或 kubernetes 上设置性能更高的 Milvus 服务器。在此设置中,请使用服务器 uri,例如 http://localhost:19530,作为您的 uri
  • 如果您想使用 Zilliz Cloud,Milvus 的完全托管云服务,请调整 uritoken,它们对应于 Zilliz Cloud 中的 Public Endpoint 和 Api key

检查 collection 是否已存在,如果存在则删除它。

if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)

使用指定参数创建新的 collection。

如果我们不指定任何字段信息,Milvus 将自动创建一个默认的 id 字段作为主键,以及一个 vector 字段来存储向量数据。保留的 JSON 字段用于存储非模式定义的字段及其值。

milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # 内积距离
consistency_level="Strong", # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。详见 https://milvus.io/docs/consistency.md#Consistency-Level
)

插入数据

遍历文本行,创建嵌入,然后将数据插入到 Milvus 中。

这里有一个新字段 text,它是 collection 模式中的非定义字段。它将自动添加到保留的 JSON 动态字段中,在高级别上可以被视为普通字段。

from tqdm import tqdm

data = []

doc_embeddings = embedding_model.encode_documents(text_lines)

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": doc_embeddings[i], "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

Creating embeddings: 0%| | 0/72 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either:

  • Avoid using tokenizers before the fork if possible
  • Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 246522.36it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}

构建 RAG

为查询检索数据

让我们指定一个关于 Milvus 的常见问题。

question = "How is data stored in milvus?"

在 collection 中搜索问题并检索语义上最相关的前 3 个匹配项。

search_res = milvus_client.search(
collection_name=collection_name,
data=embedding_model.encode_queries(
[question]
), # 将问题转换为嵌入向量
limit=3, # 返回前 3 个结果
search_params={"metric_type": "IP", "params": {}}, # 内积距离
output_fields=["text"], # 返回文本字段
)

让我们看看查询的搜索结果

retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=2))

[ [ " Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###", 0.6488019227981567 ], [ "How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If flush() is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###", 0.5974207520484924 ], [ "What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###", 0.5833579301834106 ] ]

使用 LLM 生成响应

将检索到的文档转换为字符串格式。

context = "\n".join([line_with_distance[0] for line_with_distance in retrieved_lines_with_distances])

定义系统和用户提示符以供 DeepSeek 使用。此提示符是根据检索到的文档组装的。

SYSTEM_PROMPT = """
Human: You are an AI assistant. You are given a user question, and please write clean, concise and accurate answer to the question. You will be given a set of related contexts to the question, please answer based on the context. Please say "information is not available" if the question cannot be answered based on the context.
"""

USER_PROMPT = f"""
Use the following pieces of information enclosed in &lt;context&gt; tags to provide an answer to the question enclosed in &lt;question&gt; tags.
&lt;context&gt;
{{context}}
&lt;/context&gt;
&lt;question&gt;
{{question}}
&lt;/question&gt;
"""

使用 DeepSeek 生成基于提示符的响应。

response = deepseek_client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{
"role": "user",
"content": USER_PROMPT.format(context=context, question=question),
},
],
)

print(response.choices[0].message.content)

Based on the provided context, here's how data is stored in Milvus:

Milvus handles two types of data:

  1. Inserted Data: This includes vector data, scalar data, and collection-specific schema. This data is stored in persistent storage as incremental logs. Milvus supports multiple object storage backends for this purpose:

    • MinIO
    • AWS S3
    • Google Cloud Storage (GCS)
    • Azure Blob Storage
    • Alibaba Cloud OSS
    • Tencent Cloud Object Storage (COS)
  2. Metadata: This is generated within Milvus itself. Each Milvus module has its own metadata, which is stored in etcd.

The data flow works as follows: when data is inserted, it's first loaded to a message queue, and then Milvus' data node writes this data from the message queue to persistent storage as incremental logs. The system also supports forcing immediate data persistence through the flush() operation.

太好了!我们已经使用 Milvus 和 DeepSeek 构建了一个 RAG 管道。

快速部署

要了解如何使用此教程启动在线演示,请参阅示例应用程序