跳到主要内容

一致性级别

作为分布式向量数据库,Milvus 提供多种一致性级别,以确保每个节点或副本在读写操作期间可以访问相同的数据。目前,支持的一致性级别包括 StrongBoundedEventuallySession,其中 Bounded 是默认使用的一致性级别。

概述

Milvus 是一个存储与计算分离的系统。在这个系统中,DataNodes 负责数据的持久化,最终将数据存储在分布式对象存储(如 MinIO/S3)中。QueryNodes 处理计算任务,如搜索。这些任务涉及处理 批量数据流数据。简单来说,批量数据可以理解为已经存储在对象存储中的数据,而流数据指的是尚未存储在对象存储中的数据。由于网络延迟,QueryNodes 通常不持有最新的流数据。如果没有额外的保护措施,直接在流数据上执行搜索可能会导致许多未提交数据点的丢失,影响搜索结果的准确性。

Milvus 商业版是一个存储与计算分离的系统。在这个系统中,DataNodes 负责数据的持久化,最终将数据存储在分布式对象存储(如 MinIO/S3)中。QueryNodes 处理计算任务,如搜索。这些任务涉及处理批量数据和流数据。简单来说,批量数据可以理解为已经存储在对象存储中的数据,而流数据指的是尚未存储在对象存储中的数据。由于网络延迟,QueryNodes 通常不持有最新的流数据。如果没有额外的保护措施,直接在流数据上执行搜索可能会导致许多未提交数据点的丢失,影响搜索结果的准确性。

批量数据和流数据

如上图所示,QueryNodes 在接收搜索请求后可以同时接收流数据和批量数据。但是,由于网络延迟,QueryNodes 获得的流数据可能是不完整的。

为了解决这个问题,Milvus 为数据队列中的每条记录加上时间戳,并持续向数据队列中插入同步时间戳。每当接收到同步时间戳(syncTs)时,QueryNodes 将其设置为 ServiceTime,意味着 QueryNodes 可以看到该 Service Time 之前的所有数据。基于 ServiceTime,Milvus 可以提供保证时间戳(GuaranteeTs)来满足用户对一致性和可用性的不同要求。用户可以通过在搜索请求中指定 GuaranteeTs 来告知 QueryNodes 需要在搜索范围内包含指定时间点之前的数据。

Service Time 和 Guarantee Time

如上图所示,如果 GuaranteeTs 小于 ServiceTime,这意味着指定时间点之前的所有数据都已完全写入磁盘,允许 QueryNodes 立即执行搜索操作。当 GuaranteeTs 大于 ServiceTime 时,QueryNodes 必须等到 ServiceTime 超过 GuaranteeTs 后才能执行搜索操作。

用户需要在查询准确性和查询延迟之间做出权衡。如果用户对一致性要求较高且对查询延迟不敏感,可以将 GuaranteeTs 设置为尽可能大的值;如果用户希望快速收到搜索结果且对查询准确性更宽容,则可以将 GuaranteeTs 设置为较小的值。

一致性级别示意图

Milvus 提供四种具有不同 GuaranteeTs 的一致性级别。

  • Strong

    使用最新时间戳作为 GuaranteeTs,QueryNodes 必须等到 ServiceTime 满足 GuaranteeTs 后才能执行搜索请求。

  • Eventual

    GuaranteeTs 设置为极小值,如 1,以避免一致性检查,使 QueryNodes 可以立即在所有批量数据上执行搜索请求。

  • Bounded Staleness

    GuaranteeTs 设置为比最新时间戳更早的时间点,使 QueryNodes 在可容忍一定数据丢失的情况下执行搜索。

  • Session

    使用客户端插入数据的最新时间点作为 GuaranteeTs,使 QueryNodes 可以在客户端插入的所有数据上执行搜索。

Milvus 使用 Bounded Staleness 作为默认一致性级别。如果未指定 GuaranteeTs,则使用最新的 ServiceTime 作为 GuaranteeTs。

