325 lines
9.4 KiB
TypeScript
325 lines
9.4 KiB
TypeScript
import {
|
|
DomainEventBus,
|
|
type DomainEvent,
|
|
type DomainEventType,
|
|
} from "./domain-events.js";
|
|
import type { NodeTopologyKind, PipelineNode } from "./manifest.js";
|
|
import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js";
|
|
import { PersonaRegistry } from "./persona-registry.js";
|
|
import {
|
|
FileSystemStateContextManager,
|
|
} from "./state-context.js";
|
|
import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js";
|
|
import { isRecord, type JsonObject } from "./types.js";
|
|
import {
|
|
type RuntimeEventPublisher,
|
|
type RuntimeEventSeverity,
|
|
type RuntimeEventUsage,
|
|
} from "../telemetry/index.js";
|
|
|
|
export type PipelineNodeAttemptObservedEvent = {
|
|
sessionId: string;
|
|
node: PipelineNode;
|
|
attempt: number;
|
|
result: ActorExecutionResult;
|
|
domainEvents: DomainEvent[];
|
|
executionContext: JsonObject;
|
|
fromNodeId?: string;
|
|
retrySpawned: boolean;
|
|
topologyKind: NodeTopologyKind;
|
|
};
|
|
|
|
function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValidationFail" | undefined {
|
|
if (status === "success") {
|
|
return "onTaskComplete";
|
|
}
|
|
if (status === "validation_fail") {
|
|
return "onValidationFail";
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
function toNodeAttemptSeverity(status: ActorResultStatus): RuntimeEventSeverity {
|
|
if (status === "failure") {
|
|
return "critical";
|
|
}
|
|
if (status === "validation_fail") {
|
|
return "warning";
|
|
}
|
|
return "info";
|
|
}
|
|
|
|
function toDomainEventSeverity(type: DomainEventType): RuntimeEventSeverity {
|
|
if (type === "task_blocked" || type === "merge_conflict_unresolved") {
|
|
return "critical";
|
|
}
|
|
if (
|
|
type === "validation_failed" ||
|
|
type === "merge_conflict_detected" ||
|
|
type === "merge_retry_started"
|
|
) {
|
|
return "warning";
|
|
}
|
|
return "info";
|
|
}
|
|
|
|
function toNumber(value: unknown): number | undefined {
|
|
if (typeof value === "number" && Number.isFinite(value)) {
|
|
return value;
|
|
}
|
|
if (typeof value === "string") {
|
|
const parsed = Number(value);
|
|
if (Number.isFinite(parsed)) {
|
|
return parsed;
|
|
}
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
function readFirstNumber(record: JsonObject, keys: string[]): number | undefined {
|
|
for (const key of keys) {
|
|
const parsed = toNumber(record[key]);
|
|
if (typeof parsed === "number") {
|
|
return parsed;
|
|
}
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
function extractUsageMetrics(result: ActorExecutionResult): RuntimeEventUsage | undefined {
|
|
const candidates = [
|
|
result.stateMetadata?.usage,
|
|
result.stateMetadata?.tokenUsage,
|
|
result.payload?.usage,
|
|
result.payload?.tokenUsage,
|
|
];
|
|
|
|
for (const candidate of candidates) {
|
|
if (!isRecord(candidate)) {
|
|
continue;
|
|
}
|
|
|
|
const usageRecord = candidate as JsonObject;
|
|
const tokenInput = readFirstNumber(usageRecord, [
|
|
"tokenInput",
|
|
"token_input",
|
|
"inputTokens",
|
|
"input_tokens",
|
|
"promptTokens",
|
|
"prompt_tokens",
|
|
]);
|
|
const tokenOutput = readFirstNumber(usageRecord, [
|
|
"tokenOutput",
|
|
"token_output",
|
|
"outputTokens",
|
|
"output_tokens",
|
|
"completionTokens",
|
|
"completion_tokens",
|
|
]);
|
|
const tokenTotal = readFirstNumber(usageRecord, [
|
|
"tokenTotal",
|
|
"token_total",
|
|
"totalTokens",
|
|
"total_tokens",
|
|
]);
|
|
const toolCalls = readFirstNumber(usageRecord, [
|
|
"toolCalls",
|
|
"tool_calls",
|
|
"toolCallCount",
|
|
"tool_call_count",
|
|
]);
|
|
const durationMs = readFirstNumber(usageRecord, [
|
|
"durationMs",
|
|
"duration_ms",
|
|
"latencyMs",
|
|
"latency_ms",
|
|
]);
|
|
const costUsd = readFirstNumber(usageRecord, [
|
|
"costUsd",
|
|
"cost_usd",
|
|
"usd",
|
|
]);
|
|
|
|
const usage: RuntimeEventUsage = {
|
|
...(typeof tokenInput === "number" ? { tokenInput } : {}),
|
|
...(typeof tokenOutput === "number" ? { tokenOutput } : {}),
|
|
...(typeof tokenTotal === "number" ? { tokenTotal } : {}),
|
|
...(typeof toolCalls === "number" ? { toolCalls } : {}),
|
|
...(typeof durationMs === "number" ? { durationMs } : {}),
|
|
...(typeof costUsd === "number" ? { costUsd } : {}),
|
|
};
|
|
|
|
if (Object.keys(usage).length > 0) {
|
|
return usage;
|
|
}
|
|
}
|
|
|
|
return undefined;
|
|
}
|
|
|
|
function extractSubtasks(result: ActorExecutionResult): string[] {
|
|
const candidates = [
|
|
result.payload?.subtasks,
|
|
result.stateMetadata?.subtasks,
|
|
];
|
|
|
|
for (const candidate of candidates) {
|
|
if (!Array.isArray(candidate)) {
|
|
continue;
|
|
}
|
|
|
|
const subtasks: string[] = [];
|
|
for (const item of candidate) {
|
|
if (typeof item !== "string") {
|
|
continue;
|
|
}
|
|
const normalized = item.trim();
|
|
if (!normalized) {
|
|
continue;
|
|
}
|
|
subtasks.push(normalized);
|
|
}
|
|
|
|
if (subtasks.length > 0) {
|
|
return subtasks;
|
|
}
|
|
}
|
|
|
|
return [];
|
|
}
|
|
|
|
function hasSecurityViolation(result: ActorExecutionResult): boolean {
|
|
return isRecord(result.payload) && result.payload.security_violation === true;
|
|
}
|
|
|
|
export interface PipelineLifecycleObserver {
|
|
onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void>;
|
|
}
|
|
|
|
export class PersistenceLifecycleObserver implements PipelineLifecycleObserver {
|
|
constructor(
|
|
private readonly input: {
|
|
personaRegistry: PersonaRegistry;
|
|
stateManager: FileSystemStateContextManager;
|
|
projectContextStore: FileSystemProjectContextStore;
|
|
domainEventBus?: DomainEventBus;
|
|
runtimeEventPublisher?: RuntimeEventPublisher;
|
|
},
|
|
) {}
|
|
|
|
async onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void> {
|
|
const behaviorEvent = toBehaviorEvent(event.result.status);
|
|
const behaviorPatch = behaviorEvent
|
|
? await this.input.personaRegistry.emitBehaviorEvent({
|
|
personaId: event.node.personaId,
|
|
event: behaviorEvent,
|
|
sessionId: event.sessionId,
|
|
nodeId: event.node.id,
|
|
payload: event.result.payload ?? {},
|
|
})
|
|
: {};
|
|
|
|
const domainHistoryEvents = event.domainEvents.map((domainEvent) => ({
|
|
nodeId: event.node.id,
|
|
event: domainEvent.type,
|
|
timestamp: domainEvent.timestamp,
|
|
data: {
|
|
source: domainEvent.source,
|
|
attempt: domainEvent.attempt,
|
|
...(domainEvent.payload.summary ? { summary: domainEvent.payload.summary } : {}),
|
|
...(domainEvent.payload.errorCode ? { errorCode: domainEvent.payload.errorCode } : {}),
|
|
...(domainEvent.payload.artifactPointer
|
|
? { artifactPointer: domainEvent.payload.artifactPointer }
|
|
: {}),
|
|
...(domainEvent.payload.details ? { details: domainEvent.payload.details } : {}),
|
|
},
|
|
}));
|
|
|
|
await this.input.stateManager.patchState(event.sessionId, {
|
|
...(event.result.stateFlags ? { flags: event.result.stateFlags } : {}),
|
|
metadata: {
|
|
...(event.result.stateMetadata ?? {}),
|
|
...behaviorPatch,
|
|
},
|
|
historyEvents: domainHistoryEvents,
|
|
});
|
|
|
|
await this.input.runtimeEventPublisher?.publish({
|
|
type: "node.attempt.completed",
|
|
severity: toNodeAttemptSeverity(event.result.status),
|
|
sessionId: event.sessionId,
|
|
nodeId: event.node.id,
|
|
attempt: event.attempt,
|
|
message: `Node "${event.node.id}" attempt ${String(event.attempt)} completed with status "${event.result.status}".`,
|
|
usage: extractUsageMetrics(event.result),
|
|
metadata: {
|
|
status: event.result.status,
|
|
...(event.result.failureKind ? { failureKind: event.result.failureKind } : {}),
|
|
...(event.result.failureCode ? { failureCode: event.result.failureCode } : {}),
|
|
executionContext: event.executionContext,
|
|
topologyKind: event.topologyKind,
|
|
retrySpawned: event.retrySpawned,
|
|
...(event.fromNodeId ? { fromNodeId: event.fromNodeId } : {}),
|
|
subtasks: extractSubtasks(event.result),
|
|
securityViolation: hasSecurityViolation(event.result),
|
|
},
|
|
});
|
|
|
|
for (const domainEvent of event.domainEvents) {
|
|
await this.input.runtimeEventPublisher?.publish({
|
|
type: `domain.${domainEvent.type}`,
|
|
severity: toDomainEventSeverity(domainEvent.type),
|
|
sessionId: event.sessionId,
|
|
nodeId: event.node.id,
|
|
attempt: event.attempt,
|
|
message:
|
|
domainEvent.payload.summary ??
|
|
`Domain event "${domainEvent.type}" emitted for node "${event.node.id}".`,
|
|
metadata: {
|
|
source: domainEvent.source,
|
|
...(domainEvent.payload.errorCode
|
|
? { errorCode: domainEvent.payload.errorCode }
|
|
: {}),
|
|
...(domainEvent.payload.artifactPointer
|
|
? { artifactPointer: domainEvent.payload.artifactPointer }
|
|
: {}),
|
|
},
|
|
});
|
|
}
|
|
|
|
const domainEventBus = this.input.domainEventBus;
|
|
if (domainEventBus) {
|
|
for (const domainEvent of event.domainEvents) {
|
|
await domainEventBus.publish(domainEvent);
|
|
}
|
|
}
|
|
|
|
const patch: ProjectContextPatch = {
|
|
...(event.result.projectContextPatch ?? {}),
|
|
artifactPointers: {
|
|
[`sessions/${event.sessionId}/last_completed_node`]: event.node.id,
|
|
[`sessions/${event.sessionId}/last_attempt`]: String(event.attempt),
|
|
...(event.result.projectContextPatch?.artifactPointers ?? {}),
|
|
},
|
|
};
|
|
|
|
await this.input.projectContextStore.patchState(patch);
|
|
}
|
|
}
|
|
|
|
export class DomainEventCollector {
|
|
private readonly events: DomainEvent[] = [];
|
|
|
|
record(events: DomainEvent[]): void {
|
|
this.events.push(...events);
|
|
}
|
|
|
|
toEventTypes(events: DomainEvent[]): DomainEventType[] {
|
|
return events.map((event) => event.type);
|
|
}
|
|
|
|
getAll(): DomainEvent[] {
|
|
return [...this.events];
|
|
}
|
|
}
|