You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
146 lines
4.3 KiB
146 lines
4.3 KiB
const { FLOW_TYPES } = require("./flowTypes");
|
|
const executeApiCall = require("./executors/api-call");
|
|
const executeWebsite = require("./executors/website");
|
|
const executeFile = require("./executors/file");
|
|
const executeCode = require("./executors/code");
|
|
const executeLLMInstruction = require("./executors/llm-instruction");
|
|
const executeWebScraping = require("./executors/web-scraping");
|
|
const { Telemetry } = require("../../models/telemetry");
|
|
|
|
/**
 * True when `name` is an own property of the variable bag. Used so that
 * inherited Object.prototype members (e.g. "constructor", "toString") are
 * never mistaken for flow variables.
 *
 * @param {object} bag - The variable bag to inspect.
 * @param {string} name - Candidate variable name.
 * @returns {boolean}
 */
function hasOwnFlowVariable(bag, name) {
  return Object.prototype.hasOwnProperty.call(bag, name);
}

/**
 * Executes a flow: an ordered list of typed steps that share one mutable
 * bag of variables (`this.variables`). Each step's config has `${name}`
 * placeholders substituted from that bag before the step runs, and a step
 * may write its result back into the bag.
 */
class FlowExecutor {
  constructor() {
    this.variables = {};
    this.introspect = () => {}; // Default no-op introspect
    this.logger = console.info; // Default console.info
  }

  /**
   * Installs the introspect and logger callbacks handed to every step.
   * A falsy argument resets the corresponding callback to its default
   * (a no-op for introspect, `console.info` for the logger).
   *
   * @param {Function} [introspectFn]
   * @param {Function} [loggerFn]
   */
  attachLogging(introspectFn, loggerFn) {
    this.introspect = introspectFn || (() => {});
    this.logger = loggerFn || console.info;
  }

  /**
   * Returns a copy of `config` in which every `${name}` placeholder inside
   * a string is replaced by the current value of the variable `name`.
   * Arrays and plain objects are walked recursively; any other value is
   * returned as-is. Placeholders whose variable is missing or `undefined`
   * are left verbatim in the output.
   *
   * @param {*} config - Step configuration (string, array, object or primitive).
   * @returns {*} A structure of the same shape with placeholders substituted.
   */
  replaceVariables(config) {
    const deepReplace = (obj) => {
      if (typeof obj === "string") {
        return obj.replace(/\${([^}]+)}/g, (match, varName) => {
          // Own properties only: a plain lookup would also resolve names
          // such as "constructor" through the prototype chain and splice a
          // function's source text into the string.
          const isDefined =
            hasOwnFlowVariable(this.variables, varName) &&
            this.variables[varName] !== undefined;
          return isDefined ? this.variables[varName] : match;
        });
      }

      if (Array.isArray(obj)) {
        return obj.map((item) => deepReplace(item));
      }

      if (obj && typeof obj === "object") {
        const result = {};
        for (const [key, value] of Object.entries(obj)) {
          result[key] = deepReplace(value);
        }
        return result;
      }

      return obj;
    };

