Mirror of https://github.com/danny-avila/LibreChat.git (synced 2026-06-27 09:51:33 +00:00)
* 📊 feat: Real-Time Context Window & Token Usage Tracking
* 🧪 fix: Align Pricing Spec Dep Signatures with TxDeps
* 🩹 fix: Resolve Codex Findings for Context Usage Tracking
* 📊 feat: Granular Tool Token Breakdown with Deferred Splits
* 🧪 test: Cover Session Cost in Mock E2E and Scope Usage Selectors
* 🧪 test: Live Host-Pipeline Usage Verification (Env-Gated)
* 🧪 test: Local Real-Provider Multi-Turn E2E Harness
* 🪙 fix: Keep Tagged Usage Buckets Out of the Live Context Estimate
* 🩹 fix: Scoped Token-Config Fallback and Sequential Visibility for Usage Events
* 🩹 fix: Address Usage Review Findings — Cost Timing, Scoped Caches, Finalized Output
- carry the post-snapshot output estimate into the context snapshot at
finalize so the gauge keeps the last response after live resets
- accumulate per-rate billable units and price the session cost at
render, so usage events arriving before the token-config load still
count once it resolves
- pass user-scoped token-config cache keys through loadConfigModels
fetches and drop the controller's unscoped fallback to prevent serving
another user's resolved config
- tag emitted usage events with a per-run seq so resume dedupe never
drops a distinct call with an identical payload
- admit the static tokenConfig override in the custom endpoint schema so
it survives zod parsing into req.config
* 🩹 fix: Align Client Usage Accounting with Backend Cost Semantics
- classify cache tokens by provider (shared inputTokensIncludesCache from
data-provider, consumed by both the backend billing path and the client)
instead of a magnitude heuristic, so Anthropic/Bedrock turns where cache
is smaller than uncached input no longer under-bill input
- mirror resolveCompletionTokens on the client so Vertex-style hidden
thinking tokens are reflected in the Output row and session cost
- prefer endpoint pricing over adapter-provider pricing so a custom
endpoint can price a known model name without built-in rates shadowing it
- carry static cacheRead/cacheWrite overrides through the tokenConfig
schema and buildTokenConfigMap
* 🩹 fix: Honor Static Token Config in Billing; Tighten Usage Freshness
- initializeCustom now uses a static endpoint tokenConfig as the agent's
endpointTokenConfig (billing + balance checks), not just the advertised
UI config — previously the gauge showed admin rates while the agent
billed against built-in tables
- invalidate the token-config query alongside models on user-key add/
revoke so context windows and pricing refresh without a reload
- include maxContextTokens in ChatForm's stabilized conversation memo so
the gauge reflects a changed context-window setting immediately
- feed the live output estimate from the legacy content path (direct and
assistants streams), setting from cumulative part text rather than
accumulating deltas
* 🩹 fix: Resume Usage Dedup, Agent Pricing, and Partial Override Billing
- fold usage events idempotently by (runId, seq) so resume backfill no
longer resets the conversation totals — a mid-stream reconnect keeps the
usage of prompts already completed earlier in the session
- tap replayed pending message/reasoning/content events so output streamed
past the resume snapshot reaches the live estimate, not just the message
- resolve cost against the agent's backing endpoint (Agents conversations
report endpoint `agents` / provider `openAI`, neither of which keys a
custom endpoint's tokenConfig)
- getMultiplier/getCacheMultiplier fall back to the standard tables for
models absent from a partial endpointTokenConfig, so a partial static
override no longer bills non-listed models at defaultRate while the UI
shows the correct pattern rate
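
The idempotent fold by (runId, seq) described above could look roughly like the sketch below. It is an illustration only, not the project's implementation: the event shape (`runId`, `seq`, `inputTokens`, `outputTokens`) and the helper `createUsageFold` are hypothetical names chosen for this example.

```javascript
// Hypothetical sketch: fold usage events into running totals keyed by (runId, seq),
// so replaying an event during a resume backfill leaves the totals unchanged.
function createUsageFold() {
  const seen = new Set();
  const totals = { inputTokens: 0, outputTokens: 0 };

  function foldUsage(event) {
    const key = `${event.runId}:${event.seq}`;
    if (seen.has(key)) {
      return false; // already folded; a replay is a no-op
    }
    seen.add(key);
    totals.inputTokens += event.inputTokens ?? 0;
    totals.outputTokens += event.outputTokens ?? 0;
    return true;
  }

  return { foldUsage, totals };
}

const fold = createUsageFold();
fold.foldUsage({ runId: 'run-1', seq: 0, inputTokens: 100, outputTokens: 20 });
// Identical payload but a distinct seq: counted as a separate call.
fold.foldUsage({ runId: 'run-1', seq: 1, inputTokens: 100, outputTokens: 20 });
// Replay of seq 0 after a reconnect: ignored.
fold.foldUsage({ runId: 'run-1', seq: 0, inputTokens: 100, outputTokens: 20 });
console.log(fold.totals); // { inputTokens: 200, outputTokens: 40 }
```

Keying on the pair rather than on the payload is what lets two distinct calls with identical token counts both be counted, while a replayed event is dropped.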
* 🩹 fix: Repaired Output in Gauge, Cache-Rate Keys, Config Gate, Usage Cleanup
- live/completed gauge counts the repaired completion (normalized output),
so under-reporting providers don't drop the response from used context
- translate static tokenConfig cacheWrite/cacheRead onto the write/read
keys getCacheMultiplier reads, so cache tokens bill at the configured
rate instead of the prompt-rate fallback
- clear the token index and usage atoms when leaving a conversation, so
visited histories don't accumulate in memory for the tab's lifetime
- wait for startupConfig before mounting the gauge, so a deployment with
contextUsage disabled never briefly mounts it or fires the token-config
query on first load
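
As a rough illustration of the cache-rate key translation mentioned above, the sketch below maps `cacheWrite`/`cacheRead` onto `write`/`read` keys. The entry shape, the rates, and both helpers (`toCacheRateKeys`, `cacheRate`) are assumptions made for this example; the real `getCacheMultiplier` may read its configuration differently.

```javascript
// Hypothetical sketch: map admin-facing cacheWrite/cacheRead fields onto the
// write/read keys that a cache-rate lookup is assumed to read.
function toCacheRateKeys(entry) {
  const { cacheWrite, cacheRead, ...rest } = entry;
  const out = { ...rest };
  if (typeof cacheWrite === 'number') {
    out.write = cacheWrite;
  }
  if (typeof cacheRead === 'number') {
    out.read = cacheRead;
  }
  return out;
}

// Assumed lookup: use the configured cache rate, else fall back to the prompt rate.
function cacheRate(entry, cacheType) {
  return entry[cacheType] ?? entry.prompt;
}

// Illustrative rates only.
const configured = toCacheRateKeys({ prompt: 3, completion: 15, cacheWrite: 3.75, cacheRead: 0.3 });
console.log(cacheRate(configured, 'read')); // 0.3
console.log(cacheRate(configured, 'write')); // 3.75
```

Without the translation step, a lookup keyed on `read`/`write` would find nothing in the override and fall through to the prompt rate, which is the symptom the commit describes.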
* 🩹 fix: Move Token-Config Resolution to TS; Key Live Usage by Created Convo
- extract the token-config resolution (override gathering + cache lookup +
buildTokenConfigMap) into resolveTokenConfigMap in packages/api, leaving
the /api controller a thin request-scoped wrapper (CLAUDE.md TS rule)
- getConvoKey prefers the user message's real conversationId once the
`created` event stamps it, so a new chat's first-response live gauge and
totals land under the id TokenUsage subscribes to instead of NEW_CONVO
* 🩹 fix: Clear Stale Redis Job Usage; Live-Tap Legacy Streams; Share Fetched Config
- DEL the Redis job hash before re-creating it so a reused streamId can't
inherit a prior run's contextUsage/tokenUsage and backfill stale usage
- tap the legacy {message,text} stream branch (non-agent OpenAI/Anthropic
streams) into the live estimate, not just the content path
- copy a deduped fetch's token config to every sibling endpoint sharing the
baseURL/key/headers, so /token-config resolves each by its own name
* ⏪ revert: Don't DEL Redis job hash in createJob (breaks cross-replica resume)
createJob is an idempotent join — a second replica calls it for the same
streamId to share an in-flight stream's state. DELeting the hash wiped the
prior replica's persisted created/usage state, so a joining replica missed
the created event (GenerationJobManager cross-replica integration test).
Reverts the F1 change from 2bfce0c34b; the stale-usage concern doesn't
arise in practice (streamId is unique per generation).
* 🩹 fix: Best-Effort Usage Emit; Tag Hidden Sequential-Agent Usage
- wrap the ModelEndHandler usage emit in try/catch so a failed telemetry
delivery (closed SSE / Redis publish error) can't abort the handler
before thought-signature capture, which would break resumed tool calls
- tag hidden sequential-agent usage as 'sequential' (non-primary) so the
client folds it into session cost/totals but not the live context gauge,
instead of letting an undefined usage_type inflate the visible gauge
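
The best-effort emit pattern described above can be sketched as follows. The handler shape and the names `handleModelEnd`, `emitUsage`, and `captureSignature` are hypothetical stand-ins for this example, not the actual ModelEndHandler API.

```javascript
// Hypothetical sketch: a handler whose telemetry emit is best-effort, so a failed
// delivery cannot prevent the work that follows it.
async function handleModelEnd({ emitUsage, captureSignature, usage }) {
  try {
    await emitUsage(usage);
  } catch (err) {
    // Log and continue: usage telemetry must not abort the handler.
    console.warn('usage emit failed:', err.message);
  }
  return captureSignature();
}

async function demo() {
  return handleModelEnd({
    usage: { input_tokens: 10, output_tokens: 2 },
    emitUsage: async () => {
      throw new Error('SSE connection closed');
    },
    captureSignature: () => 'signature-captured',
  });
}

demo().then((result) => console.log(result)); // signature-captured
```

The point of the sketch is ordering: the step that later tool calls depend on runs whether or not the emit succeeded.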
* 🩹 fix: Refetch Stale Token Config on Mount; Normalize Vertex for Lookup
- useTokenConfigQuery refetches on mount when stale, so a user-key change
that invalidates tokenConfig while the gauge is unmounted takes effect on
return instead of serving the prior key's resolved config
- normalize a Vertex-backed agent's provider (vertexai) to the google
token-config key, so Gemini context windows and rates resolve instead of
showing unknown context / $0 cost
* ✨ feat: Server-Side Per-Event Cost (Authoritative Pricing for the Gauge)
Move usage-cost pricing to the single source of truth. The backend prices
each model call with the same billing functions (premium tiers via
getMultiplier(inputTokenCount), cache rates) and emits the USD cost on
on_token_usage when interface.contextCost is enabled; the client sums
emitted costs instead of re-deriving from base token-config rates.
- computeUsageCostUSD reuses prepareTokenSpend/prepareStructuredTokenSpend
so the emitted cost matches what is billed (incl. premium thresholds)
- getDefaultHandlers gains a usageCost pricing context; initialize.js wires
db.getMultiplier/getCacheMultiplier gated on contextCost (agents path)
- client UsageTotals carries a summed costUSD; retire the client-side rate
lookups (costFromUnits/calcUsageCost) that drifted from backend pricing
and produced the provider-keying / cache-key / Vertex / premium findings
- keep normalizeUsageUnits for the displayed token counts; token-config is
still used for the context-window meter
Fixes the premium-tier session-cost under-report (gpt-5.x / gemini-3.1
above their input thresholds).
* 🩹 fix: Branch-Accurate Usage Snapshot + Clearer Gauge Track Contrast
- re-anchor the context snapshot from the user message to the response
message at finalize. Regenerating a response branches off a shared user
message, so anchoring on it made the snapshot read as "active" on both
branches — switching to the sibling branch showed the wrong (other
branch's) context. The response message is branch-unique, so sibling
branches now correctly fall back to their own per-branch totals.
- raise the gauge ring's track/fill contrast (muted track, prominent fill)
so the used portion reads clearly as a fill-level indicator
* 🩹 fix: Tag Sequential Usage in Billing; Emit Subagent Cost; Reset Live on Resume Errors
- tag hidden sequential-agent usage `usage_type: 'sequential'` on the
COLLECTED usage (not just the emit), and treat it as non-primary in
recordCollectedUsage (billed, excluded from the reported output total) so
hidden intermediate output stops inflating the parent's tokenCount/pruning
- emit on_token_usage from the subagent usage sink (tagged `subagent`, with
authoritative cost when contextCost is on) so the gauge's session
cost/totals include billed subagent usage; it stays out of the live meter
- call resetLive on the resumable 404 and max-retry terminal branches so the
gauge doesn't keep counting stale in-flight tokens after the stream ends
* 🎨 fix: Contrast the Popup Context Bar; Revert Ring Restyle
- raise the popup breakdown's context progressbar contrast (muted
surface-tertiary track, prominent text-primary fill) — that's the bar the
contrast feedback was about
- revert the gauge ring restyle (kept its original border-heavy track /
text-secondary fill); the ring wasn't the element in question
* 🩹 fix: Stop Snapshot Granularity Leaking Across Branches; Revert Tree Memo
- a null-anchor context snapshot was treated as active on every branch,
leaking one generation's granular breakdown onto sibling branches. Require
a non-null (response-message) anchor on the viewed branch instead, so
siblings without a matching snapshot fall back to their own totals.
- revert the buildTree WeakMap memo in messages.ts. buildTree is pure (builds
from shallow copies) so the memo was behaviorally identical, but it was the
feature's only change to core branch-navigation selectors — removing it
matches upstream and rules it out of branch-navigation debugging.
* 🪙 fix: Thread Endpoint Token Config to Agent Billing, Cost, and Context Limits
Custom-endpoint agents resolve an endpointTokenConfig during agent init but
it never reached the AgentClient, so spending, emitted cost, and runtime
max-token resolution all fell back to default rates for those agents.
- Surface options.endpointTokenConfig on the returned InitializedAgent.
- Pass it to the AgentClient (this.options.endpointTokenConfig) so the
spending path bills at configured rates.
- Thread it through usageCost to computeUsageCostUSD so emitted per-event
cost matches billing.
- getModelMaxTokens/getModelMaxOutputTokens fall back to the built-in map
for models absent from a partial override (matches buildTokenConfigMap);
consolidates the duplicated fallback in pricing.ts.
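
A minimal sketch of the partial-override fallback described above, assuming a per-model config object with a `context` field. The table contents, the field name, and `resolveMaxTokens` are hypothetical; the actual `getModelMaxTokens` signature is not shown in this commit message.

```javascript
// Hypothetical built-in table; the values are illustrative, not real limits.
const builtInMaxTokens = { 'model-a': 8000, 'model-b': 32000 };

// Resolve a context limit: prefer the endpoint override, then fall back to the
// built-in table for models the (partial) override does not list.
function resolveMaxTokens(model, endpointTokenConfig) {
  const override = endpointTokenConfig?.[model]?.context;
  if (typeof override === 'number') {
    return override;
  }
  return builtInMaxTokens[model];
}

const partialOverride = { 'model-a': { context: 16000 } };
console.log(resolveMaxTokens('model-a', partialOverride)); // 16000
console.log(resolveMaxTokens('model-b', partialOverride)); // 32000
```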
* 🪙 fix: Preserve Granular Breakdown Across Branch Switches
The granular context breakdown lives only in the live on_context_usage
snapshot — a single per-conversation slot, anchored to the latest response
and overwritten by each generation. Switching to a branch generated earlier
this session lost its tool/skill/system rows and fell back to coarse totals.
Retain each generation's finalized snapshot in a per-conversation map keyed
by its branch-unique response id (snapshotsByAnchorFamily). When the live
snapshot is off the viewed branch, walk the branch tail for its deepest
stored anchor and render that breakdown. Bounded by generation count and
cleared on conversation switch; the live/just-generated path is unchanged.
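
The branch-tail walk described above might be sketched like this. The message and snapshot shapes, and the helper `findBranchSnapshot`, are assumptions for illustration; the store named in the commit (`snapshotsByAnchorFamily`) is not reproduced here.

```javascript
// Hypothetical sketch: keep finalized snapshots keyed by response message id and,
// for the viewed branch, pick the snapshot of its deepest message that has one.
function findBranchSnapshot(tailId, messagesById, snapshotsByAnchor) {
  let current = messagesById.get(tailId);
  while (current) {
    const snapshot = snapshotsByAnchor.get(current.messageId);
    if (snapshot) {
      return snapshot;
    }
    current = messagesById.get(current.parentMessageId);
  }
  return null; // caller falls back to coarse totals
}

const messagesById = new Map([
  ['u1', { messageId: 'u1', parentMessageId: null }],
  ['r1', { messageId: 'r1', parentMessageId: 'u1' }], // first response
  ['r2', { messageId: 'r2', parentMessageId: 'u1' }], // regenerated sibling
]);
const snapshotsByAnchor = new Map([
  ['r1', { tools: 120, system: 40 }],
  ['r2', { tools: 90, system: 40 }],
]);

console.log(findBranchSnapshot('r1', messagesById, snapshotsByAnchor)); // { tools: 120, system: 40 }
console.log(findBranchSnapshot('r2', messagesById, snapshotsByAnchor)); // { tools: 90, system: 40 }
```

Because each response id belongs to exactly one branch, sibling branches resolve to their own snapshot even though they share the user message `u1`.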
* 🪙 fix: Harden Resume Seeding and Subagent Usage Emission
- useResumableSSE: skip the trailing-output live seed when the resume
carries a context snapshot; the snapshot's messageTokens already counts
produced output, so seeding it again inflated usage until the next reset.
- AgentClient subagent emitter: await GenerationJobManager.emitChunk like
every other caller (it persists before publishing), so a floating promise
can't race job cleanup and a Redis/publish failure is caught by the
emitter's try/catch instead of surfacing as an unhandled rejection.
* 🧪 test: Playwright Coverage for Context Breakdown Granularity
Add a test-only data-testid distinguishing the granular snapshot breakdown
(context-breakdown) from the coarse message-history estimate
(context-estimate), then assert granularity in the mock e2e harness:
- renders the granular breakdown from the live on_context_usage snapshot
(guards that the snapshot event actually reaches the popover, not just the
usage totals).
- preserves the granular breakdown after switching branches — regenerate to
overwrite the single live snapshot, switch back, and confirm the rows
survive via the per-anchor snapshot history map.
Branch regenerate/sibling selectors mirror the existing chat.spec branch test.
All three usage specs pass against the mock pipeline.
* 🪙 fix: Correct Resume Live-Seed, Fallback Re-index, and Subagent Emit Flush
Codex round on the prior commit:
- countTrailingOutputChars now counts only output at the very END of the
aggregated content (0 when the model paused at a tool call), and the resume
path always seeds it. The earlier skip-trailing-tool-parts behavior plus the
skip-seed-when-snapshot gate together over- or under-counted in-flight
output on resume; one rule fixes both — pre-invoke snapshot budget is never
double-counted, and genuine in-flight output is no longer dropped.
- useTokenUsage re-indexes from the messages cache on tail change while
submitting. The cache subscriber is muted during streaming, so without a
context snapshot (non-agent streams) sumBranch missed the created tail and
dropped history + prompt until finalize. Bounded — tailId only shifts on
created/finalize/branch-switch.
- AgentClient tracks subagent usage emit promises and flushes them in
chatCompletion's finally. The sink fires the emitter without awaiting, and
resume reads the usage emitChunk persists (HSET), so cleanup must not race
it or resumed clients miss billed subagent usage.
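
The trailing-output rule described above (count only output at the very end of the aggregated content, 0 when the model paused at a tool call) could be sketched as below. The part shapes (`type: 'text'`, `type: 'tool_call'`) and the helper name `trailingOutputChars` are assumptions; this is not the project's `countTrailingOutputChars`.

```javascript
// Hypothetical sketch: count characters of text parts at the very end of the
// aggregated content; a trailing non-text part (e.g. a tool call) yields 0.
function trailingOutputChars(parts) {
  let chars = 0;
  for (let i = parts.length - 1; i >= 0; i--) {
    const part = parts[i];
    if (part.type !== 'text') {
      break;
    }
    chars += part.text.length;
  }
  return chars;
}

const pausedAtTool = [
  { type: 'text', text: 'Let me check.' },
  { type: 'tool_call', name: 'search' },
];
const streamingAnswer = [
  { type: 'tool_call', name: 'search' },
  { type: 'text', text: 'Found it.' },
];

console.log(trailingOutputChars(pausedAtTool)); // 0
console.log(trailingOutputChars(streamingAnswer)); // 9
```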
1376 lines
45 KiB
JavaScript
const crypto = require('crypto');
const fetch = require('node-fetch');
const { logger } = require('@librechat/data-schemas');
const {
  countTokens,
  checkBalance,
  getBalanceConfig,
  buildMessageFiles,
  extractFileContext,
  encodeAndFormatAudios,
  encodeAndFormatVideos,
  encodeAndFormatDocuments,
} = require('@librechat/api');
const {
  Constants,
  FileSources,
  ContentTypes,
  excludedKeys,
  EModelEndpoint,
  mergeFileConfig,
  isParamEndpoint,
  isAgentsEndpoint,
  isEphemeralAgentId,
  supportsBalanceCheck,
  isBedrockDocumentType,
  getEndpointFileConfig,
} = require('librechat-data-provider');
const { getStrategyFunctions } = require('~/server/services/Files/strategies');
const { logViolation } = require('~/cache');
const TextStream = require('./TextStream');
const db = require('~/models');

class BaseClient {
  constructor(apiKey, options = {}) {
    this.apiKey = apiKey;
    this.sender = options.sender ?? 'AI';
    this.currentDateString = new Date().toLocaleDateString('en-us', {
      year: 'numeric',
      month: 'long',
      day: 'numeric',
    });
    /** @type {boolean} */
    this.skipSaveConvo = false;
    /** @type {boolean} */
    this.skipSaveUserMessage = false;
    /** @type {string} */
    this.user;
    /** @type {string} */
    this.conversationId;
    /** @type {string} */
    this.responseMessageId;
    /** @type {string} */
    this.parentMessageId;
    /** @type {TAttachment[]} */
    this.attachments;
    /** The key for the usage object's input tokens
     * @type {string} */
    this.inputTokensKey = 'prompt_tokens';
    /** The key for the usage object's output tokens
     * @type {string} */
    this.outputTokensKey = 'completion_tokens';
    /** @type {Set<string>} */
    this.savedMessageIds = new Set();
    /**
     * Flag to determine if the client re-submitted the latest assistant message.
     * @type {boolean | undefined} */
    this.continued;
    /**
     * Flag to determine if the client has already fetched the conversation while saving new messages.
     * @type {boolean | undefined} */
    this.fetchedConvo;
    /** @type {TMessage[]} */
    this.currentMessages = [];
    /** @type {import('librechat-data-provider').VisionModes | undefined} */
    this.visionMode;
    /** @type {import('librechat-data-provider').FileConfig | undefined} */
    this._mergedFileConfig;
    /** @type {import('librechat-data-provider').EndpointFileConfig | undefined} */
    this._endpointFileConfig;
  }

  setOptions() {
    throw new Error("Method 'setOptions' must be implemented.");
  }

  async getCompletion() {
    throw new Error("Method 'getCompletion' must be implemented.");
  }

  /** @type {sendCompletion} */
  async sendCompletion() {
    throw new Error("Method 'sendCompletion' must be implemented.");
  }

  getSaveOptions() {
    throw new Error('Subclasses must implement getSaveOptions');
  }

  async buildMessages() {
    throw new Error('Subclasses must implement buildMessages');
  }

  async summarizeMessages() {
    throw new Error('Subclasses attempted to call summarizeMessages without implementing it');
  }

  /**
   * @returns {string}
   */
  getResponseModel() {
    if (isAgentsEndpoint(this.options.endpoint) && this.options.agent && this.options.agent.id) {
      return this.options.agent.id;
    }

    return this.modelOptions?.model ?? this.model;
  }

  /**
   * Abstract method to get the token count for a message. Subclasses must implement this method.
   * @param {TMessage} responseMessage
   * @returns {number}
   */
  getTokenCountForResponse(responseMessage) {
    logger.debug('[BaseClient] `getTokenCountForResponse` not implemented.', {
      messageId: responseMessage?.messageId,
    });
  }

  /**
   * Abstract method to record token usage. Subclasses must implement this method.
   * If a correction to the token usage is needed, the method should return an object with the corrected token counts.
   * Should only be used if `recordCollectedUsage` was not used instead.
   * @param {string} [model]
   * @param {AppConfig['balance']} [balance]
   * @param {number} promptTokens
   * @param {number} completionTokens
   * @param {string} [messageId]
   * @returns {Promise<void>}
   */
  async recordTokenUsage({ model, balance, promptTokens, completionTokens, messageId }) {
    logger.debug('[BaseClient] `recordTokenUsage` not implemented.', {
      model,
      balance,
      messageId,
      promptTokens,
      completionTokens,
    });
  }

  /**
   * Makes an HTTP request and logs the process.
   *
   * @param {RequestInfo} url - The URL to make the request to. Can be a string or a Request object.
   * @param {RequestInit} [init] - Optional init options for the request.
   * @returns {Promise<Response>} - A promise that resolves to the response of the fetch request.
   */
  async fetch(_url, init) {
    let url = _url;
    if (this.options.directEndpoint) {
      url = this.options.reverseProxyUrl;
    }
    logger.debug(`Making request to ${url}`);
    if (typeof Bun !== 'undefined') {
      return await fetch(url, init);
    }
    return await fetch(url, init);
  }

  getBuildMessagesOptions() {
    throw new Error('Subclasses must implement getBuildMessagesOptions');
  }

  async generateTextStream(text, onProgress, options = {}) {
    const stream = new TextStream(text, options);
    await stream.processTextStream(onProgress);
  }

  /**
   * @returns {[string|undefined, string|undefined]}
   */
  processOverideIds() {
    /** @type {Record<string, string | undefined>} */
    let { overrideConvoId, overrideUserMessageId } = this.options?.req?.body ?? {};
    if (overrideConvoId) {
      const [conversationId, index] = overrideConvoId.split(Constants.COMMON_DIVIDER);
      overrideConvoId = conversationId;
      if (index !== '0') {
        this.skipSaveConvo = true;
      }
    }
    if (overrideUserMessageId) {
      const [userMessageId, index] = overrideUserMessageId.split(Constants.COMMON_DIVIDER);
      overrideUserMessageId = userMessageId;
      if (index !== '0') {
        this.skipSaveUserMessage = true;
      }
    }

    return [overrideConvoId, overrideUserMessageId];
  }

  async setMessageOptions(opts = {}) {
    if (opts && opts.replaceOptions) {
      this.setOptions(opts);
    }

    const [overrideConvoId, overrideUserMessageId] = this.processOverideIds();
    const { isEdited, isContinued } = opts;
    const user = opts.user ?? null;
    this.user = user;
    const saveOptions = this.getSaveOptions();
    this.abortController = opts.abortController ?? new AbortController();
    const requestConvoId = overrideConvoId ?? opts.conversationId;
    const conversationId = requestConvoId ?? crypto.randomUUID();
    const parentMessageId = opts.parentMessageId ?? Constants.NO_PARENT;
    const userMessageId =
      overrideUserMessageId ?? opts.overrideParentMessageId ?? crypto.randomUUID();
    let responseMessageId = opts.responseMessageId ?? crypto.randomUUID();
    let head = isEdited ? responseMessageId : parentMessageId;
    this.currentMessages = (await this.loadHistory(conversationId, head)) ?? [];
    this.conversationId = conversationId;

    if (isEdited && !isContinued) {
      responseMessageId = crypto.randomUUID();
      head = responseMessageId;
      this.currentMessages[this.currentMessages.length - 1].messageId = head;
    }

    if (opts.isRegenerate && responseMessageId.endsWith('_')) {
      responseMessageId = crypto.randomUUID();
    }

    this.responseMessageId = responseMessageId;

    return {
      ...opts,
      user,
      head,
      saveOptions,
      userMessageId,
      requestConvoId,
      conversationId,
      parentMessageId,
      responseMessageId,
    };
  }

  createUserMessage({ messageId, parentMessageId, conversationId, text }) {
    return {
      messageId,
      parentMessageId,
      conversationId,
      sender: 'User',
      text,
      isCreatedByUser: true,
    };
  }

  async handleStartMethods(message, opts) {
    const {
      user,
      head,
      saveOptions,
      userMessageId,
      requestConvoId,
      conversationId,
      parentMessageId,
      responseMessageId,
    } = await this.setMessageOptions(opts);

    const userMessage = opts.isEdited
      ? this.currentMessages[this.currentMessages.length - 2]
      : this.createUserMessage({
          messageId: userMessageId,
          parentMessageId,
          conversationId,
          text: message,
        });

    if (typeof opts?.getReqData === 'function') {
      opts.getReqData({
        userMessage,
        conversationId,
        responseMessageId,
        sender: this.sender,
      });
    }

    if (typeof opts?.onStart === 'function') {
      const isNewConvo = !requestConvoId && parentMessageId === Constants.NO_PARENT;
      opts.onStart(userMessage, responseMessageId, isNewConvo);
    }

    return {
      ...opts,
      user,
      head,
      conversationId,
      responseMessageId,
      saveOptions,
      userMessage,
    };
  }

  /**
   * Adds instructions to the messages array. If the instructions object is empty or undefined,
   * the original messages array is returned. Otherwise, the instructions are added to the messages
   * array either at the beginning (default) or preserving the last message at the end.
   *
   * @param {Array} messages - An array of messages.
   * @param {Object} instructions - An object containing instructions to be added to the messages.
   * @param {boolean} [beforeLast=false] - If true, adds instructions before the last message; if false, adds at the beginning.
   * @returns {Array} An array containing messages and instructions, or the original messages if instructions are empty.
   */
  addInstructions(messages, instructions, beforeLast = false) {
    if (!instructions || Object.keys(instructions).length === 0) {
      return messages;
    }

    if (!beforeLast) {
      return [instructions, ...messages];
    }

    // Legacy behavior: add instructions before the last message
    const payload = [];
    if (messages.length > 1) {
      payload.push(...messages.slice(0, -1));
    }

    payload.push(instructions);

    if (messages.length > 0) {
      payload.push(messages[messages.length - 1]);
    }

    return payload;
  }

  concatenateMessages(messages) {
    return messages.reduce((acc, message) => {
      const nameOrRole = message.name ?? message.role;
      return acc + `${nameOrRole}:\n${message.content}\n\n`;
    }, '');
  }

  /**
   * This method processes an array of messages and returns a context of messages that fit within a specified token limit.
   * It iterates over the messages from newest to oldest, adding them to the context until the token limit is reached.
   * If the token limit would be exceeded by adding a message, that message is not added to the context and remains in the original array.
   * The method uses `push` and `pop` operations for efficient array manipulation, and reverses the context array at the end to maintain the original order of the messages.
   *
   * @param {Object} params
   * @param {TMessage[]} params.messages - An array of messages, each with a `tokenCount` property. The messages should be ordered from oldest to newest.
   * @param {number} [params.maxContextTokens] - The max number of tokens allowed in the context. If not provided, defaults to `this.maxContextTokens`.
   * @param {{ role: 'system', content: string, tokenCount: number }} [params.instructions] - Instructions already added to the context at index 0.
   * @returns {Promise<{
   *   context: TMessage[],
   *   remainingContextTokens: number,
   *   messagesToRefine: TMessage[],
   * }>} An object with three properties: `context`, `remainingContextTokens`, and `messagesToRefine`.
   * `context` is an array of messages that fit within the token limit.
   * `remainingContextTokens` is the number of tokens remaining within the limit after adding the messages to the context.
   * `messagesToRefine` is an array of messages that were not added to the context because they would have exceeded the token limit.
   */
  async getMessagesWithinTokenLimit({ messages: _messages, maxContextTokens, instructions }) {
    // Every reply is primed with <|start|>assistant<|message|>, so we
    // start with 3 tokens for the label after all messages have been counted.
    let currentTokenCount = 3;
    const instructionsTokenCount = instructions?.tokenCount ?? 0;
    let remainingContextTokens =
      (maxContextTokens ?? this.maxContextTokens) - instructionsTokenCount;
    const messages = [..._messages];

    const context = [];

    if (currentTokenCount < remainingContextTokens) {
      while (messages.length > 0 && currentTokenCount < remainingContextTokens) {
        if (messages.length === 1 && instructions) {
          break;
        }
        const poppedMessage = messages.pop();
        const { tokenCount } = poppedMessage;

        if (poppedMessage && currentTokenCount + tokenCount <= remainingContextTokens) {
          context.push(poppedMessage);
          currentTokenCount += tokenCount;
        } else {
          messages.push(poppedMessage);
          break;
        }
      }
    }

    if (instructions) {
      context.push(_messages[0]);
      messages.shift();
    }

    const prunedMemory = messages;
    remainingContextTokens -= currentTokenCount;

    return {
      context: context.reverse(),
      remainingContextTokens,
      messagesToRefine: prunedMemory,
    };
  }

  async sendMessage(message, opts = {}) {
    const appConfig = this.options.req?.config;
    /** @type {Promise<TMessage>} */
    let userMessagePromise;
    const { user, head, isEdited, conversationId, responseMessageId, saveOptions, userMessage } =
      await this.handleStartMethods(message, opts);

    if (opts.progressCallback) {
      opts.onProgress = opts.progressCallback.call(null, {
        ...(opts.progressOptions ?? {}),
        parentMessageId: userMessage.messageId,
        messageId: responseMessageId,
      });
    }

    const { editedContent } = opts;

    // It's not necessary to push to currentMessages
    // depending on subclass implementation of handling messages
    // When this is an edit, all messages are already in currentMessages, both user and response
    if (isEdited) {
      let latestMessage = this.currentMessages[this.currentMessages.length - 1];
      if (!latestMessage) {
        latestMessage = {
          messageId: responseMessageId,
          conversationId,
          parentMessageId: userMessage.messageId,
          isCreatedByUser: false,
          model: this.modelOptions?.model ?? this.model,
          sender: this.sender,
        };
        this.currentMessages.push(userMessage, latestMessage);
      } else if (editedContent != null) {
        // Handle editedContent for content parts
        if (editedContent && latestMessage.content && Array.isArray(latestMessage.content)) {
          const { index, text, type } = editedContent;
          if (index >= 0 && index < latestMessage.content.length) {
            const contentPart = latestMessage.content[index];
            if (type === ContentTypes.THINK && contentPart.type === ContentTypes.THINK) {
              contentPart[ContentTypes.THINK] = text;
            } else if (type === ContentTypes.TEXT && contentPart.type === ContentTypes.TEXT) {
              contentPart[ContentTypes.TEXT] = text;
            }
          }
        }
      }
      this.continued = true;
    } else {
      this.currentMessages.push(userMessage);
    }

    /**
     * When the userMessage is pushed to currentMessages, the parentMessage is the userMessageId.
     * this only matters when buildMessages is utilizing the parentMessageId, and may vary on implementation
     */
    const parentMessageId = isEdited ? head : userMessage.messageId;
    this.parentMessageId = parentMessageId;
    let {
      prompt: payload,
      tokenCountMap,
      promptTokens,
    } = await this.buildMessages(
      this.currentMessages,
      parentMessageId,
      this.getBuildMessagesOptions(opts),
      opts,
    );

    if (tokenCountMap && tokenCountMap[userMessage.messageId]) {
      userMessage.tokenCount = tokenCountMap[userMessage.messageId];
      logger.debug('[BaseClient] userMessage', {
        messageId: userMessage.messageId,
        tokenCount: userMessage.tokenCount,
        conversationId: userMessage.conversationId,
      });
    }

    if (!isEdited && !this.skipSaveUserMessage) {
      const reqFiles = this.options.req?.body?.files;
      if (reqFiles && Array.isArray(this.options.attachments)) {
        const files = buildMessageFiles(reqFiles, this.options.attachments);
        if (files.length > 0) {
          userMessage.files = files;
        }
        delete userMessage.image_urls;
      }
      /**
       * Persist the user's manual skill picks onto the user message so the
       * frontend `SkillPills` component can render them in history
       * after reload. UI-only metadata — the runtime skill resolution
       * pipeline reads the top-level `req.body.manualSkills` separately.
       * Filter is defense-in-depth on top of Mongoose schema validation:
       * keeps the DB row free of empty/non-string entries even if a
       * crafted payload slips past schema checks upstream.
       */
      const rawManualSkills = this.options.req?.body?.manualSkills;
      if (Array.isArray(rawManualSkills) && rawManualSkills.length > 0) {
        const skills = rawManualSkills.filter((s) => typeof s === 'string' && s.length > 0);
        if (skills.length > 0) {
          userMessage.manualSkills = skills;
        }
      }
      /**
       * Persist the names of skills auto-primed this turn via `always-apply`
       * frontmatter so `SkillPills` can render pinned-variant badges
       * on the user bubble that survive reload and history render. Frozen
       * at turn time (not reconstructed from `Skill.alwaysApply` at render
       * time) because the flag is mutable — historical turns must keep
       * their audit trail even if an admin flips `alwaysApply` off later.
       */
      const alwaysApplySkillPrimes = this.options.agent?.alwaysApplySkillPrimes;
      if (Array.isArray(alwaysApplySkillPrimes) && alwaysApplySkillPrimes.length > 0) {
        const names = alwaysApplySkillPrimes
          .map((p) => p?.name)
          .filter((n) => typeof n === 'string' && n.length > 0);
        if (names.length > 0) {
          userMessage.alwaysAppliedSkills = names;
        }
      }
      userMessagePromise = this.saveMessageToDatabase(userMessage, saveOptions, user).catch(
        (err) => {
          logger.error('[BaseClient] Failed to save user message:', err);
          return {};
        },
      );
      this.savedMessageIds.add(userMessage.messageId);
      if (typeof opts?.getReqData === 'function') {
        opts.getReqData({
          userMessagePromise,
        });
      }
    }

    const balanceConfig = getBalanceConfig(appConfig);
    if (
      balanceConfig?.enabled &&
      supportsBalanceCheck[this.options.endpointType ?? this.options.endpoint]
    ) {
      await checkBalance(
        {
          req: this.options.req,
          res: this.options.res,
          txData: {
            user: this.user,
            tokenType: 'prompt',
            amount: promptTokens,
            endpoint: this.options.endpoint,
            model: this.modelOptions?.model ?? this.model,
            endpointTokenConfig: this.options.endpointTokenConfig,
          },
        },
        {
          logViolation,
          getMultiplier: db.getMultiplier,
          findBalanceByUser: db.findBalanceByUser,
          createAutoRefillTransaction: db.createAutoRefillTransaction,
          balanceConfig,
          upsertBalanceFields: db.upsertBalanceFields,
        },
      );
    }

    const { completion, metadata } = await this.sendCompletion(payload, opts);
    if (this.abortController) {
      this.abortController.requestCompleted = true;
    }

    /** @type {TMessage} */
    const responseMessage = {
      messageId: responseMessageId,
      conversationId,
      parentMessageId: userMessage.messageId,
      isCreatedByUser: false,
      isEdited,
      model: this.getResponseModel(),
      sender: this.sender,
      promptTokens,
      iconURL: this.options.iconURL,
      endpoint: this.options.endpoint,
      ...(this.metadata ?? {}),
      metadata: Object.keys(metadata ?? {}).length > 0 ? metadata : undefined,
    };

    if (typeof completion === 'string') {
      responseMessage.text = completion;
    } else if (
      Array.isArray(completion) &&
      (this.clientName === EModelEndpoint.agents ||
        isParamEndpoint(this.options.endpoint, this.options.endpointType))
    ) {
      responseMessage.text = '';

      if (!opts.editedContent || this.currentMessages.length === 0) {
        responseMessage.content = completion;
      } else {
        const latestMessage = this.currentMessages[this.currentMessages.length - 1];
        if (!latestMessage?.content) {
          responseMessage.content = completion;
        } else {
          const existingContent = [...latestMessage.content];
          const { type: editedType } = opts.editedContent;
          responseMessage.content = this.mergeEditedContent(
            existingContent,
            completion,
            editedType,
          );
        }
      }
    } else if (Array.isArray(completion)) {
      responseMessage.text = completion.join('');
    }

    if (tokenCountMap && this.recordTokenUsage && this.getTokenCountForResponse) {
      let completionTokens;

      /**
       * Metadata about input/output costs for the current message. The client
       * should provide a function to get the current stream usage metadata; if not,
       * use the legacy token estimations.
       * @type {StreamUsage | null}
       */
      const usage = this.getStreamUsage != null ? this.getStreamUsage() : null;

      if (usage != null && Number(usage[this.outputTokensKey]) > 0) {
        responseMessage.tokenCount = usage[this.outputTokensKey];
        completionTokens = responseMessage.tokenCount;
      } else {
        responseMessage.tokenCount = this.getTokenCountForResponse(responseMessage);
        completionTokens = responseMessage.tokenCount;
        await this.recordTokenUsage({
          usage,
          promptTokens,
          completionTokens,
          balance: balanceConfig,
          /** Note: When using agents, responseMessage.model is the agent ID, not the model */
          model: this.model,
          messageId: this.responseMessageId,
        });
      }

      logger.debug('[BaseClient] Response token usage', {
        messageId: responseMessage.messageId,
        model: responseMessage.model,
        promptTokens,
        completionTokens,
      });
    }

    if (userMessagePromise) {
      await userMessagePromise;
    }

    if (
      this.contextMeta?.calibrationRatio > 0 &&
      this.contextMeta.calibrationRatio !== 1 &&
      userMessage.tokenCount > 0
    ) {
      const calibrated = Math.round(userMessage.tokenCount * this.contextMeta.calibrationRatio);
      if (calibrated !== userMessage.tokenCount) {
        logger.debug('[BaseClient] Calibrated user message tokenCount', {
          messageId: userMessage.messageId,
          raw: userMessage.tokenCount,
          calibrated,
          ratio: this.contextMeta.calibrationRatio,
        });
        userMessage.tokenCount = calibrated;
        await this.updateMessageInDatabase({
          messageId: userMessage.messageId,
          tokenCount: calibrated,
        });
      }
    }

    if (this.artifactPromises) {
      responseMessage.attachments = (await Promise.all(this.artifactPromises)).filter((a) => a);
    }

    if (this.options.attachments) {
      try {
        // Each element is a single attachment; collect its file_id for the conversation record.
        saveOptions.files = this.options.attachments.map((attachment) => attachment.file_id);
      } catch (error) {
        logger.error('[BaseClient] Error mapping attachments for conversation', error);
      }
    }

    if (this.contextMeta) {
      responseMessage.contextMeta = this.contextMeta;
    }

    responseMessage.databasePromise = this.saveMessageToDatabase(
      responseMessage,
      saveOptions,
      user,
    );
    this.savedMessageIds.add(responseMessage.messageId);
    return responseMessage;
  }

  async loadHistory(conversationId, parentMessageId = null) {
    logger.debug('[BaseClient] Loading history:', { conversationId, parentMessageId });

    const messages = (await db.getMessages({ conversationId, user: this.user })) ?? [];

    if (messages.length === 0) {
      return [];
    }

    let mapMethod = null;
    if (this.getMessageMapMethod) {
      mapMethod = this.getMessageMapMethod();
    }

    let _messages = this.constructor.getMessagesForConversation({
      messages,
      parentMessageId,
      mapMethod,
    });

    _messages = await this.addPreviousAttachments(_messages);

    if (!this.shouldSummarize) {
      return _messages;
    }

    for (let i = _messages.length - 1; i >= 0; i--) {
      const msg = _messages[i];
      if (!msg) {
        continue;
      }

      const summaryBlock = BaseClient.findSummaryContentBlock(msg);
      if (summaryBlock) {
        this.previous_summary = {
          ...msg,
          summary: BaseClient.getSummaryText(summaryBlock),
          summaryTokenCount: summaryBlock.tokenCount,
        };
        break;
      }

      if (msg.summary) {
        this.previous_summary = msg;
        break;
      }
    }

    if (this.previous_summary) {
      const { messageId, summary, tokenCount, summaryTokenCount } = this.previous_summary;
      logger.debug('[BaseClient] Previous summary:', {
        messageId,
        summary,
        tokenCount,
        summaryTokenCount,
      });
    }

    return _messages;
  }

  /**
   * Save a message to the database.
   * @param {TMessage} message
   * @param {Partial<TConversation>} endpointOptions
   * @param {string | null} user
   */
  async saveMessageToDatabase(message, endpointOptions, user = null) {
    // Snapshot options before any await; disposeClient may set client.options = null
    // while this method is suspended at an I/O boundary, but the local reference
    // remains valid (disposeClient nulls the property, not the object itself).
    const options = this.options;
    if (!options) {
      logger.error('[BaseClient] saveMessageToDatabase: client disposed before save, skipping');
      return {};
    }

    if (this.user && user !== this.user) {
      throw new Error('User mismatch.');
    }

    const hasAddedConvo = options?.req?.body?.addedConvo != null;
    const reqCtx = {
      userId: options?.req?.user?.id,
      isTemporary: options?.req?.body?.isTemporary,
      interfaceConfig: options?.req?.config?.interfaceConfig,
    };
    const savedMessage = await db.saveMessage(
      reqCtx,
      {
        ...message,
        endpoint: options.endpoint,
        unfinished: false,
        user,
        ...(hasAddedConvo && { addedConvo: true }),
      },
      { context: 'api/app/clients/BaseClient.js - saveMessageToDatabase #saveMessage' },
    );

    if (this.skipSaveConvo) {
      return { message: savedMessage };
    }

    const fieldsToKeep = {
      conversationId: message.conversationId,
      endpoint: options.endpoint,
      endpointType: options.endpointType,
      ...endpointOptions,
    };
    const conversationCreatedAt = options?.req?.conversationCreatedAt;
    const createdAtOnInsert =
      conversationCreatedAt != null ? new Date(conversationCreatedAt) : undefined;
    const validCreatedAtOnInsert =
      createdAtOnInsert && !Number.isNaN(createdAtOnInsert.getTime())
        ? createdAtOnInsert
        : undefined;

    const req = options?.req;
    const skippedExistingConvoLookup = this.fetchedConvo === true;
    const hasResolvedConversation =
      req != null && Object.prototype.hasOwnProperty.call(req, 'resolvedConversation');
    let existingConvo = null;
    if (!skippedExistingConvoLookup && hasResolvedConversation) {
      existingConvo = req.resolvedConversation;
    } else if (!skippedExistingConvoLookup) {
      existingConvo = await db.getConvo(req?.user?.id, message.conversationId);
    }
    if (hasResolvedConversation) {
      delete req.resolvedConversation;
    }
    const shouldSetCreatedAtOnInsert = !skippedExistingConvoLookup && existingConvo == null;

    const unsetFields = {};
    const exceptions = new Set(['spec', 'iconURL']);
    const hasNonEphemeralAgent =
      isAgentsEndpoint(options.endpoint) &&
      endpointOptions?.agent_id &&
      !isEphemeralAgentId(endpointOptions.agent_id);
    if (hasNonEphemeralAgent) {
      exceptions.add('model');
    }
    if (existingConvo != null) {
      this.fetchedConvo = true;
      for (const key in existingConvo) {
        if (!key) {
          continue;
        }
        if (excludedKeys.has(key) && !exceptions.has(key)) {
          continue;
        }

        if (endpointOptions?.[key] === undefined) {
          unsetFields[key] = 1;
        }
      }
    }

    const conversation = await db.saveConvo(reqCtx, fieldsToKeep, {
      context: 'api/app/clients/BaseClient.js - saveMessageToDatabase #saveConvo',
      unsetFields,
      createdAtOnInsert: shouldSetCreatedAtOnInsert ? validCreatedAtOnInsert : undefined,
    });

    return { message: savedMessage, conversation };
  }

  /**
   * Update a message in the database.
   * @param {Partial<TMessage>} message
   */
  async updateMessageInDatabase(message) {
    await db.updateMessage(this.options?.req?.user?.id, message);
  }

  /** Extracts text from a summary block (handles both legacy `text` field and new `content` array format). */
  static getSummaryText(summaryBlock) {
    if (Array.isArray(summaryBlock.content)) {
      return summaryBlock.content.map((b) => b.text ?? '').join('');
    }
    if (typeof summaryBlock.content === 'string') {
      return summaryBlock.content;
    }
    return summaryBlock.text ?? '';
  }

  /** Finds the last summary content block in a message's content array (last-summary-wins). */
  static findSummaryContentBlock(message) {
    if (!Array.isArray(message?.content)) {
      return null;
    }
    let lastSummary = null;
    for (const part of message.content) {
      if (
        part?.type === ContentTypes.SUMMARY &&
        BaseClient.getSummaryText(part).trim().length > 0
      ) {
        lastSummary = part;
      }
    }
    return lastSummary;
  }
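
  /*
Illustrative sketch only, not part of BaseClient: the two summary helpers above could be
exercised on plain objects roughly as follows. The literal 'summary' type string stands in
for `ContentTypes.SUMMARY` and is an assumption, as are the sample message and the
standalone function copies.

```javascript
// Stand-in for ContentTypes.SUMMARY (assumed value, for illustration only).
const SUMMARY = 'summary';

// Mirrors getSummaryText: content array first, then string content, then legacy `text`.
function getSummaryText(block) {
  if (Array.isArray(block.content)) {
    return block.content.map((b) => b.text ?? '').join('');
  }
  if (typeof block.content === 'string') {
    return block.content;
  }
  return block.text ?? '';
}

// Mirrors findSummaryContentBlock: the last non-empty summary part wins.
function findSummaryContentBlock(message) {
  if (!Array.isArray(message?.content)) {
    return null;
  }
  let last = null;
  for (const part of message.content) {
    if (part?.type === SUMMARY && getSummaryText(part).trim().length > 0) {
      last = part;
    }
  }
  return last;
}

const sampleMessage = {
  content: [
    { type: SUMMARY, text: 'older summary' },
    { type: 'text', text: 'hello' },
    { type: SUMMARY, content: [{ text: 'newer ' }, { text: 'summary' }] },
    { type: SUMMARY, text: '   ' }, // whitespace-only, so it is ignored
  ],
};
const picked = findSummaryContentBlock(sampleMessage);
console.log(getSummaryText(picked)); // newer summary
```

The whitespace-only block at the end is skipped, so an empty trailing summary does not
shadow an earlier usable one.
  */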

  /**
   * Iterate through messages, building an array based on the parentMessageId.
   *
   * This function constructs a conversation thread by traversing messages from a given parentMessageId up to the root message.
   * It handles cyclic references by ensuring that a message is not processed more than once.
   * If the 'summary' option is set to true and a message carries a summary (a summary content block
   * or a legacy 'summary' property), a copy of that message is used in which:
   * - The 'role' is set to 'system'.
   * - The 'content' is replaced with a single text part holding the summary text.
   * - The 'tokenCount' is set to the summary block's 'tokenCount' or, for the legacy property,
   *   to 'summaryTokenCount' (falling back to the message's own 'tokenCount').
   * The traversal stops at the message that carries the summary.
   *
   * Each message object should have an 'id' or 'messageId' property and may have a 'parentMessageId' property.
   * The 'parentMessageId' is the ID of the message that the current message is a reply to.
   * If 'parentMessageId' is not present, null, or is Constants.NO_PARENT,
   * the message is considered a root message.
   *
   * @param {Object} options - The options for the function.
   * @param {TMessage[]} options.messages - An array of message objects. Each object should have either an 'id' or 'messageId' property, and may have a 'parentMessageId' property.
   * @param {string} options.parentMessageId - The ID of the parent message to start the traversal from.
   * @param {Function} [options.mapMethod] - An optional function to map over the ordered messages. Applied conditionally based on mapCondition.
   * @param {(message: TMessage) => boolean} [options.mapCondition] - An optional function to determine whether mapMethod should be applied to a given message. If not provided and mapMethod is set, mapMethod applies to all messages.
   * @param {boolean} [options.summary=false] - If set to true, the traversal rewrites the message that carries a summary as described above and stops there.
   * @returns {TMessage[]} An array containing the messages in the order they should be displayed, starting with the most recent message that carries a summary if the 'summary' option is true, and ending with the message identified by 'parentMessageId'.
   */
  static getMessagesForConversation({
    messages,
    parentMessageId,
    mapMethod = null,
    mapCondition = null,
    summary = false,
  }) {
    if (!messages || messages.length === 0) {
      return [];
    }

    const orderedMessages = [];
    let currentMessageId = parentMessageId;
    const visitedMessageIds = new Set();

    while (currentMessageId) {
      if (visitedMessageIds.has(currentMessageId)) {
        break;
      }
      const message = messages.find((msg) => {
        const messageId = msg.messageId ?? msg.id;
        return messageId === currentMessageId;
      });

      visitedMessageIds.add(currentMessageId);

      if (!message) {
        break;
      }

      let resolved = message;
      let hasSummary = false;
      if (summary) {
        const summaryBlock = BaseClient.findSummaryContentBlock(message);
        if (summaryBlock) {
          const summaryText = BaseClient.getSummaryText(summaryBlock);
          resolved = {
            ...message,
            role: 'system',
            content: [{ type: ContentTypes.TEXT, text: summaryText }],
            tokenCount: summaryBlock.tokenCount,
          };
          hasSummary = true;
        } else if (message.summary) {
          resolved = {
            ...message,
            role: 'system',
            content: [{ type: ContentTypes.TEXT, text: message.summary }],
            tokenCount: message.summaryTokenCount ?? message.tokenCount,
          };
          hasSummary = true;
        }
      }

      const shouldMap = mapMethod != null && (mapCondition != null ? mapCondition(resolved) : true);
      const processedMessage = shouldMap ? mapMethod(resolved) : resolved;
      orderedMessages.push(processedMessage);

      if (hasSummary) {
        break;
      }

      currentMessageId =
        message.parentMessageId === Constants.NO_PARENT ? null : message.parentMessageId;
    }

    orderedMessages.reverse();
    return orderedMessages;
  }
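
  /*
Illustrative sketch only, not part of BaseClient: the parent-chain walk described above,
reduced to its core (no summary handling, no mapMethod). `NO_PARENT` is a hypothetical
stand-in for `Constants.NO_PARENT`, and `walkThread` plus the sample history are made up
for this example.

```javascript
// Hypothetical stand-in for Constants.NO_PARENT (the real value is defined elsewhere).
const NO_PARENT = 'NO_PARENT_SENTINEL';

// Walks from the given message id up to the root, then returns root-first order.
function walkThread(messages, leafId) {
  const ordered = [];
  const visited = new Set();
  let currentId = leafId;
  while (currentId) {
    if (visited.has(currentId)) {
      break; // cycle guard: never process the same id twice
    }
    visited.add(currentId);
    const message = messages.find((m) => (m.messageId ?? m.id) === currentId);
    if (!message) {
      break; // dangling parent reference
    }
    ordered.push(message);
    currentId = message.parentMessageId === NO_PARENT ? null : message.parentMessageId;
  }
  return ordered.reverse();
}

const history = [
  { messageId: 'a', parentMessageId: NO_PARENT, text: 'root' },
  { messageId: 'b', parentMessageId: 'a', text: 'reply' },
  { messageId: 'x', parentMessageId: 'a', text: 'sibling branch' },
  { messageId: 'c', parentMessageId: 'b', text: 'leaf' },
];
const thread = walkThread(history, 'c').map((m) => m.messageId);
console.log(thread.join(' > ')); // a > b > c
```

Only the branch that leads to the starting message is returned; the sibling `x` is left
out because nothing on the walked chain points to it.
  */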

  /**
   * Algorithm adapted from "6. Counting tokens for chat API calls" of
   * https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
   *
   * An additional 3 tokens need to be added for assistant label priming after all messages have been counted.
   * In our implementation, this is accounted for in the getMessagesWithinTokenLimit method.
   *
   * The content parts example was adapted from the following example:
   * https://github.com/openai/openai-cookbook/pull/881/files
   *
   * Note: image token calculation is to be done elsewhere where we have access to the image metadata
   *
   * @param {Object} message
   */
  getTokenCountForMessage(message) {
    // Note: gpt-3.5-turbo and gpt-4 may update over time. Use default for these as well as for unknown models
    let tokensPerMessage = 3;
    let tokensPerName = 1;
    const model = this.modelOptions?.model ?? this.model;

    if (model === 'gpt-3.5-turbo-0301') {
      tokensPerMessage = 4;
      tokensPerName = -1;
    }

    const processValue = (value) => {
      if (Array.isArray(value)) {
        for (let item of value) {
          if (
            !item ||
            !item.type ||
            item.type === ContentTypes.THINK ||
            item.type === ContentTypes.ERROR ||
            item.type === ContentTypes.IMAGE_URL
          ) {
            continue;
          }

          if (item.type === ContentTypes.TOOL_CALL && item.tool_call != null) {
            const toolName = item.tool_call?.name || '';
            if (toolName != null && toolName && typeof toolName === 'string') {
              numTokens += this.getTokenCount(toolName);
            }

            const args = item.tool_call?.args || '';
            if (args != null && args && typeof args === 'string') {
              numTokens += this.getTokenCount(args);
            }

            const output = item.tool_call?.output || '';
            if (output != null && output && typeof output === 'string') {
              numTokens += this.getTokenCount(output);
            }
            continue;
          }

          const nestedValue = item[item.type];

          if (!nestedValue) {
            continue;
          }

          processValue(nestedValue);
        }
      } else if (typeof value === 'string') {
        numTokens += this.getTokenCount(value);
      } else if (typeof value === 'number') {
        numTokens += this.getTokenCount(value.toString());
      } else if (typeof value === 'boolean') {
        numTokens += this.getTokenCount(value.toString());
      }
    };

    let numTokens = tokensPerMessage;
    for (let [key, value] of Object.entries(message)) {
      processValue(value);

      if (key === 'name') {
        numTokens += tokensPerName;
      }
    }
    return numTokens;
  }
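
  /*
Illustrative sketch only, not part of BaseClient: the per-message accounting above, with a
toy word counter standing in for the real tokenizer. `countWords`, `countMessageTokens`
and the sample message are assumptions made for this example; tool calls and the skipped
content types are left out for brevity, and the literal 'text' key is assumed.

```javascript
// Toy tokenizer: one "token" per whitespace-separated word (not a real tokenizer).
const countWords = (text) => text.split(/\s+/).filter(Boolean).length;

// Fixed per-message overhead, plus every string/number/boolean value, plus a
// per-name adjustment whenever the message has a `name` key.
function countMessageTokens(message, getTokenCount, tokensPerMessage = 3, tokensPerName = 1) {
  let total = tokensPerMessage;
  const visit = (value) => {
    if (Array.isArray(value)) {
      for (const part of value) {
        if (!part || !part.type) {
          continue; // untyped parts are ignored
        }
        visit(part[part.type]); // e.g. part.text for a part of type 'text'
      }
    } else if (typeof value === 'string') {
      total += getTokenCount(value);
    } else if (typeof value === 'number' || typeof value === 'boolean') {
      total += getTokenCount(String(value));
    }
  };
  for (const [key, value] of Object.entries(message)) {
    visit(value);
    if (key === 'name') {
      total += tokensPerName;
    }
  }
  return total;
}

const sample = {
  role: 'user',
  name: 'alice',
  content: [{ type: 'text', text: 'count these three' }],
};
// 3 (overhead) + 1 (role) + 1 (name value) + 1 (name bonus) + 3 (content words)
console.log(countMessageTokens(sample, countWords)); // 9
```

With a real tokenizer the individual counts differ, but the structure of the sum stays
the same: overhead, then values, then the name adjustment.
  */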

  /**
   * Merges completion content with existing content when editing TEXT or THINK types
   * @param {Array} existingContent - The existing content array
   * @param {Array} newCompletion - The new completion content
   * @param {string} editedType - The type of content being edited
   * @returns {Array} The merged content array
   */
  mergeEditedContent(existingContent, newCompletion, editedType) {
    if (!newCompletion.length) {
      return existingContent.concat(newCompletion);
    }

    if (editedType !== ContentTypes.TEXT && editedType !== ContentTypes.THINK) {
      return existingContent.concat(newCompletion);
    }

    const lastIndex = existingContent.length - 1;
    const lastExisting = existingContent[lastIndex];
    const firstNew = newCompletion[0];

    if (lastExisting?.type !== firstNew?.type || firstNew?.type !== editedType) {
      return existingContent.concat(newCompletion);
    }

    const mergedContent = [...existingContent];
    if (editedType === ContentTypes.TEXT) {
      mergedContent[lastIndex] = {
        ...mergedContent[lastIndex],
        [ContentTypes.TEXT]:
          (mergedContent[lastIndex][ContentTypes.TEXT] || '') + (firstNew[ContentTypes.TEXT] || ''),
      };
    } else {
      mergedContent[lastIndex] = {
        ...mergedContent[lastIndex],
        [ContentTypes.THINK]:
          (mergedContent[lastIndex][ContentTypes.THINK] || '') +
          (firstNew[ContentTypes.THINK] || ''),
      };
    }

    // Add remaining completion items
    return mergedContent.concat(newCompletion.slice(1));
  }
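
  /*
Illustrative sketch only, not part of BaseClient: a text-only version of the merge above,
showing how a continued completion is stitched onto the last edited part. The 'text' and
'tool_call' type strings, `mergeTextParts`, and the sample parts are assumptions made for
this example.

```javascript
// Appends the first new text part to the last existing text part, then keeps the rest.
function mergeTextParts(existing, completion) {
  const last = existing[existing.length - 1];
  const first = completion[0];
  if (!first || last?.type !== 'text' || first.type !== 'text') {
    return existing.concat(completion); // nothing to stitch, just append
  }
  const merged = existing.slice(0, -1);
  merged.push({ ...last, text: (last.text || '') + (first.text || '') });
  return merged.concat(completion.slice(1));
}

const before = [{ type: 'text', text: 'Hello, ' }];
const streamed = [
  { type: 'text', text: 'world.' },
  { type: 'tool_call', tool_call: { name: 'search' } },
];
const after = mergeTextParts(before, streamed);
console.log(after.length, after[0].text); // 2 Hello, world.
```

The merged part is a new object, so the caller's original content array is left
unchanged.
  */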

  async sendPayload(payload, opts = {}) {
    if (opts && typeof opts === 'object') {
      this.setOptions(opts);
    }

    return await this.sendCompletion(payload, opts);
  }

  async addDocuments(message, attachments) {
    const documentResult = await encodeAndFormatDocuments(
      this.options.req,
      attachments,
      {
        provider: this.options.agent?.provider ?? this.options.endpoint,
        endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
        useResponsesApi: this.options.agent?.model_parameters?.useResponsesApi,
        model: this.modelOptions?.model ?? this.model,
      },
      getStrategyFunctions,
    );
    message.documents =
      documentResult.documents && documentResult.documents.length
        ? documentResult.documents
        : undefined;
    return documentResult.files;
  }

  async addVideos(message, attachments) {
    const videoResult = await encodeAndFormatVideos(
      this.options.req,
      attachments,
      {
        provider: this.options.agent?.provider ?? this.options.endpoint,
        endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
      },
      getStrategyFunctions,
    );
    message.videos =
      videoResult.videos && videoResult.videos.length ? videoResult.videos : undefined;
    return videoResult.files;
  }

  async addAudios(message, attachments) {
    const audioResult = await encodeAndFormatAudios(
      this.options.req,
      attachments,
      {
        provider: this.options.agent?.provider ?? this.options.endpoint,
        endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
      },
      getStrategyFunctions,
    );
    message.audios =
      audioResult.audios && audioResult.audios.length ? audioResult.audios : undefined;
    return audioResult.files;
  }

  /**
   * Extracts text context from attachments and sets it on the message.
   * This handles text that was already extracted from files (OCR, transcriptions, document text, etc.)
   * @param {TMessage} message - The message to add context to
   * @param {MongoFile[]} attachments - Array of file attachments
   * @returns {Promise<void>}
   */
  async addFileContextToMessage(message, attachments) {
    const fileContext = await extractFileContext({
      attachments,
      req: this.options?.req,
      tokenCountFn: (text) => countTokens(text),
    });

    if (fileContext) {
      message.fileContext = fileContext;
    }
  }

  async processAttachments(message, attachments) {
    const categorizedAttachments = {
      images: [],
      videos: [],
      audios: [],
      documents: [],
    };

    const allFiles = [];

    const provider = this.options.agent?.provider ?? this.options.endpoint;
    const isBedrock = provider === EModelEndpoint.bedrock;

    if (!this._mergedFileConfig) {
      this._mergedFileConfig = mergeFileConfig(this.options.req?.config?.fileConfig);
      const endpoint = this.options.agent?.endpoint ?? this.options.endpoint;
      this._endpointFileConfig = getEndpointFileConfig({
        fileConfig: this._mergedFileConfig,
        endpoint,
        endpointType: this.options.endpointType,
      });
    }

    for (const file of attachments) {
      /** @type {FileSources} */
      const source = file.source ?? FileSources.local;
      if (source === FileSources.text) {
        allFiles.push(file);
        continue;
      }
      if (
        file.embedded === true ||
        file.metadata?.codeEnvRef != null ||
        file.metadata?.fileIdentifier != null
      ) {
        allFiles.push(file);
        continue;
      }

      if (file.type.startsWith('image/')) {
        categorizedAttachments.images.push(file);
      } else if (file.type === 'application/pdf') {
        categorizedAttachments.documents.push(file);
        allFiles.push(file);
      } else if (isBedrock && isBedrockDocumentType(file.type)) {
        categorizedAttachments.documents.push(file);
        allFiles.push(file);
      } else if (file.type.startsWith('video/')) {
        categorizedAttachments.videos.push(file);
        allFiles.push(file);
      } else if (file.type.startsWith('audio/')) {
        categorizedAttachments.audios.push(file);
        allFiles.push(file);
      } else if (
        file.type &&
        this._mergedFileConfig &&
        this._endpointFileConfig?.supportedMimeTypes &&
        this._mergedFileConfig.checkType(file.type, this._endpointFileConfig.supportedMimeTypes)
      ) {
        categorizedAttachments.documents.push(file);
        allFiles.push(file);
      }
    }

    const [imageFiles] = await Promise.all([
      categorizedAttachments.images.length > 0
        ? this.addImageURLs(message, categorizedAttachments.images)
        : Promise.resolve([]),
      categorizedAttachments.documents.length > 0
        ? this.addDocuments(message, categorizedAttachments.documents)
        : Promise.resolve([]),
      categorizedAttachments.videos.length > 0
        ? this.addVideos(message, categorizedAttachments.videos)
        : Promise.resolve([]),
      categorizedAttachments.audios.length > 0
        ? this.addAudios(message, categorizedAttachments.audios)
        : Promise.resolve([]),
    ]);

    allFiles.push(...imageFiles);

    const seenFileIds = new Set();
    const uniqueFiles = [];

    for (const file of allFiles) {
      if (file.file_id && !seenFileIds.has(file.file_id)) {
        seenFileIds.add(file.file_id);
        uniqueFiles.push(file);
      } else if (!file.file_id) {
        uniqueFiles.push(file);
      }
    }

    return uniqueFiles;
  }
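
  /*
Illustrative sketch only, not part of BaseClient: the final de-duplication step of
processAttachments in isolation. `dedupeByFileId` and the sample files are made up for
this example.

```javascript
// Keeps the first file seen for each file_id; files without an id are always kept.
function dedupeByFileId(files) {
  const seen = new Set();
  const unique = [];
  for (const file of files) {
    if (!file.file_id) {
      unique.push(file); // no id to compare against
    } else if (!seen.has(file.file_id)) {
      seen.add(file.file_id);
      unique.push(file);
    }
  }
  return unique;
}

const deduped = dedupeByFileId([
  { file_id: 'f1', type: 'application/pdf' },
  { file_id: 'f1', type: 'application/pdf' },
  { type: 'image/png' },
  { type: 'image/png' },
  { file_id: 'f2', type: 'audio/mpeg' },
]);
console.log(deduped.length); // 4
```

Files without a `file_id` cannot be matched against each other, so both id-less entries
survive while the repeated `f1` is dropped.
  */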

  /**
   * @param {TMessage[]} _messages
   * @returns {Promise<TMessage[]>}
   */
  async addPreviousAttachments(_messages) {
    if (!this.options.resendFiles) {
      return _messages;
    }

    const seen = new Set();
    const attachmentsProcessed =
      this.options.attachments && !(this.options.attachments instanceof Promise);
    if (attachmentsProcessed) {
      for (const attachment of this.options.attachments) {
        seen.add(attachment.file_id);
      }
    }

    /**
     *
     * @param {TMessage} message
     */
    const processMessage = async (message) => {
      if (!this.message_file_map) {
        /** @type {Record<string, MongoFile[]>} */
        this.message_file_map = {};
      }

      const fileIds = [];
      for (const file of message.files) {
        if (seen.has(file.file_id)) {
          continue;
        }
        fileIds.push(file.file_id);
        seen.add(file.file_id);
      }

      if (fileIds.length === 0) {
        return message;
      }

      const files = await db.getFiles(
        {
          file_id: { $in: fileIds },
        },
        {},
        {},
      );

      await this.addFileContextToMessage(message, files);
      await this.processAttachments(message, files);

      this.message_file_map[message.messageId] = files;
      return message;
    };

    const promises = [];

    for (const message of _messages) {
      if (!message.files) {
        promises.push(message);
        continue;
      }

      promises.push(processMessage(message));
    }

    const messages = await Promise.all(promises);

    this.checkVisionRequest(Object.values(this.message_file_map ?? {}).flat());
    return messages;
  }
}

module.exports = BaseClient;