diff --git a/.env.example b/.env.example index 024278c..e8eb20f 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,11 @@ # OpenAI Codex SDK CODEX_API_KEY= OPENAI_API_KEY= +# OPENAI_AUTH_MODE: auto | chatgpt | api_key +# - auto: prefer CODEX_API_KEY/OPENAI_API_KEY when set; otherwise use Codex CLI login +# - chatgpt: always use Codex CLI login (ChatGPT subscription auth), ignore API keys +# - api_key: use CODEX_API_KEY/OPENAI_API_KEY when set +OPENAI_AUTH_MODE=auto OPENAI_BASE_URL= CODEX_SKIP_GIT_CHECK=true MCP_CONFIG_PATH=./mcp.config.json @@ -45,6 +50,13 @@ AGENT_SECURITY_ENV_SCRUB= AGENT_SECURITY_DROP_UID= AGENT_SECURITY_DROP_GID= +# Runtime events / telemetry +AGENT_RUNTIME_EVENT_LOG_PATH=.ai_ops/events/runtime-events.ndjson +AGENT_RUNTIME_DISCORD_WEBHOOK_URL= +# AGENT_RUNTIME_DISCORD_MIN_SEVERITY: info | warning | critical +AGENT_RUNTIME_DISCORD_MIN_SEVERITY=critical +AGENT_RUNTIME_DISCORD_ALWAYS_NOTIFY_TYPES=session.started,session.completed,session.failed + # Runtime-injected (do not set manually): # AGENT_REPO_ROOT, AGENT_WORKTREE_PATH, AGENT_WORKTREE_BASE_REF, # AGENT_PORT_RANGE_START, AGENT_PORT_RANGE_END, AGENT_PORT_PRIMARY, AGENT_DISCOVERY_FILE diff --git a/docs/runtime-events.md b/docs/runtime-events.md new file mode 100644 index 0000000..a7f5443 --- /dev/null +++ b/docs/runtime-events.md @@ -0,0 +1,45 @@ +# Runtime Events + +## Purpose + +Runtime events provide a best-effort telemetry side-channel for: + +- long-term analytics (tool usage, token usage, retries, failure rates) +- high-visibility operational notifications (session starts/stops, critical failures) + +This channel is intentionally non-blocking and does not participate in orchestration routing logic. + +## Event model + +Events include: + +- identity: `id`, `timestamp`, `type`, `severity` +- routing context: `sessionId`, `nodeId`, `attempt` +- narrative context: `message` +- analytics context: optional `usage` (`tokenInput`, `tokenOutput`, `tokenTotal`, `toolCalls`, `durationMs`, `costUsd`) +- structured `metadata` + +Core emitted event types: + +- `session.started` +- `node.attempt.completed` +- `domain.` +- `session.completed` +- `session.failed` +- `security.` (mirrored from security audit engine) + +## Sinks + +- File sink (`AGENT_RUNTIME_EVENT_LOG_PATH`) + - NDJSON append-only log suitable for offline analytics ingestion. +- Discord webhook sink (`AGENT_RUNTIME_DISCORD_WEBHOOK_URL`) + - Sends events at or above `AGENT_RUNTIME_DISCORD_MIN_SEVERITY`. + - Always-notify event types configurable via `AGENT_RUNTIME_DISCORD_ALWAYS_NOTIFY_TYPES`. + +All sinks are best-effort. Sink failures are swallowed to avoid impacting agent execution. + +## Non-goals + +- Runtime events are not used to drive DAG edge conditions. +- Runtime events are not required for pipeline correctness. +- Runtime events do not replace session state persistence (`AGENT_STATE_ROOT`) or project context state (`AGENT_PROJECT_CONTEXT_PATH`). diff --git a/src/agents/lifecycle-observer.ts b/src/agents/lifecycle-observer.ts index 654462f..697abc0 100644 --- a/src/agents/lifecycle-observer.ts +++ b/src/agents/lifecycle-observer.ts @@ -10,6 +10,12 @@ import { FileSystemStateContextManager, } from "./state-context.js"; import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js"; +import { isRecord, type JsonObject } from "./types.js"; +import { + type RuntimeEventPublisher, + type RuntimeEventSeverity, + type RuntimeEventUsage, +} from "../telemetry/index.js"; export type PipelineNodeAttemptObservedEvent = { sessionId: string; @@ -29,6 +35,120 @@ function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValid return undefined; } +function toNodeAttemptSeverity(status: ActorResultStatus): RuntimeEventSeverity { + if (status === "failure") { + return "critical"; + } + if (status === "validation_fail") { + return "warning"; + } + return "info"; +} + +function toDomainEventSeverity(type: DomainEventType): RuntimeEventSeverity { + if (type === "task_blocked") { + return "critical"; + } + if (type === "validation_failed") { + return "warning"; + } + return "info"; +} + +function toNumber(value: unknown): number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value === "string") { + const parsed = Number(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + return undefined; +} + +function readFirstNumber(record: JsonObject, keys: string[]): number | undefined { + for (const key of keys) { + const parsed = toNumber(record[key]); + if (typeof parsed === "number") { + return parsed; + } + } + return undefined; +} + +function extractUsageMetrics(result: ActorExecutionResult): RuntimeEventUsage | undefined { + const candidates = [ + result.stateMetadata?.usage, + result.stateMetadata?.tokenUsage, + result.payload?.usage, + result.payload?.tokenUsage, + ]; + + for (const candidate of candidates) { + if (!isRecord(candidate)) { + continue; + } + + const usageRecord = candidate as JsonObject; + const tokenInput = readFirstNumber(usageRecord, [ + "tokenInput", + "token_input", + "inputTokens", + "input_tokens", + "promptTokens", + "prompt_tokens", + ]); + const tokenOutput = readFirstNumber(usageRecord, [ + "tokenOutput", + "token_output", + "outputTokens", + "output_tokens", + "completionTokens", + "completion_tokens", + ]); + const tokenTotal = readFirstNumber(usageRecord, [ + "tokenTotal", + "token_total", + "totalTokens", + "total_tokens", + ]); + const toolCalls = readFirstNumber(usageRecord, [ + "toolCalls", + "tool_calls", + "toolCallCount", + "tool_call_count", + ]); + const durationMs = readFirstNumber(usageRecord, [ + "durationMs", + "duration_ms", + "latencyMs", + "latency_ms", + ]); + const costUsd = readFirstNumber(usageRecord, [ + "costUsd", + "cost_usd", + "usd", + ]); + + const usage: RuntimeEventUsage = { + ...(typeof tokenInput === "number" ? { tokenInput } : {}), + ...(typeof tokenOutput === "number" ? { tokenOutput } : {}), + ...(typeof tokenTotal === "number" ? { tokenTotal } : {}), + ...(typeof toolCalls === "number" ? { toolCalls } : {}), + ...(typeof durationMs === "number" ? { durationMs } : {}), + ...(typeof costUsd === "number" ? { costUsd } : {}), + }; + + if (Object.keys(usage).length > 0) { + return usage; + } + } + + return undefined; +} + export interface PipelineLifecycleObserver { onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise; } @@ -40,6 +160,7 @@ export class PersistenceLifecycleObserver implements PipelineLifecycleObserver { stateManager: FileSystemStateContextManager; projectContextStore: FileSystemProjectContextStore; domainEventBus?: DomainEventBus; + runtimeEventPublisher?: RuntimeEventPublisher; }, ) {} @@ -80,6 +201,43 @@ export class PersistenceLifecycleObserver implements PipelineLifecycleObserver { historyEvents: domainHistoryEvents, }); + await this.input.runtimeEventPublisher?.publish({ + type: "node.attempt.completed", + severity: toNodeAttemptSeverity(event.result.status), + sessionId: event.sessionId, + nodeId: event.node.id, + attempt: event.attempt, + message: `Node "${event.node.id}" attempt ${String(event.attempt)} completed with status "${event.result.status}".`, + usage: extractUsageMetrics(event.result), + metadata: { + status: event.result.status, + ...(event.result.failureKind ? { failureKind: event.result.failureKind } : {}), + ...(event.result.failureCode ? { failureCode: event.result.failureCode } : {}), + }, + }); + + for (const domainEvent of event.domainEvents) { + await this.input.runtimeEventPublisher?.publish({ + type: `domain.${domainEvent.type}`, + severity: toDomainEventSeverity(domainEvent.type), + sessionId: event.sessionId, + nodeId: event.node.id, + attempt: event.attempt, + message: + domainEvent.payload.summary ?? + `Domain event "${domainEvent.type}" emitted for node "${event.node.id}".`, + metadata: { + source: domainEvent.source, + ...(domainEvent.payload.errorCode + ? { errorCode: domainEvent.payload.errorCode } + : {}), + ...(domainEvent.payload.artifactPointer + ? { artifactPointer: domainEvent.payload.artifactPointer } + : {}), + }, + }); + } + const domainEventBus = this.input.domainEventBus; if (domainEventBus) { for (const domainEvent of event.domainEvents) { diff --git a/src/config.ts b/src/config.ts index 1a316b3..53c110a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,10 +1,15 @@ import type { AgentManagerLimits } from "./agents/manager.js"; import type { BuiltInProvisioningConfig } from "./agents/provisioning.js"; import { parseSecurityViolationHandling, type SecurityViolationHandling } from "./security/index.js"; +import { + parseRuntimeEventSeverity, + type RuntimeEventSeverity, +} from "./telemetry/runtime-events.js"; export type ProviderRuntimeConfig = { codexApiKey?: string; openAiApiKey?: string; + openAiAuthMode: OpenAiAuthMode; openAiBaseUrl?: string; codexSkipGitCheck: boolean; anthropicOauthToken?: string; @@ -13,6 +18,8 @@ export type ProviderRuntimeConfig = { claudeCodePath?: string; }; +export type OpenAiAuthMode = "auto" | "chatgpt" | "api_key"; + export type McpRuntimeConfig = { configPath: string; }; @@ -40,6 +47,13 @@ export type SecurityRuntimeConfig = { dropGid?: number; }; +export type RuntimeEventRuntimeConfig = { + logPath: string; + discordWebhookUrl?: string; + discordMinSeverity: RuntimeEventSeverity; + discordAlwaysNotifyTypes: string[]; +}; + export type AppConfig = { provider: ProviderRuntimeConfig; mcp: McpRuntimeConfig; @@ -48,6 +62,7 @@ export type AppConfig = { provisioning: BuiltInProvisioningConfig; discovery: DiscoveryRuntimeConfig; security: SecurityRuntimeConfig; + runtimeEvents: RuntimeEventRuntimeConfig; }; const DEFAULT_AGENT_MANAGER: AgentManagerLimits = { @@ -91,6 +106,13 @@ const DEFAULT_SECURITY: SecurityRuntimeConfig = { scrubbedEnvVars: [], }; +const DEFAULT_RUNTIME_EVENTS: RuntimeEventRuntimeConfig = { + logPath: ".ai_ops/events/runtime-events.ndjson", + discordWebhookUrl: undefined, + discordMinSeverity: "critical", + discordAlwaysNotifyTypes: ["session.started", "session.completed", "session.failed"], +}; + function readOptionalString( env: NodeJS.ProcessEnv, key: string, @@ -196,6 +218,16 @@ function readBooleanWithFallback( throw new Error(`Environment variable ${key} must be "true" or "false".`); } +function parseOpenAiAuthMode(raw: string): OpenAiAuthMode { + if (raw === "auto" || raw === "chatgpt" || raw === "api_key") { + return raw; + } + + throw new Error( + 'Environment variable OPENAI_AUTH_MODE must be one of: "auto", "chatgpt", "api_key".', + ); +} + function deepFreeze(value: T): Readonly { if (value === null || typeof value !== "object") { return value; @@ -221,6 +253,22 @@ export function resolveAnthropicToken( return apiKey || undefined; } +export function resolveOpenAiApiKey( + provider: Pick, +): string | undefined { + if (provider.openAiAuthMode === "chatgpt") { + return undefined; + } + + const codexApiKey = provider.codexApiKey?.trim(); + if (codexApiKey) { + return codexApiKey; + } + + const openAiApiKey = provider.openAiApiKey?.trim(); + return openAiApiKey || undefined; +} + export function buildClaudeAuthEnv( provider: Pick, ): Record { @@ -245,6 +293,11 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Readonly; + +export type RuntimeEventSink = { + name: string; + publish: (event: RuntimeEvent) => Promise; +}; + +function isSeverity(value: string): value is RuntimeEventSeverity { + return value in RUNTIME_EVENT_SEVERITY_ORDER; +} + +export function parseRuntimeEventSeverity(value: string): RuntimeEventSeverity { + const normalized = value.trim().toLowerCase(); + if (!isSeverity(normalized)) { + throw new Error( + `Runtime event severity "${value}" is invalid. Expected one of: info, warning, critical.`, + ); + } + return normalized; +} + +function isSeverityAtLeast( + severity: RuntimeEventSeverity, + threshold: RuntimeEventSeverity, +): boolean { + return ( + RUNTIME_EVENT_SEVERITY_ORDER[severity] >= + RUNTIME_EVENT_SEVERITY_ORDER[threshold] + ); +} + +function toRuntimeEvent(input: RuntimeEventInput): RuntimeEvent { + return { + id: randomUUID(), + timestamp: new Date().toISOString(), + ...input, + }; +} + +function toSummaryMetadata(event: RuntimeEvent): string | undefined { + if (!event.metadata) { + return undefined; + } + + const compact = JSON.stringify(event.metadata); + if (compact.length <= 700) { + return compact; + } + return `${compact.slice(0, 700)}...`; +} + +function toUsageSummary(event: RuntimeEvent): string | undefined { + if (!event.usage) { + return undefined; + } + + const entries: string[] = []; + + const tokenInput = event.usage.tokenInput; + if (typeof tokenInput === "number") { + entries.push(`tokenInput=${String(tokenInput)}`); + } + const tokenOutput = event.usage.tokenOutput; + if (typeof tokenOutput === "number") { + entries.push(`tokenOutput=${String(tokenOutput)}`); + } + const tokenTotal = event.usage.tokenTotal; + if (typeof tokenTotal === "number") { + entries.push(`tokenTotal=${String(tokenTotal)}`); + } + const toolCalls = event.usage.toolCalls; + if (typeof toolCalls === "number") { + entries.push(`toolCalls=${String(toolCalls)}`); + } + const durationMs = event.usage.durationMs; + if (typeof durationMs === "number") { + entries.push(`durationMs=${String(durationMs)}`); + } + const costUsd = event.usage.costUsd; + if (typeof costUsd === "number") { + entries.push(`costUsd=${String(costUsd)}`); + } + + if (entries.length === 0) { + return undefined; + } + + return entries.join(", "); +} + +export class RuntimeEventPublisher { + private readonly sinks: RuntimeEventSink[]; + private readonly onSinkError?: (input: { + sinkName: string; + error: unknown; + event: RuntimeEvent; + }) => void; + + constructor(input: { + sinks?: RuntimeEventSink[]; + onSinkError?: (input: { + sinkName: string; + error: unknown; + event: RuntimeEvent; + }) => void; + } = {}) { + this.sinks = [...(input.sinks ?? [])]; + this.onSinkError = input.onSinkError; + } + + async publish(input: RuntimeEventInput): Promise { + const event = toRuntimeEvent(input); + await this.publishEvent(event); + return event; + } + + async publishEvent(event: RuntimeEvent): Promise { + if (this.sinks.length === 0) { + return; + } + + await Promise.all( + this.sinks.map(async (sink) => { + try { + await sink.publish(event); + } catch (error) { + this.onSinkError?.({ + sinkName: sink.name, + error, + event, + }); + } + }), + ); + } +} + +export function createFileRuntimeEventSink(filePath: string): RuntimeEventSink { + const resolvedPath = resolve(filePath); + + return { + name: "file_runtime_event_sink", + publish: async (event): Promise => { + await mkdir(dirname(resolvedPath), { recursive: true }); + await appendFile(resolvedPath, `${JSON.stringify(event)}\n`, "utf8"); + }, + }; +} + +export function createDiscordWebhookRuntimeEventSink(input: { + webhookUrl: string; + minSeverity?: RuntimeEventSeverity; + alwaysNotifyTypes?: string[]; + username?: string; + fetchFn?: typeof fetch; +}): RuntimeEventSink { + const fetchFn = input.fetchFn ?? globalThis.fetch; + if (!fetchFn) { + throw new Error("Global fetch API is not available for Discord webhook sink."); + } + const minSeverity = input.minSeverity ?? "critical"; + const alwaysNotifyTypes = new Set(input.alwaysNotifyTypes ?? []); + const webhookUrl = input.webhookUrl; + + return { + name: "discord_webhook_runtime_event_sink", + publish: async (event): Promise => { + const shouldNotify = + alwaysNotifyTypes.has(event.type) || + isSeverityAtLeast(event.severity, minSeverity); + if (!shouldNotify) { + return; + } + + const summaryMetadata = toSummaryMetadata(event); + const usageSummary = toUsageSummary(event); + const fields: Array<{ + name: string; + value: string; + inline?: boolean; + }> = [ + { + name: "Severity", + value: event.severity, + inline: true, + }, + { + name: "Type", + value: event.type, + inline: true, + }, + ]; + + if (event.sessionId) { + fields.push({ + name: "Session", + value: event.sessionId, + inline: true, + }); + } + + if (event.nodeId) { + fields.push({ + name: "Node", + value: event.nodeId, + inline: true, + }); + } + + if (typeof event.attempt === "number") { + fields.push({ + name: "Attempt", + value: String(event.attempt), + inline: true, + }); + } + + if (usageSummary) { + fields.push({ + name: "Usage", + value: usageSummary, + }); + } + + if (summaryMetadata) { + fields.push({ + name: "Metadata", + value: summaryMetadata, + }); + } + + const response = await fetchFn(webhookUrl, { + method: "POST", + headers: { + "content-type": "application/json", + }, + body: JSON.stringify({ + ...(input.username ? { username: input.username } : {}), + embeds: [ + { + title: event.type, + description: event.message, + timestamp: event.timestamp, + color: + event.severity === "critical" + ? 15158332 + : event.severity === "warning" + ? 16763904 + : 3447003, + fields, + }, + ], + }), + }); + + if (!response.ok) { + throw new Error( + `Discord webhook rejected runtime event (${String( + response.status, + )} ${response.statusText}).`, + ); + } + }, + }; +} diff --git a/tests/config.test.ts b/tests/config.test.ts index 2f061b4..9ee3c10 100644 --- a/tests/config.test.ts +++ b/tests/config.test.ts @@ -1,6 +1,11 @@ import test from "node:test"; import assert from "node:assert/strict"; -import { buildClaudeAuthEnv, loadConfig, resolveAnthropicToken } from "../src/config.js"; +import { + buildClaudeAuthEnv, + loadConfig, + resolveAnthropicToken, + resolveOpenAiApiKey, +} from "../src/config.js"; test("loads defaults and freezes config", () => { const config = loadConfig({}); @@ -11,10 +16,25 @@ test("loads defaults and freezes config", () => { assert.equal(config.discovery.fileRelativePath, ".agent-context/resources.json"); assert.equal(config.security.violationHandling, "hard_abort"); assert.equal(config.security.commandTimeoutMs, 120000); + assert.equal(config.runtimeEvents.logPath, ".ai_ops/events/runtime-events.ndjson"); + assert.equal(config.runtimeEvents.discordMinSeverity, "critical"); + assert.deepEqual(config.runtimeEvents.discordAlwaysNotifyTypes, [ + "session.started", + "session.completed", + "session.failed", + ]); + assert.equal(config.provider.openAiAuthMode, "auto"); assert.equal(Object.isFrozen(config), true); assert.equal(Object.isFrozen(config.orchestration), true); }); +test("validates OPENAI_AUTH_MODE values", () => { + assert.throws( + () => loadConfig({ OPENAI_AUTH_MODE: "oauth" }), + /OPENAI_AUTH_MODE must be one of/, + ); +}); + test("validates boolean env values", () => { assert.throws( () => loadConfig({ CODEX_SKIP_GIT_CHECK: "maybe" }), @@ -29,6 +49,13 @@ test("validates security violation mode", () => { ); }); +test("validates runtime discord severity mode", () => { + assert.throws( + () => loadConfig({ AGENT_RUNTIME_DISCORD_MIN_SEVERITY: "verbose" }), + /Runtime event severity/, + ); +}); + test("prefers CLAUDE_CODE_OAUTH_TOKEN over ANTHROPIC_API_KEY", () => { const config = loadConfig({ CLAUDE_CODE_OAUTH_TOKEN: "oauth-token", @@ -57,3 +84,23 @@ test("falls back to ANTHROPIC_API_KEY when oauth token is absent", () => { assert.equal(authEnv.CLAUDE_CODE_OAUTH_TOKEN, undefined); assert.equal(authEnv.ANTHROPIC_API_KEY, "api-key"); }); + +test("resolveOpenAiApiKey respects chatgpt auth mode", () => { + const config = loadConfig({ + OPENAI_AUTH_MODE: "chatgpt", + CODEX_API_KEY: "codex-key", + OPENAI_API_KEY: "openai-key", + }); + + assert.equal(resolveOpenAiApiKey(config.provider), undefined); +}); + +test("resolveOpenAiApiKey prefers CODEX_API_KEY in auto mode", () => { + const config = loadConfig({ + OPENAI_AUTH_MODE: "auto", + CODEX_API_KEY: "codex-key", + OPENAI_API_KEY: "openai-key", + }); + + assert.equal(resolveOpenAiApiKey(config.provider), "codex-key"); +}); diff --git a/tests/provider-adapters.test.ts b/tests/provider-adapters.test.ts index d8ca4d4..60227af 100644 --- a/tests/provider-adapters.test.ts +++ b/tests/provider-adapters.test.ts @@ -107,6 +107,53 @@ test("runCodexPrompt wires client options and parses final output", async () => assert.equal(closed, true); }); +test("runCodexPrompt omits apiKey when OPENAI_AUTH_MODE=chatgpt", async () => { + const config = loadConfig({ + OPENAI_AUTH_MODE: "chatgpt", + CODEX_API_KEY: "codex-token", + OPENAI_API_KEY: "openai-token", + OPENAI_BASE_URL: "https://api.example.com/v1", + }); + + let capturedClientInput: Record | undefined; + + const sessionContext: SessionContext = { + provider: "codex", + sessionId: "session-codex-chatgpt", + mcp: {}, + promptWithContext: "prompt with context", + runtimeInjection: { + workingDirectory: "/tmp/worktree", + env: { + HOME: "/home/tester", + }, + discoveryFilePath: "/tmp/worktree/.agent-context/resources.json", + }, + runInSession: async (run: () => Promise) => run(), + close: async () => {}, + }; + + await runCodexPrompt("ignored", { + config, + createSessionContextFn: async () => sessionContext, + createCodexClient: (input) => { + capturedClientInput = input as Record; + return { + startThread: () => ({ + run: async () => ({ + finalResponse: "ok", + }), + }), + }; + }, + writeOutput: () => {}, + }); + + assert.equal(capturedClientInput?.["apiKey"], undefined); + assert.equal(capturedClientInput?.["baseUrl"], "https://api.example.com/v1"); + assert.deepEqual(capturedClientInput?.["env"], sessionContext.runtimeInjection.env); +}); + test("runClaudePrompt wires auth env, stream parsing, and output", async () => { const config = loadConfig({ CLAUDE_CODE_OAUTH_TOKEN: "oauth-token", @@ -193,6 +240,68 @@ test("runClaudePrompt wires auth env, stream parsing, and output", async () => { assert.equal(closed, true); }); +test("runClaudePrompt uses ambient Claude login when no token env is configured", async () => { + const config = loadConfig({}); + + let queryInput: + | { + prompt: string; + options?: Record; + } + | undefined; + + const sessionContext: SessionContext = { + provider: "claude", + sessionId: "session-claude-no-key", + mcp: {}, + promptWithContext: "augmented prompt", + runtimeInjection: { + workingDirectory: "/tmp/claude-worktree", + env: { + HOME: "/home/tester", + PATH: "/usr/bin", + }, + discoveryFilePath: "/tmp/claude-worktree/.agent-context/resources.json", + }, + runInSession: async (run: () => Promise) => run(), + close: async () => {}, + }; + + const queryFn: ClaudeQueryFunction = ((input: { + prompt: string; + options?: Record; + }) => { + queryInput = input; + const stream = createMessageStream([ + { + type: "result", + subtype: "success", + result: "ok", + } as SDKMessage, + ]); + + return { + ...stream, + close: () => {}, + } as ReturnType; + }) as ClaudeQueryFunction; + + await runClaudePrompt("ignored", { + config, + createSessionContextFn: async () => sessionContext, + queryFn, + writeOutput: () => {}, + }); + + assert.equal(queryInput?.options?.apiKey, undefined); + assert.equal(queryInput?.options?.authToken, undefined); + + const env = queryInput?.options?.env as Record | undefined; + assert.equal(env?.HOME, "/home/tester"); + assert.equal(env?.CLAUDE_CODE_OAUTH_TOKEN, undefined); + assert.equal(env?.ANTHROPIC_API_KEY, undefined); +}); + test("readClaudeResult throws on non-success result events", async () => { const stream = createMessageStream([ { diff --git a/tests/runtime-events.test.ts b/tests/runtime-events.test.ts new file mode 100644 index 0000000..0b60a0c --- /dev/null +++ b/tests/runtime-events.test.ts @@ -0,0 +1,94 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { mkdtemp, readFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { resolve } from "node:path"; +import { + RuntimeEventPublisher, + createDiscordWebhookRuntimeEventSink, + createFileRuntimeEventSink, +} from "../src/telemetry/index.js"; + +test("runtime event file sink writes ndjson events", async () => { + const root = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-events-")); + const logPath = resolve(root, "runtime-events.ndjson"); + const publisher = new RuntimeEventPublisher({ + sinks: [createFileRuntimeEventSink(logPath)], + }); + + await publisher.publish({ + type: "session.started", + severity: "info", + sessionId: "session-1", + message: "Session started.", + metadata: { + entryNodeId: "entry", + }, + }); + + const lines = (await readFile(logPath, "utf8")) + .trim() + .split("\n") + .filter((line) => line.length > 0); + assert.equal(lines.length, 1); + const parsed = JSON.parse(lines[0] ?? "{}") as Record; + assert.equal(parsed.type, "session.started"); + assert.equal(parsed.severity, "info"); + assert.equal(parsed.sessionId, "session-1"); +}); + +test("discord runtime sink supports severity threshold and always-notify types", async () => { + const requests: Array<{ + url: string; + body: Record; + }> = []; + + const discordSink = createDiscordWebhookRuntimeEventSink({ + webhookUrl: "https://discord.example/webhook", + minSeverity: "critical", + alwaysNotifyTypes: ["session.started", "session.completed"], + fetchFn: async (url, init) => { + requests.push({ + url: String(url), + body: JSON.parse(String(init?.body ?? "{}")) as Record, + }); + return new Response(null, { status: 204 }); + }, + }); + + const publisher = new RuntimeEventPublisher({ + sinks: [discordSink], + }); + + await publisher.publish({ + type: "session.started", + severity: "info", + sessionId: "session-1", + message: "Session started.", + }); + await publisher.publish({ + type: "node.attempt.completed", + severity: "warning", + sessionId: "session-1", + nodeId: "node-1", + attempt: 1, + message: "Validation failed.", + }); + await publisher.publish({ + type: "session.failed", + severity: "critical", + sessionId: "session-1", + message: "Session failed.", + }); + + assert.equal(requests.length, 2); + assert.equal(requests[0]?.url, "https://discord.example/webhook"); + const firstPayload = requests[0]?.body; + assert.ok(firstPayload); + const firstEmbeds = firstPayload.embeds as Array>; + assert.equal(firstEmbeds[0]?.title, "session.started"); + const secondPayload = requests[1]?.body; + assert.ok(secondPayload); + const secondEmbeds = secondPayload.embeds as Array>; + assert.equal(secondEmbeds[0]?.title, "session.failed"); +});