Airbyte:开源数据移动基础设施
Airbyte 是一个用于构建提取和加载(EL)数据管道的开源数据移动基础设施。它的设计注重多功能性、可扩展性和易用性。Airbyte 的连接器目录"开箱即用",包含超过 350 个预构建连接器。这些连接器可以在几分钟内开始将数据从源复制到目标。
Airbyte 的主要组件
1. 连接器目录
- 350+ 预构建连接器:Airbyte 的连接器目录"开箱即用",包含超过 350 个预构建连接器。这些连接器可以在几分钟内开始将数据从源复制到目标。
- 无代码连接器构建器:您可以通过无代码连接器构建器等工具轻松扩展 Airbyte 的功能以支持您的自定义用例。
2. 平台
Airbyte 的平台提供配置和扩展数据移动操作所需的所有水平服务,可作为云管理或自管理提供。
3. 用户界面
Airbyte 提供 UI、PyAirbyte(Python 库)、API 和 Terraform Provider,以与您首选的工具和基础设施管理方法集成。
通过 Airbyte 的能力,用户可以将数据源集成到 Milvus 集群中进行相似性搜索。
开始之前
您需要:
- Zendesk 账户(或您想要同步数据的其他数据源)
- Airbyte 账户或本地实例
- OpenAI API 密钥
- Milvus 集群
- 本地安装的 Python 3.10
设置 Milvus 集群
如果您已经为生产环境部署了 K8s 集群,您可以跳过此步骤,直接进行部署 Milvus Operator。如果没有,您可以按照这些步骤使用 Milvus Operator 部署 Milvus 集群。
单个实体(在我们的例子中是支持工单和知识库文章)存储在一个"Collection"中——设置集群后,您需要创建一个 Collection。选择一个合适的名称,并将维度设置为 1536 以匹配 OpenAI 嵌入服务生成的向量维度。
创建后,记录端点和身份验证信息。
在 Airbyte 中设置连接
我们的数据库已经准备好了,让我们传输一些数据!为此,我们需要在 Airbyte 中配置连接。要么在 cloud.airbyte.com 注册一个 Airbyte 云账户,要么按照文档中的描述启动本地实例。
设置数据源
实例运行后,我们需要设置连接——点击"New connection"并选择"Zendesk Support"连接器作为数据源。点击"Test and Save"按钮后,Airbyte 将检查是否可以建立连接。
在 Airbyte cloud 上,您可以通过点击 Authenticate 按钮轻松进行身份验证。使用本地 Airbyte 实例时,请按照文档页面上的说明进行操作。
设置目标
如果一切正常,下一步是设置要将数据移动到的目标。在这里,选择"Milvus"连接器。
Milvus 连接器执行三个操作:
- 分块和格式化 - 将 Zendesk 记录分割为文本和元数据。如果文本大于指定的分块大小,记录将被分成多个部分,单独加载到 Collection 中。文本分割(或分块)可能发生在大型支持工单或知识文章的情况下。通过分割文本,您可以确保搜索始终产生有用的结果。
让我们使用 1000 个令牌的分块大小和 body、title、description 和 subject 的文本字段,因为这些字段将出现在我们从 Zendesk 接收的数据中。
- 嵌入 - 使用机器学习模型将处理部分产生的文本块转换为向量嵌入,然后您可以搜索语义相似性。要创建嵌入,您必须提供 OpenAI API 密钥。Airbyte 将向 OpenAI 发送每个块,并将结果向量添加到加载到您的 Milvus 集群中的实体中。
- 索引 - 一旦您对块进行了向量化,就可以将它们加载到数据库中。为此,请插入您在 Milvus 集群中设置集群和 Collection 时获得的信息。点击"Test and save"将检查一切是否正确设置(有效凭据、Collection 存在且与配置的嵌入具有相同的向量维度等)
Set up stream sync flow
The last step before data is ready to flow is selecting which "streams" to sync. A stream is a collection of records in the source. As Zendesk supports a large number of streams that are not relevant to our use case, let's only select "tickets" and "articles" and disable all others to save bandwidth and make sure only the relevant information will show up in searches:

As soon as the connection is set up, Airbyte will start syncing data. It can take a few minutes to appear in your Milvus collection.
If you select a replication frequency, Airbyte will run regularly to keep your Milvus collection up to date with changes to Zendesk articles and newly created issues.
Check flow
You can check in the Milvus cluster UI how the data is structured in the collection by navigating to the playground and executing a "Query Data" query with a filter set to "_ab_stream == "tickets"".

