// Tests for AgentManager session scheduling, recursive fanout/fan-in, and
// middleware-backed child resource provisioning.
import assert from "node:assert/strict";
import test from "node:test";

import { AgentManager } from "../src/agents/manager.js";
import {
  ResourceProvisioningOrchestrator,
  type DiscoverySnapshot,
  type ResourceProvider,
} from "../src/agents/provisioning.js";
test("queues work when session concurrency is saturated", async () => {
|
|
const manager = new AgentManager({
|
|
maxConcurrentAgents: 1,
|
|
maxSessionAgents: 1,
|
|
maxRecursiveDepth: 2,
|
|
});
|
|
const session = manager.createSession("session-a");
|
|
|
|
let releaseFirst: (() => void) | undefined;
|
|
const firstStarted = new Promise<void>((resolve) => {
|
|
releaseFirst = resolve;
|
|
});
|
|
let secondStarted = false;
|
|
|
|
const firstRun = session.runAgent({
|
|
depth: 0,
|
|
run: async () => {
|
|
await firstStarted;
|
|
return "first";
|
|
},
|
|
});
|
|
|
|
const secondRun = session.runAgent({
|
|
depth: 0,
|
|
run: async () => {
|
|
secondStarted = true;
|
|
return "second";
|
|
},
|
|
});
|
|
|
|
await new Promise((resolve) => setTimeout(resolve, 20));
|
|
assert.equal(secondStarted, false);
|
|
|
|
assert.ok(releaseFirst);
|
|
releaseFirst();
|
|
|
|
const [firstResult, secondResult] = await Promise.all([firstRun, secondRun]);
|
|
assert.equal(firstResult, "first");
|
|
assert.equal(secondResult, "second");
|
|
assert.equal(manager.getActiveAgentCount(), 0);
|
|
});
|
|
|
|
test("rejects agent runs above recursive depth limit", async () => {
|
|
const manager = new AgentManager({
|
|
maxConcurrentAgents: 2,
|
|
maxSessionAgents: 2,
|
|
maxRecursiveDepth: 1,
|
|
});
|
|
const session = manager.createSession("session-b");
|
|
|
|
await assert.rejects(
|
|
() =>
|
|
session.runAgent({
|
|
depth: 2,
|
|
run: async () => "unreachable",
|
|
}),
|
|
/exceeds maxRecursiveDepth/,
|
|
);
|
|
});
|
|
|
|
test("closing a session rejects queued runs", async () => {
|
|
const manager = new AgentManager({
|
|
maxConcurrentAgents: 1,
|
|
maxSessionAgents: 1,
|
|
maxRecursiveDepth: 1,
|
|
});
|
|
const session = manager.createSession("session-c");
|
|
|
|
let releaseFirst: (() => void) | undefined;
|
|
const firstRun = session.runAgent({
|
|
depth: 0,
|
|
run: async () =>
|
|
new Promise<string>((resolve) => {
|
|
releaseFirst = () => resolve("first");
|
|
}),
|
|
});
|
|
|
|
const queuedRun = session.runAgent({
|
|
depth: 0,
|
|
run: async () => "second",
|
|
});
|
|
|
|
await new Promise((resolve) => setTimeout(resolve, 20));
|
|
session.close();
|
|
|
|
await assert.rejects(() => queuedRun, /was closed/);
|
|
assert.ok(releaseFirst);
|
|
releaseFirst();
|
|
await firstRun;
|
|
});
|
|
|
|
test("recursive fanout/fan-in avoids deadlock at maxConcurrentAgents=1", async () => {
|
|
const manager = new AgentManager({
|
|
maxConcurrentAgents: 1,
|
|
maxSessionAgents: 1,
|
|
maxRecursiveDepth: 3,
|
|
});
|
|
const session = manager.createSession("recursive-deadlock");
|
|
|
|
const executionOrder: string[] = [];
|
|
const result = await manager.runRecursiveAgent({
|
|
sessionId: session.id,
|
|
depth: 0,
|
|
run: async ({ sessionId, intent }) => {
|
|
executionOrder.push(`${sessionId}:${intent?.task ?? "root"}`);
|
|
if (!intent) {
|
|
return {
|
|
type: "fanout" as const,
|
|
intents: [
|
|
{
|
|
persona: "coder",
|
|
task: "build-child",
|
|
},
|
|
],
|
|
aggregate: ({ childResults }) => childResults[0]?.output ?? "missing",
|
|
};
|
|
}
|
|
|
|
return {
|
|
type: "complete" as const,
|
|
output: `done:${sessionId}`,
|
|
};
|
|
},
|
|
});
|
|
|
|
assert.equal(result, "done:recursive-deadlock_child_1");
|
|
assert.deepEqual(executionOrder, [
|
|
"recursive-deadlock:root",
|
|
"recursive-deadlock_child_1:build-child",
|
|
]);
|
|
assert.equal(manager.getActiveAgentCount(), 0);
|
|
session.close();
|
|
});
|
|
|
|
test("rejects recursive child spawn above depth limit", async () => {
|
|
const manager = new AgentManager({
|
|
maxConcurrentAgents: 2,
|
|
maxSessionAgents: 2,
|
|
maxRecursiveDepth: 2,
|
|
});
|
|
const session = manager.createSession("recursive-depth");
|
|
|
|
await assert.rejects(
|
|
() =>
|
|
manager.runRecursiveAgent({
|
|
sessionId: session.id,
|
|
depth: 0,
|
|
run: async ({ depth }) => {
|
|
if (depth < 3) {
|
|
return {
|
|
type: "fanout" as const,
|
|
intents: [
|
|
{
|
|
persona: "coder",
|
|
task: `spawn-${String(depth + 1)}`,
|
|
},
|
|
],
|
|
aggregate: ({ childResults }) => childResults[0]?.output ?? "missing",
|
|
};
|
|
}
|
|
|
|
return {
|
|
type: "complete" as const,
|
|
output: `leaf-${String(depth)}`,
|
|
};
|
|
},
|
|
}),
|
|
/Cannot spawn child at depth 3/,
|
|
);
|
|
session.close();
|
|
});
|
|
|
|
// Verifies that session.close() propagates an abort signal to an in-flight
// recursive child and that child-scoped resources are released exactly once.
test("closing parent session aborts active recursive work and releases child resources", async () => {
  const manager = new AgentManager({
    maxConcurrentAgents: 2,
    maxSessionAgents: 2,
    maxRecursiveDepth: 3,
  });
  const session = manager.createSession("recursive-abort");

  // Resolves once the child agent has begun its long wait, so the test knows
  // it is safe to close the session while the child is genuinely active.
  let notifyChildStarted: (() => void) | undefined;
  const childStarted = new Promise<void>((resolve) => {
    notifyChildStarted = resolve;
  });

  // Counters checked at the end: each must fire exactly once.
  let abortCount = 0;
  let releaseCount = 0;

  const runPromise = manager.runRecursiveAgent({
    sessionId: session.id,
    depth: 0,
    run: async ({ intent, signal }) => {
      // Root invocation (no intent): fan out a single long-running child.
      if (!intent) {
        return {
          type: "fanout" as const,
          intents: [
            {
              persona: "coder",
              task: "long-running",
            },
          ],
          // The run is aborted before fan-in, so aggregation never happens.
          aggregate: () => "unreachable",
        };
      }

      notifyChildStarted?.();

      // Wait up to 5s, but reject as soon as the abort signal fires. The
      // listener clears the timer so the test does not linger after abort.
      await new Promise<void>((resolve, reject) => {
        const timer = setTimeout(resolve, 5000);
        const onAbort = () => {
          clearTimeout(timer);
          abortCount += 1;
          reject(signal.reason ?? new Error("Aborted"));
        };
        signal.addEventListener("abort", onAbort, { once: true });
      });

      // Unreachable when the abort path wins the race above.
      return {
        type: "complete" as const,
        output: "child-done",
      };
    },
    childMiddleware: {
      // Counts release callbacks so the test can assert exactly-once cleanup.
      releaseForChild: async () => {
        releaseCount += 1;
      },
    },
  });

  // Close only after the child is confirmed running.
  await childStarted;
  session.close();

  // The overall run fails with an abort/closed error; the child observed one
  // abort, its resources were released once, and nothing is left active.
  await assert.rejects(() => runPromise, /(AbortError|aborted|closed)/i);
  assert.equal(abortCount, 1);
  assert.equal(releaseCount, 1);
  assert.equal(manager.getActiveAgentCount(), 0);
});
// Verifies that child agents spawned by a recursive fanout can each receive a
// disjoint slice of the parent's resources (port sub-ranges and separate git
// worktrees) via the allocate/release child-middleware hooks.
test("recursive children can be isolated via middleware-backed suballocation", async () => {
  const manager = new AgentManager({
    maxConcurrentAgents: 2,
    maxSessionAgents: 2,
    maxRecursiveDepth: 2,
  });
  const session = manager.createSession("recursive-isolation");
  const provisioner = new ResourceProvisioningOrchestrator([
    createTestGitWorktreeProvider(),
    createTestPortRangeProvider(),
  ]);

  // Provision the parent's resources up front; children are carved out of
  // this allocation below.
  const parentResources = await provisioner.provisionSession({
    sessionId: session.id,
    workspaceRoot: "/repo",
    resources: [
      {
        kind: "git-worktree",
        options: {
          rootDirectory: "/repo/.ai_ops/worktrees",
          baseRef: "HEAD",
        },
      },
      {
        kind: "port-range",
        options: {
          basePort: 41000,
          blockSize: 20,
          blockCount: 1,
          primaryPortOffset: 0,
          lockDirectory: "/repo/.ai_ops/locks/ports",
        },
      },
    ],
  });

  const parentSnapshot = parentResources.toDiscoverySnapshot();
  // Captured per-child so the assertions below can compare allocations after
  // the recursive run finishes (leases may already be released by then).
  const childSnapshots = new Map<string, DiscoverySnapshot>();
  const childLeases = new Map<string, Awaited<ReturnType<typeof provisioner.provisionChildSession>>>();

  try {
    await manager.runRecursiveAgent({
      sessionId: session.id,
      depth: 0,
      run: async ({ intent, sessionId }) => {
        // Root invocation: fan out two children and join their outputs.
        if (!intent) {
          return {
            type: "fanout" as const,
            intents: [
              { persona: "coder", task: "child-a" },
              { persona: "coder", task: "child-b" },
            ],
            aggregate: ({ childResults }) => childResults.map((entry) => entry.output).join(","),
          };
        }

        // Child invocation: just report its own session id.
        return {
          type: "complete" as const,
          output: sessionId,
        };
      },
      childMiddleware: {
        // Carve a child-sized lease out of the parent snapshot and record it.
        allocateForChild: async ({ childSessionId, childIndex, childCount }) => {
          const lease = await provisioner.provisionChildSession({
            parentSnapshot,
            childSessionId,
            childIndex,
            childCount,
          });
          childLeases.set(childSessionId, lease);
          childSnapshots.set(childSessionId, lease.toDiscoverySnapshot());
        },
        // Release a child's lease; tolerates an already-released child so the
        // finally-block sweep below cannot double-release.
        releaseForChild: async ({ childSessionId }) => {
          const lease = childLeases.get(childSessionId);
          if (!lease) {
            return;
          }

          await lease.release();
          childLeases.delete(childSessionId);
        },
      },
    });

    const childOneSnapshot = childSnapshots.get("recursive-isolation_child_1");
    const childTwoSnapshot = childSnapshots.get("recursive-isolation_child_2");
    assert.ok(childOneSnapshot);
    assert.ok(childTwoSnapshot);

    // Port sub-ranges must be disjoint (child 1 strictly below child 2).
    const childOnePort = readPortRangeConstraint(childOneSnapshot);
    const childTwoPort = readPortRangeConstraint(childTwoSnapshot);
    assert.ok(childOnePort.endPort < childTwoPort.startPort);

    // Each child must get its own worktree path.
    const childOneWorktree = readGitConstraint(childOneSnapshot).worktreePath;
    const childTwoWorktree = readGitConstraint(childTwoSnapshot).worktreePath;
    assert.notEqual(childOneWorktree, childTwoWorktree);
  } finally {
    // Sweep any leases the middleware did not release, then tear down the
    // parent allocation and the session regardless of test outcome.
    for (const lease of childLeases.values()) {
      await lease.release();
    }
    await parentResources.release();
    session.close();
  }
});
function createTestGitWorktreeProvider(): ResourceProvider {
|
|
return {
|
|
kind: "git-worktree",
|
|
provision: async ({ sessionId, workspaceRoot, options }) => {
|
|
const rootDirectory =
|
|
typeof options.rootDirectory === "string" ? options.rootDirectory : `${workspaceRoot}/.ai_ops/worktrees`;
|
|
const baseRef = typeof options.baseRef === "string" ? options.baseRef : "HEAD";
|
|
const worktreePath = `${rootDirectory}/${sessionId}`;
|
|
|
|
return {
|
|
kind: "git-worktree",
|
|
hard: {
|
|
repoRoot: workspaceRoot,
|
|
worktreeRoot: rootDirectory,
|
|
worktreePath,
|
|
baseRef,
|
|
},
|
|
soft: {
|
|
preferredWorkingDirectory: worktreePath,
|
|
},
|
|
release: async () => {},
|
|
};
|
|
},
|
|
};
|
|
}
|
|
|
|
function createTestPortRangeProvider(): ResourceProvider {
|
|
return {
|
|
kind: "port-range",
|
|
provision: async ({ sessionId, options }) => {
|
|
const basePort = readNumberOption(options, "basePort", 36000);
|
|
const blockSize = readNumberOption(options, "blockSize", 32);
|
|
const blockCount = readNumberOption(options, "blockCount", 1);
|
|
const primaryOffset = readNumberOption(options, "primaryPortOffset", 0);
|
|
const blockIndex = 0;
|
|
const startPort = basePort + blockIndex * blockSize;
|
|
const endPort = startPort + blockSize - 1;
|
|
const primaryPort = startPort + primaryOffset;
|
|
const lockDirectory = readStringOption(options, "lockDirectory", "/tmp");
|
|
|
|
return {
|
|
kind: "port-range",
|
|
hard: {
|
|
basePort,
|
|
blockSize,
|
|
blockCount,
|
|
blockIndex,
|
|
startPort,
|
|
endPort,
|
|
primaryPort,
|
|
lockPath: `${lockDirectory}/${startPort}-${endPort}-${sessionId}.lock`,
|
|
},
|
|
release: async () => {},
|
|
};
|
|
},
|
|
};
|
|
}
|
|
|
|
function readNumberOption(options: Record<string, unknown>, key: string, fallback: number): number {
|
|
const value = options[key];
|
|
if (typeof value === "number" && Number.isInteger(value)) {
|
|
return value;
|
|
}
|
|
return fallback;
|
|
}
|
|
|
|
function readStringOption(options: Record<string, unknown>, key: string, fallback: string): string {
|
|
const value = options[key];
|
|
return typeof value === "string" && value.trim().length > 0 ? value : fallback;
|
|
}
|
|
|
|
function readHardConstraint(snapshot: DiscoverySnapshot, kind: string): Record<string, unknown> {
|
|
const constraint = snapshot.hardConstraints.find((entry) => entry.kind === kind);
|
|
assert.ok(constraint);
|
|
return constraint.allocation as Record<string, unknown>;
|
|
}
|
|
|
|
function readGitConstraint(snapshot: DiscoverySnapshot): {
|
|
worktreePath: string;
|
|
} {
|
|
const allocation = readHardConstraint(snapshot, "git-worktree");
|
|
const worktreePath = allocation.worktreePath;
|
|
assert.equal(typeof worktreePath, "string");
|
|
if (typeof worktreePath !== "string") {
|
|
throw new Error("Expected git-worktree allocation to include worktreePath.");
|
|
}
|
|
return {
|
|
worktreePath,
|
|
};
|
|
}
|
|
|
|
function readPortRangeConstraint(snapshot: DiscoverySnapshot): {
|
|
startPort: number;
|
|
endPort: number;
|
|
} {
|
|
const allocation = readHardConstraint(snapshot, "port-range");
|
|
const startPort = allocation.startPort;
|
|
const endPort = allocation.endPort;
|
|
assert.equal(typeof startPort, "number");
|
|
assert.equal(typeof endPort, "number");
|
|
if (typeof startPort !== "number" || typeof endPort !== "number") {
|
|
throw new Error("Expected port-range allocation to include numeric startPort/endPort.");
|
|
}
|
|
return {
|
|
startPort,
|
|
endPort,
|
|
};
|
|
}
|