This commit is contained in:
yinsx
2025-12-13 15:40:01 +08:00
commit 39c0f7e708
104 changed files with 6460 additions and 0 deletions

2
backend/.env.example Normal file
View File

@ -0,0 +1,2 @@
OPENAI_API_KEY=your_api_key_here
OPENAI_BASE_URL=

197
backend/bun.lock Normal file

File diff suppressed because one or more lines are too long

BIN
backend/flow.db Normal file

Binary file not shown.

12
backend/package.json Normal file
View File

@ -0,0 +1,12 @@
{
"name": "flow-backend",
"type": "module",
"scripts": {
"dev": "bun --watch src/index.ts",
"start": "bun src/index.ts"
},
"dependencies": {
"@langchain/openai": "^0.0.14",
"langchain": "^0.1.0"
}
}

25
backend/src/db.ts Normal file
View File

@ -0,0 +1,25 @@
import { Database } from "bun:sqlite";
const db = new Database("flow.db");
db.run(`
CREATE TABLE IF NOT EXISTS flows (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
schema_json TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
`);
db.run(`
CREATE TABLE IF NOT EXISTS executions (
id TEXT PRIMARY KEY,
flow_id TEXT NOT NULL,
status TEXT DEFAULT 'pending',
result TEXT,
start_time TEXT DEFAULT CURRENT_TIMESTAMP,
end_time TEXT
)
`);
export default db;

View File

@ -0,0 +1,7 @@
/**
 * Free-form per-node configuration taken from the flow schema's node `data`.
 * Keys depend on the node type (e.g. `model` / `prompt_template` for LLM
 * nodes, `tool_name` for tool nodes).
 * NOTE(review): intentionally untyped (`any`); consider per-node-type shapes.
 */
export interface NodeConfig {
[key: string]: any;
}
/**
 * Contract for a node executor: given the node's config and the upstream
 * node's output map, produce this node's output map. Implementations are
 * looked up by node type via the executor registry.
 */
export interface BaseExecutor {
execute(config: NodeConfig, input: Record<string, any>): Promise<Record<string, any>>;
}

View File

@ -0,0 +1,14 @@
import type { BaseExecutor } from "./base";
import { llmExecutor } from "./llm";
/**
 * Node-type -> executor registry. `llmAgentNode` delegates to the LangChain
 * executor; `toolAgentNode` is an inline stub that echoes its input.
 */
const executors: Record<string, BaseExecutor> = {
  llmAgentNode: llmExecutor,
  toolAgentNode: {
    // Simple search simulation — extend to plug in real tools.
    async execute(config, input) {
      return { output: `Tool ${config.tool_name} result for: ${input.output || input.input}` };
    },
  },
};

/**
 * Looks up the executor registered for a node type.
 *
 * FIX: return type made explicit as `BaseExecutor | undefined` so callers are
 * forced to handle unknown node types — previously the possible `undefined`
 * from the index lookup was implicit. The caller in services/executor.ts
 * already guards with `if (!executor)`.
 */
export const getExecutor = (type: string): BaseExecutor | undefined => executors[type];

View File

@ -0,0 +1,20 @@
import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core/prompts";
import type { BaseExecutor, NodeConfig } from "./base";
/**
 * LLM node executor: renders the configured prompt template and invokes a
 * ChatOpenAI model, returning its text content under `output`.
 */
export const llmExecutor: BaseExecutor = {
  async execute(config: NodeConfig, input: Record<string, any>) {
    // Model client is built per call so config/env changes take effect
    // without a restart.
    const model = new ChatOpenAI({
      modelName: config.model || "gpt-4o",
      openAIApiKey: process.env.OPENAI_API_KEY,
      configuration: { baseURL: process.env.OPENAI_BASE_URL || undefined },
    });
    const templateText = config.prompt_template || "{input}";
    const chain = PromptTemplate.fromTemplate(templateText).pipe(model);
    // If the upstream node produced an `output`, feed it as the `{input}`
    // template variable; otherwise pass the raw input map through.
    const variables = input.output ? { input: input.output } : input;
    const reply = await chain.invoke(variables);
    return { output: reply.content };
  },
};

101
backend/src/index.ts Normal file
View File

