diff --git a/packages/api/src/stream/__tests__/pendingAction.spec.ts b/packages/api/src/stream/__tests__/pendingAction.spec.ts index bc3cc8bec7..bc764ea95e 100644 --- a/packages/api/src/stream/__tests__/pendingAction.spec.ts +++ b/packages/api/src/stream/__tests__/pendingAction.spec.ts @@ -214,5 +214,17 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', () const active = await manager.getActiveJobIdsForUser('user-mix'); expect(active.sort()).toEqual(['s-paused', 's-running']); }); + + test('excludes a pending-approval job whose prompt has expired', async () => { + const streamId = 'stream-expired-active'; + await manager.createJob(streamId, 'user-exp'); + await manager.approvals.pause( + streamId, + buildAction(streamId, { expiresAt: Date.now() - 1000 }), + ); + + // Still requires_action, but the prompt is past expiry → no longer active. + expect(await manager.getActiveJobIdsForUser('user-exp')).not.toContain(streamId); + }); }); }); diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index fa77e911c4..3da1a41fc7 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -8,6 +8,7 @@ import type { JobStatus, JobStatusTransition, } from '~/stream/interfaces/IJobStore'; +import { isPendingActionExpired } from '~/stream/interfaces/IJobStore'; /** * Content state for a job - volatile, in-memory only. @@ -321,8 +322,13 @@ export class InMemoryJobStore implements IJobStore { for (const streamId of trackedIds) { const job = this.jobs.get(streamId); // Include running jobs and jobs paused for human review (e.g. tool approval). - // A pending-approval job still occupies the user's conversation slot. + // A pending-approval job still occupies the user's conversation slot — but + // only while its prompt is live: a past-`expiresAt` approval no longer + // counts as active (cleanup/expiry will finalize it). if (job && (job.status === 'running' || job.status === 'requires_action')) { + if (job.status === 'requires_action' && isPendingActionExpired(job)) { + continue; + } activeIds.push(streamId); } else { // Self-healing: job completed/deleted but mapping wasn't cleaned - fix it now diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index 497475b97a..acc5b27100 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -10,6 +10,7 @@ import type { JobStatus, JobStatusTransition, } from '~/stream/interfaces/IJobStore'; +import { isPendingActionExpired } from '~/stream/interfaces/IJobStore'; /** * Atomic compare-and-set on the job hash — the single-winner decision for a @@ -194,10 +195,21 @@ export class RedisJobStore implements IJobStore { const key = KEYS.job(streamId); const userJobsKey = KEYS.userJobs(userId, tenantId); + // A reused streamId overlays onto any existing hash, so paused-run fields + // from a prior generation could survive. Drop the HITL fields so the fresh + // running job never exposes stale approval metadata and cleanup keys off the + // new createdAt rather than a leftover lastActiveAt. + const staleHitlFields: Array = [ + 'pendingAction', + 'pendingActionId', + 'lastActiveAt', + ]; + // For cluster mode, we can't pipeline keys on different slots // The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots if (this.isCluster) { await this.redis.hset(key, this.serializeJob(job)); + await this.redis.hdel(key, ...staleHitlFields); await this.redis.expire(key, this.ttl.running); await this.redis.sadd(KEYS.runningJobs, streamId); await this.redis.srem(KEYS.requiresActionJobs, streamId); @@ -208,6 +220,7 @@ export class RedisJobStore implements IJobStore { } else { const pipeline = this.redis.pipeline(); pipeline.hset(key, this.serializeJob(job)); + pipeline.hdel(key, ...staleHitlFields); pipeline.expire(key, this.ttl.running); pipeline.sadd(KEYS.runningJobs, streamId); pipeline.srem(KEYS.requiresActionJobs, streamId); @@ -233,7 +246,19 @@ export class RedisJobStore implements IJobStore { async updateJob(streamId: string, updates: Partial): Promise { const key = KEYS.job(streamId); - const serialized = this.serializeJob(updates as SerializableJobData); + // Keep this generic path consistent with the guarded transitionStatus path: + // - mirror `pendingActionId` on any `pendingAction` write so a pause here + // still carries the flat field the stale-decision guard compares; + // - refresh `lastActiveAt` when resuming to `running` so a long-paused job + // isn't reaped by `createdAt` on the next cleanup tick. + let effective: Partial = updates; + if (updates.status === 'running') { + effective = { ...updates, lastActiveAt: updates.lastActiveAt ?? Date.now() }; + } else if (updates.pendingAction) { + effective = { ...updates, pendingActionId: updates.pendingAction.actionId }; + } + + const serialized = this.serializeJob(effective as SerializableJobData); if (Object.keys(serialized).length === 0) { return; } @@ -256,54 +281,67 @@ export class RedisJobStore implements IJobStore { } if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) { - // Proactively remove from user's job set (requires reading userId from the job hash) - const job = await this.getJob(streamId); - const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null; + await this.applyTerminalContentCleanup(streamId); + } + } - if (this.isCluster) { - await this.redis.expire(key, this.ttl.completed); - await this.redis.srem(KEYS.runningJobs, streamId); - await this.redis.srem(KEYS.requiresActionJobs, streamId); + /** + * Terminal cleanup shared by `updateJob` (complete/error/aborted) and the + * terminal path of `transitionStatus` (approval expiry → aborted): drop the + * job from both membership sets and the user-active set, shorten the job-hash + * TTL to the completed window, and del/shorten the chunk + run-step keys per + * the configured after-complete TTLs. Without sharing this, an expired + * approval left Redis stream contents around for the full running TTL. + */ + private async applyTerminalContentCleanup(streamId: string): Promise { + const key = KEYS.job(streamId); + // Proactively remove from user's job set (requires reading userId from the job hash) + const job = await this.getJob(streamId); + const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null; - if (this.ttl.chunksAfterComplete === 0) { - await this.redis.del(KEYS.chunks(streamId)); - } else { - await this.redis.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete); - } + if (this.isCluster) { + await this.redis.expire(key, this.ttl.completed); + await this.redis.srem(KEYS.runningJobs, streamId); + await this.redis.srem(KEYS.requiresActionJobs, streamId); - if (this.ttl.runStepsAfterComplete === 0) { - await this.redis.del(KEYS.runSteps(streamId)); - } else { - await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete); - } - - if (userJobsKey) { - await this.redis.srem(userJobsKey, streamId); - } + if (this.ttl.chunksAfterComplete === 0) { + await this.redis.del(KEYS.chunks(streamId)); } else { - const pipeline = this.redis.pipeline(); - pipeline.expire(key, this.ttl.completed); - pipeline.srem(KEYS.runningJobs, streamId); - pipeline.srem(KEYS.requiresActionJobs, streamId); - - if (this.ttl.chunksAfterComplete === 0) { - pipeline.del(KEYS.chunks(streamId)); - } else { - pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete); - } - - if (this.ttl.runStepsAfterComplete === 0) { - pipeline.del(KEYS.runSteps(streamId)); - } else { - pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete); - } - - if (userJobsKey) { - pipeline.srem(userJobsKey, streamId); - } - - await pipeline.exec(); + await this.redis.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete); } + + if (this.ttl.runStepsAfterComplete === 0) { + await this.redis.del(KEYS.runSteps(streamId)); + } else { + await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete); + } + + if (userJobsKey) { + await this.redis.srem(userJobsKey, streamId); + } + } else { + const pipeline = this.redis.pipeline(); + pipeline.expire(key, this.ttl.completed); + pipeline.srem(KEYS.runningJobs, streamId); + pipeline.srem(KEYS.requiresActionJobs, streamId); + + if (this.ttl.chunksAfterComplete === 0) { + pipeline.del(KEYS.chunks(streamId)); + } else { + pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete); + } + + if (this.ttl.runStepsAfterComplete === 0) { + pipeline.del(KEYS.runSteps(streamId)); + } else { + pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete); + } + + if (userJobsKey) { + pipeline.srem(userJobsKey, streamId); + } + + await pipeline.exec(); } } @@ -352,9 +390,14 @@ export class RedisJobStore implements IJobStore { return false; } - // 2) Reconcile membership sets + live-key TTLs. Only the winner reaches - // here; set membership is derived state that periodic cleanup - // self-heals, so this non-atomic cross-slot step is safe. + // 2) Reconcile derived state. Only the winner reaches here; membership is + // self-healed by periodic cleanup, so this non-atomic cross-slot step is + // safe. A terminal target (e.g. approval expiry → aborted) gets the same + // content cleanup as updateJob's terminal path. + if (terminal) { + await this.applyTerminalContentCleanup(streamId); + return true; + } if (this.isCluster) { if (remSet) { await this.redis.srem(remSet, streamId); @@ -362,10 +405,8 @@ export class RedisJobStore implements IJobStore { if (addSet) { await this.redis.sadd(addSet, streamId); } - if (!terminal) { - await this.redis.expire(KEYS.chunks(streamId), ttl); - await this.redis.expire(KEYS.runSteps(streamId), ttl); - } + await this.redis.expire(KEYS.chunks(streamId), ttl); + await this.redis.expire(KEYS.runSteps(streamId), ttl); } else { const pipeline = this.redis.pipeline(); if (remSet) { @@ -374,10 +415,8 @@ export class RedisJobStore implements IJobStore { if (addSet) { pipeline.sadd(addSet, streamId); } - if (!terminal) { - pipeline.expire(KEYS.chunks(streamId), ttl); - pipeline.expire(KEYS.runSteps(streamId), ttl); - } + pipeline.expire(KEYS.chunks(streamId), ttl); + pipeline.expire(KEYS.runSteps(streamId), ttl); await pipeline.exec(); } return true; @@ -431,14 +470,15 @@ export class RedisJobStore implements IJobStore { streamId: string, fields: string[], ): Promise { - // Resume from requires_action and clear stale pendingAction. serializeJob skips - // `undefined`, so the hash field must be removed explicitly. + // Resume from requires_action and clear stale pendingAction + its flat + // pendingActionId mirror. serializeJob skips `undefined`, so the hash + // fields must be removed explicitly. if (this.isCluster) { const updated = await this.updateExistingJobHash(key, fields); if (!updated) { return; } - await this.redis.hdel(key, 'pendingAction'); + await this.redis.hdel(key, 'pendingAction', 'pendingActionId'); await this.refreshLiveJobTtls(key, streamId); await this.redis.srem(KEYS.requiresActionJobs, streamId); await this.redis.sadd(KEYS.runningJobs, streamId); @@ -446,7 +486,7 @@ export class RedisJobStore implements IJobStore { } await this.redis.eval( - 'if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end redis.call("HSET", KEYS[1], unpack(ARGV, 3)) redis.call("HDEL", KEYS[1], "pendingAction") redis.call("EXPIRE", KEYS[1], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[4], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[5], tonumber(ARGV[2])) redis.call("SREM", KEYS[2], ARGV[1]) redis.call("SADD", KEYS[3], ARGV[1]) return 1', + 'if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end redis.call("HSET", KEYS[1], unpack(ARGV, 3)) redis.call("HDEL", KEYS[1], "pendingAction", "pendingActionId") redis.call("EXPIRE", KEYS[1], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[4], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[5], tonumber(ARGV[2])) redis.call("SREM", KEYS[2], ARGV[1]) redis.call("SADD", KEYS[3], ARGV[1]) return 1', 5, key, KEYS.requiresActionJobs, @@ -717,8 +757,14 @@ export class RedisJobStore implements IJobStore { for (const streamId of trackedIds) { const job = await this.getJob(streamId); // Include running jobs and jobs paused for human review (e.g. tool approval). - // A pending-approval job still occupies the user's conversation slot. + // A pending-approval job still occupies the user's conversation slot — but + // only while its prompt is live: a past-`expiresAt` approval no longer + // counts as active (cleanup/expiry will finalize it), so the client stops + // polling and can complete. if (job && (job.status === 'running' || job.status === 'requires_action')) { + if (job.status === 'requires_action' && isPendingActionExpired(job)) { + continue; + } activeIds.push(streamId); } else { // Self-healing: job completed/deleted but mapping wasn't cleaned - mark for removal diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index 3fe6ba9b2a..da807ed4dd 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -87,6 +87,16 @@ export interface SerializableJobData { lastActiveAt?: number; } +/** + * Whether a job's pending review has passed its `expiresAt`. Shared by the + * stores so an expired approval is kept out of active-job listings (the client + * stops polling; cleanup/expiry finalizes it). + */ +export function isPendingActionExpired(job: Pick): boolean { + const exp = job.pendingAction?.expiresAt; + return exp != null && exp <= Date.now(); +} + /** * Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set. */