使用 Milvus 和 Gemini 构建 RAG
Gemini API 和 Google AI Studio 帮助您开始使用 Google 的最新模型,并将您的想法转化为可扩展的应用程序。Gemini 提供对强大语言模型的访问,如 Gemini-1.5-Flash
、Gemini-1.5-Flash-8B
和 Gemini-1.5-Pro
,用于文本生成、文档处理、视觉、音频分析等任务。该 API 允许您输入包含数百万个 token 的长上下文,为特定任务微调模型,生成 JSON 等结构化输出,并利用语义检索和代码执行等功能。
在本教程中,我们将向您展示如何使用 Milvus 和 Gemini 构建 RAG(检索增强生成)管道。我们将使用 Gemini 模型基于给定查询生成文本。我们还将使用 Milvus 来存储和检索生成的文本。
准备工作
依赖项和环境
$ pip install --upgrade pymilvus google-generativeai requests tqdm
如果您使用的是 Google Colab,为了启用刚安装的依赖项,您可能需要重启运行时(点击屏幕顶部的"Runtime"菜单,然后从下拉菜单中选择"Restart session")。
您应该首先登录 Google AI Studio 平台并准备 api key GEMINI_API_KEY
作为环境变量。
import os
os.environ["GEMINI_API_KEY"] = "***********"
准备数据
我们使用 Milvus Documentation 2.4.x 的 FAQ 页面作为我们 RAG 中的私有知识,这是一个简单 RAG 管道的良好数据源。
下载 zip 文件并将文档提取到文件夹 milvus_docs
。
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
我们从文件夹 milvus_docs/en/faq
加载所有 markdown 文件。对于每个文档,我们只是简单地使用 "# " 来分离文件中的内容,这可以大致分离 markdown 文件每个主要部分的内容。
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
准备 LLM 和嵌入模型
我们使用 gemini-1.5-flash
作为 LLM,使用 text-embedding-004
作为嵌入模型。
让我们尝试从 LLM 生成一个测试响应:
import google.generativeai as genai
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
gemini_model = genai.GenerativeModel("gemini-1.5-flash")
response = gemini_model.generate_content("who are you")
print(response.text)
I am a large language model, trained by Google. I am an AI and don't have a personal identity or consciousness. My purpose is to process information and respond to a wide range of prompts and questions in a helpful and informative way.
生成一个测试嵌入并打印其维度和前几个元素。
test_embeddings = genai.embed_content(
model="models/text-embedding-004", content=["This is a test1", "This is a test2"]
)["embedding"]
embedding_dim = len(test_embeddings[0])
print(embedding_dim)
print(test_embeddings[0][:10])
768 [0.013588584, -0.004361838, -0.08481652, -0.039724775, 0.04723794, -0.0051557426, 0.026071774, 0.045514572, -0.016867816, 0.039378334]
将数据加载到 Milvus
创建 Collection
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
关于 MilvusClient
的参数:
- 将
uri
设置为本地文件,例如./milvus.db
,是最方便的方法,因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。 - 如果您有大规模数据,可以在 docker 或 kubernetes 上设置性能更高的 Milvus 服务器。在此设置中,请使用服务器 uri,例如
http://localhost:19530
,作为您的uri
。 - 如果您想使用 Zilliz Cloud,Milvus 的完全托管云服务,请调整
uri
和token
,它们对应于 Zilliz Cloud 中的 Public Endpoint 和 Api key。
检查 collection 是否已存在,如果存在则删除它。
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
使用指定参数创建新的 collection。
如果我们不指定任何字段信息,Milvus 将自动创建一个默认的 id
字段作为主键,以及一个 vector
字段来存储向量数据。保留的 JSON 字段用于存储非模式定义的字段及其值。
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # 内积距离
consistency_level="Strong", # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。详见 https://milvus.io/docs/consistency.md#Consistency-Level
)
插入数据
遍历文本行,创建嵌入,然后将数据插入到 Milvus 中。
这里有一个新字段 text
,它是 collection 模式中的非定义字段。它将自动添加到保留的 JSON 动态字段中,在高级别上可以被视为普通字段。
from tqdm import tqdm
data = []
doc_embeddings = genai.embed_content(
model="models/text-embedding-004", content=text_lines
)["embedding"]
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": doc_embeddings[i], "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 468201.38it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
构建 RAG
为查询检索数据
让我们指定一个关于 Milvus 的常见问题。
question = "How is data stored in milvus?"
在 collection 中搜索问题并检索语义上最相关的前 3 个匹配项。
question_embedding = genai.embed_content(
model="models/text-embedding-004", content=question
)["embedding"]
search_res = milvus_client.search(
collection_name=collection_name,
data=[question_embedding], # 将问题转换为嵌入向量
limit=3, # 返回前 3 个结果
search_params={"metric_type": "IP", "params": {}}, # 内积距离
output_fields=["text"], # 返回文本字段
)
让我们看看查询的搜索结果
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=2))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.6488019227981567
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If flush()
is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.5974207520484924
],
[
"What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
0.5833579301834106
]
]
使用 LLM 生成响应
将检索到的文档转换为字符串格式。
context = "\n".join([line_with_distance[0] for line_with_distance in retrieved_lines_with_distances])
定义系统和用户提示符以供 Gemini 使用。此提示符是根据检索到的文档组装的。
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are given a user question, and please write clean, concise and accurate answer to the question. You will be given a set of related contexts to the question, please answer based on the context. Please say "information is not available" if the question cannot be answered based on the context.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{{context}}
</context>
<question>
{{question}}
</question>
"""
使用 Gemini 生成基于提示符的响应。
prompt = SYSTEM_PROMPT + USER_PROMPT.format(context=context, question=question)
response = gemini_model.generate_content(prompt)
print(response.text)
Based on the provided context, here's how data is stored in Milvus:
Milvus handles two types of data:
-
Inserted Data: This includes vector data, scalar data, and collection-specific schema. This data is stored in persistent storage as incremental logs. Milvus supports multiple object storage backends for this purpose:
- MinIO
- AWS S3
- Google Cloud Storage (GCS)
- Azure Blob Storage
- Alibaba Cloud OSS
- Tencent Cloud Object Storage (COS)
-
Metadata: This is generated within Milvus itself. Each Milvus module has its own metadata, which is stored in etcd.
The data flow works as follows: when data is inserted, it's first loaded to a message queue, and then Milvus' data node writes this data from the message queue to persistent storage as incremental logs. The system also supports forcing immediate data persistence through the flush()
operation.
太好了!我们已经使用 Milvus 和 Gemini 构建了一个 RAG 管道。
快速部署
要了解如何使用此教程启动在线演示,请参阅示例应用程序。