You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

238 lines
7.2 KiB

const fs = require("fs");
const path = require("path");
const { v4: uuidv4 } = require("uuid");
const { FlowExecutor } = require("./executor");
const { normalizePath } = require("../files");
const { safeJsonParse } = require("../http");
/**
 * File-backed store and runner for agent flows.
 *
 * Every flow is persisted as one `<uuid>.json` file inside
 * `AgentFlows.flowsDir`. All members are static, so the class is used as a
 * namespace and never needs to be instantiated.
 */
class AgentFlows {
  /**
   * Directory holding the flow JSON files: below `STORAGE_DIR` when that
   * environment variable is set, otherwise below `<cwd>/storage`.
   */
  static flowsDir = process.env.STORAGE_DIR
    ? path.join(process.env.STORAGE_DIR, "plugins", "agent-flows")
    : path.join(process.cwd(), "storage", "plugins", "agent-flows");

  constructor() {}

  /**
   * Ensure flows directory exists, creating it (recursively) when missing.
   * @returns {Boolean} True if the directory exists or was created, false if
   * creation failed (the error is logged, not thrown)
   */
  static createOrCheckFlowsDir() {
    try {
      if (fs.existsSync(AgentFlows.flowsDir)) return true;
      fs.mkdirSync(AgentFlows.flowsDir, { recursive: true });
      return true;
    } catch (error) {
      console.error("Failed to create flows directory:", error);
      return false;
    }
  }

  /**
   * Helper to get all flow files with their contents.
   *
   * Files that cannot be read, do not contain valid JSON, or whose JSON is
   * not a plain object are logged and skipped so that one bad file cannot
   * break every caller.
   * @returns {Object} Map of flow UUID to flow config
   */
  static getAllFlows() {
    AgentFlows.createOrCheckFlowsDir();
    const files = fs.readdirSync(AgentFlows.flowsDir);
    const flows = {};

    for (const file of files) {
      if (!file.endsWith(".json")) continue;
      try {
        const filePath = path.join(AgentFlows.flowsDir, file);
        // NOTE(review): normalizePath is imported from ../files; presumably it
        // sanitizes the path - confirm there.
        const content = fs.readFileSync(normalizePath(filePath), "utf8");
        const config = JSON.parse(content);

        // `null`, numbers, strings and arrays are valid JSON but not a flow;
        // letting them through makes `flow.active` / `flow.name` blow up later.
        if (
          config === null ||
          typeof config !== "object" ||
          Array.isArray(config)
        ) {
          throw new Error("Flow config must be a JSON object");
        }

        // Strip only the trailing extension. `replace(".json", "")` would
        // remove the FIRST occurrence and corrupt ids that contain ".json".
        const id = file.slice(0, -".json".length);
        flows[id] = config;
      } catch (error) {
        console.error(`Error reading flow file ${file}:`, error);
      }
    }

    return flows;
  }

  /**
   * Load a flow configuration by UUID
   * @param {string} uuid - The UUID of the flow to load
   * @returns {Object|null} `{ name, uuid, config }` or null if the uuid is
   * empty, the file is missing, or its content is not a JSON object
   */
  static loadFlow(uuid) {
    try {
      // Bail out before building a path such as "undefined.json".
      if (!uuid) return null;

      const flowJsonPath = normalizePath(
        path.join(AgentFlows.flowsDir, `${uuid}.json`)
      );
      if (!fs.existsSync(flowJsonPath)) return null;

      const flow = safeJsonParse(fs.readFileSync(flowJsonPath, "utf8"), null);
      if (!flow || typeof flow !== "object") return null;

      return {
        name: flow.name,
        uuid,
        config: flow,
      };
    } catch (error) {
      console.error("Failed to load flow:", error);
      return null;
    }
  }

  /**
   * Save a flow configuration
   * @param {string} name - The name of the flow (overrides `config.name`)
   * @param {Object} config - The flow configuration
   * @param {string|null} uuid - Optional UUID for the flow; generated when omitted
   * @returns {Object} `{ success: true, uuid }` or `{ success: false, error }`
   */
  static saveFlow(name, config, uuid = null) {
    try {
      AgentFlows.createOrCheckFlowsDir();
      if (!uuid) uuid = uuidv4();

      const normalizedUuid = normalizePath(`${uuid}.json`);
      const filePath = path.join(AgentFlows.flowsDir, normalizedUuid);
      // `name` is spread last so the explicit argument always wins.
      fs.writeFileSync(filePath, JSON.stringify({ ...config, name }, null, 2));
      return { success: true, uuid };
    } catch (error) {
      console.error("Failed to save flow:", error);
      return { success: false, error: error.message };
    }
  }

