跳到主要内容

一致性级别​

作为分布式向量数据库,Milvus 提供多个一致性级别,以确保每个节点或副本在读写操作期间可以访问相同的数据。目前,支持的一致性级别包括 StrongBoundedEventuallySession,其中 Bounded 是默认使用的一致性级别。​

概述​

Milvus 是一个存储和计算分离的系统。在此系统中,DataNodes 负责数据的持久化,最终将其存储在分布式对象存储(如 MinIO/S3)中。QueryNodes 处理搜索等计算任务。这些任务涉及处理 批量数据流数据。简单地说,批量数据可以理解为已经存储在对象存储中的数据,而流数据是指尚未存储在对象存储中的数据。由于网络延迟,QueryNodes 通常不持有最新的流数据。如果没有额外的保障措施,直接在流数据上执行搜索可能会导致许多未提交数据点的丢失,影响搜索结果的准确性。​

Batch data and streaming data

如上图所示,QueryNodes 在接收到搜索请求后可以同时接收流数据和批量数据。但是,由于网络延迟,QueryNodes 获得的流数据可能是不完整的。​

为了解决这个问题,Milvus 为数据队列中的每个记录添加时间戳,并持续向数据队列插入同步时间戳。每当接收到同步时间戳(syncTs)时,QueryNodes 将其设置为 ServiceTime,这意味着 QueryNodes 可以看到该 Service Time 之前的所有数据。基于 ServiceTime,Milvus 可以提供保证时间戳(GuaranteeTs)来满足用户对一致性和可用性的不同要求。用户可以通过在搜索请求中指定 GuaranteeTs 来告知 QueryNodes 需要在搜索范围中包含指定时间点之前的数据。​

ServiceTime and GuaranteeTs

如上图所示,如果 GuaranteeTs 小于 ServiceTime,这意味着指定时间点之前的所有数据都已完全写入磁盘,允许 QueryNodes 立即执行搜索操作。当 GuaranteeTs 大于 ServiceTime 时,QueryNodes 必须等到 ServiceTime 超过 GuaranteeTs 才能执行搜索操作。​

用户需要在查询准确性和查询延迟之间做出权衡。如果用户对一致性要求很高且对查询延迟不敏感,他们可以将 GuaranteeTs 设置为尽可能大的值;如果用户希望快速接收搜索结果并且对查询准确性更宽容,那么可以将 GuaranteeTs 设置为较小的值。​

Consistency Levels Illustrated

Milvus 提供具有不同 GuaranteeTs 的四种一致性级别。​

  • Strong

    使用最新时间戳作为 GuaranteeTs,QueryNodes 必须等到 ServiceTime 满足 GuaranteeTs 才能执行搜索请求。​

  • Eventually

    GuaranteeTs 设置为极小的值,如 1,以避免一致性检查,使 QueryNodes 可以立即对所有批量数据执行搜索请求。​

  • Bounded​(默认)

    GuaranteeTs 设置为比最新时间戳更早的时间点,使 QueryNodes 能够在容忍一定数据丢失的情况下执行搜索。​

  • Session

    使用客户端插入数据的最新时间点作为 GuaranteeTs,使 QueryNodes 可以对客户端插入的所有数据执行搜索。​

Milvus 使用 Bounded Staleness 作为默认一致性级别。如果未指定 GuaranteeTs,则使用最新的 ServiceTime 作为 GuaranteeTs。​

设置一致性级别​

您可以在创建 collection 以及执行搜索和查询时设置不同的一致性级别。​

在创建 Collection 时设置一致性级别​

创建 collection 时,您可以为 collection 内的搜索和查询设置一致性级别。以下代码示例将一致性级别设置为 Bounded。​

client.create_collection(
collection_name="my_collection",
schema=schema,
consistency_level="Bounded",# Defaults to Bounded if not specified​
)

CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()​
.collectionName("my_collection")​
.collectionSchema(schema)​
.consistencyLevel(ConsistencyLevel.BOUNDED)​
.build();​
client.createCollection(createCollectionReq);​

export schema='{​
"autoId": true,​
"enabledDynamicField": false,​
"fields": [​
{​
"fieldName": "my_id",​
"dataType": "Int64",​
"isPrimary": true​
},​
{​
"fieldName": "my_vector",​
"dataType": "FloatVector",​
"elementTypeParams": {​
"dim": "5"​
}​
},​
{​
"fieldName": "my_varchar",​
"dataType": "VarChar",​
"isClusteringKey": true,​
"elementTypeParams": {​
"max_length": 512​
}​
}​
]​
}'​

export params='{​
"consistencyLevel": "Bounded"​
}'​

curl --request POST \​
--url "${CLUSTER_ENDPOINT}/v2/vectordb/collections/create" \​
--header "Authorization: Bearer ${TOKEN}" \​
--header "Content-Type: application/json" \​
-d "{​
\"collectionName\": \"my_collection\",​
\"schema\": $schema,​
\"params\": $params​
}"​

consistency_level 参数的可能值为 StrongBoundedEventuallySession。​

在搜索中设置一致性级别​

您始终可以为特定搜索更改一致性级别。以下代码示例将一致性级别设置回 Bounded。更改仅适用于当前搜索请求。​

res = client.search(
collection_name="my_collection",
data=[query_vector],
limit=3,
search_params={"metric_type": "IP"},​
# highlight-start​
consistency_level="Bounded",
# highlight-next​
)

SearchReq searchReq = SearchReq.builder()​
.collectionName("my_collection")​
.data(Collections.singletonList(queryVector))​
.topK(3)​
.searchParams(params)​
.consistencyLevel(ConsistencyLevel.BOUNDED)​
.build();​

SearchResp searchResp = client.search(searchReq);​

curl --request POST \​
--url "${CLUSTER_ENDPOINT}/v2/vectordb/entities/search" \​
--header "Authorization: Bearer ${TOKEN}" \​
--header "Content-Type: application/json" \​
-d '{​
"collectionName": "my_collection",​
"data": [​
[0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592]​
],​
"limit": 3,​
"consistencyLevel": "Bounded"​
}'​

此参数在混合搜索和搜索迭代器中也可用。consistency_level 参数的可能值为 StrongBoundedEventuallySession。​

在查询中设置一致性级别​

您始终可以为特定搜索更改一致性级别。以下代码示例将一致性级别设置为 Eventually。设置仅适用于当前查询请求。​

res = client.query(
collection_name="my_collection",
filter="color like \"red%\"",
output_fields=["vector", "color"],
limit=3,​
# highlight-start​
consistency_level="Eventually",
# highlight-next​
)

QueryReq queryReq = QueryReq.builder()​
.collectionName("my_collection")​
.filter("color like \"red%\"")​
.outputFields(Arrays.asList("vector", "color"))​
.limit(3)​
.consistencyLevel(ConsistencyLevel.EVENTUALLY)​
.build();​

QueryResp getResp = client.query(queryReq);​

此参数在查询迭代器中也可用。consistency_level 参数的可能值为 StrongBoundedEventuallySession。​