Mirror of https://github.com/danny-avila/LibreChat.git (synced 2026-05-13 16:07:30 +00:00)
## Problem
When a tool round-trip is interrupted between the tool result and the
model's text reply (user abort, network drop, pod restart, ...) and
LibreChat persists the partial assistant message, the next conversation
turn reconstructs an `AIMessage` via `formatAgentMessages` with
`tool_calls` populated but no `additional_kwargs.signatures`. Vertex
Gemini 3 rejects the resumed request with a 400 because the most recent
historical functionCall carries no `thought_signature`.
## Storage shape
Capture as `Record<tool_call_id, signature>` rather than a flat array.
This addresses the codex P1 review:
> When an assistant turn contains multiple sequential tool-call batches,
> this restoration path writes all persisted thoughtSignatures onto only
> the last tool-bearing AIMessage. Vertex/Gemini validates signatures
> for each step in the current tool-calling turn, so earlier
> functionCall steps reconstructed without their signature can still
> fail with 400.
A single agent run can fire multiple `chat_model_end` events when the
loop cycles the LLM with intervening tool results — each cycle owns a
distinct `tool_call_id`. Per-id storage maps each signature back onto
the right reconstructed `AIMessage`, not just the last one.
## Mapping
`additional_kwargs.signatures` is a flat array indexed by *response part*
(text + functionCall interleaved). `tool_calls` is just the function
calls in their original order. Non-empty signatures correspond 1:1 with
tool_calls in order — see `partsToSignatures` in
`@langchain/google-common`. Single-pass walk maps `signatures[i]` (when
non-empty) onto the i-th `tool_call.id`.
## Pipeline
| Stage | File | Change |
|---|---|---|
| Capture | callbacks.js | `ModelEndHandler` accepts `Record<string,string>` map; walks signatures + tool_calls in tandem to record per-id. Gated on the map being provided — non-Vertex flows are no-op (and also no-op even when provided, since they don't emit signatures). |
| Plumbing | initialize.js | Allocate `collectedThoughtSignatures = {}`, share with handler + client. Always allocated; the JSDoc explicitly documents that it stays empty for non-Vertex providers. |
| Surface | client.js | `sendCompletion` returns `metadata.thoughtSignatures` when the map has entries; falls through unchanged when empty. |
| Persist | (existing BaseClient.handleRespCompletion) | Writes `metadata` from `sendCompletion` onto `responseMessage.metadata`. Mongoose `Mixed` — no migration. |
| Restore | formatMessages.js | Track every tool-bearing AIMessage produced from a TMessage. For each, build a position-aligned `additional_kwargs.signatures` array (empty placeholders for tool_calls without a stored sig). Agents' `fixThoughtSignatures` dispatches non-empty entries to functionCall parts in order. |
## Live verification
- **Single-step:** real Vertex `gemini-3.1-flash-lite-preview` resume-after-tool case. With fix ✅ / without ❌ 400.
- **Multi-step (codex case):** real two-step agent loop (list /tmp → echo done). Each step's signature attaches to its own reconstructed AIMessage. With fix ✅ / without ❌ 400.
- **Cross-provider:** Anthropic Claude haiku-4.5 + OpenAI gpt-5-mini accept the persisted/restored shape unchanged.
## Tests
`modelEndHandler.spec.js` (new) — 6 tests:
- maps non-empty signatures onto tool_call_ids in order
- accumulates per-id across multiple `model_end` events (multi-step)
- no-op when `collectedThoughtSignatures` is null
- no-op when `signatures` field missing (non-Vertex)
- no-op when `tool_calls` missing
- preserves existing `collectedUsage` array contract
`formatAgentMessages.spec.js` — 6 new tests:
- restores onto the AIMessage that owns the tool_call
- per-step attachment for multi-step turns (codex review case)
- preserves tool_call ordering when signatures are partial
- no-op when metadata.thoughtSignatures absent
- no-op when assistant has no tool_calls
- no-op when stored ids don't match any current tool_call
37 passing across 3 suites; 15 existing formatAgentMessages tests unchanged.
## Compatibility
- Backward-compatible — restore gated on `metadata.thoughtSignatures` being a populated object; capture gated on the map being provided.
- No schema migration — uses `Message.metadata: Mixed` already in place.
- Cross-provider safe — non-Vertex providers tolerate the field (verified live against Anthropic + OpenAI converters).
- Pairs with [agents#159](https://github.com/danny-avila/agents/pull/159) for full coverage on histories that mix plain-text and toolcall AIMessages.
1072 lines · 39 KiB · JavaScript
const { nanoid } = require('nanoid');
const { logger } = require('@librechat/data-schemas');
const { Tools, StepTypes, FileContext, ErrorTypes } = require('librechat-data-provider');
const {
  GraphEvents,
  GraphNodeKeys,
  ToolEndHandler,
  CODE_EXECUTION_TOOLS,
  createContentAggregator,
} = require('@librechat/agents');
const {
  sendEvent,
  GenerationJobManager,
  writeAttachmentEvent,
  createToolExecuteHandler,
} = require('@librechat/api');
const { processFileCitations } = require('~/server/services/Files/Citations');
const { processCodeOutput, runPreviewFinalize } = require('~/server/services/Files/Code/process');
const { saveBase64Image } = require('~/server/services/Files/process');

class ModelEndHandler {
  /**
   * @param {Array<UsageMetadata>} collectedUsage
   * @param {Record<string, string> | null} [collectedThoughtSignatures] Map of
   * `tool_call_id → thoughtSignature` accumulated across `chat_model_end`
   * events. Used to persist Vertex Gemini 3 thought signatures across DB
   * round-trips so resumed conversations don't 400 on the next API call.
   * Each `model_end` may emit multiple tool calls (one per LLM cycle in a
   * tool-using turn); per-id storage preserves the mapping so each tool
   * call's signature can be restored onto the right reconstructed
   * AIMessage rather than being concentrated on the last one.
   * Optional; when `null`, the handler is a no-op for signatures. Non-Vertex
   * providers don't emit `additional_kwargs.signatures`, so capture is also
   * a no-op for them even when the map is provided.
   */
  constructor(collectedUsage, collectedThoughtSignatures = null) {
    if (!Array.isArray(collectedUsage)) {
      throw new Error('collectedUsage must be an array');
    }
    this.collectedUsage = collectedUsage;
    this.collectedThoughtSignatures = collectedThoughtSignatures;
  }

  finalize(errorMessage) {
    if (!errorMessage) {
      return;
    }
    throw new Error(errorMessage);
  }

  /**
   * @param {string} event
   * @param {ModelEndData | undefined} data
   * @param {Record<string, unknown> | undefined} metadata
   * @param {StandardGraph} graph
   * @returns {Promise<void>}
   */
  async handle(event, data, metadata, graph) {
    if (!graph || !metadata) {
      console.warn(`Graph or metadata not found in ${event} event`);
      return;
    }

    /** @type {string | undefined} */
    let errorMessage;
    try {
      const agentContext = graph.getAgentContext(metadata);
      if (data?.output?.additional_kwargs?.stop_reason === 'refusal') {
        const info = { ...data.output.additional_kwargs };
        errorMessage = JSON.stringify({
          type: ErrorTypes.REFUSAL,
          info,
        });
        logger.debug(`[ModelEndHandler] Model refused to respond`, {
          ...info,
          userId: metadata.user_id,
          messageId: metadata.run_id,
          conversationId: metadata.thread_id,
        });
      }

      const usage = data?.output?.usage_metadata;
      if (!usage) {
        return this.finalize(errorMessage);
      }
      const modelName = metadata?.ls_model_name || agentContext.clientOptions?.model;
      if (modelName) {
        usage.model = modelName;
      }
      if (agentContext.provider) {
        usage.provider = agentContext.provider;
      }

      const taggedUsage = markSummarizationUsage(usage, metadata);

      this.collectedUsage.push(taggedUsage);

      /**
       * `additional_kwargs.signatures` is a flat array indexed by response
       * part position (text + functionCall interleaved). `tool_calls` is
       * just the function calls in their original order. Non-empty
       * signatures correspond 1:1 with `tool_calls` in order — see
       * `partsToSignatures` in `@langchain/google-common`. Walk both in a
       * single pass to map each signature onto the right `tool_call.id`.
       */
      const signatures = data?.output?.additional_kwargs?.signatures;
      const toolCalls = data?.output?.tool_calls;
      if (
        this.collectedThoughtSignatures &&
        Array.isArray(signatures) &&
        Array.isArray(toolCalls)
      ) {
        let toolIdx = 0;
        for (const sig of signatures) {
          if (typeof sig !== 'string' || sig.length === 0) continue;
          if (toolIdx >= toolCalls.length) break;
          const id = toolCalls[toolIdx++]?.id;
          if (id) this.collectedThoughtSignatures[id] = sig;
        }
      }
    } catch (error) {
      logger.error('Error handling model end event:', error);
      return this.finalize(errorMessage);
    }
  }
}

/**
 * @deprecated Agent Chain helper
 * @param {string | undefined} [last_agent_id]
 * @param {string | undefined} [langgraph_node]
 * @returns {boolean}
 */
function checkIfLastAgent(last_agent_id, langgraph_node) {
  if (!last_agent_id || !langgraph_node) {
    return false;
  }
  return langgraph_node?.endsWith(last_agent_id);
}

/**
 * Helper to emit events either to res (standard mode) or to job emitter (resumable mode).
 * In Redis mode, awaits the emit to guarantee event ordering (critical for streaming deltas).
 * @param {ServerResponse} res - The server response object
 * @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
 * @param {Object} eventData - The event data to send
 * @returns {Promise<void>}
 */
async function emitEvent(res, streamId, eventData) {
  if (streamId) {
    await GenerationJobManager.emitChunk(streamId, eventData);
  } else {
    sendEvent(res, eventData);
  }
}

/**
 * Maps a {@link SubagentUpdateEvent} phase to the corresponding
 * {@link GraphEvents} name that the SDK's `createContentAggregator`
 * knows how to consume. Phases that don't carry content (`start`, `stop`,
 * `error`) or whose payload doesn't match a handled event (`run_step`
 * with an `ON_TOOL_EXECUTE`-shaped batch request rather than a RunStep)
 * return `null` so the caller skips them.
 * @param {SubagentUpdateEvent} event
 * @returns {string | null}
 */
function subagentPhaseToGraphEvent(event) {
  switch (event?.phase) {
    case 'run_step':
      /** `ON_RUN_STEP` and `ON_TOOL_EXECUTE` both forward with phase
       * `run_step`; only the former matches the aggregator's RunStep
       * schema. Detect by presence of `stepDetails`. */
      return event.data?.stepDetails ? GraphEvents.ON_RUN_STEP : null;
    case 'run_step_delta':
      return GraphEvents.ON_RUN_STEP_DELTA;
    case 'run_step_completed':
      return GraphEvents.ON_RUN_STEP_COMPLETED;
    case 'message_delta':
      return GraphEvents.ON_MESSAGE_DELTA;
    case 'reasoning_delta':
      return GraphEvents.ON_REASONING_DELTA;
    default:
      return null;
  }
}

/**
 * Folds a single {@link SubagentUpdateEvent} into the given content
 * aggregator. Silent no-op for phases outside the aggregator's domain.
 * @param {{ aggregateContent: Function }} aggregator
 * @param {SubagentUpdateEvent} event
 */
function feedSubagentAggregator(aggregator, event) {
  const graphEvent = subagentPhaseToGraphEvent(event);
  if (!graphEvent) return;
  aggregator.aggregateContent({ event: graphEvent, data: event.data });
}

/**
 * @typedef {Object} ToolExecuteOptions
 * @property {(toolNames: string[]) => Promise<{loadedTools: StructuredTool[]}>} loadTools - Function to load tools by name
 * @property {Object} configurable - Configurable context for tool invocation
 */

/**
 * Get default handlers for stream events.
 * @param {Object} options - The options object.
 * @param {ServerResponse} options.res - The server response object.
 * @param {ContentAggregator} options.aggregateContent - Content aggregator function.
 * @param {ToolEndCallback} options.toolEndCallback - Callback to use when tool ends.
 * @param {Array<UsageMetadata>} options.collectedUsage - The list of collected usage metadata.
 * @param {Record<string, string> | null} [options.collectedThoughtSignatures] - Per-tool-call-id map of thought signatures; see {@link ModelEndHandler}.
 * @param {string | null} [options.streamId] - The stream ID for resumable mode, or null for standard mode.
 * @param {ToolExecuteOptions} [options.toolExecuteOptions] - Options for event-driven tool execution.
 * @param {Object | null} [options.summarizationOptions] - Summarization settings; handlers are registered unless `enabled` is `false`.
 * @param {Map<string, Object> | null} [options.subagentAggregatorsByToolCallId] - Caller-owned map of per-tool-call subagent content aggregators.
 * @returns {Record<string, t.EventHandler>} The default handlers.
 * @throws {Error} If the request is not found.
 */
function getDefaultHandlers({
  res,
  aggregateContent,
  toolEndCallback,
  collectedUsage,
  collectedThoughtSignatures = null,
  streamId = null,
  toolExecuteOptions = null,
  summarizationOptions = null,
  subagentAggregatorsByToolCallId = null,
}) {
  if (!res || !aggregateContent) {
    throw new Error(
      `[getDefaultHandlers] Missing required options: res: ${!res}, aggregateContent: ${!aggregateContent}`,
    );
  }
  const handlers = {
    [GraphEvents.CHAT_MODEL_END]: new ModelEndHandler(collectedUsage, collectedThoughtSignatures),
    [GraphEvents.TOOL_END]: new ToolEndHandler(toolEndCallback, logger),
    [GraphEvents.ON_RUN_STEP]: {
      /**
       * Handle ON_RUN_STEP event.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (data?.stepDetails.type === StepTypes.TOOL_CALLS) {
          await emitEvent(res, streamId, { event, data });
        } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        } else {
          const agentName = metadata?.name ?? 'Agent';
          const isToolCall = data?.stepDetails.type === StepTypes.TOOL_CALLS;
          const action = isToolCall ? 'performing a task...' : 'thinking...';
          await emitEvent(res, streamId, {
            event: 'on_agent_update',
            data: {
              runId: metadata?.run_id,
              message: `${agentName} is ${action}`,
            },
          });
        }
      },
    },
    [GraphEvents.ON_RUN_STEP_DELTA]: {
      /**
       * Handle ON_RUN_STEP_DELTA event.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (data?.delta.type === StepTypes.TOOL_CALLS) {
          await emitEvent(res, streamId, { event, data });
        } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        }
      },
    },
    [GraphEvents.ON_RUN_STEP_COMPLETED]: {
      /**
       * Handle ON_RUN_STEP_COMPLETED event.
       * @param {string} event - The event name.
       * @param {StreamEventData & { result: ToolEndData }} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (data?.result != null) {
          await emitEvent(res, streamId, { event, data });
        } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        }
      },
    },
    [GraphEvents.ON_MESSAGE_DELTA]: {
      /**
       * Handle ON_MESSAGE_DELTA event.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        }
      },
    },
    [GraphEvents.ON_REASONING_DELTA]: {
      /**
       * Handle ON_REASONING_DELTA event.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        }
      },
    },
  };

  if (toolExecuteOptions) {
    handlers[GraphEvents.ON_TOOL_EXECUTE] = createToolExecuteHandler(toolExecuteOptions);
  }

  handlers[GraphEvents.ON_SUBAGENT_UPDATE] = {
    /**
     * Forwards subagent progress envelopes to the client stream, and
     * (when a caller-owned aggregator map is provided) also folds each
     * event into a per-tool-call `createContentAggregator`. The
     * resulting `contentParts` are attached to the parent's `subagent`
     * tool_call at message-save time so the child's reasoning / tool
     * calls / final text survive a page refresh — in-memory Recoil
     * atoms alone wouldn't persist that.
     *
     * Both the aggregation (persistence) and the SSE forward respect
     * `hide_sequential_outputs` the same way `ON_RUN_STEP`,
     * `ON_MESSAGE_DELTA`, etc. do — so intermediate agents in a
     * sequential chain don't leak their subagent activity when the
     * chain is configured to suppress intermediates.
     */
    handle: async (event, data, metadata) => {
      const isLastAgent = checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node);
      const visible = isLastAgent || !metadata?.hide_sequential_outputs;
      /**
       * Gate BOTH aggregation (persistence) AND streaming on the same
       * visibility rule. If we aggregated for a hidden intermediate
       * agent, `finalizeSubagentContent` would still attach its
       * child's reasoning / tool output to the saved message — so a
       * page refresh would reveal activity that was intentionally
       * suppressed live. Treat hide_sequential_outputs as a
       * consistent "don't record" rule for subagent traces.
       */
      if (!visible) return;
      if (subagentAggregatorsByToolCallId && data?.parentToolCallId) {
        const key = data.parentToolCallId;
        let aggregator = subagentAggregatorsByToolCallId.get(key);
        if (!aggregator) {
          aggregator = createContentAggregator();
          subagentAggregatorsByToolCallId.set(key, aggregator);
        }
        try {
          feedSubagentAggregator(aggregator, data);
        } catch (err) {
          logger.warn(
            `[ON_SUBAGENT_UPDATE] Failed to aggregate phase "${data?.phase}" for tool_call ${key}: ${err?.message ?? err}`,
          );
        }
      }
      await emitEvent(res, streamId, { event, data });
    },
  };

  if (summarizationOptions?.enabled !== false) {
    handlers[GraphEvents.ON_SUMMARIZE_START] = {
      handle: async (_event, data) => {
        await emitEvent(res, streamId, {
          event: GraphEvents.ON_SUMMARIZE_START,
          data,
        });
      },
    };
    handlers[GraphEvents.ON_SUMMARIZE_DELTA] = {
      handle: async (_event, data) => {
        aggregateContent({ event: GraphEvents.ON_SUMMARIZE_DELTA, data });
        await emitEvent(res, streamId, {
          event: GraphEvents.ON_SUMMARIZE_DELTA,
          data,
        });
      },
    };
    handlers[GraphEvents.ON_SUMMARIZE_COMPLETE] = {
      handle: async (_event, data) => {
        aggregateContent({ event: GraphEvents.ON_SUMMARIZE_COMPLETE, data });
        await emitEvent(res, streamId, {
          event: GraphEvents.ON_SUMMARIZE_COMPLETE,
          data,
        });
      },
    };
  }

  handlers[GraphEvents.ON_AGENT_LOG] = { handle: agentLogHandler };

  return handlers;
}

/**
 * Helper to write attachment events either to res or to job emitter.
 * Note: Attachments are not order-sensitive like deltas, so fire-and-forget is acceptable.
 * @param {ServerResponse} res - The server response object
 * @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
 * @param {Object} attachment - The attachment data
 */
function writeAttachment(res, streamId, attachment) {
  if (streamId) {
    GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment });
  } else {
    res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
  }
}

