第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

如何利用向量數(shù)據(jù)庫構(gòu)建實(shí)時(shí)的新聞搜索引擎

动手指南:使用Apache Kafka、Upstash向量数据库和Bytewax的实时新闻聚合流处理管道的实施。

据报道, 根据earthweb.com的一项研究,每天的新闻文章发布量(网上的和网下的)大约为200万到300万左右!

这么多新闻从四面八方涌来,让人难以跟上。这就是为什么越来越多的人希望用更短更快的方式来了解关心的新闻。

在这篇文章里,我们将着手解决这个问题!
具体来说,我们将构建一个可以从多个来源获取信息并根据您的兴趣筛选信息的系统——一个新闻搜索工具

我们不仅会讨论理论,更会一步步地教你如何实现这样的系统。我们会教你并展示给你看。

完整架构(作者提供图片)

在开始之前,请注意,本文中提到的所有内容都在解码ML文章的GitHub代码仓库 [1]中有完整的代码实现。

目录
  1. 架构概览
  2. 工具考量
  3. 先决条件
    3.1 创建一个新的Upstash Kafka集群
    3.2 创建一个新的Upstash向量索引
    3.3 注册到2个实时新闻API接口
    3.4 安装依赖
  4. 数据收集
    4.1 文章获取管理器
    4.2 生产Kafka消息
    4.3 使用Pydantic的数据交换模型
    4.4 运行KafkaProducers
  5. 数据摄取流程
    5.1 从Kafka消息队列中消费消息
    5.2 实现Bytewax数据流处理
    5.3 精炼、格式化、切分、嵌入文章
    5.4 将向量组装并插入向量数据库
  6. 启动管道流程
  7. 用户界面设计
  8. 总结
  9. 参考文献
架构介绍

完整架构(图源作者)

作为总结,我们将构建一个实时系统,从实时新闻API抓取新闻文章,解析并格式化原始数据为规定的格式,然后将消息序列化并流式传输到Kafka主题作为第一阶段。第二阶段将使用Bytewax来处理我们的Kafka主题中的消息,通过进一步清理、解析、分块和嵌入向量,然后将它们插入向量数据库,最后提供一个UI,通过该UI我们可以与数据库交互。

工具考虑事项

选择合适的工具对于实现高性能、可扩展性和易于实现是至关重要的。下面我们就来看看项目中用到的工具。

  • Upstash 无服务器 Kafka:提供强大且可扩展的事件流处理,无需担心基础设施管理。
  • Python 线程:用于从多个新闻 API 并行获取数据,同时共享线程安全的 Kafka 生产者实例,优化内存占用和性能。
  • Pydantic 模型:确保一致且经过验证的数据交换结构。
  • Bytewax:因其在处理和转换流数据时简单且快速。
  • Upstash 向量数据库:无服务器,易于设置,并且可以轻松地在 Python、JS 和 GO 中集成。一个很大的优点是从 UI 控制台仪表板丰富的导航选项和实时状态指标。

根据硬件要求,不需要显卡。

这要多少钱? — 完全免费。

我将此指南专门使用免费层级的计划 — 所以你完全不用为这些平台花一分钱!

前提条件

在实施任何行动或步骤之前,我们必须确保可以访问每个服务,这意味我们需要做以下设置:

  • 新建的Upstash Kafka集群
  • 新建的Upstash向量索引
  • 注册新闻API
  • 配置环境

只需要大约5分钟,尤其是在刚开始的时候。

新建一个Upstash的Kafka集群

首先,我们需要在Upstash注册账号,登录后,你会看到一个类似的控制面板。

接下来,我们得从顶部栏选择Kafka选项,并点击_+ 创建集群_按钮来创建一个新的集群。点击后,您将看到此模态窗口:

Upstash 到 Kafka 的截图

给集群取个名字,选择你附近的一个区域,然后点击_创建集群_按钮。完成后,新的 Kafka 集群就会出现在这里。接着选择你的新 Kafka 集群,你就会看到这个界面。

让我们来拆解这个观点的关键点:

  • 详情:向您展示集群的概览,以及Upstash提供的各项功能
  • 使用情况:向您展示消息生产与消费的数量、成本影响等信息图表
  • 主题:此选项卡允许您创建和监控已创建的Kafka主题的详细情况

在创建集群之后的下一步是定义主题,我们可以通过该主题发送消息和接收消息。

Topics标签页中,点击创建主题,你将会看到这个视图:

给这个主题起个名字,其他都用默认设置,然后点击创建就这么简单

