LibreChat/api/server/controllers/agents/__tests__/usageEvents.integration.spec.js
Danny Avila d18d62e7c1
🪙 refactor: Reconcile Context Gauge to Actual Provider Tokens (#13780)
* 🪙 fix: Reconcile Context Gauge to Actual Provider Tokens

The context gauge could read several times too high (e.g. 213K when the real prompt
was 56K) and stay there across reloads. Root cause: the SDK's calibrationRatio is
`cumulativeProviderReported / cumulativeRawSent`, but a provider's server-side
web search injects large fetched content into the prompt that the SDK never sent
or counted — pinning the ratio at its cap (5) and multiplying every later message
estimate, including post-summary ones. The gauge rendered (and persisted) that
inflated estimate, never the provider's actual token count.

Fix: reconcile the snapshot to the call's ACTUAL prompt tokens (input + cache),
which already arrive in on_token_usage. Only messageTokens is calibration-scaled
(instructions/summary are raw tiktoken), so keep those and set messageTokens to
the remainder, recomputing free space. Shared `promptTokensFromUsage` +
`reconcileContextUsage` in data-provider; applied server-side in
buildPersistedContextUsage (reload-stable) and client-side in useUsageHandler on
each primary usage (corrects at turn-end, no follow-up needed). Also drop the
summary double-count from the Breakdown Messages row.

Deferred (separate agents PR): the SDK over-calibration also fires summarization
prematurely; fixing it needs decoupling real-content estimation from server-side
injection headroom without weakening pruning-overflow safety.

* 🪙 fix: Harden Token Reconciliation for Provider-less + Resume Paths

Codex review on the reconciliation:
- promptTokensFromUsage: when the provider is absent (custom/OpenAI-compatible
  payloads), fall back to the same magnitude heuristic normalizeUsageUnits uses
  (cache ≤ input ⇒ already included) so cached events aren't re-inflated.
- Resume: backfillUsage restores a primary call's usage without replaying a live
  on_token_usage (Redis mode), so the live reconcile never ran and a reconnected
  session stayed on the inflated estimate. New reconcileBackfill reconciles the
  restored snapshot from the final primary call after contextHandler installs it.

* 🪙 fix: Reconcile Resume Snapshot Server-Side, Not via Backfill

Codex: the client reconcileBackfill scanned the resumed run's collectedUsage and
applied the final primary to the latest snapshot — but on a mid-call resume that
usage belongs to an EARLIER call, corrupting the restored gauge.

Move the resume reconciliation server-side: GenerationJobManager.persistTokenUsage
reconciles the stored contextUsage to a primary usage's actual prompt tokens as it
arrives. That usage is the post-invoke truth for the call the latest stored
snapshot precedes (no snapshot is captured between a call's pre-invoke dispatch
and its usage), so it's correct by construction and run-matched. A mid-call resume
(no usage yet) keeps the raw snapshot instead of mis-applying an earlier call's
tokens; it reconciles once the call completes. Removed client reconcileBackfill;
the live-path reconcile (non-resume) stays.

* 🪙 fix: Guard Reconciliation Against Replays and Snapshot Races

Two Codex concurrency findings on the reconciliation:
- Client: reconcile only on a NEWLY folded primary usage. A replayed duplicate
  (folded=false on resume) can be an earlier tool-loop call sharing the run id,
  which would overwrite the latest snapshot with an earlier, smaller prompt. Moved
  the reconcile after the folded guard.
- Server: serialize the context-usage write through the same per-stream queue as
  the token-usage write. persistTokenUsage reconciles the stored snapshot
  (read-modify-write); an unserialized trackContextUsage could store a newer
  snapshot between the read and write — or a stale reconciled write could land
  after a newer snapshot — clobbering the newer run's gauge when calls interleave.
  FIFO keeps each call's snapshot ahead of its own usage and behind the next.

* chore: import order in GenerationJobManager.ts
2026-06-16 11:05:44 -04:00

477 lines
17 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { z } = require('zod');
const { tool } = require('@langchain/core/tools');
const { ChatGenerationChunk } = require('@langchain/core/outputs');
const { HumanMessage, AIMessage, AIMessageChunk } = require('@langchain/core/messages');
const {
Run,
Providers,
GraphEvents,
FakeChatModel,
createContentAggregator,
} = require('@librechat/agents');
const {
GenerationJobManager,
aggregateEmittedUsage,
resolveAgentTokenConfig,
buildPersistedContextUsage,
} = require('@librechat/api');
const { getDefaultHandlers } = require('~/server/controllers/agents/callbacks');
/**
 * Module mocks for this spec.
 * - nanoid always returns 'mock-nanoid' so generated ids are deterministic.
 * - The Files citation / code-output / image-save services are replaced with
 *   bare jest.fn() stubs (presumably reachable from the default handlers —
 *   confirm against callbacks.js).
 * NOTE(review): babel-jest hoists jest.mock calls above the requires, and the
 * factories may only reference `jest` (or `mock`-prefixed names); keep them
 * self-contained.
 */
jest.mock('nanoid', () => ({
nanoid: jest.fn(() => 'mock-nanoid'),
}));
jest.mock('~/server/services/Files/Citations', () => ({
processFileCitations: jest.fn(),
}));
jest.mock('~/server/services/Files/Code/process', () => ({
processCodeOutput: jest.fn(),
runPreviewFinalize: jest.fn(),
}));
jest.mock('~/server/services/Files/process', () => ({
saveBase64Image: jest.fn(),
}));
/**
 * Real-pipeline feature probe: published @librechat/agents versions that do not
 * define GraphEvents.ON_CONTEXT_USAGE cause the context-snapshot assertions to be
 * skipped.
 */
const hasContextUsageEvent = ![undefined, null].includes(GraphEvents.ON_CONTEXT_USAGE);
/**
 * FakeChatModel that attaches provider-style usage_metadata on a final
 * empty chunk (the OpenAI streaming pattern), so CHAT_MODEL_END carries
 * aggregated usage through the real @librechat/agents pipeline.
 *
 * @param {object} options - forwarded unchanged to FakeChatModel
 *   (e.g. `responses`, `toolCalls`).
 * @param {object[]} usagePerCall - usage_metadata for each successive model
 *   call; once exhausted, the last entry is reused for every later call.
 */
class UsageFakeModel extends FakeChatModel {
  constructor(options, usagePerCall) {
    super(options);
    this.usagePerCall = usagePerCall;
    this.usageCallIndex = 0;
  }

  async *_streamResponseChunks(messages, options, runManager) {
    yield* super._streamResponseChunks(messages, options, runManager);
    /** Clamp so calls beyond the configured list reuse the final entry. */
    const index = Math.min(this.usageCallIndex, this.usagePerCall.length - 1);
    this.usageCallIndex += 1;
    /**
     * Attach a private copy: the usage fixtures are shared by every run in this
     * file (and the last one is reused across calls) and are also the expected
     * values in assertions, so a downstream mutation of the attached metadata
     * must never be able to rewrite them.
     */
    yield new ChatGenerationChunk({
      text: '',
      message: new AIMessageChunk({
        content: '',
        usage_metadata: structuredClone(this.usagePerCall[index]),
      }),
    });
  }
}
/** LangChain tool the fake model calls in the tool loop: returns `a + b` as a string. */
const addTool = tool(
  async ({ a, b }) => `${a + b}`,
  {
    name: 'add',
    description: 'Add two numbers',
    schema: z.object({ a: z.number(), b: z.number() }),
  },
);
/**
 * Deterministic stand-in token counter: one "token" per character plus a
 * fixed per-message overhead of 3.
 * - string content: its length + 3
 * - array content: 3 + lengths of string parts and of parts' string `text`
 * - anything else: 3
 *
 * @param {{ content?: unknown }} msg - message whose content is measured.
 * @returns {number} the pseudo token count.
 */
const charCounter = (msg) => {
  const { content } = msg;
  if (typeof content === 'string') {
    return content.length + 3;
  }
  if (!Array.isArray(content)) {
    return 3;
  }
  return content.reduce((total, part) => {
    if (typeof part === 'string') {
      return total + part.length;
    }
    const text = part?.text;
    return typeof text === 'string' ? total + text.length : total;
  }, 3);
};
/**
 * Minimal SSE response double. `write` scans the payload line by line and
 * JSON-parses every `data: ` line into `events`; it always reports success.
 *
 * @returns {{ events: object[], headersSent: boolean, writableEnded: boolean,
 *   write: (payload: unknown) => boolean }}
 */
function createMockRes() {
  const events = [];
  const dataPrefix = 'data: ';
  const write = (payload) => {
    String(payload)
      .split('\n')
      .filter((line) => line.startsWith(dataPrefix))
      .forEach((line) => events.push(JSON.parse(line.slice(dataPrefix.length))));
    return true;
  };
  return { events, headersSent: true, writableEnded: false, write };
}
/** Provider usage the fake model reports for the first (tool-calling) call. */
const FIRST_CALL_USAGE = { input_tokens: 100, output_tokens: 20, total_tokens: 120 };
/** Usage for the second (final-answer) call, including cache write/read details. */
const SECOND_CALL_USAGE = {
  input_tokens: 150,
  output_tokens: 10,
  total_tokens: 160,
  input_token_details: { cache_creation: 30, cache_read: 50 },
};
/** Context window configured on the graph for the tool-loop runs. */
const MAX_CONTEXT_TOKENS = 8_000;
/**
 * Runs one tool-calling turn through a real @librechat/agents Run wired to
 * getDefaultHandlers. Call 1 answers with an `add(2, 2)` tool call and call 2
 * answers with text. The fake model reports FIRST_CALL_USAGE, then
 * SECOND_CALL_USAGE.
 *
 * @param {object} params
 * @param {object} params.res - mock SSE response (see createMockRes).
 * @param {string|null} [params.streamId=null] - forwarded to getDefaultHandlers
 *   (the resume test uses it with GenerationJobManager).
 * @param {object[]} params.collectedUsage - array handed to the handlers for usage collection.
 * @param {object|null} [params.contextUsageSink=null] - forwarded to getDefaultHandlers.
 * @param {object[]|null} [params.usageEmitSink=null] - forwarded to getDefaultHandlers.
 * @param {object|null} [params.usageCost=null] - forwarded to getDefaultHandlers.
 * @returns {Promise<{ run: object, contentParts: object[] }>}
 */
async function runToolLoop({
  res,
  streamId = null,
  collectedUsage,
  contextUsageSink = null,
  usageEmitSink = null,
  usageCost = null,
}) {
  const { contentParts, aggregateContent } = createContentAggregator();
  const customHandlers = getDefaultHandlers({
    res,
    aggregateContent,
    toolEndCallback: () => {},
    collectedUsage,
    streamId,
    contextUsageSink,
    usageEmitSink,
    usageCost,
  });

  const llmConfig = {
    provider: Providers.OPENAI,
    model: 'gpt-4o-mini',
    streaming: true,
    streamUsage: false,
  };
  const graphConfig = {
    type: 'standard',
    llmConfig,
    instructions: 'You are a helpful assistant.',
    maxContextTokens: MAX_CONTEXT_TOKENS,
    tools: [addTool],
  };
  const run = await Run.create({
    runId: 'usage-e2e-response',
    graphConfig,
    returnContent: true,
    customHandlers,
    tokenCounter: charCounter,
    indexTokenCountMap: {},
  });

  const fakeModelOptions = {
    responses: ['Let me calculate that.', 'The answer is 4.'],
    toolCalls: [{ name: 'add', args: { a: 2, b: 2 }, id: 'tc_1', type: 'tool_call' }],
  };
  run.Graph.overrideModel = new UsageFakeModel(fakeModelOptions, [
    FIRST_CALL_USAGE,
    SECOND_CALL_USAGE,
  ]);

  const input = { messages: [new HumanMessage('What is 2+2?')] };
  const callConfig = {
    configurable: { thread_id: 'usage-e2e-thread', user_id: 'user-1' },
    streamMode: 'values',
    version: 'v2',
  };
  await run.processStream(input, callConfig);

  return { run, contentParts };
}
/**
 * Drives real @librechat/agents runs (fake model, real graph) through
 * getDefaultHandlers and checks the emitted SSE events, the collected/emitted
 * usage sinks, the persisted context snapshot, and GenerationJobManager resume
 * state.
 */
describe('usage events through the real agents pipeline', () => {
  jest.setTimeout(30000);

  /** Calls GenerationJobManager.destroy() once the whole suite has finished. */
  afterAll(async () => {
    await GenerationJobManager.destroy();
  });

  test('emits on_token_usage per model call with collectedUsage parity', async () => {
    const res = createMockRes();
    const collectedUsage = [];
    const { contentParts } = await runToolLoop({ res, collectedUsage });
    const usageEvents = res.events.filter((e) => e.event === 'on_token_usage');
    expect(usageEvents).toHaveLength(2);
    expect(usageEvents[0].data).toMatchObject(FIRST_CALL_USAGE);
    expect(usageEvents[1].data).toMatchObject(SECOND_CALL_USAGE);
    expect(usageEvents[0].data.provider).toBe(Providers.OPENAI);
    expect(usageEvents[0].data.model).toBeTruthy();
    expect(usageEvents[0].data.usage_type).toBeUndefined();
    expect(collectedUsage).toHaveLength(2);
    expect(collectedUsage[0]).toMatchObject(FIRST_CALL_USAGE);
    expect(collectedUsage[1]).toMatchObject(SECOND_CALL_USAGE);
    const text = contentParts
      .filter((part) => part?.type === 'text')
      .map((part) => part.text)
      .join('');
    expect(text).toContain('The answer is 4.');
  });

  test('emits a context snapshot before each model call', async () => {
    if (!hasContextUsageEvent) {
      console.warn('Skipping: installed @librechat/agents predates ON_CONTEXT_USAGE');
      return;
    }
    const res = createMockRes();
    const { run } = await runToolLoop({ res, collectedUsage: [] });
    expect(run).toBeDefined();
    const contextEvents = res.events.filter((e) => e.event === 'on_context_usage');
    expect(contextEvents).toHaveLength(2);
    for (const event of contextEvents) {
      const { breakdown, contextBudget, remainingContextTokens, effectiveInstructionTokens } =
        event.data;
      expect(breakdown.maxContextTokens).toBe(MAX_CONTEXT_TOKENS);
      expect(contextBudget).toBeGreaterThan(0);
      expect(contextBudget).toBeLessThanOrEqual(MAX_CONTEXT_TOKENS);
      expect(effectiveInstructionTokens).toBeGreaterThan(0);
      expect(remainingContextTokens).toBeGreaterThan(0);
      expect(remainingContextTokens).toBeLessThan(contextBudget);
      expect(breakdown.toolTokenCounts.add).toBeGreaterThan(0);
    }
    /** Tool loop grows the context between calls */
    expect(contextEvents[1].data.prePruneContextTokens).toBeGreaterThan(
      contextEvents[0].data.prePruneContextTokens,
    );
    /** Snapshot precedes the call's usage event */
    const firstContextIndex = res.events.findIndex((e) => e.event === 'on_context_usage');
    const firstUsageIndex = res.events.findIndex((e) => e.event === 'on_token_usage');
    expect(firstContextIndex).toBeGreaterThanOrEqual(0);
    expect(firstContextIndex).toBeLessThan(firstUsageIndex);
  });

  test('captures the usage rollup + latest context snapshot for message persistence', async () => {
    const res = createMockRes();
    const contextUsageSink = { latest: null };
    const usageEmitSink = [];
    await runToolLoop({ res, collectedUsage: [], contextUsageSink, usageEmitSink });
    /** Both model calls' emitted payloads are captured for the rollup */
    expect(usageEmitSink).toHaveLength(2);
    const usage = aggregateEmittedUsage(usageEmitSink);
    /** Display units: openAI is cache-subset, so input excludes cache
     * (150 - 30 - 50 = 70); output is repaired completion */
    expect(usage).toEqual({
      input:
        FIRST_CALL_USAGE.input_tokens +
        (SECOND_CALL_USAGE.input_tokens -
          SECOND_CALL_USAGE.input_token_details.cache_creation -
          SECOND_CALL_USAGE.input_token_details.cache_read),
      output: FIRST_CALL_USAGE.output_tokens + SECOND_CALL_USAGE.output_tokens,
      cacheWrite: SECOND_CALL_USAGE.input_token_details.cache_creation,
      cacheRead: SECOND_CALL_USAGE.input_token_details.cache_read,
    });
    /** contextCost off -> no cost folded into the rollup */
    expect(usage.cost).toBeUndefined();
    if (hasContextUsageEvent) {
      expect(contextUsageSink.latest).not.toBeNull();
      const persisted = buildPersistedContextUsage(contextUsageSink.latest);
      expect(persisted.breakdown.maxContextTokens).toBe(MAX_CONTEXT_TOKENS);
      /** Zero-valued tool counts are trimmed from the persisted blob */
      for (const count of Object.values(persisted.breakdown.toolTokenCounts ?? {})) {
        expect(count).toBeGreaterThan(0);
      }
    }
  });

  test('folds authoritative per-event cost into the rollup when contextCost is on', async () => {
    const res = createMockRes();
    const usageEmitSink = [];
    /** Stub pricing mirroring getMultiplier/getCacheMultiplier shape */
    const usageCost = {
      enabled: true,
      pricing: {
        getMultiplier: ({ tokenType }) => (tokenType === 'completion' ? 15 : 3),
        getCacheMultiplier: ({ cacheType }) => (cacheType === 'write' ? 3.75 : 0.3),
      },
    };
    await runToolLoop({ res, collectedUsage: [], usageEmitSink, usageCost });
    for (const event of usageEmitSink) {
      expect(typeof event.cost).toBe('number');
    }
    const usage = aggregateEmittedUsage(usageEmitSink);
    expect(usage.cost).toBeGreaterThan(0);
    expect(usage.cost).toBeCloseTo(usageEmitSink.reduce((sum, e) => sum + e.cost, 0));
  });

  test('emit path prices each call by its producing agent and strips the agentId tag', () => {
    const res = createMockRes();
    const usageEmitSink = [];
    /** Two endpoints share a model id but bill at different rates. */
    const primaryConfig = { 'gpt-4': { prompt: 0.01, completion: 0.03, context: 8192 } };
    const subagentConfig = { 'gpt-4': { prompt: 0.05, completion: 0.15, context: 8192 } };
    const byAgentId = new Map([
      ['primary', primaryConfig],
      ['sub', subagentConfig],
    ]);
    const usageCost = {
      enabled: true,
      endpointTokenConfig: primaryConfig,
      pricing: {
        getMultiplier: ({ tokenType, model, endpointTokenConfig }) =>
          endpointTokenConfig?.[model]?.[tokenType] ?? 0,
        getCacheMultiplier: () => 0,
      },
      resolveEndpointTokenConfig: (usage) =>
        resolveAgentTokenConfig({ agentId: usage?.agentId, byAgentId, fallback: primaryConfig }),
    };
    const { aggregateContent } = createContentAggregator();
    const handlers = getDefaultHandlers({
      res,
      aggregateContent,
      toolEndCallback: () => {},
      collectedUsage: [],
      usageEmitSink,
      usageCost,
    });
    /** The CHAT_MODEL_END handler's emitUsage IS the real emitTokenUsage closure. */
    const emitUsage = handlers[GraphEvents.CHAT_MODEL_END].emitUsage;
    const call = { model: 'gpt-4', input_tokens: 100, output_tokens: 50, total_tokens: 150 };
    emitUsage({ ...call, agentId: 'sub' });
    emitUsage({ ...call, agentId: 'primary' });
    const events = res.events.filter((e) => e.event === 'on_token_usage');
    expect(events).toHaveLength(2);
    /** agentId is an internal pricing tag — never streamed to the client nor
     * folded into the persisted rollup. */
    for (const e of events) {
      expect(e.data.agentId).toBeUndefined();
    }
    for (const entry of usageEmitSink) {
      expect(entry.agentId).toBeUndefined();
    }
    /** Same tokens + model id, but the subagent endpoint's higher rates price
     * its call above the primary, proving per-agent emit pricing. The 5x ratio
     * ((100*0.05 + 50*0.15) / (100*0.01 + 50*0.03) = 12.5 / 2.5) is
     * independent of the credit-unit scale. */
    expect(events[1].data.cost).toBeGreaterThan(0);
    expect(events[0].data.cost).toBeGreaterThan(events[1].data.cost);
    expect(events[0].data.cost / events[1].data.cost).toBeCloseTo(5);
  });

  test('persists usage and context snapshot for resume via GenerationJobManager', async () => {
    const streamId = `usage-e2e-stream-${Date.now()}`;
    await GenerationJobManager.createJob(streamId, 'user-1', 'convo-1');
    const res = createMockRes();
    await runToolLoop({ res, streamId, collectedUsage: [] });
    const resumeState = await GenerationJobManager.getResumeState(streamId);
    expect(resumeState).not.toBeNull();
    expect(resumeState.collectedUsage).toHaveLength(2);
    expect(resumeState.collectedUsage[0]).toMatchObject(FIRST_CALL_USAGE);
    expect(resumeState.collectedUsage[1]).toMatchObject(SECOND_CALL_USAGE);
    if (hasContextUsageEvent) {
      expect(resumeState.contextUsage.breakdown.maxContextTokens).toBe(MAX_CONTEXT_TOKENS);
      /** Latest-wins: the persisted snapshot is the second call's (only
       * sanity-checked as non-empty here). */
      expect(resumeState.contextUsage.prePruneContextTokens).toBeGreaterThan(0);
      /** Reconciled to the final primary call's actual prompt: openAI folds cache
       * into input_tokens (150), so the resume snapshot's used = 150 — the real
       * context, not the calibrated estimate. */
      const used =
        resumeState.contextUsage.contextBudget - resumeState.contextUsage.remainingContextTokens;
      expect(used).toBe(SECOND_CALL_USAGE.input_tokens);
    }
  });

  /** Drives a real summarization (tight context + padded history); self-summarize
   * reuses the overridden fake model so no API key is needed. */
  async function runSummarizationLoop({ res, collectedUsage, contextUsageSink, usageEmitSink }) {
    const { aggregateContent } = createContentAggregator();
    const handlers = getDefaultHandlers({
      res,
      aggregateContent,
      toolEndCallback: () => {},
      collectedUsage,
      contextUsageSink,
      usageEmitSink,
      summarizationOptions: { enabled: true },
    });
    const pad = 'context detail to overflow the tiny budget. '.repeat(40);
    const history = [
      new HumanMessage(`Turn 1 question. ${pad}`),
      new AIMessage(`Turn 1 answer. ${pad}`),
      new HumanMessage(`Turn 2 question. ${pad}`),
      new AIMessage(`Turn 2 answer. ${pad}`),
      new HumanMessage(`Final question after a lot of prior history. ${pad}`),
    ];
    /** Pre-count every history message with the same counter the run uses. */
    const indexTokenCountMap = {};
    history.forEach((message, i) => {
      indexTokenCountMap[i] = charCounter(message);
    });
    const run = await Run.create({
      runId: `summ-e2e-${Date.now()}`,
      graphConfig: {
        type: 'standard',
        llmConfig: {
          provider: Providers.OPENAI,
          model: 'gpt-4o-mini',
          streaming: true,
          streamUsage: false,
        },
        instructions: 'You are a helpful assistant.',
        maxContextTokens: 700,
        summarizationEnabled: true,
        summarizationConfig: { provider: Providers.OPENAI, model: 'gpt-4o-mini' },
      },
      returnContent: true,
      customHandlers: handlers,
      tokenCounter: charCounter,
      indexTokenCountMap,
    });
    run.Graph.overrideModel = new UsageFakeModel(
      { responses: ['## Summary\nPrior turns compacted.', 'Here is the final answer.'] },
      [{ input_tokens: 40, output_tokens: 8, total_tokens: 48 }],
    );
    await run.processStream(
      { messages: history },
      {
        configurable: { thread_id: 'summ-e2e-thread', user_id: 'user-1' },
        streamMode: 'values',
        version: 'v2',
      },
    );
    return run;
  }

  /** A summarized turn compacts the context (summary tokens replace the older
   * turns) and the reduced snapshot is persisted — the latest snapshot is
   * followed by a primary usage, so the save guard keeps it and the client
   * uses the snapshot (not the inflated whole-history estimate). */
  test('persists the reduced (compacted) snapshot after summarization', async () => {
    if (!hasContextUsageEvent) {
      console.warn('Skipping: installed @librechat/agents predates ON_CONTEXT_USAGE');
      return;
    }
    const res = createMockRes();
    const contextUsageSink = { latest: null, count: 0 };
    const usageEmitSink = [];
    await runSummarizationLoop({ res, collectedUsage: [], contextUsageSink, usageEmitSink });
    const snapshot = contextUsageSink.latest;
    /** Summarization fired: a summary exists and the kept message tokens are
     * small (the compacted context, not the full history). */
    expect(snapshot?.breakdown?.summaryTokens).toBeGreaterThan(0);
    expect(snapshot?.breakdown?.messageTokens).toBeLessThan(snapshot?.breakdown?.summaryTokens);
    /** The save guard keeps it: a primary usage follows the latest snapshot. */
    const afterLatest = usageEmitSink.slice(contextUsageSink.latestUsageIndex ?? 0);
    expect(afterLatest.some((e) => e.usage_type == null)).toBe(true);
    expect(
      buildPersistedContextUsage(snapshot, usageEmitSink).breakdown.summaryTokens,
    ).toBeGreaterThan(0);
  });
});