Build Streamlit app querying the collection
Our data is ready — now we need to build the application to use it. In this case, the application will be a simple support form for users to submit support cases. When the user hits submit, we will do two things:
- Search for similar tickets submitted by users of the same organization
- Search for knowledge-based articles that might be relevant to the user
In both cases, we will leverage semantic search using OpenAI embeddings. To do this, the description of the problem the user entered is also embedded and used to retrieve similar entities from the Milvus cluster. If there are relevant results, they are shown below the form.
Set up UI environment
You will need a local Python installation as we will use Streamlit to implement the application.
First, install Streamlit, the Milvus client library, and the OpenAI client library locally:
pip install streamlit pymilvus openai
To render a basic support form, create a python file basic_support_form.py
:
import streamlit as st
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem")
submitted = st.form_submit_button("Submit")
if submitted:
# TODO check for related support cases and articles
st.write("Submitted!")
To run your application, use Streamlit run:
streamlit run basic_support_form.py
This will render a basic form:

Set up backend query service
Next, let's check for existing open tickets that might be relevant. To do this, we embed the text the user entered using OpenAI, then did a similarity search on our collection, filtering for still open tickets. If there is one with a very low distance between the supplied ticket and the existing ticket, let the user know and don't submit:
import streamlit as st
import os
import pymilvus
import openai
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem?")
submitted = st.form_submit_button("Submit")
if submitted:
import os
import pymilvus
import openai
org_id = 360033549136 # TODO Load from customer login data
pymilvus.connections.connect(uri=os.environ["MILVUS_URL"], token=os.environ["MILVUS_TOKEN"])
collection = pymilvus.Collection("zendesk")
embedding = openai.Embedding.create(input=text_val, model="text-embedding-ada-002")['data'][0]['embedding']
results = collection.search(data=[embedding], anns_field="vector", param={}, limit=2, output_fields=["_id", "subject", "description"], expr=f'status == "new" and organization_id == {org_id}')
st.write(results[0])
if len(results[0]) > 0 and results[0].distances[0] < 0.35:
matching_ticket = results[0][0].entity
st.write(f"This case seems very similar to {matching_ticket.get('subject')} (id #{matching_ticket.get('_id')}). Make sure it has not been submitted before")
else:
st.write("Submitted!")
Several things are happening here:
- The connection to the Milvus cluster is set up.
- The OpenAI service is used to generate an embedding of the description the user entered.
- A similarity search is performed, filtering results by the ticket status and the organization id (as only open tickets of the same organization are relevant).
- If there are results and the distance between the embedding vectors of the existing ticket and the newly entered text is below a certain threshold, call out this fact.
To run the new app, you need to set the environment variables for OpenAI and Milvus first:
export MILVUS_TOKEN=...
export MILVUS_URL=https://...
export OPENAI_API_KEY=sk-...
streamlit run app.py
When trying to submit a ticket that exists already, this is how the result will look:

Show more relevant information
As you can see in the green debug output hidden in the final version, two tickets matched our search (in status new, from the current organization, and close to the embedding vector). However, the first (relevant) ranked higher than the second (irrelevant in this situation), which is reflected in the lower distance value. This relationship is captured in the embedding vectors without directly matching words, like in a regular full-text search.
To wrap it up, let's show helpful information after the ticket gets submitted to give the user as much relevant information upfront as possible.
To do this, we are going to do a second search after the ticket gets submitted to fetch the top-matching knowledge base articles:
......
else:
# TODO Actually send out the ticket
st.write("Submitted!")
article_results = collection.search(data=[embedding], anns_field="vector", param={}, limit=5, output_fields=["title", "html_url"], expr=f'_ab_stream == "articles"')
st.write(article_results[0])
if len(article_results[0]) > 0:
st.write("We also found some articles that might help you:")
for hit in article_results[0]:
if hit.distance < 0.362:
st.write(f"* [{hit.entity.get('title')}]({hit.entity.get('html_url')})")
If there is no open support ticket with a high similarity score, the new ticket gets submitted and relevant knowledge articles are shown below:

结论
虽然这里显示的 UI 不是真实的支持表单,而是一个用于说明用例的示例,但 Airbyte 和 Milvus 的组合是非常强大的——它使从各种来源(从 Postgres 等数据库到 Zendesk 或 GitHub 等 API,再到使用 Airbyte SDK 或可视化连接器构建器构建的完全自定义源)加载文本变得容易,并将其以嵌入形式在 Milvus 中建立索引,Milvus 是一个强大的向量搜索引擎,能够扩展到大量数据。
Airbyte 和 Milvus 都是开源的,在您的基础设施上完全免费使用,如果需要,还提供云服务来减轻运营负担。
除了本文中阐述的经典语义搜索用例之外,一般设置还可以用于使用 RAG 方法(检索增强生成)构建问答聊天机器人、推荐系统,或帮助使广告更相关和高效。