在高级选项卡里,你可以自定义主题消息的处理方式。默认情况下,一条消息的大小限制不能超过1MB,主题中存储的消息总大小上限最多为256MB(超过限制后会自动删除较旧的消息),并且主题会在一周后被清理。

我们已经成功创建了一个Kafka集群环境,现在只需要复制并设置环境变量以帮助我们连接到集群。为此,请转到您的集群仪表板,在详情标签页中,复制端点、_用户名_和密码,并将它们粘贴到您的.env文件中。
之后,转到主题标签页,复制您的Kafka主题名称。

这是到现在为止你的 .env 文件应该是这样的:

    UPSTASH_KAFKA_UNAME="用户名在此填写"  
    UPSTASH_KAFKA_PASS="密码在此填写"  
    UPSTASH_KAFKA_ENDPOINT="端点在此填写"  
    UPSTASH_KAFKA_TOPIC="主题名称在此填写"
创建一个新Upstash向量索引

现在,让我们直接创建一个新的向量数据库。从主仪表板中选择Vector,然后选择+ 新建索引,您会看到这样的界面:

给你的向量数据库取个名字,然后选择一个离你位置最近的区域,在“模型设置”中选择“这个模型”,即 sentence-transformers/all-MiniLM-L6-v2,我们将使用这个模型来生成新闻文章的嵌入,并选择余弦相似度作为向量距离比较的度量。

你也可以选择另一个模型,但如果这个模型的维度和 Upstash 创建的向量不一样,你可能需要调整维度以匹配,这不建议这样做。

创建新的向量索引之后,您可以按照与 Kafka 集群相同的工作流程。复制 Index 名称、EndpointToken,并将它们粘贴到我们的 .env 文件里。

这是你的.env文件应该有的样子:


    UPSTASH_KAFKA_UNAME="[USERNAME HERE]"  
    UPSTASH_KAFKA_PASS="[PASSWORD HERE]"  
    UPSTASH_KAFKA_ENDPOINT="[ENDPOINT HERE]"  
    UPSTASH_KAFKA_TOPIC="[主题名称此处]"  

    UPSTASH_VECTOR_ENDPOINT="[VECTOR ENDPOINT HERE]"  
    UPSTASH_VECTOR_TOPIC="[向量名称此处]"  
    UPSTASH_VECTOR_KEY="[令牌此处]"  
注册新闻API

我们将使用以下这些API来获取文章内容:

  1. 🔗 NewsAPI

提供一个免费的 开发者 版本,我们可以在其中每天可以调用他们的 API 100 次。

2. 🔗 新闻数据 (NewsData)

提供一个免费的计划,我们每天在这个计划中可以获得200个积分,每个积分等于10篇文章,这意味着我们每天可以拿到总共2000篇文章。

对于我们当前的用例,这些 API 提供了足够的功能来实现和验证我们正在构建的新闻搜索引擎,同时留有改进和扩展的余地,因为底层的工作流程不变。唯一的限制是免费计划不支持定时批量抓取,即在调用这些 API 时我们不能使用 from_dateto_date,但这对我们来说不是问题。我们将通过在两次抓取之间设置一个 等待时间 来模拟这种行为。

下一步是在两个平台上注册——别担心,这非常简单。

  1. 注册了 NewsAPI 后,前往 /account,您会看到一个 API_KEY 字段,将其复制并粘贴到我们的 .env 文件里的 NEWSAPI_KEY 位置。
  2. 注册了 NewsData 后,前往 /api-key,复制 API KEY 并粘贴到我们的 .env 文件里的 NEWSDATAIO_KEY 位置。

无聊的部分完成了,我们现在可以使用这些API来获取文章。下面是从两个API获取的数据包示例。

两个 API 的一些有效载荷示例。(图来自作者)

先决条件回顾

经过这三个步骤,创建Kafka集群,创建向量索引,并注册到News API之后,我们的.env文件应该如下所示。

    UPSTASH_KAFKA_UNAME="[USERNAME HERE]"  
    UPSTASH_KAFKA_PASS="[PASSWORD HERE]"  
    UPSTASH_KAFKA_ENDPOINT="[ENDPOINT HERE]"  
    UPSTASH_KAFKA_TOPIC="[TOPIC NAME HERE]"  

    UPSTASH_VECTOR_ENDPOINT="[VECTOR ENDPOINT HERE]"  
    UPSTASH_VECTOR_TOPIC="[VECTOR NAME HERE]"  
    UPSTASH_VECTOR_KEY="[VECTOR TOKEN HERE]"  

    NEWSAPI_KEY="[NEWSAPI KEY HERE]"  
    NEWSDATAIO_KEY="[NEWSDATA KEY HERE]"  
    NEWS_TOPIC = "news" # 这是我们要获取的文章类型

