LibreChat/api/app/clients/BaseClient.js
Danny Avila 93c4ef4ba8
🧱 refactor: typed CodeEnvRef + kind discriminator + principal-aware sandbox cache (#12960)
* 🧱 refactor: typed CodeEnvRef + kind discriminator + tenant-aware sandbox cache

Final cutover for the LibreChat ↔ codeapi sandbox file identity. Replaces
the magic string `${session_id}/${file_id}?entity_id=...` with a typed,
discriminated `CodeEnvRef`. Pre-release lockstep deploy with codeapi
#1455 and agents #148; no legacy aliases retained.

## Final shape

```ts
type CodeEnvRef =
  | { kind: 'skill'; id: string; storage_session_id: string; file_id: string; version: number }
  | { kind: 'agent'; id: string; storage_session_id: string; file_id: string }
  | { kind: 'user';  id: string; storage_session_id: string; file_id: string };
```

`kind` drives codeapi's sessionKey: `<tenant>:<kind>:<id>[:v:<version>]`
for shared kinds, `<tenant>:user:<userId>` for user-private (auth context
provides `userId`). `version` is statically required for `kind: 'skill'`
and forbidden otherwise via discriminated union — constraint holds at
compile time on every consumer, not just codeapi's runtime validator.

`id` is sessionKey-meaningful for `'skill'` / `'agent'`; informational
only for `'user'` (codeapi resolves user identity from auth context).
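
The sessionKey rule above can be sketched as follows. This is a minimal illustration, not codeapi's code: `deriveSessionKeySketch` and the `auth` argument are hypothetical names, and the `:v:<version>` suffix is assumed from the key format quoted above.

```javascript
// Hypothetical sketch of the sessionKey rule; not codeapi's actual implementation.
/**
 * @param {{ tenantId: string, userId: string }} auth - stand-in for the auth context
 * @param {{ kind: string, id: string, version?: number }} ref - identity part of a CodeEnvRef
 * @returns {string}
 */
function deriveSessionKeySketch(auth, ref) {
  if (ref.kind === 'user') {
    // `ref.id` is informational only; user identity comes from the auth context.
    return `${auth.tenantId}:user:${auth.userId}`;
  }
  if (ref.kind === 'skill') {
    if (!Number.isInteger(ref.version)) {
      throw new Error("version is required for kind 'skill'");
    }
    return `${auth.tenantId}:skill:${ref.id}:v:${ref.version}`;
  }
  if (ref.kind === 'agent') {
    if (ref.version != null) {
      throw new Error("version is forbidden for kind 'agent'");
    }
    return `${auth.tenantId}:agent:${ref.id}`;
  }
  throw new Error('kind must be one of: skill, agent, user');
}
```

Under these assumptions, bumping a skill's `version` produces a different key, which is why an edit invalidates the prior cache entry without any explicit eviction.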

## What changed

- `packages/data-provider/src/codeEnvRef.ts` — discriminated union +
  `CODE_ENV_KINDS` const-tuple keeps the runtime list and TS union
  locked together.
- Schemas: `metadata.codeEnvRef` and `SkillFile.codeEnvRef` enums
  tightened to `['skill', 'agent', 'user']`.
- `primeSkillFiles` writes `kind: 'skill'`, `id: skill._id`,
  `version: skill.version`. Cache-hit path reads `codeEnvRef`
  directly. Bumping `skill.version` on edit naturally invalidates
  the prior cache entry under the new sessionKey.
- `processCodeOutput` writes `kind: 'user'`, `id: req.user.id`. Output
  bucket is always user-scoped, regardless of which skill the
  execution invoked. New regression test pins the asymmetry.
- `primeFiles` reupload preserves `kind`/`id`/`version?` from the
  existing ref so a skill-cache-miss reupload doesn't silently demote
  to user bucket.
- `crud.js` upload functions (`uploadCodeEnvFile` /
  `batchUploadCodeEnvFiles`) thread `kind`/`id`/`version?` to the
  multipart form (codeapi #1455 option α). Without these on the wire,
  codeapi falls back to user bucketing and skill-cache invalidation
  never fires. Client-side validation mirrors codeapi's validator.
- `Files/process.js` — chat attachments use `kind: 'user'`; agent
  setup files use `kind: 'agent'`.
- Drops `entity_id` everywhere (struct, schema sub-docs, write paths,
  upload form fields). Drops `'system'` from the kind enum (no emitter
  ever existed).
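
As a rough illustration of the reupload rule in the list above (identity is carried over from the existing ref, only the storage coordinates change), here is a minimal sketch. `pickIdentity` and `refAfterReupload` are hypothetical names; only the field names come from the `CodeEnvRef` shape.

```javascript
// Hypothetical helpers; only the field names come from the CodeEnvRef shape.
function pickIdentity(ref) {
  const identity = { kind: ref.kind, id: ref.id };
  if (ref.kind === 'skill') {
    identity.version = ref.version; // version travels only with skill refs
  }
  return identity;
}

/**
 * Builds the ref to persist after a reupload: same identity, new storage location.
 * @param {object} existingRef - the CodeEnvRef already stored on the file
 * @param {{ storage_session_id: string, file_id: string }} upload - assumed upload result
 */
function refAfterReupload(existingRef, upload) {
  return {
    ...pickIdentity(existingRef),
    storage_session_id: upload.storage_session_id,
    file_id: upload.file_id,
  };
}
```

A skill ref therefore stays a skill ref after a cache-miss reupload instead of being demoted to the user bucket.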

## Test plan

- [x] `cd packages/data-provider && npx jest src/codeEnvRef.spec` — 4 / 4
- [x] `cd packages/data-schemas && npx jest` — 1447 / 1447
- [x] `cd packages/api && npx jest src/agents` — 81 / 81 in skillFiles +
  handlers + resources
- [x] `cd api && npx jest server/services/Files server/controllers/agents` —
  436 / 436
- [x] `cd api && npx jest server/services/Files/Code` — 98 / 98 (incl.
  new "outputs are user-scoped regardless of which skill the execution
  invoked" regression and "reupload forwards kind/id/version from
  existing ref")
- [x] `npx tsc --noEmit -p packages/data-{provider,schemas}/tsconfig.json
  && npx tsc --noEmit -p packages/api/tsconfig.json` — clean (only
  pre-existing unrelated dev errors in storage/balance, untouched here)

## Deploy notes

- **24h cache-miss burst** on first deploy. This affects both inputs (skill
  caches re-prime under the new sessionKey shape) and outputs (any pre-Phase C
  cached skill-output files become unreadable). Bounded by codeapi's 24h TTL.
- **Lockstep with codeapi #1455 and agents #148.** Any of the three repos can
  land first since there are no aliases to drain, but the three deploys must
  overlap within the same maintenance window.
- **`@librechat/agents` bump to `3.1.79-dev.0`** required after agents
  #148 lands and is published.

## What this enables

Auth bridge work (JWT-based tenant/user identity between LC and codeapi)
— codeapi now derives sessionKey purely from `req.codeApiAuthContext.{
tenantId, userId}`, so the next chapter is replacing the header-asserted
user identity with a verified-claim path.

* 🩹 fix: persist execute_code uploads under codeEnvRef metadata key

Codex review P1 (chatgpt-codex-connector). `Files/process.js` was
storing the upload result under `metadata.fileIdentifier` even though:
- `uploadCodeEnvFile` now returns `{ storage_session_id, file_id }`,
  not the legacy magic string.
- The post-cutover schema (`File.metadata.codeEnvRef`) only declares
  `codeEnvRef` — mongoose strict mode silently strips unknown keys.
- All readers (`primeFiles`, `getCodeFilesByIds`,
  `categorizeFileForToolResources`, controller filtering) check
  `metadata.codeEnvRef`.

Net effect of the bug: chat-attached and agent-setup execute_code files
would lose their sandbox reference on save, and primeFiles would skip
them on subsequent code-execution turns — the file blob would still be
available locally but never re-mounted in the sandbox.

Fix: construct the full `CodeEnvRef` (`{ kind, id, storage_session_id,
file_id }`) at the write site and persist under `metadata.codeEnvRef`.
`BaseClient`'s "is this a code-env file" presence check accepts the new
shape alongside the legacy `fileIdentifier` for back-compat with any
pre-cutover records still in the database. Mirrors the same change in
`processAttachments.spec.ts` (which re-implements the BaseClient logic
for testability).

New regression tests in `process.spec.js` cover three cases:
- chat attachments (`messageAttachment=true`) → `kind: 'user'`
- agent setup (`messageAttachment=false`) → `kind: 'agent'`
- legacy `fileIdentifier` key is NOT persisted (would be schema-stripped)
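
The back-compat presence check described in the fix could look roughly like this. It is a sketch only: `hasCodeEnvRefSketch` is a hypothetical name, and the exact conditions `BaseClient` applies are not shown in this message.

```javascript
// Hypothetical sketch of an "is this a code-env file" presence check.
function hasCodeEnvRefSketch(file) {
  const metadata = file?.metadata;
  if (!metadata) {
    return false;
  }
  const ref = metadata.codeEnvRef;
  // New shape: a typed ref object carrying the storage coordinates.
  if (ref && typeof ref === 'object' && ref.storage_session_id && ref.file_id) {
    return true;
  }
  // Legacy shape: the pre-cutover magic string, kept for old records only.
  return typeof metadata.fileIdentifier === 'string' && metadata.fileIdentifier.length > 0;
}
```

Reads accept both shapes, while writes use only `metadata.codeEnvRef`, so legacy records age out naturally.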

* 🩹 fix: read storage_session_id on primed file refs (Codex P1)

Codex review (chatgpt-codex-connector). After Phase B's per-file
`session_id` → `storage_session_id` rename, `primeFiles` emits the
new field — but `seedCodeFilesIntoSessions` was still reading
`files[0].session_id` for the representative session and `f.session_id`
for the dedupe key. In runs with only primed attachments (no skill
seed), `representativeSessionId` was `undefined`, the function
returned the unchanged map, and `seedCodeFilesIntoSessions` silently
dropped the entire batch. The first `execute_code` call then started
without `_injected_files` and the agent couldn't see prior-turn
artifacts.

Fix:
- `codeFilesSession.ts`: read `f.storage_session_id` for both the
  dedupe key and the representative session id. JSDoc updated to
  match the new field name.
- `callbacks.js`: the two output-file persistence paths read
  `file.session_id` to pass to `processCodeOutput` — switch to
  `file.storage_session_id`. The original comment explicitly says
  this should be the STORAGE session, which is exactly the field
  Phase B renamed.
- `codeFilesSession.spec.ts`: fixture builder uses `storage_session_id`
  and `kind: 'user'` to match the post-cutover `CodeEnvFile` shape.
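
To make the failure mode concrete, here is a minimal sketch of a seeding step that keys on `storage_session_id`. The function name, the `Map` layout, and the dedupe key format are assumptions for illustration; the real logic lives in `codeFilesSession.ts`.

```javascript
// Hypothetical sketch; the Map layout and dedupe key format are assumed.
function seedFilesSketch(sessions, files) {
  // Post-rename field. Reading `files[0].session_id` here would yield undefined
  // and take the early return below, silently dropping the whole batch.
  const representativeSessionId = files[0]?.storage_session_id;
  if (!representativeSessionId) {
    return sessions; // nothing to seed; the map is returned unchanged
  }
  const seen = new Set();
  const unique = [];
  for (const f of files) {
    const key = `${f.storage_session_id}:${f.id}`;
    if (seen.has(key)) {
      continue;
    }
    seen.add(key);
    unique.push(f);
  }
  const next = new Map(sessions);
  next.set(representativeSessionId, { files: unique });
  return next;
}
```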

Lockstep coordination: this matches the post-bump shape of
`@librechat/agents` 3.1.79+. CI tsc errors against the currently-pinned
3.1.78 are expected and resolve when the dep bumps in this PR before
merge.

* 📦 chore: Bump `@librechat/agents` to version 3.1.80-dev.0 in package-lock and package.json files

* 🪪 fix: thread kind/id/version through codeapi /download URLs (Phase C α)

Symmetric fix for the upload-side wire change in 537725a. Codeapi's
`sessionAuth` middleware now requires `kind`/`id`/`version?` on every
download/freshness URL — without them it 400s with "kind must be one
of: skill, agent, user" before serving the file.

Three sites construct codeapi-side URLs that go through `sessionAuth`:

- `processCodeOutput` (`Files/Code/process.js`): `/download/<sess>/<id>`
  for freshly-generated sandbox outputs. Always `kind: 'user'` +
  `id: req.user.id` — code-output files are always user-private,
  regardless of which skill the run invoked.
- `getSessionInfo` (`Files/Code/process.js`): `/sessions/<sess>/objects/<id>`
  for the 23h freshness check. Pulls kind/id/version straight off the
  `codeEnvRef` already in scope — skill files stay skill-bucketed,
  user files stay user-bucketed.
- `/code/download/:session_id/:fileId` LC route (`routes/files/files.js`):
  proxies to codeapi for manual downloads. Code-output files only on
  this route, so `kind: 'user'` + `id: req.user.id`.

The `getCodeOutputDownloadStream` helper in `crud.js` now takes an
`identity` param, validated by a `buildCodeEnvDownloadQuery` helper
that mirrors `appendCodeEnvFileIdentity`'s shape rules: kind required
from the closed `{skill, agent, user}` set, version required for
'skill' and forbidden otherwise. Bad callers fail fast on the client
instead of round-tripping a 400.
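
The shape rules above can be sketched as a small client-side validator. This is an illustration under assumptions, not the actual `buildCodeEnvDownloadQuery`: the function name, the error messages, and the leading `?` on the return value are all hypothetical.

```javascript
// Hypothetical sketch of client-side identity validation for download URLs.
const CODE_ENV_KINDS_SKETCH = ['skill', 'agent', 'user'];

function buildDownloadQuerySketch({ kind, id, version }) {
  if (!CODE_ENV_KINDS_SKETCH.includes(kind)) {
    throw new Error(`kind must be one of: ${CODE_ENV_KINDS_SKETCH.join(', ')}`);
  }
  if (typeof id !== 'string' || id.length === 0) {
    throw new Error('id is required');
  }
  // URLSearchParams handles the URL encoding of each value.
  const params = new URLSearchParams({ kind, id });
  if (kind === 'skill') {
    if (!Number.isInteger(version)) {
      throw new Error("version is required for kind 'skill'");
    }
    params.set('version', String(version));
  } else if (version != null) {
    throw new Error(`version is forbidden for kind '${kind}'`);
  }
  return `?${params.toString()}`;
}
```

Throwing before the request is built is what lets a bad caller fail locally rather than discovering the problem from a remote 400.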

Also cleans up two log-noise sources reported alongside the 400:

- `logAxiosError` in `packages/api/src/utils/axios.ts` was dumping
  `error.response.data` raw. With `responseType: 'arraybuffer'` that's
  a `Buffer` (~4 chars per byte after JSON-serialization); with
  `responseType: 'stream'` it's a `Readable` whose internal state
  serializes the entire ring buffer + socket. New `renderResponseData`
  decodes small buffers as UTF-8 (truncated past 2KB) and stubs streams
  as `'[stream]'`. Diagnostics stay useful, log lines stop being
  megabytes.
- `/code/download` route's catch was bare `logger.error('...', error)`,
  bypassing the redactor. Switched to `logAxiosError` so it benefits
  from the same buffer/stream handling.
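
A minimal sketch of the buffer/stream handling described above, assuming a 2 KB cap and duck-typed stream detection. `renderResponseDataSketch` is a hypothetical name and the truncation suffix is invented for illustration; the real `renderResponseData` may differ.

```javascript
// Hypothetical sketch of log-safe rendering for axios `error.response.data`.
const MAX_RENDERED_BYTES = 2048;

function renderResponseDataSketch(data) {
  if (data == null) {
    return data;
  }
  // Streams are stubbed out: serializing one would dump its internal state.
  if (typeof data.pipe === 'function') {
    return '[stream]';
  }
  if (Buffer.isBuffer(data) || data instanceof ArrayBuffer) {
    const buf = Buffer.isBuffer(data) ? data : Buffer.from(data);
    const text = buf.subarray(0, MAX_RENDERED_BYTES).toString('utf8');
    if (buf.length > MAX_RENDERED_BYTES) {
      return `${text} [truncated ${buf.length - MAX_RENDERED_BYTES} bytes]`;
    }
    return text;
  }
  return data; // already a plain object or string
}
```

Decoding small buffers keeps a JSON error body readable in the log, while the cap bounds the size of any single line.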

Tests updated to match the new contract:
- crud.spec: `getCodeOutputDownloadStream` fixtures pass `userIdentity`;
  new cases cover skill identity (with version), bad kind rejection,
  skill-without-version rejection.
- process.spec: `getSessionInfo` test passes a full `codeEnvRef` object.

* ♻️ refactor: extract codeEnv identity helpers into packages/api

Per the project convention that new backend code lives in TypeScript
under `packages/api`, moves `appendCodeEnvFileIdentity` and
`buildCodeEnvDownloadQuery` from `api/server/services/Files/Code/crud.js`
into a new `packages/api/src/files/code/identity.ts` module.

Both helpers are pure validators that mirror codeapi's
`parseUploadSessionKeyInput` server-side rules (closed kind set,
`version` required for `'skill'` and forbidden otherwise) — they
deserve TS support and a dedicated spec rather than living as
JSDoc-typed helpers in the legacy `/api` workspace. The new module:

- Exports a `CodeEnvIdentity` interface using the
  `librechat-data-provider` `CodeEnvKind` discriminated union.
- Adds 13 unit tests in `identity.spec.ts` covering the validation
  matrix (skill+version, agent, user, and every rejection path) plus
  URL encoding for the download query.
- Re-exported from `packages/api/src/files/code/index.ts` alongside
  `classify`, `extract`, and `form`.

Consumer updates:
- `api/server/services/Files/Code/crud.js`: drops the local helpers
  and imports them from `@librechat/api`. Net -64 lines.
- `api/server/services/Files/Code/process.js`: same.
- Test mocks for `@librechat/api` in three spec files now stub the
  helpers' validation behavior locally rather than pulling them
  through `requireActual` (which would drag in provider-config
  init-time side effects). The package's `exports` field only
  surfaces the root barrel, so leaf imports aren't reachable from
  legacy `/api` test setup.

No runtime behavior change. Identity validation rules and emitted
form/query shapes are byte-for-byte identical pre/post.

* 🪪 fix: emit resource_id alongside id on _injected_files (skill 403 fix)

Companion to codeapi #1455 fix and agents 3.1.80-dev.1 — the wire
shape for shared-kind files now requires `resource_id` distinct from
the storage `id`. Without this LC change, codeapi's sessionKey
re-derivation on every shared-kind /exec rejects with 403
session_key_mismatch:

    cached:  legacy:skill:69dcf561...:v:59  (signed at upload, skill _id)
    derived: legacy:skill:ysPwEURuPk-...:v:59  (storage nanoid)

Emit sites updated:

- `primeInvokedSkills` cache-hit path: `resource_id: ref.id` (the
  persisted skill `_id` from `codeEnvRef.id`); `id: ref.file_id`
  unchanged (storage uuid).
- `primeInvokedSkills` fresh-upload path: `resource_id: skill._id.toString()`
  on every primed file (the `allPrimedFiles` builder type now carries
  the field).
- `processCodeOutput`'s `pushFile` (Code/process.js): `resource_id: ref.id`
  — for `kind: 'user'` this is informational (codeapi derives
  sessionKey from auth context) but emitted for shape uniformity
  with shared kinds.

Bumps `@librechat/agents` to `^3.1.80-dev.1` (the version that
ships the matching `CodeEnvFile.resource_id` field).
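
The distinction between the two identifiers can be sketched as a mapping from a persisted `CodeEnvRef` to an injected-file entry. `toInjectedFileSketch` is a hypothetical name and the exact `CodeEnvFile` field set is assumed from the fields named in this message.

```javascript
// Hypothetical sketch; the output field set is assumed from this commit message.
function toInjectedFileSketch(ref, name) {
  const file = {
    id: ref.file_id, // storage-side file id
    resource_id: ref.id, // entity id (skill _id, agent id, or user id)
    storage_session_id: ref.storage_session_id,
    kind: ref.kind,
    name,
  };
  if (ref.kind === 'skill') {
    file.version = ref.version;
  }
  return file;
}
```

Keeping `id` and `resource_id` separate matters because the sessionKey signed at upload uses the entity id, so re-deriving it from the storage id can never match.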

## Test plan

- [x] `cd packages/api && npx jest src/agents` — 67 / 67 pass
  (skillFiles fixtures updated to assert `resource_id` on the
  emitted CodeSessionContext.files).
- [x] `cd api && npx jest server/services/Files server/controllers/agents` —
  445 / 445 pass (process.spec fixtures updated for the reupload
  + cache-hit emission).
- [x] `npx tsc --noEmit -p packages/api/tsconfig.json` — clean.

* fix(skill-tool-call): carry resource_id through primeSkillFiles → artifact

Codeapi was 400ing every /exec following a `handle_skill` tool call
with `resource_id is invalid` (`type: 'undefined'`). Both code paths
in `primeSkillFiles` (cache-hit + fresh-upload) returned files
without `resource_id`/`kind`/`version`, and the artifact in
`handlers.ts` forwarded the stripped shape into
`tc.codeSessionContext.files` → `_injected_files`.

`primeInvokedSkills` (the NL-detected loader) had already been fixed
end-to-end; this commit aligns the tool-invoked path with the same
contract: `resource_id` = `skill._id.toString()`, `kind: 'skill'`,
`version` = the skill's monotonic counter.

Tests added to `skillFiles.spec.ts` lock the contract on
`primeSkillFiles` directly so future refactors can't silently drop
the resource identity again.

* fix(handlers.spec): align session_id → storage_session_id rename + kind discriminator

Pre-existing TS errors against the post-rename `CodeEnvFile` shape:
the test file still used `session_id` on per-file objects (renamed to
`storage_session_id` in agents Phase B/C) and was missing the `kind`
discriminator the discriminated union requires. Both inputs and the
matching `expect.toEqual(...)` mirrors updated together so the
runtime equality check still holds.

Lines 723-732 stay as-is — they sit behind `as unknown as
ToolCallRequest` and TS already skipped them.

* chore: fix `@librechat/agents`, correct version to 3.1.80-dev.0 in package.json files

* chore: bump `@librechat/agents` to version 3.1.80-dev.1 in package.json and package-lock.json

* chore: bump `@librechat/agents` to version 3.1.80-dev.2

* feat(observability): trace file priming chain from primeCodeFiles to _injected_files

Diagnosing the user-upload "files=[] on first /exec" bug requires
seeing where in the LC chain a file ref disappears. Prior to this
patch the chain (primeCodeFiles → primedCodeFiles → initialSessions
→ CodeSessionContext → _injected_files) was opaque end-to-end:
  - primeCodeFiles silently dropped files without `metadata.codeEnvRef`
  - reuploadFile catches all errors and continues with no signal
  - the handlers.ts handoff to codeapi never logged what it was sending

After this patch, a single grep on `[primeCodeFiles]` plus
`[code-env:inject]` shows the full per-file path:

  [primeCodeFiles] in: file_ids=N resourceFiles=M
  [primeCodeFiles] file=<id> path=skip reason=no-codeenvref filename=...
  [primeCodeFiles] file=<id> path=cache-hit-by-session storage_session_id=...
  [primeCodeFiles] file=<id> path=reupload reason=no-uploadtime ...
  [primeCodeFiles] file=<id> path=reupload reason=stale ...
  [primeCodeFiles] file=<id> path=reupload-success oldSession=... newSession=... newFileId=...
  [primeCodeFiles] file=<id> path=reupload-failed session=...
  [primeCodeFiles] file=<id> path=fresh-active storage_session_id=...
  [primeCodeFiles] out: returned=N skippedNoRef=M reuploadFailures=K

  [code-env:inject] tool=<name> files=N missingResourceId=K     (debug)
  [code-env:inject] M/N files missing resource_id ...           (warn)
  [code-env:inject] tool=<name> _injected_files=0 ...           (warn)

The boundary log warns when LC sends zero injected files on a
code-execution tool call — that's the user's actual symptom showing
up at the LC side instead of having to correlate against codeapi's
`Request received { files: [] }`.
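
A rough sketch of how the boundary log might decide which lines to emit. `describeInjectionSketch` is a hypothetical name, and the message tails elided in the listing above are left out rather than guessed.

```javascript
// Hypothetical sketch: returns log entries instead of calling a logger directly.
function describeInjectionSketch(toolName, files) {
  const total = files.length;
  const missing = files.filter((f) => !f.resource_id).length;
  const entries = [
    {
      level: 'debug',
      message: `[code-env:inject] tool=${toolName} files=${total} missingResourceId=${missing}`,
    },
  ];
  if (missing > 0) {
    entries.push({
      level: 'warn',
      message: `[code-env:inject] ${missing}/${total} files missing resource_id`,
    });
  }
  if (total === 0) {
    // The symptom itself: a code-execution call going out with nothing injected.
    entries.push({ level: 'warn', message: `[code-env:inject] tool=${toolName} _injected_files=0` });
  }
  return entries;
}
```

Returning entries rather than logging inline keeps the decision logic testable without mocking a logger.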

Tag chosen as `[code-env:inject]` rather than `[handoff:exec]` to
avoid collision with the app-level "handoff" semantic (subagent
handoff workflow).

Structural cleanup in primeFiles: replaced the `if (ref) { ... }`
nesting with an early `if (!ref) continue` so the per-path
instrumentation hooks land at top-level scope instead of indented
inside a conditional. Behavior unchanged; pushFile / reuploadFile
identical.

Spec fixtures (handlers.spec.ts, codeFilesSession.spec.ts) updated
to include `resource_id` on `CodeEnvFile` literals — required by
the post-3.1.80-dev.2 type now installed.

## Test plan

- [x] `cd packages/api && npx jest src/agents/handlers.spec.ts src/agents/codeFilesSession.spec.ts src/agents/skillFiles.spec.ts` — 69/69 pass
- [x] `cd api && npx jest server/services/Files/Code/process.spec.js` — 84/84 pass
- [x] `npx tsc --noEmit -p packages/api` — clean
- [x] `npx eslint` on all four touched files — clean

* chore: add CONSOLE_JSON_STRING_LENGTH to .env.example for JSON log string length configuration

* fix(files): align codeapi upload filename with LC's sanitized DB filename

User-attached files for code execution were uploading to codeapi
under `file.originalname` (raw upload filename, may contain spaces /
special chars) while LC's DB record stored the sanitized form
(`sanitizeFilename(file.originalname)`, underscores). Codeapi
preserves whatever filename the upload sent, so the sandbox saw
`/mnt/data/<originalname>` while LC's `primeFiles` toolContext text
+ `_injected_files.name` referenced `file.filename` (sanitized).

Visible failure: agent gets system prompt saying

    /mnt/data/librechat_code_api_-_active_customer_-_2025-11-05.xlsx

…tries that path, hits `FileNotFoundError`, then notices the
sandbox's actual `Available files` line says

    /mnt/data/librechat code api - active customer - 2025-11-05.xlsx

…retries with spaces, succeeds. Wastes a tool call per upload and
leaks raw filenames into model context.

Fix: sanitize once and use the sanitized form in both the codeapi
upload AND the LC DB record. Sandbox path = LC toolContext text =
in-memory ref name. No drift.

Reupload path (`Code/process.js` line 867 `filename: file.filename`)
already uses the sanitized DB name, so it stays consistent with the
fresh-upload path after this change.
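
The "sanitize once" idea can be sketched as below. `sanitizeFilenameSketch` is a stand-in, not LibreChat's `sanitizeFilename`; its regex is an assumption chosen to reproduce the example paths above, and `prepareUploadSketch` is a hypothetical name.

```javascript
// Stand-in sanitizer (assumed rule): anything outside [A-Za-z0-9.-] becomes '_'.
function sanitizeFilenameSketch(name) {
  return name.replace(/[^a-zA-Z0-9.-]/g, '_');
}

// Derive every filename from a single sanitized value so the three cannot drift.
function prepareUploadSketch(file) {
  const filename = sanitizeFilenameSketch(file.originalname);
  return {
    uploadFilename: filename, // sent to the sandbox upload
    dbRecord: { filename }, // stored on the file record
    sandboxPath: `/mnt/data/${filename}`, // what the model is told to open
  };
}
```

Because all three values come from one variable, the path in the prompt and the path in the sandbox agree by construction.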

## Test plan

- [x] `cd api && npx jest server/services/Files/process` — 32/32 pass
- [x] `npx eslint` on the touched file — clean

* chore: bump `@librechat/agents` to version 3.1.80-dev.3 in package.json and package-lock.json
2026-05-08 12:29:43 -04:00


const crypto = require('crypto');
const fetch = require('node-fetch');
const { logger } = require('@librechat/data-schemas');
const {
countTokens,
checkBalance,
getBalanceConfig,
buildMessageFiles,
extractFileContext,
encodeAndFormatAudios,
encodeAndFormatVideos,
encodeAndFormatDocuments,
} = require('@librechat/api');
const {
Constants,
FileSources,
ContentTypes,
excludedKeys,
EModelEndpoint,
mergeFileConfig,
isParamEndpoint,
isAgentsEndpoint,
isEphemeralAgentId,
supportsBalanceCheck,
isBedrockDocumentType,
getEndpointFileConfig,
} = require('librechat-data-provider');
const { getStrategyFunctions } = require('~/server/services/Files/strategies');
const { logViolation } = require('~/cache');
const TextStream = require('./TextStream');
const db = require('~/models');
class BaseClient {
constructor(apiKey, options = {}) {
this.apiKey = apiKey;
this.sender = options.sender ?? 'AI';
this.currentDateString = new Date().toLocaleDateString('en-us', {
year: 'numeric',
month: 'long',
day: 'numeric',
});
/** @type {boolean} */
this.skipSaveConvo = false;
/** @type {boolean} */
this.skipSaveUserMessage = false;
/** @type {string} */
this.user;
/** @type {string} */
this.conversationId;
/** @type {string} */
this.responseMessageId;
/** @type {string} */
this.parentMessageId;
/** @type {TAttachment[]} */
this.attachments;
/** The key for the usage object's input tokens
* @type {string} */
this.inputTokensKey = 'prompt_tokens';
/** The key for the usage object's output tokens
* @type {string} */
this.outputTokensKey = 'completion_tokens';
/** @type {Set<string>} */
this.savedMessageIds = new Set();
/**
* Flag to determine if the client re-submitted the latest assistant message.
* @type {boolean | undefined} */
this.continued;
/**
* Flag to determine if the client has already fetched the conversation while saving new messages.
* @type {boolean | undefined} */
this.fetchedConvo;
/** @type {TMessage[]} */
this.currentMessages = [];
/** @type {import('librechat-data-provider').VisionModes | undefined} */
this.visionMode;
/** @type {import('librechat-data-provider').FileConfig | undefined} */
this._mergedFileConfig;
/** @type {import('librechat-data-provider').EndpointFileConfig | undefined} */
this._endpointFileConfig;
}
setOptions() {
throw new Error("Method 'setOptions' must be implemented.");
}
async getCompletion() {
throw new Error("Method 'getCompletion' must be implemented.");
}
/** @type {sendCompletion} */
async sendCompletion() {
throw new Error("Method 'sendCompletion' must be implemented.");
}
getSaveOptions() {
throw new Error('Subclasses must implement getSaveOptions');
}
async buildMessages() {
throw new Error('Subclasses must implement buildMessages');
}
async summarizeMessages() {
throw new Error('Subclasses attempted to call summarizeMessages without implementing it');
}
/**
* @returns {string}
*/
getResponseModel() {
if (isAgentsEndpoint(this.options.endpoint) && this.options.agent && this.options.agent.id) {
return this.options.agent.id;
}
return this.modelOptions?.model ?? this.model;
}
/**
* Abstract method to get the token count for a message. Subclasses must implement this method.
* @param {TMessage} responseMessage
* @returns {number}
*/
getTokenCountForResponse(responseMessage) {
logger.debug('[BaseClient] `getTokenCountForResponse` not implemented.', {
messageId: responseMessage?.messageId,
});
}
/**
* Abstract method to record token usage. Subclasses must implement this method.
* If a correction to the token usage is needed, the method should return an object with the corrected token counts.
* Should only be used if `recordCollectedUsage` was not used instead.
* @param {string} [model]
* @param {AppConfig['balance']} [balance]
* @param {number} promptTokens
* @param {number} completionTokens
* @param {string} [messageId]
* @returns {Promise<void>}
*/
async recordTokenUsage({ model, balance, promptTokens, completionTokens, messageId }) {
logger.debug('[BaseClient] `recordTokenUsage` not implemented.', {
model,
balance,
messageId,
promptTokens,
completionTokens,
});
}
/**
* Makes an HTTP request and logs the process.
*
* @param {RequestInfo} _url - The URL to make the request to. Can be a string or a Request object.
* @param {RequestInit} [init] - Optional init options for the request.
* @returns {Promise<Response>} - A promise that resolves to the response of the fetch request.
*/
async fetch(_url, init) {
let url = _url;
if (this.options.directEndpoint) {
url = this.options.reverseProxyUrl;
}
logger.debug(`Making request to ${url}`);
// The Bun and Node paths were identical, so no runtime branch is needed.
return await fetch(url, init);
}
getBuildMessagesOptions() {
throw new Error('Subclasses must implement getBuildMessagesOptions');
}
async generateTextStream(text, onProgress, options = {}) {
const stream = new TextStream(text, options);
await stream.processTextStream(onProgress);
}
/**
* @returns {[string|undefined, string|undefined]}
*/
processOverideIds() {
/** @type {Record<string, string | undefined>} */
let { overrideConvoId, overrideUserMessageId } = this.options?.req?.body ?? {};
if (overrideConvoId) {
const [conversationId, index] = overrideConvoId.split(Constants.COMMON_DIVIDER);
overrideConvoId = conversationId;
if (index !== '0') {
this.skipSaveConvo = true;
}
}
if (overrideUserMessageId) {
const [userMessageId, index] = overrideUserMessageId.split(Constants.COMMON_DIVIDER);
overrideUserMessageId = userMessageId;
if (index !== '0') {
this.skipSaveUserMessage = true;
}
}
return [overrideConvoId, overrideUserMessageId];
}
async setMessageOptions(opts = {}) {
if (opts && opts.replaceOptions) {
this.setOptions(opts);
}
const [overrideConvoId, overrideUserMessageId] = this.processOverideIds();
const { isEdited, isContinued } = opts;
const user = opts.user ?? null;
this.user = user;
const saveOptions = this.getSaveOptions();
this.abortController = opts.abortController ?? new AbortController();
const requestConvoId = overrideConvoId ?? opts.conversationId;
const conversationId = requestConvoId ?? crypto.randomUUID();
const parentMessageId = opts.parentMessageId ?? Constants.NO_PARENT;
const userMessageId =
overrideUserMessageId ?? opts.overrideParentMessageId ?? crypto.randomUUID();
let responseMessageId = opts.responseMessageId ?? crypto.randomUUID();
let head = isEdited ? responseMessageId : parentMessageId;
this.currentMessages = (await this.loadHistory(conversationId, head)) ?? [];
this.conversationId = conversationId;
if (isEdited && !isContinued) {
responseMessageId = crypto.randomUUID();
head = responseMessageId;
this.currentMessages[this.currentMessages.length - 1].messageId = head;
}
if (opts.isRegenerate && responseMessageId.endsWith('_')) {
responseMessageId = crypto.randomUUID();
}
this.responseMessageId = responseMessageId;
return {
...opts,
user,
head,
saveOptions,
userMessageId,
requestConvoId,
conversationId,
parentMessageId,
responseMessageId,
};
}
createUserMessage({ messageId, parentMessageId, conversationId, text }) {
return {
messageId,
parentMessageId,
conversationId,
sender: 'User',
text,
isCreatedByUser: true,
};
}
async handleStartMethods(message, opts) {
const {
user,
head,
saveOptions,
userMessageId,
requestConvoId,
conversationId,
parentMessageId,
responseMessageId,
} = await this.setMessageOptions(opts);
const userMessage = opts.isEdited
? this.currentMessages[this.currentMessages.length - 2]
: this.createUserMessage({
messageId: userMessageId,
parentMessageId,
conversationId,
text: message,
});
if (typeof opts?.getReqData === 'function') {
opts.getReqData({
userMessage,
conversationId,
responseMessageId,
sender: this.sender,
});
}
if (typeof opts?.onStart === 'function') {
const isNewConvo = !requestConvoId && parentMessageId === Constants.NO_PARENT;
opts.onStart(userMessage, responseMessageId, isNewConvo);
}
return {
...opts,
user,
head,
conversationId,
responseMessageId,
saveOptions,
userMessage,
};
}
/**
* Adds instructions to the messages array. If the instructions object is empty or undefined,
* the original messages array is returned. Otherwise, the instructions are added to the messages
* array either at the beginning (default) or preserving the last message at the end.
*
* @param {Array} messages - An array of messages.
* @param {Object} instructions - An object containing instructions to be added to the messages.
* @param {boolean} [beforeLast=false] - If true, adds instructions before the last message; if false, adds at the beginning.
* @returns {Array} An array containing messages and instructions, or the original messages if instructions are empty.
*/
addInstructions(messages, instructions, beforeLast = false) {
if (!instructions || Object.keys(instructions).length === 0) {
return messages;
}
if (!beforeLast) {
return [instructions, ...messages];
}
// Legacy behavior: add instructions before the last message
const payload = [];
if (messages.length > 1) {
payload.push(...messages.slice(0, -1));
}
payload.push(instructions);
if (messages.length > 0) {
payload.push(messages[messages.length - 1]);
}
return payload;
}
concatenateMessages(messages) {
return messages.reduce((acc, message) => {
const nameOrRole = message.name ?? message.role;
return acc + `${nameOrRole}:\n${message.content}\n\n`;
}, '');
}
/**
* This method processes an array of messages and returns a context of messages that fit within a specified token limit.
* It iterates over the messages from newest to oldest, adding them to the context until the token limit is reached.
* If the token limit would be exceeded by adding a message, that message is not added to the context and remains in the original array.
* The method uses `push` and `pop` operations for efficient array manipulation, and reverses the context array at the end to maintain the original order of the messages.
*
* @param {Object} params
* @param {TMessage[]} params.messages - An array of messages, each with a `tokenCount` property. The messages should be ordered from oldest to newest.
* @param {number} [params.maxContextTokens] - The max number of tokens allowed in the context. If not provided, defaults to `this.maxContextTokens`.
* @param {{ role: 'system', content: string, tokenCount: number }} [params.instructions] - Instructions already added to the context at index 0.
* @returns {Promise<{
* context: TMessage[],
* remainingContextTokens: number,
* messagesToRefine: TMessage[],
* }>} An object with three properties: `context`, `remainingContextTokens`, and `messagesToRefine`.
* `context` is an array of messages that fit within the token limit.
* `remainingContextTokens` is the number of tokens remaining within the limit after adding the messages to the context.
* `messagesToRefine` is an array of messages that were not added to the context because they would have exceeded the token limit.
*/
async getMessagesWithinTokenLimit({ messages: _messages, maxContextTokens, instructions }) {
// Every reply is primed with <|start|>assistant<|message|>, so we
// start with 3 tokens for the label after all messages have been counted.
let currentTokenCount = 3;
const instructionsTokenCount = instructions?.tokenCount ?? 0;
let remainingContextTokens =
(maxContextTokens ?? this.maxContextTokens) - instructionsTokenCount;
const messages = [..._messages];
const context = [];
if (currentTokenCount < remainingContextTokens) {
while (messages.length > 0 && currentTokenCount < remainingContextTokens) {
if (messages.length === 1 && instructions) {
break;
}
const poppedMessage = messages.pop();
const { tokenCount } = poppedMessage;
if (poppedMessage && currentTokenCount + tokenCount <= remainingContextTokens) {
context.push(poppedMessage);
currentTokenCount += tokenCount;
} else {
messages.push(poppedMessage);
break;
}
}
}
if (instructions) {
context.push(_messages[0]);
messages.shift();
}
const prunedMemory = messages;
remainingContextTokens -= currentTokenCount;
return {
context: context.reverse(),
remainingContextTokens,
messagesToRefine: prunedMemory,
};
}
async sendMessage(message, opts = {}) {
const appConfig = this.options.req?.config;
/** @type {Promise<TMessage>} */
let userMessagePromise;
const { user, head, isEdited, conversationId, responseMessageId, saveOptions, userMessage } =
await this.handleStartMethods(message, opts);
if (opts.progressCallback) {
opts.onProgress = opts.progressCallback.call(null, {
...(opts.progressOptions ?? {}),
parentMessageId: userMessage.messageId,
messageId: responseMessageId,
});
}
const { editedContent } = opts;
// Pushing to currentMessages may be unnecessary,
// depending on how the subclass handles messages.
// When this is an edit, all messages are already in currentMessages, both user and response
if (isEdited) {
let latestMessage = this.currentMessages[this.currentMessages.length - 1];
if (!latestMessage) {
latestMessage = {
messageId: responseMessageId,
conversationId,
parentMessageId: userMessage.messageId,
isCreatedByUser: false,
model: this.modelOptions?.model ?? this.model,
sender: this.sender,
};
this.currentMessages.push(userMessage, latestMessage);
} else if (editedContent != null) {
// Handle editedContent for content parts
if (editedContent && latestMessage.content && Array.isArray(latestMessage.content)) {
const { index, text, type } = editedContent;
if (index >= 0 && index < latestMessage.content.length) {
const contentPart = latestMessage.content[index];
if (type === ContentTypes.THINK && contentPart.type === ContentTypes.THINK) {
contentPart[ContentTypes.THINK] = text;
} else if (type === ContentTypes.TEXT && contentPart.type === ContentTypes.TEXT) {
contentPart[ContentTypes.TEXT] = text;
}
}
}
}
this.continued = true;
} else {
this.currentMessages.push(userMessage);
}
/**
* When the userMessage is pushed to currentMessages, the parentMessage is the userMessageId.
* This only matters when buildMessages uses the parentMessageId, and may vary by implementation.
*/
const parentMessageId = isEdited ? head : userMessage.messageId;
this.parentMessageId = parentMessageId;
let {
prompt: payload,
tokenCountMap,
promptTokens,
} = await this.buildMessages(
this.currentMessages,
parentMessageId,
this.getBuildMessagesOptions(opts),
opts,
);
if (tokenCountMap && tokenCountMap[userMessage.messageId]) {
userMessage.tokenCount = tokenCountMap[userMessage.messageId];
logger.debug('[BaseClient] userMessage', {
messageId: userMessage.messageId,
tokenCount: userMessage.tokenCount,
conversationId: userMessage.conversationId,
});
}
if (!isEdited && !this.skipSaveUserMessage) {
const reqFiles = this.options.req?.body?.files;
if (reqFiles && Array.isArray(this.options.attachments)) {
const files = buildMessageFiles(reqFiles, this.options.attachments);
if (files.length > 0) {
userMessage.files = files;
}
delete userMessage.image_urls;
}
/**
* Persist the user's manual skill picks onto the user message so the
* frontend `SkillPills` component can render them in history
* after reload. UI-only metadata — the runtime skill resolution
* pipeline reads the top-level `req.body.manualSkills` separately.
* Filter is defense-in-depth on top of Mongoose schema validation:
* keeps the DB row free of empty/non-string entries even if a
* crafted payload slips past schema checks upstream.
*/
const rawManualSkills = this.options.req?.body?.manualSkills;
if (Array.isArray(rawManualSkills) && rawManualSkills.length > 0) {
const skills = rawManualSkills.filter((s) => typeof s === 'string' && s.length > 0);
if (skills.length > 0) {
userMessage.manualSkills = skills;
}
}
/**
* Persist the names of skills auto-primed this turn via `always-apply`
* frontmatter so `SkillPills` can render pinned-variant badges
* on the user bubble that survive reload and history render. Frozen
* at turn time (not reconstructed from `Skill.alwaysApply` at render
* time) because the flag is mutable — historical turns must keep
* their audit trail even if an admin flips `alwaysApply` off later.
*/
const alwaysApplySkillPrimes = this.options.agent?.alwaysApplySkillPrimes;
if (Array.isArray(alwaysApplySkillPrimes) && alwaysApplySkillPrimes.length > 0) {
const names = alwaysApplySkillPrimes
.map((p) => p?.name)
.filter((n) => typeof n === 'string' && n.length > 0);
if (names.length > 0) {
userMessage.alwaysAppliedSkills = names;
}
}
userMessagePromise = this.saveMessageToDatabase(userMessage, saveOptions, user).catch(
(err) => {
logger.error('[BaseClient] Failed to save user message:', err);
return {};
},
);
this.savedMessageIds.add(userMessage.messageId);
if (typeof opts?.getReqData === 'function') {
opts.getReqData({
userMessagePromise,
});
}
}
const balanceConfig = getBalanceConfig(appConfig);
if (
balanceConfig?.enabled &&
supportsBalanceCheck[this.options.endpointType ?? this.options.endpoint]
) {
await checkBalance(
{
req: this.options.req,
res: this.options.res,
txData: {
user: this.user,
tokenType: 'prompt',
amount: promptTokens,
endpoint: this.options.endpoint,
model: this.modelOptions?.model ?? this.model,
endpointTokenConfig: this.options.endpointTokenConfig,
},
},
{
logViolation,
getMultiplier: db.getMultiplier,
findBalanceByUser: db.findBalanceByUser,
createAutoRefillTransaction: db.createAutoRefillTransaction,
balanceConfig,
upsertBalanceFields: db.upsertBalanceFields,
},
);
}
const { completion, metadata } = await this.sendCompletion(payload, opts);
if (this.abortController) {
this.abortController.requestCompleted = true;
}
/** @type {TMessage} */
const responseMessage = {
messageId: responseMessageId,
conversationId,
parentMessageId: userMessage.messageId,
isCreatedByUser: false,
isEdited,
model: this.getResponseModel(),
sender: this.sender,
promptTokens,
iconURL: this.options.iconURL,
endpoint: this.options.endpoint,
...(this.metadata ?? {}),
metadata: Object.keys(metadata ?? {}).length > 0 ? metadata : undefined,
};
if (typeof completion === 'string') {
responseMessage.text = completion;
} else if (
Array.isArray(completion) &&
(this.clientName === EModelEndpoint.agents ||
isParamEndpoint(this.options.endpoint, this.options.endpointType))
) {
responseMessage.text = '';
if (!opts.editedContent || this.currentMessages.length === 0) {
responseMessage.content = completion;
} else {
const latestMessage = this.currentMessages[this.currentMessages.length - 1];
if (!latestMessage?.content) {
responseMessage.content = completion;
} else {
const existingContent = [...latestMessage.content];
const { type: editedType } = opts.editedContent;
responseMessage.content = this.mergeEditedContent(
existingContent,
completion,
editedType,
);
}
}
} else if (Array.isArray(completion)) {
responseMessage.text = completion.join('');
}
if (tokenCountMap && this.recordTokenUsage && this.getTokenCountForResponse) {
let completionTokens;
/**
* Metadata about input/output costs for the current message. The client
* should provide a function to get the current stream usage metadata; if not,
* use the legacy token estimations.
* @type {StreamUsage | null} */
const usage = this.getStreamUsage != null ? this.getStreamUsage() : null;
if (usage != null && Number(usage[this.outputTokensKey]) > 0) {
responseMessage.tokenCount = usage[this.outputTokensKey];
completionTokens = responseMessage.tokenCount;
} else {
responseMessage.tokenCount = this.getTokenCountForResponse(responseMessage);
completionTokens = responseMessage.tokenCount;
await this.recordTokenUsage({
usage,
promptTokens,
completionTokens,
balance: balanceConfig,
/** Note: When using agents, responseMessage.model is the agent ID, not the model */
model: this.model,
messageId: this.responseMessageId,
});
}
logger.debug('[BaseClient] Response token usage', {
messageId: responseMessage.messageId,
model: responseMessage.model,
promptTokens,
completionTokens,
});
}
if (userMessagePromise) {
await userMessagePromise;
}
if (
this.contextMeta?.calibrationRatio > 0 &&
this.contextMeta.calibrationRatio !== 1 &&
userMessage.tokenCount > 0
) {
const calibrated = Math.round(userMessage.tokenCount * this.contextMeta.calibrationRatio);
if (calibrated !== userMessage.tokenCount) {
logger.debug('[BaseClient] Calibrated user message tokenCount', {
messageId: userMessage.messageId,
raw: userMessage.tokenCount,
calibrated,
ratio: this.contextMeta.calibrationRatio,
});
userMessage.tokenCount = calibrated;
await this.updateMessageInDatabase({
messageId: userMessage.messageId,
tokenCount: calibrated,
});
}
}
if (this.artifactPromises) {
responseMessage.attachments = (await Promise.all(this.artifactPromises)).filter((a) => a);
}
if (this.options.attachments) {
try {
saveOptions.files = this.options.attachments.map((attachment) => attachment.file_id);
} catch (error) {
logger.error('[BaseClient] Error mapping attachments for conversation', error);
}
}
if (this.contextMeta) {
responseMessage.contextMeta = this.contextMeta;
}
responseMessage.databasePromise = this.saveMessageToDatabase(
responseMessage,
saveOptions,
user,
);
this.savedMessageIds.add(responseMessage.messageId);
delete responseMessage.tokenCount;
return responseMessage;
}
async loadHistory(conversationId, parentMessageId = null) {
logger.debug('[BaseClient] Loading history:', { conversationId, parentMessageId });
const messages = (await db.getMessages({ conversationId })) ?? [];
if (messages.length === 0) {
return [];
}
let mapMethod = null;
if (this.getMessageMapMethod) {
mapMethod = this.getMessageMapMethod();
}
let _messages = this.constructor.getMessagesForConversation({
messages,
parentMessageId,
mapMethod,
});
_messages = await this.addPreviousAttachments(_messages);
if (!this.shouldSummarize) {
return _messages;
}
for (let i = _messages.length - 1; i >= 0; i--) {
const msg = _messages[i];
if (!msg) {
continue;
}
const summaryBlock = BaseClient.findSummaryContentBlock(msg);
if (summaryBlock) {
this.previous_summary = {
...msg,
summary: BaseClient.getSummaryText(summaryBlock),
summaryTokenCount: summaryBlock.tokenCount,
};
break;
}
if (msg.summary) {
this.previous_summary = msg;
break;
}
}
if (this.previous_summary) {
const { messageId, summary, tokenCount, summaryTokenCount } = this.previous_summary;
logger.debug('[BaseClient] Previous summary:', {
messageId,
summary,
tokenCount,
summaryTokenCount,
});
}
return _messages;
}
/**
* Save a message to the database.
* @param {TMessage} message
* @param {Partial<TConversation>} endpointOptions
* @param {string | null} user
*/
async saveMessageToDatabase(message, endpointOptions, user = null) {
// Snapshot options before any await; disposeClient may set client.options = null
// while this method is suspended at an I/O boundary, but the local reference
// remains valid (disposeClient nulls the property, not the object itself).
const options = this.options;
if (!options) {
logger.error('[BaseClient] saveMessageToDatabase: client disposed before save, skipping');
return {};
}
if (this.user && user !== this.user) {
throw new Error('User mismatch.');
}
const hasAddedConvo = options?.req?.body?.addedConvo != null;
const reqCtx = {
userId: options?.req?.user?.id,
isTemporary: options?.req?.body?.isTemporary,
interfaceConfig: options?.req?.config?.interfaceConfig,
};
const savedMessage = await db.saveMessage(
reqCtx,
{
...message,
endpoint: options.endpoint,
unfinished: false,
user,
...(hasAddedConvo && { addedConvo: true }),
},
{ context: 'api/app/clients/BaseClient.js - saveMessageToDatabase #saveMessage' },
);
if (this.skipSaveConvo) {
return { message: savedMessage };
}
const fieldsToKeep = {
conversationId: message.conversationId,
endpoint: options.endpoint,
endpointType: options.endpointType,
...endpointOptions,
};
const conversationCreatedAt = options?.req?.conversationCreatedAt;
const createdAtOnInsert =
conversationCreatedAt != null ? new Date(conversationCreatedAt) : undefined;
const validCreatedAtOnInsert =
createdAtOnInsert && !Number.isNaN(createdAtOnInsert.getTime())
? createdAtOnInsert
: undefined;
const req = options?.req;
const skippedExistingConvoLookup = this.fetchedConvo === true;
const hasResolvedConversation =
req != null && Object.prototype.hasOwnProperty.call(req, 'resolvedConversation');
let existingConvo = null;
if (!skippedExistingConvoLookup && hasResolvedConversation) {
existingConvo = req.resolvedConversation;
} else if (!skippedExistingConvoLookup) {
existingConvo = await db.getConvo(req?.user?.id, message.conversationId);
}
if (hasResolvedConversation) {
delete req.resolvedConversation;
}
const shouldSetCreatedAtOnInsert = !skippedExistingConvoLookup && existingConvo == null;
const unsetFields = {};
const exceptions = new Set(['spec', 'iconURL']);
const hasNonEphemeralAgent =
isAgentsEndpoint(options.endpoint) &&
endpointOptions?.agent_id &&
!isEphemeralAgentId(endpointOptions.agent_id);
if (hasNonEphemeralAgent) {
exceptions.add('model');
}
if (existingConvo != null) {
this.fetchedConvo = true;
for (const key in existingConvo) {
if (!key) {
continue;
}
if (excludedKeys.has(key) && !exceptions.has(key)) {
continue;
}
if (endpointOptions?.[key] === undefined) {
unsetFields[key] = 1;
}
}
}
const conversation = await db.saveConvo(reqCtx, fieldsToKeep, {
context: 'api/app/clients/BaseClient.js - saveMessageToDatabase #saveConvo',
unsetFields,
createdAtOnInsert: shouldSetCreatedAtOnInsert ? validCreatedAtOnInsert : undefined,
});
return { message: savedMessage, conversation };
}
/**
* Update a message in the database.
* @param {Partial<TMessage>} message
*/
async updateMessageInDatabase(message) {
await db.updateMessage(this.options?.req?.user?.id, message);
}
/** Extracts text from a summary block (handles both legacy `text` field and new `content` array format). */
static getSummaryText(summaryBlock) {
if (Array.isArray(summaryBlock.content)) {
return summaryBlock.content.map((b) => b.text ?? '').join('');
}
if (typeof summaryBlock.content === 'string') {
return summaryBlock.content;
}
return summaryBlock.text ?? '';
}
/** Finds the last summary content block in a message's content array (last-summary-wins). */
static findSummaryContentBlock(message) {
if (!Array.isArray(message?.content)) {
return null;
}
let lastSummary = null;
for (const part of message.content) {
if (
part?.type === ContentTypes.SUMMARY &&
BaseClient.getSummaryText(part).trim().length > 0
) {
lastSummary = part;
}
}
return lastSummary;
}
/**
* Iterate through messages, building an array based on the parentMessageId.
*
* This function constructs a conversation thread by traversing messages from a given parentMessageId up to the root message.
* It handles cyclic references by ensuring that a message is not processed more than once.
* If the 'summary' option is set to true and a message has a summary content block or a legacy 'summary' property, a copy of the message is returned with:
* - 'role' set to 'system'.
* - 'content' replaced by a single text part holding the summary text.
* - 'tokenCount' set to the summary block's 'tokenCount', or, for the legacy form, to 'summaryTokenCount' (falling back to the existing 'tokenCount').
* The traversal stops at the first message that carries a summary.
*
* Each message object should have an 'id' or 'messageId' property and may have a 'parentMessageId' property.
* The 'parentMessageId' is the ID of the message that the current message is a reply to.
* If 'parentMessageId' is not present, null, or is Constants.NO_PARENT,
* the message is considered a root message.
*
* @param {Object} options - The options for the function.
* @param {TMessage[]} options.messages - An array of message objects. Each object should have either an 'id' or 'messageId' property, and may have a 'parentMessageId' property.
* @param {string} options.parentMessageId - The ID of the parent message to start the traversal from.
* @param {Function} [options.mapMethod] - An optional function to map over the ordered messages. Applied conditionally based on mapCondition.
* @param {(message: TMessage) => boolean} [options.mapCondition] - An optional function to determine whether mapMethod should be applied to a given message. If not provided and mapMethod is set, mapMethod applies to all messages.
* @param {boolean} [options.summary=false] - If set to true, the traversal modifies messages with 'summary' and 'summaryTokenCount' properties and stops at the message with a 'summary' property.
* @returns {TMessage[]} An array containing the messages in the order they should be displayed, starting with the most recent message with a 'summary' property if the 'summary' option is true, and ending with the message identified by 'parentMessageId'.
*/
static getMessagesForConversation({
messages,
parentMessageId,
mapMethod = null,
mapCondition = null,
summary = false,
}) {
if (!messages || messages.length === 0) {
return [];
}
const orderedMessages = [];
let currentMessageId = parentMessageId;
const visitedMessageIds = new Set();
while (currentMessageId) {
if (visitedMessageIds.has(currentMessageId)) {
break;
}
const message = messages.find((msg) => {
const messageId = msg.messageId ?? msg.id;
return messageId === currentMessageId;
});
visitedMessageIds.add(currentMessageId);
if (!message) {
break;
}
let resolved = message;
let hasSummary = false;
if (summary) {
const summaryBlock = BaseClient.findSummaryContentBlock(message);
if (summaryBlock) {
const summaryText = BaseClient.getSummaryText(summaryBlock);
resolved = {
...message,
role: 'system',
content: [{ type: ContentTypes.TEXT, text: summaryText }],
tokenCount: summaryBlock.tokenCount,
};
hasSummary = true;
} else if (message.summary) {
resolved = {
...message,
role: 'system',
content: [{ type: ContentTypes.TEXT, text: message.summary }],
tokenCount: message.summaryTokenCount ?? message.tokenCount,
};
hasSummary = true;
}
}
const shouldMap = mapMethod != null && (mapCondition != null ? mapCondition(resolved) : true);
const processedMessage = shouldMap ? mapMethod(resolved) : resolved;
orderedMessages.push(processedMessage);
if (hasSummary) {
break;
}
currentMessageId =
message.parentMessageId === Constants.NO_PARENT ? null : message.parentMessageId;
}
orderedMessages.reverse();
return orderedMessages;
}
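/*
The parent-chain traversal in getMessagesForConversation can be sketched on its
own as below. `buildThread`, the `NO_PARENT` stand-in for Constants.NO_PARENT,
and the sample data are hypothetical. The sketch swaps the linear `find` for a
Map lookup and omits the summary and mapMethod handling.

```javascript
const NO_PARENT = 'no-parent'; // stand-in sentinel, not the real constant value

// Hypothetical helper: follow parentMessageId links from a leaf up to the root.
function buildThread(messages, leafId) {
  const byId = new Map(messages.map((m) => [m.messageId, m]));
  const visited = new Set(); // guards against cyclic parent references
  const thread = [];
  let currentId = leafId;
  while (currentId && !visited.has(currentId)) {
    visited.add(currentId);
    const message = byId.get(currentId);
    if (!message) {
      break; // dangling parent reference
    }
    thread.push(message);
    currentId = message.parentMessageId === NO_PARENT ? null : message.parentMessageId;
  }
  return thread.reverse(); // oldest first
}

const sample = [
  { messageId: 'root', parentMessageId: NO_PARENT },
  { messageId: 'reply', parentMessageId: 'root' },
  { messageId: 'sibling', parentMessageId: 'root' },
  { messageId: 'leaf', parentMessageId: 'reply' },
];
const thread = buildThread(sample, 'leaf').map((m) => m.messageId);

const cyclic = [
  { messageId: 'x', parentMessageId: 'y' },
  { messageId: 'y', parentMessageId: 'x' },
];
const cyclicThread = buildThread(cyclic, 'x').map((m) => m.messageId);
console.log(thread, cyclicThread);
```

Only the branch that leads to the requested leaf is returned, so 'sibling' is
skipped, and the cyclic input terminates after visiting each message once.
*/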
/**
* Algorithm adapted from "6. Counting tokens for chat API calls" of
* https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
*
* An additional 3 tokens need to be added for assistant label priming after all messages have been counted.
* In our implementation, this is accounted for in the getMessagesWithinTokenLimit method.
*
* The content parts example was adapted from the following example:
* https://github.com/openai/openai-cookbook/pull/881/files
*
* Note: image token calculation is to be done elsewhere where we have access to the image metadata
*
* @param {Object} message
*/
getTokenCountForMessage(message) {
// Note: gpt-3.5-turbo and gpt-4 may update over time. Use default for these as well as for unknown models
let tokensPerMessage = 3;
let tokensPerName = 1;
const model = this.modelOptions?.model ?? this.model;
if (model === 'gpt-3.5-turbo-0301') {
tokensPerMessage = 4;
tokensPerName = -1;
}
const processValue = (value) => {
if (Array.isArray(value)) {
for (let item of value) {
if (
!item ||
!item.type ||
item.type === ContentTypes.THINK ||
item.type === ContentTypes.ERROR ||
item.type === ContentTypes.IMAGE_URL
) {
continue;
}
if (item.type === ContentTypes.TOOL_CALL && item.tool_call != null) {
const toolName = item.tool_call?.name || '';
if (toolName && typeof toolName === 'string') {
numTokens += this.getTokenCount(toolName);
}
const args = item.tool_call?.args || '';
if (args && typeof args === 'string') {
numTokens += this.getTokenCount(args);
}
const output = item.tool_call?.output || '';
if (output && typeof output === 'string') {
numTokens += this.getTokenCount(output);
}
continue;
}
const nestedValue = item[item.type];
if (!nestedValue) {
continue;
}
processValue(nestedValue);
}
} else if (typeof value === 'string') {
numTokens += this.getTokenCount(value);
} else if (typeof value === 'number') {
numTokens += this.getTokenCount(value.toString());
} else if (typeof value === 'boolean') {
numTokens += this.getTokenCount(value.toString());
}
};
let numTokens = tokensPerMessage;
for (let [key, value] of Object.entries(message)) {
processValue(value);
if (key === 'name') {
numTokens += tokensPerName;
}
}
return numTokens;
}
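/*
The per-message accounting in getTokenCountForMessage adds a fixed overhead per
message, the tokens of each value, and a small adjustment when a `name` key is
present. The sketch below shows that arithmetic for flat string values only.
`countMessageTokens` is a hypothetical helper, and the whitespace splitter is a
stand-in for the real tokenizer, so the absolute numbers are illustrative.

```javascript
// Stand-in tokenizer (assumption): one token per whitespace-separated word.
const countWords = (text) => text.split(/\s+/).filter(Boolean).length;

// Hypothetical helper: fixed overhead + value tokens + name adjustment.
function countMessageTokens(message, tokensPerMessage = 3, tokensPerName = 1) {
  let total = tokensPerMessage;
  for (const [key, value] of Object.entries(message)) {
    if (typeof value === 'string') {
      total += countWords(value);
    }
    if (key === 'name') {
      total += tokensPerName;
    }
  }
  return total;
}

const total = countMessageTokens({
  role: 'user',
  name: 'ada',
  content: 'count these four words',
});
console.log(total);
```

Here the total is 3 (overhead) + 1 (role) + 1 (name value) + 1 (name
adjustment) + 4 (content) = 10 under the stand-in tokenizer.
*/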
/**
* Merges completion content with existing content when editing TEXT or THINK types
* @param {Array} existingContent - The existing content array
* @param {Array} newCompletion - The new completion content
* @param {string} editedType - The type of content being edited
* @returns {Array} The merged content array
*/
mergeEditedContent(existingContent, newCompletion, editedType) {
if (!newCompletion.length) {
return existingContent.concat(newCompletion);
}
if (editedType !== ContentTypes.TEXT && editedType !== ContentTypes.THINK) {
return existingContent.concat(newCompletion);
}
const lastIndex = existingContent.length - 1;
const lastExisting = existingContent[lastIndex];
const firstNew = newCompletion[0];
if (lastExisting?.type !== firstNew?.type || firstNew?.type !== editedType) {
return existingContent.concat(newCompletion);
}
const mergedContent = [...existingContent];
if (editedType === ContentTypes.TEXT) {
mergedContent[lastIndex] = {
...mergedContent[lastIndex],
[ContentTypes.TEXT]:
(mergedContent[lastIndex][ContentTypes.TEXT] || '') + (firstNew[ContentTypes.TEXT] || ''),
};
} else {
mergedContent[lastIndex] = {
...mergedContent[lastIndex],
[ContentTypes.THINK]:
(mergedContent[lastIndex][ContentTypes.THINK] || '') +
(firstNew[ContentTypes.THINK] || ''),
};
}
// Add remaining completion items
return mergedContent.concat(newCompletion.slice(1));
}
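/*
mergeEditedContent joins the first part of a new completion onto the last
existing part when both have the edited type, so a continued generation reads
as one part instead of two. A minimal sketch for the text case follows.
`mergeText` is hypothetical, and 'text' is assumed to be the value of
ContentTypes.TEXT.

```javascript
const TEXT = 'text'; // assumed value of ContentTypes.TEXT

// Hypothetical helper: merge a continued text completion into existing content.
function mergeText(existing, completion) {
  const last = existing[existing.length - 1];
  const first = completion[0];
  if (!first || last?.type !== TEXT || first.type !== TEXT) {
    return existing.concat(completion); // nothing to join; append as-is
  }
  const merged = existing.slice(0, -1);
  merged.push({ ...last, [TEXT]: (last[TEXT] || '') + (first[TEXT] || '') });
  return merged.concat(completion.slice(1));
}

const existing = [{ type: 'text', text: 'Hello, ' }];
const completion = [
  { type: 'text', text: 'world.' },
  { type: 'tool_call', tool_call: { name: 'search' } },
];
const merged = mergeText(existing, completion);
console.log(merged);
```

The first completion part is folded into the last existing part, the tool call
is appended unchanged, and the input arrays are left untouched.
*/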
async sendPayload(payload, opts = {}) {
if (opts && typeof opts === 'object') {
this.setOptions(opts);
}
return await this.sendCompletion(payload, opts);
}
async addDocuments(message, attachments) {
const documentResult = await encodeAndFormatDocuments(
this.options.req,
attachments,
{
provider: this.options.agent?.provider ?? this.options.endpoint,
endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
useResponsesApi: this.options.agent?.model_parameters?.useResponsesApi,
model: this.modelOptions?.model ?? this.model,
},
getStrategyFunctions,
);
message.documents =
documentResult.documents && documentResult.documents.length
? documentResult.documents
: undefined;
return documentResult.files;
}
async addVideos(message, attachments) {
const videoResult = await encodeAndFormatVideos(
this.options.req,
attachments,
{
provider: this.options.agent?.provider ?? this.options.endpoint,
endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
},
getStrategyFunctions,
);
message.videos =
videoResult.videos && videoResult.videos.length ? videoResult.videos : undefined;
return videoResult.files;
}
async addAudios(message, attachments) {
const audioResult = await encodeAndFormatAudios(
this.options.req,
attachments,
{
provider: this.options.agent?.provider ?? this.options.endpoint,
endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
},
getStrategyFunctions,
);
message.audios =
audioResult.audios && audioResult.audios.length ? audioResult.audios : undefined;
return audioResult.files;
}
/**
* Extracts text context from attachments and sets it on the message.
* This handles text that was already extracted from files (OCR, transcriptions, document text, etc.)
* @param {TMessage} message - The message to add context to
* @param {MongoFile[]} attachments - Array of file attachments
* @returns {Promise<void>}
*/
async addFileContextToMessage(message, attachments) {
const fileContext = await extractFileContext({
attachments,
req: this.options?.req,
tokenCountFn: (text) => countTokens(text),
});
if (fileContext) {
message.fileContext = fileContext;
}
}
async processAttachments(message, attachments) {
const categorizedAttachments = {
images: [],
videos: [],
audios: [],
documents: [],
};
const allFiles = [];
const provider = this.options.agent?.provider ?? this.options.endpoint;
const isBedrock = provider === EModelEndpoint.bedrock;
if (!this._mergedFileConfig && this.options.req?.config?.fileConfig) {
this._mergedFileConfig = mergeFileConfig(this.options.req.config.fileConfig);
const endpoint = this.options.agent?.endpoint ?? this.options.endpoint;
this._endpointFileConfig = getEndpointFileConfig({
fileConfig: this._mergedFileConfig,
endpoint,
endpointType: this.options.endpointType,
});
}
for (const file of attachments) {
/** @type {FileSources} */
const source = file.source ?? FileSources.local;
if (source === FileSources.text) {
allFiles.push(file);
continue;
}
if (
file.embedded === true ||
file.metadata?.codeEnvRef != null ||
file.metadata?.fileIdentifier != null
) {
allFiles.push(file);
continue;
}
if (file.type.startsWith('image/')) {
categorizedAttachments.images.push(file);
} else if (file.type === 'application/pdf') {
categorizedAttachments.documents.push(file);
allFiles.push(file);
} else if (isBedrock && isBedrockDocumentType(file.type)) {
categorizedAttachments.documents.push(file);
allFiles.push(file);
} else if (file.type.startsWith('video/')) {
categorizedAttachments.videos.push(file);
allFiles.push(file);
} else if (file.type.startsWith('audio/')) {
categorizedAttachments.audios.push(file);
allFiles.push(file);
} else if (
file.type &&
this._mergedFileConfig &&
this._endpointFileConfig?.supportedMimeTypes &&
this._mergedFileConfig.checkType(file.type, this._endpointFileConfig.supportedMimeTypes)
) {
categorizedAttachments.documents.push(file);
allFiles.push(file);
}
}
const [imageFiles] = await Promise.all([
categorizedAttachments.images.length > 0
? this.addImageURLs(message, categorizedAttachments.images)
: Promise.resolve([]),
categorizedAttachments.documents.length > 0
? this.addDocuments(message, categorizedAttachments.documents)
: Promise.resolve([]),
categorizedAttachments.videos.length > 0
? this.addVideos(message, categorizedAttachments.videos)
: Promise.resolve([]),
categorizedAttachments.audios.length > 0
? this.addAudios(message, categorizedAttachments.audios)
: Promise.resolve([]),
]);
allFiles.push(...imageFiles);
const seenFileIds = new Set();
const uniqueFiles = [];
for (const file of allFiles) {
if (file.file_id && !seenFileIds.has(file.file_id)) {
seenFileIds.add(file.file_id);
uniqueFiles.push(file);
} else if (!file.file_id) {
uniqueFiles.push(file);
}
}
return uniqueFiles;
}
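/*
The final loop in processAttachments keeps the first occurrence of each
file_id and passes through entries that have no file_id at all. The same rule
as a standalone sketch, where `dedupeByFileId` and the sample files are
hypothetical:

```javascript
// Hypothetical helper: first occurrence wins; files without an ID are kept.
function dedupeByFileId(files) {
  const seen = new Set();
  const unique = [];
  for (const file of files) {
    if (!file.file_id) {
      unique.push(file);
      continue;
    }
    if (seen.has(file.file_id)) {
      continue;
    }
    seen.add(file.file_id);
    unique.push(file);
  }
  return unique;
}

const unique = dedupeByFileId([
  { file_id: 'a', filename: 'first.pdf' },
  { file_id: 'a', filename: 'duplicate.pdf' },
  { filename: 'inline.txt' },
  { file_id: 'b', filename: 'second.png' },
]);
console.log(unique.map((f) => f.filename));
```

Order is preserved, so callers can rely on the first categorized copy of a
file being the one that survives.
*/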
/**
* @param {TMessage[]} _messages
* @returns {Promise<TMessage[]>}
*/
async addPreviousAttachments(_messages) {
if (!this.options.resendFiles) {
return _messages;
}
const seen = new Set();
const attachmentsProcessed =
this.options.attachments && !(this.options.attachments instanceof Promise);
if (attachmentsProcessed) {
for (const attachment of this.options.attachments) {
seen.add(attachment.file_id);
}
}
/**
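* Loads the not-yet-seen files referenced by a message and attaches their context.
* @param {TMessage} message
* @returns {Promise<TMessage>}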
*/
const processMessage = async (message) => {
if (!this.message_file_map) {
/** @type {Record<string, MongoFile[]>} */
this.message_file_map = {};
}
const fileIds = [];
for (const file of message.files) {
if (seen.has(file.file_id)) {
continue;
}
fileIds.push(file.file_id);
seen.add(file.file_id);
}
if (fileIds.length === 0) {
return message;
}
const files = await db.getFiles(
{
file_id: { $in: fileIds },
},
{},
{},
);
await this.addFileContextToMessage(message, files);
await this.processAttachments(message, files);
this.message_file_map[message.messageId] = files;
return message;
};
const promises = [];
for (const message of _messages) {
if (!message.files) {
promises.push(message);
continue;
}
promises.push(processMessage(message));
}
const messages = await Promise.all(promises);
this.checkVisionRequest(Object.values(this.message_file_map ?? {}).flat());
return messages;
}
}
module.exports = BaseClient;