import {
  DomainEventBus,
  type DomainEvent,
  type DomainEventType,
} from "./domain-events.js";
import type { NodeTopologyKind, PipelineNode } from "./manifest.js";
import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js";
import { PersonaRegistry } from "./persona-registry.js";
import {
  FileSystemStateContextManager,
} from "./state-context.js";
import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js";
import { isRecord, type JsonObject } from "./types.js";
import {
  type RuntimeEventPublisher,
  type RuntimeEventSeverity,
  type RuntimeEventUsage,
} from "../telemetry/index.js";

/**
 * Everything a lifecycle observer receives about one finished attempt of a
 * pipeline node.
 */
export type PipelineNodeAttemptObservedEvent = {
  sessionId: string;
  node: PipelineNode;
  attempt: number;
  result: ActorExecutionResult;
  domainEvents: DomainEvent[];
  executionContext: JsonObject;
  fromNodeId?: string;
  retrySpawned: boolean;
  topologyKind: NodeTopologyKind;
};

/**
 * Maps an actor result status to the persona behavior event to emit.
 *
 * @returns `"onTaskComplete"` for `"success"`, `"onValidationFail"` for
 *   `"validation_fail"`, and `undefined` for every other status.
 */
function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValidationFail" | undefined {
  if (status === "success") {
    return "onTaskComplete";
  }
  if (status === "validation_fail") {
    return "onValidationFail";
  }
  return undefined;
}

/**
 * Maps an actor result status to the severity of the
 * `node.attempt.completed` runtime event.
 *
 * @returns `"critical"` for `"failure"`, `"warning"` for `"validation_fail"`,
 *   otherwise `"info"`.
 */
function toNodeAttemptSeverity(status: ActorResultStatus): RuntimeEventSeverity {
  if (status === "failure") {
    return "critical";
  }
  if (status === "validation_fail") {
    return "warning";
  }
  return "info";
}

/**
 * Maps a domain event type to the severity of its mirrored runtime event.
 *
 * @returns `"critical"` for `"task_blocked"`, `"warning"` for
 *   `"validation_failed"`, otherwise `"info"`.
 */
function toDomainEventSeverity(type: DomainEventType): RuntimeEventSeverity {
  if (type === "task_blocked") {
    return "critical";
  }
  if (type === "validation_failed") {
    return "warning";
  }
  return "info";
}

/**
 * Coerces a loosely-typed value to a finite number.
 *
 * Accepts finite numbers and strings that `Number()` parses to a finite
 * value. Empty and whitespace-only strings are rejected explicitly because
 * `Number("")` evaluates to `0`, which would turn a blank field into a
 * misleading metric of zero.
 *
 * @returns The finite number, or `undefined` when the value is not numeric.
 */
function toNumber(value: unknown): number | undefined {
  if (typeof value === "number") {
    return Number.isFinite(value) ? value : undefined;
  }

  if (typeof value === "string") {
    const trimmed = value.trim();
    if (trimmed === "") {
      return undefined;
    }
    const parsed = Number(trimmed);
    if (Number.isFinite(parsed)) {
      return parsed;
    }
  }

  return undefined;
}

/**
 * Returns the first value in `record` that {@link toNumber} accepts, trying
 * `keys` in the order given.
 *
 * @returns The first numeric value found, or `undefined` when none of the
 *   keys holds one.
 */
function readFirstNumber(record: JsonObject, keys: readonly string[]): number | undefined {
  for (const key of keys) {
    const parsed = toNumber(record[key]);
    if (typeof parsed === "number") {
      return parsed;
    }
  }
  return undefined;
}

/**
 * Extracts usage metrics from an actor result.
 *
 * Candidates are checked in this order: `stateMetadata.usage`,
 * `stateMetadata.tokenUsage`, `payload.usage`, `payload.tokenUsage`. The first
 * record-like candidate that yields at least one numeric metric wins. Each
 * metric is looked up under several camelCase and snake_case aliases.
 *
 * @returns A usage object holding only the metrics that were found, or
 *   `undefined` when no candidate yields any metric.
 */