在接下来的步骤中,在了解实现细节之前,是安装环境以及所需的包。
下面是如何在Makefile中的安装步骤的样例。

    # Makefile  
    ...  
    install:  
     @echo "$(GREEN) [CONDA] 创建 [$(ENV_NAME)] python 环境 $(RESET)"  
     conda create --name $(ENV_NAME) python=3.9 -y  
     @echo "激活环境 $(ENV_NAME)..."  
     @bash -c "source $(conda info --base)/etc/profile.d/conda.sh && conda activate $(ENV_NAME) \  
       && pip install poetry \  
       poetry env use $(which python) -- python"  
     @echo "安装依赖..."  
     @echo "切换到 pyproject.toml 文件所在目录..."  
     @bash -c " PYTHON_KEYRING_BACKEND=keyring.backends.fail.Keyring poetry install"  
    ...

要准备环境,可以运行 make install

接下来,我们来看看从这些来源获取文章的实现。

数据采集

此模块的目的是封装查询两个API的功能,解析响应数据,将两个负载中的信息解析并格式化为一个通用文档格式,利用两个负载中共有的属性,并通过共享KafkaProducer实例将消息发送到我们的集群。

具体来说,我们将包括这些子模块。

  • 文章抓取管理类
  • 如何向我们的Kafka集群发送消息(或数据)
  • Pydantic数据模型
  • 运行数据管道
文章获取管理器

让我们从实现部分开始看看。

    import datetime  
    import functools  
    import logging  
    from typing import Callable, Dict, List  

    from newsapi import NewsApiClient  
    from newsdataapi import NewsDataApiClient  
    from pydantic import ValidationError  

    from models import NewsAPIModel, NewsDataIOModel  
    from settings import settings  

    logging.basicConfig(level=logging.INFO)  
    logger = logging.getLogger(__name__)  
    logger.setLevel(logging.DEBUG)  

    def handle_article_fetching(func: Callable) -> Callable:  
        """  
        装饰器用于处理获取文章时的异常。  

        此装饰器包装文章获取函数以捕获并记录获取过程中发生的任何异常。如果发生错误,则记录错误并返回一个空列表。  

        参数:  
            func (Callable): 要包装的文章获取函数。  

        返回:  
            Callable: 包装后的函数。  
        """  

        @functools.wraps(func)  
        def wrapper(*args, **kwargs):  
            try:  
                return func(*args, **kwargs)  
            except ValidationError as e:  
                logger.error(f"验证错误:{e}")  
            except Exception as e:  
                logger.error(f"从源获取数据时出错:{e}")  
                logger.exception(e)  
            return []  

        return wrapper  

    class NewsFetcher:  
        """  
        新闻文章获取类。  

        属性:  
            _newsapi (NewsApiClient): NewsAPI客户端。  
            _newsdataapi (NewsDataApiClient): NewsDataAPI客户端。  

        方法:  
            fetch_from_newsapi(): 获取NewsAPI头条。  
            fetch_from_newsdataapi(): 获取NewsDataAPI新闻。  
            sources: 新闻源获取函数列表。  
        """  

        def __init__(self):  
            self._newsapi = NewsApiClient(api_key=settings.NEWSAPI_KEY)  
            self._newsdataapi = NewsDataApiClient(apikey=settings.NEWSDATAIO_KEY)  

        @handle_article_fetching  
        def fetch_from_newsapi(self) -> List[Dict]:  
            """从NewsAPI获取头条新闻。"""  
            response = self._newsapi.get_everything(  
                q=settings.NEWS_TOPIC,  
                language="en",  
                page=settings.ARTICLES_BATCH_SIZE,  
                page_size=settings.ARTICLES_BATCH_SIZE,  
            )  
            return [  
                NewsAPIModel(**article).to_common()  
                for article in response.get("articles", [])  
            ]  

        @handle_article_fetching  
        def fetch_from_newsdataapi(self) -> List[Dict]:  
            """从NewsDataAPI获取新闻数据。"""  
            response = self._newsdataapi.news_api(  
                q=settings.NEWS_TOPIC,  
                language="en",  
                size=settings.ARTICLES_BATCH_SIZE,  
            )  
            return [  
                NewsDataIOModel(**article).to_common()  
                for article in response.get("results", [])  
            ]  

        @property  
        def sources(self) -> List[callable]:  
            """新闻源获取函数列表。"""  
            return [self.fetch_from_newsapi, self.fetch_from_newsdataapi]