/**
 * Predicate: is it safe to push an SSE write to the caller right now?
 *
 * In `streamId` (resumable) mode, writes go to the job emitter and the
 * `res` state is irrelevant — always writable.
 *
 * In standard mode, the caller's `res` must have headers sent (the
 * stream has been opened) and not yet be `writableEnded` (the response
 * hasn't closed). Writing to a closed stream raises
 * `ERR_STREAM_WRITE_AFTER_END`.
 *
 * Used by deferred preview emits in both `createToolEndCallback`
 * (chat-completions) and `createResponsesToolEndCallback` (Open
 * Responses) so the gate logic stays in one place. (Comprehensive
 * review #3 on PR #12957.)
 * @param {ServerResponse} res
 * @param {string | null} streamId
 * @returns {boolean}
 */
function isStreamWritable(res, streamId) {
  if (streamId) {
    return true;
  }
  return !!res && res.headersSent && !res.writableEnded;
}

/**
 * Emit an update for an attachment that was previously sent with
 * `status: 'pending'`. Fire-and-forget: if the response stream has
 * already closed (the agent finished generating before the deferred
 * preview resolved) the frontend's React Query polling on
 * `/api/files/:file_id/preview` picks up the resolved record on its
 * next tick. Skipping the write in that case avoids
 * `ERR_STREAM_WRITE_AFTER_END`.
 *
 * Reuses the `attachment` SSE event name with a discriminated payload:
 * the frontend's `useAttachmentHandler` upserts by `file_id`, so a
 * second event with the same id and `status: 'ready' | 'failed'`
 * overwrites the pending placeholder in place. No new event type, no
 * new client listener.
 *
 * @param {ServerResponse} res
 * @param {string | null} streamId
 * @param {Object} attachment - Updated attachment payload (must carry `file_id`).
 */
