Add runtime event telemetry and auth-mode config hardening

This commit is contained in:
2026-02-23 17:30:53 -05:00
parent 3ca9bd3db8
commit 94c79d9dd7
10 changed files with 853 additions and 3 deletions

View File

@@ -10,6 +10,12 @@ import {
FileSystemStateContextManager,
} from "./state-context.js";
import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js";
import { isRecord, type JsonObject } from "./types.js";
import {
type RuntimeEventPublisher,
type RuntimeEventSeverity,
type RuntimeEventUsage,
} from "../telemetry/index.js";
export type PipelineNodeAttemptObservedEvent = {
sessionId: string;
@@ -29,6 +35,120 @@ function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValid
return undefined;
}
function toNodeAttemptSeverity(status: ActorResultStatus): RuntimeEventSeverity {
if (status === "failure") {
return "critical";
}
if (status === "validation_fail") {
return "warning";
}
return "info";
}
function toDomainEventSeverity(type: DomainEventType): RuntimeEventSeverity {
if (type === "task_blocked") {
return "critical";
}
if (type === "validation_failed") {
return "warning";
}
return "info";
}
function toNumber(value: unknown): number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value === "string") {
const parsed = Number(value);
if (Number.isFinite(parsed)) {
return parsed;
}
}
return undefined;
}
function readFirstNumber(record: JsonObject, keys: string[]): number | undefined {
for (const key of keys) {
const parsed = toNumber(record[key]);
if (typeof parsed === "number") {
return parsed;
}
}
return undefined;
}
function extractUsageMetrics(result: ActorExecutionResult): RuntimeEventUsage | undefined {
const candidates = [
result.stateMetadata?.usage,
result.stateMetadata?.tokenUsage,
result.payload?.usage,
result.payload?.tokenUsage,
];
for (const candidate of candidates) {
if (!isRecord(candidate)) {
continue;
}
const usageRecord = candidate as JsonObject;
const tokenInput = readFirstNumber(usageRecord, [
"tokenInput",
"token_input",
"inputTokens",
"input_tokens",
"promptTokens",
"prompt_tokens",
]);
const tokenOutput = readFirstNumber(usageRecord, [
"tokenOutput",
"token_output",
"outputTokens",
"output_tokens",
"completionTokens",
"completion_tokens",
]);
const tokenTotal = readFirstNumber(usageRecord, [
"tokenTotal",
"token_total",
"totalTokens",
"total_tokens",
]);
const toolCalls = readFirstNumber(usageRecord, [
"toolCalls",
"tool_calls",
"toolCallCount",
"tool_call_count",
]);
const durationMs = readFirstNumber(usageRecord, [
"durationMs",
"duration_ms",
"latencyMs",
"latency_ms",
]);
const costUsd = readFirstNumber(usageRecord, [
"costUsd",
"cost_usd",
"usd",
]);
const usage: RuntimeEventUsage = {
...(typeof tokenInput === "number" ? { tokenInput } : {}),
...(typeof tokenOutput === "number" ? { tokenOutput } : {}),
...(typeof tokenTotal === "number" ? { tokenTotal } : {}),
...(typeof toolCalls === "number" ? { toolCalls } : {}),
...(typeof durationMs === "number" ? { durationMs } : {}),
...(typeof costUsd === "number" ? { costUsd } : {}),
};
if (Object.keys(usage).length > 0) {
return usage;
}
}
return undefined;
}
export interface PipelineLifecycleObserver {
onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void>;
}
@@ -40,6 +160,7 @@ export class PersistenceLifecycleObserver implements PipelineLifecycleObserver {
stateManager: FileSystemStateContextManager;
projectContextStore: FileSystemProjectContextStore;
domainEventBus?: DomainEventBus;
runtimeEventPublisher?: RuntimeEventPublisher;
},
) {}
@@ -80,6 +201,43 @@ export class PersistenceLifecycleObserver implements PipelineLifecycleObserver {
historyEvents: domainHistoryEvents,
});
await this.input.runtimeEventPublisher?.publish({
type: "node.attempt.completed",
severity: toNodeAttemptSeverity(event.result.status),
sessionId: event.sessionId,
nodeId: event.node.id,
attempt: event.attempt,
message: `Node "${event.node.id}" attempt ${String(event.attempt)} completed with status "${event.result.status}".`,
usage: extractUsageMetrics(event.result),
metadata: {
status: event.result.status,
...(event.result.failureKind ? { failureKind: event.result.failureKind } : {}),
...(event.result.failureCode ? { failureCode: event.result.failureCode } : {}),
},
});
for (const domainEvent of event.domainEvents) {
await this.input.runtimeEventPublisher?.publish({
type: `domain.${domainEvent.type}`,
severity: toDomainEventSeverity(domainEvent.type),
sessionId: event.sessionId,
nodeId: event.node.id,
attempt: event.attempt,
message:
domainEvent.payload.summary ??
`Domain event "${domainEvent.type}" emitted for node "${event.node.id}".`,
metadata: {
source: domainEvent.source,
...(domainEvent.payload.errorCode
? { errorCode: domainEvent.payload.errorCode }
: {}),
...(domainEvent.payload.artifactPointer
? { artifactPointer: domainEvent.payload.artifactPointer }
: {}),
},
});
}
const domainEventBus = this.input.domainEventBus;
if (domainEventBus) {
for (const domainEvent of event.domainEvents) {