这里有几个在实现中的关键点:

  • NewsAPIModelNewsDataIOModel 是适应特定载荷格式的 Pydantic 模型。
  • 我们使用 handle_article_fetching 装饰器来捕获验证错误或更广泛的异常,当将原始载荷转换为 Pydantic 模型时。
  • 我们有一个叫做 sources 的属性,它返回用于查询 API 的可调用函数。这将在数据收集模块中被使用,该模块启动多个生产者线程将消息发送到我们的 Kafka 集群。我们将在下一部分详细讨论
发送 Kafka 消息

下面是我们即将实施的工作流程。

发送 Kafka 消息 (图片来自作者)

这里的关键点:

  • 我们使用不同的线程来从API获取数据
  • 我们共享同一个Kafka生产者实例来发送消息
  • 我们使用Pydantic模型来确保数据交换的准确性

使用单独的线程来获取文章,并使用单个 Kafka 生产者将消息发送到集群是我们推荐的做法。这里有几个原因:

  • 效率和性能:KafkaProducer 是线程安全的。创建一个新的实例需要建立网络连接并进行一些配置。在多个线程间共享一个实例可以减少这些操作带来的开销。
  • 吞吐量:单个生产者实例在发送消息到 Kafka 集群前会先批量处理消息。
  • 资源:虽然对我们的情况来说不完全适用,因为我们只有两个生产者线程,但限制实例数量可以更好地利用系统资源。

这里是主要负责发送到Kafka[2]消息的功能:

    def run(self) -> NoReturn:  
        """持续地从Kafka主题获取并发送消息."""  
        while self.running.is_set():  
            try:  
                messages: List[CommonDocument] = self.fetch_function()  
                if messages:  
                    messages = [msg.to_kafka_payload() for msg in messages]  
                    self.producer.send(self.topic, value=messages)  
                    self.producer.flush()  
                logger.info(  
                    f"Producer : {self.producer_id} 发了: {len(messages)} 条消息。"  
                )  
                time.sleep(self.wait_window_sec)  
            except Exception as e:  
                logger.error(f"生产者线程 {self.producer_id} 出现错误: {e}")  
                self.running.clear()  # 线程在遇到错误时会停止运行

实施过程中的关键考虑

  • 我们根据拉取源的数量生成相应的 KafkaProducerThread 实例。
  • 我们将所有这些线程包装在 KafkaProducerSwarm 中。
  • 我们在所有线程间共享一个 KafkaProducer 实例,用于与我们的集群通信。
  • 我们采用单例设计模式 [5],这样即使扩展到 N 个拉取线程,依然保持一个 KafkaProducer 实例。
使用 Pydantic 的数据模型

从上面的代码片段实现中,你可能已经注意到未解释过的 *Document, *Model 对象。让我们在这一部分深入了解下它们。

以下是我们正在构建的应用程序中用于数据交换目的的Pydantic模型,如下所示:

  • NewsDataIOModel : 包装并格式化来自NewsData API的原始负载。
  • NewsAPIModel : 包装并格式化来自NewsAPI API的原始负载。
  • CommonDocument : 为上述不同的新闻格式建立一个通用格式。
  • RefinedDocument : 过滤通用格式,将有用的字段归类到元数据中,并突出显示关键字段如 article description text
  • ChunkedDocument : 将文本切分,并确保chunk_id与document_id之间的关联。
  • EmbeddedDocument : 嵌入切分的文本,并确保chunk_id与document_id之间的关联。

例如,这是CommonDocument模型的样子,它作为不同新闻载荷格式之间的桥梁(bridge)

    class CommonDocument(BaseModel):  
        article_id: str = Field(default_factory=lambda: str(uuid4()))  
        title: str = Field(default_factory=lambda: "N/A")  
        url: str = Field(default_factory=lambda: "N/A")  
        published_at: str = Field(  
            default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S")  
        )  
        source_name: str = Field(default_factory=lambda: "Unknown")  
        image_url: Optional[str] = Field(default_factory=lambda: None)  
        author: Optional[str] = Field(default_factory=lambda: "Unknown")  
        description: Optional[str] = Field(default_factory=lambda: None)  
        content: Optional[str] = Field(default_factory=lambda: None)  

        @field_validator("title", "description", "content")  
        def 清理文本字段(cls, v):  
            if v is None or v == "":  
                return "N/A"  
            return clean_full(v)  

        @field_validator("url", "image_url")  
        def 清理URL字段(cls, v):  
            if v is None:  
                return "N/A"  
            v = remove_html_tags(v)  
            v = normalize_whitespace(v)  
            return v  

        @field_validator("published_at")  
        def 清理日期字段(cls, v):  
            try:  
                parsed_date = parser.parse(v)  
                return parsed_date.strftime("%Y-%m-%d %H:%M:%S")  
            except (ValueError, TypeError):  
                logger.error(f"解析日期并返回正确的日期格式。如果有错误则使用当前日期代替.")  

        @classmethod  
        def from_json(cls, data: dict) -> "CommonDocument":  
            """从 JSON 对象创建 CommonDocument 实例."""  
            return cls(**data)  

        def 转换为Kafka消息负载(self) -> dict:  
            """将CommonDocument对象转换为Kafka消息负载的字典表示形式。"""  
            return self.model_dump(exclude_none=False)

