mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-07-02 20:32:58 +00:00
* 💾 feat: Persist Context Breakdown & Branch/Total Usage Cost

  Persist the granular context breakdown and per-response usage/cost on the response message metadata, and re-derive branch + total usage/cost from a per-message index so the popover survives reloads and is branch-aware live.

  - Add aggregateEmittedUsage + buildPersistedContextUsage helpers in packages/api; capture the latest visible snapshot and every emitted on_token_usage payload via contextUsageSink/usageEmitSink.
  - Attach metadata.contextUsage (Part A) and metadata.usage (Part B) on the agents response message in sendCompletion.
  - Carry per-message usage on the token index; add sumTotalUsage/setEntryUsage and branch-scoped usage on sumBranch.
  - Repurpose the session accumulator into a single in-flight pending holder; flush it into the index at finalize; hydrate breakdowns on load.
  - Render branch cost with a conditional all-branches total in the breakdown.

* 🧹 chore: Remove orphaned com_ui_session_cost i18n key

* 🩹 fix: Address Codex review — normalize usage server-side, fix reload deltas

  - Persist per-event-normalized display units in metadata.usage (TResponseUsage) so reloaded mixed-provider turns match the live session; client reads them directly instead of re-normalizing with a single stamped provider (P2).
  - Persist completedOutputTokens (final call output) on metadata.contextUsage so a reloaded multi-call turn adds the post-snapshot delta, not the full tokenCount the snapshot already counts (P2).
  - buildIndex preserves a prior entry's immutable usage when a rebuilt cache message lacks metadata.usage, so a mid-session rebuild (regenerate) keeps a sibling branch's flushed cost (fixes the e2e regenerate failure).
  - Track costKnown so turns saved with contextCost off don't render $0.00 when cost display is later enabled (P3).
  - Use an epsilon for the all-branches cost comparison to avoid a spurious total row from float summation order (P3).
  - Update unit/integration/e2e tests for the new shapes; regenerate e2e asserts the all-branches total after reload (deterministic via persisted metadata).

* 🩹 fix: Address Codex round 2 — pending leak, cost coverage, reload delta

  - Clear the in-flight pending usage on terminal abort/error (resetLive), so a stopped generation's tokens no longer merge into the next response (P2).
  - costKnown now means COMPLETE coverage (ANDed): a branch mixing cost-bearing and cost-less turns is flagged incomplete and the cost row is hidden rather than rendering an under-reported total (P2).
  - Drop the tokenCount fallback for completedOutputTokens on reload: only the persisted post-snapshot delta is used, so a multi-call turn whose provider emitted no usage_metadata no longer double-counts earlier output (P2).
  - Update tokens.spec for AND coverage semantics + incomplete-cost case.

* 🩹 fix: Address Codex round 3 — no-usage snapshots, total coverage, provider-less cache

  - Skip persisting metadata.contextUsage when the response emitted no primary usage event: without a known post-snapshot output the granular gauge would undercount the reply on reload, so fall back to the coarse per-message estimate instead (P2).
  - Gate the all-branches cost row on totalUsage.costKnown so an incomplete total (a sibling saved without cost) never renders an under-reported figure (P2).
  - aggregateEmittedUsage/finalCallOutputTokens now normalize per-event with the client's magnitude fallback (normalizeEventUnits) instead of billing splitUsage, so provider-less cached events match live on reload (P2).
  - Add backend test for the provider-less cached case.

* 🩹 fix: Address Codex round 4 — abort attribution, complete cost coverage

  - aggregateEmittedUsage persists cost only when EVERY call was priced; a partial pricing failure now omits cost so the client treats coverage as unknown rather than reading an under-reported sum as authoritative (P2).
  - finalizeUsage flushes pending into the response entry only when events were folded this session (eventCount > 0), so a late/second resumable subscriber carrying persisted metadata.usage keeps it instead of being overwritten with an empty pending record (P2).
  - On user stop, attribute the in-flight pending usage to the partial response (new attributePending handler) instead of discarding it in resetLive — the stopped reply's billed tokens are kept and still can't leak into the next response; resetLive's discard remains for the error path (P2).

* 🐛 fix: Persist branch cost across branch switches via sticky usage history

  Branch cost vanished on switching to a sibling branch (until a new turn) — the cost analog of the granularity bug. buildIndex rebuilds the token index from the messages cache; a sibling generated this session whose cache message lacks metadata.usage (and is transiently dropped from the cache during regenerate) lost its live-flushed usage, so sumBranch found none and the cost row hid.

  Fix: a sticky per-response usage map (conversationId → messageId → usage), written by setEntryUsage and never rebuilt from the cache — the usage counterpart of snapshotsByAnchorFamily for the breakdown. buildIndex/upsertEntries restore an entry's usage from it when the message carries none; cleared on convo switch and migrated with the index.

  Add unit coverage for the drop-then-readd regression and an e2e assertion that branch cost survives a branch switch.

* 🐛 fix: Re-index on branch switch so branch cost survives the switch

  The sticky usage history alone didn't fix the reported branch-switch cost drop: on a branch switch no cache `updated` event fires, so the index subscriber never re-ran, and the post-regenerate rebuild was skipped while `isSubmitting` was still true — leaving the index stale and missing the now-viewed branch's response entirely (sticky can only restore entries present in a rebuild).

  Re-index from the messages cache on every tail change (created/finalize AND branch switch), not just while submitting. The cache holds the full message set at switch time, so the viewed branch's response is re-added and its usage restored from metadata.usage or the sticky history → sumBranch finds it and the branch cost renders.

  Verified locally: the branch-switch e2e now passes (the cost section shows both the branch row and the all-branches total). Also fixed that e2e assertion to target a single cost value (strict-mode safe).

* 🩹 fix: Handle stopped-stream usage — reset pending + persist abort metadata

  Codex round (stop/abort edges):

  - Resumable explicit-stop (intentional SSE close) reset UI state but never cleared pendingUsageFamily, so usage folded before the stop leaked into the next response in the conversation. Discard pending on intentional close (resetLive); a resume re-folds via backfillUsage, so nothing is lost.
  - The abort save path (abortMiddleware) persisted the stopped response without metadata.usage/contextUsage, so its cost + breakdown vanished on reload. Rebuild both from the job's persisted tokenUsage (emitted payloads incl. cost) and contextUsage snapshot — parity with the normal sendCompletion path; breakdown gated on a primary usage event like buildResponseMetadata.

  Deferred (per scope decision): mid-stream branch-switch transiently shows the streaming branch's pending on the viewed sibling (cosmetic, until finalize).

* 🩹 fix: Persist abort metadata on the real agents route + tighten snapshot gate

  Codex round (corrects last round's wrong-path fixes):

  - Stopped AGENTS responses are saved by routes/agents/index.js (/chat/abort), not abortMiddleware — so last round's metadata fix never ran for them. Moved the rollup/snapshot builder into packages/api as buildAbortedResponseMetadata (shared, unit-tested) and applied it in BOTH abort save paths, so a stopped agent reply keeps its cost + breakdown on reload.
  - Persist the breakdown only when the FINAL visible call emitted usage: track a per-response snapshot count and require primaryUsageCount >= snapshotCount. Previously any earlier primary usage event passed the gate, so a multi-call turn whose final call emitted no usage_metadata used an earlier call's output as completedOutputTokens (already counted by the latest snapshot) → reload over-reported. Now it falls back to the coarse estimate.

  Resumable stop pending-reset (prior round, 3cde6fe035) already flows through clearAllSubmissions → SSE close → the intentional-close handler's resetLive. Deferred per scope: mid-stream branch-switch pending attribution (tracked).

* 🩹 fix: Abort breakdown over-count + resume re-fold after pending discard

  Codex round (on the re-applied abort/snapshot work):

  - buildAbortedResponseMetadata now persists ONLY the usage/cost rollup, not the context breakdown. The abort path can't tell whether the final call emitted usage (the job stores only the latest snapshot, not a count), so persisting the breakdown risked reusing an earlier call's output as completedOutputTokens (already in the snapshot) → reload over-count. Stopped/incomplete responses now fall back to the coarse gauge estimate, which is safe and apt.
  - resetLive now also forgets the conversation's folded usage-event identities (clearUsageFolded). Discarding pending on a terminal/intentional close left the folded keys set, so a later resume's backfillUsage saw the persisted events as duplicates and never rebuilt pending — leaving the response's usage missing until a full reload. Clearing them lets the resume re-fold.
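The cost-coverage rules in the notes above (costKnown is ANDed across turns, the all-branches row is gated on a complete total, and the branch/total comparison uses an epsilon to absorb float summation order) can be sketched as follows. This is a minimal illustration under assumptions: the entry shape `{ cost, costKnown }`, the helper names `sumCost` and `shouldShowTotalRow`, and the `COST_EPSILON` value are hypothetical, not LibreChat's client code. Hiding the total row when the branch sum itself is incomplete is also an assumption beyond what the notes state.

```javascript
// Hypothetical sketch: entry shape, names, and epsilon are assumptions, not LibreChat's API.

/**
 * Sums cost over per-response usage entries.
 * costKnown is ANDed: a single entry without a known cost marks the whole sum incomplete.
 * @param {{ cost?: number, costKnown?: boolean }[]} entries
 * @returns {{ cost: number, costKnown: boolean }}
 */
function sumCost(entries) {
  let cost = 0;
  let costKnown = true;
  for (const entry of entries) {
    if (entry.costKnown && typeof entry.cost === 'number') {
      cost += entry.cost;
    } else {
      costKnown = false;
    }
  }
  return { cost, costKnown };
}

// Assumed tolerance: far above float rounding noise, far below one cent.
const COST_EPSILON = 1e-9;

/**
 * Decides whether to render the extra all-branches total row:
 * only when both sums are complete and they differ by more than float noise.
 */
function shouldShowTotalRow(branch, total) {
  if (!branch.costKnown || !total.costKnown) {
    return false;
  }
  return Math.abs(total.cost - branch.cost) > COST_EPSILON;
}
```

Summing the same three costs in a different order (`0.1 + 0.2 + 0.3` versus `0.3 + 0.2 + 0.1`) yields values that differ in the last bit; a strict `!==` comparison would then render a spurious total row, which the epsilon suppresses.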
347 lines
11 KiB
JavaScript
const express = require('express');
const {
  isEnabled,
  GenerationJobManager,
  hasPersistableAbortContent,
  buildAbortedResponseMetadata,
} = require('@librechat/api');
const { createSseStreamTelemetry } = require('@librechat/api/telemetry');
const { logger } = require('@librechat/data-schemas');
const {
  uaParser,
  checkBan,
  requireJwtAuth,
  messageIpLimiter,
  configMiddleware,
  messageUserLimiter,
} = require('~/server/middleware');
const { saveMessage } = require('~/models');
const responses = require('./responses');
const openai = require('./openai');
const { v1 } = require('./v1');
const chat = require('./chat');

const { LIMIT_MESSAGE_IP, LIMIT_MESSAGE_USER } = process.env ?? {};

/** Untenanted jobs (pre-multi-tenancy) remain accessible if the userId check passes. */
function hasTenantMismatch(job, user) {
  return job.metadata?.tenantId != null && job.metadata.tenantId !== user.tenantId;
}

const router = express.Router();
|
/**
 * Open Responses API routes (API key authentication handled in route file)
 * Mounted at /agents/v1/responses (full path: /api/agents/v1/responses)
 * NOTE: Must be mounted BEFORE /v1 to avoid being caught by the less specific route
 * @see https://openresponses.org/specification
 */
router.use('/v1/responses', responses);

/**
 * OpenAI-compatible API routes (API key authentication handled in route file)
 * Mounted at /agents/v1 (full path: /api/agents/v1/chat/completions)
 */
router.use('/v1', openai);

router.use(requireJwtAuth);
router.use(checkBan);
router.use(uaParser);
|
/**
 * Stream endpoints - mounted before chatRouter to bypass rate limiters
 * These are GET requests and don't need message body validation or rate limiting
 */

/**
 * @route GET /chat/stream/:streamId
 * @desc Subscribe to an ongoing generation job's SSE stream with replay support
 * @access Private
 * @description Sends sync event with resume state, replays missed chunks, then streams live
 * @query resume=true - Indicates this is a reconnection (sends sync event)
 */
router.get('/chat/stream/:streamId', async (req, res) => {
  const { streamId } = req.params;
  const isResume = req.query.resume === 'true';

  const job = await GenerationJobManager.getJob(streamId);
  if (!job) {
    return res.status(404).json({
      error: 'Stream not found',
      message: 'The generation job does not exist or has expired.',
    });
  }

  if (job.metadata?.userId && job.metadata.userId !== req.user.id) {
    return res.status(403).json({ error: 'Unauthorized' });
  }

  if (hasTenantMismatch(job, req.user)) {
    return res.status(403).json({ error: 'Unauthorized' });
  }

  const streamTelemetry = createSseStreamTelemetry({ req, res, streamId, isResume });

  res.setHeader('Content-Encoding', 'identity');
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache, no-transform');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');
  res.flushHeaders();
  streamTelemetry.recordHeadersFlushed();

  logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`);

  const writeEvent = (event, options = {}) => {
    if (!res.writableEnded) {
      const eventName = options.eventName ?? 'message';
      const payload = `event: ${eventName}\ndata: ${JSON.stringify(event)}\n\n`;
      res.write(payload);
      streamTelemetry.recordWrite(payload, { final: options.final });
      if (typeof res.flush === 'function') {
        res.flush();
      }
      return true;
    }

    return false;
  };

  const onDone = (event) => {
    streamTelemetry.recordFinalEventEmitted();
    writeEvent(event, { final: true });
    res.end();
  };

  const onError = (error) => {
    if (!res.writableEnded) {
      streamTelemetry.recordErrorEventEmitted();
      writeEvent({ error }, { eventName: 'error' });
      res.end();
    }
  };

  let result;

  if (isResume) {
    const { subscription, resumeState, pendingEvents } =
      await GenerationJobManager.subscribeWithResume(streamId, writeEvent, onDone, onError);

    if (!res.writableEnded) {
      if (resumeState) {
        writeEvent({ sync: true, resumeState, pendingEvents });
        GenerationJobManager.markSyncSent(streamId);
        logger.debug(
          `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps, ${pendingEvents.length} pending events`,
        );
      } else if (pendingEvents.length > 0) {
        for (const event of pendingEvents) {
          writeEvent(event);
        }
        logger.warn(
          `[AgentStream] Resume state null for ${streamId}, replayed ${pendingEvents.length} gap events directly`,
        );
      }
    }

    result = subscription;
  } else {
    result = await GenerationJobManager.subscribe(streamId, writeEvent, onDone, onError);
  }

  if (!result) {
    streamTelemetry.recordSubscribeFailed();
    onError('Failed to subscribe to stream');
    return;
  }

  req.on('close', () => {
    logger.debug(`[AgentStream] Client disconnected from ${streamId}`);
    result.unsubscribe();
  });
});
|
/**
 * @route GET /chat/active
 * @desc Get all active generation job IDs for the current user
 * @access Private
 * @returns { activeJobIds: string[] }
 */
router.get('/chat/active', async (req, res) => {
  const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(
    req.user.id,
    req.user.tenantId,
  );
  res.json({ activeJobIds });
});
|
/**
 * @route GET /chat/status/:conversationId
 * @desc Check if there's an active generation job for a conversation
 * @access Private
 * @returns { active, streamId, status, aggregatedContent, createdAt, resumeState }
 */
router.get('/chat/status/:conversationId', async (req, res) => {
  const { conversationId } = req.params;

  // streamId === conversationId, so we can use getJob directly
  const job = await GenerationJobManager.getJob(conversationId);

  if (!job) {
    return res.json({ active: false });
  }

  // Optional chaining: a job without metadata is rejected with 403 instead of throwing
  if (job.metadata?.userId !== req.user.id) {
    return res.status(403).json({ error: 'Unauthorized' });
  }

  if (hasTenantMismatch(job, req.user)) {
    return res.status(403).json({ error: 'Unauthorized' });
  }

  // Get resume state which contains aggregatedContent
  // Avoid calling both getStreamInfo and getResumeState (both fetch content)
  const resumeState = await GenerationJobManager.getResumeState(conversationId);
  const isActive = job.status === 'running';

  res.json({
    active: isActive,
    streamId: conversationId,
    status: job.status,
    aggregatedContent: resumeState?.aggregatedContent ?? [],
    createdAt: job.createdAt,
    resumeState,
  });
});
|
/**
 * @route POST /chat/abort
 * @desc Abort an ongoing generation job
 * @access Private
 * @description Mounted before chatRouter to bypass buildEndpointOption middleware
 */
router.post('/chat/abort', async (req, res) => {
  logger.debug(`[AgentStream] ========== ABORT ENDPOINT HIT ==========`);
  logger.debug(`[AgentStream] Method: ${req.method}, Path: ${req.path}`);
  logger.debug(`[AgentStream] Body:`, req.body);

  const { streamId, conversationId, abortKey } = req.body;
  const userId = req.user?.id;

  // streamId === conversationId, so try any of the provided IDs
  // Skip "new" as it's a placeholder for new conversations, not an actual ID
  let jobStreamId =
    streamId || (conversationId !== 'new' ? conversationId : null) || abortKey?.split(':')[0];
  let job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null;

  // Fallback: if job not found and we have a userId, look up active jobs for user
  // This handles the case where frontend sends "new" but job was created with a UUID
  if (!job && userId) {
    logger.debug(`[AgentStream] Job not found by ID, checking active jobs for user: ${userId}`);
    const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(
      userId,
      req.user.tenantId,
    );
    if (activeJobIds.length > 0) {
      // Abort the most recent active job for this user
      jobStreamId = activeJobIds[0];
      job = await GenerationJobManager.getJob(jobStreamId);
      logger.debug(`[AgentStream] Found active job for user: ${jobStreamId}`);
    }
  }

  logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`);

  if (job && jobStreamId) {
    if (job.metadata?.userId && job.metadata.userId !== userId) {
      logger.warn(`[AgentStream] Unauthorized abort attempt for ${jobStreamId} by user ${userId}`);
      return res.status(403).json({ error: 'Unauthorized' });
    }

    if (hasTenantMismatch(job, req.user)) {
      return res.status(403).json({ error: 'Unauthorized' });
    }

    logger.debug(`[AgentStream] Job found, aborting: ${jobStreamId}`);
    const abortResult = await GenerationJobManager.abortJob(jobStreamId);
    logger.debug(`[AgentStream] Job aborted successfully: ${jobStreamId}`, {
      abortResultSuccess: abortResult.success,
      abortResultUserMessageId: abortResult.jobData?.userMessage?.messageId,
      abortResultResponseMessageId: abortResult.jobData?.responseMessageId,
    });

    // CRITICAL: Save partial response BEFORE returning to prevent race condition.
    // If user sends a follow-up immediately after abort, the parentMessageId must exist in DB.
    // Only save if we have a valid responseMessageId (skip early aborts before generation started)
    if (
      abortResult.success &&
      abortResult.jobData?.userMessage?.messageId &&
      abortResult.jobData?.responseMessageId &&
      hasPersistableAbortContent(abortResult.content)
    ) {
      const { jobData, content, text } = abortResult;
      const responseMessage = {
        messageId: jobData.responseMessageId,
        parentMessageId: jobData.userMessage.messageId,
        conversationId: jobData.conversationId,
        content: content || [],
        text: text || '',
        sender: jobData.sender || 'AI',
        endpoint: jobData.endpoint,
        iconURL: jobData.iconURL,
        model: jobData.model,
        unfinished: true,
        error: false,
        isCreatedByUser: false,
        user: userId,
      };

      /** Persist the usage/cost rollup for the stopped response (from the job's
       * tracked tokenUsage) so its branch/total cost survives a reload, matching
       * the normal completion path. The context breakdown is intentionally not
       * persisted here: the abort path can't tell whether the final call emitted
       * usage, so the gauge falls back to the coarse estimate on reload. */
      const abortMetadata = buildAbortedResponseMetadata(jobData);
      if (abortMetadata) {
        responseMessage.metadata = abortMetadata;
      }

      try {
        await saveMessage(
          {
            userId: req?.user?.id,
            isTemporary: req?.body?.isTemporary,
            interfaceConfig: req?.config?.interfaceConfig,
          },
          responseMessage,
          { context: 'api/server/routes/agents/index.js - abort endpoint' },
        );
        logger.debug(`[AgentStream] Saved partial response for: ${jobStreamId}`);
      } catch (saveError) {
        logger.error(`[AgentStream] Failed to save partial response: ${saveError.message}`);
      }
    }

    return res.json({ success: true, aborted: jobStreamId });
  }

  logger.warn(`[AgentStream] Job not found for streamId: ${jobStreamId}`);
  return res.status(404).json({ error: 'Job not found', streamId: jobStreamId });
});
|
router.use('/', v1);

const chatRouter = express.Router();
chatRouter.use(configMiddleware);

if (isEnabled(LIMIT_MESSAGE_IP)) {
  chatRouter.use(messageIpLimiter);
}

if (isEnabled(LIMIT_MESSAGE_USER)) {
  chatRouter.use(messageUserLimiter);
}

chatRouter.use('/', chat);
router.use('/chat', chatRouter);
|
module.exports = router;
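Each frame that `writeEvent` writes in the `/chat/stream/:streamId` handler is an `event:` line, a single `data:` line holding the JSON payload, and a terminating blank line; `onError` reuses it with the event name `error`. The sketch below reproduces that framing and adds a round-trip parser for illustration. Both `formatSseEvent` and `parseSseFrames` are hypothetical names, not part of the route, and in a browser `EventSource` would do the parsing.

```javascript
// Builds one SSE frame the same way writeEvent does: `event:` line,
// one `data:` line with the JSON payload, and a blank-line terminator.
function formatSseEvent(event, eventName = 'message') {
  return `event: ${eventName}\ndata: ${JSON.stringify(event)}\n\n`;
}

// Hypothetical helper (not part of LibreChat): splits a buffer of frames produced by
// formatSseEvent back into { eventName, data } objects. It reads only single-line
// `data:` fields, which suffices because JSON.stringify escapes newlines in strings.
function parseSseFrames(text) {
  return text
    .split('\n\n')
    .filter((frame) => frame.length > 0)
    .map((frame) => {
      let eventName = 'message';
      let data = '';
      for (const line of frame.split('\n')) {
        if (line.startsWith('event: ')) {
          eventName = line.slice('event: '.length);
        } else if (line.startsWith('data: ')) {
          data = line.slice('data: '.length);
        }
      }
      return { eventName, data: JSON.parse(data) };
    });
}
```

A general-purpose SSE client would also have to join multi-line `data:` fields and handle `id:`/`retry:` lines; the single-line assumption holds only because every payload here goes through `JSON.stringify`.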
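The `/chat/abort` handler picks the job ID by precedence: an explicit `streamId`, then `conversationId` unless it is the `"new"` placeholder, then the part of `abortKey` before the first `:`. Only if that lookup finds no job does it fall back to the user's most recent active job. The sketch below isolates the first step as a pure function; the name `resolveAbortStreamId` is hypothetical (the route inlines the expression), and because this file does not show what the rest of `abortKey` encodes, the test keys are made up.

```javascript
// Hypothetical helper mirroring the inline expression in POST /chat/abort.
// Precedence: streamId, then a real conversationId ("new" is only a placeholder),
// then the prefix of abortKey before the first ':'. Returns null when nothing usable
// was sent; the route then falls back to the user's active jobs.
function resolveAbortStreamId({ streamId, conversationId, abortKey } = {}) {
  return (
    streamId ||
    (conversationId !== 'new' ? conversationId : null) ||
    abortKey?.split(':')[0] ||
    null
  );
}
```

The sketch normalizes a missing result to `null`, whereas the route's inline expression can yield `undefined` or an empty string; the route's truthiness check on `jobStreamId` treats all three the same way.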