From 089ba09f9837e60a5bf192a150eea9c4b0b5401a Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Tue, 16 Jun 2026 11:27:18 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=8F=9B=EF=B8=8F=20refactor:=20Deepen=20HI?= =?UTF-8?q?TL=20approval=20lifecycle=20into=20one=20race-safe=20seam?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Architecture-review candidate #1 (+ #4). The requires_action lifecycle was three shallow pass-throughs over updateJob with the legal transitions smeared across JSDoc, the JobStatus union, and each store adapter — and the resume transition was NOT race-safe: the Redis lua checked existence, not status, so two concurrent approval submits both drove the run (re-executing tools / double-billing). - IJobStore.transitionStatus: atomic compare-and-set status transition that only fires if the job is currently `from`. InMemory: sync compare. Redis: single-node lua with a status guard (cluster best-effort, matching the existing posture); reconciles membership sets + TTLs to `to`. - New ApprovalLifecycle module: pause / peek / resolve / expire — guarded, race-safe transitions behind one interface. resolve() returns true to exactly one concurrent caller; the previously-undefined requires_action → aborted expiry edge is now explicit; peek treats past-expiresAt as gone (lazy expiry). - GenerationJobManager exposes `approvals` and delegates; the three shallow methods (mark/get/clearPendingAction) are removed — callers cross the deep interface. - #4: typeContract.spec asserts the SDK <-> data-provider HITL types stay compatible (fails the build on drift); RedisJobStore validates the pendingAction shape on deserialize instead of a bare JSON.parse (defends the cold-resume path against malformed/stale records). - Tests rewritten at the deep interface: double-resolve wins once, pause-on-terminal rejected, explicit expiry, lazy-expiry peek. 
No Slice B wiring — this deepens the existing scaffolding so the future resume route and run seam are born crossing one race-safe interface. --- .../api/src/agents/hitl/typeContract.spec.ts | 44 +++++ packages/api/src/stream/ApprovalLifecycle.ts | 109 +++++++++++ .../api/src/stream/GenerationJobManager.ts | 58 ++---- .../stream/__tests__/pendingAction.spec.ts | 182 +++++++++++++----- .../implementations/InMemoryJobStore.ts | 22 +++ .../stream/implementations/RedisJobStore.ts | 135 ++++++++++++- .../api/src/stream/interfaces/IJobStore.ts | 36 ++++ 7 files changed, 491 insertions(+), 95 deletions(-) create mode 100644 packages/api/src/agents/hitl/typeContract.spec.ts create mode 100644 packages/api/src/stream/ApprovalLifecycle.ts diff --git a/packages/api/src/agents/hitl/typeContract.spec.ts b/packages/api/src/agents/hitl/typeContract.spec.ts new file mode 100644 index 0000000000..419eefb39a --- /dev/null +++ b/packages/api/src/agents/hitl/typeContract.spec.ts @@ -0,0 +1,44 @@ +import type { + HumanInterruptPayload as SdkHumanInterruptPayload, + ToolApprovalRequest as SdkToolApprovalRequest, + ToolApprovalDecisionType as SdkToolApprovalDecisionType, +} from '@librechat/agents'; +import type { Agents } from 'librechat-data-provider'; + +/** + * Compile-time contract between the SDK's HITL wire types and LibreChat's + * `Agents.*` mirror in `librechat-data-provider`. The mirror is hand-maintained + * (data-provider can't depend on `@librechat/agents`), so these assignability + * checks are the seam that fails the build when the two drift. + * + * The assertions live inside the function signatures: each `accept*` function's + * parameter type forces TypeScript to prove assignability at compile time. If + * the SDK adds a field the mirror lacks (or a decision literal changes), this + * file stops compiling — caught here instead of silently dropped on the Redis + * round-trip. The runtime `expect`s exist only so Jest sees real tests. 
+ */ +describe('HITL type contract: @librechat/agents ↔ librechat-data-provider', () => { + test('the SDK interrupt payload is persistable as the LC mirror', () => { + // Direction that matters most: `Run.getInterrupt()` returns the SDK payload, + // which `approvals.pause()` persists as `Agents.PendingAction.payload`. + // Losing a field here = silent data loss across the pause/resume boundary. + const acceptLcPayload = (p: Agents.HumanInterruptPayload): Agents.HumanInterruptType => p.type; + const fromSdk = (p: SdkHumanInterruptPayload) => acceptLcPayload(p); + expect(typeof fromSdk).toBe('function'); + }); + + test('the SDK action request is persistable as the LC mirror', () => { + const acceptLcRequest = (r: Agents.ToolApprovalRequest): string => r.tool_call_id; + const fromSdk = (r: SdkToolApprovalRequest) => acceptLcRequest(r); + expect(typeof fromSdk).toBe('function'); + }); + + test('decision-type literals match in both directions (resume input contract)', () => { + // What an approval route sends to `run.resume()` must be a valid SDK + // decision, and the LC mirror must enumerate exactly the SDK's literals. + const lcToSdk = (d: Agents.ToolApprovalDecisionType): SdkToolApprovalDecisionType => d; + const sdkToLc = (d: SdkToolApprovalDecisionType): Agents.ToolApprovalDecisionType => d; + expect(typeof lcToSdk).toBe('function'); + expect(typeof sdkToLc).toBe('function'); + }); +}); diff --git a/packages/api/src/stream/ApprovalLifecycle.ts b/packages/api/src/stream/ApprovalLifecycle.ts new file mode 100644 index 0000000000..bd2162f797 --- /dev/null +++ b/packages/api/src/stream/ApprovalLifecycle.ts @@ -0,0 +1,109 @@ +import { logger } from '@librechat/data-schemas'; +import type { Agents } from 'librechat-data-provider'; +import type { IJobStore } from '~/stream/interfaces/IJobStore'; + +/** + * The guarded lifecycle of a run paused for human review (`requires_action`). 
+ * + * Owns the legal transitions — pause, resolve, expire — behind one interface, + * on top of the store's atomic {@link IJobStore.transitionStatus}. Callers + * (approval routes, the status endpoint, the run seam) cross this seam instead + * of re-implementing the "is this transition legal from the current state, and + * is it safe under a concurrent second submit" logic at each site. + * + * Race-safety is the point. Two approval clicks racing to resume the same job + * must not both drive the run — a double-drive re-executes tools and + * double-bills. {@link resolve} returns `true` to exactly one caller; the loser + * gets `false`. The same guard protects {@link pause} (don't pause a job that + * was aborted between the interrupt firing and the mark) and {@link expire}. + * + * State machine: + * ``` + * running ──pause(pendingAction)──▶ requires_action + * requires_action ──resolve()──────▶ running + * requires_action ──expire()───────▶ aborted (the edge that was undefined) + * ``` + */ +export class ApprovalLifecycle { + constructor(private readonly store: IJobStore) {} + + /** + * `running → requires_action`, attaching the pending review record. + * Returns `false` when the job was not running (aborted mid-flight, gone), + * so a late interrupt is dropped rather than pausing a dead job. + */ + async pause(streamId: string, pendingAction: Agents.PendingAction): Promise<boolean> { + const ok = await this.store.transitionStatus(streamId, { + from: 'running', + to: 'requires_action', + patch: { pendingAction }, + }); + if (ok) { + logger.debug( + `[ApprovalLifecycle] paused for review: ${streamId} action=${pendingAction.actionId}`, + ); + } + return ok; + } + + /** + * The pending review record, or `null` when the job isn't awaiting review. + * A past-`expiresAt` record reads as `null` (lazy expiry) so a stale prompt + * is never surfaced to a UI or fed to a resume. 
+ */ + async peek(streamId: string): Promise<Agents.PendingAction | null> { + const job = await this.store.getJob(streamId); + if (!job || job.status !== 'requires_action' || !job.pendingAction) { + return null; + } + return this.isExpired(job.pendingAction) ? null : job.pendingAction; + } + + /** + * `requires_action → running`, atomically. Returns `true` to the single + * caller that won the transition; `false` if the job was not paused, was + * already resumed by a racing submit, or had expired — in which case it is + * moved to a terminal state instead of resumed. + * + * The caller MUST treat `false` as "do not drive the run": only the `true` + * winner may re-enter the agent. + */ + async resolve(streamId: string): Promise<boolean> { + const job = await this.store.getJob(streamId); + if ( + job?.status === 'requires_action' && + job.pendingAction && + this.isExpired(job.pendingAction) + ) { + await this.expire(streamId); + return false; + } + return this.store.transitionStatus(streamId, { + from: 'requires_action', + to: 'running', + clear: ['pendingAction'], + }); + } + + /** + * `requires_action → aborted`: the edge that fires when no decision arrives + * in time. Previously undefined; now an explicit, idempotent terminal + * transition. Returns `true` to the single caller that expired it. 
+ */ + async expire(streamId: string): Promise<boolean> { + const ok = await this.store.transitionStatus(streamId, { + from: 'requires_action', + to: 'aborted', + clear: ['pendingAction'], + patch: { error: 'Approval expired before a decision was made' }, + }); + if (ok) { + logger.debug(`[ApprovalLifecycle] expired pending review: ${streamId}`); + } + return ok; + } + + private isExpired(pendingAction: Agents.PendingAction): boolean { + return pendingAction.expiresAt != null && pendingAction.expiresAt <= Date.now(); + } +} diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 2f95011ba5..20de0c33cb 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -20,6 +20,7 @@ import { import { InMemoryEventTransport } from './implementations/InMemoryEventTransport'; import { InMemoryJobStore } from './implementations/InMemoryJobStore'; import { filterPersistableAbortContent } from './abortContent'; +import { ApprovalLifecycle } from './ApprovalLifecycle'; /** Error surfaced to any client still attached when a stale/hung job is reaped. */ const REAPED_JOB_ERROR = 'Generation timed out'; @@ -165,6 +166,8 @@ interface RuntimeJobState { class GenerationJobManagerClass { /** Job metadata + content state storage - swappable for Redis, etc. */ private jobStore: IJobStore; + /** Guarded human-review lifecycle (pause / resolve / expire) over the store. */ + private _approvals: ApprovalLifecycle; /** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */ private eventTransport: IEventTransport; @@ -191,6 +194,7 @@ class GenerationJobManagerClass { constructor(options?: GenerationJobManagerOptions) { this.jobStore = options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 0, maxJobs: 1000 }); + this._approvals = new ApprovalLifecycle(this.jobStore); this.eventTransport = options?.eventTransport ?? 
new InMemoryEventTransport(); this._cleanupOnComplete = options?.cleanupOnComplete ?? true; } @@ -249,6 +253,7 @@ class GenerationJobManagerClass { setGenerationJobsInFlight(previousStore, 0); this.jobStore = services.jobStore; + this._approvals = new ApprovalLifecycle(this.jobStore); this.eventTransport = services.eventTransport; this._isRedis = services.isRedis ?? false; this._cleanupOnComplete = services.cleanupOnComplete ?? true; @@ -1386,51 +1391,18 @@ class GenerationJobManagerClass { } /** - * Transition a job to `requires_action` and persist the pending review record. + * The guarded human-review lifecycle for paused runs: + * `approvals.pause()` / `peek()` / `resolve()` / `expire()`. * - * The job is NOT cleaned up: chunks, run steps, and user-active-set membership - * remain so the resume path can rebuild context. The Redis job-hash TTL is - * refreshed by the store to give the user the full TTL window to respond. - * - * @param streamId - The stream identifier - * @param pendingAction - The pending review record (tool approval, etc.) + * This is the seam approval routes, the status endpoint, and the run wiring + * cross — it owns the legal `requires_action` transitions and is race-safe + * against concurrent resumes (a double-resolve would otherwise drive the run + * twice). The job's chunks, run steps, and user-active-set membership are + * preserved across a pause so the resume path can rebuild context; the store + * refreshes the job-hash TTL to give the user the full window to respond. */ - async markRequiresAction(streamId: string, pendingAction: Agents.PendingAction): Promise<void> { - await this.jobStore.updateJob(streamId, { - status: 'requires_action', - pendingAction, - }); - logger.debug( - `[GenerationJobManager] Job awaiting human review: ${streamId} action=${pendingAction.actionId}`, - ); - } - - /** - * Read the pending review record for a job. 
- * - * Returns null when the job doesn't exist, isn't in `requires_action`, - * or has no recorded pending action. Callers (status endpoint, approval routes) - * should treat null as "nothing to approve." - */ - async getPendingAction(streamId: string): Promise<Agents.PendingAction | null> { - const jobData = await this.jobStore.getJob(streamId); - if (!jobData || jobData.status !== 'requires_action') { - return null; - } - return jobData.pendingAction ?? null; - } - - /** - * Clear the pending review record and return the job to `running`. - * Called by the resume path after a user approval/rejection has been accepted - * and the run is about to be re-driven. - */ - async clearPendingAction(streamId: string): Promise<void> { - await this.jobStore.updateJob(streamId, { - status: 'running', - pendingAction: undefined, - }); - logger.debug(`[GenerationJobManager] Cleared pending action: ${streamId}`); + get approvals(): ApprovalLifecycle { + return this._approvals; } /** diff --git a/packages/api/src/stream/__tests__/pendingAction.spec.ts b/packages/api/src/stream/__tests__/pendingAction.spec.ts index ddf80657a4..4c547eee3c 100644 --- a/packages/api/src/stream/__tests__/pendingAction.spec.ts +++ b/packages/api/src/stream/__tests__/pendingAction.spec.ts @@ -6,7 +6,7 @@ import { GenerationJobManagerClass } from '~/stream/GenerationJobManager'; jest.spyOn(console, 'log').mockImplementation(); -describe('GenerationJobManager pending-action lifecycle (in-memory)', () => { +describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', () => { let manager: GenerationJobManagerClass; beforeEach(() => { @@ -37,76 +37,156 @@ describe('GenerationJobManager pending-action lifecycle (in-memory)', () => { return { ...action, ...overrides }; } - test('markRequiresAction persists the pending action and transitions status', async () => { - const streamId = 'stream-mark'; - await manager.createJob(streamId, 'user-1'); + describe('pause', () => { + test('running → requires_action, persisting the pending 
record', async () => { + const streamId = 'stream-pause'; + await manager.createJob(streamId, 'user-1'); - const action = buildAction(streamId); - await manager.markRequiresAction(streamId, action); + const action = buildAction(streamId); + expect(await manager.approvals.pause(streamId, action)).toBe(true); - const status = await manager.getJobStatus(streamId); - expect(status).toBe('requires_action'); + expect(await manager.getJobStatus(streamId)).toBe('requires_action'); + const pending = await manager.approvals.peek(streamId); + expect(pending?.actionId).toBe(action.actionId); + expect(pending?.payload.type).toBe('tool_approval'); + if (pending?.payload.type === 'tool_approval') { + expect(pending.payload.action_requests[0].name).toBe('shell'); + } + }); - const pending = await manager.getPendingAction(streamId); - expect(pending).not.toBeNull(); - expect(pending?.actionId).toBe(action.actionId); - expect(pending?.payload.type).toBe('tool_approval'); - if (pending?.payload.type === 'tool_approval') { - expect(pending.payload.action_requests[0].name).toBe('shell'); - } + test('returns false when the job is already terminal', async () => { + const streamId = 'stream-pause-dead'; + await manager.createJob(streamId, 'user-1'); + await manager.completeJob(streamId, 'terminated mid-flight'); + + expect(await manager.approvals.pause(streamId, buildAction(streamId))).toBe(false); + // a late interrupt must NOT resurrect a terminal job into requires_action + expect(await manager.getJobStatus(streamId)).not.toBe('requires_action'); + }); + + test('returns false when the job does not exist', async () => { + expect(await manager.approvals.pause('nonexistent', buildAction('nonexistent'))).toBe(false); + }); }); - test('getPendingAction returns null for jobs not in requires_action', async () => { - const streamId = 'stream-running'; - await manager.createJob(streamId, 'user-1'); - expect(await manager.getPendingAction(streamId)).toBeNull(); + describe('peek', () => { + 
test('returns null for jobs not in requires_action', async () => { + const streamId = 'stream-running'; + await manager.createJob(streamId, 'user-1'); + expect(await manager.approvals.peek(streamId)).toBeNull(); + }); + + test('returns null when the job does not exist', async () => { + expect(await manager.approvals.peek('nonexistent')).toBeNull(); + }); + + test('treats a past-expiresAt record as gone (lazy expiry)', async () => { + const streamId = 'stream-expired-peek'; + await manager.createJob(streamId, 'user-1'); + await manager.approvals.pause( + streamId, + buildAction(streamId, { expiresAt: Date.now() - 1000 }), + ); + + expect(await manager.approvals.peek(streamId)).toBeNull(); + }); }); - test('getPendingAction returns null when the job does not exist', async () => { - expect(await manager.getPendingAction('nonexistent')).toBeNull(); + describe('resolve', () => { + test('requires_action → running, clearing the record, returns true once', async () => { + const streamId = 'stream-resolve'; + await manager.createJob(streamId, 'user-1'); + await manager.approvals.pause(streamId, buildAction(streamId)); + + expect(await manager.approvals.resolve(streamId)).toBe(true); + expect(await manager.getJobStatus(streamId)).toBe('running'); + expect(await manager.approvals.peek(streamId)).toBeNull(); + }); + + test('a concurrent double-resolve wins exactly once (race-safe)', async () => { + const streamId = 'stream-double-resolve'; + await manager.createJob(streamId, 'user-1'); + await manager.approvals.pause(streamId, buildAction(streamId)); + + const results = await Promise.all([ + manager.approvals.resolve(streamId), + manager.approvals.resolve(streamId), + ]); + + // Exactly one caller may drive the run — the other must be rejected. 
+ expect(results.filter(Boolean)).toHaveLength(1); + expect(await manager.getJobStatus(streamId)).toBe('running'); + }); + + test('returns false when the job is not paused', async () => { + const streamId = 'stream-resolve-running'; + await manager.createJob(streamId, 'user-1'); + expect(await manager.approvals.resolve(streamId)).toBe(false); + }); + + test('an expired pending action expires instead of resuming', async () => { + const streamId = 'stream-resolve-expired'; + await manager.createJob(streamId, 'user-1'); + await manager.approvals.pause( + streamId, + buildAction(streamId, { expiresAt: Date.now() - 1000 }), + ); + + expect(await manager.approvals.resolve(streamId)).toBe(false); + expect(await manager.getJobStatus(streamId)).toBe('aborted'); + }); }); - test('clearPendingAction returns the job to running and removes the pending record', async () => { - const streamId = 'stream-clear'; - await manager.createJob(streamId, 'user-1'); + describe('expire', () => { + test('requires_action → aborted, clearing the record, returns true once', async () => { + const streamId = 'stream-expire'; + await manager.createJob(streamId, 'user-1'); + await manager.approvals.pause(streamId, buildAction(streamId)); - await manager.markRequiresAction(streamId, buildAction(streamId)); - expect(await manager.getJobStatus(streamId)).toBe('requires_action'); + expect(await manager.approvals.expire(streamId)).toBe(true); + expect(await manager.getJobStatus(streamId)).toBe('aborted'); + expect(await manager.approvals.peek(streamId)).toBeNull(); - await manager.clearPendingAction(streamId); + // idempotent — a second expire does not fire again + expect(await manager.approvals.expire(streamId)).toBe(false); + }); - expect(await manager.getJobStatus(streamId)).toBe('running'); - expect(await manager.getPendingAction(streamId)).toBeNull(); + test('returns false when the job is not paused', async () => { + const streamId = 'stream-expire-running'; + await manager.createJob(streamId, 
'user-1'); + expect(await manager.approvals.expire(streamId)).toBe(false); + }); }); - test('requires_action drops the running count but keeps the user-active set', async () => { - const streamId = 'stream-counts'; - await manager.createJob(streamId, 'user-counts'); + describe('facade integration', () => { + test('requires_action drops the running count but keeps the user-active set', async () => { + const streamId = 'stream-counts'; + await manager.createJob(streamId, 'user-counts'); - const beforeCounts = await manager.getJobCountByStatus(); - expect(beforeCounts.running).toBe(1); - expect(beforeCounts.requires_action).toBe(0); + const before = await manager.getJobCountByStatus(); + expect(before.running).toBe(1); + expect(before.requires_action).toBe(0); - await manager.markRequiresAction(streamId, buildAction(streamId)); + await manager.approvals.pause(streamId, buildAction(streamId)); - const afterCounts = await manager.getJobCountByStatus(); - expect(afterCounts.running).toBe(0); - expect(afterCounts.requires_action).toBe(1); + const after = await manager.getJobCountByStatus(); + expect(after.running).toBe(0); + expect(after.requires_action).toBe(1); - // Pending-approval jobs still occupy the user's conversation slot. - const active = await manager.getActiveJobIdsForUser('user-counts'); - expect(active).toContain(streamId); - }); + // Pending-approval jobs still occupy the user's conversation slot. 
+ expect(await manager.getActiveJobIdsForUser('user-counts')).toContain(streamId); + }); - test('getActiveJobIdsForUser excludes terminal jobs but includes requires_action', async () => { - await manager.createJob('s-running', 'user-mix'); - await manager.createJob('s-paused', 'user-mix'); - await manager.createJob('s-done', 'user-mix'); + test('getActiveJobIdsForUser excludes terminal jobs but includes requires_action', async () => { + await manager.createJob('s-running', 'user-mix'); + await manager.createJob('s-paused', 'user-mix'); + await manager.createJob('s-done', 'user-mix'); - await manager.markRequiresAction('s-paused', buildAction('s-paused')); - await manager.completeJob('s-done'); + await manager.approvals.pause('s-paused', buildAction('s-paused')); + await manager.completeJob('s-done'); - const active = await manager.getActiveJobIdsForUser('user-mix'); - expect(active.sort()).toEqual(['s-paused', 's-running']); + const active = await manager.getActiveJobIdsForUser('user-mix'); + expect(active.sort()).toEqual(['s-paused', 's-running']); + }); }); }); diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index 4567b55ae6..cfdbab1b72 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -6,6 +6,7 @@ import type { UsageMetadata, IJobStore, JobStatus, + JobStatusTransition, } from '~/stream/interfaces/IJobStore'; /** @@ -137,6 +138,27 @@ export class InMemoryJobStore implements IJobStore { Object.assign(job, updates); } + /** + * Atomic in-memory: the single-threaded event loop makes the + * read-check-write sequence indivisible, so the status guard is exact. + * Membership/counts derive from `job.status` directly, so there are no + * sets to reconcile here. 
+ */ + async transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean> { + const job = this.jobs.get(streamId); + if (!job || job.status !== args.from) { + return false; + } + job.status = args.to; + if (args.patch) { + Object.assign(job, args.patch); + } + for (const field of args.clear ?? []) { + delete job[field]; + } + return true; + } + async deleteJob(streamId: string): Promise<void> { this.jobs.delete(streamId); this.contentState.delete(streamId); diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index f0bda4fdc4..764626651a 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -8,8 +8,40 @@ import type { UsageMetadata, IJobStore, JobStatus, + JobStatusTransition, } from '~/stream/interfaces/IJobStore'; +/** + * Atomic compare-and-set status transition (single-node / sentinel Redis). + * + * Guards on the current `status` field, then — in one indivisible step — + * removes `clear` fields, writes the `status`+patch pairs, reconciles the + * membership sets, and refreshes TTLs. Returns 1 if it fired, 0 if the job + * was missing or no longer in the expected `from` status. 
+ * + * KEYS: [job, remSet | "", addSet | "", chunks, runSteps] + * ARGV: [from, member, ttl, refreshLive(0|1), hdelCount, ...hdel, ...hsetPairs] + */ +const TRANSITION_STATUS_LUA = + 'if redis.call("HGET", KEYS[1], "status") ~= ARGV[1] then return 0 end ' + + 'local member = ARGV[2] ' + + 'local ttl = tonumber(ARGV[3]) ' + + 'local refreshLive = ARGV[4] ' + + 'local hdelCount = tonumber(ARGV[5]) ' + + 'local idx = 6 ' + + 'for i = 1, hdelCount do redis.call("HDEL", KEYS[1], ARGV[idx]) idx = idx + 1 end ' + + 'local hset = {} ' + + 'for i = idx, #ARGV do hset[#hset + 1] = ARGV[i] end ' + + 'if #hset > 0 then redis.call("HSET", KEYS[1], unpack(hset)) end ' + + 'if KEYS[2] ~= "" then redis.call("SREM", KEYS[2], member) end ' + + 'if KEYS[3] ~= "" then redis.call("SADD", KEYS[3], member) end ' + + 'redis.call("EXPIRE", KEYS[1], ttl) ' + + 'if refreshLive == "1" then redis.call("EXPIRE", KEYS[4], ttl) redis.call("EXPIRE", KEYS[5], ttl) end ' + + 'return 1'; + +/** Decision kinds the SDK can emit, used to sanity-check persisted records. */ +const KNOWN_INTERRUPT_TYPES = new Set(['tool_approval', 'ask_user_question']); + /** * Key prefixes for Redis storage. * All keys include the streamId for easy cleanup. @@ -276,6 +308,80 @@ export class RedisJobStore implements IJobStore { } } + /** The membership set a status belongs to; terminal statuses have none. */ + private statusSetKey(status: JobStatus): string | null { + if (status === 'running') { + return KEYS.runningJobs; + } + if (status === 'requires_action') { + return KEYS.requiresActionJobs; + } + return null; + } + + async transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean> { + const { from, to, patch, clear } = args; + const key = KEYS.job(streamId); + + // status + patch become HSET pairs; serializeJob skips undefined, so + // cleared fields go through HDEL (`clear`) instead. + const fields = Object.entries( + this.serializeJob({ status: to, ...(patch ?? 
{}) } as SerializableJobData), + ).flat(); + const clearFields = (clear ?? []).map(String); + + const remSet = this.statusSetKey(from); + const addSet = this.statusSetKey(to); + const terminal = addSet === null; + const ttl = terminal ? this.ttl.completed : this.ttl.running; + + if (this.isCluster) { + // Membership sets live on a different slot from the job hash, so a single + // cross-slot script isn't possible. Guard best-effort (read status, then + // apply) — matches the existing cluster posture for status writes. + const current = await this.redis.hget(key, 'status'); + if (current !== from) { + return false; + } + if (clearFields.length > 0) { + await this.redis.hdel(key, ...clearFields); + } + if (fields.length > 0) { + await this.updateExistingJobHash(key, fields); + } + if (remSet) { + await this.redis.srem(remSet, streamId); + } + if (addSet) { + await this.redis.sadd(addSet, streamId); + } + await this.redis.expire(key, ttl); + if (!terminal) { + await this.redis.expire(KEYS.chunks(streamId), ttl); + await this.redis.expire(KEYS.runSteps(streamId), ttl); + } + return true; + } + + const result = await this.redis.eval( + TRANSITION_STATUS_LUA, + 5, + key, + remSet ?? '', + addSet ?? '', + KEYS.chunks(streamId), + KEYS.runSteps(streamId), + from, + streamId, + String(ttl), + terminal ? '0' : '1', + String(clearFields.length), + ...clearFields, + ...fields, + ); + return result === 1; + } + private async updateExistingJobHash(key: string, fields: string[]): Promise { const updated = await this.redis.eval( 'if redis.call("EXISTS", KEYS[1]) == 1 then redis.call("HSET", KEYS[1], unpack(ARGV)) return 1 else return 0 end', @@ -1104,7 +1210,34 @@ export class RedisJobStore implements IJobStore { replayEvents: data.replayEvents || undefined, contextUsage: data.contextUsage || undefined, tokenUsage: data.tokenUsage || undefined, - pendingAction: data.pendingAction ? 
JSON.parse(data.pendingAction) : undefined, + pendingAction: this.parsePendingAction(data.pendingAction), }; } + + /** + * Parse a persisted `pendingAction`, defending the cold-resume path against + * malformed or stale records: a corrupt JSON blob or a payload whose shape + * predates the current SDK contract is dropped (logged) rather than crashing + * the resume or feeding a bad record to an approval route. Returns undefined + * when absent/invalid. + */ + private parsePendingAction(raw: string | undefined): Agents.PendingAction | undefined { + if (!raw) { + return undefined; + } + try { + const parsed = JSON.parse(raw) as Agents.PendingAction; + const typeOk = + typeof parsed?.actionId === 'string' && + KNOWN_INTERRUPT_TYPES.has(parsed?.payload?.type as string); + if (!typeOk) { + logger.warn('[RedisJobStore] Dropping malformed pendingAction record'); + return undefined; + } + return parsed; + } catch { + logger.warn('[RedisJobStore] Dropping unparseable pendingAction record'); + return undefined; + } + } } diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index 1bbea5c2dd..4ae0c3b96a 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -71,6 +71,20 @@ export interface SerializableJobData { pendingAction?: Agents.PendingAction; } +/** + * Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set. + */ +export interface JobStatusTransition { + /** Only fire the transition if the job is currently in this status. */ + from: JobStatus; + /** Status to move to when the `from` guard holds. */ + to: JobStatus; + /** Fields written in the same atomic step as the status change. */ + patch?: Partial<SerializableJobData>; + /** Field names removed in the same atomic step (e.g. `pendingAction`). */ + clear?: Array<keyof SerializableJobData>; +} + /** * Usage metadata for token spending across different LLM providers. 
* @@ -211,6 +225,28 @@ export interface IJobStore { /** Update job data */ updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void>; + /** + * Atomically transition a job's status, **only if** it is currently `from`. + * Returns `true` when the transition fired, `false` when the job was missing + * or no longer in `from` (lost a race / illegal transition). + * + * `patch` fields are written and `clear` fields removed in the same atomic + * step, and the running / requires_action membership sets plus live-key TTLs + * are reconciled to match `to`. This is the race-safe primitive behind the + * approval lifecycle — it prevents two concurrent resumes from both driving a + * paused run (a double-drive would re-execute tools / double-bill). + * + * Distinct from {@link updateJob}, which writes status unconditionally for + * callers that don't know the prior state. Reach for `transitionStatus` + * whenever the legal prior state is known. + * + * Atomicity: fully atomic on in-memory and single-node / sentinel Redis + * (Lua). On Redis Cluster the status guard is best-effort — the membership + * sets live on a different hash slot from the job hash — matching the store's + * existing cluster posture for status writes. + */ + transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean>; + /** Delete a job */ deleteJob(streamId: string): Promise<void>;