我们来看看这背后的意思:

  • 它包含对于两种新闻文章格式都通用的一系列属性。
  • 使用 field_validator 装饰器验证每个字段,清理或为其分配默认值。
  • to_kafka_payload 方法确保在将消息发送到 Kafka 集群之前对其进行序列化。
文本字段清理步骤

清洗过程其实很简单,我们使用一些方法来清洁文本,并确保做到。我们这样做是为了确保清洁效果。

  • 删除末尾的空格和制表符 \t,换行符 \n。
  • 删除列表项前的符号。
  • 删除文本中的 HTML 标签。

我们正在使用unstructured来简化处理这些转换。

💡 查看更多未结构化数据示例:可以在这里: 清理示例.

运行Kafka生产者程序

截至目前,我们已经实现了以下模块:

  • 注册了所有必需的服务
  • 创建了Kafka集群和向量数据库(Vector Database)
  • 实现了新闻文章的抓取功能。
  • 实现了用于数据交换的Pydantic模型。
  • 实现了KafkaProducer功能。

完成这些后,我们现在可以安全地运行管道中的produce阶段,并检查Upstash上的KafkaCluster里的消息。

我们就这么干吧!
在我们的项目核心,Makefile 中有一个命令用于运行 数据采集

    ....

    run_producers:  
     @echo "$(GREEN) [正在运行] 数据收集管道程序 Kafka 生产者脚本 $(RESET)"  
     @bash -c "poetry run python -m src.producer"  

    ...

这个 🔗Makefile 文件包含了与我们正在构建的解决方案交互的有用命令。在这个使用场景中,我们需要运行 make run_producers 来执行 run_producers 命令。这将会启动 KafkaSwarm,并处理从NewsAPI抓取文章、格式化并推送到我们Kafka集群的线程。

执行命令后,CLI 会生成日志。(图由作者提供)

从日志中,我们看到两个生产者线程(Producer 线程)各自发送了5 条消息。为了检查消息是否已送达集群,请前往Upstash 控制台 → Kafka 集群 → 消息 页面。你应该会看到类似下面这样的视图:

Upstash Kafka 消息图

截至目前,我们已经完成了数据收集管道流程的实现和测试,其中我们抓取新闻文章,对它们进行格式化,并将这些消息发送到Kafka。
在这之后,我们将着手实现“消费者”管道或摄入管道,来处理Kafka中的新消息。

数据摄取管道

在我们确认之后,如果有消息在Kafka主题上,我们必须实现“消费者”管道,即实现消费端处理流程:

  • 即实现消息的读取和处理流程。
  1. 从我们的 Kafka 主题中读取消息
  2. 解析、格式化、分块并生成嵌入向量
  3. 生成向量索引对象并将其插入或更新到Upstash 向量索引[3]。

为此,我们将使用 Bytewax 来定义数据流(DataFlow),按正确的顺序把步骤连起来。

让我们直接进入实现,并解释一些关键概念吧!

将 Kafka 源作为 Bytewax 流的输入源定义。

import json  
from typing import List  

from bytewax.connectors.kafka import KafkaSinkMessage, KafkaSource  

from logger import get_logger  
from models import CommonDocument  
from settings import settings  

logger = get_logger(__name__)  

def build_kafka_stream_client():  
    """  
    构建一个Kafka流客户端,使用ByteWax KafkaSource连接器从Upstash Kafka主题读取消息。  
    """  
    kafka_config = {  
        "bootstrap.servers": settings.UPSTASH_KAFKA_ENDPOINT,  
        "security.protocol": "SASL_SSL",  
        "sasl.mechanisms": "SCRAM-SHA-256",  
        "sasl.username": settings.UPSTASH_KAFKA_UNAME,  
        "sasl.password": settings.UPSTASH_KAFKA_PASS,  
        "auto.offset.reset": "earliest",  # 从最早的message开始读取  
    }  
    kafka_input = KafkaSource(  
        topics=[settings.UPSTASH_KAFKA_TOPIC],  
        brokers=[settings.UPSTASH_KAFKA_ENDPOINT],  
        add_config=kafka_config,  
    )  
    logger.info("KafkaSource客户端创建成功。")  
    return kafka_input  

