// File: ai_ops/src/agents/lifecycle-observer.ts
// 325 lines · 9.4 KiB · TypeScript

import {
DomainEventBus,
type DomainEvent,
type DomainEventType,
} from "./domain-events.js";
import type { NodeTopologyKind, PipelineNode } from "./manifest.js";
import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js";
import { PersonaRegistry } from "./persona-registry.js";
import {
FileSystemStateContextManager,
} from "./state-context.js";
import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js";
import { isRecord, type JsonObject } from "./types.js";
import {
type RuntimeEventPublisher,
type RuntimeEventSeverity,
type RuntimeEventUsage,
} from "../telemetry/index.js";
/**
 * Snapshot of one observed node attempt, passed to
 * `PipelineLifecycleObserver.onNodeAttempt`.
 *
 * The per-field notes describe how `PersistenceLifecycleObserver` in this file
 * consumes each field; the producer of these events is not visible here.
 */
export type PipelineNodeAttemptObservedEvent = {
// Session the attempt belongs to; used to key state patches and telemetry events.
sessionId: string;
// Node that was executed; the observer reads its `id` and `personaId`.
node: PipelineNode;
// Attempt number for this node. NOTE(review): 0- vs 1-based is not visible here — confirm with the producer.
attempt: number;
// Actor outcome: status, payload, state flags/metadata and an optional project-context patch.
result: ActorExecutionResult;
// Domain events tied to this attempt; written to state history and re-published.
domainEvents: DomainEvent[];
// JSON object forwarded verbatim as telemetry metadata.
executionContext: JsonObject;
// Optional; forwarded as telemetry metadata when truthy. Presumably the node that routed here — confirm.
fromNodeId?: string;
// Forwarded as telemetry metadata. Presumably true when this attempt caused a retry to be scheduled — confirm.
retrySpawned: boolean;
// Topology classification of the node, forwarded as telemetry metadata.
topologyKind: NodeTopologyKind;
};
/**
 * Maps an actor result status to the persona behavior event it should trigger.
 *
 * @param status - Status reported by the actor for this attempt.
 * @returns `"onTaskComplete"` for `"success"`, `"onValidationFail"` for
 *   `"validation_fail"`, and `undefined` for every other status.
 */
function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValidationFail" | undefined {
  switch (status) {
    case "success":
      return "onTaskComplete";
    case "validation_fail":
      return "onValidationFail";
    default:
      // Any other status has no associated behavior event.
      return undefined;
  }
}
/**
 * Chooses the telemetry severity for a node-attempt event.
 *
 * @param status - Status reported by the actor for this attempt.
 * @returns `"critical"` for `"failure"`, `"warning"` for `"validation_fail"`,
 *   otherwise `"info"`.
 */
function toNodeAttemptSeverity(status: ActorResultStatus): RuntimeEventSeverity {
  return status === "failure"
    ? "critical"
    : status === "validation_fail"
      ? "warning"
      : "info";
}
/**
 * Chooses the telemetry severity for a re-published domain event.
 *
 * @param type - Domain event type.
 * @returns `"critical"` for blocked tasks and unresolved merge conflicts,
 *   `"warning"` for validation failures, detected merge conflicts and merge
 *   retries, and `"info"` for everything else.
 */
function toDomainEventSeverity(type: DomainEventType): RuntimeEventSeverity {
  switch (type) {
    case "task_blocked":
    case "merge_conflict_unresolved":
      return "critical";
    case "validation_failed":
    case "merge_conflict_detected":
    case "merge_retry_started":
      return "warning";
    default:
      return "info";
  }
}
/**
 * Coerces a loosely-typed value to a finite number.
 *
 * Numbers are accepted as-is; strings are converted with `Number(...)`, so
 * `""` and whitespace-only strings yield `0`. Every other input type, and any
 * non-finite result (`NaN`, `±Infinity`), produces `undefined`.
 *
 * @param value - Arbitrary value.
 * @returns The finite numeric value, or `undefined` when none can be derived.
 */
function toNumber(value: unknown): number | undefined {
  const candidate =
    typeof value === "number"
      ? value
      : typeof value === "string"
        ? Number(value)
        : Number.NaN; // unsupported types fall through to the finiteness check

  return Number.isFinite(candidate) ? candidate : undefined;
}
/**
 * Returns the numeric value stored under the first key (in the given order)
 * whose value can be coerced by `toNumber`.
 *
 * @param record - Object to read from.
 * @param keys - Candidate property names, highest priority first.
 * @returns The first coercible value, or `undefined` if no key yields one.
 */
function readFirstNumber(record: JsonObject, keys: string[]): number | undefined {
  // `find` stops at the first key that coerces, preserving priority order.
  const matchingKey = keys.find((key) => toNumber(record[key]) !== undefined);
  return matchingKey === undefined ? undefined : toNumber(record[matchingKey]);
}
/**
 * Extracts usage figures (tokens, tool calls, duration, cost) from an actor
 * result.
 *
 * Four locations are inspected in priority order: `stateMetadata.usage`,
 * `stateMetadata.tokenUsage`, `payload.usage`, `payload.tokenUsage`. The first
 * one that is a record and yields at least one numeric metric wins. Each metric
 * is looked up under several camelCase / snake_case aliases.
 *
 * @param result - Actor execution result to inspect.
 * @returns A usage object containing only the metrics that were found, or
 *   `undefined` when no location provides any metric.
 */
