Mirror of https://github.com/danny-avila/LibreChat.git
synced 2026-07-02 04:12:36 +00:00
* chore: add @langchain/langgraph-checkpoint-mongodb for HITL durable resume
* feat: HITL tool approval runtime — backend (Slice B)
- endpoints.agents.checkpointer config + durable Mongo checkpointer (seam over the app
connection; SDK MemorySaver fallback) with a TTL index + deleteThread pruning
- HITL run wiring (PreToolUse policy hook + humanInTheLoop) attached in createRun, fully
inert when toolApproval.enabled is off
- interrupt gate (pause job -> requires_action + emit on_pending_action) and a resume
route that rebuilds the run from the durable checkpoint and run.resume()s it
- atomic single-winner resolve; agent-consistency guard; expireStaleApprovals terminal
event; checkpoint pruned on every non-paused completion (thread_id == conversationId)
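The single-winner resolve is essentially a compare-and-set on the paused job. Here is a minimal in-memory sketch. It assumes a hypothetical `PausedJob` record (`status`, `pendingActionId`). A shared store would have to run the same read-check-write as one atomic operation.

```typescript
// Hypothetical job shape; field names are illustrative, not LibreChat's schema.
type JobStatus = 'running' | 'requires_action' | 'completed' | 'aborted';

interface PausedJob {
  status: JobStatus;
  pendingActionId?: string;
}

/**
 * Claim the pending action for a resume. Only the first caller that observes
 * the expected paused state gets true; every later caller gets false (the
 * route would answer 409). In Mongo/Redis this must be one atomic update.
 */
function claimPendingAction(job: PausedJob, actionId: string | undefined): boolean {
  // A resume without an actionId must never resolve "whatever is current".
  if (!actionId) return false;
  if (job.status !== 'requires_action' || job.pendingActionId !== actionId) {
    return false;
  }
  job.status = 'running';
  job.pendingActionId = undefined;
  return true;
}
```

With two concurrent resumes for the same action, the first call flips the job to `running` and the second sees a state it no longer expects.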
* feat: HITL tool approval UI — frontend (Slice B)
approve/reject/edit/respond + ask-user controls in the tool card (OAuth-button precedent),
batch-aware single submit, live + reconnect (resumeState.pendingAction) wiring, and resume
mutations posting to /agents/chat/resume.
* fix(hitl): decouple ApprovalProvider from chat context
ApprovalProvider is now pure state (safe to mount in provider-less / shared / test
renders); the context-dependent submit moved to a useResumeSubmit hook the cards call.
Part imports getAskUserQuestionPart from ~/utils/approval directly so suites that
partial-mock ~/utils render Part without throwing.
* fix(hitl): address Codex review — backend
- P1: enforce per-tool allowed_decisions on resume (reject a crafted decision the
policy disallows) via findDisallowedDecisions
- prune the durable checkpoint on user-abort of a paused run, and before a fresh
HITL turn, so a new turn cannot rehydrate an expired/aborted interrupt (thread_id
is the stable conversationId)
- persist + use isTemporary and the original parentMessageId on resume (temporary
chats stay temporary; initializeAgent scopes thread files off the right parent)
- generate a deferred first-turn title BEFORE completeJob so its event reaches the
client and the final event carries the real title
- moderateText: skip when there is no text (tool-approval resume) and moderate the
ask-user answer, instead of denying on an empty input
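The per-tool `allowed_decisions` check reduces to set membership keyed by `tool_call_id`. This sketch uses hypothetical `ActionRequest`/`Decision` shapes. The repository's `findDisallowedDecisions` may take different arguments.

```typescript
// Hypothetical shapes for illustration only.
type DecisionType = 'approve' | 'reject' | 'edit' | 'respond';

interface ActionRequest {
  tool_call_id: string;
  allowed_decisions?: DecisionType[];
}

interface Decision {
  tool_call_id: string;
  type: DecisionType;
}

/**
 * Return every decision whose type the policy did not offer for that call.
 * A missing allowed_decisions list is treated as "nothing allowed", and a
 * decision for an unknown tool_call_id is flagged as well.
 */
function findDisallowedDecisions(requests: ActionRequest[], decisions: Decision[]): Decision[] {
  const allowed = new Map<string, Set<DecisionType>>();
  for (const req of requests) {
    allowed.set(req.tool_call_id, new Set(req.allowed_decisions ?? []));
  }
  return decisions.filter((d) => !allowed.get(d.tool_call_id)?.has(d.type));
}
```

A non-empty result is what the resume route would reject, before any decision is mapped onto the SDK.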
* fix(hitl): address Codex review — frontend
- render ToolApproval for ANY paused agent tool card (bash/code/file/etc.), not just
the generic ToolCall, by wrapping the tool-card branch in Part (moved the rendering
out of ToolCall)
- findPendingActionMessageIndex only matches an assistant message, never the user
message (the underscore-strip could target the user bubble before the assistant
placeholder exists)
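The assistant-only rule for `findPendingActionMessageIndex` can be sketched as follows. It assumes a hypothetical `Msg` shape with `messageId` and `isCreatedByUser`, and the real function's signature may differ. The sketch also folds in the later round-5 refinement. If a `responseMessageId` is provided but not found, the function returns -1 so the caller retries. It falls back to the last assistant message only when no id is given.

```typescript
// Hypothetical message shape; only the two fields the rule needs.
interface Msg {
  messageId: string;
  isCreatedByUser: boolean;
}

function findPendingActionMessageIndex(messages: Msg[], responseMessageId?: string): number {
  if (responseMessageId) {
    // A given id must resolve to an assistant message; otherwise -1 so the caller retries.
    return messages.findIndex((m) => m.messageId === responseMessageId && !m.isCreatedByUser);
  }
  // No id: fall back to the most recent assistant message, never a user bubble.
  for (let i = messages.length - 1; i >= 0; i--) {
    const m = messages[i];
    if (m && !m.isCreatedByUser) return i;
  }
  return -1;
}
```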
* fix(hitl): address Codex re-review
- title eligibility checks the user message’s parent (first turn), not the response’s
parent — the previous check could never be true and skipped title generation
- use client.buildResponseMetadata() for the resumed message so contextUsage /
thoughtSignatures survive (the abort-only helper dropped them)
- moderate decisions[].responseText (the respond action’s user text)
- give /chat/abort req.config (configMiddleware) so the HITL checkpoint prune on abort
actually runs
- read resume state BEFORE setContentParts so the in-memory store does not lose the
pre-pause seed content
- count resumes against LIMIT_CONCURRENT_MESSAGES (increment/decrement) so paused-then-
resumed turns cannot bypass the limit
- require actionId on resume so a body without it cannot resolve the current action
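Counting resumes against the limit means each resume takes a slot before it runs and gives the slot back once in its `finally`. This sketch uses a hypothetical in-memory `ConcurrencyGate`; the real counter lives in a shared cache. The once-only release is a design choice: it keeps two cleanup paths from decrementing twice.

```typescript
// Hypothetical per-user gate; not LibreChat's implementation.
class ConcurrencyGate {
  private readonly active = new Map<string, number>();
  private readonly limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  /** Returns a release function, or null when the user is at the limit (HTTP 429). */
  tryAcquire(userId: string): (() => void) | null {
    const current = this.active.get(userId) ?? 0;
    if (current >= this.limit) return null;
    this.active.set(userId, current + 1);
    let released = false;
    return () => {
      if (released) return; // idempotent: a second call is a no-op
      released = true;
      const n = (this.active.get(userId) ?? 1) - 1;
      if (n <= 0) this.active.delete(userId);
      else this.active.set(userId, n);
    };
  }

  count(userId: string): number {
    return this.active.get(userId) ?? 0;
  }
}
```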
* fix(hitl): address Codex re-review (round 3) — resume fidelity
Bring the lean resume path to parity with sendMessage for things it bypassed:
- carry userMCPAuthMap into the rebuilt run so approved MCP tools keep the user's creds
- seed initialSessions (buildInitialToolSessions) so approved code/file/skill tools have
the pre-pause uploaded-file context (esp. cross-replica / after restart)
- await client.artifactPromises and persist them as response attachments (else tool
artifacts created after the pause vanish on reload / for late subscribers)
- merge metadata: cumulative usage (+ summary marker) from the job, contextUsage /
thoughtSignatures from the client — fixes the round-2 regression that underreported
post-resume cost
* fix(hitl): address Codex re-review (round 4) — resume hardening
- resume: require an EXACT paused agent_id match (reject omitted/ephemeral
agent_id, not just a different one) and reject an endpoint mismatch, so a
request can't rebuild the claimed checkpoint on a different graph
- moderateText: also moderate a tool-approval decision's reject `reason` and
stringified `editedArguments`, not just `responseText`
- request: re-mark the paused response `unfinished:true` after BaseClient saves
it as completed, so an expired / never-resumed approval doesn't leave a
"finished" response in history; the resume path overwrites it on success
* test(hitl): route-level integration test for the resume controller
Adds api/server/controllers/agents/__tests__/resume.spec.js, a supertest
integration test that drives the real ResumeAgentController over the full
pause -> approve -> resume -> finalize lifecycle with the SDK run, durable
checkpointer, Mongo, and concurrency cache mocked. The pure decision/liveness
helpers run for real via requireActual, so the guard ladder is exercised end to
end rather than stubbed.
25 cases covering:
- the authorization / staleness / agent-and-endpoint / actionId guard ladder
- tool_approval validation (undecided tool call, policy-disallowed decision)
- ask_user_question answer requirement
- the concurrency gate (429) and the atomic single-winner claim (409)
- the happy path: ACK, run reconstruction, decision->SDK mapping, finalize
(save the now-finished response, emit done, complete job, prune checkpoint)
- first-turn title generation before stream completion
- re-pause (no double finalize), abort-during-resume (no double finalize),
and the resume-failure terminal path (emitError + completeJob + prune)
* test(hitl): strengthen resume coverage + add approval util tests
Acts on a self-audit of the new resume integration test.
resume.spec.js (25 -> 32 cases):
- replace the tautological emitDone assertion (it only checked the hardcoded
`final: true`) with a structural check of the finalEvent payload —
responseMessage content/id/unfinished, requestMessage identity, title
- cover the previously-unwalked finalize branches: tool-artifact attachments
(null-filtered), the aggregatedContent fallback when live content is empty,
and client response-metadata attachment
- add guard cases: unsupported pending-action type (400) and the
pre-multi-tenancy null-tenantId pass-through (must not 403)
- add error-path cases: first-turn title generation throwing must still
finalize, and a completeJob failure during a resume error must force a
terminal job state via the last-resort updateJob
client/src/utils/approval.spec.ts (new, 15 cases):
- applyPendingAction tool_approval: join by tool_call_id not position,
skip completed calls, default allowed_decisions to [], referential
stability when nothing changes
- applyPendingAction ask_user_question: append, idempotent replace on replay,
non-array content coercion
- getAskUserQuestionPart type guard; findPendingActionMessageIndex
assistant-only resolution (never resolves to the user bubble)
* fix(hitl): address Codex re-review (round 5)
Five findings verified against the code before fixing:
- resume: require an EXACT endpoint match (like agent_id) — a resume that OMITS
endpoint must not fall through, since the shared chat middleware treats a
missing/non-agents endpoint as the ephemeral agent and could rebuild the
claimed checkpoint on a different graph
- resume: filter malformed content parts before saving the finished response,
matching the normal AgentClient path (a resumed turn could otherwise persist
an empty/invalid tool_call part that breaks reload/rendering)
- resume: accumulate tool artifacts across pause segments — persist them on
re-pause and MERGE (not overwrite) at finalize, so artifacts produced before
a second approval pause aren't dropped by the next rebuilt client
- approval (client): findPendingActionMessageIndex returns -1 when a provided
responseMessageId isn't found, so the caller retries instead of attaching the
prompt/approval to a prior assistant reply; fall back to the last assistant
only when no responseMessageId is given
- RedisJobStore: make appendChunk extend-only (XADD + EXPIRE-if-shorter via a
single eval) so the on_pending_action chunk emitted after a pause can't reset
the chunk-stream TTL back to the running window and evict pre-pause content
before the approval is resolved
Tests: +endpoint-omitted/unsupported-type/malformed-filter/attachment-merge/
re-pause-persist cases in resume.spec.js (36); ask-retry -1 semantics in
approval.spec.ts (16); extend-only TTL assertion in the RedisJobStore Redis
integration spec.
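The exact-match guards from rounds 4 and 5 differ from a "reject only if both are present and different" check in one respect: an omitted field must still match. Here is a sketch of the strict form. It assumes a hypothetical `PausedIdentity` captured at pause time.

```typescript
// Hypothetical identity captured on the pending action at pause time.
interface PausedIdentity {
  agent_id?: string;
  endpoint?: string;
}

type GuardResult = { ok: true } | { ok: false; reason: string };

/**
 * Strict identity check for a resume request. An omitted field is NOT
 * "no constraint": it must equal what the pause recorded, so a request that
 * drops agent_id or endpoint cannot fall through to an ephemeral agent.
 */
function checkResumeIdentity(paused: PausedIdentity, requested: PausedIdentity): GuardResult {
  if (requested.agent_id !== paused.agent_id) {
    return { ok: false, reason: 'agent_id mismatch' };
  }
  if (requested.endpoint !== paused.endpoint) {
    return { ok: false, reason: 'endpoint mismatch' };
  }
  return { ok: true };
}
```

Two ephemeral pauses (both `agent_id` undefined) still compare equal here. That is the gap the round-6 fingerprint closes.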
* test(hitl): mongodb-memory-server integration test for the checkpointer seam
The checkpointer unit spec covers config/selection with no DB connection; this
exercises the durable Mongo seam against a real (in-memory) MongoDB — the part
correctness actually depends on:
- getAgentCheckpointer builds a real MongoDBSaver when Mongo is connected and
setup() creates the TTL index (expireAfterSeconds) on the checkpoint collection
- memory type returns undefined (SDK MemorySaver fallback) even when connected
- saver is memoized per resolved config
- deleteAgentCheckpoint prunes a thread's persisted checkpoint (the cross-turn
isolation guarantee: turn N+1 on the same conversationId can't rehydrate it)
- pruning is thread-scoped — deleting one conversation leaves others intact
- undefined threadId is a no-op
* fix(hitl): address Codex re-review (round 6)
Four findings verified against the code before fixing:
- messageFilterPii: scan the resume payload's user-authored text (ask-user
`answer`, and a tool-approval decision's `respond` text, `reject` reason, and
edited tool arguments) — the shared /resume route ran through the PII filter
but it only inspected req.body.text, so a blocked token rode the resume
payload back into the model/tool (mirrors the earlier moderateText fix)
- resume: re-prime skill files invoked in the pre-pause segment before rebuilding
the run, so an approved code/file-backed tool keeps the injected skill-file
session refs instead of running without them (mirrors the normal path's
primeInvokedSkills; the pre-pause content stands in for the message payload)
- hitl: pin the graph identity. Persist a fingerprint of the graph-determining
request fields (endpoint, agent_id, model, spec, ephemeralAgent — normalized)
on the pending action at pause, and reject a resume whose recomputed
fingerprint differs. This closes the ephemeral-agent gap, where agent_id is
undefined so the id guard can't tell two ephemeral configs apart
- resume: reject incomplete edit/respond decisions (findIncompleteDecisions) —
an `edit` without an object editedArguments or a `respond` without non-empty
responseText is 400'd before mapping, rather than defaulting to {} / '' and
resuming with behavior the user never approved
Tests: incomplete-decision + fingerprint match/mismatch cases in resume.spec.js
(41); findIncompleteDecisions + computeAgentRequestFingerprint unit tests; and
resume-field PII cases in messageFilterPii.spec.ts.
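Pinning the graph identity needs a fingerprint that ignores key order and `undefined` fields. Below is a dependency-free sketch: canonical JSON (sorted keys, `undefined` dropped) hashed with 32-bit FNV-1a over UTF-16 code units. The normalization and the hash choice are assumptions, not necessarily what `computeAgentRequestFingerprint` does. A production pin would more likely use a cryptographic hash such as SHA-256.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json | undefined };

/** Serialize with sorted object keys and undefined members dropped. */
function canonicalize(value: Json | undefined): string {
  if (value === undefined || value === null) return 'null';
  if (typeof value !== 'object') return JSON.stringify(value);
  if (Array.isArray(value)) return `[${value.map((v) => canonicalize(v)).join(',')}]`;
  const obj = value; // narrowed to the object branch of Json
  const keys = Object.keys(obj).filter((k) => obj[k] !== undefined).sort();
  return `{${keys.map((k) => `${JSON.stringify(k)}:${canonicalize(obj[k])}`).join(',')}}`;
}

/** 32-bit FNV-1a over UTF-16 code units, as an 8-char hex string. */
function fnv1a32(input: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return (hash >>> 0).toString(16).padStart(8, '0');
}

// The graph-determining fields named in the round-6 bullet.
type FingerprintFields = {
  endpoint?: string;
  agent_id?: string;
  model?: string;
  spec?: string;
  ephemeralAgent?: Json;
};

function computeFingerprint(fields: FingerprintFields): string {
  const pinned: { [key: string]: Json | undefined } = {
    endpoint: fields.endpoint,
    agent_id: fields.agent_id,
    model: fields.model,
    spec: fields.spec,
    ephemeralAgent: fields.ephemeralAgent,
  };
  return fnv1a32(canonicalize(pinned));
}
```

The pause stores `computeFingerprint(...)` on the pending action. The resume recomputes it from the incoming body and rejects the request on any difference.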
* fix(hitl): address Codex re-review (round 7)
Four findings verified against the code before fixing:
- RedisJobStore: clear `agent_id` on createJob (add it to staleHitlFields). The
job hash is keyed by conversationId and reused across turns; updateMetadata
only writes agent_id when truthy, so a conversation that switched from a saved
agent to an ephemeral/no-agent turn kept the old id and the resume guard
rejected the valid pause as a different agent. (real correctness bug)
- fingerprint: include `promptPrefix` in computeAgentRequestFingerprint, and
re-send it on resume (ResumeAgentFields + buildResumeFields). Ephemeral agents
derive their system instructions from promptPrefix, so a resume changing it
previously passed the pin and rebuilt different instructions. (completes the
round-6 fingerprint)
- resume: the re-pause branch now persists the segment's accumulated CONTENT
(filtered), not just artifacts, so an approval that expires/reaps without a
final resume no longer loses everything streamed during the resumed segment.
- request: carry `manualSkills`/`alwaysAppliedSkills` on the persisted user
message so a resumed turn's reconstructed requestMessage keeps its skill pills
instead of dropping them until a full reload.
Deferred (narrow, no safe contained fix yet — see PR thread replies):
- resume rebuild without `addedConvo` for a multi-conversation/added-agent pane
- cross-replica re-prime of manually-selected (not model-invoked) skill files
Tests: stale-agent createJob clearing (Redis integration), promptPrefix
fingerprint match/mismatch (resume.spec.js + policy.spec.ts), re-pause content
persistence (resume.spec.js).
* fix(hitl): address Codex re-review (round 8)
Five findings verified against the code before fixing; the headline is a durable-
resume correctness fix (the fingerprint had surfaced it as a 403):
- resume durability (the important one): persist the graph-determining request
fields (endpoint, agent_id, model, spec, promptPrefix, ephemeralAgent) on the
pending action as `resumeContext`, and REPLAY them onto the resume request via
a router-level middleware that runs before buildEndpointOption. The client
can't reconstruct the ephemeral-agent config after a reload/cross-session, so
the round-6/7 fingerprint would 403 a valid durable resume — and even without
it the rebuilt agent would lose its tools. Replaying server-side rebuilds the
SAME graph regardless of client state (and a crafted resume can't swap it; the
fingerprint still matches because the body is restored first).
- RedisJobStore: also clear `isTemporary` on createJob (same class as agent_id):
a prior temporary turn's flag would otherwise survive a reused conversation
hash and a later non-temporary resume would save its response as temporary.
- resume: persist `contextMeta` (context-window calibration) onto the saved
response like BaseClient does, so the next turn can seed its pruner.
- request: carry manualSkills/alwaysAppliedSkills into the onStart metadata
update (not just the preliminary one it overwrites), so a resumed turn's
requestMessage keeps its skill pills.
Deferred (narrow — see thread reply):
- saved-agent edited WHILE a run is paused: agent_id matches but the definition
changed; needs an agent version/config hash, which is a larger change for a
narrow window.
Tests: resumeContext pick/apply + round-trip (policy.spec.ts), contextMeta +
manualSkills-on-requestMessage (resume.spec.js), isTemporary clearing (Redis
integration).
* style(hitl): prettier line-wrap in policy.spec.ts (R8 lint fix)
* fix(hitl): address Codex re-review (round 9)
Five findings, all fixed (addedConvo — deferred in rounds 7/8 — is now trivial
thanks to the round-8 replay):
- replay addedConvo: add it to RESUME_CONTEXT_KEYS so the resume middleware
restores the parallel/secondary-agent config from the paused request; the
client can't reconstruct it, and it determines the rebuilt graph.
- skill pills (the real fix this time): the round-8 onStart metadata write was
overwritten by trackUserMessage (the authoritative userMessage writer). Carry
manualSkills/alwaysAppliedSkills in the emitted `created` message and persist
them in trackUserMessage; widen UserMessageMeta + SerializableJobData.userMessage.
- execute-code files on resume: seed the paused user message's own files onto
req.body.files before initializeClient — they're excluded from the
parent-walk code-session rebuild, so an approved code/read-file tool would
otherwise resume without them.
- in-memory pending-action UI: route ApprovalEvents.ON_PENDING_ACTION in the
resume replay/pending-event loops to applyPendingActionToMessages (mirror the
live handler), so a pause that lands in the snapshot window still renders its
approval controls instead of sitting paused with no UI.
- abort isTemporary: the /chat/abort partial-save now sources isTemporary from
the job metadata, not req.body (the stop button posts only conversationId), so
aborting a paused temporary chat no longer persists an orphaned partial.
Tests: addedConvo in pickResumeContext (policy.spec.ts), file-restore on resume
(resume.spec.js), abort-from-job-isTemporary (abort.spec.js).
* fix(hitl): address Codex re-review (round 10) — resume/expiry races
Three concurrency/coherence findings, verified against the code before fixing:
- expiry-sweep CAS scope: both stale-approval sweeps (GenerationJobManager
expireStaleApprovals and the RedisJobStore requires_action cleanup) called
expire()/transitionStatus WITHOUT the observed pendingAction.actionId, so the
CAS only checked status===requires_action. Between the read and the CAS a user
could resolve the observed action and the run re-pause on a FRESH action; the
stale sweep would then abort that valid new pause. Now both pass the observed
actionId as expectActionId, so the CAS only fires for the action read as stale
(a re-paused action has a different id → no-op).
- resume graph cache: resumeCompletion cached the rebuilt graph (created with
messages:[]) via setGraph; RedisJobStore.getContentParts prefers a cached
graph over reconstructing from the chunk log, so a same-replica reload/status
poll mid-resume returned aggregatedContent missing the pre-pause content. Skip
setGraph on resume so introspection falls back to the complete chunk
reconstruction (setContentParts still seeds the in-memory store).
- pending-action UI: applyPendingActionToMessages scheduled a SINGLE
animation-frame retry then dropped the pending action; Recoil/React updates can
take several frames under load, leaving a valid requires_action run with no
approval controls. Retry across frames (bounded at 120) until the target
message commits.
Test: expire() with a mismatched expectedActionId no-ops while the matching id
expires (pendingAction.spec.ts).
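The bounded multi-frame retry can be sketched with an injected scheduler so that it runs outside a browser. In the client, the scheduler would wrap `requestAnimationFrame`. `retryAcrossFrames` and its parameters are hypothetical.

```typescript
type FrameScheduler = (cb: () => void) => void;

/**
 * Run tryApply now and, while it returns false, once per scheduled frame
 * afterwards, for at most maxFrames retries. onGiveUp fires if the budget
 * is exhausted without tryApply succeeding.
 */
function retryAcrossFrames(
  tryApply: () => boolean,
  schedule: FrameScheduler,
  maxFrames = 120,
  onGiveUp?: () => void,
): void {
  let framesLeft = maxFrames;
  const attempt = (): void => {
    if (tryApply()) return;
    if (framesLeft <= 0) {
      onGiveUp?.();
      return;
    }
    framesLeft -= 1;
    schedule(attempt);
  };
  attempt();
}
```

In a browser this would be called as `retryAcrossFrames(tryApply, (cb) => requestAnimationFrame(() => cb()))`, where `tryApply` reports whether the target message has committed.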
* chore(deps): update @librechat/agents to version 3.2.53 and @langchain/langgraph to version 1.4.7 in package-lock.json and related package.json files
* refactor(hitl): add resolveToolApprovalPolicy seam for layered policy
Extract the single point where tool-approval policy is resolved for a turn
(`resolveToolApprovalPolicy`) and route the run call site through it instead
of reading `endpoints.agents.toolApproval` inline.
Behaviour-preserving: only the `endpoint` layer is wired today, so the result
is identical to reading the app policy directly. The `agent` and `skills`
layers are reserved seams with documented precedence (endpoint owns the
`enabled` kill switch; agent overrides mode/allow/deny/ask/reason; skills may
only tighten), so future per-agent and per-skill policy plumbing lands in one
function rather than at the `createRun` site. Adds focused unit tests.
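The documented precedence can be expressed as a small merge. This sketch assumes a hypothetical policy shape, including invented `mode` values. It reads "skills may only tighten" as adding `deny`/`ask` entries and stripping those tools from `allow`. Only the endpoint layer is wired today, so treat the agent and skills handling as illustrative.

```typescript
// Hypothetical policy shape; field names follow the bullet, mode values are invented.
type ApprovalMode = 'auto' | 'ask';

interface ToolApprovalPolicy {
  enabled?: boolean;
  mode?: ApprovalMode;
  allow?: string[];
  deny?: string[];
  ask?: string[];
  reason?: string;
}

// A skill layer can only add restrictions.
type SkillPolicy = Pick<ToolApprovalPolicy, 'deny' | 'ask'>;

function resolveToolApprovalPolicy(
  endpoint: ToolApprovalPolicy | undefined,
  agent?: ToolApprovalPolicy,
  skills: SkillPolicy[] = [],
): ToolApprovalPolicy | undefined {
  // Endpoint owns the kill switch; agent.enabled is deliberately ignored.
  if (!endpoint?.enabled) return undefined;

  // Agent layer overrides field by field; unset fields inherit from the endpoint.
  const deny = new Set(agent?.deny ?? endpoint.deny ?? []);
  const ask = new Set(agent?.ask ?? endpoint.ask ?? []);

  // Skills may only tighten: they add deny/ask entries, never allow entries.
  for (const skill of skills) {
    for (const tool of skill.deny ?? []) deny.add(tool);
    for (const tool of skill.ask ?? []) ask.add(tool);
  }

  // A tool that is now denied or gated behind ask can no longer be auto-allowed.
  const allow = (agent?.allow ?? endpoint.allow ?? []).filter((t) => !deny.has(t) && !ask.has(t));

  return {
    enabled: true,
    mode: agent?.mode ?? endpoint.mode,
    allow,
    deny: Array.from(deny),
    ask: Array.from(ask),
    reason: agent?.reason ?? endpoint.reason,
  };
}
```

Returning `undefined` when the endpoint layer is off keeps the run wiring fully inert, matching the `toolApproval.enabled` behaviour described earlier.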
* fix(hitl): address Codex re-review (round 11) — resume hardening
F1 (P2, security) — applyResumeContext now DELETES any RESUME_CONTEXT_KEY
absent from the persisted context, so the resume body carries exactly the
graph-determining fields the pause had. Previously only defined keys were
overwritten, leaving a client-supplied `addedConvo` (which the request
fingerprint does not cover) in place — a crafted resume could rebuild a
single-agent checkpoint as a different multi-agent graph/tool set.
F3 (P2) — the resume route ACKs (res.json) before initializeClient, so a
post-ACK getMCPRequestContext(req, res) saw the response as finished and
returned undefined, leaving the resumed run without its run-scoped MCP
connection store (approved MCP / OAuth-overlay tools then ran without their
request-scoped connections). Pre-seed the store with a null res +
cleanupOnResponse:false before the ACK and tear it down in the finally,
mirroring the normal stream path (request.js). userMCPAuthMap was already
preserved separately, so credentials were not lost — only the connection store.
Declined: the ApprovalContext NEW_CONVO guard (P2) is a false positive — the
`created` SSE event updates the conversation atom before any pause renders, so
the id is concrete by click time (details in the PR thread).
Tests: policy.spec (absent-key delete) + resume.spec (MCP context pre-seed/cleanup order).
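The pick/replay pair with the delete-absent rule can be sketched over a plain request body. The key list collects the fields named in rounds 8, 9, 12 and 14. The list and both function bodies are illustrative, not the repository's definitions.

```typescript
// Illustrative key list; the real RESUME_CONTEXT_KEYS may differ.
const RESUME_CONTEXT_KEYS = [
  'endpoint', 'agent_id', 'model', 'spec', 'promptPrefix', 'ephemeralAgent',
  'addedConvo', 'timezone', 'manualSkills',
] as const;

type ResumeContextKey = (typeof RESUME_CONTEXT_KEYS)[number];
type Body = Record<string, unknown>;
type ResumeContext = Partial<Record<ResumeContextKey, unknown>>;

/** At pause: copy only the defined graph-determining fields. */
function pickResumeContext(body: Body): ResumeContext {
  const ctx: ResumeContext = {};
  for (const key of RESUME_CONTEXT_KEYS) {
    if (body[key] !== undefined) ctx[key] = body[key];
  }
  return ctx;
}

/**
 * At resume: overwrite every key from the persisted context and DELETE any
 * key the pause did not have, so a client cannot smuggle in e.g. addedConvo.
 * Non-context fields (actionId, decisions, ...) are left untouched.
 */
function applyResumeContext(body: Body, ctx: ResumeContext): void {
  for (const key of RESUME_CONTEXT_KEYS) {
    if (ctx[key] !== undefined) body[key] = ctx[key];
    else delete body[key];
  }
}
```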
* fix(hitl): address Codex re-review (round 12) — resume fidelity + multi-tool UI
F4 (P2) — temporal prompt vars: resume rebuilt the agent without restoring
req.conversationCreatedAt or req.body.timezone, so {{current_datetime}}-style
vars compiled a different system prompt than the paused graph (resume wall-clock,
unzoned). Add 'timezone' to RESUME_CONTEXT_KEYS (persisted at pause, replayed by
the resume middleware) and restore conversationCreatedAt from the convo before
initializeClient — mirroring the normal path's resolveConversationCreatedAt.
F5 (P2) — multi-tool approval: applyPendingActionToMessages stopped retrying once
ANY tool-call part was tagged, so siblings that rendered on later frames never got
approval controls and the resume route 400'd the partial batch. Add
countTaggedApprovalParts and keep the bounded RAF retry going until every
action_request is tagged (ask_user_question unchanged — one synthetic part).
F6 (P3) — Edit accepted `null`/`[]` (valid JSON, non-object), enabling Submit for
a value the resume route rejects via findIncompleteDecisions. Mirror the server's
plain-object check in the client (store + editIsValid) so Submit only enables for
an accepted value.
Tests: policy.spec (timezone round-trip), resume.spec (conversationCreatedAt
restore), approval.spec (countTaggedApprovalParts).
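F6's rule hinges on what counts as a plain object once the edit text is parsed. Here is a sketch of a client-side validator. The `editIsValid` name comes from the bullet; the body is an assumption.

```typescript
/** True for `{...}` values only: not null, not arrays, not class instances. */
function isPlainObject(value: unknown): value is Record<string, unknown> {
  if (value === null || typeof value !== 'object' || Array.isArray(value)) return false;
  const proto = Object.getPrototypeOf(value);
  return proto === Object.prototype || proto === null;
}

/** Submit is enabled only when the edit parses to a plain JSON object. */
function editIsValid(raw: string): boolean {
  try {
    return isPlainObject(JSON.parse(raw));
  } catch {
    return false; // not valid JSON at all
  }
}
```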
* fix(hitl): address Codex re-review (round 13) — recurse into subagent approvals
F9 (P2) — a tool paused INSIDE a subagent has its tool_call_id in the parent
subagent tool_call's nested `subagent_content`, not as a top-level message part.
applyToolApproval and countTaggedApprovalParts only scanned top-level content, so
the approval never attached and the round-12 retry loop counted 0 tagged parts and
spun to its frame cap with no controls. Both now recurse into `subagent_content`
(immutably, so React refs update): the nested call gets tagged and is counted, so
the retry terminates. Added approval.spec cases for the nested tag + count.
Note: surfacing the interactive approve/reject controls inside the subagent view is
a deliberate follow-up — ToolApproval -> useResumeSubmit -> useChatContext crashes
when rendered in the portaled subagent dialog (outside the chat/approval providers),
so that needs the controls scoped to the in-provider inline render (or the dialog
wrapped with the providers). This commit fixes the data/traversal layer only.
F7 (discovered-tool history on resume) and F8 (redis chunk TTL pause race) were
verified false positives — see the PR threads.
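The traversal fix is a recursive walk over `subagent_content`. It copies only the path that changes, so React sees new references there and stable ones elsewhere. This sketch assumes a hypothetical `ContentPart` shape with an `approval` tag. `tagToolCall` is a stand-in for `applyToolApproval`.

```typescript
// Hypothetical content-part shape for illustration.
interface ContentPart {
  type: string;
  tool_call_id?: string;
  approval?: { actionId: string };
  subagent_content?: ContentPart[];
}

/** Count tagged tool-call parts, including ones nested inside subagent calls. */
function countTaggedApprovalParts(parts: ContentPart[]): number {
  let count = 0;
  for (const part of parts) {
    if (part.approval) count += 1;
    if (part.subagent_content) count += countTaggedApprovalParts(part.subagent_content);
  }
  return count;
}

/**
 * Immutably tag the part whose tool_call_id matches, recursing into
 * subagent_content. Unchanged subtrees (and the array itself when nothing
 * matched) keep their references.
 */
function tagToolCall(parts: ContentPart[], toolCallId: string, actionId: string): ContentPart[] {
  let changed = false;
  const next = parts.map((part) => {
    if (part.tool_call_id === toolCallId && !part.approval) {
      changed = true;
      return { ...part, approval: { actionId } };
    }
    if (part.subagent_content) {
      const nested = tagToolCall(part.subagent_content, toolCallId, actionId);
      if (nested !== part.subagent_content) {
        changed = true;
        return { ...part, subagent_content: nested };
      }
    }
    return part;
  });
  return changed ? next : parts;
}
```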
* fix(hitl): address Codex re-review (round 14) — resume fidelity + expiry relay
F13 (P2) — manualSkills are graph-determining (skill allowed-tools union into the
tool set before tools load) but weren't replayed, so a reload lost the skill tools
and a crafted resume could inject a different skill past the fingerprint. Add
'manualSkills' to RESUME_CONTEXT_KEYS (same replay-only pattern as timezone/
addedConvo; the delete-absent half blocks injection). Not alwaysAppliedSkills —
that's resolved server-side from the DB, not req.body.
F12 (P2) — the resume final SSE built requestMessage from job.metadata.userMessage
(persisted without files), so attachments vanished from the user bubble on resume.
Spread the already-restored req.body.files onto it, matching the normal path.
F11 (P2) — multi-replica approval expiry: RedisJobStore.cleanupRequiresActionIndex
on another replica can win the requires_action->aborted CAS (it sets the hash error
but has no event transport), and the local sweep then skips because the job is no
longer requires_action, so a client subscribed here never gets the terminal error
until the reap path. expireStaleApprovals now relays APPROVAL_EXPIRED_ERROR for a
locally-subscribed job already aborted FOR approval expiry (error-string gated,
idempotent via the errorEvent flag). emitError already publishes cross-replica.
Tests: policy.spec (manualSkills round-trip + inject-drop), resume.spec (final
requestMessage carries restored files).
* fix(hitl): render approval controls for subagent-nested tool pauses (F10)
Round-13 made applyToolApproval/countTaggedApprovalParts recurse into
subagent_content (data), but SubagentDialogPart rendered nested TOOL_CALL parts
with <ToolCall> only and never mounted <ToolApproval>, so a tool paused inside a
subagent showed no controls and the run was unresolvable.
Render <ToolApproval> in SubagentDialogPart's TOOL_CALL branch when the nested
tool_call carries an approval and isn't yet resolved, mirroring the top-level
Part.tsx render. The subagent dialog portals (OGDialog → ReactDOM.createPortal),
but React context flows through the React tree, not the DOM tree, so ToolApproval
resolves ApprovalProvider/ChatContext and the controls work + submit.
Also harden useResumeSubmit: read ChatContext via useContext (non-throwing)
instead of the throwing useChatContext wrapper, so the cards never crash when
rendered outside a ChatContext.Provider (e.g. a search/citation render that passes
chat context as a prop) — they degrade to inert (buildResumeFields returns null).
* style(hitl): re-sort run.ts imports after dev rebase
* fix(hitl): address Codex re-review (round 15) — resume content fidelity
F14 (P2) — hide_sequential_outputs was applied in chatCompletion before
saving/emitting content but not on resume, so a sequential-agent chain that
pauses for HITL and resumes persisted/emitted intermediate outputs the setting
is meant to hide. Extracted the filter into applyHideSequentialOutputsFilter()
and call it from both chatCompletion and resumeCompletion (after handleRunInterrupt,
covering the finalize + re-pause reads of client.contentParts).
F16 (P2) — on a reloaded HITL pause, the DB already holds the paused user row +
partial assistant row; useResumeOnLoad fed those as submission.messages, then
finalHandler/createdHandler appended the same pair via requestMessage/responseMessage,
duplicating the turn (buildTree doesn't dedupe children by messageId).
buildSubmissionFromResumeState now strips the paused user/response rows (by messageId, incl. the
padded/unpadded response id) from submission.messages — they're re-supplied by the
placeholders + final event. Frontend-only; live (non-reload) pause path untouched.
Deferred: F15 (collapsed-card subagent approval registration/visibility) — see thread.
Tests: client.test (filter keeps last + tool_call parts / no-op when off),
useResumeOnLoad.spec (paused pair stripped from submission.messages).
* fix(hitl): address Codex re-review (round 16) — chunk TTL, slot, job replacement
F17 (P2) — chunk-stream TTL on pause-before-chunk. CHUNK_APPEND_LUA derived its
ceiling only from the chunk key's current TTL, so when the chunks key didn't exist
at pause (fire-and-forget append in flight, or an ask-user pause before any chunk),
the on_pending_action append created the stream with only the 20m running TTL while
the approval window is 24h — content evicted before resume. The Lua now also reads
the job key (KEYS[2]); when status == requires_action it takes max(running, TTL(jobKey))
(the approval window transitionStatus set), else the running TTL. Extend-only preserved;
gated on paused status so normal runs never inflate. Both keys share {streamId} (cluster-safe).
F19 (P2) — with LIMIT_CONCURRENT_MESSAGES, the approval prompt was emitted before the
original request released its slot, so a fast Approve got /resume 429'd. handleRunInterrupt
now releases the slot (idempotent via pendingRequestReleased) right after the pause, before
the prompt; the request.js pause branch and resume.js finally only release if it didn't
(no double-release).
F20 (P2) — finalizeResumedTurn never checked the job wasn't replaced before emitDone/
completeJob/saveMessage, so a stale resume could clobber a newer turn that reused the
conversationId. Added the createdAt guard the normal request path uses (skip finalization
when the live job's createdAt != the paused job's).
Deferred: F18 (subagent_content not reconstructed on Redis resume) — joins the subagent
cluster (F15). See thread.
Tests: RedisJobStore integration (pause-before-chunk gets approval TTL; running stays short),
resume.spec (skip finalization on replacement; no double slot release on re-pause).
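The combined TTL rule can be stated as a pure function: extend-only from round 5, plus the F17 approval-window floor. In Redis it runs inside one script over the chunks key and the job key. This sketch assumes the chunk TTL is read before the append and that all values are in seconds. Redis `TTL` returns -2 for a missing key and -1 for a key without an expiry.

```typescript
/**
 * Chunk-stream TTL (seconds) to apply after an append, or null to leave it.
 * currentChunkTtl: Redis TTL of the chunks key, read before the append.
 *   Negative values (-2 missing, -1 no expiry) are treated as "no TTL yet",
 *   assuming every chunk stream is meant to carry one.
 * jobTtl: Redis TTL of the job key, which carries the approval window while paused.
 */
function nextChunkTtl(currentChunkTtl: number, runningTtl: number, jobStatus: string, jobTtl: number): number | null {
  // Paused jobs get at least the approval window; normal runs never inflate.
  const target = jobStatus === 'requires_action' ? Math.max(runningTtl, jobTtl) : runningTtl;
  // Extend-only: raise a shorter (or absent) TTL, never shorten a longer one.
  return currentChunkTtl < target ? target : null;
}
```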
* 🛡️ fix: Guard HITL terminal side-effects against job replacement
Jobs are keyed by streamId == conversationId, so a new request REPLACES the
running one on the same conversation. The replaced generation's tail must not
clobber the live generation's state. Each path now re-reads the live job and
compares createdAt against the generation's captured identity before acting.
- Thread the generation's createdAt onto the client (request.js + resume.js)
as client.jobCreatedAt — the identity every guard compares against.
- handleRunInterrupt: skip approvals.pause when this run is no longer the live
job, so a stale interrupt can't flip the NEWER job to requires_action.
- chatCompletion finally: skip the checkpoint prune when replaced, so an older
run's late finally can't delete the newer run's resume checkpoint.
- resume catch-path: gate emitError/completeJob/prune behind a stillLive check
(fail-open if the read throws), mirroring finalizeResumedTurn's success guard.
- Persist the turn's uploaded files on job.metadata.userMessage (authoritative
trackUserMessage writer) and prefer them on resume over the user DB row, whose
save can still be racing a fast /resume.
Tests: 13 guard-predicate cases in jobReplacement.spec.js.
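Each guard reduces to one predicate: re-read the live job and proceed only if its `createdAt` equals the generation's captured `jobCreatedAt`. The sketch below is synchronous for brevity, although the real reads are awaited. It uses a hypothetical `read` callback, and `failOpen` models the catch path's fail-open behaviour.

```typescript
// Hypothetical minimal view of the live job record.
interface LiveJob {
  createdAt: number;
}

/**
 * True when the job read now is the same generation that captured jobCreatedAt.
 * A missing job or a different createdAt means the conversation's job was
 * replaced (jobs are keyed by conversationId). If the read throws, return failOpen.
 */
function isStillLive(
  read: () => LiveJob | null | undefined,
  jobCreatedAt: number,
  failOpen: boolean,
): boolean {
  let live: LiveJob | null | undefined;
  try {
    live = read();
  } catch {
    return failOpen;
  }
  return live != null && live.createdAt === jobCreatedAt;
}
```

Calling the predicate again right before the terminal writes, rather than only once at the start of finalization, narrows the window in which a replacement can slip in.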
* 🔁 fix: Harden HITL resume — ownership re-check, file seeding, deferred-tool replay
Three follow-ups to the round-17 job-replacement guards (Codex review 4594099963):
- G1 (resume.js): the success-path ownership guard runs at the START of
finalizeResumedTurn, but saveMessage + first-turn title generation await long
enough for a new request to replace the job on the same conversationId. Re-read
the live job immediately before emitDone/completeJob/prune so the terminal writes
can't tear down the REPLACEMENT job — mirrors the catch-path guard.
- G2 (request.js): onStart's metadata/chunk writes that persist the turn's files
are fire-and-forget, so a fast approval could read job.metadata.userMessage before
files landed. Seed files into getPreliminaryUserMessage instead — that write is
AWAITED before the run starts, so files are durable before any interrupt can emit.
- G3 (run.ts + client.js + resume.js + IJobStore.ts): the resumed graph is rebuilt
with messages: [], so createRun's tool_search-discovery scan finds nothing. A
deferred tool discovered earlier in the turn (and targeted by the paused call) was
therefore absent from the rebuilt schema-only toolMap — resume would throw "unknown
tool" (no loadRuntimeTools fallback is wired). Capture discovered tool names at
pause via extractDiscoveredToolsFromHistory(run.getRunMessages()), persist them on
job.metadata.discoveredTools, and replay them into createRun's new discoveredToolNames
input (merged with message-extracted names, gated on hasAnyDeferredTools — inert
otherwise). A new createRun test proves the deferred tool is promoted with the replay
and absent without it (reproducing the bug).
Tests: real createRun deferred-replay suite (run-summarization.test.ts) + G1/G2/G3
guard predicates (jobReplacement.spec.js). Full suite green.
* 🔒 fix: Close HITL resume metadata + file-substitution + pause-race gaps
Four findings on the round-18 commit (Codex review 4594430222):
- H1 (P1, regression in round-18 G3): the discoveredTools captured at pause never
reached resume — three metadata allowlists dropped it: GenerationJobManager.updateMetadata,
RedisJobStore.deserializeJob, and buildJobFacade (plus the
GenerationJobMetadata type). Added discoveredTools to all four, so the deferred-tool
replay actually works end-to-end (in-memory store already kept it via Object.assign).
- H2 (P2, security): /resume honored a client-supplied `files` array, letting a crafted
client resume an approved code/read-file tool against a DIFFERENT file set than the one
approved (files aren't in the resume fingerprint/context). Resume now ALWAYS sources
files from the paused job (metadata → DB row), clearing any client-supplied set.
- H3 (P2, ephemeral fidelity): non-default model parameters (temperature, max tokens,
custom endpoint params) were lost on resume — ephemeral agents derive them from the
request body, which the resume payload omits. Capture the resolved model_parameters in
resumeContext at pause and replay them onto the body on resume (excluding `model`, which
is replayed via the fingerprinted RESUME_CONTEXT_KEYS path). Saved agents already source
these from the DB.
- H4 (P2, Redis race): a pause landing between the resume snapshot and the Pub/Sub
subscription reached neither resumeState.pendingAction nor (Redis) pendingEvents, and
approval events aren't persisted to replayEvents — the client attached to a paused job
with no approval UI. subscribeWithResume now re-reads the live job AFTER subscribing and
surfaces the pending action if the snapshot missed it (live read, no staleness).
Tests: discoveredTools metadata round-trip + subscribeWithResume re-read
(pendingAction.spec.ts); client-file substitution rejection (resume.spec.js); model-parameter replay
predicate (jobReplacement.spec.js).
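The subscribe-then-re-read ordering from H4 can be modeled with a fake store and a fake Pub/Sub bus. This is an illustrative sketch under stated assumptions. `FakeStore`, `FakeBus`, `pause`, and the `afterSnapshot` test hook are all hypothetical. The dedupe-by-`actionId` step is also an assumption: it is included because the bus and the re-read can both report the same pause.

```typescript
// Hypothetical shapes; the real pendingAction carries more fields.
interface PendingAction {
  actionId: string;
  toolName: string;
}

interface JobState {
  status: 'running' | 'requires_action';
  pendingAction?: PendingAction;
}

type Listener = (action: PendingAction) => void;

class FakeStore {
  private state: JobState = { status: 'running' };
  read(): JobState {
    return { ...this.state };
  }
  write(next: JobState): void {
    this.state = { ...next };
  }
}

// Like Pub/Sub: a message published before subscribe() is never seen by that subscriber.
class FakeBus {
  private listeners: Listener[] = [];
  subscribe(fn: Listener): void {
    this.listeners.push(fn);
  }
  publish(action: PendingAction): void {
    for (const fn of this.listeners) {
      fn(action);
    }
  }
}

function pause(store: FakeStore, bus: FakeBus, action: PendingAction): void {
  store.write({ status: 'requires_action', pendingAction: action });
  bus.publish(action);
}

function subscribeWithResume(
  store: FakeStore,
  bus: FakeBus,
  onPending: Listener,
  // Test hook standing in for "a pause lands between the snapshot and the subscription".
  afterSnapshot?: () => void,
): void {
  // Deliver each actionId at most once, whichever path reports it first.
  const seen = new Set<string>();
  const deliver: Listener = (action) => {
    if (!seen.has(action.actionId)) {
      seen.add(action.actionId);
      onPending(action);
    }
  };

  const snapshot = store.read();
  if (snapshot.pendingAction) {
    deliver(snapshot.pendingAction);
  }
  afterSnapshot?.();
  bus.subscribe(deliver);

  // Re-read AFTER subscribing: a pause the snapshot missed is now either in the
  // live state or will arrive on the bus.
  const live = store.read();
  if (live.pendingAction) {
    deliver(live.pendingAction);
  }
}
```

If the live read ran before `subscribe()` instead, a pause landing between the two calls would reach neither path. That is the gap the entry describes.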
* 🧹 fix: Clear stale discovered tools, release slot on claim error, extend run-step TTL
Three follow-ups on the round-19 commit (Codex review 4594783691):
- I1 (P2): the round-19 discoveredTools field wasn't cleared on Redis streamId reuse.
HSET only overwrites listed fields and handleRunInterrupt only writes discoveredTools
when THIS turn discovers a deferred tool — so a replacement turn that pauses without its
own discovery inherited the prior run's tool names and force-loaded undiscovered deferred
tools on resume. Added discoveredTools to createJob's staleHitlFields HDEL list (the
in-memory store already builds a fresh object, so it was Redis-only).
- I2 (P2): with LIMIT_CONCURRENT_MESSAGES, approvals.resolve runs after the slot increment
but before the run's try/finally, so a store/Redis error there leaked the slot until the
counter TTL expired (spurious 429s on retry of the still-paused approval). Wrapped the
claim in try/catch that decrements the slot and returns 500.
- I3 (P3): saveRunSteps did SET ... EX running unconditionally, resetting the run-steps key
to the 20-min running TTL even while the job is paused for the longer approval window —
a reload after that window lost the tool timeline. Now uses a paused-window TTL script
mirroring the chunk-stream no-shrink behavior (extends to the approval window when the
job hash is requires_action).
Also fixes a latent strict-tsc cast error in the round-19 pendingAction test.
Tests: claim-throws-releases-slot (resume.spec.js); discoveredTools cleared on reuse +
saveRunSteps preserves the paused TTL (RedisJobStore integration, USE_REDIS).
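The I3 no-shrink rule reduces to a small pure function. In this sketch, `nextRunStepsTtl` is a hypothetical name. The input is assumed to follow Redis `TTL` semantics: -2 for a missing key and -1 for a key with no expiry. The 20-minute running TTL comes from the entry above. The one-hour approval window used below is an arbitrary placeholder, not LibreChat's configured value.

```typescript
type JobStatus = 'running' | 'requires_action' | 'complete';

/**
 * Hypothetical model of a "no-shrink" TTL refresh for the run-steps key.
 * remainingTtlSec follows Redis TTL semantics (-2: no key, -1: no expiry).
 */
function nextRunStepsTtl(
  remainingTtlSec: number,
  jobStatus: JobStatus,
  runningTtlSec: number,
  approvalWindowSec: number,
): number {
  // A paused job needs the key to outlive the whole approval window.
  const target = jobStatus === 'requires_action' ? approvalWindowSec : runningTtlSec;
  // Nothing to preserve for a missing key or a key without expiry; apply the target.
  if (remainingTtlSec < 0) {
    return target;
  }
  // Never shrink: keep whichever TTL lasts longer.
  return Math.max(remainingTtlSec, target);
}
```

Take `runningTtlSec = 1200` and the placeholder `approvalWindowSec = 3600`. A paused job with 3000 s left is extended to 3600 s. A running job with 3000 s left keeps 3000 s instead of being cut back to 1200 s. Against Redis, the status read and the `EXPIRE` would need to run in a single script to stay atomic.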
* 🛡️ fix: Guard fast-resume save race, gate HITL to resumable routes, expire on stale submit
Three findings on the round-20 commit (Codex review 4595045652):
- J2 (P1): a fast /resume can claim + finalize the COMPLETED response while the original
request's pause branch is still awaiting `response.databasePromise`; the later
unfinished-save then overwrites the completed content. Re-check the job is still paused on
THIS generation's action (a claim leaves requires_action; a replacement bumps createdAt)
before marking the row unfinished; fail open on a read error.
- J3 (P1): the tool-approval wiring (humanInTheLoop + PreToolUse hook + checkpointer) was
applied to EVERY createRun caller when toolApproval.enabled, but the OpenAI-compatible and
Responses controllers never inspect run.getInterrupt() or persist a pending action — an
approval-gated tool would pause there with no approval surface or resume endpoint and the
route would emit a normal final response / [DONE] with the tool call dangling. Gate the
wiring on a new createRun `hitlCapable` flag, set only by AgentClient (chat + resume).
- J4 (P2): a stale-action 409 on submit returned without driving expiry, leaving the job
requires_action with a dead action until the periodic sweeper ran — any attached SSE client
got no terminal event and the stream appeared to hang. Extracted
GenerationJobManager.expireApproval(streamId, actionId) (expire CAS + terminal SSE, shared with the sweeper) and
call it from the resume route when the observed action is stale.
J1 (nested subagent approval controls not mounting while the details dialog is closed) is a
valid frontend issue in the deferred subagent-HITL path — tracked separately (replied on the
thread) since the fix touches the shared dialog primitive and needs UI verification.
Tests: HITL-gate both directions (run-summarization.test.ts); expire-on-stale-submit
(resume.spec.js); fast-resume unfinished-save guard predicate (jobReplacement.spec.js).
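The shared expiry path in J4 relies on a compare-and-set keyed on the action the caller observed. Because of this, the resume route and the periodic sweeper can both call it and only one terminal event goes out. The following is a minimal in-memory sketch. `Job`, `isStale`, `handleSubmit`, and this `expireApproval` signature are hypothetical; only the error string is taken from the source's `APPROVAL_EXPIRED_ERROR`.

```typescript
// Hypothetical shapes for illustration only.
interface PendingAction {
  actionId: string;
  expiresAt: number; // epoch ms
}

interface Job {
  status: 'running' | 'requires_action' | 'error';
  pendingAction?: PendingAction;
  error?: string;
}

const APPROVAL_EXPIRED_ERROR = 'Approval expired before a decision was made';

function isStale(action: PendingAction, now: number): boolean {
  return now >= action.expiresAt;
}

/**
 * Expire only if the job is still paused on exactly `actionId` (compare-and-set),
 * then emit one terminal event. Returns true only for the caller that won.
 */
function expireApproval(
  jobs: Map<string, Job>,
  streamId: string,
  actionId: string,
  emitTerminal: (streamId: string, error: string) => void,
): boolean {
  const job = jobs.get(streamId);
  if (!job || job.status !== 'requires_action' || job.pendingAction?.actionId !== actionId) {
    return false;
  }
  jobs.set(streamId, { status: 'error', error: APPROVAL_EXPIRED_ERROR });
  emitTerminal(streamId, APPROVAL_EXPIRED_ERROR);
  return true;
}

/** Resume-route sketch: a stale submit drives expiry instead of only returning 409. */
function handleSubmit(
  jobs: Map<string, Job>,
  streamId: string,
  actionId: string,
  now: number,
  emitTerminal: (streamId: string, error: string) => void,
): 200 | 409 {
  const action = jobs.get(streamId)?.pendingAction;
  if (!action || action.actionId !== actionId) {
    return 409;
  }
  if (isStale(action, now)) {
    expireApproval(jobs, streamId, action.actionId, emitTerminal);
    return 409;
  }
  return 200; // a real route would claim the action and resume the run here
}
```

With a single-threaded `Map`, the check and the write cannot interleave. A Redis-backed store would need the same check performed atomically on the server side.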
* 💄 style: Wrap captureAgents signature to satisfy prettier (CI lint)
1911 lines
68 KiB
TypeScript
import { logger, getTenantId, SYSTEM_TENANT_ID } from '@librechat/data-schemas';
import {
  Constants,
  UsageEvents,
  ApprovalEvents,
  parseTextParts,
  reconcileContextUsage,
  promptTokensFromUsage,
} from 'librechat-data-provider';
import type {
  TMessageContentParts,
  TContextUsageEvent,
  TTokenUsageEvent,
  Agents,
} from 'librechat-data-provider';
import type { StandardGraph } from '@librechat/agents';
import type {
  SerializableJobData,
  IEventTransport,
  UsageMetadata,
  AbortResult,
  IJobStore,
} from './interfaces/IJobStore';
import type { GenerationJobStore } from '~/app/metrics';
import type * as t from '~/types';
import {
  recordGenerationStreamResumePendingEvents,
  recordGenerationStreamSubscription,
  setGenerationJobsInFlight,
  recordGenerationJob,
} from '~/app/metrics';
import { isPendingActionStale, isPendingActionExpired } from './interfaces/IJobStore';
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
import { InMemoryJobStore } from './implementations/InMemoryJobStore';
import { filterPersistableAbortContent } from './abortContent';
import { ApprovalLifecycle } from './ApprovalLifecycle';

/** Terminal error surfaced to a client still attached when its approval window lapses. */
const APPROVAL_EXPIRED_ERROR = 'Approval expired before a decision was made';

/** Error surfaced to any client still attached when a stale/hung job is reaped. */
const REAPED_JOB_ERROR = 'Generation timed out';
const OAUTH_TOOL_CALL_PREFIX = `oauth${Constants.mcp_delimiter}`;

function getToolCallName(toolCall: unknown): unknown {
  return toolCall != null && typeof toolCall === 'object' && 'name' in toolCall
    ? toolCall.name
    : undefined;
}

function hasOAuthToolCall(toolCalls: unknown): boolean {
  return (
    Array.isArray(toolCalls) &&
    toolCalls.some((toolCall) => {
      const name = getToolCallName(toolCall);
      return typeof name === 'string' && name.startsWith(OAUTH_TOOL_CALL_PREFIX);
    })
  );
}

function getReplayStepId(event: t.ServerSentEvent): unknown {
  if (!('event' in event) || !event.data || typeof event.data !== 'object') {
    return undefined;
  }

  if (event.event === 'on_run_step' || event.event === 'on_run_step_delta') {
    return 'id' in event.data ? event.data.id : undefined;
  }

  if (event.event === 'on_run_step_completed') {
    const result = 'result' in event.data ? event.data.result : undefined;
    return result != null && typeof result === 'object' && 'id' in result ? result.id : undefined;
  }

  return undefined;
}

function isOAuthReplayEvent(event: t.ServerSentEvent): boolean {
  if (!('event' in event) || !event.data || typeof event.data !== 'object') {
    return false;
  }

  if (event.event === 'on_run_step') {
    const stepDetails = 'stepDetails' in event.data ? event.data.stepDetails : undefined;
    return (
      stepDetails != null &&
      typeof stepDetails === 'object' &&
      'tool_calls' in stepDetails &&
      hasOAuthToolCall(stepDetails.tool_calls)
    );
  }

  if (event.event === 'on_run_step_delta') {
    const delta = 'delta' in event.data ? event.data.delta : undefined;
    if (delta == null || typeof delta !== 'object') {
      return false;
    }
    if (!('tool_calls' in delta) || !hasOAuthToolCall(delta.tool_calls)) {
      return false;
    }

    return true;
  }

  if (event.event === 'on_run_step_completed') {
    const result = 'result' in event.data ? event.data.result : undefined;
    if (result == null || typeof result !== 'object' || !('tool_call' in result)) {
      return false;
    }
    const name = getToolCallName(result.tool_call);
    return typeof name === 'string' && name.startsWith(OAUTH_TOOL_CALL_PREFIX);
  }

  return false;
}
|
/**
 * Configuration options for GenerationJobManager
 */
export interface GenerationJobManagerOptions {
  jobStore?: IJobStore;
  eventTransport?: IEventTransport;
  /**
   * If true, cleans up event transport immediately when job completes.
   * If false, keeps EventEmitters until periodic cleanup for late reconnections.
   * Default: true (immediate cleanup to save memory)
   */
  cleanupOnComplete?: boolean;
}

/**
 * Runtime state for active jobs - not serializable, kept in-memory per instance.
 * Contains AbortController, ready promise, and other non-serializable state.
 *
 * @property abortController - Controller to abort the generation
 * @property readyPromise - Resolves immediately (legacy, kept for API compatibility)
 * @property resolveReady - Function to resolve readyPromise
 * @property finalEvent - Cached final event for late subscribers
 * @property errorEvent - Cached error event for late subscribers (errors before client connects)
 * @property syncSent - Whether sync event was sent (reset when all subscribers leave)
 * @property earlyEventBuffer - Buffer for events emitted before first subscriber connects
 * @property hasSubscriber - Whether at least one subscriber has connected
 * @property allSubscribersLeftHandlers - Internal handlers for disconnect events.
 *   These are stored separately from eventTransport subscribers to avoid being counted
 *   in subscriber count. This is critical: if these were registered via subscribe(),
 *   they would count as subscribers, causing isFirstSubscriber() to return false
 *   when the real client connects, which would prevent readyPromise from resolving.
 */
interface RuntimeJobState {
  abortController: AbortController;
  readyPromise: Promise<void>;
  resolveReady: () => void;
  finalEvent?: t.ServerSentEvent;
  errorEvent?: string;
  syncSent: boolean;
  earlyEventBuffer: t.ServerSentEvent[];
  hasSubscriber: boolean;
  allSubscribersLeftHandlers?: Array<(...args: unknown[]) => void>;
}
|
/**
 * Manages generation jobs for resumable LLM streams.
 *
 * Architecture: Composes two pluggable services via dependency injection:
 * - jobStore: Job metadata + content state (InMemory → Redis for horizontal scaling)
 * - eventTransport: Pub/sub events (InMemory → Redis Pub/Sub for horizontal scaling)
 *
 * Content state is tied to jobs:
 * - In-memory: jobStore holds WeakRef to graph for live content/run steps access
 * - Redis: jobStore persists chunks, reconstructs content on demand
 *
 * All storage methods are async to support both in-memory and external stores (Redis, etc.).
 *
 * @example Redis injection:
 * ```ts
 * const manager = new GenerationJobManagerClass({
 *   jobStore: new RedisJobStore(redisClient),
 *   eventTransport: new RedisPubSubTransport(redisClient),
 * });
 * ```
 */
class GenerationJobManagerClass {
  /** Job metadata + content state storage - swappable for Redis, etc. */
  private jobStore: IJobStore;
  /** Guarded human-review lifecycle (pause / resolve / expire) over the store. */
  private _approvals: ApprovalLifecycle;
  /** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */
  private eventTransport: IEventTransport;

  /** Runtime state - always in-memory, not serializable */
  private runtimeState = new Map<string, RuntimeJobState>();

  /** Jobs actively generating in this process. */
  private runningJobs = new Set<string>();

  /** Serializes replay-event read/modify/write updates per stream. */
  private replayEventWriteQueues = new Map<string, Promise<void>>();

  /** Serializes token-usage read/modify/write updates per stream. */
  private tokenUsageWriteQueues = new Map<string, Promise<void>>();

  private cleanupInterval: NodeJS.Timeout | null = null;

  /** Whether we're using Redis stores */
  private _isRedis = false;

  /** Whether to cleanup event transport immediately on job completion */
  private _cleanupOnComplete = true;

  constructor(options?: GenerationJobManagerOptions) {
    this.jobStore =
      options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 0, maxJobs: 1000 });
    this._approvals = new ApprovalLifecycle(this.jobStore);
    this.eventTransport = options?.eventTransport ?? new InMemoryEventTransport();
    this._cleanupOnComplete = options?.cleanupOnComplete ?? true;
  }

  /**
   * Initialize the job manager with periodic cleanup.
   * Call this once at application startup.
   */
  initialize(): void {
    if (this.cleanupInterval) {
      return;
    }

    this.jobStore.initialize();

    this.cleanupInterval = setInterval(() => {
      this.cleanup();
    }, 60000);

    if (this.cleanupInterval.unref) {
      this.cleanupInterval.unref();
    }

    logger.debug('[GenerationJobManager] Initialized');
  }

  /**
   * Configure the manager with custom stores.
   * Call this BEFORE initialize() to use Redis or other stores.
   *
   * @example Using Redis
   * ```ts
   * import { createStreamServicesFromCache } from '~/stream/createStreamServices';
   * import { cacheConfig, ioredisClient } from '~/cache';
   *
   * const services = createStreamServicesFromCache({ cacheConfig, ioredisClient });
   * GenerationJobManager.configure(services);
   * GenerationJobManager.initialize();
   * ```
   */
  configure(services: {
    jobStore: IJobStore;
    eventTransport: IEventTransport;
    isRedis?: boolean;
    cleanupOnComplete?: boolean;
  }): void {
    const previousStore = this.storeLabel;
    if (this.cleanupInterval) {
      logger.warn(
        '[GenerationJobManager] Reconfiguring after initialization - destroying existing services',
      );
      this.destroy();
    }

    this.runningJobs.clear();
    setGenerationJobsInFlight(previousStore, 0);

    this.jobStore = services.jobStore;
    this._approvals = new ApprovalLifecycle(this.jobStore);
    this.eventTransport = services.eventTransport;
    this._isRedis = services.isRedis ?? false;
    this._cleanupOnComplete = services.cleanupOnComplete ?? true;
    this.syncRunningJobMetrics();

    logger.info(
      `[GenerationJobManager] Configured with ${this._isRedis ? 'Redis' : 'in-memory'} stores`,
    );
  }

  /**
   * Check if using Redis stores.
   */
  get isRedis(): boolean {
    return this._isRedis;
  }

  private get storeLabel(): GenerationJobStore {
    return this._isRedis ? 'redis' : 'memory';
  }

  private syncRunningJobMetrics(store: GenerationJobStore = this.storeLabel): void {
    setGenerationJobsInFlight(store, this.runningJobs.size);
  }

  /**
   * Get the job store instance (for advanced use cases).
   */
  getJobStore(): IJobStore {
    return this.jobStore;
  }
|
  /**
   * Create a new generation job.
   *
   * This sets up:
   * 1. Serializable job data in the job store
   * 2. Runtime state including readyPromise (resolves when first SSE client connects)
   * 3. allSubscribersLeft callback for handling client disconnections
   *
   * The readyPromise mechanism ensures generation doesn't start before the client
   * is ready to receive events. The controller awaits this promise (with a short timeout)
   * before starting LLM generation.
   *
   * @param streamId - Unique identifier for this stream
   * @param userId - User who initiated the request
   * @param conversationId - Optional conversation ID for lookup
   * @returns A facade object for the GenerationJob
   */
  async createJob(
    streamId: string,
    userId: string,
    conversationId?: string,
  ): Promise<t.GenerationJob> {
    const tenantId = getTenantId();
    const safeTenantId = tenantId && tenantId !== SYSTEM_TENANT_ID ? tenantId : undefined;
    const jobData = await this.jobStore.createJob(streamId, userId, conversationId, safeTenantId);

    /**
     * Create runtime state with readyPromise.
     *
     * With the resumable stream architecture, we no longer need to wait for the
     * first subscriber before starting generation:
     * - Redis mode: Events are persisted and can be replayed via sync
     * - In-memory mode: Content is aggregated and sent via sync on connect
     *
     * We resolve readyPromise immediately to eliminate startup latency.
     * The sync mechanism handles late-connecting clients.
     */
    let resolveReady: () => void;
    const readyPromise = new Promise<void>((resolve) => {
      resolveReady = resolve;
    });

    const runtime: RuntimeJobState = {
      abortController: new AbortController(),
      readyPromise,
      resolveReady: resolveReady!,
      syncSent: false,
      earlyEventBuffer: [],
      hasSubscriber: false,
    };
    this.runtimeState.set(streamId, runtime);
    this.runningJobs.add(streamId);
    this.syncRunningJobMetrics();
    recordGenerationJob(this.storeLabel, 'created');

    // Resolve immediately - early event buffer handles late subscribers
    resolveReady!();

    /**
     * Set up all-subscribers-left callback.
     * When all SSE clients disconnect, this:
     * 1. Resets syncSent so reconnecting clients get sync event (persisted to Redis)
     * 2. Calls any registered allSubscribersLeft handlers (e.g., to save partial responses)
     */
    this.eventTransport.onAllSubscribersLeft(streamId, () => {
      const currentRuntime = this.runtimeState.get(streamId);
      if (currentRuntime) {
        currentRuntime.syncSent = false;
        currentRuntime.hasSubscriber = false;
        // Persist syncSent=false to Redis for cross-replica consistency
        this.jobStore.updateJob(streamId, { syncSent: false }).catch((err) => {
          logger.error(`[GenerationJobManager] Failed to persist syncSent=false:`, err);
        });
        // Call registered handlers (from job.emitter.on('allSubscribersLeft', ...))
        if (currentRuntime.allSubscribersLeftHandlers) {
          this.jobStore
            .getContentParts(streamId)
            .then((result) => {
              const parts = result?.content ?? [];
              for (const handler of currentRuntime.allSubscribersLeftHandlers ?? []) {
                try {
                  handler(parts);
                } catch (err) {
                  logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
                }
              }
            })
            .catch((err) => {
              logger.error(
                `[GenerationJobManager] Failed to get content parts for allSubscribersLeft handlers:`,
                err,
              );
            });
        }
      }
    });

    /**
     * Set up cross-replica abort listener (Redis mode only).
     * When abort is triggered on ANY replica, this replica receives the signal
     * and aborts its local AbortController (if it's the one running generation).
     */
    if (this.eventTransport.onAbort) {
      this.eventTransport.onAbort(streamId, () => {
        const currentRuntime = this.runtimeState.get(streamId);
        if (currentRuntime && !currentRuntime.abortController.signal.aborted) {
          logger.debug(`[GenerationJobManager] Received cross-replica abort for ${streamId}`);
          currentRuntime.abortController.abort();
        }
      });
    }

    logger.debug(`[GenerationJobManager] Created job: ${streamId}`);

    // Return facade for backwards compatibility
    return this.buildJobFacade(streamId, jobData, runtime);
  }
|
  /**
   * Build a GenerationJob facade from composed services.
   *
   * This facade provides a unified API (job.emitter, job.abortController, etc.)
   * while internally delegating to the injected services (jobStore, eventTransport,
   * contentState). This allows swapping implementations (e.g., Redis) without
   * changing consumer code.
   *
   * IMPORTANT: The emitterProxy.on('allSubscribersLeft') handler registration
   * does NOT use eventTransport.subscribe(). This is intentional:
   *
   * If we used subscribe() for internal handlers, those handlers would count
   * as subscribers. When the real SSE client connects, isFirstSubscriber()
   * would return false (because internal handler was "first"), and readyPromise
   * would never resolve - causing a 5-second timeout delay before generation starts.
   *
   * Instead, allSubscribersLeft handlers are stored in runtime.allSubscribersLeftHandlers
   * and called directly from the onAllSubscribersLeft callback in createJob().
   *
   * @param streamId - The stream identifier
   * @param jobData - Serializable job metadata from job store
   * @param runtime - Non-serializable runtime state (abort controller, promises, etc.)
   * @returns A GenerationJob facade object
   */
  private buildJobFacade(
    streamId: string,
    jobData: SerializableJobData,
    runtime: RuntimeJobState,
  ): t.GenerationJob {
    /**
     * Proxy emitter that delegates to eventTransport for most operations.
     * Exception: allSubscribersLeft handlers are stored separately to avoid
     * incrementing subscriber count (see class JSDoc above).
     */
    const emitterProxy = {
      on: (event: string, handler: (...args: unknown[]) => void) => {
        if (event === 'allSubscribersLeft') {
          // Store handler for internal callback - don't use subscribe() to avoid counting as a subscriber
          if (!runtime.allSubscribersLeftHandlers) {
            runtime.allSubscribersLeftHandlers = [];
          }
          runtime.allSubscribersLeftHandlers.push(handler);
        }
      },
      emit: () => {
        /* handled via eventTransport */
      },
      listenerCount: () => this.eventTransport.getSubscriberCount(streamId),
      setMaxListeners: () => {
        /* no-op for proxy */
      },
      removeAllListeners: () => this.eventTransport.cleanup(streamId),
      off: () => {
        /* handled via unsubscribe */
      },
    };

    return {
      streamId,
      emitter: emitterProxy as unknown as t.GenerationJob['emitter'],
      status: jobData.status as t.GenerationJobStatus,
      createdAt: jobData.createdAt,
      completedAt: jobData.completedAt,
      abortController: runtime.abortController,
      error: jobData.error,
      metadata: {
        userId: jobData.userId,
        tenantId: jobData.tenantId,
        conversationId: jobData.conversationId,
        userMessage: jobData.userMessage,
        responseMessageId: jobData.responseMessageId,
        sender: jobData.sender,
        endpoint: jobData.endpoint,
        iconURL: jobData.iconURL,
        model: jobData.model,
        promptTokens: jobData.promptTokens,
        // Surface the originating agent so the resume route can refuse to rebuild a
        // paused run on a different agent.
        agent_id: jobData.agent_id,
        // Surface whether the turn was temporary so a resume keeps it non-persisted.
        isTemporary: jobData.isTemporary,
        // Surface deferred tools discovered before the pause so the resume route can
        // replay them into createRun (the rebuilt graph passes `messages: []`).
        discoveredTools: jobData.discoveredTools,
        // Surface the pending review so status/resume routes built on the
        // facade can render the prompt for a `requires_action` job.
        pendingAction: jobData.pendingAction,
      },
      readyPromise: runtime.readyPromise,
      resolveReady: runtime.resolveReady,
      finalEvent: runtime.finalEvent,
      syncSent: runtime.syncSent,
    };
  }
|
  /**
   * Get or create runtime state for a job.
   *
   * This enables cross-replica support in Redis mode:
   * - If runtime exists locally (same replica), return it
   * - If job exists in Redis but not locally (cross-replica), create minimal runtime
   *
   * The lazily-created runtime state is sufficient for:
   * - Subscribing to events (via Redis pub/sub)
   * - Getting resume state
   * - Handling reconnections
   * - Receiving cross-replica abort signals (via Redis pub/sub)
   *
   * @param streamId - The stream identifier
   * @returns Runtime state or null if job doesn't exist anywhere
   */
  private async getOrCreateRuntimeState(streamId: string): Promise<RuntimeJobState | null> {
    const existingRuntime = this.runtimeState.get(streamId);
    if (existingRuntime) {
      return existingRuntime;
    }

    // Job doesn't exist locally - check Redis
    const jobData = await this.jobStore.getJob(streamId);
    if (!jobData) {
      return null;
    }

    // Cross-replica scenario: job exists in Redis but not locally
    // Create minimal runtime state for handling reconnection/subscription
    logger.debug(`[GenerationJobManager] Creating cross-replica runtime for ${streamId}`);

    let resolveReady: () => void;
    const readyPromise = new Promise<void>((resolve) => {
      resolveReady = resolve;
    });

    // For jobs created on other replicas, readyPromise should be pre-resolved
    // since generation has already started
    resolveReady!();

    // Parse finalEvent from Redis if available
    let finalEvent: t.ServerSentEvent | undefined;
    if (jobData.finalEvent) {
      try {
        finalEvent = JSON.parse(jobData.finalEvent) as t.ServerSentEvent;
      } catch {
        // Ignore parse errors
      }
    }

    const runtime: RuntimeJobState = {
      abortController: new AbortController(),
      readyPromise,
      resolveReady: resolveReady!,
      syncSent: jobData.syncSent ?? false,
      earlyEventBuffer: [],
      hasSubscriber: false,
      finalEvent,
      errorEvent: jobData.error,
    };

    this.runtimeState.set(streamId, runtime);

    // Set up all-subscribers-left callback for this replica
    this.eventTransport.onAllSubscribersLeft(streamId, () => {
      const currentRuntime = this.runtimeState.get(streamId);
      if (currentRuntime) {
        currentRuntime.syncSent = false;
        currentRuntime.hasSubscriber = false;
        // Persist syncSent=false to Redis
        this.jobStore.updateJob(streamId, { syncSent: false }).catch((err) => {
          logger.error(`[GenerationJobManager] Failed to persist syncSent=false:`, err);
        });
        // Call registered handlers
        if (currentRuntime.allSubscribersLeftHandlers) {
          this.jobStore
            .getContentParts(streamId)
            .then((result) => {
              const parts = result?.content ?? [];
              for (const handler of currentRuntime.allSubscribersLeftHandlers ?? []) {
                try {
                  handler(parts);
                } catch (err) {
                  logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
                }
              }
            })
            .catch((err) => {
              logger.error(
                `[GenerationJobManager] Failed to get content parts for allSubscribersLeft handlers:`,
                err,
              );
            });
        }
      }
    });

    // Set up cross-replica abort listener (Redis mode only)
    // This ensures lazily-initialized jobs can receive abort signals
    if (this.eventTransport.onAbort) {
      this.eventTransport.onAbort(streamId, () => {
        const currentRuntime = this.runtimeState.get(streamId);
        if (currentRuntime && !currentRuntime.abortController.signal.aborted) {
          logger.debug(
            `[GenerationJobManager] Received cross-replica abort for lazily-init job ${streamId}`,
          );
          currentRuntime.abortController.abort();
        }
      });
    }

    return runtime;
  }
|
/**
|
|
* Get a job by streamId.
|
|
*/
|
|
async getJob(streamId: string): Promise<t.GenerationJob | undefined> {
|
|
const jobData = await this.jobStore.getJob(streamId);
|
|
if (!jobData) {
|
|
return undefined;
|
|
}
|
|
|
|
const runtime = await this.getOrCreateRuntimeState(streamId);
|
|
if (!runtime) {
|
|
return undefined;
|
|
}
|
|
|
|
return this.buildJobFacade(streamId, jobData, runtime);
|
|
}
|
|
|
|
/**
|
|
* Check if a job exists.
|
|
*/
|
|
async hasJob(streamId: string): Promise<boolean> {
|
|
return this.jobStore.hasJob(streamId);
|
|
}
|
|
|
|
/**
|
|
* Get job status.
|
|
*/
|
|
async getJobStatus(streamId: string): Promise<t.GenerationJobStatus | undefined> {
|
|
const jobData = await this.jobStore.getJob(streamId);
|
|
return jobData?.status as t.GenerationJobStatus | undefined;
|
|
}
|
|
|
|
/**
|
|
* Mark job as complete.
|
|
* If cleanupOnComplete is true (default), immediately cleans up job resources.
|
|
* Exception: Jobs with errors are NOT immediately deleted to allow late-connecting
|
|
* clients to receive the error (race condition where error occurs before client connects).
|
|
* Note: eventTransport is NOT cleaned up here to allow the final event to be
|
|
* fully transmitted. It will be cleaned up when subscribers disconnect or
|
|
* by the periodic cleanup job.
|
|
*/
|
|
async completeJob(streamId: string, error?: string): Promise<void> {
|
|
const runtime = this.runtimeState.get(streamId);
|
|
|
|
    // Abort the controller to signal all pending operations (e.g., OAuth flow monitors)
    // that the job is done and they should clean up
    if (runtime) {
      runtime.abortController.abort();
    }

    // Clear content state and run step buffer (Redis only)
    this.jobStore.clearContentState(streamId);
    this.runStepBuffers?.delete(streamId);
    this.replayEventWriteQueues.delete(streamId);
    this.tokenUsageWriteQueues.delete(streamId);

    // For error jobs, DON'T delete immediately - keep around so late-connecting
    // clients can receive the error. This handles the race condition where error
    // occurs before client connects to SSE stream.
    //
    // Cleanup strategy: Error jobs are cleaned up by periodic cleanup (every 60s)
    // via jobStore.cleanup() which checks for jobs with status 'error' and
    // completedAt set. The TTL is configurable via jobStore options (default: 0,
    // meaning cleanup on next interval). This gives clients ~60s to connect and
    // receive the error before the job is removed.
    if (error) {
      await this.jobStore.updateJob(streamId, {
        status: 'error',
        completedAt: Date.now(),
        error,
      });
      this.runningJobs.delete(streamId);
      this.syncRunningJobMetrics();
      recordGenerationJob(this.storeLabel, 'error');
      // Keep runtime state so subscribe() can access errorEvent
      logger.debug(
        `[GenerationJobManager] Job completed with error (keeping for late subscribers): ${streamId}`,
      );
      return;
    }

    // Immediate cleanup if configured (default: true) - only for successful completions
    if (this._cleanupOnComplete) {
      this.runtimeState.delete(streamId);
      // Don't cleanup eventTransport here - let the done event fully transmit first.
      // EventTransport will be cleaned up when subscribers disconnect or by periodic cleanup.
      await this.jobStore.deleteJob(streamId);
    } else {
      // Only update status if keeping the job around
      await this.jobStore.updateJob(streamId, {
        status: 'complete',
        completedAt: Date.now(),
      });
    }

    this.runningJobs.delete(streamId);
    this.syncRunningJobMetrics();
    recordGenerationJob(this.storeLabel, 'completed');
    logger.debug(`[GenerationJobManager] Job completed: ${streamId}`);
  }

  /**
   * Abort a job (user-initiated).
   * Returns all data needed for token spending and message saving.
   *
   * Cross-replica support (Redis mode):
   * - Emits abort signal via Redis pub/sub
   * - The replica running generation receives signal and aborts its AbortController
   */
  async abortJob(streamId: string): Promise<AbortResult> {
    const jobData = await this.jobStore.getJob(streamId);
    const runtime = this.runtimeState.get(streamId);

    if (!jobData) {
      logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`);
      recordGenerationJob(this.storeLabel, 'abort_failed');
      return {
        text: '',
        content: [],
        jobData: null,
        success: false,
        finalEvent: null,
        collectedUsage: [],
      };
    }

    // Emit abort signal for cross-replica support (Redis mode)
    // This ensures the generating replica receives the abort signal
    if (this.eventTransport.emitAbort) {
      this.eventTransport.emitAbort(streamId);
    }

    // Also abort local controller if we have it (same-replica abort)
    if (runtime) {
      runtime.abortController.abort();
    }

    /** Content before clearing state */
    const result = await this.jobStore.getContentParts(streamId);
    const content = result?.content ?? [];
    const abortContent = filterPersistableAbortContent(content);
    const shouldPersistAbortContent = abortContent.length > 0;

    /** Collected usage for all models */
    const collectedUsage = this.jobStore.getCollectedUsage(streamId);

    /** Text from content parts for fallback token counting */
    const text = shouldPersistAbortContent
      ? parseTextParts(abortContent as TMessageContentParts[])
      : '';

    /**
     * Detect "early abort" - aborted before any generation happened (e.g., during tool loading).
     * In this case, no messages were saved to DB, so frontend shouldn't navigate to conversation.
     */
    const isEarlyAbort = !shouldPersistAbortContent && jobData.createdEventEmitted !== true;

    /** Final event for abort */
    const userMessageId = jobData.userMessage?.messageId;

    const abortFinalEvent: t.ServerSentEvent = {
      final: true,
      // Don't include conversation for early aborts - it doesn't exist in DB
      conversation: isEarlyAbort ? null : { conversationId: jobData.conversationId },
      title: 'New Chat',
      requestMessage: jobData.userMessage
        ? {
            messageId: userMessageId,
            parentMessageId: jobData.userMessage.parentMessageId,
            conversationId: jobData.conversationId,
            text: jobData.userMessage.text ?? '',
            quotes: jobData.userMessage.quotes,
            isCreatedByUser: true,
          }
        : null,
      responseMessage: isEarlyAbort
        ? null
        : {
            messageId: jobData.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
            parentMessageId: userMessageId,
            conversationId: jobData.conversationId,
            content: abortContent,
            sender: jobData.sender ?? 'AI',
            endpoint: jobData.endpoint,
            iconURL: jobData.iconURL,
            model: jobData.model,
            unfinished: true,
            error: false,
            isCreatedByUser: false,
          },
      aborted: true,
      // Flag for early abort - no messages saved, frontend should go to new chat
      earlyAbort: isEarlyAbort,
    } satisfies t.FinalEvent as t.ServerSentEvent;

    if (runtime) {
      runtime.finalEvent = abortFinalEvent;
    }

    await this.eventTransport.emitDone(streamId, abortFinalEvent);
    this.jobStore.clearContentState(streamId);
    this.runStepBuffers?.delete(streamId);
    this.replayEventWriteQueues.delete(streamId);
    this.tokenUsageWriteQueues.delete(streamId);

    // Immediate cleanup if configured (default: true)
    if (this._cleanupOnComplete) {
      this.runtimeState.delete(streamId);
      // Don't cleanup eventTransport here - let the abort event fully transmit first.
      await this.jobStore.deleteJob(streamId);
    } else {
      // Only update status if keeping the job around
      await this.jobStore.updateJob(streamId, {
        status: 'aborted',
        completedAt: Date.now(),
      });
    }

    this.runningJobs.delete(streamId);
    this.syncRunningJobMetrics();
    recordGenerationJob(this.storeLabel, 'aborted');
    logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);

    return {
      success: true,
      jobData,
      content: abortContent,
      finalEvent: abortFinalEvent,
      text,
      collectedUsage,
    };
  }

  /**
   * Subscribe to a job's event stream.
   *
   * This is called when an SSE client connects to /chat/stream/:streamId.
   * On first subscription:
   * - Resolves readyPromise (legacy, for API compatibility)
   * - Replays any buffered early events (e.g., 'created' event)
   *
   * Supports cross-replica reconnection in Redis mode:
   * - If job exists in Redis but not locally, creates minimal runtime state
   * - Events are delivered via Redis pub/sub, not in-memory EventEmitter
   *
   * @param streamId - The stream to subscribe to
   * @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.)
   * @param onDone - Handler for completion event (includes final message)
   * @param onError - Handler for error events
   * @param options - Subscription configuration
   * @param options.skipBufferReplay - When true, skips replaying the earlyEventBuffer.
   *   Use this when a sync event was already sent (resume), since the sync's
   *   aggregatedContent already includes all buffered events.
   * @returns Subscription object with unsubscribe function, or null if job not found
   */
  async subscribe(
    streamId: string,
    onChunk: t.ChunkHandler,
    onDone?: t.DoneHandler,
    onError?: t.ErrorHandler,
    options?: t.SubscribeOptions,
  ): Promise<{ unsubscribe: t.UnsubscribeFn } | null> {
    const subscriptionType = options?.skipBufferReplay ? 'resume' : 'initial';
    // Use lazy initialization to support cross-replica subscriptions
    const runtime = await this.getOrCreateRuntimeState(streamId);
    if (!runtime) {
      recordGenerationStreamSubscription(this.storeLabel, subscriptionType, 'not_found');
      return null;
    }

    const jobData = await this.jobStore.getJob(streamId);

    // If job already complete/error, send final event or error
    // Error status takes precedence to ensure errors aren't misreported as successes
    setImmediate(() => {
      if (jobData && ['complete', 'error', 'aborted'].includes(jobData.status)) {
        // Check for error status FIRST and prioritize error handling
        if (jobData.status === 'error' && (runtime.errorEvent || jobData.error)) {
          const errorToSend = runtime.errorEvent ?? jobData.error;
          if (errorToSend) {
            logger.debug(
              `[GenerationJobManager] Sending stored error to late subscriber: ${streamId}`,
            );
            onError?.(errorToSend);
          }
        } else if (runtime.finalEvent) {
          onDone?.(runtime.finalEvent);
        }
      }
    });

    const subscription = this.eventTransport.subscribe(streamId, {
      onChunk: (event) => {
        const e = event as t.ServerSentEvent;
        if (!(e as Record<string, unknown>)._internal) {
          onChunk(e);
        }
      },
      onDone: (event) => onDone?.(event as t.ServerSentEvent),
      onError,
    });

    try {
      if (subscription.ready) {
        await subscription.ready;
      }
      recordGenerationStreamSubscription(this.storeLabel, subscriptionType, 'success');
    } catch (err) {
      recordGenerationStreamSubscription(this.storeLabel, subscriptionType, 'error');
      throw err;
    }

    const isFirst = this.eventTransport.isFirstSubscriber(streamId);

    if (!runtime.hasSubscriber) {
      runtime.hasSubscriber = true;

      /**
       * Pass earlyReplayCount to syncReorderBuffer so it can prune duplicate pub/sub
       * entries (seqs 0..count-1) without touching live in-flight chunks.
       *
       * Only set when the buffer was actually replayed — those specific seqs were
       * delivered via onChunk and their pub/sub copies are duplicates.
       * When skipBufferReplay is true, the resume sync payload delivers aggregated
       * content up to the Redis counter, so syncReorderBuffer should trust currentSeq
       * as the frontier (earlyReplayCount = 0).
       */
      let earlyReplayCount = 0;

      if (runtime.earlyEventBuffer.length > 0) {
        if (options?.skipBufferReplay) {
          logger.debug(
            `[GenerationJobManager] Skipping ${runtime.earlyEventBuffer.length} buffered events for ${streamId} (skipBufferReplay)`,
          );
        } else {
          earlyReplayCount = runtime.earlyEventBuffer.length;
          logger.debug(
            `[GenerationJobManager] Replaying ${earlyReplayCount} buffered events for ${streamId}`,
          );
          for (const bufferedEvent of runtime.earlyEventBuffer) {
            onChunk(bufferedEvent);
          }
        }
        runtime.earlyEventBuffer = [];
      } else if (this._isRedis && !options?.skipBufferReplay && jobData?.userMessage) {
        /**
         * Cross-replica fallback: the created event was buffered on the generating
         * instance and published via Redis pub/sub before this subscriber was active.
         * Reconstruct from persisted metadata. Only fields stored by trackUserMessage()
         * are available (messageId, parentMessageId, conversationId, text, quotes, plus
         * files and skill selections when present);
         * sender/isCreatedByUser are invariant for user messages and added back here.
         */
        logger.debug(
          `[GenerationJobManager] Cross-replica subscribe: emitting created event from metadata for ${streamId}`,
        );
        const createdEvent: t.CreatedEvent = {
          created: true,
          message: {
            ...jobData.userMessage,
            sender: 'User',
            isCreatedByUser: true,
          },
          streamId,
        };
        onChunk(createdEvent);
      }

      try {
        await this.eventTransport.syncReorderBuffer?.(streamId, earlyReplayCount);
      } catch (err) {
        logger.warn(
          `[GenerationJobManager] Failed to sync reorder buffer for ${streamId}; proceeding with current nextSeq:`,
          err,
        );
      }
    }

    if (isFirst) {
      runtime.resolveReady();
      logger.debug(
        `[GenerationJobManager] First subscriber ready, resolving promise for ${streamId}`,
      );
    }

    return subscription;
  }

  /**
   * Atomic resume + subscribe: snapshots resume state, drains the early event buffer
   * synchronously once the snapshot resolves, then subscribes with skipBufferReplay.
   *
   * Closes the timing gap between separate `getResumeState()` and `subscribe()` calls
   * where events could arrive in earlyEventBuffer after the snapshot but before subscribe
   * clears the buffer.
   *
   * In-memory mode: drained buffer events are returned as `pendingEvents` since
   * they exist nowhere else. The caller must deliver them after the sync payload.
   * Redis mode: no buffered chunks are returned, because chunks are persisted via
   * appendChunk and will appear in aggregatedContent on the next resume. In either
   * mode, `pendingEvents` may also carry a pending-approval event re-read after
   * subscribing (see the snapshot→subscribe race note below).
   */
  async subscribeWithResume(
    streamId: string,
    onChunk: t.ChunkHandler,
    onDone?: t.DoneHandler,
    onError?: t.ErrorHandler,
  ): Promise<t.SubscribeWithResumeResult> {
    const bufferLengthAtSnapshot = !this._isRedis
      ? (this.runtimeState.get(streamId)?.earlyEventBuffer.length ?? 0)
      : 0;

    const resumeState = await this.getResumeState(streamId);
    recordGenerationStreamSubscription(
      this.storeLabel,
      'resume_state',
      resumeState ? 'found' : 'missing',
    );

    let pendingEvents: t.ServerSentEvent[] = [];
    if (!this._isRedis) {
      const runtime = this.runtimeState.get(streamId);
      if (runtime) {
        pendingEvents = runtime.earlyEventBuffer.slice(bufferLengthAtSnapshot);
        runtime.earlyEventBuffer = [];
        if (pendingEvents.length > 0) {
          recordGenerationStreamResumePendingEvents(this.storeLabel, pendingEvents.length);
          logger.debug(
            `[GenerationJobManager] Captured ${pendingEvents.length} gap events for ${streamId}`,
          );
        }
      }
    }

    const subscription = await this.subscribe(streamId, onChunk, onDone, onError, {
      skipBufferReplay: true,
    });

    // Close the snapshot→subscribe race: getResumeState() snapshots BEFORE we attach the
    // subscription, so a pause that becomes durable in that window is in neither
    // resumeState.pendingAction nor (Redis mode) pendingEvents — and trackReplayEvent does
    // not persist approval events — leaving the client attached to a paused job with no
    // approval UI. Re-read the live job AFTER subscribing; if it is now requires_action and
    // the snapshot didn't already carry the action, surface it as a pending event so the
    // approval prompt renders. Idempotent: a pause landing AFTER attach is delivered live
    // too, and the client's handler just sets the current action, so a duplicate is benign.
    if (!resumeState?.pendingAction) {
      const liveJob = await this.jobStore.getJob(streamId);
      if (
        liveJob?.status === 'requires_action' &&
        liveJob.pendingAction != null &&
        !isPendingActionStale(liveJob)
      ) {
        pendingEvents = [
          ...pendingEvents,
          {
            event: ApprovalEvents.ON_PENDING_ACTION,
            data: liveJob.pendingAction as unknown as Record<string, unknown>,
          },
        ];
      }
    }

    return { subscription, resumeState, pendingEvents };
  }
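
  // Illustrative sketch (hypothetical, standalone; not called by this class): the
  // in-memory gap capture in subscribeWithResume, reduced to its core. Events already
  // buffered when the length is sampled are assumed to be covered by the resume
  // snapshot; events appended while getResumeState() was awaited form the "gap" and
  // must be delivered after the sync payload. The buffer is then emptied.
  //
  //   function captureGap<T>(buffer: T[], lengthAtSnapshot: number): { gap: T[]; buffer: T[] } {
  //     return { gap: buffer.slice(lengthAtSnapshot), buffer: [] };
  //   }
  //
  //   // Two events were buffered before the snapshot and one arrived during the await:
  //   // captureGap(['created', 'delta-1', 'delta-2'], 2).gap  // ['delta-2']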

  /**
   * Emit a chunk event to all subscribers.
   * Uses runtime state check for performance (avoids async job store lookup per token).
   *
   * If no subscriber has connected yet, buffers the event for replay when they do.
   * This ensures early events (like 'created') aren't lost due to race conditions.
   *
   * In Redis mode, awaits the publish to guarantee event ordering.
   * This is critical for streaming deltas (tool args, message content) to arrive in order.
   */
  async emitChunk(streamId: string, event: t.ServerSentEvent): Promise<void> {
    const runtime = this.runtimeState.get(streamId);
    if (!runtime || runtime.abortController.signal.aborted) {
      return;
    }

    // Refresh job activity so the store's stale-job failsafe reaps on inactivity
    // (a hung generation), not on age (a long but live stream). Parity with
    // RedisJobStore refreshing the running TTL on each appendChunk.
    this.jobStore.recordActivity?.(streamId);

    await this.trackUserMessage(streamId, event);
    await this.trackTitleEvent(streamId, event);
    await this.trackReplayEvent(streamId, event);
    await this.trackContextUsage(streamId, event);
    await this.trackTokenUsage(streamId, event);

    // For Redis mode, persist chunk for later reconstruction (fire-and-forget for resumability)
    if (this._isRedis) {
      // The SSE event structure is { event: string, data: unknown, ... }
      // The aggregator expects { event: string, data: unknown } where data is the payload
      const eventObj = event as Record<string, unknown>;
      const eventType = eventObj.event as string | undefined;
      const eventData = eventObj.data;

      if (eventType && eventData !== undefined) {
        // Store in format expected by aggregateContent: { event, data }
        this.jobStore.appendChunk(streamId, { event: eventType, data: eventData }).catch((err) => {
          logger.error(`[GenerationJobManager] Failed to append chunk:`, err);
        });

        // For run step events, also save to run steps key for quick retrieval
        if (eventType === 'on_run_step' || eventType === 'on_run_step_completed') {
          this.saveRunStepFromEvent(streamId, eventData as Record<string, unknown>);
        }
      }
    }

    if (!runtime.hasSubscriber) {
      runtime.earlyEventBuffer.push(event);
      if (!this._isRedis) {
        return;
      }
    }

    await this.eventTransport.emitChunk(streamId, event);
  }
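
  // Illustrative sketch (hypothetical names and types; simplified from emitChunk above):
  // how an event is routed before and after the first subscriber attaches. In-memory
  // mode only buffers, because the buffer is the sole copy and subscribe() replays it.
  // Redis mode buffers AND publishes, presumably because a subscriber may attach on
  // another replica that never sees this replica's buffer.
  //
  //   type Runtime = { hasSubscriber: boolean; earlyEventBuffer: unknown[] };
  //
  //   async function route(
  //     runtime: Runtime,
  //     isRedis: boolean,
  //     event: unknown,
  //     publish: (e: unknown) => Promise<void>,
  //   ): Promise<'buffered' | 'buffered+published' | 'published'> {
  //     if (!runtime.hasSubscriber) {
  //       runtime.earlyEventBuffer.push(event);
  //       if (!isRedis) return 'buffered';
  //       await publish(event);
  //       return 'buffered+published';
  //     }
  //     await publish(event);
  //     return 'published';
  //   }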

  /**
   * Extract and save run step from event data.
   * The data is already the run step object from the event payload.
   */
  private saveRunStepFromEvent(streamId: string, data: Record<string, unknown>): void {
    // The data IS the run step object
    const runStep = data as Agents.RunStep;
    if (!runStep.id) {
      return;
    }

    // Fire and forget - accumulate run steps
    this.accumulateRunStep(streamId, runStep);
  }

  /**
   * Accumulate run steps for a stream (Redis mode only).
   * Uses a simple in-memory buffer that gets flushed to Redis.
   * Not used in in-memory mode - run steps come from live graph via WeakRef.
   */
  private runStepBuffers: Map<string, Agents.RunStep[]> | null = null;

  private accumulateRunStep(streamId: string, runStep: Agents.RunStep): void {
    // Lazy initialization - only create map when first used (Redis mode)
    if (!this.runStepBuffers) {
      this.runStepBuffers = new Map();
    }

    let buffer = this.runStepBuffers.get(streamId);
    if (!buffer) {
      buffer = [];
      this.runStepBuffers.set(streamId, buffer);
    }

    // Update or add run step
    const existingIdx = buffer.findIndex((rs) => rs.id === runStep.id);
    if (existingIdx >= 0) {
      buffer[existingIdx] = runStep;
    } else {
      buffer.push(runStep);
    }

    // Save to Redis
    if (this.jobStore.saveRunSteps) {
      this.jobStore.saveRunSteps(streamId, buffer).catch((err) => {
        logger.error(`[GenerationJobManager] Failed to save run steps:`, err);
      });
    }
  }

  /**
   * Persist the last title event so resume sync can replay it. Content
   * aggregation only reconstructs message parts, so UI-only events need their
   * own metadata slot.
   */
  private async trackTitleEvent(streamId: string, event: t.ServerSentEvent): Promise<void> {
    if (!('event' in event) || event.event !== 'title') {
      return;
    }

    await this.jobStore.updateJob(streamId, {
      titleEvent: JSON.stringify(event),
    });
  }

  /**
   * Persist the latest context usage snapshot (one per model call) so a
   * resuming client can restore the context gauge without waiting for the
   * next model call.
   */
  private async trackContextUsage(streamId: string, event: t.ServerSentEvent): Promise<void> {
    if (!('event' in event) || event.event !== UsageEvents.ON_CONTEXT_USAGE) {
      return;
    }

    /** Share the token-usage queue so snapshot + usage writes are serialized per
     * stream: `persistTokenUsage` reconciles the stored snapshot (read-modify-
     * write), and a snapshot landing between its read and write — or a stale
     * reconciled write landing after a newer snapshot — would clobber the newer
     * run's gauge when visible calls interleave. FIFO ordering keeps each call's
     * pre-invoke snapshot ahead of its own usage and behind the next snapshot. */
    await this.queueJobWrite(this.tokenUsageWriteQueues, streamId, () =>
      this.jobStore.updateJob(streamId, {
        contextUsage: JSON.stringify((event as { data?: unknown }).data ?? null),
      }),
    );
  }

  /**
   * Chains a read/modify/write job update onto the stream's queue so
   * concurrent writers can't clobber each other's merged state.
   */
  private async queueJobWrite(
    queues: Map<string, Promise<void>>,
    streamId: string,
    write: () => Promise<void>,
  ): Promise<void> {
    const previousWrite = queues.get(streamId) ?? Promise.resolve();
    const nextWrite = previousWrite
      .catch(() => {
        // Keep the queue moving even if a prior metadata write failed.
      })
      .then(write);

    queues.set(streamId, nextWrite);

    try {
      await nextWrite;
    } finally {
      if (queues.get(streamId) === nextWrite) {
        queues.delete(streamId);
      }
    }
  }
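
  // Illustrative sketch (hypothetical, standalone; not called by this class): the
  // per-key promise chaining used by queueJobWrite, reduced to its core. Writes for
  // the same key run strictly in FIFO order, a failed write does not stall the queue
  // (the `.catch` swallows it before the next write is chained), and the map entry is
  // removed once the tail write settles.
  //
  //   const queues = new Map<string, Promise<void>>();
  //
  //   async function enqueue(key: string, write: () => Promise<void>): Promise<void> {
  //     const next = (queues.get(key) ?? Promise.resolve()).catch(() => {}).then(write);
  //     queues.set(key, next);
  //     try {
  //       await next;
  //     } finally {
  //       if (queues.get(key) === next) queues.delete(key);
  //     }
  //   }
  //
  //   async function demo(): Promise<string[]> {
  //     const order: string[] = [];
  //     const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
  //     await Promise.all([
  //       enqueue('stream-1', async () => { await sleep(20); order.push('a'); }),
  //       enqueue('stream-1', async () => { order.push('b'); }),
  //     ]);
  //     return order; // ['a', 'b']: the second write waits for the slower first one
  //   }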

  /**
   * Persist replay-only stream events that are needed to reconstruct active
   * UI state on resume but are not represented by aggregated message content.
   */
  private async trackReplayEvent(streamId: string, event: t.ServerSentEvent): Promise<void> {
    if (!isOAuthReplayEvent(event)) {
      return;
    }

    await this.queueJobWrite(this.replayEventWriteQueues, streamId, () =>
      this.persistReplayEvent(streamId, event),
    );
  }

  /**
   * Persist per-model-call token usage so resuming clients can rebuild
   * usage totals on any replica (the live collectedUsage array only exists
   * on the generating instance).
   */
  private async trackTokenUsage(streamId: string, event: t.ServerSentEvent): Promise<void> {
    if (!('event' in event) || event.event !== UsageEvents.ON_TOKEN_USAGE) {
      return;
    }

    await this.queueJobWrite(this.tokenUsageWriteQueues, streamId, () =>
      this.persistTokenUsage(streamId, event as { data?: unknown }),
    );
  }

  private async persistTokenUsage(streamId: string, event: { data?: unknown }): Promise<void> {
    const jobData = await this.jobStore.getJob(streamId);
    if (!jobData || event.data == null) {
      return;
    }

    let tokenUsage: unknown[] = [];
    if (jobData.tokenUsage) {
      try {
        tokenUsage = JSON.parse(jobData.tokenUsage) as unknown[];
      } catch {
        tokenUsage = [];
      }
    }
    tokenUsage.push(event.data);

    const update: Partial<SerializableJobData> = { tokenUsage: JSON.stringify(tokenUsage) };

    /** Reconcile the resume snapshot to this call's ACTUAL prompt tokens. A primary
     * usage is the post-invoke truth for the call the latest stored snapshot
     * precedes (no snapshot is captured between a call's pre-invoke dispatch and
     * its usage), so a resuming client restores the real context instead of the
     * calibration-inflated estimate — and a mid-call resume (no usage yet) simply
     * keeps the raw snapshot rather than mis-applying an earlier call's tokens. */
    const usage = event.data as TTokenUsageEvent;
    if (usage.usage_type == null && jobData.contextUsage) {
      try {
        const snapshot = JSON.parse(jobData.contextUsage) as TContextUsageEvent | null;
        if (
          snapshot != null &&
          (snapshot.runId == null || usage.runId == null || snapshot.runId === usage.runId)
        ) {
          update.contextUsage = JSON.stringify(
            reconcileContextUsage(snapshot, promptTokensFromUsage(usage)),
          );
        }
      } catch {
        /* leave the stored snapshot as-is on parse failure */
      }
    }

    await this.jobStore.updateJob(streamId, update);
  }
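
  // Illustrative sketch (hypothetical, standalone; parameter types simplified): the gate
  // persistTokenUsage applies before reconciling the stored context snapshot. Only
  // primary usage (no usage_type) qualifies, a snapshot must exist, and the two runIds
  // must agree whenever both are present.
  //
  //   function shouldReconcile(
  //     usage: { usage_type?: string | null; runId?: string | null },
  //     snapshot: { runId?: string | null } | null,
  //   ): boolean {
  //     if (usage.usage_type != null || snapshot == null) return false;
  //     return snapshot.runId == null || usage.runId == null || snapshot.runId === usage.runId;
  //   }
  //
  //   // 'secondary' below is a placeholder usage_type, not a documented value.
  //   // shouldReconcile({ runId: 'r1' }, { runId: 'r1' })                          // true
  //   // shouldReconcile({ runId: 'r1' }, { runId: 'r2' })                          // false
  //   // shouldReconcile({ usage_type: 'secondary', runId: 'r1' }, { runId: 'r1' }) // false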

  private async persistReplayEvent(streamId: string, event: t.ServerSentEvent): Promise<void> {
    const jobData = await this.jobStore.getJob(streamId);
    if (!jobData) {
      return;
    }

    let replayEvents: t.ServerSentEvent[] = [];
    if (jobData.replayEvents) {
      try {
        replayEvents = JSON.parse(jobData.replayEvents) as t.ServerSentEvent[];
      } catch {
        replayEvents = [];
      }
    }

    const stepId = getReplayStepId(event);
    const eventName = 'event' in event ? event.event : undefined;
    const existingIndex =
      stepId == null
        ? -1
        : replayEvents.findIndex((candidate) => {
            if (!('event' in candidate) || candidate.event !== eventName) {
              return false;
            }
            return getReplayStepId(candidate) === stepId;
          });

    if (existingIndex >= 0) {
      replayEvents[existingIndex] = event;
    } else {
      replayEvents.push(event);
    }

    await this.jobStore.updateJob(streamId, {
      replayEvents: JSON.stringify(replayEvents),
    });
  }
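
  // Illustrative sketch (hypothetical event shape: the step id is stored directly on the
  // event here, whereas the real code extracts it with getReplayStepId): the
  // replace-or-append rule used by persistReplayEvent. An incoming event replaces an
  // earlier one, in place, only when both the event name and the step id match; events
  // without a step id are always appended. Unlike the method above, this returns a new
  // array instead of mutating.
  //
  //   type ReplayEvent = { event: string; stepId?: string };
  //
  //   function upsertReplay(events: ReplayEvent[], incoming: ReplayEvent): ReplayEvent[] {
  //     const idx =
  //       incoming.stepId == null
  //         ? -1
  //         : events.findIndex((e) => e.event === incoming.event && e.stepId === incoming.stepId);
  //     return idx >= 0 ? events.map((e, i) => (i === idx ? incoming : e)) : [...events, incoming];
  //   }
  //
  //   // upsertReplay([{ event: 'e', stepId: 's1' }], { event: 'e', stepId: 's1' }).length  // 1
  //   // upsertReplay([{ event: 'e', stepId: 's1' }], { event: 'e', stepId: 's2' }).length  // 2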

  /**
   * Persist user message metadata from the created event.
   * Awaited in emitChunk so the HSET commits before the PUBLISH,
   * guaranteeing any cross-replica getJob() after the pub/sub window
   * finds userMessage in Redis.
   */
  private async trackUserMessage(streamId: string, event: t.ServerSentEvent): Promise<void> {
    if (!('created' in event)) {
      return;
    }

    const { message } = event;
    const extra = message as {
      manualSkills?: string[];
      alwaysAppliedSkills?: string[];
      files?: unknown[];
    };
    const updates: Partial<SerializableJobData> = {
      createdEventEmitted: true,
      userMessage: {
        messageId: message.messageId,
        parentMessageId: message.parentMessageId,
        conversationId: message.conversationId,
        text: message.text,
        quotes: message.quotes,
        // Persist the turn's uploaded files so a HITL resume sources them from the job
        // (this authoritative writer), not a user DB row whose save can still be racing
        // the approval prompt.
        ...(Array.isArray(extra.files) && extra.files.length > 0 && { files: extra.files }),
        // Carry skill selections so a HITL resume's reconstructed requestMessage keeps
        // its pills — this is the authoritative writer of job.metadata.userMessage and
        // would otherwise drop them (the emitted created message includes them).
        ...(Array.isArray(extra.manualSkills) &&
          extra.manualSkills.length > 0 && { manualSkills: extra.manualSkills }),
        ...(Array.isArray(extra.alwaysAppliedSkills) &&
          extra.alwaysAppliedSkills.length > 0 && {
            alwaysAppliedSkills: extra.alwaysAppliedSkills,
          }),
      },
    };

    if (message.conversationId) {
      updates.conversationId = message.conversationId;
    }

    await this.jobStore.updateJob(streamId, updates);
  }

  /**
   * Update job metadata.
   */
  async updateMetadata(
    streamId: string,
    metadata: Partial<t.GenerationJobMetadata>,
  ): Promise<void> {
    const updates: Partial<SerializableJobData> = {};
    if (metadata.responseMessageId) {
      updates.responseMessageId = metadata.responseMessageId;
    }
    if (metadata.sender) {
      updates.sender = metadata.sender;
    }
    if (metadata.conversationId) {
      updates.conversationId = metadata.conversationId;
    }
    if (metadata.userMessage) {
      updates.userMessage = metadata.userMessage;
    }
    if (metadata.endpoint) {
      updates.endpoint = metadata.endpoint;
    }
    if (metadata.iconURL) {
      updates.iconURL = metadata.iconURL;
    }
    if (metadata.model) {
      updates.model = metadata.model;
    }
    if (metadata.agent_id) {
      updates.agent_id = metadata.agent_id;
    }
    if (metadata.isTemporary !== undefined) {
      updates.isTemporary = metadata.isTemporary;
    }
    if (metadata.promptTokens !== undefined) {
      updates.promptTokens = metadata.promptTokens;
    }
    if (metadata.discoveredTools) {
      updates.discoveredTools = metadata.discoveredTools;
    }
    await this.jobStore.updateJob(streamId, updates);
  }

  /**
   * Set reference to the graph's contentParts array.
   */
  setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
    // Use runtime state check for performance (sync check)
    if (!this.runtimeState.has(streamId)) {
      return;
    }
    this.jobStore.setContentParts(streamId, contentParts);
  }

  /**
   * Set reference to the collectedUsage array.
   * This array accumulates token usage from all models during generation.
   */
  setCollectedUsage(streamId: string, collectedUsage: UsageMetadata[]): void {
    // Use runtime state check for performance (sync check)
    if (!this.runtimeState.has(streamId)) {
      return;
    }
    this.jobStore.setCollectedUsage(streamId, collectedUsage);
  }

  /**
   * Set reference to the graph instance.
   */
  setGraph(streamId: string, graph: StandardGraph): void {
    // Use runtime state check for performance (sync check)
    if (!this.runtimeState.has(streamId)) {
      return;
    }
    this.jobStore.setGraph(streamId, graph);
  }

  /**
   * The guarded human-review lifecycle for paused runs:
   * `approvals.pause()` / `peek()` / `resolve()` / `expire()`.
   *
   * This is the seam approval routes, the status endpoint, and the run wiring
   * cross — it owns the legal `requires_action` transitions and is race-safe
   * against concurrent resumes (a double-resolve would otherwise drive the run
   * twice). The job's chunks, run steps, and user-active-set membership are
   * preserved across a pause so the resume path can rebuild context; the store
   * refreshes the job-hash TTL to give the user the full window to respond.
   */
  get approvals(): ApprovalLifecycle {
    return this._approvals;
  }

  /**
   * Get resume state for reconnecting clients.
   */
  async getResumeState(streamId: string): Promise<t.ResumeState | null> {
    const jobData = await this.jobStore.getJob(streamId);
    if (!jobData) {
      return null;
    }

    const result = await this.jobStore.getContentParts(streamId);
    const aggregatedContent = result?.content ?? [];
    const runSteps = await this.jobStore.getRunSteps(streamId);
    let titleEvent: t.ResumeState['titleEvent'];
    if (jobData.titleEvent) {
      try {
        titleEvent = JSON.parse(jobData.titleEvent) as t.ResumeState['titleEvent'];
      } catch {
        // Ignore malformed persisted title events.
      }
    }
    let replayEvents: t.ResumeState['replayEvents'];
    if (jobData.replayEvents) {
      try {
        replayEvents = JSON.parse(jobData.replayEvents) as t.ResumeState['replayEvents'];
      } catch {
        // Ignore malformed persisted replay events.
      }
    }

    let contextUsage: t.ResumeState['contextUsage'];
    if (jobData.contextUsage) {
      try {
        contextUsage = JSON.parse(jobData.contextUsage) as t.ResumeState['contextUsage'];
      } catch {
        // Ignore malformed persisted context usage.
      }
    }

    /** Persisted per model call by trackTokenUsage — unlike the live
     * collectedUsage reference, this survives cross-replica resumes. */
    let collectedUsage: t.ResumeState['collectedUsage'];
    if (jobData.tokenUsage) {
      try {
        const parsed = JSON.parse(jobData.tokenUsage) as t.ResumeState['collectedUsage'];
        collectedUsage = parsed && parsed.length > 0 ? parsed : undefined;
      } catch {
        // Ignore malformed persisted token usage.
      }
    }

    logger.debug(`[GenerationJobManager] getResumeState:`, {
      streamId,
      runStepsLength: runSteps.length,
      aggregatedContentLength: aggregatedContent.length,
      collectedUsageLength: collectedUsage?.length ?? 0,
    });

    return {
      runSteps,
      aggregatedContent,
      userMessage: jobData.userMessage,
      responseMessageId: jobData.responseMessageId,
      conversationId: jobData.conversationId,
      sender: jobData.sender,
      iconURL: jobData.iconURL,
      model: jobData.model,
      titleEvent,
      replayEvents,
      collectedUsage,
      contextUsage,
      // Carry the live pending approval in the resume contract so a reloading /
      // cross-replica client can rebuild the prompt from resumeState.
      pendingAction:
        jobData.status === 'requires_action' && !isPendingActionStale(jobData)
          ? jobData.pendingAction
          : undefined,
    };
  }

  /**
   * Mark that sync has been sent.
   * Persists to Redis for cross-replica consistency.
   */
  markSyncSent(streamId: string): void {
    const runtime = this.runtimeState.get(streamId);
    if (runtime) {
      runtime.syncSent = true;
    }
    // Persist to Redis for cross-replica consistency
    this.jobStore.updateJob(streamId, { syncSent: true }).catch((err) => {
      logger.error(`[GenerationJobManager] Failed to persist syncSent flag:`, err);
    });
  }

  /**
   * Check if sync has been sent.
   * Checks local runtime first, then falls back to Redis for cross-replica scenarios.
   */
  async wasSyncSent(streamId: string): Promise<boolean> {
    const localSyncSent = this.runtimeState.get(streamId)?.syncSent;
    if (localSyncSent !== undefined) {
      return localSyncSent;
    }
    // Cross-replica: check Redis
    const jobData = await this.jobStore.getJob(streamId);
    return jobData?.syncSent ?? false;
  }

  /**
   * Emit a done event.
   * Persists finalEvent to Redis for cross-replica access.
   */
  async emitDone(streamId: string, event: t.ServerSentEvent): Promise<void> {
    const runtime = this.runtimeState.get(streamId);
    if (runtime) {
      runtime.finalEvent = event;
    }
    // Persist finalEvent to Redis for cross-replica consistency
    this.jobStore.updateJob(streamId, { finalEvent: JSON.stringify(event) }).catch((err) => {
      logger.error(`[GenerationJobManager] Failed to persist finalEvent:`, err);
    });
    await this.eventTransport.emitDone(streamId, event);
  }

  /**
   * Emit an error event.
   * Stores the error for late-connecting subscribers (race condition where error
   * occurs before client connects to SSE stream).
   */
  async emitError(streamId: string, error: string): Promise<void> {
    const runtime = this.runtimeState.get(streamId);
    if (runtime) {
      runtime.errorEvent = error;
    }
    // Persist error to job store for cross-replica consistency
    this.jobStore.updateJob(streamId, { error }).catch((err) => {
      logger.error(`[GenerationJobManager] Failed to persist error:`, err);
    });
    await this.eventTransport.emitError(streamId, error);
  }

  /**
   * Expire a single pending approval observed to be stale, immediately rather than
   * waiting for the periodic sweep. Runs the `requires_action → aborted` CAS pinned to
   * `actionId` (so a concurrent resolve followed by a re-pause on a fresh action is not
   * aborted) and, on success, emits the terminal `APPROVAL_EXPIRED_ERROR` so any attached
   * SSE client receives a terminal event instead of a hung stream. Used by the periodic
   * sweeper and by the resume route, which can observe a just-expired action when the
   * user submits a decision after the TTL has lapsed. Returns true if this call expired
   * the action.
   */
  async expireApproval(streamId: string, actionId?: string): Promise<boolean> {
    const expired = await this._approvals.expire(streamId, actionId);
    if (!expired) {
      return false;
    }
    try {
      await this.emitError(streamId, APPROVAL_EXPIRED_ERROR);
    } catch (err) {
      logger.error(`[GenerationJobManager] Failed to notify expired approval ${streamId}`, err);
    }
    this.runningJobs.delete(streamId);
    return true;
  }

  /**
   * Expire any locally-tracked approval whose window has lapsed: drive the atomic
   * `requires_action → aborted` transition and, if this caller won it, emit a
   * terminal error so a connected SSE client closes. It also relays that terminal
   * error for jobs already aborted for approval expiry by another replica's store
   * cleanup. Only streams this replica has runtime state for are scanned, since
   * those are exactly the ones with a client subscribed here; a paused job on
   * another replica is finalized by that replica's sweep (and the store's own
   * cleanup). The durable checkpoint is reclaimed by its Mongo TTL index, which
   * shares the approval window, so no cross-layer delete is needed here.
   */
  private async expireStaleApprovals(): Promise<void> {
    let changed = false;
    for (const streamId of this.runtimeState.keys()) {
      let job: SerializableJobData | null;
      try {
        job = await this.jobStore.getJob(streamId);
      } catch (err) {
        logger.error(
          `[GenerationJobManager] Failed to read job during approval expiry sweep: ${streamId}`,
          err,
        );
        continue;
      }
      // Loser-replica relay: in a multi-replica deployment, another replica's store
      // cleanup (`cleanupRequiresActionIndex`) can win the requires_action → aborted
      // approval-expiry CAS. It sets the hash error but cannot emit, because the store
      // has no event transport, so a client subscribed on THIS replica would get no
      // terminal event until the reap path. If the job is already aborted *for approval
      // expiry* and we haven't emitted here, relay the terminal error to our subscriber.
      // The `errorEvent` flag (set by emitError) keeps this idempotent with the win path.
      const runtime = this.runtimeState.get(streamId);
      if (
        job?.status === 'aborted' &&
        job.error === APPROVAL_EXPIRED_ERROR &&
        !runtime?.errorEvent
      ) {
        try {
          await this.emitError(streamId, APPROVAL_EXPIRED_ERROR);
        } catch (err) {
          logger.error(`[GenerationJobManager] Failed to relay expired approval ${streamId}`, err);
        }
        changed = this.runningJobs.delete(streamId) || changed;
        continue;
      }
      if (!job || job.status !== 'requires_action' || !isPendingActionExpired(job)) {
        continue;
      }
      // Pass the OBSERVED action id so the expire CAS only fires for the action we read
      // as stale. Between this read and the CAS, the user could resolve it and the run
      // re-pause on a fresh action; without the id, the status-only CAS would abort
      // that valid new pause and leave it terminal.
      const didExpire = await this.expireApproval(streamId, job.pendingAction?.actionId);
      if (!didExpire) {
        continue;
      }
      changed = true;
      logger.debug(`[GenerationJobManager] Expired pending approval: ${streamId}`);
    }
    if (changed) {
      this.syncRunningJobMetrics();
    }
  }

  /**
   * Clean up expired jobs.
   * Lapsed approvals are finalized first; after the store's own cleanup, any orphaned
   * runtime state, run-step buffers, and event transport entries are removed as well.
   */
  private async cleanup(): Promise<void> {
    // Finalize approvals whose window lapsed before the store's own cleanup, so a
    // client still attached to a paused stream gets a terminal event instead of a
    // connection that hangs open until it gives up.
    await this.expireStaleApprovals();

    const count = await this.jobStore.cleanup();
    let runningJobsChanged = false;

    // Cleanup runtime state for deleted jobs
    for (const streamId of this.runtimeState.keys()) {
      if (!(await this.jobStore.hasJob(streamId))) {
        /**
         * Abort any still-pending generation whose job has been reaped (e.g. a
         * stale "running" job removed by the store's failsafe timeout). This
         * unwinds the hung in-flight work so its client/graph references can be
         * garbage collected, rather than leaking via the pending promise.
         */
        const runtime = this.runtimeState.get(streamId);
        if (runtime && !runtime.abortController.signal.aborted) {
          runtime.abortController.abort();
        }
        // If a client is still attached when the job is reaped, send a terminal
        // error first so the SSE connection closes instead of hanging open with no
        // final/done event (the route only ends the response from onDone/onError).
        if (this.eventTransport.getSubscriberCount(streamId) > 0) {
          try {
            await this.eventTransport.emitError(streamId, REAPED_JOB_ERROR);
          } catch (err) {
            logger.error(`[GenerationJobManager] Failed to notify reaped stream ${streamId}:`, err);
          }
        }
        this.runtimeState.delete(streamId);
        runningJobsChanged = this.runningJobs.delete(streamId) || runningJobsChanged;
        this.runStepBuffers?.delete(streamId);
        this.jobStore.clearContentState(streamId);
        this.eventTransport.cleanup(streamId);
      }
    }

    // Also check runStepBuffers for any orphaned entries (Redis mode only)
    if (this.runStepBuffers) {
      for (const streamId of this.runStepBuffers.keys()) {
        if (!(await this.jobStore.hasJob(streamId))) {
          this.runStepBuffers.delete(streamId);
        }
      }
    }

    // Check eventTransport for orphaned streams (e.g., connections dropped without a clean close).
    // These are streams that exist in eventTransport but have no corresponding job.
    for (const streamId of this.eventTransport.getTrackedStreamIds()) {
      if (!(await this.jobStore.hasJob(streamId)) && !this.runtimeState.has(streamId)) {
        this.eventTransport.cleanup(streamId);
      }
    }

    if (runningJobsChanged) {
      this.syncRunningJobMetrics();
    }

    if (count > 0) {
      logger.debug(`[GenerationJobManager] Cleaned up ${count} expired jobs`);
    }
  }

  /**
   * Get stream info for status endpoint.
   */
  async getStreamInfo(streamId: string): Promise<{
    active: boolean;
    status: t.GenerationJobStatus;
    aggregatedContent?: Agents.MessageContentComplex[];
    createdAt: number;
  } | null> {
    const jobData = await this.jobStore.getJob(streamId);
    if (!jobData) {
      return null;
    }

    const result = await this.jobStore.getContentParts(streamId);
    const aggregatedContent = result?.content ?? [];

    return {
      active: jobData.status === 'running',
      status: jobData.status as t.GenerationJobStatus,
      aggregatedContent,
      createdAt: jobData.createdAt,
    };
  }

  /**
   * Get total job count.
   */
  async getJobCount(): Promise<number> {
    return this.jobStore.getJobCount();
  }

  /** Returns sizes of internal runtime maps for diagnostics. */
  getRuntimeStats(): {
    runtimeStateSize: number;
    runStepBufferSize: number;
    eventTransportStreams: number;
  } {
    return {
      runtimeStateSize: this.runtimeState.size,
      runStepBufferSize: this.runStepBuffers?.size ?? 0,
      eventTransportStreams: this.eventTransport.getTrackedStreamIds().length,
    };
  }

  /**
   * Get job count by status.
   */
  async getJobCountByStatus(): Promise<Record<t.GenerationJobStatus, number>> {
    const [running, complete, error, aborted, requires_action] = await Promise.all([
      this.jobStore.getJobCountByStatus('running'),
      this.jobStore.getJobCountByStatus('complete'),
      this.jobStore.getJobCountByStatus('error'),
      this.jobStore.getJobCountByStatus('aborted'),
      this.jobStore.getJobCountByStatus('requires_action'),
    ]);
    return { running, complete, error, aborted, requires_action };
  }

  /**
   * Get active job IDs for a user.
   * Returns conversation IDs of running jobs belonging to the user.
   * Performs self-healing cleanup of stale entries.
   *
   * @param userId - The user ID to query
   * @param tenantId - Optional tenant ID to scope the query
   * @returns Array of conversation IDs with active jobs
   */
  async getActiveJobIdsForUser(userId: string, tenantId?: string): Promise<string[]> {
    return this.jobStore.getActiveJobIdsByUser(userId, tenantId);
  }

  /**
   * Destroy the manager.
   * Cleans up all resources including runtime state, buffers, and stores.
   */
  async destroy(): Promise<void> {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }

    await this.jobStore.destroy();
    this.eventTransport.destroy();
    this.runtimeState.clear();
    this.runningJobs.clear();
    this.syncRunningJobMetrics();
    this.runStepBuffers?.clear();
    this.replayEventWriteQueues.clear();
    this.tokenUsageWriteQueues.clear();

    logger.debug('[GenerationJobManager] Destroyed');
  }
}
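
/*
 * Illustrative sketch only; not part of GenerationJobManager and not the real
 * approvals store. It models, in memory, the `requires_action → aborted` expiry
 * CAS pinned to an action id that `expireApproval` and `expireStaleApprovals`
 * describe. `SketchJob`, `SketchApprovalStore`, and the `'approval_expired'`
 * error string are hypothetical; the real store's cross-replica atomicity (for
 * example, in Redis) is not modeled here.
 */
type SketchJobStatus = 'running' | 'requires_action' | 'aborted';

interface SketchJob {
  status: SketchJobStatus;
  actionId?: string;
  error?: string;
}

class SketchApprovalStore {
  private readonly jobs = new Map<string, SketchJob>();

  /** Overwrite a job's state, e.g. a resolve followed by a re-pause. */
  set(streamId: string, job: SketchJob): void {
    this.jobs.set(streamId, { ...job });
  }

  get(streamId: string): SketchJob | undefined {
    return this.jobs.get(streamId);
  }

  /**
   * Abort the job only if it is still paused and, when `actionId` is given, still
   * paused on that same action. The check and the write happen in one synchronous
   * call, so no other code in this process can interleave between them. Returns
   * true only for the single caller that performed the transition.
   */
  expire(streamId: string, actionId?: string): boolean {
    const job = this.jobs.get(streamId);
    if (!job || job.status !== 'requires_action') {
      return false;
    }
    if (actionId !== undefined && job.actionId !== actionId) {
      // The run re-paused on a fresh action after the caller's read; leave it alone.
      return false;
    }
    job.status = 'aborted';
    job.error = 'approval_expired'; // hypothetical error string
    return true;
  }
}

// Pinning matters in the resolve-then-re-pause race: an expiry keyed on the stale
// action id becomes a no-op, while a status-only check would abort the new pause.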

export const GenerationJobManager: GenerationJobManagerClass = new GenerationJobManagerClass();
export { GenerationJobManagerClass };
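
/*
 * Illustrative sketch only; not part of GenerationJobManager. It reduces the reap
 * step of `cleanup()` to plain data structures: for each locally tracked stream
 * whose job no longer exists in the store, abort the in-flight work, send a
 * terminal notification if a subscriber is attached, then drop the local entry.
 * `SketchRuntime`, `reapOrphans`, and the `hasJob` / `notify` callbacks are
 * hypothetical stand-ins for `runtimeState`, `jobStore.hasJob`, and
 * `eventTransport.emitError`; `subscribers` stands in for `getSubscriberCount`.
 */
interface SketchRuntime {
  abortController: AbortController;
  subscribers: number;
}

async function reapOrphans(
  runtimeState: Map<string, SketchRuntime>,
  hasJob: (streamId: string) => Promise<boolean>,
  notify: (streamId: string) => Promise<void>,
): Promise<string[]> {
  const reaped: string[] = [];
  // Snapshot the keys so streams added during the awaits below are not visited
  // in this pass (a live Map iterator would pick them up).
  for (const streamId of Array.from(runtimeState.keys())) {
    if (await hasJob(streamId)) {
      continue;
    }
    const runtime = runtimeState.get(streamId);
    // Unwind the hung work first so its references can be garbage collected.
    if (runtime && !runtime.abortController.signal.aborted) {
      runtime.abortController.abort();
    }
    // Give an attached client a terminal event before its state disappears.
    if (runtime && runtime.subscribers > 0) {
      try {
        await notify(streamId);
      } catch {
        // A failed notification must not stop the sweep.
      }
    }
    runtimeState.delete(streamId);
    reaped.push(streamId);
  }
  return reaped;
}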