mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-07-02 04:12:36 +00:00
* chore: add @langchain/langgraph-checkpoint-mongodb for HITL durable resume
* feat: HITL tool approval runtime — backend (Slice B)
- endpoints.agents.checkpointer config + durable Mongo checkpointer (seam over the app
connection; SDK MemorySaver fallback) with a TTL index + deleteThread pruning
- HITL run wiring (PreToolUse policy hook + humanInTheLoop) attached in createRun, fully
inert when toolApproval.enabled is off
- interrupt gate (pause job -> requires_action + emit on_pending_action) and a resume
route that rebuilds the run from the durable checkpoint and run.resume()s it
- atomic single-winner resolve; agent-consistency guard; expireStaleApprovals terminal
event; checkpoint pruned on every non-paused completion (thread_id == conversationId)
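A minimal sketch of the single-winner claim, assuming an in-memory job record. `PendingJob` and `claimPendingAction` are hypothetical names, not LibreChat's API. In a real store the check and the write must be one atomic operation (for example a single conditional update); otherwise two concurrent resumes can both pass the check.

```typescript
// Hypothetical paused-job record; a real job carries many more fields.
interface PendingJob {
  status: "running" | "requires_action" | "completed" | "aborted";
  pendingActionId: string | null;
}

// Compare-and-set claim: succeeds only while the job is paused on exactly
// this action, then clears the action so any later claim for it fails.
function claimPendingAction(job: PendingJob, actionId: string): boolean {
  if (job.status !== "requires_action" || job.pendingActionId !== actionId) {
    return false; // already resolved, expired, or paused on a different action
  }
  job.status = "running";
  job.pendingActionId = null;
  return true;
}
```

A losing caller gets `false`, which a route can map to a 409 conflict.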
* feat: HITL tool approval UI — frontend (Slice B)
approve/reject/edit/respond + ask-user controls in the tool card (OAuth-button precedent),
batch-aware single submit, live + reconnect (resumeState.pendingAction) wiring, and resume
mutations posting to /agents/chat/resume.
* fix(hitl): decouple ApprovalProvider from chat context
ApprovalProvider is now pure state (safe to mount in provider-less / shared / test
renders); the context-dependent submit moved to a useResumeSubmit hook the cards call.
Part imports getAskUserQuestionPart from ~/utils/approval directly, so test suites that
partially mock ~/utils can render Part without throwing.
* fix(hitl): address Codex review — backend
- P1: enforce per-tool allowed_decisions on resume (reject a crafted decision the
policy disallows) via findDisallowedDecisions
- prune the durable checkpoint on user-abort of a paused run, and before a fresh
HITL turn, so a new turn cannot rehydrate an expired/aborted interrupt (thread_id
is the stable conversationId)
- persist + use isTemporary and the original parentMessageId on resume (temporary
chats stay temporary; initializeAgent scopes thread files off the right parent)
- generate a deferred first-turn title BEFORE completeJob so its event reaches the
client and the final event carries the real title
- moderateText: skip when there is no text (tool-approval resume) and moderate the
ask-user answer, instead of denying on an empty input
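The per-tool allowed_decisions check can be sketched as a filter over the submitted decisions. The types and the exact signature are assumptions. This version fails closed: a request with no allowed_decisions permits nothing, and a decision that targets a tool call that was never requested is also disallowed.

```typescript
type DecisionType = "approve" | "reject" | "edit" | "respond";

// Hypothetical shapes for the paused action's per-tool requests and the
// decisions submitted on /resume.
interface ActionRequest {
  tool_call_id: string;
  allowed_decisions?: DecisionType[];
}

interface Decision {
  tool_call_id: string;
  type: DecisionType;
}

// Returns every decision the paused policy does not permit.
function findDisallowedDecisions(
  requests: ActionRequest[],
  decisions: Decision[],
): Decision[] {
  // Null-prototype map so ids like "constructor" cannot hit Object.prototype.
  const allowedById: { [id: string]: DecisionType[] | undefined } = Object.create(null);
  for (const request of requests) {
    allowedById[request.tool_call_id] = request.allowed_decisions || [];
  }
  return decisions.filter((decision) => {
    const allowed = allowedById[decision.tool_call_id];
    return allowed === undefined || allowed.indexOf(decision.type) === -1;
  });
}
```

A non-empty result means the resume should be rejected before any decision reaches the SDK.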
* fix(hitl): address Codex review — frontend
- render ToolApproval for ANY paused agent tool card (bash/code/file/etc.), not just
the generic ToolCall, by wrapping the tool-card branch in Part (moved the rendering
out of ToolCall)
- findPendingActionMessageIndex only matches an assistant message, never the user
message (the underscore-strip could target the user bubble before the assistant
placeholder exists)
* fix(hitl): address Codex re-review
- title eligibility checks the user message’s parent (first turn), not the response’s
parent — the previous check could never be true and skipped title generation
- use client.buildResponseMetadata() for the resumed message so contextUsage /
thoughtSignatures survive (the abort-only helper dropped them)
- moderate decisions[].responseText (the respond action’s user text)
- give /chat/abort req.config (configMiddleware) so the HITL checkpoint prune on abort
actually runs
- read resume state BEFORE setContentParts so the in-memory store does not lose the
pre-pause seed content
- count resumes against LIMIT_CONCURRENT_MESSAGES (increment/decrement) so paused-then-resumed
turns cannot bypass the limit
- require actionId on resume so a body without it cannot resolve the current action
* fix(hitl): address Codex re-review (round 3) — resume fidelity
Bring the lean resume path to parity with sendMessage for things it bypassed:
- carry userMCPAuthMap into the rebuilt run so approved MCP tools keep the user's creds
- seed initialSessions (buildInitialToolSessions) so approved code/file/skill tools have
the pre-pause uploaded-file context (esp. cross-replica / after restart)
- await client.artifactPromises and persist them as response attachments (else tool
artifacts created after the pause vanish on reload / for late subscribers)
- merge metadata: cumulative usage (+ summary marker) from the job, contextUsage /
thoughtSignatures from the client — fixes the round-2 regression that underreported
post-resume cost
* fix(hitl): address Codex re-review (round 4) — resume hardening
- resume: require an EXACT paused agent_id match (reject omitted/ephemeral
agent_id, not just a different one) and reject an endpoint mismatch, so a
request can't rebuild the claimed checkpoint on a different graph
- moderateText: also moderate a tool-approval decision's reject `reason` and
stringified `editedArguments`, not just `responseText`
- request: re-mark the paused response `unfinished:true` after BaseClient saves
it as completed, so an expired / never-resumed approval doesn't leave a
"finished" response in history; the resume path overwrites it on success
* test(hitl): route-level integration test for the resume controller
Adds api/server/controllers/agents/__tests__/resume.spec.js, a supertest
integration test that drives the real ResumeAgentController over the full
pause -> approve -> resume -> finalize lifecycle with the SDK run, durable
checkpointer, Mongo, and concurrency cache mocked. The pure decision/liveness
helpers run for real via requireActual, so the guard ladder is exercised end to
end rather than stubbed.
25 cases covering:
- the authorization / staleness / agent-and-endpoint / actionId guard ladder
- tool_approval validation (undecided tool call, policy-disallowed decision)
- ask_user_question answer requirement
- the concurrency gate (429) and the atomic single-winner claim (409)
- the happy path: ACK, run reconstruction, decision->SDK mapping, finalize
(save the now-finished response, emit done, complete job, prune checkpoint)
- first-turn title generation before stream completion
- re-pause (no double finalize), abort-during-resume (no double finalize),
and the resume-failure terminal path (emitError + completeJob + prune)
* test(hitl): strengthen resume coverage + add approval util tests
Acts on a self-audit of the new resume integration test.
resume.spec.js (25 -> 32 cases):
- replace the tautological emitDone assertion (it only checked the hardcoded
`final: true`) with a structural check of the finalEvent payload —
responseMessage content/id/unfinished, requestMessage identity, title
- cover the previously-unwalked finalize branches: tool-artifact attachments
(null-filtered), the aggregatedContent fallback when live content is empty,
and client response-metadata attachment
- add guard cases: unsupported pending-action type (400) and the
pre-multi-tenancy null-tenantId pass-through (must not 403)
- add error-path cases: first-turn title generation throwing must still
finalize, and a completeJob failure during a resume error must force a
terminal job state via the last-resort updateJob
client/src/utils/approval.spec.ts (new, 15 cases):
- applyPendingAction tool_approval: join by tool_call_id not position,
skip completed calls, default allowed_decisions to [], referential
stability when nothing changes
- applyPendingAction ask_user_question: append, idempotent replace on replay,
non-array content coercion
- getAskUserQuestionPart type guard; findPendingActionMessageIndex
assistant-only resolution (never resolves to the user bubble)
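A sketch of the tool_approval merge that these cases describe, assuming content parts shaped like `{ type: "tool_call", tool_call: { id, output?, approval? } }`. `tagToolCallsForApproval` is a hypothetical stand-in, and treating a defined `output` as "completed" is also an assumption.

```typescript
interface ApprovalTag {
  actionId: string;
  allowed_decisions: string[];
}

interface ToolCallPart {
  type: "tool_call";
  tool_call: { id: string; output?: string; approval?: ApprovalTag };
}

interface TextPart {
  type: "text";
  text: string;
}

type ContentPart = ToolCallPart | TextPart;

interface ApprovalRequest {
  tool_call_id: string;
  allowed_decisions?: string[];
}

// Tags pending tool calls with the action's approval metadata, joining on
// tool_call_id (never on position). Completed calls are left alone, and the
// input array itself is returned when nothing needed tagging.
function tagToolCallsForApproval(
  content: ContentPart[],
  actionId: string,
  requests: ApprovalRequest[],
): ContentPart[] {
  const byId: { [id: string]: ApprovalRequest | undefined } = Object.create(null);
  for (const request of requests) {
    byId[request.tool_call_id] = request;
  }
  let changed = false;
  const next = content.map((part): ContentPart => {
    if (part.type !== "tool_call") {
      return part;
    }
    const call = part.tool_call;
    const request = byId[call.id];
    // Not part of this action, or already finished (assumed: output present).
    if (!request || call.output !== undefined) {
      return part;
    }
    // Already tagged for this action (e.g. a replayed event): keep the object.
    if (call.approval && call.approval.actionId === actionId) {
      return part;
    }
    changed = true;
    const approval: ApprovalTag = {
      actionId,
      allowed_decisions: request.allowed_decisions || [],
    };
    return { ...part, tool_call: { ...call, approval } };
  });
  return changed ? next : content;
}
```

Returning the original array when nothing changed keeps a replayed event from producing a new reference and a needless re-render.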
* fix(hitl): address Codex re-review (round 5)
Five findings verified against the code before fixing:
- resume: require an EXACT endpoint match (like agent_id) — a resume that OMITS
endpoint must not fall through, since the shared chat middleware treats a
missing/non-agents endpoint as the ephemeral agent and could rebuild the
claimed checkpoint on a different graph
- resume: filter malformed content parts before saving the finished response,
matching the normal AgentClient path (a resumed turn could otherwise persist
an empty/invalid tool_call part that breaks reload/rendering)
- resume: accumulate tool artifacts across pause segments — persist them on
re-pause and MERGE (not overwrite) at finalize, so artifacts produced before
a second approval pause aren't dropped by the next rebuilt client
- approval (client): findPendingActionMessageIndex returns -1 when a provided
responseMessageId isn't found, so the caller retries instead of attaching the
prompt/approval to a prior assistant reply; fall back to the last assistant
only when no responseMessageId is given
- RedisJobStore: make appendChunk extend-only (XADD + EXPIRE-if-shorter via a
single eval) so the on_pending_action chunk emitted after a pause can't reset
the chunk-stream TTL back to the running window and evict pre-pause content
before the approval is resolved
Tests: +endpoint-omitted/unsupported-type/malformed-filter/attachment-merge/
re-pause-persist cases in resume.spec.js (36); ask-retry -1 semantics in
approval.spec.ts (16); extend-only TTL assertion in the RedisJobStore Redis
integration spec.
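The -1 semantics for findPendingActionMessageIndex, sketched with a hypothetical `isCreatedByUser` flag marking user messages. `findPendingActionIndex` is an illustrative name, and the underscore-strip id matching mentioned in the earlier frontend fix is not modeled.

```typescript
interface ChatMessage {
  messageId: string;
  isCreatedByUser: boolean; // assumed flag marking the user's own messages
}

// Index of the assistant message that should receive a pending action.
// With a responseMessageId, only that assistant message matches; if it has
// not been committed yet the result is -1 so the caller can retry. Without an
// id, fall back to the last assistant message. Never returns a user message.
function findPendingActionIndex(messages: ChatMessage[], responseMessageId?: string): number {
  for (let i = messages.length - 1; i >= 0; i--) {
    const message = messages[i];
    if (message.isCreatedByUser) {
      continue;
    }
    if (!responseMessageId || message.messageId === responseMessageId) {
      return i;
    }
  }
  return -1;
}
```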
* test(hitl): mongodb-memory-server integration test for the checkpointer seam
The checkpointer unit spec covers config/selection with no DB connection; this
exercises the durable Mongo seam against a real (in-memory) MongoDB — the part
correctness actually depends on:
- getAgentCheckpointer builds a real MongoDBSaver when Mongo is connected and
setup() creates the TTL index (expireAfterSeconds) on the checkpoint collection
- memory type returns undefined (SDK MemorySaver fallback) even when connected
- saver is memoized per resolved config
- deleteAgentCheckpoint prunes a thread's persisted checkpoint (the cross-turn
isolation guarantee: turn N+1 on the same conversationId can't rehydrate it)
- pruning is thread-scoped — deleting one conversation leaves others intact
- undefined threadId is a no-op
* fix(hitl): address Codex re-review (round 6)
Four findings verified against the code before fixing:
- messageFilterPii: scan the resume payload's user-authored text (ask-user
`answer`, and a tool-approval decision's `respond` text, `reject` reason, and
edited tool arguments) — the shared /resume route ran through the PII filter
but it only inspected req.body.text, so a blocked token rode the resume
payload back into the model/tool (mirrors the earlier moderateText fix)
- resume: re-prime skill files invoked in the pre-pause segment before rebuilding
the run, so an approved code/file-backed tool keeps the injected skill-file
session refs instead of running without them (mirrors the normal path's
primeInvokedSkills; the pre-pause content stands in for the message payload)
- hitl: pin the graph identity. Persist a fingerprint of the graph-determining
request fields (endpoint, agent_id, model, spec, ephemeralAgent — normalized)
on the pending action at pause, and reject a resume whose recomputed
fingerprint differs. This closes the ephemeral-agent gap, where agent_id is
undefined so the id guard can't tell two ephemeral configs apart
- resume: reject incomplete edit/respond decisions (findIncompleteDecisions) —
an `edit` without an object editedArguments or a `respond` without non-empty
responseText is 400'd before mapping, rather than defaulting to {} / '' and
resuming with behavior the user never approved
Tests: incomplete-decision + fingerprint match/mismatch cases in resume.spec.js
(41); findIncompleteDecisions + computeAgentRequestFingerprint unit tests; and
resume-field PII cases in messageFilterPii.spec.ts.
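A sketch of the incomplete-decision check, assuming decisions carry `editedArguments` and `responseText` as shown. The signature is hypothetical; the plain-object test rejects `null` and arrays, which are valid JSON but not an argument object.

```typescript
interface ResumeDecision {
  tool_call_id: string;
  type: "approve" | "reject" | "edit" | "respond";
  editedArguments?: unknown;
  responseText?: unknown;
}

// True only for non-null, non-array objects (what JSON.parse yields for {...}).
function isPlainObject(value: unknown): boolean {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Returns decisions that cannot be mapped without inventing input: an edit
// whose editedArguments is not an object, or a respond with no text.
function findIncompleteDecisions(decisions: ResumeDecision[]): ResumeDecision[] {
  return decisions.filter((decision) => {
    if (decision.type === "edit") {
      return !isPlainObject(decision.editedArguments);
    }
    if (decision.type === "respond") {
      const text = decision.responseText;
      return typeof text !== "string" || text.length === 0;
    }
    return false;
  });
}
```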
* fix(hitl): address Codex re-review (round 7)
Four findings verified against the code before fixing:
- RedisJobStore: clear `agent_id` on createJob (add it to staleHitlFields). The
job hash is keyed by conversationId and reused across turns; updateMetadata
only writes agent_id when truthy, so a conversation that switched from a saved
agent to an ephemeral/no-agent turn kept the old id and the resume guard
rejected the valid pause as a different agent. (real correctness bug)
- fingerprint: include `promptPrefix` in computeAgentRequestFingerprint, and
re-send it on resume (ResumeAgentFields + buildResumeFields). Ephemeral agents
derive their system instructions from promptPrefix, so a resume changing it
previously passed the pin and rebuilt different instructions. (completes the
round-6 fingerprint)
- resume: the re-pause branch now persists the segment's accumulated CONTENT
(filtered), not just artifacts, so an approval that expires/reaps without a
final resume no longer loses everything streamed during the resumed segment.
- request: carry `manualSkills`/`alwaysAppliedSkills` on the persisted user
message so a resumed turn's reconstructed requestMessage keeps its skill pills
instead of dropping them until a full reload.
Deferred (narrow, no safe contained fix yet — see PR thread replies):
- resume rebuild without `addedConvo` for a multi-conversation/added-agent pane
- cross-replica re-prime of manually-selected (not model-invoked) skill files
Tests: stale-agent createJob clearing (Redis integration), promptPrefix
fingerprint match/mismatch (resume.spec.js + policy.spec.ts), re-pause content
persistence (resume.spec.js).
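One way to build such a fingerprint is canonical JSON over the graph-determining fields, with object keys sorted so key order cannot change the result. This sketch compares the canonical strings directly; whether computeAgentRequestFingerprint hashes them, and how it normalizes ephemeralAgent, are not stated here, so `computeFingerprint` is hypothetical.

```typescript
// Recursively sorts object keys and drops undefined values so logically
// equal inputs serialize identically.
function canonicalize(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map(canonicalize);
  }
  if (typeof value === "object" && value !== null) {
    const src = value as { [key: string]: unknown };
    const out: { [key: string]: unknown } = {};
    for (const key of Object.keys(src).sort()) {
      if (src[key] !== undefined) {
        out[key] = canonicalize(src[key]);
      }
    }
    return out;
  }
  return value;
}

// The graph-determining fields named in rounds 6 and 7.
const FINGERPRINT_FIELDS = ["endpoint", "agent_id", "model", "spec", "promptPrefix", "ephemeralAgent"];

function computeFingerprint(body: { [key: string]: unknown }): string {
  const picked: { [key: string]: unknown } = {};
  for (const field of FINGERPRINT_FIELDS) {
    picked[field] = body[field];
  }
  return JSON.stringify(canonicalize(picked));
}
```

Fields outside the list (such as the message text) do not affect the result, so only a change to the graph itself fails the pin.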
* fix(hitl): address Codex re-review (round 8)
Five findings verified against the code before fixing; the headline is a durable-resume
correctness fix (the fingerprint had surfaced it as a 403):
- resume durability (the important one): persist the graph-determining request
fields (endpoint, agent_id, model, spec, promptPrefix, ephemeralAgent) on the
pending action as `resumeContext`, and REPLAY them onto the resume request via
a router-level middleware that runs before buildEndpointOption. The client
can't reconstruct the ephemeral-agent config after a reload/cross-session, so
the round-6/7 fingerprint would 403 a valid durable resume — and even without
it the rebuilt agent would lose its tools. Replaying server-side rebuilds the
SAME graph regardless of client state (and a crafted resume can't swap it; the
fingerprint still matches because the body is restored first).
- RedisJobStore: also clear `isTemporary` on createJob (same class as agent_id):
a prior temporary turn's flag would otherwise survive a reused conversation
hash and a later non-temporary resume would save its response as temporary.
- resume: persist `contextMeta` (context-window calibration) onto the saved
response like BaseClient does, so the next turn can seed its pruner.
- request: carry manualSkills/alwaysAppliedSkills into the onStart metadata
update (not just the preliminary one it overwrites), so a resumed turn's
requestMessage keeps its skill pills.
Deferred (narrow — see thread reply):
- saved-agent edited WHILE a run is paused: agent_id matches but the definition
changed; needs an agent version/config hash, which is a larger change for a
narrow window.
Tests: resumeContext pick/apply + round-trip (policy.spec.ts), contextMeta +
manualSkills-on-requestMessage (resume.spec.js), isTemporary clearing (Redis
integration).
* style(hitl): prettier line-wrap in policy.spec.ts (R8 lint fix)
* fix(hitl): address Codex re-review (round 9)
Five findings, all fixed (addedConvo — deferred in rounds 7/8 — is now trivial
thanks to the round-8 replay):
- replay addedConvo: add it to RESUME_CONTEXT_KEYS so the resume middleware
restores the parallel/secondary-agent config from the paused request; the
client can't reconstruct it, and it determines the rebuilt graph.
- skill pills (the real fix this time): the round-8 onStart metadata write was
overwritten by trackUserMessage (the authoritative userMessage writer). Carry
manualSkills/alwaysAppliedSkills in the emitted `created` message and persist
them in trackUserMessage; widen UserMessageMeta + SerializableJobData.userMessage.
- execute-code files on resume: seed the paused user message's own files onto
req.body.files before initializeClient — they're excluded from the
parent-walk code-session rebuild, so an approved code/read-file tool would
otherwise resume without them.
- in-memory pending-action UI: route ApprovalEvents.ON_PENDING_ACTION in the
resume replay/pending-event loops to applyPendingActionToMessages (mirror the
live handler), so a pause that lands in the snapshot window still renders its
approval controls instead of sitting paused with no UI.
- abort isTemporary: the /chat/abort partial-save now sources isTemporary from
the job metadata, not req.body (the stop button posts only conversationId), so
aborting a paused temporary chat no longer persists an orphaned partial.
Tests: addedConvo in pickResumeContext (policy.spec.ts), file-restore on resume
(resume.spec.js), abort-from-job-isTemporary (abort.spec.js).
* fix(hitl): address Codex re-review (round 10) — resume/expiry races
Three concurrency/coherence findings, verified against the code before fixing:
- expiry-sweep CAS scope: both stale-approval sweeps (GenerationJobManager
expireStaleApprovals and the RedisJobStore requires_action cleanup) called
expire()/transitionStatus WITHOUT the observed pendingAction.actionId, so the
CAS only checked status===requires_action. Between the read and the CAS a user
could resolve the observed action and the run re-pause on a FRESH action; the
stale sweep would then abort that valid new pause. Now both pass the observed
actionId as expectActionId, so the CAS only fires for the action read as stale
(a re-paused action has a different id → no-op).
- resume graph cache: resumeCompletion cached the rebuilt graph (created with
messages:[]) via setGraph; RedisJobStore.getContentParts prefers a cached
graph over reconstructing from the chunk log, so a same-replica reload/status
poll mid-resume returned aggregatedContent missing the pre-pause content. Skip
setGraph on resume so introspection falls back to the complete chunk
reconstruction (setContentParts still seeds the in-memory store).
- pending-action UI: applyPendingActionToMessages scheduled a SINGLE
animation-frame retry then dropped the pending action; Recoil/React updates can
take several frames under load, leaving a valid requires_action run with no
approval controls. Retry across frames (bounded at 120) until the target
message commits.
Test: expire() with a mismatched expectedActionId no-ops while the matching id
expires (pendingAction.spec.ts).
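A sketch of the actionId-scoped CAS, assuming an in-memory job; `expireIfStillPending` is hypothetical and stands in for expire()/transitionStatus called with the observed id. The point is that the sweep's condition includes the action it read as stale, not just the status.

```typescript
interface SweptJob {
  status: "running" | "requires_action" | "aborted" | "completed";
  pendingActionId: string | null;
  error?: string;
}

// Expire a paused job only if it is still paused on the action the sweep
// observed. A job that was resolved and re-paused has a new pendingActionId,
// so the sweep becomes a no-op instead of aborting the valid new pause.
function expireIfStillPending(job: SweptJob, expectActionId: string): boolean {
  if (job.status !== "requires_action" || job.pendingActionId !== expectActionId) {
    return false;
  }
  job.status = "aborted";
  job.error = "approval expired"; // placeholder text, not the real error string
  job.pendingActionId = null;
  return true;
}
```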
* chore(deps): update @librechat/agents to version 3.2.53 and @langchain/langgraph to version 1.4.7 in package-lock.json and related package.json files
* refactor(hitl): add resolveToolApprovalPolicy seam for layered policy
Extract the single point where tool-approval policy is resolved for a turn
(`resolveToolApprovalPolicy`) and route the run call site through it instead
of reading `endpoints.agents.toolApproval` inline.
Behaviour-preserving: only the `endpoint` layer is wired today, so the result
is identical to reading the app policy directly. The `agent` and `skills`
layers are reserved seams with documented precedence (endpoint owns the
`enabled` kill switch; agent overrides mode/allow/deny/ask/reason; skills may
only tighten), so future per-agent and per-skill policy plumbing lands in one
function rather than at the `createRun` site. Adds focused unit tests.
* fix(hitl): address Codex re-review (round 11) — resume hardening
F1 (P2, security) — applyResumeContext now DELETES any RESUME_CONTEXT_KEY
absent from the persisted context, so the resume body carries exactly the
graph-determining fields the pause had. Previously only defined keys were
overwritten, leaving a client-supplied `addedConvo` (which the request
fingerprint does not cover) in place — a crafted resume could rebuild a
single-agent checkpoint as a different multi-agent graph/tool set.
F3 (P2) — the resume route ACKs (res.json) before initializeClient, so a
post-ACK getMCPRequestContext(req, res) saw the response as finished and
returned undefined, leaving the resumed run without its run-scoped MCP
connection store (approved MCP / OAuth-overlay tools then ran without their
request-scoped connections). Pre-seed the store with a null res +
cleanupOnResponse:false before the ACK and tear it down in the finally,
mirroring the normal stream path (request.js). userMCPAuthMap was already
preserved separately, so credentials were not lost — only the connection store.
Declined: the ApprovalContext NEW_CONVO guard (P2) is a false positive — the
`created` SSE event updates the conversation atom before any pause renders, so
the id is concrete by click time (details in the PR thread).
Tests: policy.spec (absent-key delete) + resume.spec (MCP context pre-seed/cleanup order).
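A sketch of the pick/apply pair with the delete-absent rule. The key list collects the fields this log names as replayed across rounds 8, 9, 12 and 14; `pickContext` and `applyContext` are hypothetical stand-ins for pickResumeContext/applyResumeContext.

```typescript
// Graph-determining fields named in this log as replayed on resume.
const RESUME_KEYS = [
  "endpoint", "agent_id", "model", "spec", "promptPrefix", "ephemeralAgent",
  "addedConvo", "timezone", "manualSkills",
];

type RequestBody = { [key: string]: unknown };

// At pause: keep only the context fields that were actually set.
function pickContext(body: RequestBody): RequestBody {
  const context: RequestBody = {};
  for (const key of RESUME_KEYS) {
    if (body[key] !== undefined) {
      context[key] = body[key];
    }
  }
  return context;
}

// At resume: overwrite every context key from the persisted context and
// DELETE keys the pause did not have, so a client cannot inject one.
function applyContext(body: RequestBody, context: RequestBody): void {
  for (const key of RESUME_KEYS) {
    if (context[key] === undefined) {
      delete body[key];
    } else {
      body[key] = context[key];
    }
  }
}
```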
* fix(hitl): address Codex re-review (round 12) — resume fidelity + multi-tool UI
F4 (P2) — temporal prompt vars: resume rebuilt the agent without restoring
req.conversationCreatedAt or req.body.timezone, so {{current_datetime}}-style
vars compiled a different system prompt than the paused graph (resume wall-clock,
unzoned). Add 'timezone' to RESUME_CONTEXT_KEYS (persisted at pause, replayed by
the resume middleware) and restore conversationCreatedAt from the convo before
initializeClient — mirroring the normal path's resolveConversationCreatedAt.
F5 (P2) — multi-tool approval: applyPendingActionToMessages stopped retrying once
ANY tool-call part was tagged, so siblings that rendered on later frames never got
approval controls and the resume route 400'd the partial batch. Add
countTaggedApprovalParts and keep the bounded RAF retry going until every
action_request is tagged (ask_user_question unchanged — one synthetic part).
F6 (P3) — Edit accepted `null`/`[]` (valid JSON, non-object), enabling Submit for
a value the resume route rejects via findIncompleteDecisions. Mirror the server's
plain-object check in the client (store + editIsValid) so Submit only enables for
an accepted value.
Tests: policy.spec (timezone round-trip), resume.spec (conversationCreatedAt
restore), approval.spec (countTaggedApprovalParts).
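A sketch of the bounded retry, with the frame scheduler injected so it can be exercised without a browser (in the UI it would be requestAnimationFrame). `retryUntilAllTagged` and its parameters are hypothetical; the 120-frame cap comes from round 10.

```typescript
type FrameScheduler = (callback: () => void) => void;

// Re-runs tryApply once per scheduled frame until it reports that every
// expected part is tagged, or until maxFrames attempts have been made.
function retryUntilAllTagged(
  tryApply: () => number, // applies the action, returns how many parts are tagged
  expected: number,       // e.g. the number of action_requests in the batch
  schedule: FrameScheduler,
  maxFrames = 120,
  onDone?: (ok: boolean) => void,
): void {
  let attempts = 0;
  const step = (): void => {
    attempts++;
    if (tryApply() >= expected) {
      if (onDone) onDone(true);
      return;
    }
    if (attempts >= maxFrames) {
      if (onDone) onDone(false); // give up; the target never committed
      return;
    }
    schedule(step);
  };
  step();
}
```

Stopping only when every request is tagged (rather than when any one is) is what keeps late-rendering sibling tool calls from being left without controls.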
* fix(hitl): address Codex re-review (round 13) — recurse into subagent approvals
F9 (P2) — a tool paused INSIDE a subagent has its tool_call_id in the parent
subagent tool_call's nested `subagent_content`, not as a top-level message part.
applyToolApproval and countTaggedApprovalParts only scanned top-level content, so
the approval never attached and the round-12 retry loop counted 0 tagged parts and
spun to its frame cap with no controls. Both now recurse into `subagent_content`
(immutably, so React refs update): the nested call gets tagged and is counted, so
the retry terminates. Added approval.spec cases for the nested tag + count.
Note: surfacing the interactive approve/reject controls inside the subagent view is
a deliberate follow-up — ToolApproval -> useResumeSubmit -> useChatContext crashes
when rendered in the portaled subagent dialog (outside the chat/approval providers),
so that needs the controls scoped to the in-provider inline render (or the dialog
wrapped with the providers). This commit fixes the data/traversal layer only.
F7 (discovered-tool history on resume) and F8 (redis chunk TTL pause race) were
verified false positives — see the PR threads.
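The recursive count can be sketched as follows, assuming nested parts live on `tool_call.subagent_content` as the F9 note describes. The `ApprovalPart` shape and `countTagged` are hypothetical.

```typescript
interface ApprovalPart {
  type: string;
  tool_call?: {
    id: string;
    approval?: { actionId: string };
    subagent_content?: ApprovalPart[]; // nested parts of a subagent tool call
  };
}

// Counts tool calls tagged for actionId at any depth, so a tool paused inside
// a subagent is seen and a retry loop waiting on the count can terminate.
function countTagged(parts: ApprovalPart[], actionId: string): number {
  let count = 0;
  for (const part of parts) {
    const call = part.tool_call;
    if (!call) {
      continue;
    }
    if (call.approval && call.approval.actionId === actionId) {
      count++;
    }
    if (call.subagent_content) {
      count += countTagged(call.subagent_content, actionId);
    }
  }
  return count;
}
```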
* fix(hitl): address Codex re-review (round 14) — resume fidelity + expiry relay
F13 (P2) — manualSkills are graph-determining (skill allowed-tools union into the
tool set before tools load) but weren't replayed, so a reload lost the skill tools
and a crafted resume could inject a different skill past the fingerprint. Add
'manualSkills' to RESUME_CONTEXT_KEYS (same replay-only pattern as timezone/
addedConvo; the delete-absent half blocks injection). Not alwaysAppliedSkills —
that's resolved server-side from the DB, not req.body.
F12 (P2) — the resume final SSE built requestMessage from job.metadata.userMessage
(persisted without files), so attachments vanished from the user bubble on resume.
Spread the already-restored req.body.files onto it, matching the normal path.
F11 (P2) — multi-replica approval expiry: RedisJobStore.cleanupRequiresActionIndex
on another replica can win the requires_action->aborted CAS (it sets the hash error
but has no event transport), and the local sweep then skips because the job is no
longer requires_action, so a client subscribed here never gets the terminal error
until the reap path. expireStaleApprovals now relays APPROVAL_EXPIRED_ERROR for a
locally-subscribed job already aborted FOR approval expiry (error-string gated,
idempotent via the errorEvent flag). emitError already publishes cross-replica.
Tests: policy.spec (manualSkills round-trip + inject-drop), resume.spec (final
requestMessage carries restored files).
* fix(hitl): render approval controls for subagent-nested tool pauses (F10)
Round-13 made applyToolApproval/countTaggedApprovalParts recurse into
subagent_content (data), but SubagentDialogPart rendered nested TOOL_CALL parts
with <ToolCall> only and never mounted <ToolApproval>, so a tool paused inside a
subagent showed no controls and the run was unresolvable.
Render <ToolApproval> in SubagentDialogPart's TOOL_CALL branch when the nested
tool_call carries an approval and isn't yet resolved, mirroring the top-level
Part.tsx render. The subagent dialog portals (OGDialog → ReactDOM.createPortal),
but React context flows through the React tree, not the DOM tree, so ToolApproval
resolves ApprovalProvider/ChatContext and the controls work + submit.
Also harden useResumeSubmit: read ChatContext via useContext (non-throwing)
instead of the throwing useChatContext wrapper, so the cards never crash when
rendered outside a ChatContext.Provider (e.g. a search/citation render that passes
chat context as a prop) — they degrade to inert (buildResumeFields returns null).
* style(hitl): re-sort run.ts imports after dev rebase
* fix(hitl): address Codex re-review (round 15) — resume content fidelity
F14 (P2) — hide_sequential_outputs was applied in chatCompletion before
saving/emitting content but not on resume, so a sequential-agent chain that
pauses for HITL and resumes persisted/emitted intermediate outputs the setting
is meant to hide. Extracted the filter into applyHideSequentialOutputsFilter()
and call it from both chatCompletion and resumeCompletion (after handleRunInterrupt,
covering the finalize + re-pause reads of client.contentParts).
F16 (P2) — on a reloaded HITL pause, the DB already holds the paused user row +
partial assistant row; useResumeOnLoad fed those as submission.messages, then
finalHandler/createdHandler appended the same pair via requestMessage/responseMessage,
duplicating the turn (buildTree doesn't dedupe children by messageId). buildSubmissionFromResumeState
now strips the paused user/response rows (by messageId, incl. the
padded/unpadded response id) from submission.messages — they're re-supplied by the
placeholders + final event. Frontend-only; live (non-reload) pause path untouched.
Deferred: F15 (collapsed-card subagent approval registration/visibility) — see thread.
Tests: client.test (filter keeps last + tool_call parts / no-op when off),
useResumeOnLoad.spec (paused pair stripped from submission.messages).
* fix(hitl): address Codex re-review (round 16) — chunk TTL, slot, job replacement
F17 (P2) — chunk-stream TTL on pause-before-chunk. CHUNK_APPEND_LUA derived its
ceiling only from the chunk key's current TTL, so when the chunks key didn't exist
at pause (fire-and-forget append in flight, or an ask-user pause before any chunk),
the on_pending_action append created the stream with only the 20m running TTL while
the approval window is 24h — content evicted before resume. The Lua now also reads
the job key (KEYS[2]); when status == requires_action it takes max(running, TTL(jobKey))
(the approval window transitionStatus set), else the running TTL. Extend-only preserved;
gated on paused status so normal runs never inflate. Both keys share {streamId} (cluster-safe).
F19 (P2) — with LIMIT_CONCURRENT_MESSAGES, the approval prompt was emitted before the
original request released its slot, so a fast Approve got /resume 429'd. handleRunInterrupt
now releases the slot (idempotent via pendingRequestReleased) right after the pause, before
the prompt; the request.js pause branch and resume.js finally only release if it didn't
(no double-release).
F20 (P2) — finalizeResumedTurn never checked the job wasn't replaced before emitDone/
completeJob/saveMessage, so a stale resume could clobber a newer turn that reused the
conversationId. Added the createdAt guard the normal request path uses (skip finalization
when the live job's createdAt != the paused job's).
Deferred: F18 (subagent_content not reconstructed on Redis resume) — joins the subagent
cluster (F15). See thread.
Tests: RedisJobStore integration (pause-before-chunk gets approval TTL; running stays short),
resume.spec (skip finalization on replacement; no double slot release on re-pause).
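The extend-only rule plus the F17 ceiling, modeled as a pure TypeScript function rather than the actual CHUNK_APPEND_LUA script. Inputs follow Redis TTL semantics (-1 means the key has no expiry, as it does right after XADD creates it); the function name and parameters are hypothetical.

```typescript
// Returns the TTL (seconds) to EXPIRE the chunk stream to after an append,
// or null to leave it alone because that would shorten it.
function chunkExpiryAfterAppend(
  ttlAfterXadd: number, // TTL of the chunk key right after XADD (-1: no expiry yet)
  runningTtl: number,   // the normal running window
  jobStatus: string,    // status read from the job hash
  jobTtl: number,       // TTL of the job hash (the approval window while paused)
): number | null {
  // While paused, the stream must live at least as long as the approval window.
  const ceiling = jobStatus === "requires_action" ? Math.max(runningTtl, jobTtl) : runningTtl;
  // Extend-only: never lower an existing expiry that is already long enough.
  if (ttlAfterXadd >= 0 && ttlAfterXadd >= ceiling) {
    return null;
  }
  return ceiling;
}
```

With the 20m running window (1200 s) and a 24h approval window (86400 s), a pause-time append on a freshly created stream gets 86400, while an append during a normal run never shortens a longer TTL.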
* 🛡️ fix: Guard HITL terminal side-effects against job replacement
Jobs are keyed by streamId == conversationId, so a new request REPLACES the
running one on the same conversation. The replaced generation's tail must not
clobber the live generation's state. Each path now re-reads the live job and
compares createdAt against the generation's captured identity before acting.
- Thread the generation's createdAt onto the client (request.js + resume.js)
as client.jobCreatedAt — the identity every guard compares against.
- handleRunInterrupt: skip approvals.pause when this run is no longer the live
job, so a stale interrupt can't flip the NEWER job to requires_action.
- chatCompletion finally: skip the checkpoint prune when replaced, so an older
run's late finally can't delete the newer run's resume checkpoint.
- resume catch-path: gate emitError/completeJob/prune behind a stillLive check
(fail-open if the read throws), mirroring finalizeResumedTurn's success guard.
- Persist the turn's uploaded files on job.metadata.userMessage (authoritative
trackUserMessage writer) and prefer them on resume over the user DB row, whose
save can still be racing a fast /resume.
Tests: 13 guard-predicate cases in jobReplacement.spec.js.
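A sketch of the guard predicate, assuming the live job exposes a numeric `createdAt`. `isStillLive` is hypothetical and synchronous for brevity (the real reads are async). It fails open when the read throws, as the resume catch path does; treating a missing job as not live is an assumption.

```typescript
interface LiveJob {
  createdAt: number;
}

// True when the generation identified by capturedCreatedAt still owns the
// conversation's job, i.e. no newer request has replaced it.
function isStillLive(readLiveJob: () => LiveJob | null, capturedCreatedAt: number): boolean {
  let live: LiveJob | null;
  try {
    live = readLiveJob();
  } catch {
    return true; // fail open: an unreadable store should not strand the turn
  }
  if (!live) {
    return false; // assumption: no job means nothing left to finalize
  }
  return live.createdAt === capturedCreatedAt;
}
```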
* 🔁 fix: Harden HITL resume — ownership re-check, file seeding, deferred-tool replay
Three follow-ups to the round-17 job-replacement guards (Codex review 4594099963):
- G1 (resume.js): the success-path ownership guard runs at the START of
finalizeResumedTurn, but saveMessage + first-turn title generation await long
enough for a new request to replace the job on the same conversationId. Re-read
the live job immediately before emitDone/completeJob/prune so the terminal writes
can't tear down the REPLACEMENT job — mirrors the catch-path guard.
- G2 (request.js): onStart's metadata/chunk writes that persist the turn's files
are fire-and-forget, so a fast approval could read job.metadata.userMessage before
files landed. Seed files into getPreliminaryUserMessage instead — that write is
AWAITED before the run starts, so files are durable before any interrupt can emit.
- G3 (run.ts + client.js + resume.js + IJobStore.ts): the resumed graph is rebuilt
with messages: [], so createRun's tool_search-discovery scan finds nothing. A
deferred tool discovered earlier in the turn (and targeted by the paused call) was
therefore absent from the rebuilt schema-only toolMap — resume would throw "unknown
tool" (no loadRuntimeTools fallback is wired). Capture discovered tool names at
pause via extractDiscoveredToolsFromHistory(run.getRunMessages()), persist them on
job.metadata.discoveredTools, and replay them into createRun's new discoveredToolNames
input (merged with message-extracted names, gated on hasAnyDeferredTools — inert
otherwise). A new createRun test proves the deferred tool is promoted with the replay
and absent without it (reproducing the bug).
Tests: real createRun deferred-replay suite (run-summarization.test.ts) + G1/G2/G3
guard predicates (jobReplacement.spec.js). Full suite green.
* 🔒 fix: Close HITL resume metadata + file-substitution + pause-race gaps
Four findings on the round-18 commit (Codex review 4594430222):
- H1 (P1, regression in round-18 G3): the discoveredTools captured at pause never
reached resume; three metadata allowlists dropped it: GenerationJobManager.updateMetadata,
RedisJobStore.deserializeJob, and buildJobFacade (plus the
GenerationJobMetadata type). Added discoveredTools to all four, so the deferred-tool
replay actually works end-to-end (in-memory store already kept it via Object.assign).
- H2 (P2, security): /resume honored a client-supplied `files` array, letting a crafted
client resume an approved code/read-file tool against a DIFFERENT file set than the one
approved (files aren't in the resume fingerprint/context). Resume now ALWAYS sources
files from the paused job (metadata → DB row), clearing any client-supplied set.
- H3 (P2, ephemeral fidelity): non-default model parameters (temperature, max tokens,
custom endpoint params) were lost on resume — ephemeral agents derive them from the
request body, which the resume payload omits. Capture the resolved model_parameters in
resumeContext at pause and replay them onto the body on resume (excluding `model`, which
is replayed via the fingerprinted RESUME_CONTEXT_KEYS path). Saved agents already source
these from the DB.
- H4 (P2, Redis race): a pause landing between the resume snapshot and the Pub/Sub
subscription reached neither resumeState.pendingAction nor (Redis) pendingEvents, and
approval events aren't persisted to replayEvents — the client attached to a paused job
with no approval UI. subscribeWithResume now re-reads the live job AFTER subscribing and
surfaces the pending action if the snapshot missed it (live read, no staleness).
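The H4 ordering can be sketched as follows. It assumes injected `subscribe`, `readLiveJob`, and `onPendingAction` callbacks; these are hypothetical names, not the real subscribeWithResume signature. The point is that the live read happens only after the subscription exists.

```javascript
// Hypothetical sketch of the H4 fix; callback names are illustrative only.

/**
 * Subscribe to live events first, then re-read the job. A pause that lands
 * before the subscription is visible in the re-read; one that lands after it
 * arrives as a live event, so neither case leaves the client without an
 * approval prompt.
 */
async function subscribeThenReconcile({ snapshot, subscribe, readLiveJob, onPendingAction }) {
  const unsubscribe = await subscribe();
  const live = await readLiveJob();
  // Only surface the action if the earlier snapshot missed it.
  if (!snapshot?.pendingAction && live?.status === 'requires_action' && live.pendingAction) {
    onPendingAction(live.pendingAction);
  }
  return unsubscribe;
}
```

If the pause also arrives over the subscription, the client could see the same action twice. This sketch therefore assumes the UI de-duplicates by action id.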
Tests: discoveredTools metadata round-trip + subscribeWithResume re-read
(pendingAction.spec.ts); client-file substitution rejection (resume.spec.js); model-parameter replay
predicate (jobReplacement.spec.js).
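H2 and H3 can be sketched together. The sketch assumes the paused job exposes `metadata.resumeContext.model_parameters` and `metadata.userMessage.files`. Those names come from the bullets above, but the exact shapes and the `buildResumeBody` helper are hypothetical. Client-supplied `files` never survive, and `model` is deliberately left out of the parameter replay.

```javascript
// Hypothetical sketch of H2 + H3; the job/metadata shape is assumed for illustration.

/**
 * Build the body used to rebuild a paused run on /resume.
 * @param {object} clientBody - body sent by the client to /resume
 * @param {{ metadata?: object }} pausedJob - the paused generation job
 * @param {{ files?: object[] } | null} [dbUserMessage] - fallback user message row
 */
function buildResumeBody(clientBody = {}, pausedJob = {}, dbUserMessage = null) {
  const metadata = pausedJob.metadata ?? {};
  // H3: replay the resolved model parameters captured at pause, minus `model`,
  // which travels through the fingerprinted resume-context path instead.
  const { model: _model, ...replayedParams } = metadata.resumeContext?.model_parameters ?? {};
  return {
    ...clientBody,
    ...replayedParams,
    // H2: always override client-supplied files with the paused job's set
    // (job metadata first, then the DB user-message row).
    files: metadata.userMessage?.files ?? dbUserMessage?.files ?? [],
  };
}
```

Placing `files` last in the object literal means no client value can win, even if the client sends its own `files` array.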
* 🧹 fix: Clear stale discovered tools, release slot on claim error, extend run-step TTL
Three follow-ups on the round-19 commit (Codex review 4594783691):
- I1 (P2): the round-19 discoveredTools field wasn't cleared on Redis streamId reuse.
HSET only overwrites listed fields and handleRunInterrupt only writes discoveredTools
when THIS turn discovers a deferred tool — so a replacement turn that pauses without its
own discovery inherited the prior run's tool names and force-loaded undiscovered deferred
tools on resume. Added discoveredTools to createJob's staleHitlFields HDEL list (the
in-memory store already builds a fresh object, so it was Redis-only).
- I2 (P2): with LIMIT_CONCURRENT_MESSAGES, approvals.resolve runs after the slot increment
but before the run's try/finally, so a store/Redis error there leaked the slot until the
counter TTL expired (spurious 429s on retry of the still-paused approval). Wrapped the
claim in try/catch that decrements the slot and returns 500.
- I3 (P3): saveRunSteps did SET ... EX running unconditionally, resetting the run-steps key
to the 20-min running TTL even while the job is paused for the longer approval window —
a reload after that window lost the tool timeline. Now uses a paused-window TTL script
mirroring the chunk-stream no-shrink behavior (extends to the approval window when the
job hash is requires_action).
Also fixes a latent strict-tsc cast error in the round-19 pendingAction test.
Tests: claim-throws-releases-slot (resume.spec.js); discoveredTools cleared on reuse +
saveRunSteps preserves the paused TTL (RedisJobStore integration, USE_REDIS).
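The I2 fix can be sketched minimally. It assumes an Express-style `res` and two injected callbacks: `resolveApproval` for the claim and `releaseSlot` for the concurrency decrement. All three names are hypothetical stand-ins for the real route wiring.

```javascript
// Hypothetical sketch of I2; names and callback shapes are illustrative only.

/**
 * Claim a paused approval after the concurrency slot has been taken. If the
 * claim throws, the run's try/finally has not started yet, so the slot is
 * released here instead of leaking until the counter TTL expires.
 * @returns {Promise<object | null>} the claim, or null after responding 500
 */
async function claimWithSlotRelease({ resolveApproval, releaseSlot, res }) {
  try {
    // `return await` keeps a rejected claim inside this try/catch.
    return await resolveApproval();
  } catch {
    await releaseSlot();
    res.status(500).json({ error: 'Failed to resolve pending action' });
    return null;
  }
}
```

On the success path the slot is left alone, because the run's own try/finally takes over responsibility for releasing it.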
* 🛡️ fix: Guard fast-resume save race, gate HITL to resumable routes, expire on stale submit
Three findings on the round-20 commit (Codex review 4595045652):
- J2 (P1): a fast /resume can claim + finalize the COMPLETED response while the original
request's pause branch is still awaiting `response.databasePromise`; the later
unfinished-save then overwrites the completed content. Re-check the job is still paused on
THIS generation's action (a claim leaves requires_action; a replacement bumps createdAt)
before marking the row unfinished; fail open on a read error.
- J3 (P1): the tool-approval wiring (humanInTheLoop + PreToolUse hook + checkpointer) was
applied to EVERY createRun caller when toolApproval.enabled, but the OpenAI-compatible and
Responses controllers never inspect run.getInterrupt() or persist a pending action — an
approval-gated tool would pause there with no approval surface or resume endpoint and the
route would emit a normal final response / [DONE] with the tool call dangling. Gate the
wiring on a new createRun `hitlCapable` flag, set only by AgentClient (chat + resume).
- J4 (P2): a stale-action 409 on submit returned without driving expiry, leaving the job
requires_action with a dead action until the periodic sweeper ran — any attached SSE client
got no terminal event and the stream appeared to hang. Extracted
GenerationJobManager.expireApproval(streamId, actionId) (expire CAS + terminal SSE, shared with
the sweeper) and
call it from the resume route when the observed action is stale.
J1 (nested subagent approval controls not mounting while the details dialog is closed) is a
valid frontend issue in the deferred subagent-HITL path — tracked separately (replied on the
thread) since the fix touches the shared dialog primitive and needs UI verification.
Tests: HITL-gate both directions (run-summarization.test.ts); expire-on-stale-submit
(resume.spec.js); fast-resume unfinished-save guard predicate (jobReplacement.spec.js).
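The J2 guard predicate can be sketched minimally. It assumes the live job exposes `status`, `createdAt`, and `pendingAction.actionId`, and that `readJob` is an injected async reader. These names, and `shouldMarkUnfinished` itself, are hypothetical.

```javascript
// Hypothetical sketch of the J2 guard; job field names are assumptions.

/**
 * Decide whether the original request may still mark its response row
 * unfinished after awaiting the save. A fast /resume that already claimed the
 * action moves the job out of requires_action, and a replacement job carries
 * a newer createdAt, so either case means "do not overwrite".
 */
async function shouldMarkUnfinished({ readJob, streamId, actionId, createdAt }) {
  let job;
  try {
    job = await readJob(streamId);
  } catch {
    // Fail open: on a read error keep the pre-fix behavior and mark unfinished.
    return true;
  }
  // Assumption: a missing job is treated as "no longer paused".
  return (
    job != null &&
    job.status === 'requires_action' &&
    job.createdAt === createdAt &&
    job.pendingAction?.actionId === actionId
  );
}
```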
* 💄 style: Wrap captureAgents signature to satisfy prettier (CI lint)
2367 lines
90 KiB
JavaScript
require('events').EventEmitter.defaultMaxListeners = 100;
const { logger } = require('@librechat/data-schemas');
const { getBufferString, HumanMessage } = require('@librechat/agents/langchain/messages');
const {
  createRun,
  isEnabled,
  checkAccess,
  buildToolSet,
  logToolError,
  sanitizeTitle,
  payloadParser,
  createSafeUser,
  initializeAgent,
  resolveConfigHeaders,
  countTokens,
  getBalanceConfig,
  omitTitleOptions,
  getProviderConfig,
  memoryInstructions,
  createTokenCounter,
  applyContextToAgent,
  isMemoryAgentEnabled,
  recordCollectedUsage,
  sendEvent,
  computeUsageCostUSD,
  aggregateEmittedUsage,
  resolveAgentTokenConfig,
  buildPersistedContextUsage,
  computeSummaryUsedTokens,
  priorRunOutputTokens,
  createSubagentUsageSink,
  anyAgentReplaysReasoningContent,
  GenerationJobManager,
  getTransactionsConfig,
  resolveRecursionLimit,
  buildPendingAction,
  computeAgentRequestFingerprint,
  extractDiscoveredToolsFromHistory,
  pickResumeContext,
  getApprovalTtlMs,
  isHITLEnabled,
  deleteAgentCheckpoint,
  getRequestMemories,
  createMemoryProcessor,
  agentHasInlineMemoryTools,
  loadAgent: loadAgentFn,
  createMultiAgentMapper,
  filterMalformedContentParts,
  countFormattedMessageTokens,
  prependFileContext,
  prependQuotes,
  hydrateMissingIndexTokenCounts,
  injectSkillPrimes,
  collectFreshSkillPrimeNames,
  isSkillPrimeMessage,
  collectFileIds,
  processTextWithTokenLimit,
  buildAgentScopedContext,
  buildSkillPrimeContentParts,
  buildInitialToolSessions,
  hasUrlContextTool,
  appendYouTubeVideoParts,
  resolveYouTubeInjectionConfig,
  decrementPendingRequest,
} = require('@librechat/api');
const {
  Callback,
  Providers,
  TitleMethod,
  formatMessage,
  formatAgentMessages,
  createMetadataAggregator,
} = require('@librechat/agents');
const {
  Constants,
  UsageEvents,
  Permissions,
  VisionModes,
  ContentTypes,
  ApprovalEvents,
  EModelEndpoint,
  PermissionTypes,
  AgentCapabilities,
  isAgentsEndpoint,
  isEphemeralAgentId,
  removeNullishValues,
  DEFAULT_MEMORY_MAX_INPUT_TOKENS,
} = require('librechat-data-provider');
const { filterFilesByAgentAccess } = require('~/server/services/Files/permissions');
const { encodeAndFormat } = require('~/server/services/Files/images/encode');
const { createContextHandlers } = require('~/app/clients/prompts');
const { resolveConfigServers } = require('~/server/services/MCP');
const { getMCPServerTools } = require('~/server/services/Config');
const BaseClient = require('~/app/clients/BaseClient');
const { getMCPManager } = require('~/config');
const db = require('~/models');

const loadAgent = (params) => loadAgentFn(params, { getAgent: db.getAgent, getMCPServerTools });

const MEMORY_INPUT_CHARS_PER_TOKEN = 8;

class AgentClient extends BaseClient {
  constructor(options = {}) {
    super(null, options);
    /** The current client class
     * @type {string} */
    this.clientName = EModelEndpoint.agents;

    /** @deprecated @type {true} - Is a Chat Completion Request */
    this.isChatCompletion = true;

    /** @type {AgentRun} */
    this.run;

    /** Resolves with the agent run once `chatCompletion` initializes it (or
     * `null` if initialization fails), letting immediate-mode title generation
     * await the run instead of throwing when fired before the run exists.
     * @type {Promise<AgentRun | null> | null} */
    this._runReady = null;
    /** @type {((run: AgentRun | null) => void) | null} */
    this._resolveRun = null;

    const {
      agentConfigs,
      contentParts,
      collectedUsage,
      collectedThoughtSignatures,
      artifactPromises,
      maxContextTokens,
      subagentAggregatorsByToolCallId,
      contextUsageSink,
      usageEmitSink,
      ...clientOptions
    } = options;

    this.agentConfigs = agentConfigs;
    this.maxContextTokens = maxContextTokens;
    /** Latest visible context snapshot for this response, captured live by the
     * ON_CONTEXT_USAGE handler; persisted on `metadata.contextUsage`.
     * @type {{ latest: import('librechat-data-provider').TContextUsageEvent | null } | undefined} */
    this.contextUsageSink = contextUsageSink;
    /** Every emitted `on_token_usage` payload for this response (primary,
     * summarization, sequential, and subagent); aggregated into the rollup
     * persisted on `metadata.usage`.
     * @type {Array<import('librechat-data-provider').TTokenUsageEvent> | undefined} */
    this.usageEmitSink = usageEmitSink;
    /** @type {MessageContentComplex[]} */
    this.contentParts = contentParts;
    /** @type {Array<UsageMetadata>} */
    this.collectedUsage = collectedUsage;
    /** Vertex Gemini 3 thought signatures captured during the run, keyed by
     * `tool_call_id`. Persisted on `responseMessage.metadata.thoughtSignatures`
     * and restored as `additional_kwargs.signatures` on subsequent turns to
     * keep tool round-trips valid across DB reconstruction.
     * @type {Record<string, string> | undefined} */
    this.collectedThoughtSignatures = collectedThoughtSignatures;
    /** @type {ArtifactPromises} */
    this.artifactPromises = artifactPromises;
    /** Per-request map of `createContentAggregator` instances keyed by
     * the parent's `tool_call_id`. `ON_SUBAGENT_UPDATE` events stream
     * into each aggregator as they arrive; `finalizeSubagentContent`
     * harvests `contentParts` onto the matching `subagent` tool_call
     * so the child's full activity survives a page refresh. */
    this.subagentAggregatorsByToolCallId = subagentAggregatorsByToolCallId ?? new Map();
    /** In-flight `on_token_usage` emits from subagent child runs. The sink
     * fires the emitter without awaiting, so chatCompletion's finally flushes
     * these before returning — otherwise job cleanup can race the persist.
     * @type {Promise<void>[]} */
    this.pendingSubagentEmits = [];
    /** @type {AgentClientOptions} */
    this.options = Object.assign({ endpoint: options.endpoint }, clientOptions);
    /** @type {string} */
    this.model = this.options.agent.model_parameters.model;
    /** The key for the usage object's input tokens
     * @type {string} */
    this.inputTokensKey = 'input_tokens';
    /** The key for the usage object's output tokens
     * @type {string} */
    this.outputTokensKey = 'output_tokens';
    /** @type {UsageMetadata} */
    this.usage;
    /** @type {Record<string, number>} */
    this.indexTokenCountMap = {};
    /** @type {Array<Record<string, unknown>> | null} */
    this.memoryPayload = null;
    /** @type {(messages: BaseMessage[]) => Promise<void>} */
    this.processMemory;
  }

  /**
   * Returns the aggregated content parts for the current run.
   * @returns {MessageContentComplex[]} */
  getContentParts() {
    return this.contentParts;
  }

  /**
   * Harvest the `contentParts` from each per-subagent `createContentAggregator`
   * instance and attach them onto the matching parent `subagent` tool_call
   * as `subagent_content`. Runs once per message save (from
   * `sendCompletion`'s `finally`) so the child's full reasoning / tool
   * calls / final text survive a page refresh — the client-side Recoil
   * atom is session-only. Aggregators keyed by a tool_call_id that never
   * appeared in `contentParts` are discarded (no home to attach to).
   */
  finalizeSubagentContent() {
    const buffer = this.subagentAggregatorsByToolCallId;
    if (!buffer || buffer.size === 0 || !Array.isArray(this.contentParts)) {
      return;
    }
    for (const part of this.contentParts) {
      if (part?.type !== ContentTypes.TOOL_CALL) continue;
      const toolCall = part[ContentTypes.TOOL_CALL];
      if (!toolCall || toolCall.name !== Constants.SUBAGENT || !toolCall.id) continue;
      const aggregator = buffer.get(toolCall.id);
      if (!aggregator) continue;
      try {
        /** `createContentAggregator` returns a sparse array (undefined
         * slots for indices that never received content). Strip those
         * so the persisted shape is a clean `TMessageContentParts[]`. */
        const parts = Array.isArray(aggregator.contentParts)
          ? aggregator.contentParts.filter((p) => p != null)
          : [];
        if (parts.length > 0) {
          toolCall.subagent_content = parts;
        }
      } catch (err) {
        logger.warn(
          `[AgentClient] Failed to attach subagent content for tool_call ${toolCall.id}: ${err?.message ?? err}`,
        );
      }
    }
    buffer.clear();
  }

  setOptions(_options) {}

  /**
   * `AgentClient` is not opinionated about vision requests, so we don't do anything here
   * @param {MongoFile[]} attachments
   */
  checkVisionRequest() {}

  getSaveOptions() {
    let runOptions = {};
    try {
      runOptions = payloadParser(this.options) ?? {};
    } catch (error) {
      logger.error(
        '[api/server/controllers/agents/client.js #getSaveOptions] Error parsing options',
        error,
      );
    }

    return removeNullishValues(
      Object.assign(
        {
          spec: this.options.spec,
          iconURL: this.options.iconURL,
          chatProjectId: this.options.chatProjectId,
          endpoint: this.options.endpoint,
          agent_id: this.options.agent.id,
          modelLabel: this.options.modelLabel,
          resendFiles: this.options.resendFiles,
          imageDetail: this.options.imageDetail,
          maxContextTokens: this.maxContextTokens,
        },
        // TODO: PARSE OPTIONS BY PROVIDER, MAY CONTAIN SENSITIVE DATA
        runOptions,
      ),
    );
  }

  /**
   * Returns build message options. For AgentClient, agent-specific instructions
   * are retrieved directly from agent objects in buildMessages, so this returns empty.
   * @returns {Object} Empty options object
   */
  getBuildMessagesOptions() {
    return {};
  }

  /**
   *
   * @param {TMessage} message
   * @param {Array<MongoFile>} attachments
   * @returns {Promise<Array<Partial<MongoFile>>>}
   */
  async addImageURLs(message, attachments) {
    const { files, image_urls } = await encodeAndFormat(
      this.options.req,
      attachments,
      {
        provider: this.options.agent.provider,
        endpoint: this.options.endpoint,
      },
      VisionModes.agents,
    );
    message.image_urls = image_urls.length ? image_urls : undefined;
    return files;
  }

  async buildMessages(messages, parentMessageId, _buildOptions, opts) {
    /** Always pass mapMethod; getMessagesForConversation applies it only to messages with addedConvo flag */
    const orderedMessages = this.constructor.getMessagesForConversation({
      messages,
      parentMessageId,
      summary: this.shouldSummarize,
      mapMethod: createMultiAgentMapper(this.options.agent, this.agentConfigs),
      mapCondition: (message) => message.addedConvo === true,
    });

    let payload;
    /** @type {number | undefined} */
    let promptTokens;

    /** Normalize instruction fields before applying per-run context. */
    const normalizeInstructions = (agent) => {
      agent.instructions = agent.instructions?.trim() || undefined;
      agent.additional_instructions = agent.additional_instructions?.trim() || undefined;
      return agent;
    };

    /** Collect all agents for unified processing while preserving stable/dynamic instruction fields. */
    const allAgents = [
      { agent: normalizeInstructions(this.options.agent), agentId: this.options.agent.id },
      ...(this.agentConfigs?.size > 0
        ? Array.from(this.agentConfigs.entries()).map(([agentId, agent]) => ({
            agent: normalizeInstructions(agent),
            agentId,
          }))
        : []),
    ];
    const sharedRunAttachmentIds = new Set();
    if (this.options.attachments) {
      const attachments = await this.options.attachments;
      const latestMessage = orderedMessages[orderedMessages.length - 1];

      for (const fileId of collectFileIds(attachments)) {
        sharedRunAttachmentIds.add(fileId);
      }

      if (this.message_file_map) {
        this.message_file_map[latestMessage.messageId] = attachments;
      } else {
        this.message_file_map = {
          [latestMessage.messageId]: attachments,
        };
      }

      await this.addFileContextToMessage(latestMessage, attachments);
      const files = await this.processAttachments(latestMessage, attachments);

      this.options.attachments = files;
    }

    /** Note: Bedrock uses legacy RAG API handling */
    if (this.message_file_map && !isAgentsEndpoint(this.options.endpoint)) {
      this.contextHandlers = createContextHandlers(
        this.options.req,
        orderedMessages[orderedMessages.length - 1].text,
      );
    }

    /** @type {Record<number, number>} */
    const indexTokenCountMap = {};
    /** @type {Record<string, number>} */
    const tokenCountMap = {};
    const memoryPayload = [];
    let hasFileContext = false;
    let promptTokenTotal = 0;
    const encoding = this.getEncoding();
    const formattedMessages = orderedMessages.map((message, i) => {
      const formattedMessage = formatMessage({
        message,
        userName: this.options?.name,
        assistantName: this.options?.modelLabel,
      });
      const memoryFormattedMessage = formatMessage({
        message,
        userName: this.options?.name,
        assistantName: this.options?.modelLabel,
      });

      /**
       * Bind file context to the message it belongs to. Historical attachments
       * are resent inline, so the current turn's text attachment must be inline
       * too instead of living only in the dynamic system tail.
       */
      if (message.fileContext) {
        hasFileContext = true;
        prependFileContext(formattedMessage, message.fileContext);
      }

      /**
       * Durably re-merge quoted excerpts into every user turn that carries them
       * (current and historical) so the model receives the referenced context on
       * every prompt and the token count matches what was persisted. Applied to
       * the memory copy too so the canonical per-message count includes them.
       */
      if (Array.isArray(message.quotes) && message.quotes.length > 0) {
        prependQuotes(formattedMessage, message.quotes);
        prependQuotes(memoryFormattedMessage, message.quotes);
      }

      memoryPayload.push(memoryFormattedMessage);

      const dbTokenCount = Number(orderedMessages[i].tokenCount);
      const hasDbTokenCount = Number.isFinite(dbTokenCount) && dbTokenCount > 0;
      /**
       * Force a recount when the message carries quotes: a plain text-only
       * "Save" edit recomputes `tokenCount` from `text` alone while leaving
       * `message.quotes` persisted, so the stored count would undercount the
       * quote block this turn prepends. Recounting from the quote-merged memory
       * copy keeps context accounting accurate (and self-heals stale counts).
       */
      const needsCanonicalTokenCount =
        !hasDbTokenCount ||
        (this.isVisionModel && (message.image_urls || message.files)) ||
        (Array.isArray(message.quotes) && message.quotes.length > 0);

      let canonicalTokenCount = hasDbTokenCount ? dbTokenCount : 0;
      if (needsCanonicalTokenCount) {
        canonicalTokenCount = countFormattedMessageTokens(memoryFormattedMessage, encoding);
      }

      const promptMessageTokenCount = message.fileContext
        ? countFormattedMessageTokens(formattedMessage, encoding)
        : canonicalTokenCount;

      /* Hand this message's embedded files to the RAG context handlers; skip code-env files */
      if (this.message_file_map && this.message_file_map[message.messageId]) {
        const attachments = this.message_file_map[message.messageId];
        for (const file of attachments) {
          if (file.embedded) {
            this.contextHandlers?.processFile(file);
            continue;
          }
          if (file.metadata?.codeEnvRef) {
            continue;
          }
        }
      }

      const normalizedCanonicalTokenCount =
        Number.isFinite(canonicalTokenCount) && canonicalTokenCount > 0 ? canonicalTokenCount : 0;
      const normalizedPromptTokenCount =
        Number.isFinite(promptMessageTokenCount) && promptMessageTokenCount > 0
          ? promptMessageTokenCount
          : 0;

      orderedMessages[i].tokenCount = normalizedCanonicalTokenCount;
      indexTokenCountMap[i] = normalizedPromptTokenCount;
      promptTokenTotal += normalizedPromptTokenCount;

      if (message.messageId) {
        tokenCountMap[message.messageId] = normalizedCanonicalTokenCount;
      }

      if (isEnabled(process.env.AGENT_DEBUG_LOGGING)) {
        const role = message.isCreatedByUser ? 'user' : 'assistant';
        const hasSummary =
          Array.isArray(message.content) && message.content.some((p) => p && p.type === 'summary');
        const suffix = hasSummary ? '[S]' : '';
        const id = (message.messageId ?? message.id ?? '').slice(-8);
        const recalced = needsCanonicalTokenCount ? normalizedCanonicalTokenCount : null;
        const promptRecalced = message.fileContext ? normalizedPromptTokenCount : null;
        logger.debug(
          `[AgentClient] msg[${i}] ${role}${suffix} id=…${id} db=${dbTokenCount} needsRecount=${needsCanonicalTokenCount} recalced=${recalced} promptRecalced=${promptRecalced} tokens=${normalizedPromptTokenCount}`,
        );
      }

      return formattedMessage;
    });

    /**
     * Native YouTube -> video understanding: when Google `url_context` is enabled
     * (resolved to the native `urlContext` provider tool), inject any YouTube URLs
     * from the latest user turn as Gemini `fileData` video parts. The URL Context
     * tool cannot read YouTube, so this routes those links through the video path
     * while other URLs still flow through `urlContext`. Done after token counting
     * (video tokens are reported by the provider) and only on the LLM payload, so
     * the memory copy and persisted message are untouched.
     */
    const latestOrdered = orderedMessages[orderedMessages.length - 1];
    const provider = this.options.agent?.provider;
    if (
      latestOrdered?.isCreatedByUser === true &&
      (provider === Providers.GOOGLE || provider === Providers.VERTEXAI) &&
      hasUrlContextTool(this.options.agent?.tools)
    ) {
      const latestFormatted = formattedMessages[formattedMessages.length - 1];
      /** Use the resolved run model (model_parameters override) rather than the saved base model. */
      const resolvedModel =
        this.options.agent?.model_parameters?.model ?? this.options.agent?.model;
      const { max, mimeType } = resolveYouTubeInjectionConfig({
        provider,
        model: resolvedModel,
      });
      latestFormatted.content = appendYouTubeVideoParts({
        enabled: true,
        text: latestOrdered.text,
        content: latestFormatted.content,
        max,
        mimeType,
      });
    }

    payload = formattedMessages;
    this.memoryPayload = hasFileContext ? memoryPayload : null;
    messages = orderedMessages;
    promptTokens = promptTokenTotal;

    /**
     * Build shared run context - applies to ALL agents in the run.
     * Request attachment file context is already bound inline to the latest
     * user message above; only side-channel context belongs here.
     * Memory context is handled separately and applied per-agent based on config.
     */
    const sharedRunContextParts = [];

    /** Augmented prompt from RAG/context handlers */
    if (this.contextHandlers) {
      this.augmentedPrompt = await this.contextHandlers.createContext();
      if (this.augmentedPrompt) {
        sharedRunContextParts.push(this.augmentedPrompt);
      }
    }

    /** Memory context (user preferences/memories). Keyed context (with memory
     * keys + token metadata) is reserved for agents that can call
     * `delete_memory`; everyone else gets the unkeyed values only. */
    const memories = await this.useMemory();
    const buildMemoryContext = (text) =>
      text ? `${memoryInstructions}\n\n# Existing memory about the user:\n${text}` : undefined;
    const memoryContext = buildMemoryContext(memories?.withoutKeys);
    const keyedMemoryContext = buildMemoryContext(memories?.withKeys);

    const sharedRunContext = sharedRunContextParts.join('\n\n');
    const memoryAgentEnabled = isMemoryAgentEnabled(this.options.req.config?.memory);

    const agentScopedContext = await buildAgentScopedContext({
      agentIds: allAgents.map(({ agentId }) => agentId),
      attachmentsByAgentId: this.options.agentContextAttachmentsByAgentId,
      sharedRunAttachmentIds,
      req: this.options.req,
      tokenCountFn: (text) => countTokens(text),
    });

    /** Preserve prompt token counts for graph formatting and pruning. */
    this.indexTokenCountMap = indexTokenCountMap;

    /** Extract contextMeta from the parent response (second-to-last in ordered chain;
     * last is the current user message). Seeds the pruner's calibration EMA for this run. */
    const parentResponse =
      orderedMessages.length >= 2 ? orderedMessages[orderedMessages.length - 2] : undefined;
    if (parentResponse?.contextMeta && !parentResponse.isCreatedByUser) {
      this.contextMeta = parentResponse.contextMeta;
    }

    const result = {
      prompt: payload,
      tokenCountMap,
      promptTokens,
      messages,
    };

    if (promptTokens >= 0 && typeof opts?.getReqData === 'function') {
      opts.getReqData({ promptTokens });
    }

    /**
     * Apply context to all agents.
     * Stable agent/MCP instructions stay on `instructions`; shared runtime context
     * is appended to `additional_instructions` as the dynamic system tail.
     *
     * NOTE: This intentionally mutates agent objects in place. The agentConfigs Map
     * holds references to config objects that will be passed to the graph runtime.
     */
    const ephemeralAgent = this.options.req.body.ephemeralAgent;
    const mcpManager = getMCPManager();

    const configServers = await resolveConfigServers(this.options.req);

    await Promise.all(
      allAgents.map(({ agent, agentId }) => {
        const agentRunContextParts = [sharedRunContext];
        const agentHasMemory = agentHasInlineMemoryTools(agent);
        const agentMemoryContext = agentHasMemory ? keyedMemoryContext : memoryContext;
        if (
          agentMemoryContext &&
          (agentId === this.options.agent.id || memoryAgentEnabled || agentHasMemory)
        ) {
          agentRunContextParts.push(agentMemoryContext);
        }
        const scopedContext = agentScopedContext.get(agentId);
        if (scopedContext) {
          agentRunContextParts.push(scopedContext);
        }

        return applyContextToAgent({
          agent,
          agentId,
          logger,
          mcpManager,
          configServers,
          sharedRunContext: agentRunContextParts.filter(Boolean).join('\n\n'),
          ephemeralAgent: agentId === this.options.agent.id ? ephemeralAgent : undefined,
        });
      }),
    );

    return result;
  }

  /**
   * Creates a promise that resolves with the memory promise result or undefined after a timeout
   * @param {Promise<(TAttachment | null)[] | undefined>} memoryPromise - The memory promise to await
   * @param {number} timeoutMs - Timeout in milliseconds (default: 3000)
   * @returns {Promise<(TAttachment | null)[] | undefined>}
   */
  async awaitMemoryWithTimeout(memoryPromise, timeoutMs = 3000) {
    if (!memoryPromise) {
      return;
    }

    /** @type {NodeJS.Timeout | undefined} */
    let timeoutId;
    try {
      const timeoutPromise = new Promise((_, reject) => {
        timeoutId = setTimeout(() => reject(new Error('Memory processing timeout')), timeoutMs);
      });

      const attachments = await Promise.race([memoryPromise, timeoutPromise]);
      return attachments;
    } catch (error) {
      if (error?.message === 'Memory processing timeout') {
        logger.warn(`[AgentClient] Memory processing timed out after ${timeoutMs}ms`);
      } else {
        logger.error('[AgentClient] Error processing memory:', error);
      }
      return;
    } finally {
      /** Clear the pending timer so it does not fire after the race settles */
      clearTimeout(timeoutId);
    }
  }

  /**
   * @returns {Promise<{ withKeys?: string; withoutKeys?: string } | undefined>}
   */
  async useMemory() {
    const user = this.options.req.user;
    if (user.personalization?.memories === false) {
      return;
    }
    const hasAccess = await checkAccess({
      user,
      permissionType: PermissionTypes.MEMORIES,
      permissions: [Permissions.USE],
      getRoleByName: db.getRoleByName,
    });

    if (!hasAccess) {
      logger.debug(
        `[api/server/controllers/agents/client.js #useMemory] User ${user.id} does not have USE permission for memories`,
      );
      return;
    }
    const appConfig = this.options.req.config;
    const memoryConfig = appConfig.memory;
    if (!memoryConfig || memoryConfig.disabled === true) {
      return;
    }

    const userId = this.options.req.user.id + '';
    this.processMemory = undefined;

    if (!isMemoryAgentEnabled(memoryConfig)) {
      try {
        const { withKeys, withoutKeys } = await getRequestMemories({
          req: this.options.req,
          userId,
          getFormattedMemories: db.getFormattedMemories,
        });
        return { withKeys, withoutKeys };
      } catch (error) {
        logger.error(
          '[api/server/controllers/agents/client.js #useMemory] Error loading memories',
          error,
        );
        return;
      }
    }

    /** @type {Agent} */
    let prelimAgent;
    const allowedProviders = new Set(
      appConfig?.endpoints?.[EModelEndpoint.agents]?.allowedProviders,
    );
    try {
      if (memoryConfig.agent?.id != null && memoryConfig.agent.id !== this.options.agent.id) {
        prelimAgent = await loadAgent({
          req: this.options.req,
          agent_id: memoryConfig.agent.id,
          endpoint: EModelEndpoint.agents,
        });
      } else if (memoryConfig.agent?.id != null) {
        prelimAgent = this.options.agent;
      } else if (
        memoryConfig.agent?.id == null &&
        memoryConfig.agent?.model != null &&
        memoryConfig.agent?.provider != null
      ) {
        prelimAgent = { id: Constants.EPHEMERAL_AGENT_ID, ...memoryConfig.agent };
      }
    } catch (error) {
      logger.error(
        '[api/server/controllers/agents/client.js #useMemory] Error loading agent for memory',
        error,
      );
    }

    if (!prelimAgent) {
      return;
    }

    /** Forward the same `execute_code` capability gate the chat flow uses —
     * memory agents are unlikely to list `execute_code`, but if one does,
     * Phase 8 relies on this flag to expand the string into
     * `bash_tool` + `read_file` (pre-Phase 8 the legacy `execute_code`
     * tool registered unconditionally; without this passthrough the
     * memory path would silently lose code-execution tooling). */
    const memoryCapabilities = new Set(appConfig?.endpoints?.[EModelEndpoint.agents]?.capabilities);
    const agent = await initializeAgent(
      {
        req: this.options.req,
        res: this.options.res,
        agent: prelimAgent,
        allowedProviders,
        endpointOption: {
          endpoint: !isEphemeralAgentId(prelimAgent.id)
            ? EModelEndpoint.agents
            : memoryConfig.agent?.provider,
        },
        codeEnvAvailable: memoryCapabilities.has(AgentCapabilities.execute_code),
      },
      {
        getFiles: db.getFiles,
        getUserKey: db.getUserKey,
        getConvoFiles: db.getConvoFiles,
        updateFilesUsage: db.updateFilesUsage,
        getUserKeyValues: db.getUserKeyValues,
        getToolFilesByIds: db.getToolFilesByIds,
        getCodeGeneratedFiles: db.getCodeGeneratedFiles,
        filterFilesByAgentAccess,
      },
    );

    if (!agent) {
      logger.warn(
        '[api/server/controllers/agents/client.js #useMemory] No agent found for memory',
        memoryConfig,
      );
      return;
    }

    const llmConfig = Object.assign(
      {
        provider: agent.provider,
        model: agent.model,
      },
      agent.model_parameters,
    );

    /** @type {import('@librechat/api').MemoryConfig} */
    const config = {
      validKeys: memoryConfig.validKeys,
      instructions: agent.instructions,
      llmConfig,
      tokenLimit: memoryConfig.tokenLimit,
    };

    const messageId = this.responseMessageId + '';
    const conversationId = this.conversationId + '';
    const streamId = this.options.req?._resumableStreamId || null;
    const [withoutKeys, processMemory] = await createMemoryProcessor({
      userId,
      config,
      messageId,
      streamId,
      conversationId,
      memoryMethods: {
        setMemory: db.setMemory,
        deleteMemory: db.deleteMemory,
        getFormattedMemories: db.getFormattedMemories,
      },
      res: this.options.res,
      user: createSafeUser(this.options.req.user),
    });

    this.processMemory = processMemory;
    let withKeys = withoutKeys;
    try {
      ({ withKeys } = await getRequestMemories({
        req: this.options.req,
        userId,
        getFormattedMemories: db.getFormattedMemories,
|
|
}));
|
|
} catch (error) {
|
|
logger.error(
|
|
'[api/server/controllers/agents/client.js #useMemory] Error loading keyed memories',
|
|
error,
|
|
);
|
|
}
|
|
return { withKeys, withoutKeys };
|
|
}
|
|
|
|
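The branch order in `useMemory` decides which agent backs the memory processor: a different saved agent is loaded by id, the current agent is reused when the ids match, and a config with only `model` and `provider` becomes an ephemeral agent. The sketch below restates that decision as a pure function so it can be checked in isolation. `resolveMemoryAgentSource`, `buildLlmConfig`, and the `'ephemeral'` id are hypothetical stand-ins; the real code calls `loadAgent` and uses `Constants.EPHEMERAL_AGENT_ID`, whose value is not shown here.

```javascript
// Hypothetical stand-in for Constants.EPHEMERAL_AGENT_ID (real value not shown in the source).
const EPHEMERAL_AGENT_ID = 'ephemeral';

/**
 * Mirrors the branch order in useMemory: returns which agent the memory
 * processor should use, or null when memory setup should stop early.
 */
function resolveMemoryAgentSource(memoryAgentConfig, currentAgent) {
  const cfg = memoryAgentConfig ?? {};
  if (cfg.id != null && cfg.id !== currentAgent.id) {
    // A different saved agent: useMemory would call loadAgent({ agent_id: cfg.id }).
    return { kind: 'load', agentId: cfg.id };
  }
  if (cfg.id != null) {
    // The configured id is the agent already running this chat.
    return { kind: 'current', agent: currentAgent };
  }
  if (cfg.model != null && cfg.provider != null) {
    // No id, but model + provider: build an ephemeral agent from the config.
    return { kind: 'ephemeral', agent: { id: EPHEMERAL_AGENT_ID, ...cfg } };
  }
  return null;
}

/** Same precedence as the Object.assign call: model_parameters win on key collisions. */
function buildLlmConfig(agent) {
  return Object.assign({ provider: agent.provider, model: agent.model }, agent.model_parameters);
}
```

Because `Object.assign` copies later sources over earlier ones, a `model` or `provider` key inside `model_parameters` overrides the agent's top-level value in `llmConfig`.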
  /**
   * Filters image URL parts out of array message content. When exactly one text
   * part remains, it is collapsed back to a plain string.
   * @param {BaseMessage} message - The message to filter
   * @returns {BaseMessage} The original message when its content is empty or a string;
   *   otherwise a new message of the same class with image URL parts removed
   */
  filterImageUrls(message) {
    if (!message.content || typeof message.content === 'string') {
      return message;
    }

    if (Array.isArray(message.content)) {
      const filteredContent = message.content.filter(
        (part) => part.type !== ContentTypes.IMAGE_URL,
      );

      if (filteredContent.length === 1 && filteredContent[0].type === ContentTypes.TEXT) {
        const MessageClass = message.constructor;
        return new MessageClass({
          content: filteredContent[0].text,
          additional_kwargs: message.additional_kwargs,
        });
      }

      const MessageClass = message.constructor;
      return new MessageClass({
        content: filteredContent,
        additional_kwargs: message.additional_kwargs,
      });
    }

    return message;
  }

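`filterImageUrls` drops `image_url` parts before the memory transcript is built and, when a single text part survives, hands back a plain string instead of a one-element array. A minimal sketch with a stand-in message class; `FakeMessage` and the literal `'image_url'` / `'text'` type strings are assumptions standing in for LangChain's message classes and `ContentTypes`.

```javascript
// Minimal stand-in for a LangChain message class; only the two fields
// filterImageUrls copies are modeled (an assumption for this sketch).
class FakeMessage {
  constructor({ content, additional_kwargs = {} }) {
    this.content = content;
    this.additional_kwargs = additional_kwargs;
  }
}

const IMAGE_URL = 'image_url'; // assumed value of ContentTypes.IMAGE_URL
const TEXT = 'text'; // assumed value of ContentTypes.TEXT

function filterImageUrls(message) {
  if (!message.content || typeof message.content === 'string') {
    return message;
  }
  if (!Array.isArray(message.content)) {
    return message;
  }
  const filtered = message.content.filter((part) => part.type !== IMAGE_URL);
  const MessageClass = message.constructor;
  // A lone text part collapses back to a plain string.
  const content = filtered.length === 1 && filtered[0].type === TEXT ? filtered[0].text : filtered;
  return new MessageClass({ content, additional_kwargs: message.additional_kwargs });
}
```

Constructing via `message.constructor` keeps the result the same message type (human, AI, and so on), which matters because `getBufferString` labels each line by message type.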
  /**
   * @param {BaseMessage[]} messages
   * @returns {Promise<void | (TAttachment | null)[]>}
   */
  async runMemory(messages) {
    try {
      if (this.processMemory == null) {
        return;
      }
      const appConfig = this.options.req.config;
      const memoryConfig = appConfig.memory;
      const messageWindowSize = memoryConfig?.messageWindowSize ?? 5;

      /**
       * Strip skill-primed meta messages before memory extraction. The primes
       * sit next to the latest user message and carry large SKILL.md bodies,
       * so letting them into the window would crowd out real chat turns and
       * pollute extracted memories with synthetic instruction content the
       * user never typed.
       */
      const chatMessages = messages.filter((m) => !isSkillPrimeMessage(m));

      let messagesToProcess = [...chatMessages];
      if (chatMessages.length > messageWindowSize) {
        for (let i = chatMessages.length - messageWindowSize; i >= 0; i--) {
          const potentialWindow = chatMessages.slice(i, i + messageWindowSize);
          const firstMessage = potentialWindow[0];
          /** Formatted LangChain messages report their type via `_getType()`
           * ('human') rather than a `role` field; accept either shape. */
          if (firstMessage?.role === 'user' || firstMessage?._getType?.() === 'human') {
            messagesToProcess = [...potentialWindow];
            break;
          }
        }

        if (messagesToProcess.length === chatMessages.length) {
          messagesToProcess = [...chatMessages.slice(-messageWindowSize)];
        }
      }

      const filteredMessages = messagesToProcess.map((msg) => this.filterImageUrls(msg));
      const bufferString = getBufferString(filteredMessages);
      const configuredMaxInputTokens = Number.isFinite(memoryConfig?.maxInputTokens)
        ? Math.floor(memoryConfig.maxInputTokens)
        : undefined;
      const maxInputTokens =
        configuredMaxInputTokens != null && configuredMaxInputTokens > 0
          ? configuredMaxInputTokens
          : DEFAULT_MEMORY_MAX_INPUT_TOKENS;
      const maxInputChars = maxInputTokens * MEMORY_INPUT_CHARS_PER_TOKEN;
      const isCharTruncated = bufferString.length > maxInputChars;
      const memoryInput = `# Current Chat:\n\n${
        isCharTruncated
          ? `[Earlier chat content omitted due to memory input limit]\n\n${bufferString.slice(
              -maxInputChars,
            )}`
          : bufferString
      }`;
      const {
        text: limitedMemoryInput,
        tokenCount,
        wasTruncated,
      } = await processTextWithTokenLimit({
        text: memoryInput,
        tokenLimit: maxInputTokens,
        tokenCountFn: (text) => countTokens(text),
        preserve: 'end',
      });
      if (isCharTruncated || wasTruncated) {
        logger.warn('[MemoryAgent] Memory input truncated before processing', {
          tokenCount,
          messageId: this.responseMessageId,
          conversationId: this.conversationId,
          maxInputTokens,
          wasTruncated,
          maxInputChars,
          originalLength: bufferString.length,
        });
      }
      const bufferMessage = new HumanMessage(limitedMemoryInput);
      return await this.processMemory([bufferMessage]);
    } catch (error) {
      logger.error('Memory Agent failed to process memory', error);
    }
  }

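`runMemory` trims the conversation in two steps before calling the memory agent. First it looks for the newest `messageWindowSize`-message window (default 5) that starts on a user turn, falling back to the last `messageWindowSize` messages. Then it keeps only the newest `maxInputTokens * MEMORY_INPUT_CHARS_PER_TOKEN` characters of the buffer. The sketch below reproduces those two steps on plain `{ role }` objects and strings, which is an assumption for illustration; `selectWindow` and `clampByChars` are hypothetical names, and the follow-up token-based pass through `processTextWithTokenLimit` is not reproduced.

```javascript
/**
 * Picks the most recent window of `size` messages that starts on a user turn,
 * falling back to the last `size` messages. Messages are plain { role } objects
 * here (an assumption for the sketch).
 */
function selectWindow(messages, size = 5) {
  if (messages.length <= size) {
    return [...messages];
  }
  // Scan window start positions from newest to oldest.
  for (let i = messages.length - size; i >= 0; i--) {
    if (messages[i].role === 'user') {
      return messages.slice(i, i + size);
    }
  }
  return messages.slice(-size);
}

/** Keeps the newest `maxChars` characters, marking the cut the way runMemory does. */
function clampByChars(buffer, maxChars) {
  if (buffer.length <= maxChars) {
    return `# Current Chat:\n\n${buffer}`;
  }
  return `# Current Chat:\n\n[Earlier chat content omitted due to memory input limit]\n\n${buffer.slice(-maxChars)}`;
}
```

Slicing from the end (`slice(-maxChars)`) keeps the most recent turns, which matches the `preserve: 'end'` choice in the token-limit pass.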
  /** @type {sendCompletion} */
  async sendCompletion(payload, opts = {}) {
    await this.chatCompletion({
      payload,
      onProgress: opts.onProgress,
      userMCPAuthMap: opts.userMCPAuthMap,
      abortController: opts.abortController,
    });

    const completion = filterMalformedContentParts(this.contentParts);
    const metadata = this.buildResponseMetadata();
    return metadata ? { completion, metadata } : { completion };
  }

  /**
   * Assembles the response message `metadata`: Vertex thought signatures, the
   * persisted context breakdown (Part A) and the usage/cost rollup (Part B),
   * which rebuild the gauge breakdown and branch/total cost across reloads,
   * plus the `summaryUsedTokens` summarization marker.
   * Returns undefined when nothing was captured.
   * @returns {{
   *   thoughtSignatures?: Record<string, string>,
   *   contextUsage?: import('librechat-data-provider').TContextUsageEvent,
   *   summaryUsedTokens?: number,
   *   usage?: import('librechat-data-provider').TResponseUsage,
   * } | undefined}
   */
  buildResponseMetadata() {
    /** @type {{
     *   thoughtSignatures?: Record<string, string>,
     *   contextUsage?: import('librechat-data-provider').TContextUsageEvent,
     *   summaryUsedTokens?: number,
     *   usage?: import('librechat-data-provider').TResponseUsage,
     * }} */
    const metadata = {};
    const signatures = this.collectedThoughtSignatures;
    if (signatures && Object.keys(signatures).length > 0) {
      metadata.thoughtSignatures = signatures;
    }
    const usageEvents = this.usageEmitSink ?? [];
    /** Persist the breakdown only when the latest snapshot's OWN run completed,
     * i.e. a PRIMARY usage event (usage_type == null) from that run's id arrived
     * AFTER the snapshot. Matching by run id keeps `completedOutputTokens` a real
     * post-snapshot delta even when parallel/direct runs interleave (A snapshot →
     * B snapshot → A usage must NOT persist B's snapshot with A's output); an
     * interrupted final call that emits no usage falls back to the per-message
     * estimate. It still keeps the post-summary snapshot: the summarization detour
     * emits an extra snapshot whose following primary usage shares that run's id,
     * which the old snapshot-count guard miscounted and wrongly dropped. Events
     * without a run id (older lib / resume) match any snapshot for back-compat. */
    const latestSnapshot = this.contextUsageSink?.latest;
    const latestSnapshotUsageIndex = this.contextUsageSink?.latestUsageIndex ?? 0;
    const latestSnapshotRunId = latestSnapshot?.runId;
    const hasPrimaryAfterSnapshot = usageEvents
      .slice(latestSnapshotUsageIndex)
      .some(
        (event) =>
          event.usage_type == null &&
          (latestSnapshotRunId == null ||
            event.runId == null ||
            event.runId === latestSnapshotRunId),
      );
    if (latestSnapshot && hasPrimaryAfterSnapshot) {
      metadata.contextUsage = buildPersistedContextUsage(latestSnapshot, usageEvents);
    }
    /** Lightweight summarization marker, persisted whenever this turn compacted
     * the context, INDEPENDENT of the snapshot guard above. When the client has
     * no usable snapshot on the branch and falls back to the per-message
     * estimate, it caps the discarded pre-summary history at this baseline
     * instead of re-summing it (the gauge otherwise reads 100% forever). Shared
     * with the abort save path via `computeSummaryUsedTokens`. Subtract the
     * response's earlier tool-loop outputs (the primaries that preceded the
     * latest snapshot, same run): those tokens are inside the snapshot baseline
     * AND in the response `tokenCount` the client estimate adds on top, so
     * leaving them in the marker double-counts them on a multi-call turn. */
    const priorOutputTokens = priorRunOutputTokens(
      usageEvents,
      latestSnapshotUsageIndex,
      latestSnapshotRunId,
    );
    const summaryUsedTokens = computeSummaryUsedTokens(latestSnapshot, priorOutputTokens);
    if (summaryUsedTokens != null) {
      metadata.summaryUsedTokens = summaryUsedTokens;
    }
    const usage = aggregateEmittedUsage(usageEvents);
    if (usage) {
      metadata.usage = usage;
    }
    return Object.keys(metadata).length > 0 ? metadata : undefined;
  }

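The snapshot guard in `buildResponseMetadata` is easiest to see on a small event stream. This sketch lifts the predicate into a standalone function (`hasPrimaryAfterSnapshot` here is a hypothetical free function, not an export of the source) so the interleaving case from the comment, A snapshot, then B snapshot, then A usage, can be checked directly.

```javascript
/**
 * Restates the guard in buildResponseMetadata: after the latest snapshot
 * (recorded at `snapshotUsageIndex` in the usage stream), did a primary usage
 * event (usage_type == null) from the snapshot's own run arrive? A missing
 * runId on either side matches anything, for back-compat.
 */
function hasPrimaryAfterSnapshot(usageEvents, snapshotUsageIndex, snapshotRunId) {
  return usageEvents
    .slice(snapshotUsageIndex)
    .some(
      (event) =>
        event.usage_type == null &&
        (snapshotRunId == null || event.runId == null || event.runId === snapshotRunId),
    );
}
```

Only events at or after the snapshot's index are considered, so a primary usage from run B that finished before B's latest snapshot does not count.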
  /**
   * Resolves the endpoint token config for a usage item by its producing agent
   * (multi-endpoint graphs: connected agents + subagents). A known agent's
   * config is authoritative, including `undefined`, which prices with built-in
   * rates (e.g. a non-custom agent in a custom-primary graph). Only an
   * untagged/unknown agent falls back to the primary config, so single-endpoint
   * graphs are unchanged.
   * @param {UsageMetadata} usage
   * @returns {import('@librechat/api').EndpointTokenConfig | undefined}
   */
  resolveAgentEndpointTokenConfig(usage) {
    return resolveAgentTokenConfig({
      agentId: usage?.agentId,
      byAgentId: this.options.endpointTokenConfigByAgentId,
      fallback: this.options.endpointTokenConfig,
    });
  }

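`resolveAgentTokenConfig` itself lives outside this file, so the following is only a sketch of the documented rule, not its implementation. It assumes `byAgentId` is a plain object keyed by agent id and treats a key that is present but set to `undefined` as an authoritative "use built-in rates" answer, which is what distinguishes a known agent from an unknown one. `resolveAgentTokenConfigSketch` is a hypothetical name.

```javascript
// Hypothetical re-statement of the documented rule; the real
// resolveAgentTokenConfig is not shown in this file.
// `byAgentId` is assumed to be a plain object keyed by agent id.
function resolveAgentTokenConfigSketch({ agentId, byAgentId, fallback }) {
  if (agentId != null && byAgentId != null && Object.hasOwn(byAgentId, agentId)) {
    // Known agent: its entry is authoritative, even when it is undefined
    // (undefined means "price with built-in rates").
    return byAgentId[agentId];
  }
  // Untagged or unknown agent: use the primary endpoint's config.
  return fallback;
}
```

Checking key presence (`Object.hasOwn`) rather than `byAgentId[agentId] ?? fallback` is the important design choice: a `??` fallback would wrongly price a built-in-rate subagent with the primary's custom config.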
  /**
   * Records token spend for the collected usage entries and, when the helper
   * returns a result, stores it on `this.usage`.
   * @param {Object} params
   * @param {string} [params.model]
   * @param {string} [params.context='message']
   * @param {AppConfig['balance']} [params.balance]
   * @param {AppConfig['transactions']} [params.transactions]
   * @param {UsageMetadata[]} [params.collectedUsage=this.collectedUsage]
   */
  async recordCollectedUsage({
    model,
    balance,
    transactions,
    context = 'message',
    collectedUsage = this.collectedUsage,
  }) {
    const result = await recordCollectedUsage(
      {
        spendTokens: db.spendTokens,
        spendStructuredTokens: db.spendStructuredTokens,
        pricing: { getMultiplier: db.getMultiplier, getCacheMultiplier: db.getCacheMultiplier },
        bulkWriteOps: { insertMany: db.bulkInsertTransactions, updateBalance: db.updateBalance },
      },
      {
        user: this.user ?? this.options.req.user?.id,
        conversationId: this.conversationId,
        collectedUsage,
        model: model ?? this.model ?? this.options.agent.model_parameters.model,
        context,
        messageId: this.responseMessageId,
        balance,
        transactions,
        endpointTokenConfig: this.options.endpointTokenConfig,
        resolveEndpointTokenConfig: (usage) => this.resolveAgentEndpointTokenConfig(usage),
      },
    );

    if (result) {
      this.usage = result;
    }
  }

  /**
   * Get stream usage as returned by this client's API response.
   * @returns {UsageMetadata} The stream usage object.
   */
  getStreamUsage() {
    return this.usage;
  }

  /**
   * Builds the subagent usage emitter for {@link createSubagentUsageSink}.
   * Streams each billed child-run usage to the client as an `on_token_usage`
   * event tagged `subagent` (folds into session cost/totals, not the live
   * gauge), with the authoritative cost when `interface.contextCost` is on.
   * Returns undefined when there's no stream to write to.
   * @param {AppConfig} [appConfig]
   * @returns {((usage: UsageMetadata) => Promise<void>) | undefined}
   */
  buildSubagentUsageEmitter(appConfig) {
    const res = this.options.res;
    const streamId = this.options.req?._resumableStreamId || null;
    if (!res && !streamId) {
      return undefined;
    }
    const includeCost = appConfig?.interfaceConfig?.contextCost === true;
    return (usage) => {
      const data = {
        input_tokens: usage.input_tokens,
        output_tokens: usage.output_tokens,
        total_tokens: usage.total_tokens,
        input_token_details: this.subagentCacheDetails(usage),
        model: usage.model,
        provider: usage.provider,
        usage_type: 'subagent',
        runId: this.responseMessageId,
        /** Unique per collected entry (post-push length) for resume dedupe */
        seq: this.collectedUsage.length,
        /** Price with the SUBAGENT's own endpoint token config (its endpoint may
         * differ from the parent's); `usage.agentId` is tagged by the sink. */
        cost: includeCost
          ? computeUsageCostUSD(
              usage,
              { getMultiplier: db.getMultiplier, getCacheMultiplier: db.getCacheMultiplier },
              this.resolveAgentEndpointTokenConfig(usage),
            )
          : undefined,
      };
      /** Fold into the response's usage rollup (synchronously, regardless of
       * emit success) so the persisted total matches the live session, which
       * also folds subagent usage into its cost/totals. */
      if (this.usageEmitSink) {
        this.usageEmitSink.push(data);
      }
      /** The sink fires this without awaiting, so retain the promise and flush
       * it in chatCompletion's finally: emitChunk persists (HSET) before
       * publishing, and job cleanup must not race that persist or resumed
       * clients miss billed subagent usage. */
      const emit = (async () => {
        try {
          if (streamId) {
            await GenerationJobManager.emitChunk(streamId, {
              event: UsageEvents.ON_TOKEN_USAGE,
              data,
            });
          } else {
            sendEvent(res, { event: UsageEvents.ON_TOKEN_USAGE, data });
          }
        } catch (err) {
          logger.warn('[AgentClient] Failed to emit subagent usage', err);
        }
      })();
      this.pendingSubagentEmits.push(emit);
      return emit;
    };
  }

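`buildSubagentUsageEmitter` returns a function that the sink calls without awaiting, so each emit promise is retained in `pendingSubagentEmits` and awaited later instead of being lost. The sketch below isolates that retain-then-flush pattern. `createUsageEmitter`, `persistAndPublish`, and `flush` are hypothetical names; in the source, the publish step is `GenerationJobManager.emitChunk` or `sendEvent`, and the flush happens in `chatCompletion`'s `finally`, which is outside this excerpt.

```javascript
/**
 * Retain-then-flush: emit() folds usage into a rollup synchronously, starts an
 * async publish without awaiting it, and keeps the promise so a later flush()
 * can wait for every pending publish before cleanup.
 */
function createUsageEmitter(persistAndPublish) {
  const pending = [];
  const rollup = [];
  const emit = (usage) => {
    // Fold into the rollup synchronously, whether or not the publish succeeds.
    rollup.push(usage);
    const p = (async () => {
      try {
        await persistAndPublish(usage);
      } catch (err) {
        // Log and swallow, so one failed publish never rejects the flush.
        console.warn('emit failed', err.message);
      }
    })();
    pending.push(p);
    return p;
  };
  // Each retained promise catches its own errors, so Promise.all cannot reject.
  const flush = () => Promise.all(pending);
  return { emit, flush, rollup };
}
```

Catching inside each emit keeps the flush simple: cleanup code can `await flush()` without having to handle per-emit failures.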
  /** Normalizes a subagent usage event's cache token details for emission. */
  subagentCacheDetails(usage) {
    const cache_creation =
      usage.input_token_details?.cache_creation ?? usage.cache_creation_input_tokens;
    const cache_read = usage.input_token_details?.cache_read ?? usage.cache_read_input_tokens;
    if (cache_creation == null && cache_read == null) {
      return undefined;
    }
    return { cache_creation, cache_read };
  }

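`subagentCacheDetails` prefers the nested `input_token_details` fields and falls back to the flat `cache_creation_input_tokens` / `cache_read_input_tokens` names (the field names Anthropic's API reports). Using `??` rather than `||` matters here: a reported `0` is kept instead of falling through to the flat field. A standalone copy for checking that behavior; `cacheDetails` is a hypothetical name.

```javascript
// Standalone copy of subagentCacheDetails' fallback rule: prefer the nested
// input_token_details fields, else the flat cache_*_input_tokens fields;
// return undefined when neither source reports anything.
function cacheDetails(usage) {
  const cache_creation =
    usage.input_token_details?.cache_creation ?? usage.cache_creation_input_tokens;
  const cache_read = usage.input_token_details?.cache_read ?? usage.cache_read_input_tokens;
  if (cache_creation == null && cache_read == null) {
    return undefined;
  }
  return { cache_creation, cache_read };
}
```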
  /**
   * @param {TMessage} responseMessage
   * @returns {number}
   */
  getTokenCountForResponse({ content }) {
    return countFormattedMessageTokens({ role: 'assistant', content }, this.getEncoding());
  }

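`applyHideSequentialOutputsFilter`, defined just below, keeps the final content part plus anything tied to tool calls and drops every other intermediate part. A sketch of the same predicate on a plain array, assuming `'tool_call'` as the value of `ContentTypes.TOOL_CALL`; `hideSequentialOutputs` is a hypothetical name.

```javascript
const TOOL_CALL = 'tool_call'; // assumed value of ContentTypes.TOOL_CALL

// Same predicate as applyHideSequentialOutputsFilter, on a plain array:
// keep the final part, any tool_call part, and any part tagged with tool_call_ids.
function hideSequentialOutputs(parts) {
  return parts.filter(
    (part, index) =>
      index >= parts.length - 1 || part.type === TOOL_CALL || Boolean(part.tool_call_ids),
  );
}
```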
  /**
   * @deprecated Agent Chain: strips hidden intermediate sequential-agent content
   * before persistence, keeping only the last part, `tool_call` parts, and parts
   * that carry `tool_call_ids`. Mirrors the chat path so a HITL resume doesn't
   * persist or emit intermediate outputs the agent's `hide_sequential_outputs`
   * setting is meant to hide.
   */
  applyHideSequentialOutputsFilter() {
    if (!this.options.agent?.hide_sequential_outputs || !Array.isArray(this.contentParts)) {
      return;
    }
    this.contentParts = this.contentParts.filter(
      (part, index) =>
        index >= this.contentParts.length - 1 ||
        part.type === ContentTypes.TOOL_CALL ||
        part.tool_call_ids,
    );
  }

  /**
   * Surface any human-in-the-loop interrupt the SDK captured during the most
   * recent `processStream` / `resume`. When the run paused for tool approval (or
   * an ask-user question), mark the job `requires_action`, persist the pending
   * review record, and emit it to live clients, then set `this.pendingApproval`
   * so the controller leaves the turn unfinalized for the resume route to continue.
   *
   * No-op when the run completed without an interrupt, or when the job was aborted
   * between the interrupt firing and this mark (a late interrupt must not pause a
   * dead job: the atomic `pause` transition returns false and we drop it).
   *
   * @param {AgentRun} run
   * @param {string} [streamId]
   */
  async handleRunInterrupt(run, streamId) {
    if (!streamId || typeof run?.getInterrupt !== 'function') {
      return;
    }
    const interrupt = run.getInterrupt();
    if (!interrupt?.payload) {
      return;
    }

    const appConfig = this.options.req?.config;
    const checkpointerCfg = appConfig?.endpoints?.[EModelEndpoint.agents]?.checkpointer;
    // Persist the resolved model parameters (temperature, max tokens, custom endpoint
    // params, …) so an ephemeral-agent resume continues with the SAME settings the run
    // paused on. The resume payload omits them and they aren't part of the fingerprint, so
    // without this the rebuilt ephemeral run falls back to defaults. (Saved agents source
    // these from the DB record server-side, so this is belt-and-suspenders for them.)
    const resumeContext = pickResumeContext(this.options.req?.body);
    const resolvedModelParameters = this.options.agent?.model_parameters;
    if (resolvedModelParameters && typeof resolvedModelParameters === 'object') {
      resumeContext.model_parameters = resolvedModelParameters;
    }
    const pendingAction = buildPendingAction(interrupt.payload, {
      streamId,
      conversationId: this.conversationId,
      // runId mirrors the LangGraph checkpoint namespace when the SDK provides it
      // (its documented meaning), falling back to the response message id.
      runId: interrupt.checkpointNs ?? this.responseMessageId,
      responseMessageId: this.responseMessageId,
      interruptId: interrupt.interruptId,
      // thread_id was bound to conversationId at run config (config.configurable);
      // fall back to it when the SDK doesn't echo threadId on the interrupt.
      threadId: interrupt.threadId ?? this.conversationId,
      ttlMs: getApprovalTtlMs(checkpointerCfg),
      // Pin the graph-determining request fields so resume can't rebuild this paused
      // run on a different agent/tool set (esp. ephemeral agents, whose agent_id is
      // undefined so the id guard can't tell two configs apart).
      requestFingerprint: computeAgentRequestFingerprint(this.options.req?.body ?? {}),
      // Persist those same fields verbatim so the resume route can REPLAY them: a
      // reload/cross-replica resume can't reconstruct the ephemeral config client-side,
      // so the server restores it and rebuilds the same graph (and the fingerprint matches).
      resumeContext,
    });

    // Job-replacement guard: streamId == conversationId is reused per conversation, so a
    // newer request can replace this run's job. If this (older) run hits an interrupt
    // after a replacement, pausing would flip the NEWER job to requires_action with this
    // stale run's pending action, blocking fresh work behind the wrong approval. Only
    // pause when the live job is still the one THIS run created (mirrors request.js).
    if (this.jobCreatedAt != null) {
      const liveJob = await GenerationJobManager.getJobStore().getJob(streamId);
      if (!liveJob || liveJob.createdAt !== this.jobCreatedAt) {
        logger.debug(`[AgentClient] Interrupt fired but job ${streamId} was replaced; not pausing`);
        return;
      }
    }

    const paused = await GenerationJobManager.approvals.pause(streamId, pendingAction);
    if (!paused) {
      logger.debug(
        `[AgentClient] Interrupt fired but job ${streamId} was not running; not pausing`,
      );
      return;
    }

    // Capture deferred tools discovered (via tool_search) earlier in THIS turn so resume
    // can replay them into createRun. The resumed graph is rebuilt with `messages: []`
    // (state comes from the checkpoint), so the in-turn tool_search results that mark a
    // deferred tool discovered aren't present there. Without this, the paused deferred
    // tool would be missing from the rebuilt schema-only toolMap and resume would fail
    // with "unknown tool". Inert for non-deferred turns (the set comes back empty).
    try {
      const runMessages =
        typeof run.getRunMessages === 'function' ? run.getRunMessages() : undefined;
      if (Array.isArray(runMessages) && runMessages.length > 0) {
        const discovered = extractDiscoveredToolsFromHistory(runMessages);
        if (discovered.size > 0) {
          await GenerationJobManager.updateMetadata(streamId, {
            discoveredTools: Array.from(discovered),
          });
        }
      }
    } catch (err) {
      logger.warn(
        `[AgentClient] Failed to capture discovered tools for resume on ${streamId}`,
        err?.message ?? err,
      );
    }

    this.pendingApproval = pendingAction;
    // Release the concurrency slot this request held the MOMENT the turn is durably
    // paused (before the approval card is emitted) so the user's `/resume` can
    // re-acquire one immediately. Otherwise a fast Approve races the HTTP-driver
    // teardown (request.js pause branch / resume.js finally) that would normally
    // release it, and `/resume` 429s under LIMIT_CONCURRENT_MESSAGES. Idempotent via
    // the flag; if it fails here, the teardown still releases (it checks the flag).
    if (!this.pendingRequestReleased) {
      try {
        await decrementPendingRequest(this.options.req?.user?.id);
        this.pendingRequestReleased = true;
      } catch (err) {
        logger.error(`[AgentClient] Failed to release request slot on pause ${streamId}`, err);
      }
    }
    await GenerationJobManager.emitChunk(streamId, {
      event: ApprovalEvents.ON_PENDING_ACTION,
      data: pendingAction,
    });
    logger.debug(
      `[AgentClient] Paused ${streamId} for ${interrupt.payload.type} (action ${pendingAction.actionId})`,
    );
  }

  /**
   * @param {object} params
   * @param {string | ChatCompletionMessageParam[]} params.payload
   * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
   * @param {AbortController} [params.abortController]
   */
  async chatCompletion({ payload, userMCPAuthMap, abortController = null }) {
    /** @type {Partial<GraphRunnableConfig>} */
    let config;
    /** @type {ReturnType<createRun>} */
    let run;
    /** @type {Promise<(TAttachment | null)[] | undefined>} */
    let memoryPromise;
    const appConfig = this.options.req.config;
    const balanceConfig = getBalanceConfig(appConfig);
    const transactionsConfig = getTransactionsConfig(appConfig);
    try {
      if (!abortController) {
        abortController = new AbortController();
      }

      /** @type {AppConfig['endpoints']['agents']} */
      const agentsEConfig = appConfig.endpoints?.[EModelEndpoint.agents];

      config = {
        runName: 'AgentRun',
        configurable: {
          thread_id: this.conversationId,
          last_agent_index: this.agentConfigs?.size ?? 0,
          user_id: this.user ?? this.options.req.user?.id,
          hide_sequential_outputs: this.options.agent.hide_sequential_outputs,
          requestBody: {
            messageId: this.responseMessageId,
            conversationId: this.conversationId,
            parentMessageId: this.parentMessageId,
          },
          user: createSafeUser(this.options.req.user),
        },
        recursionLimit: resolveRecursionLimit(agentsEConfig, this.options.agent),
        signal: abortController.signal,
        streamMode: 'values',
        version: 'v2',
      };

      const toolSet = buildToolSet(this.options.agent);
      const tokenCounter = createTokenCounter(this.getEncoding());

      /** Pre-resolve invoked skill bodies + re-prime files before formatting messages */
      const skillPrimeResult = this.options.primeInvokedSkills
        ? await this.options.primeInvokedSkills(payload)
        : undefined;

      /**
       * Seed `Graph.sessions` with code-env files primed across every
       * reachable agent (primary, handoff/addedConvo, and nested
       * subagents) plus skill-priming output. The merge logic and its
       * run-wide semantics live in `buildInitialToolSessions`; see that
       * helper's doc for why this is intentionally NOT per-agent.
       */
      const initialSessions = buildInitialToolSessions({
        skillSessions: skillPrimeResult?.initialSessions,
        agents: [this.options.agent, ...(this.agentConfigs ? this.agentConfigs.values() : [])],
      });

      /**
       * Reconstruct `reasoning_content` on prior tool-call turns: DeepSeek
       * thinking-mode (#13366) or custom endpoints opting in via
       * `customParams.includeReasoningHistory` (e.g. Xiaomi MiMo, Kimi).
       * Walks subagents too: the opted-in endpoint may appear only as a
       * nested subagent, not the primary or a top-level handoff agent.
       */
      const needsReasoningContentFormat = anyAgentReplaysReasoningContent([
        this.options.agent,
        ...(this.agentConfigs ? Array.from(this.agentConfigs.values()) : []),
      ]);
      /**
       * Skills primed fresh this turn: manual ($ popover) and always-apply
       * (frontmatter). `injectSkillPrimes` (below) splices their SKILL.md
       * bodies in, so `formatAgentMessages` must NOT also reconstruct the
       * same names from a historical `skill` tool_call; otherwise the body
       * lands twice and a prompt-cache marker can pin to the duplicated
       * synthetic prefix. Names NOT primed this turn still reconstruct from
       * history, preserving sticky manual re-priming across turns.
       */
      const manualSkillPrimes = this.options.agent?.manualSkillPrimes;
      const alwaysApplySkillPrimes = this.options.agent?.alwaysApplySkillPrimes;
      const freshSkillPrimeNames = collectFreshSkillPrimeNames({
        manualSkillPrimes,
        alwaysApplySkillPrimes,
      });
      const formatOptions =
        needsReasoningContentFormat || freshSkillPrimeNames.size > 0
          ? {
              ...(needsReasoningContentFormat ? { preserveReasoningContent: true } : {}),
              ...(freshSkillPrimeNames.size > 0
                ? { skipSkillBodyNames: freshSkillPrimeNames }
                : {}),
            }
          : undefined;
      let {
        messages: initialMessages,
        indexTokenCountMap,
        summary: initialSummary,
        boundaryTokenAdjustment,
      } = formatAgentMessages(
        payload,
        this.indexTokenCountMap,
        toolSet,
        skillPrimeResult?.skills,
        formatOptions,
      );
      if (boundaryTokenAdjustment) {
        logger.debug(
          `[AgentClient] Boundary token adjustment: ${boundaryTokenAdjustment.original} → ${boundaryTokenAdjustment.adjusted} (${boundaryTokenAdjustment.remainingChars}/${boundaryTokenAdjustment.totalChars} chars)`,
        );
      }

      /**
       * Skill priming: both manual ($ popover) and always-apply (frontmatter).
       *
       * Splice + index-shift logic lives in `injectSkillPrimes`
       * (packages/api/src/agents/skills.ts) so the delicate position math
       * can be unit-tested in TS without standing up AgentClient. The
       * resolver enforces a combined ceiling (manual-first, always-apply
       * truncated first when over cap) before reaching here; the splice
       * re-applies the cap as defense-in-depth. Runs for both single-
       * agent and multi-agent runs; how primes interact with handoff /
       * added-convo agents' per-agent state is an agents-SDK concern,
       * not this layer's to gate.
       *
       * `manualSkillPrimes` / `alwaysApplySkillPrimes` are resolved above
       * (used to build `freshSkillPrimeNames` for dedupe against historical
       * skill reconstruction).
       */
      if (
        (manualSkillPrimes && manualSkillPrimes.length > 0) ||
        (alwaysApplySkillPrimes && alwaysApplySkillPrimes.length > 0)
      ) {
        const primeResult = injectSkillPrimes({
          initialMessages,
          indexTokenCountMap,
          manualSkillPrimes,
          alwaysApplySkillPrimes,
        });
        indexTokenCountMap = primeResult.indexTokenCountMap;
        if (primeResult.inserted > 0) {
          const manualNames = (manualSkillPrimes ?? []).map((p) => p.name);
          const alwaysApplyNames = (alwaysApplySkillPrimes ?? []).map((p) => p.name);
          logger.debug(
            `[AgentClient] Primed ${primeResult.inserted} skill(s) at message index ${primeResult.insertIdx} — manual: [${manualNames.join(', ')}], always-apply: [${alwaysApplyNames.join(', ')}]`,
          );
        }
        if (primeResult.alwaysApplyDropped > 0) {
          logger.warn(
            `[AgentClient] Dropped ${primeResult.alwaysApplyDropped} always-apply prime(s) to stay within MAX_PRIMED_SKILLS_PER_TURN.`,
          );
        }
      }

      if (indexTokenCountMap && isEnabled(process.env.AGENT_DEBUG_LOGGING)) {
        const entries = Object.entries(indexTokenCountMap);
        const perMsg = entries.map(([idx, count]) => {
          const msg = initialMessages[Number(idx)];
          const type = msg ? msg._getType() : '?';
          return `${idx}:${type}=${count}`;
        });
        logger.debug(
          `[AgentClient] Token map after format: [${perMsg.join(', ')}] (payload=${payload.length}, formatted=${initialMessages.length})`,
        );
      }
      indexTokenCountMap = hydrateMissingIndexTokenCounts({
        messages: initialMessages,
        indexTokenCountMap,
        tokenCounter,
      });

      const memoryMessages =
        this.processMemory && this.memoryPayload
          ? formatAgentMessages(
              this.memoryPayload,
              undefined,
              toolSet,
              skillPrimeResult?.skills,
              formatOptions,
            ).messages
          : initialMessages;

      /**
       * @param {BaseMessage[]} messages
       */
      const runAgents = async (messages) => {
        const agents = [this.options.agent];
        // Include additional agents when:
        // - agentConfigs has agents (from addedConvo parallel execution or agent handoffs)
        // - Agents without incoming edges become start nodes and run in parallel automatically
        if (this.agentConfigs && this.agentConfigs.size > 0) {
          agents.push(...this.agentConfigs.values());
        }

        // TODO: needs to be added as part of AgentContext initialization
        // const noSystemModelRegex = [/\b(o1-preview|o1-mini|amazon\.titan-text)\b/gi];
        // const noSystemMessages = noSystemModelRegex.some((regex) =>
        //   agent.model_parameters.model.match(regex),
        // );
        // if (noSystemMessages === true && systemContent?.length) {
        //   const latestMessageContent = _messages.pop().content;
        //   if (typeof latestMessageContent !== 'string') {
        //     latestMessageContent[0].text = [systemContent, latestMessageContent[0].text].join('\n');
        //     _messages.push(new HumanMessage({ content: latestMessageContent }));
        //   } else {
        //     const text = [systemContent, latestMessageContent].join('\n');
        //     _messages.push(new HumanMessage(text));
        //   }
        // }
        // let messages = _messages;
        // if (agent.useLegacyContent === true) {
        //   messages = formatContentStrings(messages);
        // }
        // if (
        //   agent.model_parameters?.clientOptions?.defaultHeaders?.['anthropic-beta']?.includes(
        //     'prompt-caching',
        //   )
        // ) {
        //   messages = addCacheControl(messages);
        // }

        if (this.processMemory) {
          memoryPromise = this.runMemory(memoryMessages);
        }

        /** Seed calibration state from previous run if encoding matches */
        const currentEncoding = this.getEncoding();
        const prevMeta = this.contextMeta;
        const encodingMatch = prevMeta?.encoding === currentEncoding;
        const calibrationRatio =
          encodingMatch && prevMeta?.calibrationRatio > 0 ? prevMeta.calibrationRatio : undefined;

        if (prevMeta) {
          logger.debug(
            `[AgentClient] contextMeta from parent: ratio=${prevMeta.calibrationRatio}, encoding=${prevMeta.encoding}, current=${currentEncoding}, seeded=${calibrationRatio ?? 'none'}`,
          );
        }

run = await createRun({
|
|
agents,
|
|
messages,
|
|
// This controller implements the full HITL pause/resume lifecycle (handleRunInterrupt
|
|
// persists the pending action; the /resume route rebuilds + continues the run), so it
|
|
// opts into the tool-approval wiring. Non-resumable callers (OpenAI-compat, Responses)
|
|
// leave this off so an approval-gated tool can't pause where there's no resume path.
|
|
hitlCapable: true,
|
|
indexTokenCountMap,
|
|
initialSummary,
|
|
initialSessions,
|
|
calibrationRatio,
|
|
runId: this.responseMessageId,
|
|
signal: abortController.signal,
|
|
customHandlers: this.options.eventHandlers,
|
|
requestBody: config.configurable.requestBody,
|
|
user: createSafeUser(this.options.req?.user),
|
|
tenantId: this.options.req?.user?.tenantId,
|
|
summarizationConfig: appConfig?.summarization,
|
|
appConfig,
|
|
tokenCounter,
|
|
/** Bills subagent child-run model calls — child graphs execute
|
|
* outside the streamEvents loop, so ModelEndHandler never sees
|
|
* them. Entries land in collectedUsage tagged
|
|
* `usage_type: 'subagent'` and are spent by recordCollectedUsage.
|
|
* The sink also streams each as an `on_token_usage` event so the
|
|
* gauge's session cost/totals include billed subagent usage (the
|
|
* `subagent` tag keeps it out of the live context meter). */
|
|
subagentUsageSink: createSubagentUsageSink(
|
|
this.collectedUsage,
|
|
this.buildSubagentUsageEmitter(appConfig),
|
|
),
|
|
});
|
|
|
|
if (!run) {
|
|
throw new Error('Failed to create run');
|
|
}
|
|
|
|
this.run = run;
|
|
if (this._resolveRun) {
|
|
this._resolveRun(run);
|
|
this._resolveRun = null;
|
|
}
|
|
|
|
const streamId = this.options.req?._resumableStreamId;
|
|
if (streamId && run.Graph) {
|
|
GenerationJobManager.setGraph(streamId, run.Graph);
|
|
}
|
|
|
|
if (userMCPAuthMap != null) {
|
|
config.configurable.userMCPAuthMap = userMCPAuthMap;
|
|
}
|
|
|
|
/** @deprecated Agent Chain */
|
|
config.configurable.last_agent_id = agents[agents.length - 1].id;
|
|
|
|
// HITL: clear any checkpoint orphaned by a prior paused turn in this
|
|
// conversation (one that expired or was aborted while paused) so this fresh
|
|
// turn starts clean instead of rehydrating a stale interrupt — thread_id is
|
|
// the stable conversationId. No-op when HITL is off or nothing is orphaned.
|
|
if (streamId && isHITLEnabled(agentsEConfig?.toolApproval)) {
|
|
await deleteAgentCheckpoint(this.conversationId, agentsEConfig?.checkpointer);
|
|
}
|
|
|
|
await run.processStream({ messages }, config, {
|
|
callbacks: {
|
|
[Callback.TOOL_ERROR]: logToolError,
|
|
},
|
|
});
|
|
|
|
// HITL: if the run paused for tool approval, mark the job
|
|
// `requires_action` + emit the prompt and leave the turn unfinalized
|
|
// (the resume route continues it). No-op when the run completed.
|
|
await this.handleRunInterrupt(run, streamId);
|
|
|
|
config.signal = null;
|
|
      };

      await runAgents(initialMessages);

      /**
       * Surface a completed `skill` tool_call content part per *manually*-
       * primed skill so the existing `SkillCall` frontend renderer shows
       * a "Skill X loaded" card on the assistant response. Applied after
       * the graph finishes to avoid clashing with the aggregator's own
       * per-step content indexing. Prepended (not appended) so cards sit
       * above the model's output — priming ran before the turn, the
       * reply follows.
       *
       * Always-apply primes intentionally do NOT emit assistant-side
       * cards. `extractInvokedSkillsFromPayload` scans history for
       * `skill` tool_calls and feeds `primeInvokedSkills`, which is
       * Phase 3's sticky-re-prime path — that's the right behavior for
       * manual (user picked `$skill` once; re-prime on every subsequent
       * turn from history). For always-apply, `resolveAlwaysApplySkills`
       * already re-primes every turn from fresh DB state, so persisting
       * the card would cause the skill body to get primed twice per
       * turn starting on turn 2. The user-facing acknowledgement for
       * always-apply lives on the user bubble as the pinned
       * `SkillPills` row (`message.alwaysAppliedSkills`), which
       * is the durable signal the user wants: "this skill auto-primes".
       *
       * Live streaming display of manual user-bubble pills is handled
       * by `SkillPills` reading `message.manualSkills`. No
       * separate SSE emit is needed here; trying to stream a mid-run
       * tool_call at index 0 collided with the LLM's first text
       * content, while emitting at a sparse offset pushed the card
       * below the reply on finalize. Post-run unshift keeps the final
       * responseMessage.content in the right order.
       */
      const manualPrimed = this.options.agent?.manualSkillPrimes ?? [];
      if (manualPrimed.length > 0) {
        const runId = this.responseMessageId ?? 'skill-prime';
        const manualParts = buildSkillPrimeContentParts(manualPrimed, { runId });
        this.contentParts.unshift(...manualParts);
      }

      this.applyHideSequentialOutputsFilter();
    } catch (err) {
      if (abortController.signal.aborted) {
        logger.debug(
          '[api/server/controllers/agents/client.js #chatCompletion] Operation aborted by user',
          { conversationId: this.conversationId, name: err?.name, code: err?.code },
        );
      } else {
        logger.error(
          '[api/server/controllers/agents/client.js #chatCompletion] Unhandled error type',
          err,
        );
        this.contentParts.push({
          type: ContentTypes.ERROR,
          [ContentTypes.ERROR]: `An error occurred while processing the request${err?.message ? `: ${err.message}` : ''}`,
        });
      }
    } finally {
      /** Capture calibration state from the run for persistence on the response message.
       * Runs in finally so values are captured even on abort. */
      const ratio = this.run?.getCalibrationRatio() ?? 0;
      if (ratio > 0 && ratio !== 1) {
        this.contextMeta = {
          calibrationRatio: Math.round(ratio * 1000) / 1000,
          encoding: this.getEncoding(),
        };
      } else {
        this.contextMeta = undefined;
      }

      this.finalizeSubagentContent();

      /** Await the subagent usage emits that the sink fired without awaiting, so
       * their persist/publish completes before we return and the job is cleaned up
       * (resumed clients read this persisted usage). */
      if (this.pendingSubagentEmits.length > 0) {
        await Promise.allSettled(this.pendingSubagentEmits);
        this.pendingSubagentEmits = [];
      }

      try {
        const attachments = await this.awaitMemoryWithTimeout(memoryPromise);
        if (attachments && attachments.length > 0) {
          this.artifactPromises.push(...attachments);
        }

        /** Skip token spending if aborted: the abort handler (abortMiddleware.js) handles it.
         * This prevents double-spending when the user aborts via `/api/agents/chat/abort`. */
        const wasAborted = abortController?.signal?.aborted;
        if (!wasAborted) {
          await this.recordCollectedUsage({
            context: 'message',
            balance: balanceConfig,
            transactions: transactionsConfig,
          });
        } else {
          logger.debug(
            '[api/server/controllers/agents/client.js #chatCompletion] Skipping token spending - handled by abort middleware',
          );
        }
      } catch (err) {
        logger.error(
          '[api/server/controllers/agents/client.js #chatCompletion] Error in cleanup phase',
          err,
        );
      }
      if (this._resolveRun) {
        this._resolveRun(this.run ?? null);
        this._resolveRun = null;
      }

      // HITL: a turn that completed (or errored) without pausing leaves a dead
      // checkpoint. thread_id is the conversationId — stable across turns — so it
      // MUST be pruned before the next turn, or LangGraph would resume this turn's
      // state instead of starting fresh. Skip when paused (the checkpoint is needed
      // to resume) or when HITL is off (none was written). The Mongo TTL is the backstop.
      const agentsEConfig = appConfig?.endpoints?.[EModelEndpoint.agents];
      if (!this.pendingApproval && isHITLEnabled(agentsEConfig?.toolApproval)) {
        try {
          // Job-replacement guard: only prune if THIS generation is still the live job.
          // A newer request can replace this one on the same conversationId; if this
          // (older) run's finally lands after the newer run paused, pruning by
          // conversationId would delete the NEWER run's checkpoint and break its /resume.
          const resumableStreamId = this.options.req?._resumableStreamId;
          let replaced = false;
          if (resumableStreamId && this.jobCreatedAt != null) {
            const liveJob = await GenerationJobManager.getJobStore().getJob(resumableStreamId);
            replaced = !liveJob || liveJob.createdAt !== this.jobCreatedAt;
          }
          if (replaced) {
            logger.debug('[AgentClient] Skipping checkpoint prune — job was replaced', {
              streamId: resumableStreamId,
            });
          } else {
            await deleteAgentCheckpoint(this.conversationId, agentsEConfig?.checkpointer);
          }
        } catch (err) {
          logger.warn('[AgentClient] Failed to prune checkpoint after completion', err);
        }
      }

      run = null;
      config = null;
      memoryPromise = null;
    }
  }

  /**
   * Resume a run that paused for human-in-the-loop review.
   *
   * The original run lives in a detached background task that exits when the run
   * pauses, so resume REBUILDS the run on a fresh graph bound to the same
   * `thread_id` (= conversationId) and the durable checkpointer. LangGraph rehydrates
   * the paused graph state from the checkpoint; `run.resume(value)` re-enters the
   * interrupted node with the user's decision. State comes from the checkpoint, so
   * no message history is rebuilt here — `createRun` only needs the agent(s) to
   * reconstruct the graph structure.
   *
   * `seedContent` is the content streamed before the pause (the assistant message +
   * its tool call). In Redis mode the job store's append log already spans the pause,
   * so the finalized message is complete regardless; seeding keeps the in-memory store
   * complete too. The run drives events through the same `streamId`, so the client's
   * open SSE receives the continuation live.
   *
   * Unlike `chatCompletion`, this does NOT prune the checkpoint in its `finally` — the
   * resume controller owns checkpoint lifecycle (it must also clean up on failures that
   * happen before this method runs, and keep the checkpoint on a re-pause).
   *
   * @param {object} params
   * @param {Agents.ToolApprovalDecisionMap | { answer: string }} params.resumeValue
   * @param {Array} [params.seedContent] - content aggregated before the pause
   * @param {AbortController} [params.abortController]
   * @param {Pick<import('@langchain/langgraph').Command, 'update' | 'goto'>} [params.commandOptions]
   */
  async resumeCompletion({
    resumeValue,
    seedContent = [],
    abortController = null,
    commandOptions,
    userMCPAuthMap,
    discoveredToolNames,
  }) {
    /** @type {Partial<GraphRunnableConfig>} */
    let config;
    /** @type {ReturnType<createRun>} */
    let run;
    const appConfig = this.options.req.config;
    const balanceConfig = getBalanceConfig(appConfig);
    const transactionsConfig = getTransactionsConfig(appConfig);
    try {
      if (!abortController) {
        abortController = new AbortController();
      }

      /** @type {AppConfig['endpoints']['agents']} */
      const agentsEConfig = appConfig.endpoints?.[EModelEndpoint.agents];

      config = {
        runName: 'AgentRun',
        configurable: {
          thread_id: this.conversationId,
          last_agent_index: this.agentConfigs?.size ?? 0,
          user_id: this.user ?? this.options.req.user?.id,
          hide_sequential_outputs: this.options.agent.hide_sequential_outputs,
          requestBody: {
            messageId: this.responseMessageId,
            conversationId: this.conversationId,
            parentMessageId: this.parentMessageId,
          },
          user: createSafeUser(this.options.req.user),
        },
        recursionLimit: resolveRecursionLimit(agentsEConfig, this.options.agent),
        signal: abortController.signal,
        streamMode: 'values',
        version: 'v2',
      };

      // Seed pre-pause content so the in-memory job store reports the complete turn
      // (Redis aggregates across the pause via its append log; this covers in-memory).
      if (Array.isArray(seedContent) && seedContent.length > 0) {
        this.contentParts.push(...seedContent);
      }

      const tokenCounter = createTokenCounter(this.getEncoding());
      const agents = [this.options.agent];
      if (this.agentConfigs && this.agentConfigs.size > 0) {
        agents.push(...this.agentConfigs.values());
      }

      // Re-prime skill files invoked in the pre-pause segment (mirrors the normal path's
      // `primeInvokedSkills(payload)`), so an approved code/file-backed tool keeps the
      // injected skill-file session refs instead of running without them. The pre-pause
      // content carries the `skill` tool_calls, so it stands in for the message payload.
      let skillSessions;
      if (
        typeof this.options.primeInvokedSkills === 'function' &&
        Array.isArray(seedContent) &&
        seedContent.length > 0
      ) {
        try {
          const primed = await this.options.primeInvokedSkills([
            { role: 'assistant', content: seedContent },
          ]);
          skillSessions = primed?.initialSessions;
        } catch (err) {
          logger.warn(
            '[api/server/controllers/agents/client.js #resumeCompletion] Failed to re-prime skill sessions',
            err?.message ?? err,
          );
        }
      }

      // Seed code-env / skill tool sessions so an approved code/file/skill-backed tool
      // runs with the same uploaded-file context the pre-pause run had — the rebuilt
      // graph otherwise has no `Graph.sessions` entries (especially cross-replica).
      const initialSessions = buildInitialToolSessions({ skillSessions, agents });

      run = await createRun({
        agents,
        // State (messages, tool calls) is rehydrated from the checkpoint by
        // run.resume; createRun only needs the agents to rebuild the graph.
        messages: [],
        // The resumed run can pause AGAIN (another tool, a follow-up question), and this
        // controller owns that lifecycle, so it must keep the HITL wiring on the rebuilt run.
        hitlCapable: true,
        // Replay deferred tools discovered before the pause. With `messages: []` the
        // discovery scan finds nothing, so a deferred tool the paused call targets
        // would be absent from the rebuilt toolMap; these names (captured at pause)
        // force it back in. Undefined/empty for non-deferred turns — a harmless no-op.
        discoveredToolNames,
        initialSessions,
        runId: this.responseMessageId,
        signal: abortController.signal,
        customHandlers: this.options.eventHandlers,
        requestBody: config.configurable.requestBody,
        user: createSafeUser(this.options.req?.user),
        tenantId: this.options.req?.user?.tenantId,
        summarizationConfig: appConfig?.summarization,
        appConfig,
        tokenCounter,
        subagentUsageSink: createSubagentUsageSink(
          this.collectedUsage,
          this.buildSubagentUsageEmitter(appConfig),
        ),
      });

      if (!run) {
        throw new Error('Failed to create run for resume');
      }

      this.run = run;
      if (this._resolveRun) {
        this._resolveRun(run);
        this._resolveRun = null;
      }

      const streamId = this.options.req?._resumableStreamId;
      // Do NOT cache the rebuilt graph on resume: it was created with `messages: []`, so
      // RedisJobStore.getContentParts() (which prefers a cached graph over reconstructing
      // from the chunk log) would return only the resumed segment and drop the pre-pause
      // assistant/tool-call content on a same-replica reload/status poll. Skipping it makes
      // introspection fall back to the durable chunk reconstruction, which is complete.
      // `setContentParts` still points the in-memory store at the seeded client content.
      if (streamId && this.contentParts) {
        GenerationJobManager.setContentParts(streamId, this.contentParts);
      }

      // Carry the user's MCP auth into the rebuilt run so an approved MCP tool executes
      // with the same OAuth/user credentials it had before the pause.
      if (userMCPAuthMap != null) {
        config.configurable.userMCPAuthMap = userMCPAuthMap;
      }

      /** @deprecated Agent Chain */
      config.configurable.last_agent_id = agents[agents.length - 1].id;

      await run.resume(
        resumeValue,
        config,
        { callbacks: { [Callback.TOOL_ERROR]: logToolError } },
        commandOptions,
      );

      config.signal = null;

      // The model may pause AGAIN (another tool needs approval, or a follow-up
      // question). Re-arm the same interrupt gate so the cycle can repeat.
      await this.handleRunInterrupt(run, streamId);

      // Mirror chatCompletion: strip hidden intermediate sequential-agent content
      // before resume finalize/re-pause persistence reads `this.contentParts`, so a
      // resumed sequential chain doesn't persist/emit outputs hide_sequential_outputs
      // is meant to hide.
      this.applyHideSequentialOutputsFilter();
    } catch (err) {
      if (abortController.signal.aborted) {
        logger.debug(
          '[api/server/controllers/agents/client.js #resumeCompletion] Aborted by user',
          {
            conversationId: this.conversationId,
            name: err?.name,
            code: err?.code,
          },
        );
      } else {
        logger.error(
          '[api/server/controllers/agents/client.js #resumeCompletion] Unhandled error',
          err,
        );
        this.contentParts.push({
          type: ContentTypes.ERROR,
          [ContentTypes.ERROR]: `An error occurred while resuming the request${err?.message ? `: ${err.message}` : ''}`,
        });
      }
    } finally {
      const ratio = this.run?.getCalibrationRatio() ?? 0;
      if (ratio > 0 && ratio !== 1) {
        this.contextMeta = {
          calibrationRatio: Math.round(ratio * 1000) / 1000,
          encoding: this.getEncoding(),
        };
      } else {
        this.contextMeta = undefined;
      }

      this.finalizeSubagentContent();

      if (this.pendingSubagentEmits.length > 0) {
        await Promise.allSettled(this.pendingSubagentEmits);
        this.pendingSubagentEmits = [];
      }

      try {
        const wasAborted = abortController?.signal?.aborted;
        if (!wasAborted) {
          await this.recordCollectedUsage({
            context: 'message',
            balance: balanceConfig,
            transactions: transactionsConfig,
          });
        }
      } catch (err) {
        logger.error(
          '[api/server/controllers/agents/client.js #resumeCompletion] Error in cleanup phase',
          err,
        );
      }
      if (this._resolveRun) {
        this._resolveRun(this.run ?? null);
        this._resolveRun = null;
      }
      run = null;
      config = null;
    }
  }

  /**
   * Resolves with the agent run once it is initialized, or `null` if
   * initialization fails. Lets immediate-mode title generation await the run
   * instead of throwing when fired before `chatCompletion` assigns `this.run`.
   * Rejects promptly if the provided signal aborts before the run is ready.
   * @param {AbortSignal} [signal]
   * @returns {Promise<AgentRun | null>}
   */
  _waitForRun(signal) {
    if (this.run) {
      return Promise.resolve(this.run);
    }
    if (!this._runReady) {
      this._runReady = new Promise((resolve) => {
        this._resolveRun = resolve;
      });
    }
    if (!signal) {
      return this._runReady;
    }
    if (signal.aborted) {
      return Promise.reject(new Error('Aborted before run initialization'));
    }
    return new Promise((resolve, reject) => {
      const onAbort = () => reject(new Error('Aborted before run initialization'));
      signal.addEventListener('abort', onAbort, { once: true });
      this._runReady.then((run) => {
        signal.removeEventListener('abort', onAbort);
        resolve(run);
      });
    });
  }

  /**
   * @param {Object} params
   * @param {string} params.text
   * @param {AbortController} params.abortController
   * @param {boolean} [params.immediate] When true, the title is generated as soon
   *   as the request is made — the run is awaited (instead of throwing) and the
   *   title derives from the user's input only (`contentParts` is empty).
   */
  async titleConvo({ text, abortController, immediate = false }) {
    if (!this.run) {
      if (!immediate) {
        throw new Error('Run not initialized');
      }
      await this._waitForRun(abortController?.signal);
      if (!this.run) {
        logger.debug(
          '[api/server/controllers/agents/client.js #titleConvo] Run unavailable for immediate title generation',
        );
        return;
      }
    }
    const { handleLLMEnd, collected: collectedMetadata } = createMetadataAggregator();
    const { req, agent } = this.options;

    if (req?.body?.isTemporary) {
      logger.debug(
        `[api/server/controllers/agents/client.js #titleConvo] Skipping title generation for temporary conversation`,
      );
      return;
    }

    const appConfig = req.config;
    let endpoint = agent.endpoint;

    /** @type {import('@librechat/agents').ClientOptions} */
    let clientOptions = {
      model: agent.model || agent.model_parameters.model,
    };

    let titleProviderConfig = getProviderConfig({ provider: endpoint, appConfig });

    /** @type {TEndpoint | undefined} */
    const endpointConfig =
      appConfig.endpoints?.all ??
      appConfig.endpoints?.[endpoint] ??
      titleProviderConfig.customEndpointConfig;
    if (!endpointConfig) {
      logger.debug(
        `[api/server/controllers/agents/client.js #titleConvo] No endpoint config for "${endpoint}"`,
      );
    }

    if (endpointConfig?.titleConvo === false) {
      logger.debug(
        `[api/server/controllers/agents/client.js #titleConvo] Title generation disabled for endpoint "${endpoint}"`,
      );
      return;
    }

    if (endpointConfig?.titleEndpoint && endpointConfig.titleEndpoint !== endpoint) {
      try {
        titleProviderConfig = getProviderConfig({
          provider: endpointConfig.titleEndpoint,
          appConfig,
        });
        endpoint = endpointConfig.titleEndpoint;
      } catch (error) {
        logger.warn(
          `[api/server/controllers/agents/client.js #titleConvo] Error getting title endpoint config for "${endpointConfig.titleEndpoint}", falling back to default`,
          error,
        );
        // Fall back to original provider config
        endpoint = agent.endpoint;
        titleProviderConfig = getProviderConfig({ provider: endpoint, appConfig });
      }
    }

    if (
      endpointConfig &&
      endpointConfig.titleModel &&
      endpointConfig.titleModel !== Constants.CURRENT_MODEL
    ) {
      clientOptions.model = endpointConfig.titleModel;
    }

    const options = await titleProviderConfig.getOptions({
      req,
      endpoint,
      model_parameters: clientOptions,
      db: {
        getUserKey: db.getUserKey,
        getUserKeyValues: db.getUserKeyValues,
      },
    });

    let provider = options.provider ?? titleProviderConfig.overrideProvider ?? agent.provider;
    if (
      endpoint === EModelEndpoint.azureOpenAI &&
      options.llmConfig?.azureOpenAIApiInstanceName == null
    ) {
      provider = Providers.OPENAI;
    } else if (
      endpoint === EModelEndpoint.azureOpenAI &&
      options.llmConfig?.azureOpenAIApiInstanceName != null &&
      provider !== Providers.AZURE
    ) {
      provider = Providers.AZURE;
    }

    /** @type {import('@librechat/agents').ClientOptions} */
    clientOptions = { ...options.llmConfig };
    if (options.configOptions) {
      clientOptions.configuration = options.configOptions;
    }

    if (clientOptions.maxTokens != null) {
      delete clientOptions.maxTokens;
    }
    if (clientOptions?.modelKwargs?.max_completion_tokens != null) {
      delete clientOptions.modelKwargs.max_completion_tokens;
    }
    if (clientOptions?.modelKwargs?.max_output_tokens != null) {
      delete clientOptions.modelKwargs.max_output_tokens;
    }

    /** `omitTitleOptions` drops the Anthropic `clientOptions` carrier (thinking,
     * streaming, etc.), which would also drop its `defaultHeaders` — preserve the
     * original `clientOptions` object so gateway/reverse-proxy metadata still
     * reaches title requests (the proxy may require it for auth/routing). Restore
     * the SAME object reference, not a copy: the Vertex `createClient` closure from
     * `getLLMConfig` closes over this object, so `resolveConfigHeaders` must mutate
     * the very object the client is built from. */
    const anthropicClientOptions = clientOptions?.clientOptions;

    clientOptions = Object.assign(
      Object.fromEntries(
        Object.entries(clientOptions).filter(([key]) => !omitTitleOptions.has(key)),
      ),
    );

    if (anthropicClientOptions?.defaultHeaders != null && clientOptions.clientOptions == null) {
      clientOptions.clientOptions = anthropicClientOptions;
    }

    if (
      provider === Providers.GOOGLE &&
      (endpointConfig?.titleMethod === TitleMethod.FUNCTIONS ||
        endpointConfig?.titleMethod === TitleMethod.STRUCTURED)
    ) {
      clientOptions.json = true;
    }

    /** Resolve request-based headers across provider-specific header locations:
     * OpenAI `configuration.defaultHeaders`, Anthropic `clientOptions.defaultHeaders`
     * (preserved above), and Google `customHeaders`.
     */
    resolveConfigHeaders({
      llmConfig: clientOptions,
      user: createSafeUser(this.options.req?.user),
      body: {
        messageId: this.responseMessageId,
        conversationId: this.conversationId,
        parentMessageId: this.parentMessageId,
      },
    });

    try {
      const titleResult = await this.run.generateTitle({
        provider,
        clientOptions,
        inputText: text,
        contentParts: immediate ? [] : this.contentParts,
        titleMethod: endpointConfig?.titleMethod,
        titlePrompt: endpointConfig?.titlePrompt,
        titlePromptTemplate: endpointConfig?.titlePromptTemplate,
        chainOptions: {
          runName: 'TitleRun',
          signal: abortController.signal,
          callbacks: [
            {
              handleLLMEnd,
            },
          ],
          configurable: {
            thread_id: this.conversationId,
            user_id: this.user ?? this.options.req.user?.id,
          },
        },
      });

      const collectedUsage = collectedMetadata.map((item) => {
        let input_tokens, output_tokens;

        if (item.usage) {
          input_tokens =
            item.usage.prompt_tokens || item.usage.input_tokens || item.usage.inputTokens;
          output_tokens =
            item.usage.completion_tokens || item.usage.output_tokens || item.usage.outputTokens;
        } else if (item.tokenUsage) {
          input_tokens = item.tokenUsage.promptTokens;
          output_tokens = item.tokenUsage.completionTokens;
        } else if (item.usage_metadata) {
          input_tokens = item.usage_metadata.input_tokens;
          output_tokens = item.usage_metadata.output_tokens;
        }

        return {
          input_tokens: input_tokens,
          output_tokens: output_tokens,
        };
      });

      const balanceConfig = getBalanceConfig(appConfig);
      const transactionsConfig = getTransactionsConfig(appConfig);
      await this.recordCollectedUsage({
        collectedUsage,
        context: 'title',
        model: clientOptions.model,
        balance: balanceConfig,
        transactions: transactionsConfig,
        messageId: this.responseMessageId,
      }).catch((err) => {
        logger.error(
          '[api/server/controllers/agents/client.js #titleConvo] Error recording collected usage',
          err,
        );
      });

      return sanitizeTitle(titleResult.title);
    } catch (err) {
      logger.error('[api/server/controllers/agents/client.js #titleConvo] Error', err);
      return;
    }
  }

  /**
   * @param {object} params
   * @param {number} params.promptTokens
   * @param {number} params.completionTokens
   * @param {string} [params.model]
   * @param {OpenAIUsageMetadata} [params.usage]
   * @param {AppConfig['balance']} [params.balance]
   * @param {string} [params.context='message']
   * @returns {Promise<void>}
   */
  async recordTokenUsage({
    model,
    usage,
    balance,
    promptTokens,
    completionTokens,
    context = 'message',
  }) {
    try {
      await db.spendTokens(
        {
          model,
          context,
          balance,
          messageId: this.responseMessageId,
          conversationId: this.conversationId,
          user: this.user ?? this.options.req.user?.id,
          endpointTokenConfig: this.options.endpointTokenConfig,
        },
        { promptTokens, completionTokens },
      );

      if (
        usage &&
        typeof usage === 'object' &&
        'reasoning_tokens' in usage &&
        typeof usage.reasoning_tokens === 'number'
      ) {
        await db.spendTokens(
          {
            model,
            balance,
            context: 'reasoning',
            messageId: this.responseMessageId,
            conversationId: this.conversationId,
            user: this.user ?? this.options.req.user?.id,
            endpointTokenConfig: this.options.endpointTokenConfig,
          },
          { completionTokens: usage.reasoning_tokens },
        );
      }
    } catch (error) {
      logger.error(
        '[api/server/controllers/agents/client.js #recordTokenUsage] Error recording token usage',
        error,
      );
    }
  }

  /** Anthropic Claude models use a distinct BPE tokenizer; all others default to o200k_base. */
  getEncoding() {
    if (this.model && this.model.toLowerCase().includes('claude')) {
      return 'claude';
    }
    return 'o200k_base';
  }
}

module.exports = AgentClient;
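The calibration handling in `chatCompletion` (and the matching `finally` in `resumeCompletion`) comes down to two rules:

- Reuse the parent's ratio only when it was measured with the same encoding.
- Persist a ratio only when it is positive and not exactly 1, rounded to three decimals.

Below is a minimal sketch of those rules as pure functions. `seedCalibrationRatio` and `captureContextMeta` are hypothetical names, not helpers defined in this file.

```javascript
/**
 * Hypothetical helper: returns the parent run's calibration ratio only when it
 * was measured with the same tokenizer encoding and is a positive number.
 */
function seedCalibrationRatio(prevMeta, currentEncoding) {
  const encodingMatch = prevMeta?.encoding === currentEncoding;
  return encodingMatch && prevMeta?.calibrationRatio > 0 ? prevMeta.calibrationRatio : undefined;
}

/**
 * Hypothetical helper: builds the contextMeta persisted on the response message.
 * A ratio of 0 (nothing measured) or exactly 1 (no correction) is not persisted.
 */
function captureContextMeta(ratio, encoding) {
  if (ratio > 0 && ratio !== 1) {
    // Round to three decimals, as the finally blocks above do.
    return { calibrationRatio: Math.round(ratio * 1000) / 1000, encoding };
  }
  return undefined;
}
```

Keeping the rules pure makes the encoding-mismatch case easy to unit-test without constructing a run. An example of that case is switching from a model that reports `claude` to one that uses `o200k_base`.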
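The checkpoint-prune guard at the end of `chatCompletion` makes a decision from a few inputs:

- whether the turn paused;
- whether HITL is enabled;
- when a resumable stream id and `jobCreatedAt` are both known, whether the live job's `createdAt` still matches this request's.

Below is a sketch of that decision as a pure function. It assumes the live job has already been fetched; the original awaits `GenerationJobManager.getJobStore().getJob()` for this. `checkpointPruneDecision` and its return labels are hypothetical.

```javascript
/**
 * Hypothetical decision helper mirroring the prune guard in chatCompletion's finally.
 * @param {object} p
 * @param {boolean} p.paused - true when the run is waiting on a human decision
 * @param {boolean} p.hitlEnabled - whether tool approval is enabled
 * @param {string} [p.streamId] - resumable stream id, if any
 * @param {number | null} [p.jobCreatedAt] - creation time of this request's job
 * @param {{ createdAt: number } | null} [p.liveJob] - job currently stored for streamId
 * @returns {'keep-paused' | 'hitl-off' | 'replaced' | 'prune'}
 */
function checkpointPruneDecision({ paused, hitlEnabled, streamId, jobCreatedAt, liveJob }) {
  if (paused) {
    return 'keep-paused'; // the checkpoint is needed by /resume
  }
  if (!hitlEnabled) {
    return 'hitl-off'; // no checkpoint was written
  }
  // Only compare against the live job when both identifiers are known.
  if (streamId && jobCreatedAt != null) {
    const replaced = !liveJob || liveJob.createdAt !== jobCreatedAt;
    if (replaced) {
      return 'replaced'; // a newer request may own this conversationId's checkpoint
    }
  }
  return 'prune';
}
```

Returning a label instead of a boolean keeps the skip reasons distinguishable. The original only logs the `replaced` case.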
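`_waitForRun` combines a lazily created, shared readiness promise with a per-caller abort race. Every waiter shares one promise that the completion path resolves once, and a waiter whose signal aborts rejects on its own without affecting the others.

Below is a standalone sketch of the same pattern. The `RunGate` class and its `set`/`wait` methods are hypothetical.

```javascript
/** Hypothetical standalone version of the `_waitForRun` / `_resolveRun` pattern. */
class RunGate {
  constructor() {
    this.run = null;
    this._ready = null;
    this._resolve = null;
  }

  /** Producer side: record the run (or null on failure) and wake any waiters. */
  set(run) {
    this.run = run;
    if (this._resolve) {
      this._resolve(run);
      this._resolve = null;
    }
  }

  /** Consumer side: resolves with the run, or rejects if `signal` aborts first. */
  wait(signal) {
    if (this.run) {
      return Promise.resolve(this.run);
    }
    if (!this._ready) {
      // Shared by all waiters; only created once somebody actually waits.
      this._ready = new Promise((resolve) => {
        this._resolve = resolve;
      });
    }
    if (!signal) {
      return this._ready;
    }
    if (signal.aborted) {
      return Promise.reject(new Error('Aborted before run initialization'));
    }
    return new Promise((resolve, reject) => {
      const onAbort = () => reject(new Error('Aborted before run initialization'));
      signal.addEventListener('abort', onAbort, { once: true });
      this._ready.then((run) => {
        signal.removeEventListener('abort', onAbort);
        resolve(run); // no-op if this waiter already rejected on abort
      });
    });
  }
}
```

One edge case is inherited from the method it mirrors. Suppose initialization fails before anyone waits. A later waiter then creates a fresh readiness promise that nothing in this excerpt resolves, so the abort signal is what bounds that wait.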
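The usage mapping in `titleConvo` accepts three metadata shapes, checked in this order:

- `usage`, with snake_case or camelCase token fields;
- `tokenUsage`;
- `usage_metadata`.

Below is the same mapping as a standalone function, which makes the precedence explicit. `normalizeUsage` is a hypothetical name.

```javascript
/**
 * Hypothetical standalone version of the usage mapping in titleConvo.
 * Precedence follows the original: `usage`, then `tokenUsage`, then `usage_metadata`.
 */
function normalizeUsage(item) {
  let input_tokens;
  let output_tokens;
  if (item.usage) {
    // `||` (not `??`), as in the original: a 0 falls through to the next alias.
    input_tokens = item.usage.prompt_tokens || item.usage.input_tokens || item.usage.inputTokens;
    output_tokens =
      item.usage.completion_tokens || item.usage.output_tokens || item.usage.outputTokens;
  } else if (item.tokenUsage) {
    input_tokens = item.tokenUsage.promptTokens;
    output_tokens = item.tokenUsage.completionTokens;
  } else if (item.usage_metadata) {
    input_tokens = item.usage_metadata.input_tokens;
    output_tokens = item.usage_metadata.output_tokens;
  }
  return { input_tokens, output_tokens };
}
```

An item that matches none of the shapes yields `undefined` for both fields rather than `0`. Whether that is acceptable depends on how `recordCollectedUsage` treats missing counts, which this excerpt does not show.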
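Before title generation, `titleConvo` prepares the client options in three steps:

1. Delete the output-length limits (`maxTokens` and the `modelKwargs` max fields).
2. Filter out the keys in `omitTitleOptions`.
3. Restore the Anthropic `clientOptions` carrier by reference when it held `defaultHeaders`.

Below is a minimal sketch of that sequence under stated assumptions:

- The contents of `omitTitleOptions` are defined outside this excerpt, so `OMIT_TITLE_OPTIONS` below is an assumed stand-in.
- `prepareTitleClientOptions` is hypothetical.
- Unlike the original, the sketch copies `modelKwargs` before deleting from it.

```javascript
// Assumed stand-in: the real `omitTitleOptions` set is defined outside this excerpt.
const OMIT_TITLE_OPTIONS = new Set(['clientOptions', 'streaming', 'thinking']);

/** Hypothetical helper mirroring the clientOptions clean-up in titleConvo. */
function prepareTitleClientOptions(llmConfig, omit = OMIT_TITLE_OPTIONS) {
  const options = { ...llmConfig };
  if (options.maxTokens != null) {
    delete options.maxTokens;
  }
  if (options.modelKwargs) {
    // Copy first so the caller's nested object is not mutated.
    const modelKwargs = { ...options.modelKwargs };
    if (modelKwargs.max_completion_tokens != null) delete modelKwargs.max_completion_tokens;
    if (modelKwargs.max_output_tokens != null) delete modelKwargs.max_output_tokens;
    options.modelKwargs = modelKwargs;
  }

  // Hold on to the carrier before the filter drops it.
  const carrier = options.clientOptions;
  const filtered = Object.fromEntries(
    Object.entries(options).filter(([key]) => !omit.has(key)),
  );

  // Restore the SAME object (not a copy), and only when it carries headers.
  if (carrier?.defaultHeaders != null && filtered.clientOptions == null) {
    filtered.clientOptions = carrier;
  }
  return filtered;
}
```

Restoring the original reference matters when something else already closes over that object. The comment in `titleConvo` describes this case for the Vertex `createClient` closure. A spread copy would leave later header mutations invisible to that client.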