在数据科学工作流中,单一代理往往面临能力边界模糊、上下文溢出和任务切换开销大等问题。business-science 开源的 AI Data Science Team 项目通过 Supervisor 架构模式,将复杂的数据科学任务分解为专业化代理集群,由中央调度器统一编排,实现了加载、清洗、可视化、建模等环节的流水线自动化。本文将从代理职责划分、任务路由决策和状态管理三个维度,给出可落地的工程参数与实现要点。
代理职责的边界划分
多代理系统的首要挑战是定义每个代理的职责边界。AI Data Science Team 将数据科学工作流划分为九个核心代理,每个代理聚焦单一能力域,形成职责单一的专家池。这种设计遵循了单一职责原则,使得代理内部的提示词可以针对特定任务进行高度优化,避免通用代理常见的指令混淆问题。
数据加载代理(Data Loader Tools Agent) 负责识别数据源格式、执行初始加载并返回标准化数据结构。该代理内置了对 CSV、Parquet、JSON、Excel 等常见格式的识别逻辑,加载失败时会返回结构化的错误描述而非直接抛出异常,便于上游代理进行重试或降级处理。工程实践中建议为该代理配置 3 秒超时和 2 次自动重试,避免因网络波动或格式异常导致整个流水线阻塞。
数据清洗代理(Data Cleaning Agent) 处理缺失值填充、异常值检测与修正、重复行消除等操作。该代理的输入输出均遵循统一的 DataFrame 描述协议,包含列类型统计、空值分布热区和清洗建议列表。清洗策略的生成采用约束驱动模式:代理先生成候选方案,再通过规则引擎验证方案的合法性。例如,若某列被识别为分类变量但包含超过 50 个唯一值,代理会触发高基数告警并建议人工复核。
数据可视化代理(Data Visualization Agent) 和 EDA 工具代理(EDA Tools Agent) 共同承担探索性分析职责。前者负责生成符合出版标准的静态图表,支持 matplotlib、seaborn、plotly 三种后端;后者提供统计摘要、相关系数矩阵和分布检验等文本化输出。这两个代理的设计亮点在于它们共享同一份中间状态,上游代理的清洗结果会自动传递为下游代理的输入,无需额外的数据序列化工序。
特征工程代理(Feature Engineering Agent) 负责派生新特征、编码分类变量和降维操作。该代理的独特之处在于它维护了一个特征血缘图,记录每个派生特征的原始列来源和变换函数。这对于后续的模型可解释性分析至关重要,也为回溯实验提供了可复现的保障。
SQL 数据库代理(SQL Database Agent) 支持自然语言到 SQL 的转换,连接配置通过环境变量注入,代理内部仅持有连接句柄而不存储凭证。该代理还支持查询优化建议功能,当检测到全表扫描模式时会输出重构后的等价查询。
H2O 机器学习代理(H2O ML Agent) 和 MLflow 工具代理(MLflow Tools Agent) 形成模型训练与实验管理的闭环。前者封装了 H2O.ai 的 AutoML 能力,支持分类、回归和时间序列三种任务类型;后者提供实验版本管理、指标对比和模型注册功能。两者通过 MLflow Tracking Server 实现状态共享,代理间的调用延迟控制在 500 毫秒以内。
Supervisor 代理 是整个系统的中枢,负责解析用户意图、规划任务序列、调度子代理执行和聚合最终结果。该代理不直接操作数据,而是通过 LangGraph 的 Command 对象驱动其他代理的执行流向。这种设计使得 Supervisor 本身保持无状态,状态全部下沉到 LangGraph 的 StateGraph 中,便于故障恢复和水平扩展。
任务路由的决策机制
LangGraph 框架为多代理系统提供了图结构编排能力,核心在于定义状态类型、节点执行逻辑和条件边跳转规则。AI Data Science Team 的任务路由采用分层决策模式:第一层由用户输入触发,Supervisor 代理进行任务分解;第二层由子代理的结果触发,判断是否需要转交其他代理或直接返回。
LangGraph 的 StateGraph 定义了全局状态结构,包含任务描述、当前阶段、中间结果、错误日志和完成标志五个核心字段。状态更新采用增量模式,每次代理执行后仅修改变更字段而非全量覆盖,这对于处理大规模数据集时的内存优化尤为重要。状态持久化通过 Checkpointer 机制实现,支持将中间状态写入磁盘或外部存储,默认配置下每个检查点占用约 200 字节的元数据空间。
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal, Command
class AgentState(TypedDict):
task: str
current_agent: str
intermediate_results: dict
errors: list
completed: bool
def supervisor_node(state: AgentState) -> Command[Literal["data_loader", "data_cleaner", "visualizer", "FINISH"]]:
model = ChatOpenAI(model="gpt-4o-mini")
response = model.with_structured_output(Router).invoke(
f"任务: {state['task']}\n当前进度: {state['current_agent']}\n"
"请选择下一个代理: data_loader / data_cleaner / visualizer / feature_engineer / FINISH"
)
return Command(
goto=response.next_agent,
update={"current_agent": response.next_agent}
)
条件边的跳转逻辑由 Router 结构化输出驱动。每个子代理在执行完毕后,会根据自身任务的完成状态返回下一跳指令。若代理检测到依赖条件不满足(例如数据清洗代理发现数据尚未加载),会返回特殊标记触发 Supervisor 的重调度逻辑。这种设计避免了硬编码的流水线顺序,使得系统能够根据实际数据特征动态调整执行路径。
Supervisor 代理的提示词设计包含三个关键要素:成员列表(当前活跃的子代理及其能力描述)、任务分解策略(将复杂任务拆解为原子操作的启发式规则)、结束条件判断(基于输出完整性评估的终止标准)。在 AI Data Science Team 的实现中,这三部分内容被分别抽离为独立配置项,支持通过 YAML 文件动态调整代理池的组成和调度策略。
LangGraph 的 createSupervisor 封装了上述模式,提供 outputMode 参数控制输出聚合方式。当 outputMode="last_message" 时,系统仅保留最后一个代理的返回结果,适用于端到端的单轮交互;当 outputMode="all_messages" 时,系统会收集所有代理的中间输出,生成完整的执行日志供审计和复盘使用。
状态管理与持久化配置
多代理系统的状态管理面临三个核心挑战:跨代理的状态传递、故障恢复后的状态恢复和并发执行的一致性保证。AI Data Science Team 通过 LangGraph 的 StateGraph 和 Checkpointer 机制提供了系统化的解决方案。
状态传递采用消息队列模式,每个代理的输出被封装为消息对象,包含内容、时间戳、执行耗时和元数据。消息对象通过共享状态传递,下游代理可以访问完整的历史消息记录,这为长周期任务的上下文管理提供了基础。消息保留策略可配置,默认保留最近 5 轮对话的完整上下文,超出限制时自动压缩为摘要形式。
故障恢复机制基于检查点持久化实现。每个 LangGraph 的 compile() 调用可传入 checkpointer 参数,配置存储后端和检查点频率。AI Data Science Team 推荐使用 SQLite 本地存储或 Redis 分布式存储两种方案:对于单用户场景,SQLite 的文件锁机制足以保证一致性;对于多用户并发场景,Redis 的原子操作和过期机制能够有效防止状态竞争。
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
# 单进程内存模式
memory = MemorySaver()
# 文件持久化模式
sqlite = SqliteSaver.from_conn_string("checkpoints/workflow.db")
app = builder.compile(
checkpointer=sqlite,
interrupt_before=["requires_approval"]
)
interrupt_before 参数是实现人工审核流程的关键。当流水线执行到特定节点时,系统会自动暂停并等待外部信号。这种设计适用于高风险操作(如模型部署、数据库写入)的审批流程,也便于在调试阶段逐节点检查中间状态。恢复执行时,只需调用 app.invoke(None, config=config),系统会从最近的检查点继续执行。
并发控制方面,LangGraph 支持配置线程隔离。每个执行线程拥有独立的状态副本,不同线程之间的状态互不干扰。在 AI Pipeline Studio 应用中,每个用户的交互会话对应一个独立线程,会话 ID 作为 thread_id 参数传入配置,确保多用户场景下的状态隔离。
工程落地的关键参数
基于上述架构分析,以下是数据科学多代理系统部署时的关键参数建议。代理超时配置应根据任务复杂度和模型响应时间动态调整:数据加载代理建议 3 秒超时、2 次重试;数据清洗代理建议 10 秒超时、1 次重试;机器学习代理建议 120 秒超时、0 次重试(长任务应拆分为独立阶段)。
LLM 模型选择需权衡能力与成本。Supervisor 代理作为决策中枢,建议使用 GPT-4o 或 Claude 3.5 等高能力模型,确保任务分解的准确性;子代理可使用 GPT-4o-mini 或 Haiku 等轻量模型,降低调用成本。AI Data Science Team 默认配置下,单次完整流水线的 LLM 调用成本约为 0.05 美元(基于 GPT-4o-mini)。
检查点存储的频率与存储空间呈反比。默认配置下每个检查点保存完整状态,存储空间约为状态大小的 1.2 倍。对于 GB 级别的大规模数据集,建议开启增量检查点模式,仅保存变更字段,存储空间可降低至原始大小的 10% 以下。
代理池的规模应与并发用户数匹配。经验公式为:代理池大小 = 峰值并发用户数 × 平均流水线深度 × 0.5。例如,若平台日活 1000 用户、平均流水线深度 5 步,建议配置 2500 个代理实例,通过 Kubernetes HPA 实现弹性伸缩。
资料来源
- AI Data Science Team 官方仓库:https://github.com/business-science/ai-data-science-team
- LangGraph 多代理概念文档:https://raw.githubusercontent.com/langchain-ai/langgraph/main/docs/docs/concepts/multi_agent.md