def process_message(message: KafkaSinkMessage):  
    """  
    处理接收到的Kafka消息,并返回CommonDocument的列表。  
    - message: KafkaSinkMessage(key, value),其中value是消息负载。  
    """  
    documents: List[CommonDocument] = []  
    try:  
        json_str = message.value.decode("utf-8")  
        data = json.loads(json_str)  
        documents = [CommonDocument.from_json(obj) for obj in data]  
        logger.info(f"解码为{len(documents)}个CommonDocuments")  
        return documents  
    except StopIteration:  
        logger.info("没有更多的文档可以获取。")  
    except KeyError as e:  
        logger.error(f"处理文档批量时出现键错误:{e}")  
    except json.JSONDecodeError as e:  
        logger.error(f"在解码消息中的JSON时遇到错误:{e}")  
        raise  
    except Exception as e:  
        logger.exception(f"在处理过程中遇到意外错误:{e}")

这个实现的主要要点:

  • build_kafka_stream_client : 构建 Kafka 流客户端,使用预定义的 Bytewax KafkaSource 连接器创建一个 KafkaConsumer 实例。
  • process_message : 一个回调,用于处理来自我们 Kafka 主题的消息。

2. 定义 Upstash 索引作为我们 Bytewax 流的输出结果。

    from typing import Optional, List  

    from bytewax.outputs import DynamicSink, StatelessSinkPartition  
    from upstash_vector import Index, Vector  
    from models import EmbeddedDocument  
    from settings import settings  
    from logger import get_logger  

    logger = get_logger(__name__)  

    class UpstashVectorOutput(DynamicSink):  
        """表示Upstash向量输出的类。  

        此类用于创建Upstash向量输出,这是一种支持至少一次处理的动态输出类型。恢复后,恢复过程中的消息将被重复。  

        Args:  
            vector_size (int): 向量的大小。  
            collection_name (str, 可选): 集合的名称。默认为settings.VECTOR_DB_OUTPUT_COLLECTION_NAME。  
            client (Optional[Index], 可选): Upstash客户端。默认为None。  
        """  

        def __init__(  
            self,  
            vector_size: int = settings.EMBEDDING_MODEL_MAX_INPUT_LENGTH,  
            collection_name: str = settings.UPSTASH_VECTOR_TOPIC,  
            client: Optional[Index] = None,  
        ):  
            self._collection_name = collection_name  
            self._vector_size = vector_size  

            if client:  
                self.client = client  
            else:  
                self.client = Index(  
                    url=settings.UPSTASH_VECTOR_ENDPOINT,  
                    token=settings.UPSTASH_VECTOR_KEY,  
                    retries=settings.UPSTASH_VECTOR_RETRIES,  
                    retry_interval=settings.UPSTASH_VECTOR_WAIT_INTERVAL,  
                )  

        def build(  
            self, step_id: str, worker_index: int, worker_count: int  
        ) -> StatelessSinkPartition:  
            return UpstashVectorSink(self.client, self._collection_name)  

    class UpstashVectorSink(StatelessSinkPartition):  
        """  
        将文档嵌入写入Upstash Vector数据库集合的类。  
        此实现增强了错误处理和日志记录,利用批处理插入提高效率,并遵循Python的最佳实践以提高可读性和可维护性。  

        Args:  
            client (Index): 用于写入的Upstash Vector客户端。  
            collection_name (str, 可选): 要写入的集合名称。默认为UPSTASH_VECTOR_TOPIC环境变量的值。  
        """  

        def __init__(  
            self,  
            client: Index,  
            collection_name: str = None,  
        ):  
            self._client = client  
            self._collection_name = collection_name  
            self._upsert_batch_size = settings.UPSTASH_VECTOR_UPSERT_BATCH_SIZE  

        def write_batch(self, documents: List[EmbeddedDocument]):  
            """  
            将一批文档嵌入写入配置的Upstash Vector数据库集合。  

            Args:  
                documents (List[EmbeddedDocument]): 要写入的文档。  
            """  
            vectors = [  
                Vector(id=doc.doc_id, vector=doc.embeddings, metadata=doc.metadata)  
                for doc in documents  
            ]  

            # 批处理插入以提高效率  
            for i in range(0, len(vectors), self._upsert_batch_size):  
                batch_vectors = vectors[i : i + self._upsert_batch_size]  
                try:  
                    self._client.upsert(vectors=batch_vectors)  
                except Exception as e:  
                    logger.error(f"批处理插入期间捕获异常:{e}")

