使用 Apache Spark™ 与 Milvus/Zilliz Cloud 构建 AI 管道
Spark-Milvus 连接器提供了 Apache Spark 和 Databricks 与 Milvus 和 Zilliz Cloud 的集成。它将 Apache Spark 强大的大数据处理和机器学习(ML)功能与 Milvus 最先进的向量搜索能力相结合。这种集成为 AI 驱动的搜索、高级分析、ML 训练和大规模向量数据的高效管理提供了简化的工作流程。
Apache Spark 是一个分布式数据处理平台,专为高速计算处理大规模数据集而设计。当与 Milvus 或 Zilliz Cloud 配对时,它为语义搜索、推荐系统和 AI 驱动的数据分析等用例释放了新的可能性。
例如,Spark 可以通过 ML 模型批处理大型数据集以生成嵌入,然后使用 Spark-Milvus 连接器将这些嵌入直接存储在 Milvus 或 Zilliz Cloud 中。一旦建立索引,这些数据就可以快速搜索或分析,为 AI 和大数据工作流程创建强大的管道。
Spark-Milvus 连接器支持诸如向 Milvus 迭代和批量数据摄取、系统间数据同步以及对存储在 Milvus 中的向量数据进行高级分析等任务。本指南将引导您完成有效配置和使用连接器的步骤,用于以下用例:
- 高效地将向量数据大批量加载到 Milvus 中,
- 在 Milvus 和其他存储系统或数据库之间移动数据,
- 通过利用 Spark MLlib 和其他 AI 工具分析 Milvus 中的数据。
快速开始
准备工作
Spark-Milvus 连接器支持 Scala 和 Python 编程语言。用户可以使用 Pyspark 或 Spark-shell。要运行此演示,请按照以下步骤设置包含 Spark-Milvus 连接器依赖项的 Spark 环境:
-
安装 Apache Spark(版本 >= 3.3.0)
您可以参考官方文档安装 Apache Spark。
-
下载 spark-milvus jar 文件。
wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
-
将 spark-milvus jar 作为依赖项启动 Spark 运行时。
要使用 Spark-Milvus 连接器启动 Spark 运行时,请将下载的 spark-milvus 作为依赖项添加到命令中。
-
pyspark
./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
-
spark-shell
./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
-
演示
在此演示中,我们创建一个带有向量数据的示例 Spark DataFrame,并通过 Spark-Milvus 连接器将其写入 Milvus。将根据模式和指定选项在 Milvus 中自动创建一个 Collection。
from pyspark.sql import SparkSession
columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
.mode("append") \
.option("milvus.host", "localhost") \
.option("milvus.port", "19530") \
.option("milvus.collection.name", "hello_spark_milvus") \
.option("milvus.collection.vectorField", "vec") \
.option("milvus.collection.vectorDim", "8") \
.option("milvus.collection.primaryKeyField", "id") \
.format("milvus") \
.save()
import org.apache.spark.sql.{SaveMode, SparkSession}
object Hello extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("HelloSparkMilvus")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
(2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
(3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
(4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
).toDF("id", "text", "vec")
// set milvus options
val milvusOptions = Map(
"milvus.host" -> "localhost" -> uri,
"milvus.port" -> "19530",
"milvus.collection.name" -> "hello_spark_milvus",
"milvus.collection.vectorField" -> "vec",
"milvus.collection.vectorDim" -> "5",
"milvus.collection.primaryKeyField", "id"
)
sampleDF.write.format("milvus")
.options(milvusOptions)
.mode(SaveMode.Append)
.save()
}
执行上述代码后,您可以使用 SDK 或 Attu(Milvus 仪表板)查看 Milvus 中插入的数据。您可以找到一个名为 hello_spark_milvus
的 Collection,其中已插入 4 个实体。
功能和概念
Milvus 选项
在快速开始部分,我们展示了在与 Milvus 操作期间设置选项。这些选项被抽象为 Milvus 选项。它们用于创建与 Milvus 的连接并控制其他 Milvus 行为。并非所有选项都是必需的。
选项键 | 默认值 | 描述 |
---|---|---|
milvus.host | localhost | Milvus 服务器主机。详见管理 Milvus 连接。 |
milvus.port | 19530 | Milvus 服务器端口。详见管理 Milvus 连接。 |
milvus.username | root | Milvus 服务器用户名。详见管理 Milvus 连接。 |
milvus.password | Milvus | Milvus 服务器密码。详见管理 Milvus 连接。 |
milvus.uri | -- | Milvus 服务器 URI。详见管理 Milvus 连接。 |
milvus.token | -- | Milvus 服务器令牌。详见管理 Milvus 连接。 |
milvus.database.name | default | 要读取或写入的 Milvus 数据库名称。 |
milvus.collection.name | hello_milvus | 要读取或写入的 Milvus Collection 名称。 |
milvus.collection.primaryKeyField | None | Collection 中主键字段的名称。如果 Collection 不存在则为必需。 |
milvus.collection.vectorField | None | Collection 中向量字段的名称。如果 Collection 不存在则为必需。 |
milvus.collection.vectorDim | None | Collection 中向量字段的维度。如果 Collection 不存在则为必需。 |
milvus.collection.autoID | false | 如果 Collection 不存在,此选项指定是否为实体自动生成 ID。详见create_collection |
milvus.bucket | a-bucket | Milvus 存储中的存储桶名称。应与 milvus.yaml 中的 minio.bucketName 相同。 |
milvus.rootpath | files | Milvus 存储的根路径。应与 milvus.yaml 中的 minio.rootpath 相同。 |
milvus.fs | s3a:// | Milvus 存储的文件系统。对于开源 Spark,值为 s3a:// 。对于 Databricks,使用 s3:// 。 |
milvus.storage.endpoint | localhost:9000 | Milvus 存储的端点。应与 milvus.yaml 中的 minio.address :minio.port 相同。 |
milvus.storage.user | minioadmin | Milvus 存储的用户。应与 milvus.yaml 中的 minio.accessKeyID 相同。 |
milvus.storage.password | minioadmin | Milvus 存储的密码。应与 milvus.yaml 中的 minio.secretAccessKey 相同。 |
milvus.storage.useSSL | false | 是否对 Milvus 存储使用 SSL。应与 milvus.yaml 中的 minio.useSSL 相同。 |
Milvus 数据格式
Spark-Milvus 连接器支持以下 Milvus 数据格式读取和写入数据:
milvus
:用于从 Spark DataFrame 无缝转换为 Milvus 实体的 Milvus 数据格式。milvusbinlog
:用于读取 Milvus 内置 binlog 数据的 Milvus 数据格式。mjson
:用于将数据批量插入 Milvus 的 Milvus JSON 格式。
milvus
在快速开始中,我们使用 milvus 格式将示例数据写入 Milvus 集群。milvus 格式是一种新的数据格式,支持将 Spark DataFrame 数据无缝写入 Milvus Collection。这是通过批量调用 Milvus SDK 的 Insert API 实现的。如果 Milvus 中不存在 Collection,将根据 Dataframe 的模式创建新的 Collection。但是,自动创建的 Collection 可能不支持 Collection 模式的所有功能。因此,建议首先通过 SDK 创建 Collection,然后使用 spark-milvus 进行写入。有关更多信息,请参考演示。
milvusbinlog
新的数据格式 milvusbinlog 用于读取 Milvus 内置 binlog 数据。Binlog 是 Milvus 基于 parquet 的内部数据存储格式。不幸的是,它无法被常规的 parquet 库读取,因此我们实现了这种新的数据格式来帮助 Spark 作业读取它。 除非您熟悉 milvus 内部存储详细信息,否则不建议直接使用 milvusbinlog。我们建议使用将在下一节中介绍的 MilvusUtils 函数。
val df = spark.read
.format("milvusbinlog")
.load(path)
.withColumnRenamed("val", "embedding")
mjson
Milvus 提供 Bulkinsert 功能,在操作大型数据集时提供更好的写入性能。但是,Milvus 使用的 JSON 格式与 Spark 的默认 JSON 输出格式略有不同。 为了解决这个问题,我们引入了 mjson 数据格式来生成满足 Milvus 要求的数据。以下是显示 JSON-lines 和 mjson 之间差异的示例:
-
JSON-lines:
{"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
{"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
{"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
{"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
{"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]} -
mjson(Milvus Bulkinsert 所需):
{
"rows":[
{"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
{"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
{"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
{"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
{"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
]
}
这将在未来得到改进。如果您的 Milvus 版本是 v2.3.7+,该版本支持 Parquet 格式的 bulkinsert,我们建议在 spark-milvus 集成中使用 parquet 格式。请参见 Github 上的演示。
MilvusUtils
MilvusUtils 包含几个有用的实用函数。目前仅支持 Scala。更多使用示例在高级用法部分。
MilvusUtils.readMilvusCollection
MilvusUtils.readMilvusCollection 是将整个 Milvus Collection 加载到 Spark Dataframe 的简单接口。它包装了各种操作,包括调用 Milvus SDK、读取 milvusbinlog 和常见的 union/join 操作。
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
MilvusUtils.bulkInsertFromSpark
MilvusUtils.bulkInsertFromSpark 提供了一种将 Spark 输出文件大批量导入 Milvus 的便捷方式。它包装了 Milvus SDK 的 Bulkinsert API。
df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")
高级用法
在本节中,您将找到 Spark-Milvus 连接器用于数据分析和迁移的高级用法示例。有关更多演示,请参见示例。
MySQL -> 嵌入 -> Milvus
在此演示中,我们将
- 通过 Spark-MySQL 连接器从 MySQL 读取数据,
- 生成嵌入(以 Word2Vec 为例),以及
- 将嵌入数据写入 Milvus。
要启用 Spark-MySQL 连接器,您需要将以下依赖项添加到您的 Spark 环境:
spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._
import org.apache.spark.ml.linalg.Vector
object Mysql2MilvusDemo extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("Mysql2MilvusDemo")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
(2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
(3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
(4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
).toDF("id", "text")
// Write to MySQL Table
sampleDF.write
.mode(SaveMode.Append)
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.save()
// Read from MySQL Table
val dfMysql = spark.read
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.load()
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val tokenizedDf = tokenizer.transform(dfMysql)
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("tokens")
.setOutputCol("vectors")
.setVectorSize(128)
.setMinCount(0)
val model = word2Vec.fit(tokenizedDf)
val result = model.transform(tokenizedDf)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// Apply the UDF to the DataFrame
val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
val milvusDf = resultDF.drop("tokens").drop("vectors")
milvusDf.write.format("milvus")
.option(MILVUS_HOST, "localhost")
.option(MILVUS_PORT, "19530")
.option(MILVUS_COLLECTION_NAME, "text_embedding")
.option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
.option(MILVUS_COLLECTION_VECTOR_DIM, "128")
.option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
.mode(SaveMode.Append)
.save()
}
Milvus -> 转换 -> Milvus
在此演示中,我们将
- 从 Milvus Collection 读取数据,
- 应用转换(以 PCA 为例),以及
- 通过 Bulkinsert API 将转换后的数据写入另一个 Milvus。
PCA 模型是一个降低嵌入向量维度的转换模型,这是机器学习中的常见操作。 您可以在转换步骤中添加任何其他处理操作,如过滤、连接或规范化。
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}
import scala.collection.JavaConverters._
object TransformDemo extends App {
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val host = "localhost"
val port = 19530
val user = "root"
val password = "Milvus"
val fs = "s3a://"
val bucketName = "a-bucket"
val rootPath = "files"
val minioAK = "minioadmin"
val minioSK = "minioadmin"
val minioEndpoint = "localhost:9000"
val collectionName = "hello_spark_milvus1"
val targetCollectionName = "hello_spark_milvus2"
val properties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
// 1, configurations
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))
// 2, batch read milvus collection data to dataframe
// Schema: dim of `embeddings` is 8
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=8 |
// +-+------------+------------+------------------+
val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
.withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
.drop("embeddings")
// 3. Use PCA to reduce dim of vector
val dim = 4
val pca = new PCA()
.setInputCol("embeddings_vec")
.setOutputCol("pca_vec")
.setK(dim)
.fit(collectionDF)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// embeddings dim number reduce to 4
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=4 |
// +-+------------+------------+------------------+
val pcaDf = pca.transform(collectionDF)
.withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
.select("pk", "random", "embeddings")
// 4. Write PCAed data to S3
val outputPath = "s3a://a-bucket/result"
pcaDf.write
.mode("overwrite")
.format("parquet")
.save(outputPath)
// 5. Config MilvusOptions of target table
val targetProperties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// 6. Bulkinsert Spark output files into milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}
Databricks -> Zilliz Cloud
如果您使用的是 Zilliz Cloud(托管的 Milvus 服务),您可以利用其便捷的数据导入 API。Zilliz Cloud 提供全面的工具和文档,帮助您高效地将数据从各种数据源(包括 Spark 和 Databricks)移动。只需设置一个 S3 存储桶作为中介,并向您的 Zilliz Cloud 账户开放其访问权限。Zilliz Cloud 的数据导入 API 将自动从 S3 存储桶将整批数据加载到您的 Zilliz Cloud 集群。
准备工作
-
通过将 jar 文件添加到您的 Databricks 集群来加载 Spark 运行时。
您可以通过不同方式安装库。此屏幕截图显示从本地上传 jar 到集群。有关更多信息,请参见 Databricks 文档中的集群库。
-
创建一个 S3 存储桶并将其配置为您的 Databricks 集群的外部存储位置。
Bulkinsert 需要将数据存储在临时存储桶中,以便 Zilliz Cloud 可以批量导入数据。您可以创建一个 S3 存储桶并将其配置为 databricks 的外部位置。详情请参考外部位置。
-
保护您的 Databricks 凭据。
有关更多详细信息,请参考博客在 Databricks 中安全管理凭据上的说明。
演示
以下是展示批量数据迁移过程的代码片段。与上面的 Milvus 示例类似,您只需要替换凭据和 S3 存储桶地址。
// 将数据批量写入 Milvus 存储桶存储。
val outputPath = "s3://my-temp-bucket/result"
df.write
.mode("overwrite")
.format("mjson")
.save(outputPath)
// 指定 Milvus 选项。
val targetProperties = Map(
MilvusOptions.MILVUS_URI -> zilliz_uri,
MilvusOptions.MILVUS_TOKEN -> zilliz_token,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// 将 Spark 输出文件批量插入 Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")
实践 Notebook
为了帮助您快速开始使用 Spark-Milvus 连接器,您可以查看引导您完成 Spark 到 Milvus 和 Zilliz Cloud 的流式和批量数据摄取示例的 notebook。
快速部署
要了解如何使用此教程启动在线演示,请参阅示例应用程序。