LibreChat/api/server/controllers/agents/request.js
Danny Avila 2ef7bdfbc2
feat: Immediate Conversation Title Generation (#13395)
*  feat: Immediate Conversation Title Generation

Generate conversation titles as soon as the request is made (in parallel
with the response, from the user's first message) as the new default,
fixing the #13318 race where a transient /gen_title 404 left new chats
stuck on "New Chat".

- Add per-endpoint `titleTiming` ('immediate' | 'final') to baseEndpointSchema;
  `endpoints.all` acts as the global default, unset = immediate. Resolve via
  a new `resolveTitleTiming` helper (`all` takes precedence).
- Fire title generation in parallel with `sendMessage`; `titleConvo` waits
  (bounded, abortable) for the agent run and titles from the user input only.
  Persist after the conversation row exists; defer `disposeClient` until the
  title settles.
- Expose `titleGenerationTiming` via startup config; `useTitleGeneration`
  fetches eagerly in immediate mode with a bounded 404 retry and never treats
  a transient 404 as final. Skip title queueing for temporary conversations.
- Supersedes #13329 while incorporating its bounded 404-retry.

* 🩹 fix: Address Copilot review findings on title timing

- Guard against an undefined conversationId in addTitle (skip + warn) so the
  gen_title cache key can't collide as `userId-undefined` and saveConvo is
  never called without a conversationId.
- Gate the title `useQueries` on `enabled` so no /gen_title request fires while
  unauthenticated (e.g. after logout) even if the module queue holds IDs.
- Drop the stale `conversationId` param from the titleConvo JSDoc.
- Add a regression test for the undefined-conversationId guard.

* 🧵 fix: Harden immediate-title edge cases from codex review

- Cancel in-flight immediate title generation when the request aborts: thread
  job.abortController.signal through addTitle so pressing Stop on a new chat
  neither consumes the title model nor surfaces a title for a cancelled turn.
- Preserve a locally-applied title when the final SSE event's conversation
  carries no title yet (built before the title was saved), so long immediate-mode
  responses no longer revert the chat to "New Chat" until reload.
- Guarantee one full post-completion gen_title fetch cycle before giving up, so a
  `final`-mode title (generated only after the stream ends) is still fetched under
  a global `immediate` default instead of being stranded.
- Add regression tests for the abort propagation and the undefined-conversationId guard.

* 🔁 fix: Correct title abort, post-completion refetch, and replacement ordering

Follow-up to codex review of the immediate-title fixes:

- Use a dedicated title AbortController instead of `job.abortController`. The
  latter is also aborted by `completeJob` on *successful* completion, which
  cancelled any title slower than a short response. The title is now cancelled
  only on a real user Stop or when the stream is replaced; a completed-then-
  aborted title is discarded (no save, cache cleared) rather than persisted.
- Reset (not remove) the post-completion title query: `resetQueries` refetches
  the mounted observer with a fresh retry budget, whereas `removeQueries` left it
  stuck in its error state, so the promised post-completion cycle never ran.
- Run the job-replacement check before resolving `convoReady`, and on a replaced
  stream cancel/discard the stale title so a discarded prompt can't persist a title.
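
The dedicated-controller idea above can be illustrated with a minimal sketch. It assumes only the standard `AbortController` API; `linkTitleAbort` is a hypothetical helper name, not a function in this codebase. It shows why the title signal has to be detached before a successful completion aborts the job signal for cleanup.

```javascript
// Sketch only: `linkTitleAbort` is a hypothetical helper, not part of LibreChat.
function linkTitleAbort(jobSignal) {
  const titleController = new AbortController();
  const onJobAbort = () => titleController.abort();

  if (jobSignal.aborted) {
    // The job was already stopped: the title must not start at all.
    titleController.abort();
  } else {
    jobSignal.addEventListener('abort', onJobAbort, { once: true });
  }

  return {
    signal: titleController.signal,
    // Call before a successful completion aborts the job signal for cleanup.
    detach: () => jobSignal.removeEventListener('abort', onJobAbort),
    cancel: () => titleController.abort(),
  };
}

// A user Stop while the title is in flight cancels the title.
const stoppedJob = new AbortController();
const stoppedTitle = linkTitleAbort(stoppedJob.signal);
stoppedJob.abort();

// A successful completion detaches first, so the cleanup abort never reaches the title.
const completedJob = new AbortController();
const completedTitle = linkTitleAbort(completedJob.signal);
completedTitle.detach();
completedJob.abort();
```

In this sketch `stoppedTitle.signal.aborted` ends up `true` while `completedTitle.signal.aborted` stays `false`, which is the distinction the fix relies on.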

* 🧷 fix: Tighten title abort ordering and endpoint-level timing resolution

Follow-up to codex review:

- Abort the title controller before resolving `convoReady` on a stopped turn, so
  the title task can't resume and persist before the later abort.
- Cancel the title and unblock its waits on ANY send failure (not just user
  aborts): a preflight/quota failure before the run exists otherwise hangs
  `_waitForRun`, deferring client disposal until the 45s title timeout.
- Resolve `titleTiming` for custom endpoints via `getCustomEndpointConfig`
  (their config lives under `endpoints.custom[]`, not `endpoints[endpoint]`).
- Derive the startup `titleGenerationTiming` via `resolveTitleTiming` for the
  agents endpoint so an endpoint-level `final` (without `endpoints.all`) is honored
  client-side instead of defaulting to immediate and burning eager gen_title polls.

* 🪢 fix: Per-agent title timing and safer abort/replacement handling

Follow-up to codex review:

- Resolve `titleTiming` from the agent's actual endpoint after initialization, so a
  per-endpoint `final` override on a custom/provider endpoint backing an (ephemeral)
  agent is honored instead of always using the `agents` endpoint's value.
- Don't preserve a locally-fetched title on a stopped (unfinished) turn: the server
  cancels and discards that title, so keeping it client-side would diverge from
  server state and leave the stopped chat titled until reload.
- On abort/replacement, only delete the cached title if it still holds THIS task's
  value — a replacement stream shares the `userId-conversationId` key and may have
  already cached its own valid title that must not be removed.
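
The compare-before-delete rule in the last bullet can be sketched as follows. This is an illustration under assumptions: a plain `Map` stands in for the real title cache, and `discardOwnTitle` is a hypothetical name.

```javascript
// Sketch only: a Map stands in for the real title cache, and
// `discardOwnTitle` is a hypothetical helper name.
function discardOwnTitle(cache, key, ownTitle) {
  if (cache.get(key) !== ownTitle) {
    // Another stream already wrote its own title under this key; leave it alone.
    return false;
  }
  cache.delete(key);
  return true;
}

const titleCache = new Map();
const key = 'user123-convo456'; // illustrative `userId-conversationId` key

// The stale task cached its title and was then aborted: its entry is removed.
titleCache.set(key, 'Stale title');
const removedOwn = discardOwnTitle(titleCache, key, 'Stale title');

// A replacement stream has since cached a title under the same key: it survives.
titleCache.set(key, 'Replacement title');
const removedOther = discardOwnTitle(titleCache, key, 'Stale title');
```

Here `removedOwn` is `true`, `removedOther` is `false`, and the replacement title remains cached.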

* 🪞 fix: Mirror AgentClient title-config resolution for titleTiming

Per maintainer guidance, keep titleTiming resolution identical to how
`AgentClient#titleConvo` already resolves the endpoint config — `endpoints.all`
is the intended global override and the agent's actual provider endpoint is used:

- Resolve via `endpoints.all ?? endpoints[endpoint] ?? getProviderConfig(endpoint)
  .customEndpointConfig` (was using `getCustomEndpointConfig` directly). Going
  through `getProviderConfig` picks up its case-insensitive fallback for normalized
  provider names (e.g. `openrouter` → `OpenRouter`), so a custom endpoint's
  `titleTiming` is honored like its other title settings.
- Add `titleTiming` to the Azure endpoint schema `.pick()` so
  `endpoints.azureOpenAI.titleTiming` is no longer silently stripped by Zod.

Note: per-endpoint title settings being skipped when `endpoints.all` is present is
the existing, intended global-override behavior — not changed here.
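
A rough sketch of the lookup chain quoted above, for illustration only. `findCustomEndpoint` and `resolveTitleTimingSketch` are hypothetical stand-ins for `getProviderConfig` and `resolveTitleTiming`, whose real signatures may differ; the config shape is assumed from the bullets in this commit.

```javascript
// Sketch only: both functions are hypothetical stand-ins, not the real helpers.
function findCustomEndpoint(endpoints, name) {
  const custom = endpoints.custom ?? [];
  return (
    // Exact match first, then a case-insensitive fallback for normalized names.
    custom.find((c) => c.name === name) ??
    custom.find((c) => c.name.toLowerCase() === String(name).toLowerCase())
  );
}

function resolveTitleTimingSketch(endpoints, endpoint) {
  // `all` wins when present, even if it does not set `titleTiming` itself.
  const config = endpoints.all ?? endpoints[endpoint] ?? findCustomEndpoint(endpoints, endpoint);
  return config?.titleTiming ?? 'immediate'; // unset means immediate
}

const endpoints = {
  azureOpenAI: { titleTiming: 'final' },
  custom: [{ name: 'OpenRouter', titleTiming: 'final' }],
};

const azureTiming = resolveTitleTimingSketch(endpoints, 'azureOpenAI');
const customTiming = resolveTitleTimingSketch(endpoints, 'openrouter');
const unsetTiming = resolveTitleTimingSketch(endpoints, 'openAI');
const overriddenTiming = resolveTitleTimingSketch({ ...endpoints, all: {} }, 'azureOpenAI');
```

Under these assumptions the first two lookups resolve to `final`, the unset endpoint falls back to `immediate`, and an `endpoints.all` block without `titleTiming` also yields `immediate`, since the per-endpoint value is skipped.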

* 🧪 test: Cover useTitleGeneration effect logic (integration)

Adds a deterministic white-box integration test that drives the real hook's
React effects with a controllable react-query surface, locking down the
stateful decisions that previously had no coverage:

- immediate mode fetches a queued conversation while its stream is still active
- final mode gates until the stream completes, then becomes eligible
- success applies the fetched title to the conversation caches
- a 404 while active defers (removeQueries) instead of giving up
- a 404 after completion forces a fresh fetch via resetQueries (post-completion remount)

* feat: Stream immediate title events

* style: Format title SSE handler

* test: Preserve data-provider exports in OAuth mock

* test: Isolate OAuth route API mock

* test: Keep OAuth callback factory capture

* fix: Replay streamed title events on resume

* fix: Honor agents title timing precedence

* style: Format title timing fixes
2026-06-02 16:40:57 -04:00

946 lines
33 KiB
JavaScript

const { logger } = require('@librechat/data-schemas');
const { Constants, ViolationTypes } = require('librechat-data-provider');
const {
  sendEvent,
  getViolationInfo,
  buildMessageFiles,
  resolveTitleTiming,
  GenerationJobManager,
  decrementPendingRequest,
  sanitizeMessageForTransmit,
  checkAndIncrementPendingRequest,
} = require('@librechat/api');
const { disposeClient, clientRegistry, requestDataMap } = require('~/server/cleanup');
const { handleAbortError } = require('~/server/middleware');
const { logViolation } = require('~/cache');
const { saveMessage, getConvo } = require('~/models');

function createCloseHandler(abortController) {
  return function (manual) {
    if (!manual) {
      logger.debug('[AgentController] Request closed');
    }
    if (!abortController) {
      return;
    } else if (abortController.signal.aborted) {
      return;
    } else if (abortController.requestCompleted) {
      return;
    }

    abortController.abort();
    logger.debug('[AgentController] Request aborted on close');
  };
}

function toValidISOString(value) {
  if (value == null) {
    return null;
  }
  const date = value instanceof Date ? value : new Date(value);
  return Number.isNaN(date.getTime()) ? null : date.toISOString();
}

async function resolveConversationCreatedAt({ userId, conversationId, isNewConvo }) {
  if (isNewConvo) {
    return { createdAt: new Date().toISOString(), conversation: undefined };
  }

  try {
    const conversation = await getConvo(userId, conversationId);
    return {
      conversation,
      createdAt: toValidISOString(conversation?.createdAt) ?? new Date().toISOString(),
    };
  } catch (error) {
    logger.warn('[AgentController] Failed to resolve conversation timestamp anchor', {
      conversationId,
      error: error?.message ?? error,
    });
    return { createdAt: new Date().toISOString(), conversation: undefined };
  }
}

async function attachConversationCreatedAt(req, { userId, conversationId, isNewConvo }) {
  req.body.conversationId = conversationId;
  const resolved = await resolveConversationCreatedAt({
    userId,
    conversationId,
    isNewConvo,
  });
  req.conversationCreatedAt = resolved.createdAt;
  if (!isNewConvo && resolved.conversation !== undefined) {
    req.resolvedConversation = resolved.conversation ?? null;
  }
}

/**
* Resumable Agent Controller - Generation runs independently of HTTP connection.
* Returns streamId immediately, client subscribes separately via SSE.
*/
const ResumableAgentController = async (req, res, next, initializeClient, addTitle) => {
const {
text,
isRegenerate,
endpointOption,
conversationId: reqConversationId,
isContinued = false,
editedContent = null,
parentMessageId = null,
overrideParentMessageId = null,
responseMessageId: editedResponseMessageId = null,
} = req.body;
const userId = req.user.id;
/** When to generate the conversation title. `immediate` (default) fires title
* generation in parallel with the response, from the user's first message;
* `final` defers it until the full response completes (legacy behavior).
* Resolved from the agent's actual endpoint once the client is initialized. */
let titleTiming = 'immediate';
const { allowed, pendingRequests, limit } = await checkAndIncrementPendingRequest(userId);
if (!allowed) {
const violationInfo = getViolationInfo(pendingRequests, limit);
await logViolation(req, res, ViolationTypes.CONCURRENT, violationInfo, violationInfo.score);
return res.status(429).json(violationInfo);
}
// Generate conversationId upfront if not provided - streamId === conversationId always
// Treat "new" as a placeholder that needs a real UUID (frontend may send "new" for new convos)
const isNewConvo = !reqConversationId || reqConversationId === 'new';
const conversationId = isNewConvo ? crypto.randomUUID() : reqConversationId;
const streamId = conversationId;
req.body.conversationId = conversationId;
let client = null;
try {
logger.debug(`[ResumableAgentController] Creating job`, {
streamId,
conversationId,
reqConversationId,
userId,
});
const job = await GenerationJobManager.createJob(streamId, userId, conversationId);
const jobCreatedAt = job.createdAt; // Capture creation time to detect job replacement
req._resumableStreamId = streamId;
// Send JSON response IMMEDIATELY so client can connect to SSE stream
// This is critical: tool loading (MCP OAuth) may emit events that the client needs to receive
res.json({ streamId, conversationId, status: 'started' });
await attachConversationCreatedAt(req, { userId, conversationId, isNewConvo });
// Note: We no longer use res.on('close') to abort since we send JSON immediately.
// The response closes normally after res.json(), which is not an abort condition.
// Abort handling is done through GenerationJobManager via the SSE stream connection.
// Track if partial response was already saved to avoid duplicates
let partialResponseSaved = false;
/**
* Listen for all subscribers leaving to save partial response.
* This ensures the response is saved to DB even if all clients disconnect
* while generation continues.
*
* Note: The messageId used here falls back to `${userMessage.messageId}_` if the
* actual response messageId isn't available yet. The final response save will
* overwrite this with the complete response using the same messageId pattern.
*/
job.emitter.on('allSubscribersLeft', async (aggregatedContent) => {
if (partialResponseSaved || !aggregatedContent || aggregatedContent.length === 0) {
return;
}
const resumeState = await GenerationJobManager.getResumeState(streamId);
if (!resumeState?.userMessage) {
logger.debug('[ResumableAgentController] No user message to save partial response for');
return;
}
partialResponseSaved = true;
const responseConversationId = resumeState.conversationId || conversationId;
try {
const partialMessage = {
messageId: resumeState.responseMessageId || `${resumeState.userMessage.messageId}_`,
conversationId: responseConversationId,
parentMessageId: resumeState.userMessage.messageId,
sender: client?.sender ?? 'AI',
content: aggregatedContent,
unfinished: true,
error: false,
isCreatedByUser: false,
user: userId,
endpoint: endpointOption.endpoint,
model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
};
if (req.body?.agent_id) {
partialMessage.agent_id = req.body.agent_id;
}
await saveMessage(
{
userId: req?.user?.id,
isTemporary: req?.body?.isTemporary,
interfaceConfig: req?.config?.interfaceConfig,
},
partialMessage,
{ context: 'api/server/controllers/agents/request.js - partial response on disconnect' },
);
logger.debug(
`[ResumableAgentController] Saved partial response for ${streamId}, content parts: ${aggregatedContent.length}`,
);
} catch (error) {
logger.error('[ResumableAgentController] Error saving partial response:', error);
// Reset flag so we can try again if subscribers reconnect and leave again
partialResponseSaved = false;
}
});
/** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
const result = await initializeClient({
req,
res,
endpointOption,
// Use the job's abort controller signal - allows abort via GenerationJobManager.abortJob()
signal: job.abortController.signal,
});
if (job.abortController.signal.aborted) {
GenerationJobManager.completeJob(streamId, 'Request aborted during initialization');
await decrementPendingRequest(userId);
return;
}
client = result.client;
// Resolve title timing from the public agents endpoint first, then fall
// back to the agent's actual backing provider/custom endpoint.
titleTiming = resolveTitleTiming({
appConfig: req.config,
endpoint: [endpointOption?.endpoint, client?.options?.agent?.endpoint],
});
if (client?.sender) {
GenerationJobManager.updateMetadata(streamId, { sender: client.sender });
}
// Store reference to client's contentParts - graph will be set when run is created
if (client?.contentParts) {
GenerationJobManager.setContentParts(streamId, client.contentParts);
}
let userMessage;
const getReqData = (data = {}) => {
if (data.userMessage) {
userMessage = data.userMessage;
}
// conversationId is pre-generated, no need to update from callback
};
// Start background generation - readyPromise resolves immediately now
// (sync mechanism handles late subscribers)
const startGeneration = async () => {
try {
// Short timeout as safety net - promise should already be resolved
await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 100))]);
} catch (waitError) {
logger.warn(
`[ResumableAgentController] Error waiting for subscriber: ${waitError.message}`,
);
}
/** Immediate-mode title generation runs in parallel with the response, so
* the conversation row may not exist when the title resolves. `convoReady`
* resolves once the response (and thus the conversation) has been saved,
* gating the title's `saveConvo`. Declared here so both the success tail
* and the catch block can settle it and gate `disposeClient` on the title. */
let immediateTitlePromise = null;
let titleEventPromise = null;
let acceptsTitleEvents = true;
let resolveConvoReady;
const convoReady = new Promise((resolve) => {
resolveConvoReady = resolve;
});
/** Dedicated controller so a user Stop (or a replaced stream) cancels the
* in-flight title — kept separate from `job.abortController`, which
* `completeJob` also aborts on *successful* completion and would otherwise
* cancel a title that is merely slower than a short response. */
const titleAbortController = new AbortController();
const abortTitleOnJobAbort = () => titleAbortController.abort();
if (job.abortController.signal.aborted) {
titleAbortController.abort();
} else {
job.abortController.signal.addEventListener('abort', abortTitleOnJobAbort, { once: true });
}
const titleEligible =
addTitle && parentMessageId === Constants.NO_PARENT && isNewConvo && !req.body?.isTemporary;
const emitTitleEvent = ({ conversationId: titleConversationId, title }) => {
titleEventPromise = (async () => {
if (!acceptsTitleEvents || titleAbortController.signal.aborted) {
return;
}
const currentJob = await GenerationJobManager.getJob(streamId);
if (!currentJob || currentJob.createdAt !== jobCreatedAt) {
return;
}
if (titleAbortController.signal.aborted) {
return;
}
await GenerationJobManager.emitChunk(streamId, {
event: 'title',
data: {
conversationId: titleConversationId,
title,
},
});
})().catch((err) => {
logger.error('[ResumableAgentController] Error emitting title event', err);
});
return titleEventPromise;
};
try {
const onStart = (userMsg, respMsgId, _isNewConvo) => {
userMessage = userMsg;
// Store userMessage and responseMessageId upfront for resume capability
GenerationJobManager.updateMetadata(streamId, {
responseMessageId: respMsgId,
userMessage: {
messageId: userMsg.messageId,
parentMessageId: userMsg.parentMessageId,
conversationId: userMsg.conversationId,
text: userMsg.text,
},
});
GenerationJobManager.emitChunk(streamId, {
created: true,
message: userMessage,
streamId,
});
};
const messageOptions = {
user: userId,
onStart,
getReqData,
isContinued,
isRegenerate,
editedContent,
conversationId,
parentMessageId,
abortController: job.abortController,
overrideParentMessageId,
isEdited: !!editedContent,
userMCPAuthMap: result.userMCPAuthMap,
responseMessageId: editedResponseMessageId,
progressOptions: {
res: {
write: () => true,
end: () => {},
headersSent: false,
writableEnded: false,
},
},
};
const sendPromise = client.sendMessage(text, messageOptions);
if (titleEligible && titleTiming === 'immediate') {
immediateTitlePromise = addTitle(req, {
text,
conversationId,
client,
immediate: true,
convoReady,
signal: titleAbortController.signal,
onTitleGenerated: emitTitleEvent,
}).catch((err) => {
logger.error('[ResumableAgentController] Error in immediate title generation', err);
});
}
const response = await sendPromise;
const messageId = response.messageId;
const endpoint = endpointOption.endpoint;
response.endpoint = endpoint;
const databasePromise = response.databasePromise;
delete response.databasePromise;
const { conversation: convoData = {} } = await databasePromise;
const conversation = { ...convoData };
conversation.title =
conversation && !conversation.title ? null : conversation?.title || 'New Chat';
if (req.body.files && Array.isArray(client.options.attachments)) {
const files = buildMessageFiles(req.body.files, client.options.attachments);
if (files.length > 0) {
userMessage.files = files;
}
delete userMessage.image_urls;
}
// Check abort state BEFORE calling completeJob (which triggers abort signal for cleanup)
const wasAbortedBeforeComplete = job.abortController.signal.aborted;
const shouldGenerateTitle =
addTitle &&
parentMessageId === Constants.NO_PARENT &&
isNewConvo &&
!wasAbortedBeforeComplete;
// Save user message BEFORE sending final event to avoid race condition
// where client refetch happens before database is updated
const reqCtx = {
userId: req?.user?.id,
isTemporary: req?.body?.isTemporary,
interfaceConfig: req?.config?.interfaceConfig,
};
if (!client.skipSaveUserMessage && userMessage) {
await saveMessage(reqCtx, userMessage, {
context: 'api/server/controllers/agents/request.js - resumable user message',
});
}
// CRITICAL: Save response message BEFORE emitting final event.
// This prevents race conditions where the client sends a follow-up message
// before the response is saved to the database, causing orphaned parentMessageIds.
if (client.savedMessageIds && !client.savedMessageIds.has(messageId)) {
await saveMessage(
reqCtx,
{ ...response, user: userId, unfinished: wasAbortedBeforeComplete },
{ context: 'api/server/controllers/agents/request.js - resumable response end' },
);
}
// Check if our job was replaced by a new request before emitting
// This prevents stale requests from emitting events to newer jobs
const currentJob = await GenerationJobManager.getJob(streamId);
const jobWasReplaced = !currentJob || currentJob.createdAt !== jobCreatedAt;
if (jobWasReplaced) {
logger.debug(`[ResumableAgentController] Skipping FINAL emit - job was replaced`, {
streamId,
originalCreatedAt: jobCreatedAt,
currentCreatedAt: currentJob?.createdAt,
});
// Discard the stale title from this replaced stream: cancel it and
// unblock its persistence wait without letting it save (the newer job
// owns the conversation now).
titleAbortController.abort();
job.abortController.signal.removeEventListener('abort', abortTitleOnJobAbort);
acceptsTitleEvents = false;
resolveConvoReady();
// Still decrement pending request since we incremented at start
await decrementPendingRequest(userId);
if (immediateTitlePromise) {
immediateTitlePromise.finally(() => {
if (client) {
disposeClient(client);
}
});
} else if (client) {
disposeClient(client);
}
return;
}
// If the user stopped this turn, cancel the title BEFORE unblocking its
// persistence wait — otherwise resolving `convoReady` lets the title task
// resume and save before the later abort runs.
if (wasAbortedBeforeComplete) {
titleAbortController.abort();
} else {
job.abortController.signal.removeEventListener('abort', abortTitleOnJobAbort);
}
// The conversation row now exists and this stream is authoritative; allow
// any in-flight immediate title generation to persist (saveConvo uses noUpsert).
resolveConvoReady();
acceptsTitleEvents = false;
if (titleEventPromise) {
await titleEventPromise;
}
if (!wasAbortedBeforeComplete) {
const finalEvent = {
final: true,
conversation,
title: conversation.title,
requestMessage: sanitizeMessageForTransmit(userMessage),
responseMessage: { ...response },
};
logger.debug(`[ResumableAgentController] Emitting FINAL event`, {
streamId,
wasAbortedBeforeComplete,
userMessageId: userMessage?.messageId,
responseMessageId: response?.messageId,
conversationId: conversation?.conversationId,
});
await GenerationJobManager.emitDone(streamId, finalEvent);
GenerationJobManager.completeJob(streamId);
await decrementPendingRequest(userId);
} else {
const finalEvent = {
final: true,
conversation,
title: conversation.title,
requestMessage: sanitizeMessageForTransmit(userMessage),
responseMessage: { ...response, unfinished: true },
};
logger.debug(`[ResumableAgentController] Emitting ABORTED FINAL event`, {
streamId,
wasAbortedBeforeComplete,
userMessageId: userMessage?.messageId,
responseMessageId: response?.messageId,
conversationId: conversation?.conversationId,
});
await GenerationJobManager.emitDone(streamId, finalEvent);
GenerationJobManager.completeJob(streamId, 'Request aborted');
await decrementPendingRequest(userId);
}
if (titleTiming === 'immediate') {
// Title was fired in parallel above (if eligible); a stopped turn already
// aborted it before `resolveConvoReady`. Defer disposal until it settles
// so the run/req aren't torn down mid-generation.
if (immediateTitlePromise) {
immediateTitlePromise.finally(() => {
if (client) {
disposeClient(client);
}
});
} else if (client) {
disposeClient(client);
}
} else if (shouldGenerateTitle) {
addTitle(req, {
text,
response: { ...response },
client,
})
.catch((err) => {
logger.error('[ResumableAgentController] Error in title generation', err);
})
.finally(() => {
if (client) {
disposeClient(client);
}
});
} else {
if (client) {
disposeClient(client);
}
}
} catch (error) {
// Any failure (user Stop, or a preflight/quota failure before the run is
// even created) must cancel the title and unblock its waits: the title's
// `_waitForRun` would otherwise never resolve, deferring client disposal
// until the 45s title timeout, and no title should persist for a failed turn.
titleAbortController.abort();
job.abortController.signal.removeEventListener('abort', abortTitleOnJobAbort);
acceptsTitleEvents = false;
resolveConvoReady();
// Check if this was an abort (not a real error)
const wasAborted = job.abortController.signal.aborted || error.message?.includes('abort');
if (wasAborted) {
logger.debug(`[ResumableAgentController] Generation aborted for ${streamId}`);
// abortJob already handled emitDone and completeJob
} else {
logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error);
await GenerationJobManager.emitError(streamId, error.message || 'Generation failed');
GenerationJobManager.completeJob(streamId, error.message);
}
await decrementPendingRequest(userId);
// Defer disposal until any immediate title settles (it holds the run/req).
if (immediateTitlePromise) {
immediateTitlePromise.finally(() => {
if (client) {
disposeClient(client);
}
});
} else if (client) {
disposeClient(client);
}
// Don't continue to title generation after error/abort
return;
}
};
// Start generation and handle any unhandled errors
startGeneration().catch(async (err) => {
logger.error(
`[ResumableAgentController] Unhandled error in background generation: ${err.message}`,
);
GenerationJobManager.completeJob(streamId, err.message);
await decrementPendingRequest(userId);
});
} catch (error) {
logger.error('[ResumableAgentController] Initialization error:', error);
if (!res.headersSent) {
res.status(500).json({ error: error.message || 'Failed to start generation' });
} else {
// JSON already sent, emit error to stream so client can receive it
await GenerationJobManager.emitError(streamId, error.message || 'Failed to start generation');
}
GenerationJobManager.completeJob(streamId, error.message);
await decrementPendingRequest(userId);
if (client) {
disposeClient(client);
}
}
};

/**
 * Agent Controller - Routes to ResumableAgentController for all requests.
 * The legacy non-resumable path is kept below but no longer used by default.
 */
const AgentController = async (req, res, next, initializeClient, addTitle) => {
  return ResumableAgentController(req, res, next, initializeClient, addTitle);
};

/**
* Legacy Non-resumable Agent Controller - Uses GenerationJobManager for abort handling.
* Response is streamed directly to client via res, but abort state is managed centrally.
* @deprecated Use ResumableAgentController instead
*/
const _LegacyAgentController = async (req, res, next, initializeClient, addTitle) => {
const {
text,
isRegenerate,
endpointOption,
conversationId: reqConversationId,
isContinued = false,
editedContent = null,
parentMessageId = null,
overrideParentMessageId = null,
responseMessageId: editedResponseMessageId = null,
} = req.body;
// Generate conversationId upfront if not provided - streamId === conversationId always
// Treat "new" as a placeholder that needs a real UUID (frontend may send "new" for new convos)
const isNewConvo = !reqConversationId || reqConversationId === 'new';
const conversationId = isNewConvo ? crypto.randomUUID() : reqConversationId;
const streamId = conversationId;
let userMessage;
let userMessageId;
let responseMessageId;
let client = null;
let cleanupHandlers = [];
// Match the same logic used for conversationId generation above
const userId = req.user.id;
await attachConversationCreatedAt(req, { userId, conversationId, isNewConvo });
// Create handler to avoid capturing the entire parent scope
let getReqData = (data = {}) => {
for (let key in data) {
if (key === 'userMessage') {
userMessage = data[key];
userMessageId = data[key].messageId;
} else if (key === 'responseMessageId') {
responseMessageId = data[key];
} else if (key === 'promptTokens') {
// Update job metadata with prompt tokens for abort handling
GenerationJobManager.updateMetadata(streamId, { promptTokens: data[key] });
} else if (key === 'sender') {
GenerationJobManager.updateMetadata(streamId, { sender: data[key] });
}
// conversationId is pre-generated, no need to update from callback
}
};
// Create a function to handle final cleanup
const performCleanup = async () => {
logger.debug('[AgentController] Performing cleanup');
if (Array.isArray(cleanupHandlers)) {
for (const handler of cleanupHandlers) {
try {
if (typeof handler === 'function') {
handler();
}
} catch (e) {
logger.error('[AgentController] Error in cleanup handler', e);
}
}
}
// Complete the job in GenerationJobManager
if (streamId) {
logger.debug('[AgentController] Completing job in GenerationJobManager');
await GenerationJobManager.completeJob(streamId);
}
// Dispose client properly
if (client) {
disposeClient(client);
}
// Clear all references
client = null;
getReqData = null;
userMessage = null;
cleanupHandlers = null;
// Clear request data map
if (requestDataMap.has(req)) {
requestDataMap.delete(req);
}
logger.debug('[AgentController] Cleanup completed');
};
try {
let prelimAbortController = new AbortController();
const prelimCloseHandler = createCloseHandler(prelimAbortController);
res.on('close', prelimCloseHandler);
const removePrelimHandler = (manual) => {
try {
prelimCloseHandler(manual);
res.removeListener('close', prelimCloseHandler);
} catch (e) {
logger.error('[AgentController] Error removing close listener', e);
}
};
cleanupHandlers.push(removePrelimHandler);
/** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
const result = await initializeClient({
req,
res,
endpointOption,
signal: prelimAbortController.signal,
});
if (prelimAbortController.signal?.aborted) {
prelimAbortController = null;
throw new Error('Request was aborted before initialization could complete');
} else {
prelimAbortController = null;
removePrelimHandler(true);
cleanupHandlers.pop();
}
client = result.client;
// Register client with finalization registry if available
if (clientRegistry) {
clientRegistry.register(client, { userId }, client);
}
// Store request data in WeakMap keyed by req object
requestDataMap.set(req, { client });
// Create job in GenerationJobManager for abort handling
// streamId === conversationId (pre-generated above)
const job = await GenerationJobManager.createJob(streamId, userId, conversationId);
// Store endpoint metadata for abort handling
GenerationJobManager.updateMetadata(streamId, {
endpoint: endpointOption.endpoint,
iconURL: endpointOption.iconURL,
model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
sender: client?.sender,
});
// Store content parts reference for abort
if (client?.contentParts) {
GenerationJobManager.setContentParts(streamId, client.contentParts);
}
const closeHandler = createCloseHandler(job.abortController);
res.on('close', closeHandler);
cleanupHandlers.push(() => {
try {
res.removeListener('close', closeHandler);
} catch (e) {
logger.error('[AgentController] Error removing close listener', e);
}
});
/**
* onStart callback - stores user message and response ID for abort handling
*/
const onStart = (userMsg, respMsgId, _isNewConvo) => {
sendEvent(res, { message: userMsg, created: true });
userMessage = userMsg;
userMessageId = userMsg.messageId;
responseMessageId = respMsgId;
// Store metadata for abort handling (conversationId is pre-generated)
GenerationJobManager.updateMetadata(streamId, {
responseMessageId: respMsgId,
userMessage: {
messageId: userMsg.messageId,
parentMessageId: userMsg.parentMessageId,
conversationId,
text: userMsg.text,
},
});
};
const messageOptions = {
user: userId,
onStart,
getReqData,
isContinued,
isRegenerate,
editedContent,
conversationId,
parentMessageId,
abortController: job.abortController,
overrideParentMessageId,
isEdited: !!editedContent,
userMCPAuthMap: result.userMCPAuthMap,
responseMessageId: editedResponseMessageId,
progressOptions: {
res,
},
};
const response = await client.sendMessage(text, messageOptions);
// Capture the response message ID and tag the response with its endpoint
const messageId = response.messageId;
const endpoint = endpointOption.endpoint;
response.endpoint = endpoint;
// Store database promise locally
const databasePromise = response.databasePromise;
delete response.databasePromise;
// Resolve database-related data
const { conversation: convoData = {} } = await databasePromise;
const conversation = { ...convoData };
// Normalize a missing or empty title to null
conversation.title = conversation.title || null;
if (req.body.files && Array.isArray(client.options.attachments)) {
const files = buildMessageFiles(req.body.files, client.options.attachments);
if (files.length > 0) {
userMessage.files = files;
}
delete userMessage.image_urls;
}
// Only send if not aborted
if (!job.abortController.signal.aborted) {
// Create a new response object with minimal copies
const finalResponse = { ...response };
sendEvent(res, {
final: true,
conversation,
title: conversation.title,
requestMessage: sanitizeMessageForTransmit(userMessage),
responseMessage: finalResponse,
});
res.end();
// Save the message if needed
if (client.savedMessageIds && !client.savedMessageIds.has(messageId)) {
await saveMessage(
{
userId: req?.user?.id,
isTemporary: req?.body?.isTemporary,
interfaceConfig: req?.config?.interfaceConfig,
},
{ ...finalResponse, user: userId },
{ context: 'api/server/controllers/agents/request.js - response end' },
);
}
} else if (!res.headersSent && !res.finished) {
// Edge case: `sendMessage` completed but the abort happened during `sendCompletion`,
// so a final event still needs to be sent
logger.debug(
'[AgentController] Handling edge case: `sendMessage` completed but aborted during `sendCompletion`',
);
const finalResponse = { ...response };
finalResponse.error = true;
sendEvent(res, {
final: true,
conversation,
title: conversation.title,
requestMessage: sanitizeMessageForTransmit(userMessage),
responseMessage: finalResponse,
error: { message: 'Request was aborted during completion' },
});
res.end();
}
// Save user message if needed
if (!client.skipSaveUserMessage) {
await saveMessage(
{
userId: req?.user?.id,
isTemporary: req?.body?.isTemporary,
interfaceConfig: req?.config?.interfaceConfig,
},
userMessage,
{ context: "api/server/controllers/agents/request.js - don't skip saving user message" },
);
}
// Add title if needed - extract minimal data
if (addTitle && parentMessageId === Constants.NO_PARENT && isNewConvo) {
addTitle(req, {
text,
response: { ...response },
client,
})
.then(() => {
logger.debug('[AgentController] Title generation succeeded');
})
.catch((err) => {
logger.error('[AgentController] Error in title generation', err);
})
.finally(() => {
logger.debug('[AgentController] Title generation completed');
performCleanup();
});
} else {
performCleanup();
}
} catch (error) {
// Handle error without capturing much scope
handleAbortError(res, req, error, {
conversationId,
sender: client?.sender,
messageId: responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId ?? parentMessageId,
userMessageId,
})
.catch((err) => {
logger.error('[api/server/controllers/agents/request] Error in `handleAbortError`', err);
})
.finally(() => {
performCleanup();
});
}
};
module.exports = AgentController;