Files
zguiy-flow/flow.md
2025-12-13 15:40:01 +08:00

4.6 KiB
Raw Permalink Blame History

  1. 项目概览与架构设计1.1 核心目标提供基于 Vue Flow 的可视化流程编排界面。后端采用 Executor 模式 和 LangChain确保 Agent 逻辑高度模块化和可扩展。通过 WebSocket 实现流程执行状态的实时监控。1.2 技术栈 (Technology Stack)领域技术/框架职责前端 (FE)Vue 3, Vue Flow, Axios流程图渲染、节点配置、API 通信。后端 (BE)FastAPI高性能 API 服务路由WebSocket 管理。Agent 核心LangChainAgent 定义、工具调用、Prompt 模板。数据库PostgreSQL/SQLite存储流程 Schema、Agent 配置和执行历史。1.3 核心架构前端 (Vue Flow): 用户设计流程,提交 Schema。FastAPI (API/WS Router): 接收请求,管理 WebSocket 连接。Flow Parser: 将 Vue Flow Schema 转换为内部 DAG (有向无环图) 结构。Flow Executor: 遍历 DAG调度 Agent并向 Socket Manager 推送状态。Agent Executors: LangChain 驱动的模块化 Agent 实例。2. 前端设计与实现 (Vue Flow)2.1 节点类型与配置组件所有自定义节点Custom Nodes必须包含一个用于配置其后端参数的组件。节点 Type ID描述核心配置字段 (Node Data)startNode流程的输入/触发器。{"input_key": "user_query"}llmAgentNode基础 LLM 任务。{"model": "gpt-4o", "prompt_template": "...", "input_map": {...}}toolAgentNode调用外部工具。{"tool_name": "Search", "tool_config": {...}}conditionalNode逻辑路由判断。{"check_expression": "output.type == 'sales'"} outputNode流程结果输出。{"output_key": "final_result"}2.2 数据契约Schema 到后端前端将 Vue Flow toObject() 结果发送给后端。JSON{ "flowId": "uuid_flow_1", "nodes": [ // ... Node Definition (包含 type 和 data) ], "edges": [ // ... Edge Definition (包含 source, target, label/condition) ], // 增加运行时数据,方便流程重启或监控 "runtime_data": {} } 2.3 状态监控逻辑 (WebSocket Client)连接建立: 流程执行 API 成功调用后,前端立即使用返回的 execution_id 建立 WS 连接ws://backend_url/ws/flows/{execution_id}。状态映射: 监听 WebSocket 消息,根据消息中的 nodeId 和 status 实时更新 Vue Flow 节点的外观样式 (CSS Class)。running \rightarrow 节点边框闪烁/黄色success \rightarrow 绿色边框/实心error \rightarrow 红色边框/错误图标3. 后端设计与实现 (FastAPI + LangChain)3.1 核心数据模型 (Pydantic/ORM)模型/类描述关键字段FlowSchema存储 Vue Flow 结构。id, name, schema_json, created_atExecution存储流程运行实例。id, flow_id, status, start_time, end_timeExecutionLog存储运行时日志。execution_id, node_id, status, message, timestamp3.2 API 接口规范接口方法路径描述创建/更新流程POST / PUT/api/flows存储前端设计的流程 Schema。流程启动POST/api/flows/{flow_id}/execute启动流程执行返回 execution_id。流程历史GET/api/executions/{exec_id}获取流程历史状态和最终输出。实时监控WS/ws/flows/{execution_id}实时推送节点状态和日志。3.3 Agent 执行器与扩展性所有 Agent 逻辑必须遵循统一的执行器接口,保证 Flow Executor 不关心底层细节。Python# Base Agent Executor Interface from typing import Dict, Any

class BaseAgentExecutor: # 核心执行方法 async def execute(self, node_config: Dict[str, Any], input_data: Dict[str, Any]) -> Dict[str, Any]: """执行 Agent 任务,返回结构化输出。""" # 内部应调用 LangChain Components (Chains, Agents, Tools) raise NotImplementedError

扩展示例:新增 Database Agent

class DBQueryExecutor(BaseAgentExecutor): async def execute(self, node_config, input_data): # 1. 解析 node_config 获取 SQL 模板 # 2. 调用 LangChain SQL ToolChain # 3. 返回结果 pass 3.4 WebSocket 消息契约 (Backend -> Frontend)所有状态更新都通过这个统一的 JSON 格式推送。JSON{ "eventType": "status_update", "executionId": "...", "nodeId": "node_A", "status": "success", // running, success, error, skipped "timestamp": "2025-12-11T...", "payload": { "log_message": "Agent finished successfully.", "output_preview": "Summary: The project is highly scalable." } } 4. 部署与环境规范4.1 环境依赖后端: Python 3.11+ (FastAPI, LangChain, Uvicorn)前端: Node.js 18+ (Vue 3+vite+ts, Vue Flow)4.2 部署建议分离部署: 前后端应分开部署。CORS 配置: 确保 FastAPI 允许前端 URL 访问 API 和 WebSocket 端口。密钥管理: 使用环境变量 (.env 或 secrets manager) 管理 OPENAI_API_KEY