diff --git a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts index d45f186dc0..23f25b1291 100644 --- a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts @@ -886,6 +886,255 @@ describe('RedisJobStore Integration Tests', () => { }); }); + describe('User Job Tracking TTL', () => { + test('should set TTL on user jobs set after createJob', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient); + await store.initialize(); + + const userId = `ttl-user-${Date.now()}`; + const streamId = `ttl-stream-${Date.now()}`; + + await store.createJob(streamId, userId, streamId); + + const userKey = `stream:user:{${userId}}:jobs`; + const ttl = await ioredisClient.ttl(userKey); + expect(ttl).toBeGreaterThan(0); + expect(ttl).toBeLessThanOrEqual(86400); + + await store.destroy(); + }); + + test('should respect custom userJobsSetTtl option', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient, { userJobsSetTtl: 3600 }); + await store.initialize(); + + const userId = `custom-ttl-user-${Date.now()}`; + const streamId = `custom-ttl-stream-${Date.now()}`; + + await store.createJob(streamId, userId, streamId); + + const userKey = `stream:user:{${userId}}:jobs`; + const ttl = await ioredisClient.ttl(userKey); + expect(ttl).toBeGreaterThan(0); + expect(ttl).toBeLessThanOrEqual(3600); + + await store.destroy(); + }); + + test('should not set TTL when userJobsSetTtl is 0', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new 
RedisJobStore(ioredisClient, { userJobsSetTtl: 0 }); + await store.initialize(); + + const userId = `no-ttl-user-${Date.now()}`; + const streamId = `no-ttl-stream-${Date.now()}`; + + await store.createJob(streamId, userId, streamId); + + const userKey = `stream:user:{${userId}}:jobs`; + // -1 means key exists but has no TTL + const ttl = await ioredisClient.ttl(userKey); + expect(ttl).toBe(-1); + + // Verify the set itself still exists and contains the streamId + const members = await ioredisClient.smembers(userKey); + expect(members).toContain(streamId); + + await store.destroy(); + }); + + test('should refresh TTL when a second createJob is issued for the same user', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient, { userJobsSetTtl: 120 }); + await store.initialize(); + + const userId = `refresh-ttl-user-${Date.now()}`; + const streamId1 = `refresh-stream-1-${Date.now()}`; + const streamId2 = `refresh-stream-2-${Date.now()}`; + + await store.createJob(streamId1, userId, streamId1); + + const userKey = `stream:user:{${userId}}:jobs`; + + // Manually reduce TTL to simulate time passing + await ioredisClient.expire(userKey, 30); + const reducedTtl = await ioredisClient.ttl(userKey); + expect(reducedTtl).toBeLessThanOrEqual(30); + + // Second createJob should refresh the TTL + await store.createJob(streamId2, userId, streamId2); + + const refreshedTtl = await ioredisClient.ttl(userKey); + expect(refreshedTtl).toBeGreaterThan(30); + expect(refreshedTtl).toBeLessThanOrEqual(120); + + await store.destroy(); + }); + + test('should proactively SREM from user jobs set on updateJob to terminal status', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient); + await store.initialize(); + + const userId = 
`proactive-srem-user-${Date.now()}`; + const streamId = `proactive-srem-stream-${Date.now()}`; + + await store.createJob(streamId, userId, streamId); + + const userKey = `stream:user:{${userId}}:jobs`; + + // Verify the entry exists before update + let members = await ioredisClient.smembers(userKey); + expect(members).toContain(streamId); + + await store.updateJob(streamId, { status: 'complete', completedAt: Date.now() }); + + // Directly check the Redis set — without calling getActiveJobIdsByUser (which self-heals) + members = await ioredisClient.smembers(userKey); + expect(members).not.toContain(streamId); + + await store.destroy(); + }); + + test('should proactively SREM from user jobs set on updateJob to aborted', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient); + await store.initialize(); + + const userId = `srem-aborted-user-${Date.now()}`; + const streamId = `srem-aborted-stream-${Date.now()}`; + + await store.createJob(streamId, userId, streamId); + + const userKey = `stream:user:{${userId}}:jobs`; + let members = await ioredisClient.smembers(userKey); + expect(members).toContain(streamId); + + await store.updateJob(streamId, { status: 'aborted', completedAt: Date.now() }); + + members = await ioredisClient.smembers(userKey); + expect(members).not.toContain(streamId); + + await store.destroy(); + }); + + test('should proactively SREM from user jobs set on updateJob to error', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient); + await store.initialize(); + + const userId = `srem-error-user-${Date.now()}`; + const streamId = `srem-error-stream-${Date.now()}`; + + await store.createJob(streamId, userId, streamId); + + const userKey = `stream:user:{${userId}}:jobs`; + let members = await 
ioredisClient.smembers(userKey); + expect(members).toContain(streamId); + + await store.updateJob(streamId, { + status: 'error', + error: 'Test error', + completedAt: Date.now(), + }); + + members = await ioredisClient.smembers(userKey); + expect(members).not.toContain(streamId); + + await store.destroy(); + }); + + test('should proactively SREM from user jobs set on deleteJob', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient); + await store.initialize(); + + const userId = `delete-srem-user-${Date.now()}`; + const streamId = `delete-srem-stream-${Date.now()}`; + + await store.createJob(streamId, userId, streamId); + + const userKey = `stream:user:{${userId}}:jobs`; + + // Verify entry exists + let members = await ioredisClient.smembers(userKey); + expect(members).toContain(streamId); + + await store.deleteJob(streamId); + + // Directly check the Redis set + members = await ioredisClient.smembers(userKey); + expect(members).not.toContain(streamId); + + await store.destroy(); + }); + + test('should set TTL on tenant-qualified user jobs set', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient); + await store.initialize(); + + const userId = `tenant-user-${Date.now()}`; + const tenantId = `tenant-${Date.now()}`; + const streamId = `tenant-stream-${Date.now()}`; + + await store.createJob(streamId, userId, streamId, tenantId); + + const userKey = `stream:user:{${tenantId}:${userId}}:jobs`; + const members = await ioredisClient.smembers(userKey); + expect(members).toContain(streamId); + + const ttl = await ioredisClient.ttl(userKey); + expect(ttl).toBeGreaterThan(0); + expect(ttl).toBeLessThanOrEqual(86400); + + // Non-tenant key should NOT contain this entry + const wrongKey = `stream:user:{${userId}}:jobs`; + 
const wrongMembers = await ioredisClient.smembers(wrongKey); + expect(wrongMembers).not.toContain(streamId); + + await store.destroy(); + }); + }); + describe('Race Condition: updateJob after deleteJob', () => { test('should not re-create job hash when updateJob runs after deleteJob', async () => { if (!ioredisClient) { diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index a631bc2044..f2c4dd4661 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -47,6 +47,8 @@ const DEFAULT_TTL = { chunksAfterComplete: 0, /** TTL for run steps after completion (0 = delete immediately) */ runStepsAfterComplete: 0, + /** Safety-net TTL for per-user job tracking sets (24 hours). Refreshed on each createJob. */ + userJobsSet: 86400, }; /** @@ -79,6 +81,8 @@ export interface RedisJobStoreOptions { chunksAfterCompleteTtl?: number; /** TTL for run steps after completion in seconds (default: 0 = delete immediately) */ runStepsAfterCompleteTtl?: number; + /** TTL for per-user job tracking sets in seconds (default: 86400 = 24 hours). 0 = no TTL. */ + userJobsSetTtl?: number; } export class RedisJobStore implements IJobStore { @@ -113,6 +117,7 @@ export class RedisJobStore implements IJobStore { running: options?.runningTtl ?? DEFAULT_TTL.running, chunksAfterComplete: options?.chunksAfterCompleteTtl ?? DEFAULT_TTL.chunksAfterComplete, runStepsAfterComplete: options?.runStepsAfterCompleteTtl ?? DEFAULT_TTL.runStepsAfterComplete, + userJobsSet: options?.userJobsSetTtl ?? 
DEFAULT_TTL.userJobsSet, }; // Detect cluster mode using ioredis's isCluster property this.isCluster = (redis as Cluster).isCluster === true; @@ -163,12 +168,18 @@ export class RedisJobStore implements IJobStore { await this.redis.expire(key, this.ttl.running); await this.redis.sadd(KEYS.runningJobs, streamId); await this.redis.sadd(userJobsKey, streamId); + if (this.ttl.userJobsSet > 0) { + await this.redis.expire(userJobsKey, this.ttl.userJobsSet); + } } else { const pipeline = this.redis.pipeline(); pipeline.hset(key, this.serializeJob(job)); pipeline.expire(key, this.ttl.running); pipeline.sadd(KEYS.runningJobs, streamId); pipeline.sadd(userJobsKey, streamId); + if (this.ttl.userJobsSet > 0) { + pipeline.expire(userJobsKey, this.ttl.userJobsSet); + } await pipeline.exec(); } @@ -204,10 +215,11 @@ export class RedisJobStore implements IJobStore { return; } - // If status changed to complete/error/aborted, update TTL and remove from running set - // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) { - // In cluster mode, separate runningJobs (global) from stream-specific keys + // Proactively remove from user's job set (requires reading userId from the job hash) + const job = await this.getJob(streamId); + const userJobsKey = job?.userId ? 
KEYS.userJobs(job.userId, job.tenantId) : null; + if (this.isCluster) { await this.redis.expire(key, this.ttl.completed); await this.redis.srem(KEYS.runningJobs, streamId); @@ -223,6 +235,10 @@ } else { await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete); } + + if (userJobsKey) { + await this.redis.srem(userJobsKey, streamId); + } } else { const pipeline = this.redis.pipeline(); pipeline.expire(key, this.ttl.completed); @@ -240,33 +256,46 @@ pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete); } + if (userJobsKey) { + pipeline.srem(userJobsKey, streamId); + } + await pipeline.exec(); } } } async deleteJob(streamId: string): Promise<void> { - // Clear local caches + this.localGraphCache.delete(streamId); + this.localCollectedUsageCache.delete(streamId); + const job = await this.getJob(streamId); + const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null; + return this.deleteJobInternal(streamId, userJobsKey); + } + + private async deleteJobInternal(streamId: string, userJobsKey: string | null): Promise<void> { this.localGraphCache.delete(streamId); this.localCollectedUsageCache.delete(streamId); - // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser - // In cluster mode, separate runningJobs (global) from stream-specific keys (same slot) if (this.isCluster) { - // Stream-specific keys all hash to same slot due to {streamId} const pipeline = this.redis.pipeline(); pipeline.del(KEYS.job(streamId)); pipeline.del(KEYS.chunks(streamId)); pipeline.del(KEYS.runSteps(streamId)); await pipeline.exec(); - // Global set is on different slot - execute separately await this.redis.srem(KEYS.runningJobs, streamId); + if (userJobsKey) { + await this.redis.srem(userJobsKey, streamId); + } } else { const pipeline = this.redis.pipeline(); pipeline.del(KEYS.job(streamId)); 
pipeline.del(KEYS.chunks(streamId)); pipeline.del(KEYS.runSteps(streamId)); pipeline.srem(KEYS.runningJobs, streamId); + if (userJobsKey) { + pipeline.srem(userJobsKey, streamId); + } await pipeline.exec(); } logger.debug(`[RedisJobStore] Deleted job: ${streamId}`); @@ -322,8 +351,13 @@ export class RedisJobStore implements IJobStore { } // Job completed but still in running set (shouldn't happen, but handle it) + // Only remove from tracking sets — do NOT delete the job hash, which has + // its own completedTtl so clients can still poll for final status. if (job.status !== 'running') { await this.redis.srem(KEYS.runningJobs, streamId); + if (job.userId) { + await this.redis.srem(KEYS.userJobs(job.userId, job.tenantId), streamId); + } this.localGraphCache.delete(streamId); this.localCollectedUsageCache.delete(streamId); return 1; @@ -332,7 +366,8 @@ export class RedisJobStore implements IJobStore { // Stale running job (failsafe - running for > configured TTL) if (now - job.createdAt > this.ttl.running * 1000) { logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`); - await this.deleteJob(streamId); + const userJobsKey = job.userId ? KEYS.userJobs(job.userId, job.tenantId) : null; + await this.deleteJobInternal(streamId, userJobsKey); return 1; } @@ -402,7 +437,6 @@ export class RedisJobStore implements IJobStore { } } - // Clean up stale entries if (staleIds.length > 0) { await this.redis.srem(userJobsKey, ...staleIds); logger.debug(