function extractUsageMetrics(result: ActorExecutionResult): RuntimeEventUsage | undefined {
  const sources = [
    result.stateMetadata?.usage,
    result.stateMetadata?.tokenUsage,
    result.payload?.usage,
    result.payload?.tokenUsage,
  ];

  for (const source of sources) {
    if (isRecord(source)) {
      const record = source as JsonObject;
      // Resolves one metric by trying each alias in order.
      const pick = (...aliases: string[]): number | undefined =>
        readFirstNumber(record, aliases);

      const tokenInput = pick(
        "tokenInput",
        "token_input",
        "inputTokens",
        "input_tokens",
        "promptTokens",
        "prompt_tokens",
      );
      const tokenOutput = pick(
        "tokenOutput",
        "token_output",
        "outputTokens",
        "output_tokens",
        "completionTokens",
        "completion_tokens",
      );
      const tokenTotal = pick("tokenTotal", "token_total", "totalTokens", "total_tokens");
      const toolCalls = pick("toolCalls", "tool_calls", "toolCallCount", "tool_call_count");
      const durationMs = pick("durationMs", "duration_ms", "latencyMs", "latency_ms");
      const costUsd = pick("costUsd", "cost_usd", "usd");

      // Only metrics that were actually found become properties of the result.
      const usage: RuntimeEventUsage = {
        ...(tokenInput === undefined ? {} : { tokenInput }),
        ...(tokenOutput === undefined ? {} : { tokenOutput }),
        ...(tokenTotal === undefined ? {} : { tokenTotal }),
        ...(toolCalls === undefined ? {} : { toolCalls }),
        ...(durationMs === undefined ? {} : { durationMs }),
        ...(costUsd === undefined ? {} : { costUsd }),
      };

      // A record without any recognised metric does not count; try the next source.
      if (Object.keys(usage).length !== 0) {
        return usage;
      }
    }
  }

  return undefined;
}
/**
 * Collects the subtask labels reported by an actor.
 *
 * `payload.subtasks` is checked first, then `stateMetadata.subtasks`. From the
 * first of these that is an array containing at least one non-blank string,
 * the strings are trimmed and returned; non-string items and blank strings are
 * dropped.
 *
 * @param result - Actor execution result to inspect.
 * @returns Trimmed, non-empty subtask labels, or an empty array when none exist.
 */
function extractSubtasks(result: ActorExecutionResult): string[] {
  const sources = [result.payload?.subtasks, result.stateMetadata?.subtasks];

  for (const source of sources) {
    if (Array.isArray(source)) {
      const labels = source
        .filter((item): item is string => typeof item === "string")
        .map((item) => item.trim())
        .filter((item) => item.length > 0);

      // An array with no usable entries falls through to the next source.
      if (labels.length > 0) {
        return labels;
      }
    }
  }

  return [];
}
/**
 * Reports whether the actor flagged a security violation.
 *
 * @param result - Actor execution result to inspect.
 * @returns `true` only when the payload is a record whose `security_violation`
 *   property is strictly `true`.
 */
function hasSecurityViolation(result: ActorExecutionResult): boolean {
  if (!isRecord(result.payload)) {
    return false;
  }
  return result.payload.security_violation === true;
}
/**
 * Contract for components that react to observed pipeline node attempts.
 */
export interface PipelineLifecycleObserver {
/**
 * Handles one observed node attempt.
 *
 * @param event - Details of the attempt (session, node, result, domain events).
 * @returns A promise that settles when the observer has finished its work.
 *   NOTE(review): whether callers await it or tolerate rejection is not visible here.
 */
onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void>;
}
/**
 * Lifecycle observer that persists and broadcasts the outcome of each node
 * attempt.
 *
 * For every attempt it performs, strictly in this order:
 *  1. emits a persona behavior event when the status maps to one;
 *  2. patches the session state (flags, metadata, domain-event history);
 *  3. publishes a `node.attempt.completed` runtime event (if a publisher is set);
 *  4. publishes one `domain.<type>` runtime event per domain event;
 *  5. forwards the domain events to the domain event bus (if one is set);
 *  6. patches the project context, including `sessions/<id>/...` artifact pointers.
 *
 * Each step is awaited before the next begins, so a rejection in any step
 * prevents the later ones from running.
 */
export class PersistenceLifecycleObserver implements PipelineLifecycleObserver {
  /**
   * @param input - Collaborators used by the observer. `domainEventBus` and
   *   `runtimeEventPublisher` are optional; their steps are skipped when absent.
   */
  constructor(
    private readonly input: {
      personaRegistry: PersonaRegistry;
      stateManager: FileSystemStateContextManager;
      projectContextStore: FileSystemProjectContextStore;
      domainEventBus?: DomainEventBus;
      runtimeEventPublisher?: RuntimeEventPublisher;
    },
  ) {}