这里有几个关于这个实现的关键点:

  • UpstashVectorOutput : 实例化 Bytewax DynamicSink 抽象,用于将数据路由到各种目的地。在这种情况下,它将包裹 Upstash Vector Index 客户端连接。
  • UpstashVectorSink : 包裹我们的 DynamicSink 并处理将向量的更新和插入到我们 VectorDatabase 的功能。这个 StatelessSinkPartition 意味着 DynamicSink 不会保留任何状态,并且我们的 Sink 的任何输入都会根据 write_batch 实现处理。
构建 Bytewax 流的其他部分

这里是我们的DataFlow的完整实现,它从Upstash Kafka主题中获取消息,进行清洗、精炼、分块、嵌入向量,并将向量更新或插入到Upstash向量索引库中。

    """
        本脚本为Upstash用例定义了ByteWax数据流实现。
        数据流包含以下步骤:

1. 输入:从Kafka流中读取数据。

2. 精炼:将输入数据转换为统一格式。

3. 分块:将输入数据分割成更小的块。

4. 嵌入:为输入数据生成嵌入向量。

5. 输出:将输出数据写入Upstash向量数据库系统。
    """

    from pathlib import Path
    from typing import Optional

    import bytewax.operators as op
    from vector import UpstashVectorOutput
    from consumer import process_message, build_kafka_stream_client
    from bytewax.connectors.kafka import KafkaSource
    from bytewax.dataflow import Dataflow
    from bytewax.outputs import DynamicSink
    from embeddings import TextEmbedder
    from models import ChunkedDocument, EmbeddedDocument, RefinedDocument
    from logger import get_logger

    logger = get_logger(__name__)

    def build(
        model_cache_dir: Optional[Path] = None,
    ) -> Dataflow:
        """
        构建ByteWax数据流,用于Upstash用例。
        数据流如下:

* 1. 标记:['kafka_input'] = 从KafkaSource中读取输入数据

* 2. 标记:['map_kinp'] = 将来自KafkaSource的消息处理为通用文档

* 2.1 [可选] 标记 ['dbg_map_kinp'] = 在['map_kinp']之后进行调试

* 3. 标记:['refine'] = 将消息转换为精炼文档格式

* 3.1 [可选] 标记 ['dbg_refine'] = 在['refine']之后进行调试

* 4. 标记:['chunkenize'] = 将精炼文档分割成更小的数据块

* 4.1 [可选] 标记 ['dbg_chunkenize'] = 在['chunkenize']之后进行调试

* 5. 标记:['embed'] = 为数据块生成嵌入向量

* 5.1 [可选] 标记 ['dbg_embed'] = 在['embed']之后进行调试

* 6. 标记:['output'] = 将嵌入向量写入Upstash向量数据库系统
        注意:
            每个可选标记都是调试步骤,可以根据需要启用进行故障排除。
        """
        model = TextEmbedder(cache_dir=model_cache_dir)

        dataflow = Dataflow(flow_id="news-to-upstash")
        stream = op.input(
            step_id="kafka_input",
            flow=dataflow,
            source=_build_input(),
        )
        stream = op.flat_map("map_kinp", stream, process_message)
        # _ = op.inspect("dbg_map_kinp", stream)
        stream = op.map("refine", stream, RefinedDocument.from_common)
        # _ = op.inspect("dbg_refine", stream)
        stream = op.flat_map(
            "chunkenize",
            stream,
            lambda refined_doc: ChunkedDocument.from_refined(refined_doc, model),
        )
        # _ = op.inspect("dbg_chunkenize", stream)
        stream = op.map(
            "embed",
            stream,
            lambda chunked_doc: EmbeddedDocument.from_chunked(chunked_doc, model),
        )
        # _ = op.inspect("dbg_embed", stream)
        stream = op.output("output", stream, _build_output())
        logger.info("成功创建了bytewax数据流。")
        logger.info("\t阶段:Kafka 输入 -> 映射 -> 精炼 -> 分块 -> 嵌入 -> 插入或更新")
        return dataflow

    def _build_input() -> KafkaSource:
        return build_kafka_stream_client()

    def _build_output() -> DynamicSink:
        return UpstashVectorOutput()

