介紹Graphite——一個基于事件驅(qū)動的AI代理框架
推荐
GGraphite 是一个开源框架,用于构建特定领域的人工智能助手,使用可组合的代理式工作流。它提供了一个高度可扩展的平台,可以根据独特的业务需求灵活定制,使组织能够开发适合其特定业务领域的自定义工作流。
我们之前提到了这个产品,现在我们很高兴地在这里开源了 Graphite!您可以在 GitHub 给它一个 Star ⭐ 吧,如果您喜欢它!
GitHub - binome-dev/graphite点击这里通过在 GitHub 上创建帐户,你可以加入到 binome-dev/graphite 的贡献中来。
你可能在想,“既然已经有了像ChatGPT、Claude以及各种代理平台和框架这样的AI解决方案,”简短来说:我们发现现有的AI工具在解决现实世界问题上存在一些不足之处。许多通用代理——比如典型的ReAct或CoT代理——在执行关键任务时可能会遇到麻烦,而在这个过程中犯错可能代价不菲。
石墨提供了一个简单且易于控制的框架来构建工作流程,使大型语言模型(LLMs)不仅能够进行推理和规划,还可以在明确定义、可审计和易于恢复的工作流程中运行。它还内置了可观测性、幂等性、可审计性等重要功能。
本文将简要介绍 Graphite,从其架构到重要特性,并提供一个简单的示例:例如,使用 Graphite 构建一个客户了解型 AI 助手。
这个架构:简单、强大而灵活石墨(Graphite)由三个概念层架构组成——助手、节点和工具:
- 助手 负责协调工作流程并管理对话的状态。
- 节点 封装了独立的逻辑单元,每个节点专注于特定功能(例如调用大语言模型或执行特定函数)。
- 工具 是纯粹的函数,负责执行任务,例如调用API或运行Python函数。
此外,Graphite 使用事件溯源模式来记录每次状态的变化。每当 Assistant、Node 或 Tool 被调用、响应或失败时,都会生成相应的事件并存储在事件存储库中。
命令模式通过良好的命令对象和处理器,干净地将请求发起方和执行方分开。这些命令包含所有必需的信息,使节点能够简单地、独立地调用工具。
节点通过一种轻量级的发布/订阅流程协调机制来进行协调。
- 工作流 利用发布-订阅模式和内存中的消息队列来协调各节点间的互动。
这种架构实现了即插即用的模块化——你可以像搭乐高积木一样搭建工作流,添加或移除节点,并且可以精确地隔离故障,就像外科手术一样。
我们上面介绍的核心设计原则表明,Graphite 与其他代理框架的区别在于:
-
一个简单的三层执行模型
三个不同的层次——助手层、节点层和工具层——管理执行,而专门的工作流层则负责编排。 -
基于发布/订阅的事件驱动编排
通信依赖于发布和订阅事件,确保数据在整个系统中以解耦和模块化的方式流动,保持系统的灵活性和扩展性。 - 事件作为单一事实来源
所有操作状态和转换都被记录为事件,提供了一种统一的方式,可以用来追踪和重放系统行为。
将这些元素结合在一起,Graphite 提供了一个生产级的 AI 应用框架,能够可靠地在大规模环境中运行,优雅地应对故障,并维持用户及利益相关者的信任。这种方法的核心所在是由四个基本能力作为这一方法的基础。
- 可观测性
复杂的人工智能解决方案涉及多个步骤、数据源和模型。Graphite 的事件驱动架构、日志记录和追踪使其能够及时发现瓶颈或错误,确保每个组件的行为透明且可量化。 - 幂等性
异步工作流在网络条件变化或部分失败时常常需要重试。Graphite 的设计强调幂等操作,避免重复发布或订阅数据的损坏,以防止在必须重复调用时出现数据重复或损坏。 - 可审计性
通过将事件视为唯一的真实来源,Graphite 自动记录每个状态变化和决策路径。这种详细记录对于在受管制行业工作的用户或需要完整可追溯性以进行调试和合规性检查的用户来说是必不可少的。 - 可恢复性
长时间运行的任务在执行过程中失败时有重新工作的风险。在 Graphite 中,检查点和事件驱动的回放使工作流可以从中断的精确点恢复,从而最大限度地减少停机时间并提高资源利用率。
这些能力——可观测性、幂等特性、审计性和可恢复性——共同使 Graphite 独树一帜作为构建稳健和可信的 AI 应用程序的这一框架。下面我们将详细说明 Graphite 实现每个功能的方式。
展示代码 — KYC 助手在设计基于AI的工作流程时,请记住大型语言模型会带来不确定性。遵循奥卡姆剃刀原则会有帮助,这意味着,工作流程越简单越有效。
设计工作流程假设你想创建一个为客户了解(KYC)的助手,用于健身房的注册流程,并且具有人工审核环节。用户需要提供全名和电子邮件地址才能完成注册。如果缺少任何信息,流程会暂停并要求用户提供更多信息。当然,实际应用中需要更多细节,但这次我们只简化了流程。
首先,通过pip安装Graphite
pip install grafi
运行此命令来安装grafi包
根据上面的工作流程图,我们需要添加以下组件。
7个话题:
输入的主题,通过代理由框架提供
从grafi.common.topics.topic模块导入代理输入主题(agent_input_topic)
提取用户信息
user_info_extract_topic = Topic(name="user_info_extract_topic") # 初始化一个名为"user_info_extract_topic"的主题
- 人类呼叫的话题
hitl_call_topic = Topic(
name="hitl_call_topic",
# condition: 检查最后一个消息的工具调用是否不是"register_client"函数
condition=lambda msgs: msgs[-1].tool_calls[0].func_name
!= "register_client",
# "Topic":表示某个主题或话题,这里用于特定条件下的消息处理
)
- 人机协作(框架提供)
from grafi.common.topics.人类请求主题 import 人类请求主题
- 注册 用户话题
register_user_topic = Topic(
name="用户注册主题",
condition=lambda msgs: msgs[-1].tool_calls[0].function.name
== "register_client", # 最后一条消息的工具调用中的函数名为 "register_client"
)
- 用户回答帖子
register_user_respond_topic = Topic(name="注册用户响应话题")
- 代理输出话题(由框架提供)
# 从grafi.common.topics.output_topic导入agent_output_topic模块
5个节点:(此处可添加简短描述或上下文信息)
- LLM 节点(用户信息提取节点)用于提取用户输入中的姓名和电子邮件地址
user_info_extract_node = (
LLMNode.Builder() # 大模型节点构建器
.name("思想节点") # 节点名称为"思想节点"
.subscribe(
SubscriptionBuilder()
.subscribed_to(agent_input_topic) # 订阅代理输入主题
.或者() # 或者
.subscribed_to(human_request_topic) # 订阅人类请求主题
.build() # 构建订阅设置
)
.command(
LLMResponseCommand.Builder()
.llm(
OpenAITool.Builder()
.name("思想大模型") # 大模型名称为"思想大模型"
.api_key(self.api_key) # 使用API密钥
.model(self.model) # 模型名称
.system_message(self.user_info_extract_system_message) # 系统消息
.build() # 构建大模型设置
)
.build() # 构建命令设置
)
.publish_to(user_info_extract_topic) # 发布到用户信息提取主题
.build() # 构建节点
)
- LLM 节点(LLM Node)根据提取的信息来创建动作(执行)
action_node = (
LLMNode.Builder()
.name("ActionNode") # 设置节点名称为“ActionNode”
.subscribe(user_info_extract_topic) # 订阅用户信息提取主题
.command(
LLMResponseCommand.Builder()
.llm(
OpenAITool.Builder()
.name("ActionLLM") # 设置LLM名称为“ActionLLM”
.api_key(self.api_key) # 设置API密钥
.model(self.model) # 设置模型
.system_message(self.action_llm_system_message) # 设置系统消息
.build()
)
.build()
)
.publish_to(hitl_call_topic) # 发布到HITL调用主题
.publish_to(register_user_topic) # 发布到用户注册主题
.build() # 构建节点
)
- 如果有缺失的信息,向用户询问(人机循环)
human_request_function_call_node = (
LLMFunctionCallNode.Builder()
.name("HumanRequestNode") # 人类请求节点
.subscribe(hitl_call_topic) # 订阅hitl调用主题
.command(
FunctionCallingCommand.Builder()
.function_tool(self.hitl_request) # 功能工具
.build()
)
.publish_to(human_request_topic) # 发布到人类请求主题
.build()
)
- 功能工具节点(注册用户)可以帮助用户注册。
register_user_node = ( # 注释: 定义用户注册节点
LLMFunctionCallNode.Builder() # 注释: 创建LLM函数调用节点构建器
.名称("功能调用注册节点") # 注释: 设置节点名称
.订阅(register_user_topic) # 注释: 订阅用户注册主题
.命令( # 注释: 设置命令
FunctionCallingCommand.Builder() # 注释: 创建函数调用命令构建器
.function_tool(self.注册请求) # 注释: 设置函数工具
.build() # 注释: 构建命令对象
)
.发布到(register_user_respond_topic) # 注释: 发布到用户注册响应主题
.build() # 注释: 构建节点对象
)
- LLM节点(回应用户)来起草给用户的最终回复
// 创建LLMNode实例
user_reply_node = (
LLMNode.Builder()
// 设置节点名称
.name("LLMResponseToUserNode")
// 订阅用户回复主题
.subscribe(register_user_respond_topic)
// 设置命令
.command(
LLMResponseCommand.Builder()
// 设置LLM
.llm(
OpenAITool.Builder()
// 设置名称
.name("ResponseToUserLLM")
// 设置API密钥
.api_key(self.api_key)
// 设置模型
.model(self.model)
// 设置系统消息
.system_message(self.summary_llm_system_message)
.build()
)
.build()
)
// 发布到代理输出主题
.publish_to(agent_output_topic)
.build()
)
// self.api_key 表示当前实例的API密钥
// self.model 表示当前实例的模型
// self.summary_llm_system_message 表示当前实例的系统消息
源代码在这里 — KYC 助手,其中 KYC 代表“了解您的客户”。
那么让我们写一个简单的代码程序来测试。
import json
import uuid
from kyc_assistant import KycAssistant
from grafi.common.decorators.llm_function import llm_function
from grafi.common.models.execution_context import ExecutionContext
from grafi.common.models.message import Message
from grafi.tools.functions.function_tool import FunctionTool
class ClientInfo(FunctionTool):
@llm_function
def request_client_information(self, question_description: str):
"""
根据给定的问题描述请求用户的个人信息。
"""
return json.dumps({"question_description": question_description})
class RegisterClient(FunctionTool):
@llm_function
def register_client(self, name: str, email: str):
"""
电子邮件为{email}的用户已注册。
"""
return f"电子邮件为{name}的用户已注册。"
user_info_extract_system_message = """
你是一个严格的验证器,用于检查给定的输入中是否包含用户的全名和电子邮件地址。你的任务是分析输入,并确定是否同时存在一个全名(包括名和姓)和一个有效的电子邮件地址。
### 验证标准:
- **全名**:输入应包含至少两个单词,它们看起来像名和姓。忽略常见的占位符(例如“John Doe”)。
- **电子邮件地址**:输入应包含一个有效的电子邮件格式(例如example@example.com)。
- **大小写不敏感**:电子邮件验证应不区分大小写。
- **准确性**:通过确保随机文本、用户名或部分名称不会触发验证来避免假阳性。
- **输出**:如果同时存在全名和电子邮件,则响应“有效”,否则响应“无效”。可选择提供无效的原因。
### 示例响应:
- **输入**:"John Smith, john.smith@example.com" → **输出**:"有效"
- **输入**:"john.smith@example.com" → **输出**:"无效 - 缺少全名"
- **输入**:"John" → **输出**:"无效 - 缺少全名和电子邮件"
严格遵循这些验证规则,不要假设缺失的细节。
"""
def get_execution_context():
return ExecutionContext(
conversation_id="conversation_id",
execution_id=uuid.uuid4().hex,
assistant_request_id=uuid.uuid4().hex,
)
def test_kyc_assistant():
execution_context = get_execution_context()
assistant = (
KycAssistant.Builder()
.name("KycAssistant")
.api_key(
"YOUR_OPENAI_API_KEY"
)
.user_info_extract_system_message(user_info_extract_system_message)
.action_llm_system_message(
"根据请求选择最合适的功能。"
)
.summary_llm_system_message(
"使用注册结果回复用户。如果成功,你的回复必须包含'已注册'。"
)
.hitl_request(ClientInfo(name="request_human_information"))
.register_request(RegisterClient(name="register_client"))
.build()
)
while True:
# 初始用户输入
user_input = input("用户:")
input_data = [Message(role="user", content=user_input)]
output = assistant.execute(execution_context, input_data)
responses = []
for message in output:
try:
content_json = json.loads(message.content)
responses.append(content_json["question_description"])
except json.JSONDecodeError:
responses.append(message.content)
respond_to_user = ",".join(responses)
print("助手:", respond_to_user)
if "已注册" in output[0].content:
break
if __name__ == "__main__":
test_kyc_assistant()
import json
import uuid
from kyc_assistant import KycAssistant
from grafi.common.decorators.llm_function import llm_function
from grafi.common.models.execution_context import ExecutionContext
from grafi.common.models.message import Message
from grafi.tools.functions.function_tool import FunctionTool
class ClientInfo(FunctionTool):
@llm_function
def request_client_information(self, question_description: str):
"""
根据给定的问题描述请求用户的个人信息。
"""
return json.dumps({"question_description": question_description})
class RegisterClient(FunctionTool):
@llm_function
def register_client(self, name: str, email: str):
"""
电子邮件为{email}的用户已注册。
"""
return f"电子邮件为{name}的用户已注册。"
user_info_extract_system_message = """
你是一个严格的验证器,用于检查给定的输入中是否包含用户的全名和电子邮件地址。你的任务是分析输入,并确定是否同时存在一个全名(包括名和姓)和一个有效的电子邮件地址。
### 验证标准:
- **全名**:输入应包含至少两个单词,它们看起来像名和姓。忽略常见的占位符(例如“John Doe”)。
- **电子邮件地址**:输入应包含一个有效的电子邮件格式(例如example@example.com)。
- **大小写不敏感**:电子邮件验证应不区分大小写。
- **准确性**:通过确保随机文本、用户名或部分名称不会触发验证来避免假阳性。
- **输出**:如果同时存在全名和电子邮件,则响应“有效”,否则响应“无效”。可选择提供无效的原因。
### 示例响应:
- **输入**:"John Smith, john.smith@example.com" → **输出**:"有效"
- **输入**:"john.smith@example.com" → **输出**:"无效 - 缺少全名"
- **输入**:"John" → **输出**:"无效 - 缺少全名和电子邮件"
严格遵循这些验证规则,不要假设缺失的细节。
"""
def get_execution_context():
return ExecutionContext(
conversation_id="conversation_id",
execution_id=uuid.uuid4().hex,
assistant_request_id=uuid.uuid4().hex,
)
def test_kyc_assistant():
execution_context = get_execution_context()
assistant = (
KycAssistant.Builder()
.name("KycAssistant")
.api_key(
"YOUR_OPENAI_API_KEY"
)
.user_info_extract_system_message(user_info_extract_system_message)
.action_llm_system_message(
"根据请求选择最合适的功能。"
)
.summary_llm_system_message(
"使用注册结果回复用户。如果成功,你的回复必须包含'已注册'。"
)
.hitl_request(ClientInfo(name="request_human_information"))
.register_request(RegisterClient(name="register_client"))
.build()
)
while True:
# 初始用户输入
user_input = input("用户:")
input_data = [Message(role="user", content=user_input)]
output = assistant.execute(execution_context, input_data)
responses = []
for message in output:
try:
content_json = json.loads(message.content)
responses.append(content_json["question_description"])
except json.JSONDecodeError:
responses.append(message.content)
respond_to_user = ",".join(responses)
print("助手:", respond_to_user)
if "已注册" in output[0].content:
break
if __name__ == "__main__":
test_kyc_assistant()
测试、观察、调试和优化
石墨使用 OpenTelemetry 和 Arize 的 OpenInference 来追踪、观察助手的行为表现。
我们可以本地运行phoenix来捕捉这些痕迹。
phoenix serve (启动凤凰服务器 qǐdòng fènghuáng fúwùqī)
这里有一个用户和助手之间聊天的示例。
> 用户: 嗨,我想报名参加你们的健身房。你能帮我注册吗?
> 助手: 请提供您的全名和电子邮件地址来报名参加健身房。
> 用户: 我的名字是Craig李,这是我的电子邮件:craig@example.com
> 助手: 恭喜你,Craig!你现在已经在我们的健身房注册了。如果你有任何问题或需要帮助,请随时咨询!
看上去挺好的。但如果你把测试的边界推得更远些,错误很快就会冒出来。像下面这样
> 用户: 嗨,今天过得怎么样呢?
出现了错误,当我们用Phoenix调试时,可以精准找到它的位置:
问题始于Action LLM。根据给定的输入,它应该选择合适的工具,但实际上却生成了一个纯粹的字符串回复。
为了解决这个问题,我们可以提供明确的指示:如果用户问的是与健身房注册无关的问题,LLM应使用request_client_information工具来礼貌地回答并询问他们是否需要帮助注册健身房。
那么,让我们更新LLM系统的提示。
.action_llm_system_message(
"根据请求选择最合适的工具:"
"如果用户提出的问题与健身房注册无关,大语言模型(LLM)应该使用`request_client_information`工具礼貌地回应,并询问是否需要帮助注册健身房。"
)
再试试看
> 用户:嗨,今天怎么样?
> 助手:我是来帮您注册健身房会员的,请您提供您的全名和邮箱。
你已经建立了一个可以协助办理健身房会员注册的助手!助手的代码可以在这个链接找到,同时你可以在这里链接找到一个示例。
摘要格拉斐特是一种开源框架,用于构建特定领域的AI助手,通过可组合、事件驱动的工作流。借助松散耦合的组件——助手、节点、工具和工作流,格拉斐特支持模块化设计,事件作为唯一的真实来源。其架构支持复杂的逻辑,如大型语言模型调用、函数执行和RAG检索,所有这些都通过专门的发布/订阅主题和命令进行协调。内置可观测性、幂等性、可审计性和可恢复性,使它具备了生产就绪性,使工作流能够扩展、恢复并适应合规性要求。不论是构建对话代理还是自动化流程,格拉斐特都为现实世界的AI系统提供了一个灵活且可扩展的基础。
共同學(xué)習(xí),寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章