连接 Apache Kafka® 与 Milvus/Zilliz Cloud 进行实时向量数据摄取
在本快速入门指南中,我们展示如何设置开源 Kafka 和 Zilliz Cloud 来摄取向量数据。
本教程解释如何使用 Apache Kafka® 将向量数据流式传输并摄取到 Milvus 向量数据库和 Zilliz Cloud(完全托管的 Milvus)中,从而实现高级实时应用程序,如语义搜索、推荐系统和 AI 驱动的分析。
Apache Kafka 是一个分布式事件流处理平台,专为高吞吐量、低延迟管道而设计。它广泛用于从数据库、IoT 设备、移动应用程序和云服务等源收集、存储和处理实时数据流。Kafka 处理大量数据的能力使其成为 Milvus 或 Zilliz Cloud 等向量数据库的重要数据源。
例如,Kafka 可以捕获实时数据流——如用户交互、传感器读数,以及来自机器学习模型的嵌入——并将这些流直接发布到 Milvus 或 Zilliz Cloud。一旦进入向量数据库,这些数据就可以高效地进行索引、搜索和分析。
Kafka 与 Milvus 和 Zilliz Cloud 的集成为构建强大的非结构化数据工作流管道提供了无缝的方式。该连接器适用于开源 Kafka 部署和托管服务,如 Confluent 和 StreamNative。
在本教程中,我们使用 Zilliz Cloud 作为演示:
步骤 1:下载 kafka-connect-milvus 插件
完成以下步骤以下载 kafka-connect-milvus 插件。
- 从这里下载最新的插件 zip 文件
zilliz-kafka-connect-milvus-xxx.zip
。
步骤 2:下载 Kafka
- 从这里下载最新的 Kafka。
- 解压下载的文件并进入 Kafka 目录。
$ tar -xzf kafka_2.13-3.6.1.tgz
$ cd kafka_2.13-3.6.1
步骤 3:启动 Kafka 环境
注意:您的本地环境必须安装 Java 8+ 版本。
按顺序运行以下命令以正确启动所有服务:
-
启动 ZooKeeper 服务
$ bin/zookeeper-server-start.sh config/zookeeper.properties
-
启动 Kafka broker 服务
打开另一个终端会话并运行:
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务成功启动,您就拥有了一个运行中且可以使用的基本 Kafka 环境。
- 详细信息请查看 kafka 官方快速入门指南:https://kafka.apache.org/quickstart
步骤 4:配置 Kafka 和 Zilliz Cloud
确保您已设置并正确配置了 Kafka 和 Zilliz Cloud。
-
如果您在 Kafka 中还没有主题,请在 Kafka 中创建一个主题(例如
topic_0
)。$ bin/kafka-topics.sh --create --topic topic_0 --bootstrap-server localhost:9092
-
如果您在 Zilliz Cloud 中还没有 Collection,请创建一个带有向量字段的 Collection(在此示例中向量的
dimension=8
)。您可以在 Zilliz Cloud 上使用以下示例模式:注意:确保两侧的模式相互匹配。在模式中,恰好有一个向量字段。两侧每个字段的名称完全相同。
步骤 5:将 kafka-connect-milvus 插件加载到 Kafka 实例
-
解压您在步骤 1 中下载的
zilliz-kafka-connect-milvus-xxx.zip
文件。 -
将
zilliz-kafka-connect-milvus
目录复制到您的 Kafka 安装的libs
目录中。 -
修改您的 Kafka 安装的
config
目录中的connect-standalone.properties
文件。key.converter.schemas.enable=false
value.converter.schemas.enable=false
plugin.path=libs/zilliz-kafka-connect-milvus-xxx -
在您的 Kafka 安装的
config
目录中创建并配置milvus-sink-connector.properties
文件。name=zilliz-kafka-connect-milvus
connector.class=com.milvus.io.kafka.MilvusSinkConnector
public.endpoint=https://<public.endpoint>:port
token=*****************************************
collection.name=topic_0
topics=topic_0
步骤 6:启动连接器
-
使用之前的配置文件启动连接器
$ bin/connect-standalone.sh config/connect-standalone.properties config/milvus-sink-connector.properties
-
尝试向您刚在 Kafka 中创建的 Kafka 主题发送消息
bin/kafka-console-producer.sh --topic topic_0 --bootstrap-server localhost:9092
>{"id": 0, "title": "The Reported Mortality Rate of Coronavirus Is Not Important", "title_vector": [0.041732933, 0.013779674, -0.027564144, -0.013061441, 0.009748648, 0.00082446384, -0.00071647146, 0.048612226], "link": "https://medium.com/swlh/the-reported-mortality-rate-of-coronavirus-is-not-important-369989c8d912"} -
检查实体是否已插入到 Zilliz Cloud 中的 Collection。如果插入成功,在 Zilliz Cloud 上看起来是这样的:
支持
如果您需要任何帮助或对 Kafka Connect Milvus 连接器有疑问,请随时联系连接器的维护者:邮箱:support@zilliz.com
快速部署
要了解如何使用此教程启动在线演示,请参阅示例应用程序。