此实现的关键点:

  • 一个 TextEmbedder 实例,它是单例包装器,封装了我们的嵌入模型 [sentence-transformers/all-MiniLM-L6-v2](https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2),将用于计算文章文本的嵌入。
  • 一个 stream 变量用于定义和控制 Bytewax 数据流。
  • 使用 Bytewax 的 op.inspect 操作符在数据流的不同阶段进行各种调试。
  • 定义一个 _build_input() 方法,用于封装 KafkaSource 客户端,为了简单起见。
  • 定义一个 _build_output() 方法,用于封装 UpstashVector 客户端,为了简单起见。

在这个工作流程中,我们给以下阶段起了这些名字:

  1. kafka_input :获取并转换 Kafka 消息到 CommonDocument Pydantic 格式的阶段。
  2. map_kinp :表示 Map Kafka 输入,它对收到的消息应用展平处理,生成 List[CommonDocument] 类型的 Pydantic 对象。
  3. refine :迭代 List[CommonDocument] 并创建 RefinedDocument 实例。
  4. chunkenize :迭代 List[RefinedDocument] 并生成 ChunkedDocument 实例。
  5. embed :迭代 List[ChunkedDocument] 并创建 EmbeddedDocument 实例。
  6. output :迭代 List[EmbeddedDocument] ,创建 Vector 对象,并将它们更新到我们的 Upstash 向量索引中。
启动管道:

到目前为止,我们已经实现了。

  • 数据采集管道:在设定的时间间隔内从NewsAPIs抓取原始数据,对其进行格式化,并将消息发送到我们的Kafka主题。
  • 摄取管道:这是一个Bytewax DataFlow,连接并消费我们的Kafka主题上的消息,最终更新或插入我们的向量数据库向量数据库

我们可以通过位于项目根目录的Makefile中预先定义好的命令开始这些服务。

    # 运行命令以生成Kafka消息的管道
    make run_producers  

    # 运行命令以摄入Kafka消息并更新或插入向量的管道
    make run_pipeline

而且……好了
我们成功启动了生产者/消费者服务。
现在只剩下用于与向量数据库交互并查找新闻文章的UI。

用户接口

UI 是一个基本的 Streamlit (一个流处理的框架)应用,具备以下功能:

  • 一个文本搜索栏
  • 一个 div 区域,显示从向量数据库获取的文章卡片组件

一张卡片包括以下数据项:

  • 发布日期
  • 相似度
  • 文章图片
  • 一个 SeeMore 按钮,点击后会跳转到原始文章的链接。

一旦你在文本栏中输入了消息或问题——输入会被清理(转为小写,移除非ASCII字符等)然后进行嵌入。使用新的嵌入,我们查询向量数据库以找到最相似的条目——这些结果将被构造并呈现。

这是一个例子,

UI 文本输入框 (图片由作者提供)

UI 卡片视图的文章(由作者提供)

最后说一下

恭喜啦!

你做到了! 你打造了一個新聞搜索引擎,這不僅是一個酷炫的項目,但更重要的是已經準備好要上线了。我们不仅只是拼凑在一起,还遵循了最佳的软件开发实践。

我们使用 Pydantic 来让数据格式更加规范,通过多线程提高了效率,并编写了单元测试,引入了 Upstash 的无服务器 Kafka 和向量数据库服务,不仅轻松搭建了数据管道,还让它们变得快速、可扩展且具有容错性。

你现在掌握了窍门,可以将这个蓝图应用于几乎所有数据驱动的想法。这是一大胜利,不仅对这个项目,而且对将来你想要构建的所有酷炫的东西也都适用。

参考资料

[1] 基于Upstash Vector的新闻搜索工具 — 解码ML Github (2024)
[2] Upstash Serverless Kafka 连接方式
[3] Upstash Serverless 向量数据库
[4] 使用Python进行流处理
[5] 单例设计模式
[6] sentence-transformers/all-MiniLM-L6-v2(预训练模型)
[7] unstructured Python库(非结构化数据处理库)
[8] 使用Python的Streamlit

更多阅读

按相关性排序

使用 Pydantic 驗證來增強數據處理工作流程使用 Pydantic 模型和字段驗證器確保數據模型的一致性,就像專業人士那樣。medium.com 中

在这篇文章中,你将了解 Pydantic 的酷功能,以及为什么我建议在结构化数据交换类模型时使用 Pydantic 模型。展示的示例直接与当前文章相关。你将了解到你必须了解的关键属性、优势和功能,以及如何在你的数据工程工作流程中应用最佳实践。

點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消