function extractUsageMetrics(result: ActorExecutionResult): RuntimeEventUsage | undefined {
  const candidates = [
    result.stateMetadata?.usage,
    result.stateMetadata?.tokenUsage,
    result.payload?.usage,
    result.payload?.tokenUsage,
  ];

  for (const candidate of candidates) {
    if (!isRecord(candidate)) {
      continue;
    }

    const usageRecord = candidate as JsonObject;

    const tokenInput = readFirstNumber(usageRecord, [
      "tokenInput",
      "token_input",
      "inputTokens",
      "input_tokens",
      "promptTokens",
      "prompt_tokens",
    ]);
    const tokenOutput = readFirstNumber(usageRecord, [
      "tokenOutput",
      "token_output",
      "outputTokens",
      "output_tokens",
      "completionTokens",
      "completion_tokens",
    ]);
    const tokenTotal = readFirstNumber(usageRecord, [
      "tokenTotal",
      "token_total",
      "totalTokens",
      "total_tokens",
    ]);
    const toolCalls = readFirstNumber(usageRecord, [
      "toolCalls",
      "tool_calls",
      "toolCallCount",
      "tool_call_count",
    ]);
    const durationMs = readFirstNumber(usageRecord, [
      "durationMs",
      "duration_ms",
      "latencyMs",
      "latency_ms",
    ]);
    const costUsd = readFirstNumber(usageRecord, [
      "costUsd",
      "cost_usd",
      "usd",
    ]);

    // Only include metrics that were actually present so consumers can tell
    // "not reported" apart from zero.
    const usage: RuntimeEventUsage = {
      ...(typeof tokenInput === "number" ? { tokenInput } : {}),
      ...(typeof tokenOutput === "number" ? { tokenOutput } : {}),
      ...(typeof tokenTotal === "number" ? { tokenTotal } : {}),
      ...(typeof toolCalls === "number" ? { toolCalls } : {}),
      ...(typeof durationMs === "number" ? { durationMs } : {}),
      ...(typeof costUsd === "number" ? { costUsd } : {}),
    };

    if (Object.keys(usage).length > 0) {
      return usage;
    }
  }

  return undefined;
}

/**
 * Extracts subtask names from an actor result.
 *
 * Checks `payload.subtasks` first, then `stateMetadata.subtasks`. Within an
 * array candidate, non-string items and strings that are blank after trimming
 * are dropped. The first candidate with at least one remaining entry wins.
 *
 * @returns The trimmed subtask strings, or an empty array when none are found.
 */
function extractSubtasks(result: ActorExecutionResult): string[] {
  const candidates = [
    result.payload?.subtasks,
    result.stateMetadata?.subtasks,
  ];

  for (const candidate of candidates) {
    if (!Array.isArray(candidate)) {
      continue;
    }

    const subtasks: string[] = [];
    for (const item of candidate) {
      if (typeof item !== "string") {
        continue;
      }
      const normalized = item.trim();
      if (!normalized) {
        continue;
      }
      subtasks.push(normalized);
    }

    if (subtasks.length > 0) {
      return subtasks;
    }
  }

  return [];
}

/**
 * Reports whether the result payload carries `security_violation` set to
 * exactly `true`. Truthy non-boolean values do not count.
 */
function hasSecurityViolation(result: ActorExecutionResult): boolean {
  return isRecord(result.payload) && result.payload.security_violation === true;
}

/** Receives a callback for every observed pipeline node attempt. */
export interface PipelineLifecycleObserver {
  /**
   * Called with the details of one finished node attempt.
   *
   * @param event - The observed attempt.
   */
  onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void>;
}

/**
 * Lifecycle observer that records each node attempt in session state,
 * publishes runtime and domain events for it, and patches the project
 * context.
 */
export class PersistenceLifecycleObserver implements PipelineLifecycleObserver {
  constructor(
    private readonly input: {
      personaRegistry: PersonaRegistry;
      stateManager: FileSystemStateContextManager;
      projectContextStore: FileSystemProjectContextStore;
      domainEventBus?: DomainEventBus;
      runtimeEventPublisher?: RuntimeEventPublisher;
    },
  ) {}