  /**
   * List all available flows
   * @returns {Array} Array of `{ name, uuid, description, active }` summaries;
   * empty when the flows cannot be read
   */
  static listFlows() {
    try {
      const flows = AgentFlows.getAllFlows();
      return Object.entries(flows).map(([uuid, flow]) => ({
        name: flow.name,
        uuid,
        description: flow.description,
        // A flow is active unless it is explicitly switched off.
        active: flow.active !== false,
      }));
    } catch (error) {
      console.error("Failed to list flows:", error);
      return [];
    }
  }

  /**
   * Delete a flow by UUID
   * @param {string} uuid - The UUID of the flow to delete
   * @returns {Object} `{ success: true }` or `{ success: false, error }`
   */
  static deleteFlow(uuid) {
    try {
      const filePath = normalizePath(
        path.join(AgentFlows.flowsDir, `${uuid}.json`)
      );
      if (!fs.existsSync(filePath)) throw new Error(`Flow ${uuid} not found`);
      fs.rmSync(filePath);
      return { success: true };
    } catch (error) {
      console.error("Failed to delete flow:", error);
      return { success: false, error: error.message };
    }
  }

  /**
   * Execute a flow by UUID
   * @param {string} uuid - The UUID of the flow to execute
   * @param {Object} variables - Initial variables for the flow
   * @param {Function} introspectFn - Function to introspect the flow
   * @param {Function} loggerFn - Function to log the flow
   * @returns {Promise<Object>} Result of flow execution
   * @throws {Error} When no flow with the given UUID can be loaded
   */
  static async executeFlow(
    uuid,
    variables = {},
    introspectFn = null,
    loggerFn = null
  ) {
    const flow = AgentFlows.loadFlow(uuid);
    if (!flow) throw new Error(`Flow ${uuid} not found`);

    const flowExecutor = new FlowExecutor();
    return await flowExecutor.executeFlow(
      flow,
      variables,
      introspectFn,
      loggerFn
    );
  }

  /**
   * Get all active flows as plugins that can be loaded into the agent
   * @returns {string[]} Array of flow names in @@flow_{uuid} format
   */
  static activeFlowPlugins() {
    const flows = AgentFlows.getAllFlows();
    return Object.entries(flows)
      .filter(([, flow]) => flow.active !== false)
      .map(([uuid]) => `@@flow_${uuid}`);
  }

  /**
   * Load a flow plugin by its UUID.
   *
   * The returned plugin registers one function named `flow_{uuid}` whose
   * parameters are the named variables declared on the flow's "start" step.
   * @param {string} uuid - The UUID of the flow to load
   * @returns {Object|null} Plugin configuration or null if not found
   */
  static loadFlowPlugin(uuid) {
    const flow = AgentFlows.loadFlow(uuid);
    if (!flow) return null;

    // Flow files are hand-editable, so do not assume the shapes are right.
    const steps = Array.isArray(flow.config.steps) ? flow.config.steps : [];
    const startBlock = steps.find((step) => step?.type === "start");
    const declared = startBlock?.config?.variables;
    const variables = Array.isArray(declared) ? declared : [];

    const pluginName = `flow_${uuid}`;
    const fallbackDescription = `Execute agent flow: ${flow.name}`;
    // `loadFlow` returns { name, uuid, config }; the description authored by
    // the user lives on the config, not on that wrapper object.
    const description = flow.config.description || fallbackDescription;

    return {
      name: pluginName,
      description: fallbackDescription,
      plugin: (_runtimeArgs = {}) => ({
        name: pluginName,
        description,
        setup: (aibitat) => {
          aibitat.function({
            name: pluginName,
            description,
            parameters: {
              type: "object",
              // Every named start variable becomes a string parameter.
              properties: variables.reduce((acc, v) => {
                if (v?.name) {
                  acc[v.name] = {
                    type: "string",
                    description:
                      v.description || `Value for variable ${v.name}`,
                  };
                }
                return acc;
              }, {}),
            },
            handler: async (args) => {
              aibitat.introspect(`Executing flow: ${flow.name}`);
              const result = await AgentFlows.executeFlow(
                uuid,
                args,
                aibitat.introspect,
                aibitat.handlerProps.log
              );

              if (!result?.success) {
                // `results` may be absent on failure; never throw while
                // trying to report the failure itself.
                const reason = result?.results?.[0]?.error || "Unknown error";
                aibitat.introspect(`Flow failed: ${reason}`);
                return `Flow execution failed: ${reason}`;
              }

              aibitat.introspect(`${flow.name} completed successfully`);
              return typeof result === "object"
                ? JSON.stringify(result)
                : String(result);
            },
          });
        },
      }),
      flowName: flow.name,
    };
  }
}
// Expose the class as a named CommonJS export (`require(...).AgentFlows`).
module.exports.AgentFlows = AgentFlows;