LibreChat/api/server/middleware/abortMiddleware.js
Danny Avila bc6b032421
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
GitNexus Index / index (push) Waiting to run
GitNexus Index / post-index (push) Blocked by required conditions
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions
Sync Helm Chart Tags / Ignore non-main push (push) Waiting to run
Sync Helm Chart Tags / Sync chart tags (push) Waiting to run
🛑 refactor: Demote User Abort Logs (#13904)
* fix: Demote user abort logging

* fix: Handle abort causes

* fix: Demote user-aborted agent completion to debug log

The error users still saw originated in AgentClient's completion catch,
which logged every caught error (including user aborts) at error level
before checking the abort signal. Branch on abortController.signal.aborted
so user-initiated aborts log at debug while real failures stay error-classified.

Also give the handleAbortError it.each cases distinct titles.
2026-06-23 09:55:21 -04:00

322 lines
9.5 KiB
JavaScript

const { logger } = require('@librechat/data-schemas');
const { isAssistantsEndpoint, ErrorTypes } = require('librechat-data-provider');
const {
isEnabled,
sendEvent,
countTokens,
GenerationJobManager,
recordCollectedUsage,
sanitizeMessageForTransmit,
buildAbortedResponseMetadata,
} = require('@librechat/api');
const { truncateText, smartTruncateText } = require('~/app/clients/prompts');
const clearPendingReq = require('~/cache/clearPendingReq');
const { sendError } = require('~/server/middleware/error');
const { abortRun } = require('./abortRun');
const db = require('~/models');
/**
* @param {Error | unknown} error
* @returns {boolean}
*/
const isAbortError = (error) => {
const visited = new Set();
let current = error;
while (current && typeof current === 'object' && !visited.has(current)) {
visited.add(current);
const errorName = current.name;
const errorCode = current.code;
const errorMessage = typeof current.message === 'string' ? current.message : '';
if (
errorName === 'AbortError' ||
errorCode === 'ABORT_ERR' ||
errorMessage.includes('AbortError') ||
/(?:operation|request|stream) was aborted/i.test(errorMessage)
) {
return true;
}
current = current.cause;
}
return false;
};
/**
* Spend tokens for all models from collected usage.
* This handles both sequential and parallel agent execution.
*
* IMPORTANT: After spending, this function clears the collectedUsage array
* to prevent double-spending. The array is shared with AgentClient.collectedUsage,
* so clearing it here prevents the finally block from also spending tokens.
*
* @param {Object} params
* @param {string} params.userId - User ID
* @param {string} params.conversationId - Conversation ID
* @param {Array<Object>} params.collectedUsage - Usage metadata from all models
* @param {string} [params.fallbackModel] - Fallback model name if not in usage
* @param {string} [params.messageId] - The response message ID for transaction correlation
*/
async function spendCollectedUsage({
userId,
conversationId,
collectedUsage,
fallbackModel,
messageId,
}) {
if (!collectedUsage || collectedUsage.length === 0) {
return;
}
await recordCollectedUsage(
{
spendTokens: db.spendTokens,
spendStructuredTokens: db.spendStructuredTokens,
pricing: { getMultiplier: db.getMultiplier, getCacheMultiplier: db.getCacheMultiplier },
bulkWriteOps: { insertMany: db.bulkInsertTransactions, updateBalance: db.updateBalance },
},
{
user: userId,
conversationId,
collectedUsage,
context: 'abort',
messageId,
model: fallbackModel,
},
);
// Clear the array to prevent double-spending from the AgentClient finally block.
// The collectedUsage array is shared by reference with AgentClient.collectedUsage,
// so clearing it here ensures recordCollectedUsage() sees an empty array and returns early.
collectedUsage.length = 0;
}
/**
* Abort an active message generation.
* Uses GenerationJobManager for all agent requests.
* Since streamId === conversationId, we can directly abort by conversationId.
*/
async function abortMessage(req, res) {
const { abortKey, endpoint } = req.body;
if (isAssistantsEndpoint(endpoint)) {
return await abortRun(req, res);
}
const conversationId = abortKey?.split(':')?.[0] ?? req.user.id;
const userId = req.user.id;
// Use GenerationJobManager to abort the job (streamId === conversationId)
const abortResult = await GenerationJobManager.abortJob(conversationId);
if (!abortResult.success) {
if (!res.headersSent) {
return res.status(204).send({ message: 'Request not found' });
}
return;
}
const { jobData, content, text, collectedUsage } = abortResult;
const completionTokens = await countTokens(text);
const promptTokens = jobData?.promptTokens ?? 0;
const responseMessage = {
messageId: jobData?.responseMessageId,
parentMessageId: jobData?.userMessage?.messageId,
conversationId: jobData?.conversationId,
content,
text,
sender: jobData?.sender ?? 'AI',
finish_reason: 'incomplete',
endpoint: jobData?.endpoint,
iconURL: jobData?.iconURL,
model: jobData?.model,
unfinished: false,
error: false,
isCreatedByUser: false,
tokenCount: completionTokens,
};
/** Persist the usage/cost rollup + context breakdown for the stopped response
* so its branch/total cost and granular rows survive a reload, matching the
* normal completion path. */
const abortMetadata = buildAbortedResponseMetadata(jobData);
if (abortMetadata) {
responseMessage.metadata = abortMetadata;
}
// Spend tokens for ALL models from collectedUsage (handles parallel agents/addedConvo)
if (collectedUsage && collectedUsage.length > 0) {
await spendCollectedUsage({
userId,
conversationId: jobData?.conversationId,
collectedUsage,
fallbackModel: jobData?.model,
messageId: jobData?.responseMessageId,
});
} else {
// Fallback: no collected usage, use text-based token counting for primary model only
await db.spendTokens(
{ ...responseMessage, context: 'incomplete', user: userId },
{ promptTokens, completionTokens },
);
}
await db.saveMessage(
{
userId: req?.user?.id,
isTemporary: req?.body?.isTemporary,
interfaceConfig: req?.config?.interfaceConfig,
},
{ ...responseMessage, user: userId },
{ context: 'api/server/middleware/abortMiddleware.js' },
);
// Get conversation for title
const conversation = await db.getConvo(userId, conversationId);
const finalEvent = {
title: conversation && !conversation.title ? null : conversation?.title || 'New Chat',
final: true,
conversation,
requestMessage: jobData?.userMessage
? sanitizeMessageForTransmit({
messageId: jobData.userMessage.messageId,
parentMessageId: jobData.userMessage.parentMessageId,
conversationId: jobData.userMessage.conversationId,
text: jobData.userMessage.text,
quotes: jobData.userMessage.quotes,
isCreatedByUser: true,
})
: null,
responseMessage,
};
logger.debug(
`[abortMessage] ID: ${userId} | ${req.user.email} | Aborted request: ${conversationId}`,
);
if (res.headersSent) {
return sendEvent(res, finalEvent);
}
res.setHeader('Content-Type', 'application/json');
res.send(JSON.stringify(finalEvent));
}
const handleAbort = function () {
return async function (req, res) {
try {
if (isEnabled(process.env.LIMIT_CONCURRENT_MESSAGES)) {
await clearPendingReq({ userId: req.user.id });
}
return await abortMessage(req, res);
} catch (err) {
logger.error('[abortMessage] handleAbort error', err);
}
};
};
/**
* Handle abort errors during generation.
* @param {ServerResponse} res
* @param {ServerRequest} req
* @param {Error | unknown} error
* @param {Partial<TMessage> & { partialText?: string }} data
* @returns {Promise<void>}
*/
const handleAbortError = async (res, req, error, data) => {
const { sender, conversationId, messageId, parentMessageId, userMessageId, partialText } = data;
if (error?.message?.includes('base64')) {
logger.error('[handleAbortError] Error in base64 encoding', {
...error,
stack: smartTruncateText(error?.stack, 1000),
message: truncateText(error.message, 350),
});
} else if (isAbortError(error)) {
logger.debug('[handleAbortError] AI response aborted by user', {
conversationId,
code: error?.code,
name: error?.name,
message: truncateText(error?.message ?? 'AbortError', 350),
});
} else {
logger.error('[handleAbortError] AI response error; aborting request:', error);
}
if (error?.stack && error.stack.includes('google')) {
logger.warn(
`AI Response error for conversation ${conversationId} likely caused by Google censor/filter`,
);
}
let errorText = error?.message?.includes('"type"')
? error.message
: 'An error occurred while processing your request. Please contact the Admin.';
if (error?.type === ErrorTypes.INVALID_REQUEST) {
errorText = `{"type":"${ErrorTypes.INVALID_REQUEST}"}`;
}
if (error?.message?.includes("does not support 'system'")) {
errorText = `{"type":"${ErrorTypes.NO_SYSTEM_MESSAGES}"}`;
}
/**
* @param {string} partialText
* @returns {Promise<void>}
*/
const respondWithError = async (partialText) => {
const endpointOption = req.body?.endpointOption;
let options = {
sender,
messageId,
conversationId,
parentMessageId,
text: errorText,
user: req.user.id,
spec: endpointOption?.spec,
iconURL: endpointOption?.iconURL,
modelLabel: endpointOption?.modelLabel,
shouldSaveMessage: userMessageId != null,
model: endpointOption?.modelOptions?.model || req.body?.model,
};
if (req.body?.agent_id) {
options.agent_id = req.body.agent_id;
}
if (partialText) {
options = {
...options,
error: false,
unfinished: true,
text: partialText,
};
}
await sendError(req, res, options);
};
if (partialText && partialText.length > 5) {
try {
return await abortMessage(req, res);
} catch (err) {
logger.error('[handleAbortError] error while trying to abort message', err);
return respondWithError(partialText);
}
} else {
return respondWithError();
}
};
module.exports = {
handleAbort,
handleAbortError,
spendCollectedUsage,
};