diff --git a/packages/api/src/agents/hitl/policy.ts b/packages/api/src/agents/hitl/policy.ts index fb5fdefbcc..bfe0543c7c 100644 --- a/packages/api/src/agents/hitl/policy.ts +++ b/packages/api/src/agents/hitl/policy.ts @@ -117,6 +117,10 @@ export interface PendingActionContext { ttlMs?: number; /** Override actionId; defaults to a fresh uuid. */ actionId?: string; + /** SDK interrupt id (`RunInterruptResult.interruptId`) for cross-process resume. */ + interruptId?: string; + /** LangGraph `thread_id` (`RunInterruptResult.threadId`) for cross-process resume. */ + threadId?: string; } /** @@ -140,5 +144,7 @@ export function buildPendingAction( payload, createdAt, expiresAt: typeof ctx.ttlMs === 'number' ? createdAt + ctx.ttlMs : undefined, + interruptId: ctx.interruptId, + threadId: ctx.threadId, }; } diff --git a/packages/api/src/stream/ApprovalLifecycle.ts b/packages/api/src/stream/ApprovalLifecycle.ts index bd2162f797..859493da67 100644 --- a/packages/api/src/stream/ApprovalLifecycle.ts +++ b/packages/api/src/stream/ApprovalLifecycle.ts @@ -36,7 +36,8 @@ export class ApprovalLifecycle { const ok = await this.store.transitionStatus(streamId, { from: 'running', to: 'requires_action', - patch: { pendingAction }, + // pendingActionId is the flat mirror the atomic resolve/expire guard on. + patch: { pendingAction, pendingActionId: pendingAction.actionId }, }); if (ok) { logger.debug( @@ -62,40 +63,54 @@ export class ApprovalLifecycle { /** * `requires_action → running`, atomically. Returns `true` to the single * caller that won the transition; `false` if the job was not paused, was - * already resumed by a racing submit, or had expired — in which case it is - * moved to a terminal state instead of resumed. + * already resumed by a racing submit, no longer matches `expectedActionId`, + * or had expired — in which case it is moved to a terminal state instead of + * resumed. + * + * Pass `expectedActionId` (the id the user actually decided on, from the + * approval route) so a stale decision can't resume a job that has since + * paused for a *different* action. Omit it only for callers with no specific + * action in hand. * * The caller MUST treat `false` as "do not drive the run": only the `true` * winner may re-enter the agent. */ - async resolve(streamId: string): Promise { + async resolve(streamId: string, expectedActionId?: string): Promise { const job = await this.store.getJob(streamId); if ( job?.status === 'requires_action' && job.pendingAction && this.isExpired(job.pendingAction) ) { - await this.expire(streamId); + await this.expire(streamId, expectedActionId); return false; } return this.store.transitionStatus(streamId, { from: 'requires_action', to: 'running', - clear: ['pendingAction'], + clear: ['pendingAction', 'pendingActionId'], + // Refresh the liveness basis so a long-paused run isn't reaped as stale + // immediately after resuming (cleanup keys off lastActiveAt). + patch: { lastActiveAt: Date.now() }, + expectActionId: expectedActionId, }); } /** * `requires_action → aborted`: the edge that fires when no decision arrives * in time. Previously undefined; now an explicit, idempotent terminal - * transition. Returns `true` to the single caller that expired it. + * transition. Returns `true` to the single caller that expired it. Honors + * `expectedActionId` for the same stale-decision protection as `resolve`. */ - async expire(streamId: string): Promise { + async expire(streamId: string, expectedActionId?: string): Promise { const ok = await this.store.transitionStatus(streamId, { from: 'requires_action', to: 'aborted', - clear: ['pendingAction'], - patch: { error: 'Approval expired before a decision was made' }, + clear: ['pendingAction', 'pendingActionId'], + // completedAt lets the stores' terminal-cleanup reclaim the job; without + // it an expired approval lingers in the in-memory map indefinitely. + patch: { error: 'Approval expired before a decision was made', completedAt: Date.now() }, + expectActionId: expectedActionId, }); if (ok) { logger.debug(`[ApprovalLifecycle] expired pending review: ${streamId}`); diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 20de0c33cb..e145d7ed17 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -480,6 +480,9 @@ class GenerationJobManagerClass { iconURL: jobData.iconURL, model: jobData.model, promptTokens: jobData.promptTokens, + // Surface the pending review so status/resume routes built on the + // facade can render the prompt for a `requires_action` job. + pendingAction: jobData.pendingAction, }, readyPromise: runtime.readyPromise, resolveReady: runtime.resolveReady, diff --git a/packages/api/src/stream/__tests__/pendingAction.spec.ts b/packages/api/src/stream/__tests__/pendingAction.spec.ts index 4c547eee3c..bc3cc8bec7 100644 --- a/packages/api/src/stream/__tests__/pendingAction.spec.ts +++ b/packages/api/src/stream/__tests__/pendingAction.spec.ts @@ -124,6 +124,21 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', () expect(await manager.approvals.resolve(streamId)).toBe(false); }); + test('rejects a resolve whose actionId no longer matches (stale-decision guard)', async () => { + const streamId = 'stream-stale-action'; + await manager.createJob(streamId, 'user-1'); + const action = buildAction(streamId); + await manager.approvals.pause(streamId, action); + + // A decision targeting a different action must not resume this one. + expect(await manager.approvals.resolve(streamId, 'some-other-action-id')).toBe(false); + expect(await manager.getJobStatus(streamId)).toBe('requires_action'); + + // The matching actionId resolves it. + expect(await manager.approvals.resolve(streamId, action.actionId)).toBe(true); + expect(await manager.getJobStatus(streamId)).toBe('running'); + }); + test('an expired pending action expires instead of resuming', async () => { const streamId = 'stream-resolve-expired'; await manager.createJob(streamId, 'user-1'); @@ -156,6 +171,17 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', () await manager.createJob(streamId, 'user-1'); expect(await manager.approvals.expire(streamId)).toBe(false); }); + + test('sets completedAt so terminal cleanup can reclaim the job', async () => { + const streamId = 'stream-expire-completed'; + await manager.createJob(streamId, 'user-1'); + await manager.approvals.pause(streamId, buildAction(streamId)); + + expect(await manager.approvals.expire(streamId)).toBe(true); + const job = await manager.getJob(streamId); + expect(job?.status).toBe('aborted'); + expect(job?.completedAt).toBeGreaterThan(0); + }); }); describe('facade integration', () => { diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index cfdbab1b72..fa77e911c4 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -149,6 +149,9 @@ export class InMemoryJobStore implements IJobStore { if (!job || job.status !== args.from) { return false; } + if (args.expectActionId != null && job.pendingActionId !== args.expectActionId) { + return false; + } job.status = args.to; if (args.patch) { Object.assign(job, args.patch); @@ -210,7 +213,7 @@ export class InMemoryJobStore implements IJobStore { // content state in memory until the process OOMs. Reaping keys off last // activity (not creation time) so a long but live stream is never reaped, // mirroring RedisJobStore refreshing the running TTL on each chunk. - const lastActive = this.lastActivity.get(streamId) ?? job.createdAt; + const lastActive = this.lastActivity.get(streamId) ?? job.lastActiveAt ?? job.createdAt; if (now - lastActive > this.staleJobTimeout) { toDelete.push(streamId); staleRunning++; diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index 764626651a..497475b97a 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -12,31 +12,30 @@ import type { } from '~/stream/interfaces/IJobStore'; /** - * Atomic compare-and-set status transition (single-node / sentinel Redis). + * Atomic compare-and-set on the job hash — the single-winner decision for a + * status transition. Touches ONLY the job key, which lives on one hash slot, so + * it is atomic on both single-node and Redis Cluster (cross-slot membership + * sets are reconciled by the caller AFTER this decides the winner). * - * Guards on the current `status` field, then — in one indivisible step — - * removes `clear` fields, writes the `status`+patch pairs, reconciles the - * membership sets, and refreshes TTLs. Returns 1 if it fired, 0 if the job - * was missing or no longer in the expected `from` status. + * Guards on the current `status` and, when ARGV[2] is non-empty, on the flat + * `pendingActionId` field — so a stale decision targeting a different action + * loses. On success: removes `clear` fields, writes `status`+patch pairs, + * refreshes the job-hash TTL. Returns 1 if it fired, 0 otherwise. * - * KEYS: [job, remSet | "", addSet | "", chunks, runSteps] - * ARGV: [from, member, ttl, refreshLive(0|1), hdelCount, ...hdel, ...hsetPairs] + * KEYS: [job] + * ARGV: [from, expectActionId | "", ttl, hdelCount, ...hdelFields, ...hsetPairs] */ -const TRANSITION_STATUS_LUA = +const JOB_CAS_LUA = 'if redis.call("HGET", KEYS[1], "status") ~= ARGV[1] then return 0 end ' + - 'local member = ARGV[2] ' + + 'if ARGV[2] ~= "" and redis.call("HGET", KEYS[1], "pendingActionId") ~= ARGV[2] then return 0 end ' + 'local ttl = tonumber(ARGV[3]) ' + - 'local refreshLive = ARGV[4] ' + - 'local hdelCount = tonumber(ARGV[5]) ' + - 'local idx = 6 ' + + 'local hdelCount = tonumber(ARGV[4]) ' + + 'local idx = 5 ' + 'for i = 1, hdelCount do redis.call("HDEL", KEYS[1], ARGV[idx]) idx = idx + 1 end ' + 'local hset = {} ' + 'for i = idx, #ARGV do hset[#hset + 1] = ARGV[i] end ' + 'if #hset > 0 then redis.call("HSET", KEYS[1], unpack(hset)) end ' + - 'if KEYS[2] ~= "" then redis.call("SREM", KEYS[2], member) end ' + - 'if KEYS[3] ~= "" then redis.call("SADD", KEYS[3], member) end ' + 'redis.call("EXPIRE", KEYS[1], ttl) ' + - 'if refreshLive == "1" then redis.call("EXPIRE", KEYS[4], ttl) redis.call("EXPIRE", KEYS[5], ttl) end ' + 'return 1'; /** Decision kinds the SDK can emit, used to sanity-check persisted records. */ @@ -320,7 +319,7 @@ export class RedisJobStore implements IJobStore { } async transitionStatus(streamId: string, args: JobStatusTransition): Promise { - const { from, to, patch, clear } = args; + const { from, to, patch, clear, expectActionId } = args; const key = KEYS.job(streamId); // status + patch become HSET pairs; serializeJob skips undefined, so @@ -335,51 +334,53 @@ export class RedisJobStore implements IJobStore { const terminal = addSet === null; const ttl = terminal ? this.ttl.completed : this.ttl.running; + // 1) Single-winner decision: an atomic CAS on the single-slot job hash. + // Works identically on cluster and single-node, so two concurrent + // resolves can never both win (and drive the run twice). + const won = await this.redis.eval( + JOB_CAS_LUA, + 1, + key, + from, + expectActionId ?? '', + String(ttl), + String(clearFields.length), + ...clearFields, + ...fields, + ); + if (won !== 1) { + return false; + } + + // 2) Reconcile membership sets + live-key TTLs. Only the winner reaches + // here; set membership is derived state that periodic cleanup + // self-heals, so this non-atomic cross-slot step is safe. if (this.isCluster) { - // Membership sets live on a different slot from the job hash, so a single - // cross-slot script isn't possible. Guard best-effort (read status, then - // apply) — matches the existing cluster posture for status writes. - const current = await this.redis.hget(key, 'status'); - if (current !== from) { - return false; - } - if (clearFields.length > 0) { - await this.redis.hdel(key, ...clearFields); - } - if (fields.length > 0) { - await this.updateExistingJobHash(key, fields); - } if (remSet) { await this.redis.srem(remSet, streamId); } if (addSet) { await this.redis.sadd(addSet, streamId); } - await this.redis.expire(key, ttl); if (!terminal) { await this.redis.expire(KEYS.chunks(streamId), ttl); await this.redis.expire(KEYS.runSteps(streamId), ttl); } - return true; + } else { + const pipeline = this.redis.pipeline(); + if (remSet) { + pipeline.srem(remSet, streamId); + } + if (addSet) { + pipeline.sadd(addSet, streamId); + } + if (!terminal) { + pipeline.expire(KEYS.chunks(streamId), ttl); + pipeline.expire(KEYS.runSteps(streamId), ttl); + } + await pipeline.exec(); } - - const result = await this.redis.eval( - TRANSITION_STATUS_LUA, - 5, - key, - remSet ?? '', - addSet ?? '', - KEYS.chunks(streamId), - KEYS.runSteps(streamId), - from, - streamId, - String(ttl), - terminal ? '0' : '1', - String(clearFields.length), - ...clearFields, - ...fields, - ); - return result === 1; + return true; } private async updateExistingJobHash(key: string, fields: string[]): Promise { @@ -576,8 +577,11 @@ export class RedisJobStore implements IJobStore { return 1; } - // Stale running job (failsafe - running for > configured TTL) - if (now - job.createdAt > this.ttl.running * 1000) { + // Stale running job (failsafe - running for > configured TTL). + // Keys off `lastActiveAt` when present so a just-resumed approval + // isn't reaped on the basis of its original creation time. + const liveSince = job.lastActiveAt ?? job.createdAt; + if (now - liveSince > this.ttl.running * 1000) { logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`); const userJobsKey = job.userId ? KEYS.userJobs(job.userId, job.tenantId) : null; await this.deleteJobInternal(streamId, userJobsKey); @@ -1211,6 +1215,8 @@ export class RedisJobStore implements IJobStore { contextUsage: data.contextUsage || undefined, tokenUsage: data.tokenUsage || undefined, pendingAction: this.parsePendingAction(data.pendingAction), + pendingActionId: data.pendingActionId || undefined, + lastActiveAt: data.lastActiveAt ? parseInt(data.lastActiveAt, 10) : undefined, }; } diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index 4ae0c3b96a..3fe6ba9b2a 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -69,6 +69,22 @@ export interface SerializableJobData { * run is waiting on. Cleared by the resume path before the job returns to `running`. */ pendingAction?: Agents.PendingAction; + + /** + * Flat mirror of `pendingAction.actionId`, kept as a top-level field so an + * atomic status transition can guard on it (a nested JSON field can't be + * compared inside a Redis Lua CAS). Lets `resolve`/`expire` reject a stale + * decision that targets a different action than the one currently pending. + */ + pendingActionId?: string; + + /** + * Liveness basis for the stale-running failsafe, refreshed when a paused job + * is resumed. Without it, cleanup keys off `createdAt`, so an approval that + * sat in `requires_action` past the running window would be reaped on the + * next tick right after resuming. Falls back to `createdAt` when unset. + */ + lastActiveAt?: number; } /** @@ -83,6 +99,12 @@ export interface JobStatusTransition { patch?: Partial; /** Field names removed in the same atomic step (e.g. `pendingAction`). */ clear?: Array; + /** + * Additional guard: only fire if the job's `pendingActionId` equals this. + * Checked atomically alongside the `from` status so a stale decision can't + * resolve a job that has since paused for a different action. + */ + expectActionId?: string; } /** diff --git a/packages/data-provider/src/types/agents.ts b/packages/data-provider/src/types/agents.ts index cd3e89e997..5943cce96b 100644 --- a/packages/data-provider/src/types/agents.ts +++ b/packages/data-provider/src/types/agents.ts @@ -350,6 +350,8 @@ export namespace Agents { /** The question itself: free-form prompt with optional curated answers. */ export interface AskUserQuestionRequest { question: string; + /** Optional descriptive context for the prompt; mirrors the SDK field. */ + description?: string; options?: AskUserQuestionOption[]; } @@ -383,6 +385,18 @@ export namespace Agents { createdAt: number; /** Optional expiry; clients should treat past `expiresAt` as stale */ expiresAt?: number; + /** + * SDK interrupt id (`RunInterruptResult.interruptId`). Persisted so a + * cross-process resume can correlate the decision with the LangGraph + * interrupt after the original `Run` object is gone. + */ + interruptId?: string; + /** + * LangGraph `thread_id` the run was bound to (`RunInterruptResult.threadId`). + * Required, with the checkpointer, to rebuild `Command({ resume })` on a + * worker that didn't originate the run. + */ + threadId?: string; } /**