LibreChat/api/server/controllers/agents/callbacks.js
Danny Avila db7011d567
📊 feat: Real-Time Context Window & Token Usage Tracking (#13670)
* 📊 feat: Real-Time Context Window & Token Usage Tracking

* 🧪 fix: Align Pricing Spec Dep Signatures with TxDeps

* 🩹 fix: Resolve Codex Findings for Context Usage Tracking

* 📊 feat: Granular Tool Token Breakdown with Deferred Splits

* 🧪 test: Cover Session Cost in Mock E2E and Scope Usage Selectors

* 🧪 test: Live Host-Pipeline Usage Verification (Env-Gated)

* 🧪 test: Local Real-Provider Multi-Turn E2E Harness

* 🪙 fix: Keep Tagged Usage Buckets Out of the Live Context Estimate

* 🩹 fix: Scoped Token-Config Fallback and Sequential Visibility for Usage Events

* 🩹 fix: Address Usage Review Findings — Cost Timing, Scoped Caches, Finalized Output

- carry the post-snapshot output estimate into the context snapshot at
  finalize so the gauge keeps the last response after live resets
- accumulate per-rate billable units and price the session cost at
  render, so usage events arriving before the token-config load still
  count once it resolves
- pass user-scoped token-config cache keys through loadConfigModels
  fetches and drop the controller's unscoped fallback to prevent serving
  another user's resolved config
- tag emitted usage events with a per-run seq so resume dedupe never
  drops a distinct call with an identical payload
- admit the static tokenConfig override in the custom endpoint schema so
  it survives zod parsing into req.config

* 🩹 fix: Align Client Usage Accounting with Backend Cost Semantics

- classify cache tokens by provider (shared inputTokensIncludesCache from
  data-provider, consumed by both the backend billing path and the client)
  instead of a magnitude heuristic, so Anthropic/Bedrock turns where cache
  is smaller than uncached input no longer under-bill input
- mirror resolveCompletionTokens on the client so Vertex-style hidden
  thinking tokens are reflected in the Output row and session cost
- prefer endpoint pricing over adapter-provider pricing so a custom
  endpoint can price a known model name without built-in rates shadowing it
- carry static cacheRead/cacheWrite overrides through the tokenConfig
  schema and buildTokenConfigMap
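The provider-based classification in the first bullet can be sketched roughly as follows. This is a minimal illustration, not the shared `inputTokensIncludesCache` helper itself: the function name `splitInputTokens`, the provider set, and the premise that Anthropic/Bedrock report `input_tokens` exclusive of cache tokens are assumptions made for the example.

```javascript
// Assumption for this sketch: these providers report input_tokens WITHOUT
// cache tokens, while every other provider folds cache tokens into it.
const CACHE_EXCLUSIVE_PROVIDERS = new Set(['anthropic', 'bedrock']);

/**
 * Hypothetical helper: derive the uncached input count by provider rather
 * than by comparing the magnitudes of the cache and input counts.
 */
function splitInputTokens({ provider, input_tokens = 0, cache_read = 0, cache_creation = 0 }) {
  const cached = cache_read + cache_creation;
  const includesCache = !CACHE_EXCLUSIVE_PROVIDERS.has(provider);
  const uncached = includesCache ? Math.max(input_tokens - cached, 0) : input_tokens;
  return { uncached, cache_read, cache_creation };
}

// Cache (200) is smaller than input (1000); a magnitude heuristic could
// subtract it and bill only 800 uncached tokens for an Anthropic turn.
console.log(splitInputTokens({ provider: 'anthropic', input_tokens: 1000, cache_read: 200 }));
// { uncached: 1000, cache_read: 200, cache_creation: 0 }
console.log(splitInputTokens({ provider: 'openAI', input_tokens: 1000, cache_read: 200 }));
// { uncached: 800, cache_read: 200, cache_creation: 0 }
```

Keying the decision on the provider keeps the result independent of how large the cache happens to be on a given turn.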

* 🩹 fix: Honor Static Token Config in Billing; Tighten Usage Freshness

- initializeCustom now uses a static endpoint tokenConfig as the agent's
  endpointTokenConfig (billing + balance checks), not just the advertised
  UI config — previously the gauge showed admin rates while the agent
  billed against built-in tables
- invalidate the token-config query alongside models on user-key add/
  revoke so context windows and pricing refresh without a reload
- include maxContextTokens in ChatForm's stabilized conversation memo so
  the gauge reflects a changed context-window setting immediately
- feed the live output estimate from the legacy content path (direct and
  assistants streams), setting from cumulative part text rather than
  accumulating deltas

* 🩹 fix: Resume Usage Dedup, Agent Pricing, and Partial Override Billing

- fold usage events idempotently by (runId, seq) so resume backfill no
  longer resets the conversation totals — a mid-stream reconnect keeps the
  usage of prompts already completed earlier in the session
- tap replayed pending message/reasoning/content events so output streamed
  past the resume snapshot reaches the live estimate, not just the message
- resolve cost against the agent's backing endpoint (Agents conversations
  report endpoint `agents` / provider `openAI`, neither of which keys a
  custom endpoint's tokenConfig)
- getMultiplier/getCacheMultiplier fall back to the standard tables for
  models absent from a partial endpointTokenConfig, so a partial static
  override no longer bills non-listed models at defaultRate while the UI
  shows the correct pattern rate
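The idempotent fold in the first bullet can be illustrated with a small sketch. `createUsageFolder` and the totals shape are hypothetical names chosen for the example, not the client's actual store; only the `(runId, seq)` key comes from the description above.

```javascript
/**
 * Hypothetical sketch: fold usage events into running totals at most once
 * per (runId, seq) pair, so replayed events cannot double-count or reset.
 */
function createUsageFolder() {
  const seen = new Set();
  const totals = { input: 0, output: 0 };

  return {
    /** Returns true when the event was folded, false when it was a replay. */
    fold(event) {
      const key = `${event.runId}:${event.seq}`;
      if (seen.has(key)) {
        return false;
      }
      seen.add(key);
      totals.input += event.input_tokens ?? 0;
      totals.output += event.output_tokens ?? 0;
      return true;
    },
    totals: () => ({ ...totals }),
  };
}

const folder = createUsageFolder();
folder.fold({ runId: 'run-1', seq: 1, input_tokens: 100, output_tokens: 20 });
// A distinct call with an identical payload still counts because seq differs.
folder.fold({ runId: 'run-1', seq: 2, input_tokens: 100, output_tokens: 20 });
// Resume backfill replays seq 1; it is ignored rather than counted again.
folder.fold({ runId: 'run-1', seq: 1, input_tokens: 100, output_tokens: 20 });
console.log(folder.totals()); // { input: 200, output: 40 }
```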

* 🩹 fix: Repaired Output in Gauge, Cache-Rate Keys, Config Gate, Usage Cleanup

- live/completed gauge counts the repaired completion (normalized output),
  so under-reporting providers don't drop the response from used context
- translate static tokenConfig cacheWrite/cacheRead onto the write/read
  keys getCacheMultiplier reads, so cache tokens bill at the configured
  rate instead of the prompt-rate fallback
- clear the token index and usage atoms when leaving a conversation, so
  visited histories don't accumulate in memory for the tab's lifetime
- wait for startupConfig before mounting the gauge, so a deployment with
  contextUsage disabled never briefly mounts it or fires the token-config
  query on first load

* 🩹 fix: Move Token-Config Resolution to TS; Key Live Usage by Created Convo

- extract the token-config resolution (override gathering + cache lookup +
  buildTokenConfigMap) into resolveTokenConfigMap in packages/api, leaving
  the /api controller a thin request-scoped wrapper (CLAUDE.md TS rule)
- getConvoKey prefers the user message's real conversationId once the
  `created` event stamps it, so a new chat's first-response live gauge and
  totals land under the id TokenUsage subscribes to instead of NEW_CONVO

* 🩹 fix: Clear Stale Redis Job Usage; Live-Tap Legacy Streams; Share Fetched Config

- DEL the Redis job hash before re-creating it so a reused streamId can't
  inherit a prior run's contextUsage/tokenUsage and backfill stale usage
- tap the legacy {message,text} stream branch (non-agent OpenAI/Anthropic
  streams) into the live estimate, not just the content path
- copy a deduped fetch's token config to every sibling endpoint sharing the
  baseURL/key/headers, so /token-config resolves each by its own name

* revert: Don't DEL Redis job hash in createJob (breaks cross-replica resume)

createJob is an idempotent join — a second replica calls it for the same
streamId to share an in-flight stream's state. DELeting the hash wiped the
prior replica's persisted created/usage state, so a joining replica missed
the created event (GenerationJobManager cross-replica integration test).
Reverts the F1 change from 2bfce0c34b; the stale-usage concern doesn't
arise in practice (streamId is unique per generation).

* 🩹 fix: Best-Effort Usage Emit; Tag Hidden Sequential-Agent Usage

- wrap the ModelEndHandler usage emit in try/catch so a failed telemetry
  delivery (closed SSE / Redis publish error) can't abort the handler
  before thought-signature capture, which would break resumed tool calls
- tag hidden sequential-agent usage as 'sequential' (non-primary) so the
  client folds it into session cost/totals but not the live context gauge,
  instead of letting an undefined usage_type inflate the visible gauge

* 🩹 fix: Refetch Stale Token Config on Mount; Normalize Vertex for Lookup

- useTokenConfigQuery refetches on mount when stale, so a user-key change
  that invalidates tokenConfig while the gauge is unmounted takes effect on
  return instead of serving the prior key's resolved config
- normalize a Vertex-backed agent's provider (vertexai) to the google
  token-config key, so Gemini context windows and rates resolve instead of
  showing unknown context / $0 cost

* feat: Server-Side Per-Event Cost (Authoritative Pricing for the Gauge)

Move usage-cost pricing to the single source of truth. The backend prices
each model call with the same billing functions (premium tiers via
getMultiplier(inputTokenCount), cache rates) and emits the USD cost on
on_token_usage when interface.contextCost is enabled; the client sums
emitted costs instead of re-deriving from base token-config rates.

- computeUsageCostUSD reuses prepareTokenSpend/prepareStructuredTokenSpend
  so the emitted cost matches what is billed (incl. premium thresholds)
- getDefaultHandlers gains a usageCost pricing context; initialize.js wires
  db.getMultiplier/getCacheMultiplier gated on contextCost (agents path)
- client UsageTotals carries a summed costUSD; retire the client-side rate
  lookups (costFromUnits/calcUsageCost) that drifted from backend pricing
  and produced the provider-keying / cache-key / Vertex / premium findings
- keep normalizeUsageUnits for the displayed token counts; token-config is
  still used for the context-window meter

Fixes the premium-tier session-cost under-report (gpt-5.x / gemini-3.1
above their input thresholds).
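
The division of labor described above (server prices each call, client only sums) might look like the sketch below. The rate table, the model name, and the helpers `priceUsageUSD` and `sumSessionCost` are hypothetical; they stand in for `computeUsageCostUSD` and the real billing tables, whose signatures are not shown here.

```javascript
// Hypothetical rate table in USD per 1M tokens; none of these numbers are
// real prices. `premium` applies once the input count exceeds `threshold`.
const RATES = {
  'example-model': {
    prompt: 2,
    completion: 8,
    premium: { threshold: 200000, prompt: 4, completion: 12 },
  },
};

/** Server side: price one model call, choosing the tier from its input size. */
function priceUsageUSD({ model, input_tokens = 0, output_tokens = 0 }) {
  const rates = RATES[model];
  if (!rates) {
    return 0;
  }
  const tier = rates.premium && input_tokens > rates.premium.threshold ? rates.premium : rates;
  return (input_tokens * tier.prompt + output_tokens * tier.completion) / 1e6;
}

/** Client side: sum the emitted costs; no rate lookups are needed. */
function sumSessionCost(events) {
  return events.reduce((total, event) => total + (event.cost ?? 0), 0);
}

const calls = [
  { model: 'example-model', input_tokens: 250000, output_tokens: 500000 }, // premium tier
  { model: 'example-model', input_tokens: 100000, output_tokens: 100000 }, // base tier
];
const events = calls.map((usage) => ({ ...usage, cost: priceUsageUSD(usage) }));
console.log(sumSessionCost(events)); // 8
```

Because the tier is chosen where the cost is computed, a client that only sums `cost` cannot under-report a call that crossed the premium threshold.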

* 🩹 fix: Branch-Accurate Usage Snapshot + Clearer Gauge Track Contrast

- re-anchor the context snapshot from the user message to the response
  message at finalize. Regenerating a response branches off a shared user
  message, so anchoring on it made the snapshot read as "active" on both
  branches — switching to the sibling branch showed the wrong (other
  branch's) context. The response message is branch-unique, so sibling
  branches now correctly fall back to their own per-branch totals.
- raise the gauge ring's track/fill contrast (muted track, prominent fill)
  so the used portion reads clearly as a fill-level indicator

* 🩹 fix: Tag Sequential Usage in Billing; Emit Subagent Cost; Reset Live on Resume Errors

- tag hidden sequential-agent usage `usage_type: 'sequential'` on the
  COLLECTED usage (not just the emit), and treat it as non-primary in
  recordCollectedUsage (billed, excluded from the reported output total) so
  hidden intermediate output stops inflating the parent's tokenCount/pruning
- emit on_token_usage from the subagent usage sink (tagged `subagent`, with
  authoritative cost when contextCost is on) so the gauge's session
  cost/totals include billed subagent usage; it stays out of the live meter
- call resetLive on the resumable 404 and max-retry terminal branches so the
  gauge doesn't keep counting stale in-flight tokens after the stream ends

* 🎨 fix: Contrast the Popup Context Bar; Revert Ring Restyle

- raise the popup breakdown's context progressbar contrast (muted
  surface-tertiary track, prominent text-primary fill) — that's the bar the
  contrast feedback was about
- revert the gauge ring restyle (kept its original border-heavy track /
  text-secondary fill); the ring wasn't the element in question

* 🩹 fix: Stop Snapshot Granularity Leaking Across Branches; Revert Tree Memo

- a null-anchor context snapshot was treated as active on every branch,
  leaking one generation's granular breakdown onto sibling branches. Require
  a non-null (response-message) anchor on the viewed branch instead, so
  siblings without a matching snapshot fall back to their own totals.
- revert the buildTree WeakMap memo in messages.ts. buildTree is pure (builds
  from shallow copies) so the memo was behaviorally identical, but it was the
  feature's only change to core branch-navigation selectors — removing it
  matches upstream and rules it out of branch-navigation debugging.

* 🪙 fix: Thread Endpoint Token Config to Agent Billing, Cost, and Context Limits

Custom-endpoint agents resolve an endpointTokenConfig during agent init but
it never reached the AgentClient, so spending, emitted cost, and runtime
max-token resolution all fell back to default rates for those agents.

- Surface options.endpointTokenConfig on the returned InitializedAgent.
- Pass it to the AgentClient (this.options.endpointTokenConfig) so the
  spending path bills at configured rates.
- Thread it through usageCost to computeUsageCostUSD so emitted per-event
  cost matches billing.
- getModelMaxTokens/getModelMaxOutputTokens fall back to the built-in map
  for models absent from a partial override (matches buildTokenConfigMap);
  consolidates the duplicated fallback in pricing.ts.

* 🪙 fix: Preserve Granular Breakdown Across Branch Switches

The granular context breakdown lives only in the live on_context_usage
snapshot — a single per-conversation slot, anchored to the latest response
and overwritten by each generation. Switching to a branch generated earlier
this session lost its tool/skill/system rows and fell back to coarse totals.

Retain each generation's finalized snapshot in a per-conversation map keyed
by its branch-unique response id (snapshotsByAnchorFamily). When the live
snapshot is off the viewed branch, walk the branch tail for its deepest
stored anchor and render that breakdown. Bounded by generation count and
cleared on conversation switch; the live/just-generated path is unchanged.
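
A minimal sketch of the branch-tail walk, assuming messages link to their parent through `parentMessageId` and that snapshots live in a `Map` keyed by response id. `findBranchSnapshot` and the snapshot fields are illustrative names, not the hook's actual implementation.

```javascript
/**
 * Hypothetical sketch: walk from the viewed branch's tail toward the root
 * and return the first (deepest) stored snapshot, or null if none exists.
 */
function findBranchSnapshot(snapshotsByAnchor, messagesById, tailId) {
  const visited = new Set(); // guards against a malformed parent cycle
  let id = tailId;
  while (id != null && !visited.has(id)) {
    visited.add(id);
    const snapshot = snapshotsByAnchor.get(id);
    if (snapshot) {
      return snapshot;
    }
    id = messagesById.get(id)?.parentMessageId ?? null;
  }
  return null;
}

// One user message with two sibling responses (a regenerate).
const messagesById = new Map([
  ['user-1', { parentMessageId: null }],
  ['resp-a', { parentMessageId: 'user-1' }],
  ['resp-b', { parentMessageId: 'user-1' }],
]);
const snapshotsByAnchor = new Map([
  ['resp-a', { anchor: 'resp-a', toolTokens: 120 }],
  ['resp-b', { anchor: 'resp-b', toolTokens: 340 }],
]);

console.log(findBranchSnapshot(snapshotsByAnchor, messagesById, 'resp-a').toolTokens); // 120
console.log(findBranchSnapshot(snapshotsByAnchor, messagesById, 'resp-b').toolTokens); // 340
```

Since each response id belongs to exactly one branch, the walk never picks up a sibling's breakdown.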

* 🪙 fix: Harden Resume Seeding and Subagent Usage Emission

- useResumableSSE: skip the trailing-output live seed when the resume
  carries a context snapshot; the snapshot's messageTokens already counts
  produced output, so seeding it again inflated usage until the next reset.
- AgentClient subagent emitter: await GenerationJobManager.emitChunk like
  every other caller (it persists before publishing), so a floating promise
  can't race job cleanup and a Redis/publish failure is caught by the
  emitter's try/catch instead of surfacing as an unhandled rejection.

* 🧪 test: Playwright Coverage for Context Breakdown Granularity

Add a test-only data-testid distinguishing the granular snapshot breakdown
(context-breakdown) from the coarse message-history estimate
(context-estimate), then assert granularity in the mock e2e harness:

- renders the granular breakdown from the live on_context_usage snapshot
  (guards that the snapshot event actually reaches the popover, not just the
  usage totals).
- preserves the granular breakdown after switching branches — regenerate to
  overwrite the single live snapshot, switch back, and confirm the rows
  survive via the per-anchor snapshot history map.

Branch regenerate/sibling selectors mirror the existing chat.spec branch test.
All three usage specs pass against the mock pipeline.

* 🪙 fix: Correct Resume Live-Seed, Fallback Re-index, and Subagent Emit Flush

Codex round on the prior commit:

- countTrailingOutputChars now counts only output at the very END of the
  aggregated content (0 when the model paused at a tool call), and the resume
  path always seeds it. The earlier skip-trailing-tool-parts behavior plus the
  skip-seed-when-snapshot gate together over- or under-counted in-flight
  output on resume; one rule fixes both — pre-invoke snapshot budget is never
  double-counted, and genuine in-flight output is no longer dropped.
- useTokenUsage re-indexes from the messages cache on tail change while
  submitting. The cache subscriber is muted during streaming, so without a
  context snapshot (non-agent streams) sumBranch missed the created tail and
  dropped history + prompt until finalize. Bounded — tailId only shifts on
  created/finalize/branch-switch.
- AgentClient tracks subagent usage emit promises and flushes them in
  chatCompletion's finally. The sink fires the emitter without awaiting, and
  resume reads the usage emitChunk persists (HSET), so cleanup must not race
  it or resumed clients miss billed subagent usage.
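
The track-and-flush pattern from the last bullet can be sketched as follows. `PendingEmits`, `runWithFlush`, and the timer-based `emit` stand-in are hypothetical; they only illustrate how a `finally` block can wait for emits that the sink fired without awaiting.

```javascript
/** Hypothetical tracker for fire-and-forget emits that must finish before cleanup. */
class PendingEmits {
  constructor() {
    this.pending = new Set();
  }

  /** Track a promise; failures are swallowed here so flush() never rejects. */
  track(promise) {
    const tracked = Promise.resolve(promise)
      .catch(() => undefined)
      .finally(() => this.pending.delete(tracked));
    this.pending.add(tracked);
    return tracked;
  }

  /** Wait for every emit that is still in flight. */
  async flush() {
    await Promise.all([...this.pending]);
  }
}

async function runWithFlush() {
  const emits = new PendingEmits();
  const persisted = [];
  // Stand-in for an emit that persists asynchronously before publishing.
  const emit = (usage) =>
    new Promise((resolve) => {
      setTimeout(() => {
        persisted.push(usage);
        resolve();
      }, 10);
    });

  try {
    // The sink fires the emitter without awaiting it.
    emits.track(emit({ usage_type: 'subagent', output_tokens: 42 }));
  } finally {
    // Cleanup waits here, so it cannot race the in-flight emit.
    await emits.flush();
  }
  return persisted;
}

runWithFlush().then((persisted) => console.log(persisted.length)); // 1
```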
2026-06-13 19:38:28 -04:00

1182 lines
43 KiB
JavaScript

const { nanoid } = require('nanoid');
const { logger } = require('@librechat/data-schemas');
const {
Tools,
StepTypes,
FileContext,
ErrorTypes,
UsageEvents,
} = require('librechat-data-provider');
const {
GraphEvents,
GraphNodeKeys,
ToolEndHandler,
createContentAggregator,
} = require('@librechat/agents');
const {
sendEvent,
computeUsageCostUSD,
GenerationJobManager,
writeAttachmentEvent,
createToolExecuteHandler,
HOST_FILE_AUTHORING_ARTIFACT_KEY,
isCodeSessionToolName,
} = require('@librechat/api');
const { processFileCitations } = require('~/server/services/Files/Citations');
const { processCodeOutput, runPreviewFinalize } = require('~/server/services/Files/Code/process');
const { saveBase64Image } = require('~/server/services/Files/process');
function isHostFileAuthoringArtifact(artifact) {
return artifact?.[HOST_FILE_AUTHORING_ARTIFACT_KEY] === true;
}
function isCodeArtifactToolOutput(output) {
return isCodeSessionToolName(output.name) || isHostFileAuthoringArtifact(output.artifact);
}
class ModelEndHandler {
/**
* @param {Array<UsageMetadata>} collectedUsage
* @param {Record<string, string> | null} [collectedThoughtSignatures] Map of
* `tool_call_id → thoughtSignature` accumulated across `chat_model_end`
* events. Used to persist Vertex Gemini 3 thought signatures across DB
* round-trips so resumed conversations don't 400 on the next API call.
* Each `model_end` may emit multiple tool calls (one per LLM cycle in a
* tool-using turn); per-id storage preserves the mapping so each tool
* call's signature can be restored onto the right reconstructed
* AIMessage rather than being concentrated on the last one.
* Optional; when `null`, the handler is a no-op for signatures. Non-Vertex
* providers don't emit `additional_kwargs.signatures`, so capture is also
* a no-op for them even when the map is provided.
* @param {(data: Record<string, unknown>) => Promise<void> | void} [emitUsage] Optional
* callback to stream per-call token usage to the client.
*/
constructor(collectedUsage, collectedThoughtSignatures = null, emitUsage = null) {
if (!Array.isArray(collectedUsage)) {
throw new Error('collectedUsage must be an array');
}
this.collectedUsage = collectedUsage;
this.collectedThoughtSignatures = collectedThoughtSignatures;
this.emitUsage = emitUsage;
}
finalize(errorMessage) {
if (!errorMessage) {
return;
}
throw new Error(errorMessage);
}
/**
* @param {string} event
* @param {ModelEndData | undefined} data
* @param {Record<string, unknown> | undefined} metadata
* @param {StandardGraph} graph
* @returns {Promise<void>}
*/
async handle(event, data, metadata, graph) {
if (!graph || !metadata) {
logger.warn(`Graph or metadata not found in ${event} event`);
return;
}
/** @type {string | undefined} */
let errorMessage;
try {
const agentContext = graph.getAgentContext(metadata);
if (data?.output?.additional_kwargs?.stop_reason === 'refusal') {
const info = { ...data.output.additional_kwargs };
errorMessage = JSON.stringify({
type: ErrorTypes.REFUSAL,
info,
});
logger.debug(`[ModelEndHandler] Model refused to respond`, {
...info,
userId: metadata.user_id,
messageId: metadata.run_id,
conversationId: metadata.thread_id,
});
}
const usage = data?.output?.usage_metadata;
if (!usage) {
return this.finalize(errorMessage);
}
const modelName = metadata?.ls_model_name || agentContext.clientOptions?.model;
if (modelName) {
usage.model = modelName;
}
if (agentContext.provider) {
usage.provider = agentContext.provider;
}
let taggedUsage = markSummarizationUsage(usage, metadata);
/** Hidden intermediate sequential-agent calls are billed but never shown.
* Tag them non-primary on the COLLECTED usage too (not just the emit) so
* recordCollectedUsage excludes their output from the parent's tokenCount
* and the client folds them into cost/totals only — not the live gauge. */
if (
taggedUsage.usage_type == null &&
!checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node) &&
metadata?.hide_sequential_outputs === true
) {
taggedUsage = { ...taggedUsage, usage_type: 'sequential' };
}
this.collectedUsage.push(taggedUsage);
if (this.emitUsage) {
/** Normalize Anthropic/Bedrock-style top-level cache fields into details */
const cache_creation =
taggedUsage.input_token_details?.cache_creation ??
taggedUsage.cache_creation_input_tokens;
const cache_read =
taggedUsage.input_token_details?.cache_read ?? taggedUsage.cache_read_input_tokens;
try {
await this.emitUsage({
input_tokens: taggedUsage.input_tokens,
output_tokens: taggedUsage.output_tokens,
total_tokens: taggedUsage.total_tokens,
input_token_details:
cache_creation != null || cache_read != null
? { cache_creation, cache_read }
: undefined,
model: taggedUsage.model,
provider: taggedUsage.provider,
usage_type: taggedUsage.usage_type,
runId: metadata?.run_id,
/** Per-run sequence so identical payloads from distinct calls
* stay distinguishable during resume dedupe */
seq: this.collectedUsage.length,
});
} catch (err) {
/** Best-effort telemetry: a failed emit (closed SSE, Redis publish
* error) must not abort the handler before the thought-signature
* capture below, or resumed tool-call requests lose that metadata */
logger.warn('[ModelEndHandler] Failed to emit token usage', err);
}
}
/**
* `additional_kwargs.signatures` is a flat array indexed by response
* part position (text + functionCall interleaved). `tool_calls` is
* just the function calls in their original order. Non-empty
* signatures correspond 1:1 with `tool_calls` in order — see
* `partsToSignatures` in `@langchain/google-common`. Walk both in a
* single pass to map each signature onto the right `tool_call.id`.
*/
const signatures = data?.output?.additional_kwargs?.signatures;
const toolCalls = data?.output?.tool_calls;
if (
this.collectedThoughtSignatures &&
Array.isArray(signatures) &&
Array.isArray(toolCalls)
) {
let toolIdx = 0;
for (const sig of signatures) {
if (typeof sig !== 'string' || sig.length === 0) continue;
if (toolIdx >= toolCalls.length) break;
const id = toolCalls[toolIdx++]?.id;
if (id) this.collectedThoughtSignatures[id] = sig;
}
}
} catch (error) {
logger.error('Error handling model end event:', error);
return this.finalize(errorMessage);
}
}
}
/**
* @deprecated Agent Chain helper
* @param {string | undefined} [last_agent_id]
* @param {string | undefined} [langgraph_node]
* @returns {boolean}
*/
function checkIfLastAgent(last_agent_id, langgraph_node) {
if (!last_agent_id || !langgraph_node) {
return false;
}
return langgraph_node?.endsWith(last_agent_id);
}
/**
* Helper to emit events either to res (standard mode) or to job emitter (resumable mode).
* In Redis mode, awaits the emit to guarantee event ordering (critical for streaming deltas).
* @param {ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} eventData - The event data to send
* @returns {Promise<void>}
*/
async function emitEvent(res, streamId, eventData) {
if (streamId) {
await GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
}
/**
* Maps a {@link SubagentUpdateEvent} phase to the corresponding
* {@link GraphEvents} name that the SDK's `createContentAggregator`
* knows how to consume. Phases that don't carry content (`start`, `stop`,
* `error`) or whose payload doesn't match a handled event (`run_step`
* with an `ON_TOOL_EXECUTE`-shaped batch request rather than a RunStep)
* return `null` so the caller skips them.
* @param {SubagentUpdateEvent} event
* @returns {string | null}
*/
function subagentPhaseToGraphEvent(event) {
switch (event?.phase) {
case 'run_step':
/** `ON_RUN_STEP` and `ON_TOOL_EXECUTE` both forward with phase
* `run_step`; only the former matches the aggregator's RunStep
* schema. Detect by presence of `stepDetails`. */
return event.data?.stepDetails ? GraphEvents.ON_RUN_STEP : null;
case 'run_step_delta':
return GraphEvents.ON_RUN_STEP_DELTA;
case 'run_step_completed':
return GraphEvents.ON_RUN_STEP_COMPLETED;
case 'message_delta':
return GraphEvents.ON_MESSAGE_DELTA;
case 'reasoning_delta':
return GraphEvents.ON_REASONING_DELTA;
default:
return null;
}
}
/**
* Folds a single {@link SubagentUpdateEvent} into the given content
* aggregator. Silent no-op for phases outside the aggregator's domain.
* @param {{ aggregateContent: Function }} aggregator
* @param {SubagentUpdateEvent} event
*/
function feedSubagentAggregator(aggregator, event) {
const graphEvent = subagentPhaseToGraphEvent(event);
if (!graphEvent) return;
aggregator.aggregateContent({ event: graphEvent, data: event.data });
}
/**
* @typedef {Object} ToolExecuteOptions
* @property {(toolNames: string[]) => Promise<{loadedTools: StructuredTool[]}>} loadTools - Function to load tools by name
* @property {Object} configurable - Configurable context for tool invocation
*/
/**
* Get default handlers for stream events.
* @param {Object} options - The options object.
* @param {ServerResponse} options.res - The server response object.
* @param {ContentAggregator} options.aggregateContent - Content aggregator function.
* @param {ToolEndCallback} options.toolEndCallback - Callback to use when tool ends.
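* @param {Array<UsageMetadata>} options.collectedUsage - The list of collected usage metadata.
* @param {Record<string, string> | null} [options.collectedThoughtSignatures] - Map of `tool_call_id → thoughtSignature` passed to the `ModelEndHandler`.
* @param {string | null} [options.streamId] - The stream ID for resumable mode, or null for standard mode.
* @param {ToolExecuteOptions} [options.toolExecuteOptions] - Options for event-driven tool execution.
* @param {{ enabled?: boolean } | null} [options.summarizationOptions] - Summarization handlers are registered unless `enabled` is `false`.
* @param {Map<string, { aggregateContent: Function }> | null} [options.subagentAggregatorsByToolCallId] - Caller-owned map of per-tool-call subagent content aggregators.
* @param {UsageCostDeps} [options.usageCost] - Pricing context for authoritative per-event cost.
* @returns {Record<string, t.EventHandler>} The default handlers.
* @throws {Error} If `res` or `aggregateContent` is missing.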
*/
function getDefaultHandlers({
res,
aggregateContent,
toolEndCallback,
collectedUsage,
collectedThoughtSignatures = null,
streamId = null,
toolExecuteOptions = null,
summarizationOptions = null,
subagentAggregatorsByToolCallId = null,
usageCost = null,
}) {
if (!res || !aggregateContent) {
throw new Error(
`[getDefaultHandlers] Missing required options: res: ${!res}, aggregateContent: ${!aggregateContent}`,
);
}
/**
* Emit a token-usage event, attaching the authoritative per-event USD cost
* when cost display is enabled. The backend is the single source of truth
* for pricing (premium tiers, cache rates) — the client sums these instead
* of re-deriving from base rates.
* @param {Record<string, unknown>} data
*/
const emitTokenUsage = (data) => {
let payload = data;
if (usageCost?.enabled === true && usageCost.pricing) {
try {
payload = {
...data,
cost: computeUsageCostUSD(data, usageCost.pricing, usageCost.endpointTokenConfig),
};
} catch (err) {
logger.warn('[getDefaultHandlers] Failed to compute usage cost', err);
}
}
return emitEvent(res, streamId, { event: UsageEvents.ON_TOKEN_USAGE, data: payload });
};
const handlers = {
[GraphEvents.CHAT_MODEL_END]: new ModelEndHandler(
collectedUsage,
collectedThoughtSignatures,
emitTokenUsage,
),
[GraphEvents.TOOL_END]: new ToolEndHandler(toolEndCallback, logger),
[GraphEvents.ON_RUN_STEP]: {
/**
* Handle ON_RUN_STEP event.
* @param {string} event - The event name.
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: async (event, data, metadata) => {
aggregateContent({ event, data });
if (data?.stepDetails?.type === StepTypes.TOOL_CALLS) {
await emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
await emitEvent(res, streamId, { event, data });
} else {
const agentName = metadata?.name ?? 'Agent';
const isToolCall = data?.stepDetails?.type === StepTypes.TOOL_CALLS;
const action = isToolCall ? 'performing a task...' : 'thinking...';
await emitEvent(res, streamId, {
event: 'on_agent_update',
data: {
runId: metadata?.run_id,
message: `${agentName} is ${action}`,
},
});
}
},
},
[GraphEvents.ON_RUN_STEP_DELTA]: {
/**
* Handle ON_RUN_STEP_DELTA event.
* @param {string} event - The event name.
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: async (event, data, metadata) => {
aggregateContent({ event, data });
if (data?.delta?.type === StepTypes.TOOL_CALLS) {
await emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
await emitEvent(res, streamId, { event, data });
}
},
},
[GraphEvents.ON_RUN_STEP_COMPLETED]: {
/**
* Handle ON_RUN_STEP_COMPLETED event.
* @param {string} event - The event name.
* @param {StreamEventData & { result: ToolEndData }} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: async (event, data, metadata) => {
aggregateContent({ event, data });
if (data?.result != null) {
await emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
await emitEvent(res, streamId, { event, data });
}
},
},
[GraphEvents.ON_MESSAGE_DELTA]: {
/**
* Handle ON_MESSAGE_DELTA event.
* @param {string} event - The event name.
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: async (event, data, metadata) => {
aggregateContent({ event, data });
if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
await emitEvent(res, streamId, { event, data });
}
},
},
[GraphEvents.ON_REASONING_DELTA]: {
/**
* Handle ON_REASONING_DELTA event.
* @param {string} event - The event name.
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: async (event, data, metadata) => {
aggregateContent({ event, data });
if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
await emitEvent(res, streamId, { event, data });
}
},
},
};
if (toolExecuteOptions) {
handlers[GraphEvents.ON_TOOL_EXECUTE] = createToolExecuteHandler(toolExecuteOptions);
}
handlers[GraphEvents.ON_SUBAGENT_UPDATE] = {
/**
* Forwards subagent progress envelopes to the client stream, and
* (when a caller-owned aggregator map is provided) also folds each
* event into a per-tool-call `createContentAggregator`. The
* resulting `contentParts` are attached to the parent's `subagent`
* tool_call at message-save time so the child's reasoning / tool
* calls / final text survive a page refresh — in-memory Recoil
* atoms alone wouldn't persist that.
*
* Aggregation runs regardless of stream visibility (persistence +
* dialog depend on it), but the SSE forward respects
* `hide_sequential_outputs` the same way `ON_RUN_STEP`,
* `ON_MESSAGE_DELTA`, etc. do — so intermediate agents in a
* sequential chain don't leak their subagent activity when the
* chain is configured to suppress intermediates.
*/
handle: async (event, data, metadata) => {
const isLastAgent = checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node);
const visible = isLastAgent || !metadata?.hide_sequential_outputs;
/**
* Gate BOTH aggregation (persistence) AND streaming on the same
* visibility rule. If we aggregated for a hidden intermediate
* agent, `finalizeSubagentContent` would still attach its
* child's reasoning / tool output to the saved message — so a
* page refresh would reveal activity that was intentionally
* suppressed live. Treat hide_sequential_outputs as a
* consistent "don't record" rule for subagent traces.
*/
if (!visible) return;
if (subagentAggregatorsByToolCallId && data?.parentToolCallId) {
const key = data.parentToolCallId;
let aggregator = subagentAggregatorsByToolCallId.get(key);
if (!aggregator) {
aggregator = createContentAggregator();
subagentAggregatorsByToolCallId.set(key, aggregator);
}
try {
feedSubagentAggregator(aggregator, data);
} catch (err) {
logger.warn(
`[ON_SUBAGENT_UPDATE] Failed to aggregate phase "${data?.phase}" for tool_call ${key}: ${err?.message ?? err}`,
);
}
}
await emitEvent(res, streamId, { event, data });
},
};
if (summarizationOptions?.enabled !== false) {
handlers[GraphEvents.ON_SUMMARIZE_START] = {
handle: async (_event, data) => {
await emitEvent(res, streamId, {
event: GraphEvents.ON_SUMMARIZE_START,
data,
});
},
};
handlers[GraphEvents.ON_SUMMARIZE_DELTA] = {
handle: async (_event, data) => {
aggregateContent({ event: GraphEvents.ON_SUMMARIZE_DELTA, data });
await emitEvent(res, streamId, {
event: GraphEvents.ON_SUMMARIZE_DELTA,
data,
});
},
};
handlers[GraphEvents.ON_SUMMARIZE_COMPLETE] = {
handle: async (_event, data) => {
aggregateContent({ event: GraphEvents.ON_SUMMARIZE_COMPLETE, data });
await emitEvent(res, streamId, {
event: GraphEvents.ON_SUMMARIZE_COMPLETE,
data,
});
},
};
}
handlers[GraphEvents.ON_AGENT_LOG] = { handle: agentLogHandler };
/** Guarded: no-op when the installed @librechat/agents predates the event */
if (GraphEvents.ON_CONTEXT_USAGE) {
handlers[GraphEvents.ON_CONTEXT_USAGE] = {
/**
* Forward per-model-call context usage snapshots to the client,
* honoring the same sequential-agent visibility gate as deltas.
* @param {string} event - The event name.
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: async (event, data, metadata) => {
if (
checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node) ||
!metadata?.hide_sequential_outputs
) {
await emitEvent(res, streamId, { event, data });
}
},
};
}
return handlers;
}
/**
* Helper to write attachment events either to res or to job emitter.
* Note: Attachments are not order-sensitive like deltas, so fire-and-forget is acceptable.
* @param {ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} attachment - The attachment data
*/
function writeAttachment(res, streamId, attachment) {
if (streamId) {
GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment });
} else {
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
}
}
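/*
* Wire-format sketch for `writeAttachment` in standard mode (no `streamId`),
* read off the `res.write` template above. The payload is a hypothetical example:
*
*   writeAttachment(res, null, { file_id: 'abc' });
*   // res receives: 'event: attachment\ndata: {"file_id":"abc"}\n\n'
*
* With a `streamId`, the same payload goes to `GenerationJobManager.emitChunk`
* as `{ event: 'attachment', data: attachment }` and `res` is not written.
*/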
/**
* Predicate: is it safe to push an SSE write to the caller right now?
*
* In `streamId` (resumable) mode, writes go to the job emitter and the
* `res` state is irrelevant — always writable.
*
* In standard mode, the caller's `res` must have headers sent (the
* stream has been opened) and not yet be `writableEnded` (the response
* hasn't closed). Writing to a closed stream raises
* `ERR_STREAM_WRITE_AFTER_END`.
*
* Used by deferred preview emits in both `createToolEndCallback`
* (chat-completions) and `createResponsesToolEndCallback` (Open
* Responses) so the gate logic stays in one place. (Comprehensive
* review #3 on PR #12957.)
*/
function isStreamWritable(res, streamId) {
if (streamId) {
return true;
}
return !!res && res.headersSent && !res.writableEnded;
}
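/*
* Usage sketch for `isStreamWritable`, derived from the checks above.
* The `res` literals are hypothetical stand-ins for a ServerResponse:
*
*   isStreamWritable(null, 'stream-1');                                   // true (resumable mode)
*   isStreamWritable({ headersSent: true, writableEnded: false }, null);  // true
*   isStreamWritable({ headersSent: false, writableEnded: false }, null); // false
*   isStreamWritable({ headersSent: true, writableEnded: true }, null);   // false
*   isStreamWritable(null, null);                                         // false
*/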
/**
* Emit an update for an attachment that was previously sent with
* `status: 'pending'`. Fire-and-forget: if the response stream has
* already closed (the agent finished generating before the deferred
* preview resolved) the frontend's React Query polling on
* `/api/files/:file_id/preview` picks up the resolved record on its
* next tick. Skipping the write in that case avoids
* `ERR_STREAM_WRITE_AFTER_END`.
*
* Reuses the `attachment` SSE event name with a discriminated payload:
* the frontend's `useAttachmentHandler` upserts by `file_id`, so a
* second event with the same id and `status: 'ready' | 'failed'`
* overwrites the pending placeholder in place. No new event type, no
* new client listener.
*
* @param {ServerResponse} res
* @param {string | null} streamId
* @param {Object} attachment - Updated attachment payload (must carry `file_id`).
*/
function writeAttachmentUpdate(res, streamId, attachment) {
if (!isStreamWritable(res, streamId)) {
return;
}
writeAttachment(res, streamId, attachment);
}
/**
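* Creates a tool end callback for the chat-completions path.
* Each artifact on the tool output is processed asynchronously: the pending
* promise is pushed onto `artifactPromises`, and the resulting attachment is
* emitted through `writeAttachment` once the stream can accept it.
*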
* @param {Object} params
* @param {ServerRequest} params.req
* @param {ServerResponse} params.res
* @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
* @param {string | null} [params.streamId] - The stream ID for resumable mode, or null for standard mode.
* @returns {ToolEndCallback} The tool end callback.
*/
function createToolEndCallback({ req, res, artifactPromises, streamId = null }) {
/**
* @type {ToolEndCallback}
*/
return async (data, metadata) => {
const output = data?.output;
if (!output) {
return;
}
if (!output.artifact) {
return;
}
if (output.artifact[Tools.file_search]) {
artifactPromises.push(
(async () => {
const user = req.user;
const attachment = await processFileCitations({
user,
metadata,
appConfig: req.config,
toolArtifact: output.artifact,
toolCallId: output.tool_call_id,
});
if (!attachment) {
return null;
}
if (!streamId && !res.headersSent) {
return attachment;
}
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing file citations:', error);
return null;
}),
);
}
if (output.artifact[Tools.ui_resources]) {
artifactPromises.push(
(async () => {
const attachment = {
type: Tools.ui_resources,
messageId: metadata.run_id,
toolCallId: output.tool_call_id,
conversationId: metadata.thread_id,
[Tools.ui_resources]: output.artifact[Tools.ui_resources].data,
};
if (!streamId && !res.headersSent) {
return attachment;
}
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
return null;
}),
);
}
if (output.artifact[Tools.web_search]) {
artifactPromises.push(
(async () => {
const attachment = {
type: Tools.web_search,
messageId: metadata.run_id,
toolCallId: output.tool_call_id,
conversationId: metadata.thread_id,
[Tools.web_search]: { ...output.artifact[Tools.web_search] },
};
if (!streamId && !res.headersSent) {
return attachment;
}
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
return null;
}),
);
}
if (output.artifact.content) {
/** @type {FormattedContent[]} */
const content = output.artifact.content;
for (let i = 0; i < content.length; i++) {
const part = content[i];
if (!part) {
continue;
}
if (part.type !== 'image_url') {
continue;
}
const { url } = part.image_url;
artifactPromises.push(
(async () => {
const filename = `${output.name}_img_${nanoid()}`;
const file_id = output.artifact.file_ids?.[i];
const file = await saveBase64Image(url, {
req,
file_id,
filename,
endpoint: metadata.provider,
context: FileContext.image_generation,
});
/* Guard before `Object.assign`, which throws on a null/undefined target. */
if (!file) {
return null;
}
const fileMetadata = Object.assign(file, {
messageId: metadata.run_id,
toolCallId: output.tool_call_id,
conversationId: metadata.thread_id,
});
if (!streamId && !res.headersSent) {
return fileMetadata;
}
writeAttachment(res, streamId, fileMetadata);
return fileMetadata;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
return null;
}),
);
}
return;
}
if (!isCodeArtifactToolOutput(output)) {
return;
}
if (!output.artifact.files) {
return;
}
for (const file of output.artifact.files) {
/* `inherited` files are unchanged passthroughs of inputs the caller
* already owns (skill files, prior session inputs, inherited
* .dirkeep markers). Skip post-processing: re-downloading with the
* user's session key 403s when the file is entity-scoped, and the
* input is already persisted at its origin. They remain available
* to subsequent calls via primeInvokedSkills / session inheritance. */
if (file.inherited) {
continue;
}
const { id, name } = file;
const toolCallId = output.tool_call_id;
artifactPromises.push(
(async () => {
const result = await processCodeOutput({
req,
id,
name,
messageId: metadata.run_id,
toolCallId,
conversationId: metadata.thread_id,
/**
* Use the FILE's `storage_session_id` (storage session),
* not the top-level artifact `session_id` (exec session).
* The codeapi worker reports two distinct ids on a tool
* result:
* - `artifact.session_id` is the EXEC session — the
* sandbox VM that ran the bash command. Files don't
* live there; it's torn down post-execution.
* - `file.storage_session_id` is the STORAGE session —
* the file-server bucket prefix where artifacts
* actually live and are served from.
* `processCodeOutput` builds `/download/{session_id}/{id}`,
* so passing the exec id resolves to a path the file-server
* doesn't know about and 404s. Fall back to artifact-level
* for older worker payloads that may not populate per-file
* ids.
*/
session_id: file.storage_session_id ?? output.artifact.session_id,
});
const fileMetadata = result?.file ?? null;
const finalize = result?.finalize;
if (!fileMetadata) {
return null;
}
/* Initial emit: ship the attachment to the client immediately
* (carries `status: 'pending'` for office buckets so the UI
* shows "preparing preview…"). The agent's response stops
* blocking on extraction here.
*
* Use the shared `isStreamWritable` predicate rather than the
* narrower `streamId || res.headersSent` check that lived
* here before — a client disconnect mid-stream
* (`res.writableEnded`) would otherwise hit `res.write` and
* raise `ERR_STREAM_WRITE_AFTER_END` (caught by the outer
* IIFE catch but logged as noise). Same gate the Responses
* path uses below. */
if (isStreamWritable(res, streamId)) {
writeAttachment(res, streamId, fileMetadata);
}
/* Deferred preview rendering: extraction continues running
* even after the HTTP response closes. If the stream is still
* open when the preview resolves, push an `attachment`
* update event so the UI patches in place; otherwise React
* Query polling on `/api/files/:file_id/preview` picks it up.
*
* Spread the full updated record (mirroring the initial emit
* shape) and overlay `messageId`/`toolCallId` from the
* current run. The DB record preserves the original
* `messageId` across cross-turn filename reuse so
* `getCodeGeneratedFiles` can trace the file back to its
* original assistant message; routing the update SSE by the
* persisted id would land the patch on a stale message
* slot — turn-N's pending placeholder would stay stuck while
* turn-1's already-resolved attachment got re-merged.
* (Codex P1 review on PR #12957.) */
runPreviewFinalize({
finalize,
fileId: fileMetadata.file_id,
previewRevision: result?.previewRevision,
onResolved: (updated) => {
writeAttachmentUpdate(res, streamId, {
...updated,
messageId: metadata.run_id,
toolCallId,
});
},
});
return fileMetadata;
})().catch((error) => {
logger.error('Error processing code output:', error);
return null;
}),
);
}
};
}
/**
* Helper to write attachment events in Open Responses format (librechat:attachment)
* @param {ServerResponse} res - The server response object
* @param {Object} tracker - The response tracker with sequence number
* @param {Object} attachment - The attachment data
* @param {Object} metadata - Run metadata; `run_id` and `thread_id` are forwarded as `messageId` and `conversationId`
*/
function writeResponsesAttachment(res, tracker, attachment, metadata) {
const sequenceNumber = tracker.nextSequence();
writeAttachmentEvent(res, sequenceNumber, attachment, {
messageId: metadata.run_id,
conversationId: metadata.thread_id,
});
}
/**
* Creates a tool end callback specifically for the Responses API.
* Emits attachments as `librechat:attachment` events per the Open Responses extension spec.
*
* @param {Object} params
* @param {ServerRequest} params.req
* @param {ServerResponse} params.res
* @param {Object} params.tracker - Response tracker with sequence number
* @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
* @returns {ToolEndCallback} The tool end callback.
*/
function createResponsesToolEndCallback({ req, res, tracker, artifactPromises }) {
/**
* @type {ToolEndCallback}
*/
return async (data, metadata) => {
const output = data?.output;
if (!output) {
return;
}
if (!output.artifact) {
return;
}
if (output.artifact[Tools.file_search]) {
artifactPromises.push(
(async () => {
const user = req.user;
const attachment = await processFileCitations({
user,
metadata,
appConfig: req.config,
toolArtifact: output.artifact,
toolCallId: output.tool_call_id,
});
if (!attachment) {
return null;
}
// For Responses API, emit attachment during streaming
if (res.headersSent && !res.writableEnded) {
writeResponsesAttachment(res, tracker, attachment, metadata);
}
return attachment;
})().catch((error) => {
logger.error('Error processing file citations:', error);
return null;
}),
);
}
if (output.artifact[Tools.ui_resources]) {
artifactPromises.push(
(async () => {
const attachment = {
type: Tools.ui_resources,
toolCallId: output.tool_call_id,
[Tools.ui_resources]: output.artifact[Tools.ui_resources].data,
};
// For Responses API, emit attachment while the response stream is open
if (res.headersSent && !res.writableEnded) {
writeResponsesAttachment(res, tracker, attachment, metadata);
}
return attachment;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
return null;
}),
);
}
if (output.artifact[Tools.web_search]) {
artifactPromises.push(
(async () => {
const attachment = {
type: Tools.web_search,
toolCallId: output.tool_call_id,
[Tools.web_search]: { ...output.artifact[Tools.web_search] },
};
// For Responses API, emit attachment while the response stream is open
if (res.headersSent && !res.writableEnded) {
writeResponsesAttachment(res, tracker, attachment, metadata);
}
return attachment;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
return null;
}),
);
}
if (output.artifact.content) {
/** @type {FormattedContent[]} */
const content = output.artifact.content;
for (let i = 0; i < content.length; i++) {
const part = content[i];
if (!part) {
continue;
}
if (part.type !== 'image_url') {
continue;
}
const { url } = part.image_url;
artifactPromises.push(
(async () => {
const filename = `${output.name}_img_${nanoid()}`;
const file_id = output.artifact.file_ids?.[i];
const file = await saveBase64Image(url, {
req,
file_id,
filename,
endpoint: metadata.provider,
context: FileContext.image_generation,
});
/* Guard before `Object.assign`, which throws on a null/undefined target. */
if (!file) {
return null;
}
const fileMetadata = Object.assign(file, {
toolCallId: output.tool_call_id,
});
// For Responses API, emit attachment during streaming
if (res.headersSent && !res.writableEnded) {
const attachment = {
file_id: fileMetadata.file_id,
filename: fileMetadata.filename,
type: fileMetadata.type,
url: fileMetadata.filepath,
width: fileMetadata.width,
height: fileMetadata.height,
tool_call_id: output.tool_call_id,
};
writeResponsesAttachment(res, tracker, attachment, metadata);
}
return fileMetadata;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
return null;
}),
);
}
return;
}
if (!isCodeArtifactToolOutput(output)) {
return;
}
if (!output.artifact.files) {
return;
}
for (const file of output.artifact.files) {
/* `inherited` files are unchanged passthroughs of inputs the caller
* already owns (skill files, prior session inputs, inherited
* .dirkeep markers). Skip post-processing: re-downloading with the
* user's session key 403s when the file is entity-scoped, and the
* input is already persisted at its origin. They remain available
* to subsequent calls via primeInvokedSkills / session inheritance. */
if (file.inherited) {
continue;
}
const { id, name } = file;
const toolCallId = output.tool_call_id;
artifactPromises.push(
(async () => {
const result = await processCodeOutput({
req,
id,
name,
messageId: metadata.run_id,
toolCallId,
conversationId: metadata.thread_id,
/**
* Use the FILE's `storage_session_id` (storage session),
* not the top-level artifact `session_id` (exec session).
* The codeapi worker reports two distinct ids on a tool
* result:
* - `artifact.session_id` is the EXEC session — the
* sandbox VM that ran the bash command. Files don't
* live there; it's torn down post-execution.
* - `file.storage_session_id` is the STORAGE session —
* the file-server bucket prefix where artifacts
* actually live and are served from.
* `processCodeOutput` builds `/download/{session_id}/{id}`,
* so passing the exec id resolves to a path the file-server
* doesn't know about and 404s. Fall back to artifact-level
* for older worker payloads that may not populate per-file
* ids.
*/
session_id: file.storage_session_id ?? output.artifact.session_id,
});
const fileMetadata = result?.file ?? null;
const finalize = result?.finalize;
if (!fileMetadata) {
return null;
}
/* Initial emit (Open Responses extension format). The agent's
* response no longer blocks on extraction. */
if (isStreamWritable(res, null)) {
writeResponsesAttachment(
res,
tracker,
buildResponsesAttachment(fileMetadata, toolCallId),
metadata,
);
}
/* Deferred preview rendering: extract HTML in the background
* and emit a follow-up `librechat:attachment` with the same
* `file_id` so the client merges the resolved record over the
* pending placeholder. Fire-and-forget — survives response
* close; polling covers the post-close gap. */
runPreviewFinalize({
finalize,
fileId: fileMetadata.file_id,
previewRevision: result?.previewRevision,
onResolved: (updated) => {
if (!isStreamWritable(res, null)) {
return;
}
writeResponsesAttachment(
res,
tracker,
buildResponsesAttachment(updated, toolCallId),
metadata,
);
},
});
return fileMetadata;
})().catch((error) => {
logger.error('Error processing code output:', error);
return null;
}),
);
}
};
}
/**
* Project a file metadata record into the Open Responses attachment
* shape. Mirrors the legacy inline projection but adds `status` and
* `previewError` so deferred preview updates carry the lifecycle
* signal the client uses to upsert by `file_id`.
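* @param {Object} fileMetadata - The file metadata record to project.
* @param {string} toolCallId - The id of the tool call that produced the file.
* @returns {Object} The attachment payload.
*/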
function buildResponsesAttachment(fileMetadata, toolCallId) {
return {
file_id: fileMetadata.file_id,
filename: fileMetadata.filename,
type: fileMetadata.type,
url: fileMetadata.filepath,
width: fileMetadata.width,
height: fileMetadata.height,
tool_call_id: toolCallId,
text: fileMetadata.text ?? null,
textFormat: fileMetadata.textFormat ?? null,
status: fileMetadata.status,
previewError: fileMetadata.previewError,
};
}
const ALLOWED_LOG_LEVELS = new Set(['debug', 'info', 'warn', 'error']);
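/**
* Routes an agent log event to the server logger.
* Levels outside `ALLOWED_LOG_LEVELS` fall back to `debug`; a non-object
* `data.data` is ignored rather than spread into the log metadata.
* @param {string} _event - The event name (unused).
* @param {Object} [data] - The log payload (`level`, `scope`, `message`, `data`, `runId`, `agentId`).
*/
function agentLogHandler(_event, data) {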
if (!data) {
return;
}
const logFn = ALLOWED_LOG_LEVELS.has(data.level) ? logger[data.level] : logger.debug;
const meta = typeof data.data === 'object' && data.data != null ? data.data : {};
logFn(`[agents:${data.scope ?? 'unknown'}] ${data.message ?? ''}`, {
...meta,
runId: data.runId,
agentId: data.agentId,
});
}
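/**
* Tags usage emitted from a summarization node with `usage_type: 'summarization'`.
* Usage from any other node is returned unchanged.
* @param {Object} usage - The usage payload.
* @param {{ langgraph_node?: string }} [metadata] - The runnable metadata.
* @returns {Object} A tagged copy of `usage`, or the original object.
*/
function markSummarizationUsage(usage, metadata) {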
const node = metadata?.langgraph_node;
if (typeof node === 'string' && node.startsWith(GraphNodeKeys.SUMMARIZE)) {
return { ...usage, usage_type: 'summarization' };
}
return usage;
}
const agentLogHandlerObj = { handle: agentLogHandler };
/**
* Builds the three summarization SSE event handlers.
* In streaming mode, each event is forwarded to the client via `res.write`,
* unless the response has already ended.
* In non-streaming mode, the handlers are no-ops.
* @param {{ isStreaming: boolean, res: import('express').Response }} opts
*/
function buildSummarizationHandlers({ isStreaming, res }) {
if (!isStreaming) {
const noop = { handle: () => {} };
return { on_summarize_start: noop, on_summarize_delta: noop, on_summarize_complete: noop };
}
const writeEvent = (name) => ({
handle: async (_event, data) => {
if (!res.writableEnded) {
res.write(`event: ${name}\ndata: ${JSON.stringify(data)}\n\n`);
}
},
});
return {
on_summarize_start: writeEvent('on_summarize_start'),
on_summarize_delta: writeEvent('on_summarize_delta'),
on_summarize_complete: writeEvent('on_summarize_complete'),
};
}
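/*
* Usage sketch for `buildSummarizationHandlers`, based only on the code above.
* `res` is assumed to be the Express response of an already-open SSE stream,
* and `payload` is a hypothetical event body:
*
*   const handlers = buildSummarizationHandlers({ isStreaming: true, res });
*   await handlers.on_summarize_delta.handle('on_summarize_delta', payload);
*   // writes: 'event: on_summarize_delta\ndata: ' + JSON.stringify(payload) + '\n\n'
*
* With `isStreaming: false`, all three handlers are no-ops and `res` is never touched.
*/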
module.exports = {
ModelEndHandler,
agentLogHandler,
agentLogHandlerObj,
getDefaultHandlers,
createToolEndCallback,
isStreamWritable,
markSummarizationUsage,
buildSummarizationHandlers,
createResponsesToolEndCallback,
};