@ -0,0 +1,101 @@
import db from "./db";
import { socketManager } from "./services/socket";
import { executeFlow } from "./services/executor";
// HTTP + WebSocket server for the flow backend (Bun runtime).
// Routes are matched in order; every JSON response goes through json(),
// which attaches the shared CORS headers.
const server = Bun.serve({
  port: 8000,
  async fetch(req, server) {
    const url = new URL(req.url);
    // CORS preflight: empty body, shared CORS headers.
    if (req.method === "OPTIONS") {
      return new Response(null, { headers: corsHeaders });
    }
    // WebSocket upgrade: /ws/flows/<executionId> attaches a live status
    // stream for one execution; the id is stashed on ws.data for the
    // websocket open/close handlers below.
    if (url.pathname.startsWith("/ws/flows/")) {
      const executionId = url.pathname.split("/").pop()!;
      // upgrade() returns true on success, and Bun takes over the socket —
      // returning undefined here is the documented pattern.
      if (server.upgrade(req, { data: { executionId } })) return;
      return new Response("Upgrade failed", { status: 500 });
    }
    // API routes
    // Create a flow: body { name, schema_json }; graph stored as JSON text.
    if (url.pathname === "/api/flows" && req.method === "POST") {
      const { name, schema_json } = await req.json();
      const id = crypto.randomUUID();
      db.run("INSERT INTO flows (id, name, schema_json) VALUES (?, ?, ?)", [id, name, JSON.stringify(schema_json)]);
      return json({ id, name });
    }
    // Update a flow's name and graph by id.
    if (url.pathname.match(/^\/api\/flows\/[\w-]+$/) && req.method === "PUT") {
      const id = url.pathname.split("/").pop()!;
      const { name, schema_json } = await req.json();
      db.run("UPDATE flows SET name = ?, schema_json = ? WHERE id = ?", [name, JSON.stringify(schema_json), id]);
      return json({ id, name });
    }
    // Fetch a flow; schema_json is parsed back into an object for the client.
    if (url.pathname.match(/^\/api\/flows\/[\w-]+$/) && req.method === "GET") {
      const id = url.pathname.split("/").pop()!;
      const row = db.query("SELECT * FROM flows WHERE id = ?").get(id) as any;
      if (!row) return json({ error: "Not found" }, 404);
      return json({ id: row.id, name: row.name, schema_json: JSON.parse(row.schema_json) });
    }
    // Start a flow execution. Responds immediately with an execution id;
    // the run itself happens in a detached async task below.
    if (url.pathname.match(/^\/api\/flows\/[\w-]+\/execute$/) && req.method === "POST") {
      // pathname "/api/flows/<id>/execute" -> split("/")[3] is the flow id.
      const flowId = url.pathname.split("/")[3];
      // Tolerate an empty/absent body: runtime_data is then undefined.
      const { runtime_data } = await req.json().catch(() => ({}));
      const row = db.query("SELECT * FROM flows WHERE id = ?").get(flowId) as any;
      if (!row) return json({ error: "Not found" }, 404);
      const executionId = crypto.randomUUID();
      db.run("INSERT INTO executions (id, flow_id, status) VALUES (?, ?, 'running')", [executionId, flowId]);
      // Execute asynchronously (fire-and-forget); the result or error is
      // persisted on the executions row when the run finishes.
      (async () => {
        try {
          const result = await executeFlow(executionId, JSON.parse(row.schema_json), runtime_data);
          db.run("UPDATE executions SET status = 'success', result = ?, end_time = CURRENT_TIMESTAMP WHERE id = ?", [JSON.stringify(result), executionId]);
        } catch (err: any) {
          db.run("UPDATE executions SET status = 'error', result = ?, end_time = CURRENT_TIMESTAMP WHERE id = ?", [JSON.stringify({ error: err.message }), executionId]);
        }
      })();
      return json({ execution_id: executionId });
    }
    // Poll an execution's status/result.
    if (url.pathname.match(/^\/api\/flows\/executions\/[\w-]+$/) && req.method === "GET") {
      const id = url.pathname.split("/").pop()!;
      const row = db.query("SELECT * FROM executions WHERE id = ?").get(id) as any;
      if (!row) return json({ error: "Not found" }, 404);
      return json({ ...row, result: row.result ? JSON.parse(row.result) : null });
    }
    if (url.pathname === "/health") return json({ status: "ok" });
    return json({ error: "Not found" }, 404);
  },
  // WebSocket lifecycle: register/unregister the connection under the
  // executionId captured during upgrade. Incoming messages are ignored.
  websocket: {
    open(ws) {
      socketManager.add(ws.data.executionId, ws);
    },
    close(ws) {
      socketManager.remove(ws.data.executionId);
    },
    message() {},
  },
});
const corsHeaders = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
};
function json(data: any, status = 200) {
return new Response(JSON.stringify(data), {
status,
headers: { "Content-Type": "application/json", ...corsHeaders },
});
}
// Startup banner; Bun.serve above has already bound the port at this point.
console.log(`Server running at http://localhost:${server.port}`);

View File

@ -0,0 +1,73 @@
import { getExecutor } from "../executors";
import { socketManager } from "./socket";
/** Minimal flow graph shape: nodes carry a type + config, edges are directed. */
interface FlowSchema {
  nodes: Array<{ id: string; type: string; data: any }>;
  edges: Array<{ source: string; target: string }>;
}

