一致性级别
作为分布式向量数据库,Milvus 提供多种一致性级别,以确保每个节点或副本在读写操作期间可以访问相同的数据。目前,支持的一致性级别包括 Strong、Bounded、Eventually 和 Session,其中 Bounded 是默认使用的一致性级别。
概述
Milvus 是一个存储与计算分离的系统。在这个系统中,DataNodes 负责数据的持久化,最终将数据存储在分布式对象存储(如 MinIO/S3)中。QueryNodes 处理计算任务,如搜索。这些任务涉及处理 批量数据 和 流数据。简单来说,批量数据可以理解为已经存储在对象存储中的数据,而流数据指的是尚未存储在对象存储中的数据。由于网络延迟,QueryNodes 通常不持有最新的流数据。如果没有额外的保护措施,直接在流数据上执行搜索可能会导致许多未提交数据点的丢失,影响搜索结果的准确性。
Milvus 商业版是一个存储与计算分离的系统。在这个系统中,DataNodes 负责数据的持久化,最终将数据存储在分布式对象存储(如 MinIO/S3)中。QueryNodes 处理计算任务,如搜索。这些任务涉及处理批量数据和流数据。简单来说,批量数据可以理解为已经存储在对象存储中的数据,而流数据指的是尚未存储在对象存储中的数据。由于网络延迟,QueryNodes 通常不持有最新的流数据。如果没有额外的保护措施,直接在流数据上执行搜索可能会导致许多未提交数据点的丢失,影响搜索结果的准确性。
如上图所示,QueryNodes 在接收搜索请求后可以同时接收流数据和批量数据。但是,由于网络延迟,QueryNodes 获得的流数据可能是不完整的。
为了解决这个问题,Milvus 为数据队列中的每条记录加上时间戳,并持续向数据队列中插入同步时间戳。每当接收到同步时间戳(syncTs)时,QueryNodes 将其设置为 ServiceTime,意味着 QueryNodes 可以看到该 Service Time 之前的所有数据。基于 ServiceTime,Milvus 可以提供保证时间戳(GuaranteeTs)来满足用户对一致性和可用性的不同要求。用户可以通过在搜索请求中指定 GuaranteeTs 来告知 QueryNodes 需要在搜索范围内包含指定时间点之前的数据。
如上图所示,如果 GuaranteeTs 小于 ServiceTime,这意味着指定时间点之前的所有数据都已完全写入磁盘,允许 QueryNodes 立即执行搜索操作。当 GuaranteeTs 大于 ServiceTime 时,QueryNodes 必须等到 ServiceTime 超过 GuaranteeTs 后才能执行搜索操作。
用户需要在查询准确性和查询延迟之间做出权衡。如果用户对一致性要求较高且对查询延迟不敏感,可以将 GuaranteeTs 设置为尽可能大的值;如果用户希望快速收到搜索结果且对查询准确性更宽容,则可以将 GuaranteeTs 设置为较小的值。
Milvus 提供四种具有不同 GuaranteeTs 的一致性级别。
-
Strong
使用最新时间戳作为 GuaranteeTs,QueryNodes 必须等到 ServiceTime 满足 GuaranteeTs 后才能执行搜索请求。
-
Eventual
GuaranteeTs 设置为极小值,如 1,以避免一致性检查,使 QueryNodes 可以立即在所有批量数据上执行搜索请求。
-
Bounded Staleness
GuaranteeTs 设置为比最新时间戳更早的时间点,使 QueryNodes 在可容忍一定数据丢失的情况下执行搜索。
-
Session
使用客户端插入数据的最新时间点作为 GuaranteeTs,使 QueryNodes 可以在客户端插入的所有数据上执行搜索。
Milvus 使用 Bounded Staleness 作为默认一致性级别。如果未指定 GuaranteeTs,则使用最新的 ServiceTime 作为 GuaranteeTs。
设置一致性级别
您可以在创建 Collection 时以及执行搜索和查询时设置不同的一致性级别。
创建 Collection 时设置一致性级别
创建 Collection 时,您可以为 Collection 内的搜索和查询设置一致性级别。以下代码示例将一致性级别设置为 Strong。
client.create_collection(
collection_name="my_collection",
schema=schema,
# highlight-next
consistency_level="Strong",
)
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
.collectionName("my_collection")
.collectionSchema(schema)
// highlight-next
.consistencyLevel(ConsistencyLevel.STRONG)
.build();
client.createCollection(createCollectionReq);
err = client.CreateCollection(ctx,
milvusclient.NewCreateCollectionOption("my_collection", schema).
WithConsistencyLevel(entity.ClStrong))
if err != nil {
fmt.Println(err.Error())
// handle error
}
export schema='{
"autoId": true,
"enabledDynamicField": false,
"fields": [
{
"fieldName": "id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "vector",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "5"
}
},
{
"fieldName": "my_varchar",
"dataType": "VarChar",
"isClusteringKey": true,
"elementTypeParams": {
"max_length": 512
}
}
]
}'
export params='{
"consistencyLevel": "Strong"
}'
curl --request POST \
--url "${CLUSTER_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d "{
\"collectionName\": \"my_collection\",
\"schema\": $schema,
\"params\": $params
}"
consistency_level
参数的可能值为 Strong
、Bounded
、Eventually
和 Session
。
在搜索中设置一致性级别
您可以随时为特定搜索更改一致性级别。以下代码示例将一致性级别设置回 Bounded。此更改仅适用于当前搜索请求。
res = client.search(
collection_name="my_collection",
data=[query_vector],
limit=3,
search_params={"metric_type": "IP"},
consistency_level="Bounded",
# highlight-next
)
SearchReq searchReq = SearchReq.builder()
.collectionName("my_collection")
.data(Collections.singletonList(queryVector))
.topK(3)
.searchParams(params)
.consistencyLevel(ConsistencyLevel.BOUNDED)
.build();
SearchResp searchResp = client.search(searchReq);
resultSets, err := client.Search(ctx, milvusclient.NewSearchOption(
"my_collection", // collectionName
3, // limit
[]entity.Vector{entity.FloatVector(queryVector)},
).WithConsistencyLevel(entity.ClBounded).
WithANNSField("vector"))
if err != nil {
fmt.Println(err.Error())
// handle error
}
curl --request POST \
--url "${CLUSTER_ENDPOINT}/v2/vectordb/entities/search" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"collectionName": "my_collection",
"data": [
[0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592]
],
"limit": 3,
"consistencyLevel": "Bounded"
}'
此参数在混合搜索和搜索迭代器中也可用。consistency_level
参数的可能值为 Strong
、Bounded
、Eventually
和 Session
。
在查询中设置一致性级别
您可以随时为特定搜索更改一致性级别。以下代码示例将一致性级别设置为 Eventually。此设置仅适用于当前查询请求。
res = client.query(
collection_name="my_collection",
filter="color like \"red%\"",
output_fields=["vector", "color"],
limit=3,
consistency_level="Eventually",
# highlight-next
)
QueryReq queryReq = QueryReq.builder()
.collectionName("my_collection")
.filter("color like \"red%\"")
.outputFields(Arrays.asList("vector", "color"))
.limit(3)
.consistencyLevel(ConsistencyLevel.EVENTUALLY)
.build();
QueryResp getResp = client.query(queryReq);
resultSet, err := client.Query(ctx, milvusclient.NewQueryOption("my_collection").
WithFilter("color like \"red%\"").
WithOutputFields("vector", "color").
WithLimit(3).
WithConsistencyLevel(entity.ClEventually))
if err != nil {
fmt.Println(err.Error())
// handle error
}
此参数在查询迭代器中也可用。consistency_level
参数的可能值为 Strong
、Bounded
、Eventually
和 Session
。