⏱️ refactor: Add User Job Tracking TTL and Proactive Cleanup to Redis Job Store (#12595)

* refactor: Add user job tracking TTL to RedisJobStore

- Introduced a 24-hour safety-net TTL for per-user job tracking sets so
  orphaned sets expire even if no cleanup path ever touches them.
- Updated the RedisJobStoreOptions interface with a userJobsSetTtl option
  for configuration (usage sketched below).
- Modified job creation and deletion to set and refresh the TTL on
  createJob and to remove entries on deleteJob.
- Enhanced comments to clarify the new TTL behavior and its implications
  for user job tracking.
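
For reference, a minimal configuration sketch: the option values mirror the
integration tests below, while the import path and connection details are
illustrative.

import Redis from 'ioredis';
// Illustrative import path; the tests below use a relative path.
import { RedisJobStore } from './implementations/RedisJobStore';

const redis = new Redis('redis://localhost:6379');

// Default: per-user tracking sets get a 24-hour (86400s) safety-net TTL,
// refreshed on each createJob.
const store = new RedisJobStore(redis);
await store.initialize();

// Shorter window: the set expires one hour after the most recent createJob.
const shortLived = new RedisJobStore(redis, { userJobsSetTtl: 3600 });

// 0 disables the TTL entirely (see the EXPIRE guard discussed below).
const persistent = new RedisJobStore(redis, { userJobsSetTtl: 0 });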

* fix: Address review findings for user job tracking TTL

- Remove redundant `del(userJobsKey)` in `getActiveJobIdsByUser` that
  raced with concurrent `createJob` on other replicas (Redis auto-deletes
  empty Sets after SREM); see the sketch after this list
- Guard `userJobsSetTtl: 0` from silently destroying tracking sets
  (`EXPIRE key 0` deletes the key on Redis 7.0+)
- Extract `deleteJobInternal` so `cleanup()` reuses the already-fetched
  userId instead of issuing a redundant HGETALL per stale job
- Add integration tests for TTL behavior, proactive SREM, configurable
  userJobsSetTtl, and TTL refresh on repeated createJob
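
A hedged sketch of the self-healing read path these fixes leave in place:
the method name and key helpers mirror the diff below, but the exact
signature and getJob usage are assumptions.

// Sketch only; reconstructed from the tail of the diff below.
async getActiveJobIdsByUser(userId: string, tenantId?: string): Promise<string[]> {
  const userJobsKey = KEYS.userJobs(userId, tenantId);
  const memberIds = await this.redis.smembers(userJobsKey);
  const activeIds: string[] = [];
  const staleIds: string[] = [];
  for (const streamId of memberIds) {
    const job = await this.getJob(streamId);
    if (job?.status === 'running') {
      activeIds.push(streamId);
    } else {
      // Hash expired or the job reached a terminal status.
      staleIds.push(streamId);
    }
  }
  // Remove stale members, but never DEL the key itself: Redis drops empty
  // Sets automatically after SREM, and an explicit DEL would race with a
  // concurrent createJob on another replica.
  if (staleIds.length > 0) {
    await this.redis.srem(userJobsKey, ...staleIds);
  }
  return activeIds;
}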

* fix: Address follow-up review findings for RedisJobStore

- Use deleteJobInternal in cleanup() terminal-but-in-running-set path
  to ensure userJobsKey SREM is not skipped
- Clear local caches in deleteJob before the fallible getJob call so
  they are cleaned even on transient Redis errors
- Add proactive SREM tests for aborted and error terminal statuses
- Add test for the tenant-qualified user tracking key format (sketched
  below)
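
The key format that last test pins down, as a standalone sketch: the real
KEYS.userJobs helper may differ, but the output matches the tests below,
and the braces form a Redis Cluster hash tag so each user's set maps to a
single slot.

// Illustrative reconstruction of the tracking-key builder.
const userJobsKey = (userId: string, tenantId?: string): string =>
  tenantId !== undefined
    ? `stream:user:{${tenantId}:${userId}}:jobs`
    : `stream:user:{${userId}}:jobs`;

userJobsKey('u1');         // 'stream:user:{u1}:jobs'
userJobsKey('u1', 'acme'); // 'stream:user:{acme:u1}:jobs'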

* fix: Preserve completedTtl for non-running jobs in cleanup()

The cleanup() terminal-status branch should only remove tracking set
membership, not delete the job hash. deleteJobInternal bypasses the
completedTtl window that updateJob already applied, causing clients
polling for final status to lose the job data early.
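
A hedged sketch of the client-visible consequence: getJob and the status
values come from this diff, while the polling helper itself is hypothetical.

// Hypothetical polling helper; only getJob and the status values are
// taken from this diff.
async function pollFinalStatus(store: RedisJobStore, streamId: string) {
  const job = await store.getJob(streamId);
  if (job == null) {
    // Before this fix, cleanup() could DEL the hash via deleteJobInternal,
    // so a just-finished job looked like it never existed.
    return null;
  }
  // After the fix, the hash keeps the completedTtl that updateJob applied,
  // so the terminal status stays observable until that window elapses.
  return ['complete', 'error', 'aborted'].includes(job.status) ? job.status : 'running';
}
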
@@ -886,6 +886,255 @@ describe('RedisJobStore Integration Tests', () => {
    });
  });
  describe('User Job Tracking TTL', () => {
    test('should set TTL on user jobs set after createJob', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient);
      await store.initialize();
      const userId = `ttl-user-${Date.now()}`;
      const streamId = `ttl-stream-${Date.now()}`;
      await store.createJob(streamId, userId, streamId);
      const userKey = `stream:user:{${userId}}:jobs`;
      const ttl = await ioredisClient.ttl(userKey);
      expect(ttl).toBeGreaterThan(0);
      expect(ttl).toBeLessThanOrEqual(86400);
      await store.destroy();
    });
    test('should respect custom userJobsSetTtl option', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient, { userJobsSetTtl: 3600 });
      await store.initialize();
      const userId = `custom-ttl-user-${Date.now()}`;
      const streamId = `custom-ttl-stream-${Date.now()}`;
      await store.createJob(streamId, userId, streamId);
      const userKey = `stream:user:{${userId}}:jobs`;
      const ttl = await ioredisClient.ttl(userKey);
      expect(ttl).toBeGreaterThan(0);
      expect(ttl).toBeLessThanOrEqual(3600);
      await store.destroy();
    });
    test('should not set TTL when userJobsSetTtl is 0', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient, { userJobsSetTtl: 0 });
      await store.initialize();
      const userId = `no-ttl-user-${Date.now()}`;
      const streamId = `no-ttl-stream-${Date.now()}`;
      await store.createJob(streamId, userId, streamId);
      const userKey = `stream:user:{${userId}}:jobs`;
      // -1 means the key exists but has no TTL
      const ttl = await ioredisClient.ttl(userKey);
      expect(ttl).toBe(-1);
      // Verify the set itself still exists and contains the streamId
      const members = await ioredisClient.smembers(userKey);
      expect(members).toContain(streamId);
      await store.destroy();
    });
    test('should refresh TTL when a second createJob is issued for the same user', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient, { userJobsSetTtl: 120 });
      await store.initialize();
      const userId = `refresh-ttl-user-${Date.now()}`;
      const streamId1 = `refresh-stream-1-${Date.now()}`;
      const streamId2 = `refresh-stream-2-${Date.now()}`;
      await store.createJob(streamId1, userId, streamId1);
      const userKey = `stream:user:{${userId}}:jobs`;
      // Manually reduce the TTL to simulate time passing
      await ioredisClient.expire(userKey, 30);
      const reducedTtl = await ioredisClient.ttl(userKey);
      expect(reducedTtl).toBeLessThanOrEqual(30);
      // A second createJob should refresh the TTL
      await store.createJob(streamId2, userId, streamId2);
      const refreshedTtl = await ioredisClient.ttl(userKey);
      expect(refreshedTtl).toBeGreaterThan(30);
      expect(refreshedTtl).toBeLessThanOrEqual(120);
      await store.destroy();
    });
    test('should proactively SREM from user jobs set on updateJob to terminal status', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient);
      await store.initialize();
      const userId = `proactive-srem-user-${Date.now()}`;
      const streamId = `proactive-srem-stream-${Date.now()}`;
      await store.createJob(streamId, userId, streamId);
      const userKey = `stream:user:{${userId}}:jobs`;
      // Verify the entry exists before the update
      let members = await ioredisClient.smembers(userKey);
      expect(members).toContain(streamId);
      await store.updateJob(streamId, { status: 'complete', completedAt: Date.now() });
      // Check the Redis set directly, without calling getActiveJobIdsByUser (which self-heals)
      members = await ioredisClient.smembers(userKey);
      expect(members).not.toContain(streamId);
      await store.destroy();
    });
    test('should proactively SREM from user jobs set on updateJob to aborted', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient);
      await store.initialize();
      const userId = `srem-aborted-user-${Date.now()}`;
      const streamId = `srem-aborted-stream-${Date.now()}`;
      await store.createJob(streamId, userId, streamId);
      const userKey = `stream:user:{${userId}}:jobs`;
      let members = await ioredisClient.smembers(userKey);
      expect(members).toContain(streamId);
      await store.updateJob(streamId, { status: 'aborted', completedAt: Date.now() });
      members = await ioredisClient.smembers(userKey);
      expect(members).not.toContain(streamId);
      await store.destroy();
    });
    test('should proactively SREM from user jobs set on updateJob to error', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient);
      await store.initialize();
      const userId = `srem-error-user-${Date.now()}`;
      const streamId = `srem-error-stream-${Date.now()}`;
      await store.createJob(streamId, userId, streamId);
      const userKey = `stream:user:{${userId}}:jobs`;
      let members = await ioredisClient.smembers(userKey);
      expect(members).toContain(streamId);
      await store.updateJob(streamId, {
        status: 'error',
        error: 'Test error',
        completedAt: Date.now(),
      });
      members = await ioredisClient.smembers(userKey);
      expect(members).not.toContain(streamId);
      await store.destroy();
    });
    test('should proactively SREM from user jobs set on deleteJob', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient);
      await store.initialize();
      const userId = `delete-srem-user-${Date.now()}`;
      const streamId = `delete-srem-stream-${Date.now()}`;
      await store.createJob(streamId, userId, streamId);
      const userKey = `stream:user:{${userId}}:jobs`;
      // Verify the entry exists
      let members = await ioredisClient.smembers(userKey);
      expect(members).toContain(streamId);
      await store.deleteJob(streamId);
      // Check the Redis set directly
      members = await ioredisClient.smembers(userKey);
      expect(members).not.toContain(streamId);
      await store.destroy();
    });
    test('should set TTL on tenant-qualified user jobs set', async () => {
      if (!ioredisClient) {
        return;
      }
      const { RedisJobStore } = await import('../implementations/RedisJobStore');
      const store = new RedisJobStore(ioredisClient);
      await store.initialize();
      const userId = `tenant-user-${Date.now()}`;
      const tenantId = `tenant-${Date.now()}`;
      const streamId = `tenant-stream-${Date.now()}`;
      await store.createJob(streamId, userId, streamId, tenantId);
      const userKey = `stream:user:{${tenantId}:${userId}}:jobs`;
      const members = await ioredisClient.smembers(userKey);
      expect(members).toContain(streamId);
      const ttl = await ioredisClient.ttl(userKey);
      expect(ttl).toBeGreaterThan(0);
      expect(ttl).toBeLessThanOrEqual(86400);
      // The non-tenant key should NOT contain this entry
      const wrongKey = `stream:user:{${userId}}:jobs`;
      const wrongMembers = await ioredisClient.smembers(wrongKey);
      expect(wrongMembers).not.toContain(streamId);
      await store.destroy();
    });
  });
  describe('Race Condition: updateJob after deleteJob', () => {
    test('should not re-create job hash when updateJob runs after deleteJob', async () => {
      if (!ioredisClient) {


@@ -47,6 +47,8 @@ const DEFAULT_TTL = {
  chunksAfterComplete: 0,
  /** TTL for run steps after completion (0 = delete immediately) */
  runStepsAfterComplete: 0,
  /** Safety-net TTL for per-user job tracking sets (24 hours). Refreshed on each createJob. */
  userJobsSet: 86400,
};
/**
@@ -79,6 +81,8 @@ export interface RedisJobStoreOptions {
  chunksAfterCompleteTtl?: number;
  /** TTL for run steps after completion in seconds (default: 0 = delete immediately) */
  runStepsAfterCompleteTtl?: number;
  /** TTL for per-user job tracking sets in seconds (default: 86400 = 24 hours). 0 = no TTL. */
  userJobsSetTtl?: number;
}
export class RedisJobStore implements IJobStore {
@@ -113,6 +117,7 @@ export class RedisJobStore implements IJobStore {
      running: options?.runningTtl ?? DEFAULT_TTL.running,
      chunksAfterComplete: options?.chunksAfterCompleteTtl ?? DEFAULT_TTL.chunksAfterComplete,
      runStepsAfterComplete: options?.runStepsAfterCompleteTtl ?? DEFAULT_TTL.runStepsAfterComplete,
      userJobsSet: options?.userJobsSetTtl ?? DEFAULT_TTL.userJobsSet,
    };
    // Detect cluster mode using ioredis's isCluster property
    this.isCluster = (redis as Cluster).isCluster === true;
@@ -163,12 +168,18 @@
      await this.redis.expire(key, this.ttl.running);
      await this.redis.sadd(KEYS.runningJobs, streamId);
      await this.redis.sadd(userJobsKey, streamId);
      if (this.ttl.userJobsSet > 0) {
        await this.redis.expire(userJobsKey, this.ttl.userJobsSet);
      }
    } else {
      const pipeline = this.redis.pipeline();
      pipeline.hset(key, this.serializeJob(job));
      pipeline.expire(key, this.ttl.running);
      pipeline.sadd(KEYS.runningJobs, streamId);
      pipeline.sadd(userJobsKey, streamId);
      if (this.ttl.userJobsSet > 0) {
        pipeline.expire(userJobsKey, this.ttl.userJobsSet);
      }
      await pipeline.exec();
    }
@@ -204,10 +215,11 @@
      return;
    }
    // If status changed to complete/error/aborted, update TTL and remove from running set
    // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser
    if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) {
      // In cluster mode, separate runningJobs (global) from stream-specific keys
      // Proactively remove from user's job set (requires reading userId from the job hash)
      const job = await this.getJob(streamId);
      const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
      if (this.isCluster) {
        await this.redis.expire(key, this.ttl.completed);
        await this.redis.srem(KEYS.runningJobs, streamId);
@@ -223,6 +235,10 @@
        } else {
          await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
        }
        if (userJobsKey) {
          await this.redis.srem(userJobsKey, streamId);
        }
      } else {
        const pipeline = this.redis.pipeline();
        pipeline.expire(key, this.ttl.completed);
@@ -240,33 +256,46 @@
          pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
        }
        if (userJobsKey) {
          pipeline.srem(userJobsKey, streamId);
        }
        await pipeline.exec();
      }
    }
  }
  async deleteJob(streamId: string): Promise<void> {
    // Clear local caches
    this.localGraphCache.delete(streamId);
    this.localCollectedUsageCache.delete(streamId);
    const job = await this.getJob(streamId);
    const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
    return this.deleteJobInternal(streamId, userJobsKey);
  }
  private async deleteJobInternal(streamId: string, userJobsKey: string | null): Promise<void> {
    this.localGraphCache.delete(streamId);
    this.localCollectedUsageCache.delete(streamId);
    // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser
    // In cluster mode, separate runningJobs (global) from stream-specific keys (same slot)
    if (this.isCluster) {
      // Stream-specific keys all hash to same slot due to {streamId}
      const pipeline = this.redis.pipeline();
      pipeline.del(KEYS.job(streamId));
      pipeline.del(KEYS.chunks(streamId));
      pipeline.del(KEYS.runSteps(streamId));
      await pipeline.exec();
      // Global set is on different slot - execute separately
      await this.redis.srem(KEYS.runningJobs, streamId);
      if (userJobsKey) {
        await this.redis.srem(userJobsKey, streamId);
      }
    } else {
      const pipeline = this.redis.pipeline();
      pipeline.del(KEYS.job(streamId));
      pipeline.del(KEYS.chunks(streamId));
      pipeline.del(KEYS.runSteps(streamId));
      pipeline.srem(KEYS.runningJobs, streamId);
      if (userJobsKey) {
        pipeline.srem(userJobsKey, streamId);
      }
      await pipeline.exec();
    }
    logger.debug(`[RedisJobStore] Deleted job: ${streamId}`);
@@ -322,8 +351,13 @@
    }
    // Job completed but still in running set (shouldn't happen, but handle it)
    // Only remove from tracking sets; do NOT delete the job hash, which has
    // its own completedTtl so clients can still poll for final status.
    if (job.status !== 'running') {
      await this.redis.srem(KEYS.runningJobs, streamId);
      if (job.userId) {
        await this.redis.srem(KEYS.userJobs(job.userId, job.tenantId), streamId);
      }
      this.localGraphCache.delete(streamId);
      this.localCollectedUsageCache.delete(streamId);
      return 1;
@@ -332,7 +366,8 @@
    // Stale running job (failsafe - running for > configured TTL)
    if (now - job.createdAt > this.ttl.running * 1000) {
      logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`);
      const userJobsKey = job.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
      await this.deleteJobInternal(streamId, userJobsKey);
      return 1;
    }
@@ -402,7 +437,6 @@
      }
    }
    // Clean up stale entries
    if (staleIds.length > 0) {
      await this.redis.srem(userJobsKey, ...staleIds);
      logger.debug(