/**
 * Executes a flow graph in topological order, streaming per-node status over
 * the execution's WebSocket via socketManager.
 *
 * Each node's result is stored in `context` under its node id; a node's input
 * is the result of its (first) upstream node, or the runtime data for roots.
 *
 * @param executionId id used to address the status WebSocket
 * @param schema      the flow graph (nodes + edges)
 * @param runtimeData initial key/value context (e.g. startNode inputs)
 * @returns the full context map: runtimeData plus one entry per executed node
 * @throws if the graph contains a cycle, a node type has no executor, or a
 *         node executor throws (the error is also pushed over the socket)
 */
export async function executeFlow(executionId: string, schema: FlowSchema, runtimeData: Record<string, any> = {}) {
  const nodes = Object.fromEntries(schema.nodes.map((n) => [n.id, n]));
  // Build adjacency list + in-degree table for Kahn's algorithm.
  const adj: Record<string, string[]> = {};
  const inDegree: Record<string, number> = {};
  for (const n of schema.nodes) {
    adj[n.id] = [];
    inDegree[n.id] = 0;
  }
  for (const e of schema.edges) {
    adj[e.source].push(e.target);
    inDegree[e.target]++;
  }
  // Topological sort (Kahn's algorithm).
  const queue = Object.keys(inDegree).filter((id) => inDegree[id] === 0);
  const sorted: string[] = [];
  while (queue.length) {
    const id = queue.shift()!;
    sorted.push(id);
    for (const next of adj[id]) {
      if (--inDegree[next] === 0) queue.push(next);
    }
  }
  // FIX: a cyclic graph previously just skipped every node on the cycle
  // silently; fail fast instead so the execution is marked 'error'.
  if (sorted.length !== schema.nodes.length) {
    throw new Error("Flow contains a cycle and cannot be executed");
  }
  const context: Record<string, any> = { ...runtimeData };
  for (const nodeId of sorted) {
    const node = nodes[nodeId];
    socketManager.send(executionId, nodeId, "running", "开始执行");
    try {
      let result: Record<string, any>;
      if (node.type === "startNode") {
        // Root node: pull its value out of the runtime data by input_key.
        result = { output: context[node.data?.input_key] || "" };
      } else if (node.type === "outputNode") {
        // Terminal node: forward the upstream node's output verbatim.
        const prev = schema.edges.find((e) => e.target === nodeId)?.source;
        result = { output: prev ? context[prev]?.output : "" };
      } else if (node.type === "conditionalNode") {
        const prev = schema.edges.find((e) => e.target === nodeId)?.source;
        // `output` is intentionally a local: the direct eval() below can see
        // it, so check_expression may reference the upstream result as
        // `output`. Do not remove even though it looks unused.
        const output = prev ? context[prev] : {};
        try {
          // SECURITY: eval of a flow-authored expression runs arbitrary code
          // server-side — replace with a sandboxed expression evaluator if
          // flows can come from untrusted users.
          const passed = eval(node.data?.check_expression || "true");
          // FIX: condition_result was hard-coded `true` even when the
          // expression evaluated falsy; report the actual boolean outcome.
          result = { output: passed, condition_result: Boolean(passed) };
        } catch {
          // An invalid expression counts as a failed condition, not a crash.
          result = { output: false, condition_result: false };
        }
      } else {
        // Pluggable node types (llmAgentNode, toolAgentNode, ...).
        const executor = getExecutor(node.type);
        if (!executor) throw new Error(`No executor for ${node.type}`);
        const prev = schema.edges.find((e) => e.target === nodeId)?.source;
        const input = prev ? context[prev] : context;
        result = await executor.execute(node.data || {}, input);
      }
      context[nodeId] = result;
      socketManager.send(executionId, nodeId, "success", "执行完成", String(result.output || ""));
    } catch (err: any) {
      // Surface the failure to the client, then abort the whole run.
      socketManager.send(executionId, nodeId, "error", err.message);
      throw err;
    }
  }
  return context;
}

View File

@ -0,0 +1,25 @@
import type { ServerWebSocket } from "bun";
const connections = new Map<string, ServerWebSocket<{ executionId: string }>>();
export const socketManager = {
add(executionId: string, ws: ServerWebSocket<{ executionId: string }>) {
connections.set(executionId, ws);
},
remove(executionId: string) {
connections.delete(executionId);
},
send(executionId: string, nodeId: string, status: string, message = "", outputPreview = "") {
const ws = connections.get(executionId);
if (ws) {
ws.send(JSON.stringify({
eventType: "status_update",
executionId,
nodeId,
status,
timestamp: new Date().toISOString(),
payload: { log_message: message, output_preview: outputPreview.slice(0, 200) },
}));
}
},
};