设置一致性级别

您可以在创建 Collection 时以及执行搜索和查询时设置不同的一致性级别。

创建 Collection 时设置一致性级别

创建 Collection 时,您可以为 Collection 内的搜索和查询设置一致性级别。以下代码示例将一致性级别设置为 Strong

client.create_collection(
collection_name="my_collection",
schema=schema,
# highlight-next
consistency_level="Strong",
)
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
.collectionName("my_collection")
.collectionSchema(schema)
// highlight-next
.consistencyLevel(ConsistencyLevel.STRONG)
.build();
client.createCollection(createCollectionReq);
err = client.CreateCollection(ctx,
milvusclient.NewCreateCollectionOption("my_collection", schema).
WithConsistencyLevel(entity.ClStrong))
if err != nil {
fmt.Println(err.Error())
// handle error
}
export schema='{
"autoId": true,
"enabledDynamicField": false,
"fields": [
{
"fieldName": "id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "vector",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "5"
}
},
{
"fieldName": "my_varchar",
"dataType": "VarChar",
"isClusteringKey": true,
"elementTypeParams": {
"max_length": 512
}
}
]
}'

export params='{
"consistencyLevel": "Strong"
}'

curl --request POST \
--url "${CLUSTER_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d "{
\"collectionName\": \"my_collection\",
\"schema\": $schema,
\"params\": $params
}"

consistency_level 参数的可能值为 StrongBoundedEventuallySession

在搜索中设置一致性级别

您可以随时为特定搜索更改一致性级别。以下代码示例将一致性级别设置回 Bounded。此更改仅适用于当前搜索请求。

res = client.search(
collection_name="my_collection",
data=[query_vector],
limit=3,
search_params={"metric_type": "IP"}
consistency_level="Bounded",
# highlight-next
)
SearchReq searchReq = SearchReq.builder()
.collectionName("my_collection")
.data(Collections.singletonList(queryVector))
.topK(3)
.searchParams(params)
.consistencyLevel(ConsistencyLevel.BOUNDED)
.build();

SearchResp searchResp = client.search(searchReq);
resultSets, err := client.Search(ctx, milvusclient.NewSearchOption(
"my_collection", // collectionName
3, // limit
[]entity.Vector{entity.FloatVector(queryVector)},
).WithConsistencyLevel(entity.ClBounded).
WithANNSField("vector"))
if err != nil {
fmt.Println(err.Error())
// handle error
}
curl --request POST \
--url "${CLUSTER_ENDPOINT}/v2/vectordb/entities/search" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"collectionName": "my_collection",
"data": [
[0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592]
],
"limit": 3,
"consistencyLevel": "Bounded"
}'

此参数在混合搜索和搜索迭代器中也可用。consistency_level 参数的可能值为 StrongBoundedEventuallySession

在查询中设置一致性级别

您可以随时为特定搜索更改一致性级别。以下代码示例将一致性级别设置为 Eventually。此设置仅适用于当前查询请求。

res = client.query(
collection_name="my_collection",
filter="color like \"red%\"",
output_fields=["vector", "color"],
limit=3
consistency_level="Eventually",
# highlight-next
)
QueryReq queryReq = QueryReq.builder()
.collectionName("my_collection")
.filter("color like \"red%\"")
.outputFields(Arrays.asList("vector", "color"))
.limit(3)
.consistencyLevel(ConsistencyLevel.EVENTUALLY)
.build();

QueryResp getResp = client.query(queryReq);
resultSet, err := client.Query(ctx, milvusclient.NewQueryOption("my_collection").
WithFilter("color like \"red%\"").
WithOutputFields("vector", "color").
WithLimit(3).
WithConsistencyLevel(entity.ClEventually))
if err != nil {
fmt.Println(err.Error())
// handle error
}

此参数在查询迭代器中也可用。consistency_level 参数的可能值为 StrongBoundedEventuallySession