  /**
   * Handles one node attempt. The steps run sequentially in this order, each
   * awaited before the next, so a rejection from any step skips the rest:
   *
   * 1. Emit the persona behavior event, if the status maps to one.
   * 2. Patch session state with flags, metadata and domain history events.
   * 3. Publish the `node.attempt.completed` runtime event, if a publisher is
   *    configured.
   * 4. Mirror each domain event as a `domain.<type>` runtime event.
   * 5. Publish each domain event on the domain event bus, if configured.
   * 6. Patch the project context with the session's last node and attempt.
   *
   * @param event - The observed attempt.
   */
  async onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void> {
    // 1. Persona behavior hook; whatever it returns is merged into metadata.
    const behaviorEvent = toBehaviorEvent(event.result.status);
    const behaviorPatch = behaviorEvent
      ? await this.input.personaRegistry.emitBehaviorEvent({
          personaId: event.node.personaId,
          event: behaviorEvent,
          sessionId: event.sessionId,
          nodeId: event.node.id,
          payload: event.result.payload ?? {},
        })
      : {};

    // 2. Session state: flags, metadata and one history entry per domain event.
    const domainHistoryEvents = event.domainEvents.map((domainEvent) => ({
      nodeId: event.node.id,
      event: domainEvent.type,
      timestamp: domainEvent.timestamp,
      data: {
        source: domainEvent.source,
        attempt: domainEvent.attempt,
        ...(domainEvent.payload.summary ? { summary: domainEvent.payload.summary } : {}),
        ...(domainEvent.payload.errorCode ? { errorCode: domainEvent.payload.errorCode } : {}),
        ...(domainEvent.payload.artifactPointer
          ? { artifactPointer: domainEvent.payload.artifactPointer }
          : {}),
        ...(domainEvent.payload.details ? { details: domainEvent.payload.details } : {}),
      },
    }));

    await this.input.stateManager.patchState(event.sessionId, {
      ...(event.result.stateFlags ? { flags: event.result.stateFlags } : {}),
      metadata: {
        ...(event.result.stateMetadata ?? {}),
        // Spread last so the behavior patch wins on key collisions.
        ...behaviorPatch,
      },
      historyEvents: domainHistoryEvents,
    });

    // 3. Runtime telemetry for the attempt itself.
    await this.input.runtimeEventPublisher?.publish({
      type: "node.attempt.completed",
      severity: toNodeAttemptSeverity(event.result.status),
      sessionId: event.sessionId,
      nodeId: event.node.id,
      attempt: event.attempt,
      message: `Node "${event.node.id}" attempt ${String(event.attempt)} completed with status "${event.result.status}".`,
      usage: extractUsageMetrics(event.result),
      metadata: {
        status: event.result.status,
        ...(event.result.failureKind ? { failureKind: event.result.failureKind } : {}),
        ...(event.result.failureCode ? { failureCode: event.result.failureCode } : {}),
        executionContext: event.executionContext,
        topologyKind: event.topologyKind,
        retrySpawned: event.retrySpawned,
        ...(event.fromNodeId ? { fromNodeId: event.fromNodeId } : {}),
        subtasks: extractSubtasks(event.result),
        securityViolation: hasSecurityViolation(event.result),
      },
    });

    // 4. Mirror domain events into runtime telemetry, one at a time, in order.
    for (const domainEvent of event.domainEvents) {
      await this.input.runtimeEventPublisher?.publish({
        type: `domain.${domainEvent.type}`,
        severity: toDomainEventSeverity(domainEvent.type),
        sessionId: event.sessionId,
        nodeId: event.node.id,
        attempt: event.attempt,
        message:
          domainEvent.payload.summary ??
          `Domain event "${domainEvent.type}" emitted for node "${event.node.id}".`,
        metadata: {
          source: domainEvent.source,
          ...(domainEvent.payload.errorCode ? { errorCode: domainEvent.payload.errorCode } : {}),
          ...(domainEvent.payload.artifactPointer
            ? { artifactPointer: domainEvent.payload.artifactPointer }
            : {}),
        },
      });
    }

    // 5. Publish on the domain event bus, one at a time, in order.
    const domainEventBus = this.input.domainEventBus;
    if (domainEventBus) {
      for (const domainEvent of event.domainEvents) {
        await domainEventBus.publish(domainEvent);
      }
    }

    // 6. Project context: the session pointers come first so that artifact
    // pointers supplied by the result override them on key collisions.
    const patch: ProjectContextPatch = {
      ...(event.result.projectContextPatch ?? {}),
      artifactPointers: {
        [`sessions/${event.sessionId}/last_completed_node`]: event.node.id,
        [`sessions/${event.sessionId}/last_attempt`]: String(event.attempt),
        ...(event.result.projectContextPatch?.artifactPointers ?? {}),
      },
    };

    await this.input.projectContextStore.patchState(patch);
  }
}

/** Accumulates domain events in insertion order. */
export class DomainEventCollector {
  private readonly events: DomainEvent[] = [];

  /** Appends `events` to the collected list, preserving their order. */
  record(events: DomainEvent[]): void {
    this.events.push(...events);
  }

  /**
   * Maps the given events to their types. Operates on the argument only; it
   * does not read the collector's own events.
   */
  toEventTypes(events: DomainEvent[]): DomainEventType[] {
    return events.map((event) => event.type);
  }

  /**
   * Returns a shallow copy of the collected events, so mutating the returned
   * array does not affect the collector.
   */
  getAll(): DomainEvent[] {
    return [...this.events];
  }
}