function writeAttachmentUpdate(res, streamId, attachment) {
  if (!isStreamWritable(res, streamId)) {
    return;
  }
  writeAttachment(res, streamId, attachment);
}

/**
 * @param {Object} params
 * @param {ServerRequest} params.req
 * @param {ServerResponse} params.res
 * @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
 * @param {string | null} [params.streamId] - The stream ID for resumable mode, or null for standard mode.
 * @returns {ToolEndCallback} The tool end callback.
 */
function createToolEndCallback({ req, res, artifactPromises, streamId = null }) {
  /**
   * @type {ToolEndCallback}
   */
  return async (data, metadata) => {
    const output = data?.output;
    if (!output) {
      return;
    }

    if (!output.artifact) {
      return;
    }

    if (output.artifact[Tools.file_search]) {
      artifactPromises.push(
        (async () => {
          const user = req.user;
          const attachment = await processFileCitations({
            user,
            metadata,
            appConfig: req.config,
            toolArtifact: output.artifact,
            toolCallId: output.tool_call_id,
          });
          if (!attachment) {
            return null;
          }
          if (!streamId && !res.headersSent) {
            return attachment;
          }
          writeAttachment(res, streamId, attachment);
          return attachment;
        })().catch((error) => {
          logger.error('Error processing file citations:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.ui_resources]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.ui_resources,
            messageId: metadata.run_id,
            toolCallId: output.tool_call_id,
            conversationId: metadata.thread_id,
            [Tools.ui_resources]: output.artifact[Tools.ui_resources].data,
          };
          if (!streamId && !res.headersSent) {
            return attachment;
          }
          writeAttachment(res, streamId, attachment);
          return attachment;
        })().catch((error) => {
          logger.error('Error processing artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.web_search]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.web_search,
            messageId: metadata.run_id,
            toolCallId: output.tool_call_id,
            conversationId: metadata.thread_id,
            [Tools.web_search]: { ...output.artifact[Tools.web_search] },
          };
          if (!streamId && !res.headersSent) {
            return attachment;
          }
          writeAttachment(res, streamId, attachment);
          return attachment;
        })().catch((error) => {
          logger.error('Error processing artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact.content) {
      /** @type {FormattedContent[]} */
      const content = output.artifact.content;
      for (let i = 0; i < content.length; i++) {
        const part = content[i];
        if (!part) {
          continue;
        }
        if (part.type !== 'image_url') {
          continue;
        }
        const { url } = part.image_url;
        artifactPromises.push(
          (async () => {
            const filename = `${output.name}_img_${nanoid()}`;
            const file_id = output.artifact.file_ids?.[i];
            const file = await saveBase64Image(url, {
              req,
              file_id,
              filename,
              endpoint: metadata.provider,
              context: FileContext.image_generation,
            });
            const fileMetadata = Object.assign(file, {
              messageId: metadata.run_id,
              toolCallId: output.tool_call_id,
              conversationId: metadata.thread_id,
            });
            if (!streamId && !res.headersSent) {
              return fileMetadata;
            }

            if (!fileMetadata) {
              return null;
            }

            writeAttachment(res, streamId, fileMetadata);
            return fileMetadata;
          })().catch((error) => {
            logger.error('Error processing artifact content:', error);
            return null;
          }),
        );
      }
      return;
    }

    if (!CODE_EXECUTION_TOOLS.has(output.name)) {
      return;
    }

    if (!output.artifact.files) {
      return;
    }

    for (const file of output.artifact.files) {
      /* `inherited` files are unchanged passthroughs of inputs the caller
       * already owns (skill files, prior session inputs, inherited
       * .dirkeep markers). Skip post-processing: re-downloading with the
       * user's session key 403s when the file is entity-scoped, and the
       * input is already persisted at its origin. They remain available
       * to subsequent calls via primeInvokedSkills / session inheritance. */
      if (file.inherited) {
        continue;
      }
      const { id, name } = file;
      const toolCallId = output.tool_call_id;
      artifactPromises.push(
        (async () => {
          const result = await processCodeOutput({
            req,
            id,
            name,
            messageId: metadata.run_id,
            toolCallId,
            conversationId: metadata.thread_id,
            /**
             * Use the FILE's `storage_session_id` (storage session),
             * not the top-level artifact `session_id` (exec session).
             * The codeapi worker reports two distinct ids on a tool
             * result:
             * - `artifact.session_id` is the EXEC session — the
             *   sandbox VM that ran the bash command. Files don't
             *   live there; it's torn down post-execution.
             * - `file.storage_session_id` is the STORAGE session —
             *   the file-server bucket prefix where artifacts
             *   actually live and are served from.
             * `processCodeOutput` builds `/download/{session_id}/{id}`,
             * so passing the exec id resolves to a path the file-server
             * doesn't know about and 404s. Fall back to artifact-level
             * for older worker payloads that may not populate per-file
             * ids.
             */
            session_id: file.storage_session_id ?? output.artifact.session_id,
          });
          const fileMetadata = result?.file ?? null;
          const finalize = result?.finalize;
          if (!fileMetadata) {
            return null;
          }
          /* Initial emit: ship the attachment to the client immediately
           * (carries `status: 'pending'` for office buckets so the UI
           * shows "preparing preview…"). The agent's response stops
           * blocking on extraction here.
           *
           * Use the shared `isStreamWritable` predicate rather than the
           * narrower `streamId || res.headersSent` check that lived
           * here before — a client disconnect mid-stream
           * (`res.writableEnded`) would otherwise hit `res.write` and
           * raise `ERR_STREAM_WRITE_AFTER_END` (caught by the outer
           * IIFE catch but logged as noise). Same gate the Responses
           * path uses below. */
          if (isStreamWritable(res, streamId)) {
            writeAttachment(res, streamId, fileMetadata);
          }
          /* Deferred preview rendering: extraction continues running
           * even after the HTTP response closes. If the stream is still
           * open when the preview resolves, push an `attachment`
           * update event so the UI patches in place; otherwise React
           * Query polling on `/api/files/:file_id/preview` picks it up.
           *
           * Spread the full updated record (mirroring the initial emit
           * shape) and overlay `messageId`/`toolCallId` from the
           * current run. The DB record preserves the original
           * `messageId` across cross-turn filename reuse so
           * `getCodeGeneratedFiles` can trace the file back to its
           * original assistant message; routing the update SSE by the
           * persisted id would land the patch on a stale message
           * slot — turn-N's pending placeholder would stay stuck while
           * turn-1's already-resolved attachment got re-merged.
           * (Codex P1 review on PR #12957.) */
          runPreviewFinalize({
            finalize,
            fileId: fileMetadata.file_id,
            previewRevision: result?.previewRevision,
            onResolved: (updated) => {
              writeAttachmentUpdate(res, streamId, {
                ...updated,
                messageId: metadata.run_id,
                toolCallId,
              });
            },
          });
          return fileMetadata;
        })().catch((error) => {
          logger.error('Error processing code output:', error);
          return null;
        }),
      );
    }
  };
}

/**
 * Helper to write attachment events in Open Responses format (librechat:attachment)
 * @param {ServerResponse} res - The server response object
 * @param {Object} tracker - The response tracker with sequence number
 * @param {Object} attachment - The attachment data
 * @param {Object} metadata - Additional metadata (messageId, conversationId)
 */
function writeResponsesAttachment(res, tracker, attachment, metadata) {
  const sequenceNumber = tracker.nextSequence();
  writeAttachmentEvent(res, sequenceNumber, attachment, {
    messageId: metadata.run_id,
    conversationId: metadata.thread_id,
  });
}

/**
 * Creates a tool end callback specifically for the Responses API.
 * Emits attachments as `librechat:attachment` events per the Open Responses extension spec.
 *
 * @param {Object} params
 * @param {ServerRequest} params.req
 * @param {ServerResponse} params.res
 * @param {Object} params.tracker - Response tracker with sequence number
 * @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
 * @returns {ToolEndCallback} The tool end callback.
 */
function createResponsesToolEndCallback({ req, res, tracker, artifactPromises }) {
  /**
   * @type {ToolEndCallback}
   */
  return async (data, metadata) => {
    const output = data?.output;
    if (!output) {
      return;
    }

    if (!output.artifact) {
      return;
    }

    if (output.artifact[Tools.file_search]) {
      artifactPromises.push(
        (async () => {
          const user = req.user;
          const attachment = await processFileCitations({
            user,
            metadata,
            appConfig: req.config,
            toolArtifact: output.artifact,
            toolCallId: output.tool_call_id,
          });
          if (!attachment) {
            return null;
          }
          // For Responses API, emit attachment during streaming
          if (res.headersSent && !res.writableEnded) {
            writeResponsesAttachment(res, tracker, attachment, metadata);
          }
          return attachment;
        })().catch((error) => {
          logger.error('Error processing file citations:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.ui_resources]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.ui_resources,
            toolCallId: output.tool_call_id,
            [Tools.ui_resources]: output.artifact[Tools.ui_resources].data,
          };
          // For Responses API, always emit attachment during streaming
          if (res.headersSent && !res.writableEnded) {
            writeResponsesAttachment(res, tracker, attachment, metadata);
          }
          return attachment;
        })().catch((error) => {
          logger.error('Error processing artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.web_search]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.web_search,
            toolCallId: output.tool_call_id,
            [Tools.web_search]: { ...output.artifact[Tools.web_search] },
          };
          // For Responses API, always emit attachment during streaming
          if (res.headersSent && !res.writableEnded) {
            writeResponsesAttachment(res, tracker, attachment, metadata);
          }
          return attachment;
        })().catch((error) => {
          logger.error('Error processing artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact.content) {
      /** @type {FormattedContent[]} */
      const content = output.artifact.content;
      for (let i = 0; i < content.length; i++) {
        const part = content[i];
        if (!part) {
          continue;
        }
        if (part.type !== 'image_url') {
          continue;
        }
        const { url } = part.image_url;
        artifactPromises.push(
          (async () => {
            const filename = `${output.name}_img_${nanoid()}`;
            const file_id = output.artifact.file_ids?.[i];
            const file = await saveBase64Image(url, {
              req,
              file_id,
              filename,
              endpoint: metadata.provider,
              context: FileContext.image_generation,
            });
            const fileMetadata = Object.assign(file, {
              toolCallId: output.tool_call_id,
            });

            if (!fileMetadata) {
              return null;
            }

            // For Responses API, emit attachment during streaming
            if (res.headersSent && !res.writableEnded) {
              const attachment = {
                file_id: fileMetadata.file_id,
                filename: fileMetadata.filename,
                type: fileMetadata.type,
                url: fileMetadata.filepath,
                width: fileMetadata.width,
                height: fileMetadata.height,
                tool_call_id: output.tool_call_id,
              };
              writeResponsesAttachment(res, tracker, attachment, metadata);
            }

            return fileMetadata;
          })().catch((error) => {
            logger.error('Error processing artifact content:', error);
            return null;
          }),
        );
      }
      return;
    }

    if (!CODE_EXECUTION_TOOLS.has(output.name)) {
      return;
    }

    if (!output.artifact.files) {
      return;
    }

    for (const file of output.artifact.files) {
      /* `inherited` files are unchanged passthroughs of inputs the caller
       * already owns (skill files, prior session inputs, inherited
       * .dirkeep markers). Skip post-processing: re-downloading with the
       * user's session key 403s when the file is entity-scoped, and the
       * input is already persisted at its origin. They remain available
       * to subsequent calls via primeInvokedSkills / session inheritance. */
      if (file.inherited) {
        continue;
      }
      const { id, name } = file;
      const toolCallId = output.tool_call_id;
      artifactPromises.push(
        (async () => {
          const result = await processCodeOutput({
            req,
            id,
            name,
            messageId: metadata.run_id,
            toolCallId,
            conversationId: metadata.thread_id,
            /**
             * Use the FILE's `storage_session_id` (storage session),
             * not the top-level artifact `session_id` (exec session).
             * The codeapi worker reports two distinct ids on a tool
             * result:
             * - `artifact.session_id` is the EXEC session — the
             *   sandbox VM that ran the bash command. Files don't
             *   live there; it's torn down post-execution.
             * - `file.storage_session_id` is the STORAGE session —
             *   the file-server bucket prefix where artifacts
             *   actually live and are served from.
             * `processCodeOutput` builds `/download/{session_id}/{id}`,
             * so passing the exec id resolves to a path the file-server
             * doesn't know about and 404s. Fall back to artifact-level
             * for older worker payloads that may not populate per-file
             * ids.
             */
            session_id: file.storage_session_id ?? output.artifact.session_id,
          });
          const fileMetadata = result?.file ?? null;
          const finalize = result?.finalize;
          if (!fileMetadata) {
            return null;
          }

          /* Initial emit (Open Responses extension format). The agent's
           * response no longer blocks on extraction. */
          if (isStreamWritable(res, null)) {
            writeResponsesAttachment(
              res,
              tracker,
              buildResponsesAttachment(fileMetadata, toolCallId),
              metadata,
            );
          }

          /* Deferred preview rendering: extract HTML in the background
           * and emit a follow-up `librechat:attachment` with the same
           * `file_id` so the client merges the resolved record over the
           * pending placeholder. Fire-and-forget — survives response
           * close; polling covers the post-close gap. */
          runPreviewFinalize({
            finalize,
            fileId: fileMetadata.file_id,
            previewRevision: result?.previewRevision,
            onResolved: (updated) => {
              if (!isStreamWritable(res, null)) {
                return;
              }
              writeResponsesAttachment(
                res,
                tracker,
                buildResponsesAttachment(updated, toolCallId),
                metadata,
              );
            },
          });

          return fileMetadata;
        })().catch((error) => {
          logger.error('Error processing code output:', error);
          return null;
        }),
      );
    }
  };
}

/**
 * Project a file metadata record into the Open Responses attachment
 * shape. Mirrors the legacy inline projection but adds `status` and
 * `previewError` so deferred preview updates carry the lifecycle
 * signal the client uses to upsert by `file_id`.
 */
function buildResponsesAttachment(fileMetadata, toolCallId) {
  return {
    file_id: fileMetadata.file_id,
    filename: fileMetadata.filename,
    type: fileMetadata.type,
    url: fileMetadata.filepath,
    width: fileMetadata.width,
    height: fileMetadata.height,
    tool_call_id: toolCallId,
    text: fileMetadata.text ?? null,
    textFormat: fileMetadata.textFormat ?? null,
    status: fileMetadata.status,
    previewError: fileMetadata.previewError,
  };
}

const ALLOWED_LOG_LEVELS = new Set(['debug', 'info', 'warn', 'error']);

function agentLogHandler(_event, data) {
  if (!data) {
    return;
  }
  const logFn = ALLOWED_LOG_LEVELS.has(data.level) ? logger[data.level] : logger.debug;
  const meta = typeof data.data === 'object' && data.data != null ? data.data : {};
  logFn(`[agents:${data.scope ?? 'unknown'}] ${data.message ?? ''}`, {
    ...meta,
    runId: data.runId,
    agentId: data.agentId,
  });
}

function markSummarizationUsage(usage, metadata) {
  const node = metadata?.langgraph_node;
  if (typeof node === 'string' && node.startsWith(GraphNodeKeys.SUMMARIZE)) {
    return { ...usage, usage_type: 'summarization' };
  }
  return usage;
}

const agentLogHandlerObj = { handle: agentLogHandler };

/**
 * Builds the three summarization SSE event handlers.
 * In streaming mode, each event is forwarded to the client via `res.write`.
 * In non-streaming mode, the handlers are no-ops.
 * @param {{ isStreaming: boolean, res: import('express').Response }} opts
 */
function buildSummarizationHandlers({ isStreaming, res }) {
  if (!isStreaming) {
    const noop = { handle: () => {} };
    return { on_summarize_start: noop, on_summarize_delta: noop, on_summarize_complete: noop };
  }
  const writeEvent = (name) => ({
    handle: async (_event, data) => {
      if (!res.writableEnded) {
        res.write(`event: ${name}\ndata: ${JSON.stringify(data)}\n\n`);
      }
    },
  });
  return {
    on_summarize_start: writeEvent('on_summarize_start'),
    on_summarize_delta: writeEvent('on_summarize_delta'),
    on_summarize_complete: writeEvent('on_summarize_complete'),
  };
}

module.exports = {
  ModelEndHandler,
  agentLogHandler,
  agentLogHandlerObj,
  getDefaultHandlers,
  createToolEndCallback,
  isStreamWritable,
  markSummarizationUsage,
  buildSummarizationHandlers,
  createResponsesToolEndCallback,
};