  /**
   * Persists and publishes everything derived from a single node attempt.
   *
   * @param event - The observed attempt.
   * @returns A promise that resolves once all steps have completed.
   */
  async onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void> {
    const { sessionId, node, attempt, result, domainEvents } = event;

    // 1. Persona behavior hook; its return value is merged into state metadata below.
    const behaviorEvent = toBehaviorEvent(result.status);
    const behaviorPatch = !behaviorEvent
      ? {}
      : await this.input.personaRegistry.emitBehaviorEvent({
          personaId: node.personaId,
          event: behaviorEvent,
          sessionId,
          nodeId: node.id,
          payload: result.payload ?? {},
        });

    // 2. Session state: flags, metadata and a history entry per domain event.
    const historyEvents = domainEvents.map(
      ({ type, timestamp, source, attempt: eventAttempt, payload }) => ({
        nodeId: node.id,
        event: type,
        timestamp,
        data: {
          source,
          attempt: eventAttempt,
          // Optional payload fields are copied only when truthy.
          ...(payload.summary ? { summary: payload.summary } : {}),
          ...(payload.errorCode ? { errorCode: payload.errorCode } : {}),
          ...(payload.artifactPointer ? { artifactPointer: payload.artifactPointer } : {}),
          ...(payload.details ? { details: payload.details } : {}),
        },
      }),
    );

    await this.input.stateManager.patchState(sessionId, {
      ...(result.stateFlags ? { flags: result.stateFlags } : {}),
      metadata: {
        ...(result.stateMetadata ?? {}),
        // Behavior patch is spread last, so it overrides same-named metadata keys.
        ...behaviorPatch,
      },
      historyEvents,
    });

    // 3. Telemetry for the attempt itself.
    await this.input.runtimeEventPublisher?.publish({
      type: "node.attempt.completed",
      severity: toNodeAttemptSeverity(result.status),
      sessionId,
      nodeId: node.id,
      attempt,
      message: `Node "${node.id}" attempt ${String(attempt)} completed with status "${result.status}".`,
      usage: extractUsageMetrics(result),
      metadata: {
        status: result.status,
        ...(result.failureKind ? { failureKind: result.failureKind } : {}),
        ...(result.failureCode ? { failureCode: result.failureCode } : {}),
        executionContext: event.executionContext,
        topologyKind: event.topologyKind,
        retrySpawned: event.retrySpawned,
        ...(event.fromNodeId ? { fromNodeId: event.fromNodeId } : {}),
        subtasks: extractSubtasks(result),
        securityViolation: hasSecurityViolation(result),
      },
    });

    // 4. Telemetry for each domain event, published sequentially in order.
    for (const domainEvent of domainEvents) {
      const { payload } = domainEvent;
      await this.input.runtimeEventPublisher?.publish({
        type: `domain.${domainEvent.type}`,
        severity: toDomainEventSeverity(domainEvent.type),
        sessionId,
        nodeId: node.id,
        attempt,
        message:
          payload.summary ??
          `Domain event "${domainEvent.type}" emitted for node "${node.id}".`,
        metadata: {
          source: domainEvent.source,
          ...(payload.errorCode ? { errorCode: payload.errorCode } : {}),
          ...(payload.artifactPointer ? { artifactPointer: payload.artifactPointer } : {}),
        },
      });
    }

    // 5. Forward the domain events to the bus, again one at a time.
    const bus = this.input.domainEventBus;
    if (bus) {
      for (const domainEvent of domainEvents) {
        await bus.publish(domainEvent);
      }
    }

    // 6. Project context: the session pointers act as defaults that the
    //    result's own artifact pointers may override (they are spread last).
    const resultPatch = result.projectContextPatch;
    const patch: ProjectContextPatch = {
      ...(resultPatch ?? {}),
      artifactPointers: {
        [`sessions/${sessionId}/last_completed_node`]: node.id,
        [`sessions/${sessionId}/last_attempt`]: String(attempt),
        ...(resultPatch?.artifactPointers ?? {}),
      },
    };
    await this.input.projectContextStore.patchState(patch);
  }
}
/**
 * In-memory accumulator for domain events.
 *
 * Events are kept in insertion order for the lifetime of the instance.
 */
export class DomainEventCollector {
  private readonly events: DomainEvent[] = [];

  /**
   * Appends the given events to the collector, preserving their order.
   *
   * @param events - Events to append; the array itself is not retained.
   */
  record(events: DomainEvent[]): void {
    // Append element-wise rather than `push(...events)`: spreading passes every
    // element as a call argument and throws a RangeError for very large batches.
    for (const event of events) {
      this.events.push(event);
    }
  }

  /**
   * Projects a list of events onto their types.
   *
   * @param events - Events to project (not necessarily those held by this collector).
   * @returns The `type` of each event, in the same order.
   */
  toEventTypes(events: DomainEvent[]): DomainEventType[] {
    return events.map((event) => event.type);
  }

  /**
   * @returns A shallow copy of all recorded events; mutating the returned
   *   array does not affect the collector.
   */
  getAll(): DomainEvent[] {
    return [...this.events];
  }
}