使用 Milvus 和 LlamaIndex 异步 API 构建 RAG
本教程演示如何使用 LlamaIndex 和 Milvus 构建异步文档处理管道进行 RAG。LlamaIndex 提供了处理文档并存储在 Milvus 等向量数据库中的方法。通过利用 LlamaIndex 和 Milvus Python 客户端库的异步 API,我们可以提高管道的吞吐量,有效地处理和索引大量数据。
在本教程中,我们将首先从高层次介绍使用异步方法构建 LlamaIndex 和 Milvus 的 RAG,然后介绍低级方法的使用以及同步和异步的性能比较。
开始之前
本页面的代码片段需要 pymilvus 和 llamaindex 依赖。您可以使用以下命令安装它们:
$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio
如果您使用 Google Colab,为了启用刚安装的依赖项,您可能需要重启运行时(点击屏幕顶部的"Runtime"菜单,并从下拉菜单中选择"Restart session")。
我们将使用 OpenAI 的模型。您应该准备 api key OPENAI_API_KEY
作为环境变量。
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
如果您使用 Jupyter Notebook,需要在运行异步代码之前运行这行代码。
import nest_asyncio
nest_asyncio.apply()
准备数据
您可以使用以下命令下载示例数据:
$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'
使用异步处理构建 RAG
本节展示如何构建可以异步处理文档的 RAG 系统。
导入必要的库并定义 Milvus URI 和嵌入的维度。
import asyncio
import random
import time
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore
URI = "http://localhost:19530"
DIM = 768
- 如果您有大规模数据,可以在 docker 或 kubernetes 上设置高性能的 Milvus 服务器。在此设置中,请使用服务器 uri,例如
http://localhost:19530
,作为您的uri
。 - 如果您想使用 Zilliz Cloud,Milvus 的完全托管云服务,请调整
uri
和token
,它们对应于 Zilliz Cloud 中的公共端点和 API 密钥。 - 在复杂系统(如网络通信)的情况下,异步处理与同步相比可以带来性能改进。因此我们认为 Milvus-Lite 不适合使用异步接口,因为使用的场景不太合适。
定义一个初始化函数,我们可以再次使用它来重新构建 Milvus Collection。
def init_vector_store():
return MilvusVectorStore(
uri=URI,
# token=TOKEN,
dim=DIM,
collection_name="test_collection",
embedding_field="embedding",
id_field="id",
similarity_metric="COSINE",
consistency_level="Strong", # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level
overwrite=True, # 如果 Collection 已存在则覆盖
)
vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)
使用 SimpleDirectoryReader 从文件 paul_graham_essay.txt
包装一个 LlamaIndex 文档对象。
from llama_index.core import SimpleDirectoryReader
# 加载文档
documents = SimpleDirectoryReader(
input_files=["./data/paul_graham_essay.txt"]
).load_data()
print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb
本地实例化一个 Hugging Face 嵌入模型。使用本地模型可以避免在异步数据插入期间达到 API 速率限制的风险,因为并发 API 请求可能会迅速累积并消耗您在公共 API 中的预算。但是,如果您有较高的速率限制,您可以选择使用远程模型服务。
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
创建索引并插入文档。
我们将 use_async
设置为 True
以启用异步插入模式。
# 创建文档索引
from llama_index.core import VectorStoreIndex, StorageContext
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
embed_model=embed_model,
use_async=True,
)
初始化 LLM。
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
在构建查询引擎时,您也可以将 use_async
参数设置为 True
以启用异步搜索。
query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.
探索异步 API
在本节中,我们将介绍更低级的 API 使用方法,并比较同步和异步运行的性能。
异步添加
重新初始化向量存储。
vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)
让我们定义一个节点生成函数,用于为索引生成大量测试节点。
def random_id():
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def produce_nodes(num_adding):
node_list = []
for i in range(num_adding):
node = TextNode(
id_=random_id(),
text=f"n{i}_text",
embedding=[0.5] * (DIM - 1) + [random.random()],
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
)
node_list.append(node)
return node_list
定义一个异步函数来向向量存储添加文档。我们在 Milvus 向量存储实例中使用 async_add()
函数。
async def async_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
tasks = []
for i in range(num_adding):
sub_nodes = node_list[i]
task = vector_store.async_add([sub_nodes]) # 使用 async_add()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
add_counts = [10, 100, 1000]
获取事件循环。
loop = asyncio.get_event_loop()
异步向向量存储添加文档。
for count in add_counts:
async def measure_async_add():
async_time = await async_add(count)
print(f"Async add for {count} took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds Async add for 100 took 0.48 seconds Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)
与同步添加比较
定义一个同步添加函数。然后在相同条件下测量运行时间。
def sync_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
for node in node_list:
result = vector_store.add([node])
end_time = time.time()
return end_time - start_time
for count in add_counts:
sync_time = sync_add(count)
print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds Sync add for 100 took 5.85 seconds Sync add for 1000 took 62.91 seconds
结果显示同步添加过程比异步过程慢得多。
异步搜索
重新初始化向量存储并在运行搜索之前添加一些文档。
vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)
定义一个异步搜索函数。我们在 Milvus 向量存储实例中使用 aquery()
函数。
async def async_search(num_queries):
start_time = time.time()
tasks = []
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
task = vector_store.aquery(query=query) # 使用 aquery()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
query_counts = [10, 100, 1000]
从 Milvus 存储中异步搜索。
for count in query_counts:
async def measure_async_search():
async_time = await async_search(count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds Async search for 100 queries took 1.39 seconds Async search for 1000 queries took 8.81 seconds
与同步搜索比较
定义一个同步搜索函数。然后在相同条件下测量运行时间。
def sync_search(num_queries):
start_time = time.time()
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
result = vector_store.query(query=query)
end_time = time.time()
return end_time - start_time
for count in query_counts:
sync_time = sync_search(count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds Sync search for 100 queries took 30.80 seconds Sync search for 1000 queries took 308.80 seconds
结果显示同步搜索过程比异步过程慢得多。