    return deepReplace(config);
  }

  /**
   * Runs a single step. The step's config is passed through
   * `replaceVariables` first, then dispatched on `step.type`. When the
   * substituted config names a `resultVariable` (or, failing that, a
   * `responseVariable`), the step's result is stored under that name.
   *
   * @param {{type: string, config: object}} step
   * @returns {Promise<*>} The step's result. For a start step this is the
   *   variable bag itself.
   * @throws {Error} When `step.type` matches none of the known flow types.
   */
  async executeStep(step) {
    const config = this.replaceVariables(step.config);
    let result;

    // Execution context handed to every executor.
    const context = {
      introspect: this.introspect,
      variables: this.variables,
      logger: this.logger,
      model: process.env.LLM_PROVIDER_MODEL || "gpt-4",
      provider: process.env.LLM_PROVIDER || "openai",
    };

    switch (step.type) {
      case FLOW_TYPES.START.type:
        // Start blocks only fill in variables that have no value yet.
        if (config.variables) {
          config.variables.forEach((v) => {
            if (!v.name) return;
            const current = hasOwnFlowVariable(this.variables, v.name)
              ? this.variables[v.name]
              : undefined;
            // "Unset" means undefined, null or the empty string. Values like
            // 0 and false are legitimate and must not be overwritten by the
            // default, which a plain truthiness test would do.
            const isUnset =
              current === undefined || current === null || current === "";
            if (isUnset) {
              // `??` keeps defaults of 0 / false instead of turning them into "".
              this.variables[v.name] = v.value ?? "";
            }
          });
        }
        result = this.variables;
        break;
      case FLOW_TYPES.API_CALL.type:
        result = await executeApiCall(config, context);
        break;
      case FLOW_TYPES.WEBSITE.type:
        result = await executeWebsite(config, context);
        break;
      case FLOW_TYPES.FILE.type:
        result = await executeFile(config, context);
        break;
      case FLOW_TYPES.CODE.type:
        result = await executeCode(config, context);
        break;
      case FLOW_TYPES.LLM_INSTRUCTION.type:
        result = await executeLLMInstruction(config, context);
        break;
      case FLOW_TYPES.WEB_SCRAPING.type:
        result = await executeWebScraping(config, context);
        break;
      default:
        throw new Error(`Unknown flow type: ${step.type}`);
    }

    // Store result in variable if specified
    if (config.resultVariable || config.responseVariable) {
      const varName = config.resultVariable || config.responseVariable;
      this.variables[varName] = result;
    }

    return result;
  }

  /**
   * Runs every step of `flow.config.steps` in order, stopping at the first
   * step that throws.
   *
   * The variable bag is rebuilt on each call: defaults declared on the
   * flow's start step come first, then `initialVariables` override them.
   * A telemetry event is sent before anything else; it is awaited outside
   * the per-step try/catch, so a rejection from it propagates to the caller.
   *
   * @param {{config: {steps: Array<{type: string, config: object}>}}} flow
   * @param {object} [initialVariables={}] - Values overriding start-step defaults.
   * @param {Function|null} [introspectFn=null] - Forwarded to `attachLogging`.
   * @param {Function|null} [loggerFn=null] - Forwarded to `attachLogging`.
   * @returns {Promise<{success: boolean, results: Array<object>, variables: object}>}
   *   `results` holds one `{ success: true, result }` entry per completed step
   *   and, on failure, a final `{ success: false, error }` entry carrying the
   *   error message.
   */
  async executeFlow(
    flow,
    initialVariables = {},
    introspectFn = null,
    loggerFn = null
  ) {
    await Telemetry.sendTelemetry("agent_flow_execution_started");

    // Locate the start step with the same constant executeStep dispatches on.
    const startStep = flow.config.steps.find(
      (s) => s.type === FLOW_TYPES.START.type
    );
    const defaults = Object.fromEntries(
      (startStep?.config?.variables || []).map((v) => [v.name, v.value])
    );

    // Passed-in values override any default declared on the start step.
    this.variables = { ...defaults, ...initialVariables };

    this.attachLogging(introspectFn, loggerFn);
    const results = [];

    for (const step of flow.config.steps) {
      try {
        const result = await this.executeStep(step);
        results.push({ success: true, result });
      } catch (error) {
        results.push({ success: false, error: error.message });
        break;
      }
    }

    return {
      success: results.every((r) => r.success),
      results,
      variables: this.variables,
    };
  }
}
|
|
|
|
// Public surface of this module: the executor class, plus FLOW_TYPES
// re-exported from ./flowTypes so both are available from one require().
module.exports = { FlowExecutor: FlowExecutor, FLOW_TYPES: FLOW_TYPES };
|