diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 366445bb26..2ab4db6187 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -20,6 +20,9 @@ import type { GenerationJobStore } from '~/app/metrics'; import { InMemoryEventTransport } from './implementations/InMemoryEventTransport'; import { InMemoryJobStore } from './implementations/InMemoryJobStore'; +/** Error surfaced to any client still attached when a stale/hung job is reaped. */ +const REAPED_JOB_ERROR = 'Generation timed out'; + /** * Configuration options for GenerationJobManager */ @@ -952,6 +955,11 @@ class GenerationJobManagerClass { return; } + // Refresh job activity so the store's stale-job failsafe reaps on inactivity + // (a hung generation), not on age (a long but live stream). Parity with + // RedisJobStore refreshing the running TTL on each appendChunk. + this.jobStore.recordActivity?.(streamId); + await this.trackUserMessage(streamId, event); // For Redis mode, persist chunk for later reconstruction (fire-and-forget for resumability) @@ -1234,6 +1242,26 @@ class GenerationJobManagerClass { // Cleanup runtime state for deleted jobs for (const streamId of this.runtimeState.keys()) { if (!(await this.jobStore.hasJob(streamId))) { + /** + * Abort any still-pending generation whose job has been reaped (e.g. a + * stale "running" job removed by the store's failsafe timeout). This + * unwinds the hung in-flight work so its client/graph references can be + * garbage collected, rather than leaking via the pending promise. + */ + const runtime = this.runtimeState.get(streamId); + if (runtime && !runtime.abortController.signal.aborted) { + runtime.abortController.abort(); + } + // If a client is still attached when the job is reaped, send a terminal + // error first so the SSE connection closes instead of hanging open with no + // final/done event (the route only ends the response from onDone/onError). + if (this.eventTransport.getSubscriberCount(streamId) > 0) { + try { + await this.eventTransport.emitError(streamId, REAPED_JOB_ERROR); + } catch (err) { + logger.error(`[GenerationJobManager] Failed to notify reaped stream ${streamId}:`, err); + } + } this.runtimeState.delete(streamId); runningJobsChanged = this.runningJobs.delete(streamId) || runningJobsChanged; this.runStepBuffers?.delete(streamId); diff --git a/packages/api/src/stream/__tests__/staleJobReaping.spec.ts b/packages/api/src/stream/__tests__/staleJobReaping.spec.ts new file mode 100644 index 0000000000..003162af43 --- /dev/null +++ b/packages/api/src/stream/__tests__/staleJobReaping.spec.ts @@ -0,0 +1,220 @@ +/** + * Tests for the stale running-job failsafe that prevents abandoned/hung + * generations from leaking their content state in memory until the process OOMs. + * + * - InMemoryJobStore reaps jobs stuck in "running" past `staleJobTimeout` + * (mirrors RedisJobStore's running-job TTL). + * - GenerationJobManager aborts the in-flight generation when its job is reaped + * or replaced, so client/graph references can be garbage collected. + * + * @see https://github.com/danny-avila/LibreChat/issues/13391 + */ + +/** Suppress winston Console transport output (survives jest.resetModules) */ +jest.spyOn(console, 'log').mockImplementation(); + +describe('InMemoryJobStore - stale running-job failsafe', () => { + beforeEach(() => { + jest.resetModules(); + }); + + it('reaps a running job older than staleJobTimeout', async () => { + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const store = new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 1000 }); + await store.initialize(); + + await store.createJob('s1', 'u1', 's1'); + await store.updateJob('s1', { createdAt: Date.now() - 5000 }); + + const removed = await store.cleanup(); + + expect(removed).toBe(1); + expect(await store.hasJob('s1')).toBe(false); + + await store.destroy(); + }); + + it('does not reap a running job with recent activity even if created long ago', async () => { + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const store = new InMemoryJobStore({ staleJobTimeout: 1000 }); + await store.initialize(); + + await store.createJob('s1', 'u1', 's1'); + await store.updateJob('s1', { createdAt: Date.now() - 5000 }); + // Actively streaming: a long but live generation must not be reaped. + store.recordActivity('s1'); + + const removed = await store.cleanup(); + + expect(removed).toBe(0); + expect(await store.hasJob('s1')).toBe(true); + + await store.destroy(); + }); + + it('does not reap a replacement job that reuses a stale stream id', async () => { + jest.useFakeTimers(); + try { + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const store = new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 1000 }); + await store.initialize(); + + await store.createJob('s1', 'u1', 's1'); + store.recordActivity('s1'); // old generation's activity + await jest.advanceTimersByTimeAsync(5000); // ...goes stale + + // Replacement reuses the same streamId (old job never terminated). + await store.createJob('s1', 'u1', 's1'); + const removed = await store.cleanup(); + + expect(removed).toBe(0); // fresh replacement must not be reaped immediately + expect(await store.hasJob('s1')).toBe(true); + + await store.destroy(); + } finally { + jest.useRealTimers(); + } + }); + + it('does not reap a running job within the staleJobTimeout', async () => { + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const store = new InMemoryJobStore({ staleJobTimeout: 60000 }); + await store.initialize(); + + await store.createJob('s1', 'u1', 's1'); + + const removed = await store.cleanup(); + + expect(removed).toBe(0); + expect(await store.hasJob('s1')).toBe(true); + + await store.destroy(); + }); + + it('treats staleJobTimeout=0 as disabling the running-job failsafe', async () => { + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const store = new InMemoryJobStore({ staleJobTimeout: 0 }); + await store.initialize(); + + await store.createJob('s1', 'u1', 's1'); + await store.updateJob('s1', { createdAt: Date.now() - 3_600_000 }); + + const removed = await store.cleanup(); + + expect(removed).toBe(0); + expect(await store.hasJob('s1')).toBe(true); + + await store.destroy(); + }); + + it('removes per-user tracking when reaping a stale running job', async () => { + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const store = new InMemoryJobStore({ staleJobTimeout: 1000 }); + await store.initialize(); + + await store.createJob('s1', 'u1', 's1'); + expect(await store.getActiveJobIdsByUser('u1')).toEqual(['s1']); + + await store.updateJob('s1', { createdAt: Date.now() - 5000 }); + await store.cleanup(); + + expect(await store.getActiveJobIdsByUser('u1')).toEqual([]); + expect(await store.getJobCount()).toBe(0); + + await store.destroy(); + }); + + it('reaps terminal jobs while leaving fresh running jobs intact', async () => { + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const store = new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 60000 }); + await store.initialize(); + + await store.createJob('done', 'u1', 'done'); + await store.updateJob('done', { status: 'complete', completedAt: Date.now() }); + await store.createJob('live', 'u1', 'live'); + + const removed = await store.cleanup(); + + expect(removed).toBe(1); + expect(await store.hasJob('done')).toBe(false); + expect(await store.hasJob('live')).toBe(true); + + await store.destroy(); + }); +}); + +describe('GenerationJobManager - generation abort on reaping', () => { + beforeEach(() => { + jest.resetModules(); + }); + + it('aborts and cleans up a hung running job once the store reaps it', async () => { + const { GenerationJobManagerClass } = await import('../GenerationJobManager'); + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); + + jest.useFakeTimers(); + try { + const manager = new GenerationJobManagerClass(); + manager.configure({ + jobStore: new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 1000 }), + eventTransport: new InMemoryEventTransport(), + isRedis: false, + }); + manager.initialize(); + + const job = await manager.createJob('conv-2', 'user-1', 'conv-2'); + expect(await manager.hasJob('conv-2')).toBe(true); + expect(job.abortController.signal.aborted).toBe(false); + + // Advance past the stale timeout + the 60s cleanup interval. + await jest.advanceTimersByTimeAsync(61000); + + expect(job.abortController.signal.aborted).toBe(true); + expect(await manager.hasJob('conv-2')).toBe(false); + expect(manager.getRuntimeStats().runtimeStateSize).toBe(0); + expect(manager.getRuntimeStats().eventTransportStreams).toBe(0); + + await manager.destroy(); + } finally { + jest.useRealTimers(); + } + }); + + it('sends a terminal error to a still-connected client when its job is reaped', async () => { + const { GenerationJobManagerClass } = await import('../GenerationJobManager'); + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); + + jest.useFakeTimers(); + try { + const manager = new GenerationJobManagerClass(); + manager.configure({ + jobStore: new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 1000 }), + eventTransport: new InMemoryEventTransport(), + isRedis: false, + }); + manager.initialize(); + + await manager.createJob('conv-3', 'user-1', 'conv-3'); + const errors: string[] = []; + const subscription = await manager.subscribe( + 'conv-3', + () => undefined, + () => undefined, + (error) => errors.push(error), + ); + + // Hung generation: no chunks emitted; advance past the stale timeout + cleanup tick. + await jest.advanceTimersByTimeAsync(61000); + + expect(errors).toContain('Generation timed out'); + expect(await manager.hasJob('conv-3')).toBe(false); + + subscription?.unsubscribe(); + await manager.destroy(); + } finally { + jest.useRealTimers(); + } + }); +}); diff --git a/packages/api/src/stream/createStreamServices.ts b/packages/api/src/stream/createStreamServices.ts index ebf3055f8d..11fd535275 100644 --- a/packages/api/src/stream/createStreamServices.ts +++ b/packages/api/src/stream/createStreamServices.ts @@ -34,6 +34,7 @@ export interface StreamServicesConfig { inMemoryOptions?: { ttlAfterComplete?: number; maxJobs?: number; + staleJobTimeout?: number; }; } @@ -119,6 +120,8 @@ function createInMemoryServices(options?: StreamServicesConfig['inMemoryOptions' const jobStore = new InMemoryJobStore({ ttlAfterComplete: options?.ttlAfterComplete ?? 300000, // 5 minutes maxJobs: options?.maxJobs ?? 1000, + // Failsafe for crashed/hung generations (mirrors RedisJobStore's running-job TTL). + staleJobTimeout: options?.staleJobTimeout ?? 1_200_000, // 20 minutes }); const eventTransport = new InMemoryEventTransport(); diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index 7280c3ce80..f318f3baf5 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -35,19 +35,38 @@ export class InMemoryJobStore implements IJobStore { /** Maps userId -> Set of streamIds (conversationIds) for active jobs */ private userJobMap = new Map>(); + /** + * Maps streamId -> last generation-activity timestamp. Refreshed via + * recordActivity() on each emitted chunk so the stale-job failsafe reaps on + * inactivity (a hung generation) rather than age (a long but live stream). + */ + private lastActivity = new Map(); + /** Time to keep completed jobs before cleanup (0 = immediate) */ private ttlAfterComplete = 0; /** Maximum number of concurrent jobs */ private maxJobs = 1000; - constructor(options?: { ttlAfterComplete?: number; maxJobs?: number }) { + /** + * Failsafe timeout (ms) for jobs stuck in "running" status. Mirrors + * RedisJobStore's running-job TTL: a crashed or hung generation that never + * reaches a terminal state would otherwise retain its content state forever, + * leaking the full message context until the process runs out of memory. + * 0 disables the failsafe. Default: 20 minutes. + */ + private staleJobTimeout = 1_200_000; + + constructor(options?: { ttlAfterComplete?: number; maxJobs?: number; staleJobTimeout?: number }) { if (options?.ttlAfterComplete) { this.ttlAfterComplete = options.ttlAfterComplete; } if (options?.maxJobs) { this.maxJobs = options.maxJobs; } + if (options?.staleJobTimeout !== undefined) { + this.staleJobTimeout = options.staleJobTimeout; + } } async initialize(): Promise { @@ -87,6 +106,10 @@ export class InMemoryJobStore implements IJobStore { }; this.jobs.set(streamId, job); + // Clear any prior activity timestamp so a replacement reusing this streamId + // (the controller handles job replacement) falls back to the fresh createdAt + // and isn't reaped on the previous generation's stale last-activity time. + this.lastActivity.delete(streamId); // Track job by userId (tenant-qualified when available) for efficient user-scoped queries const userKey = tenantId ? `${tenantId}:${userId}` : userId; @@ -117,9 +140,21 @@ export class InMemoryJobStore implements IJobStore { async deleteJob(streamId: string): Promise { this.jobs.delete(streamId); this.contentState.delete(streamId); + this.lastActivity.delete(streamId); logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`); } + /** + * Refresh a job's last-activity timestamp (called on each emitted chunk) so the + * stale-job failsafe in cleanup() reaps on inactivity rather than total age, + * mirroring RedisJobStore refreshing the running TTL on each appendChunk. + */ + recordActivity(streamId: string): void { + if (this.jobs.has(streamId)) { + this.lastActivity.set(streamId, Date.now()); + } + } + async hasJob(streamId: string): Promise { return this.jobs.has(streamId); } @@ -137,6 +172,7 @@ export class InMemoryJobStore implements IJobStore { async cleanup(): Promise { const now = Date.now(); const toDelete: string[] = []; + let staleRunning = 0; for (const [streamId, job] of this.jobs) { const isFinished = ['complete', 'error', 'aborted'].includes(job.status); @@ -145,6 +181,18 @@ export class InMemoryJobStore implements IJobStore { if (this.ttlAfterComplete === 0 || now - job.completedAt > this.ttlAfterComplete) { toDelete.push(streamId); } + } else if (this.staleJobTimeout > 0 && job.status === 'running') { + // Failsafe: reap jobs stuck in "running" with no generation activity for + // longer than the stale timeout. These are crashed/hung generations that + // never reached a terminal state; without this they accumulate their + // content state in memory until the process OOMs. Reaping keys off last + // activity (not creation time) so a long but live stream is never reaped, + // mirroring RedisJobStore refreshing the running TTL on each chunk. + const lastActive = this.lastActivity.get(streamId) ?? job.createdAt; + if (now - lastActive > this.staleJobTimeout) { + toDelete.push(streamId); + staleRunning++; + } } } @@ -163,6 +211,12 @@ export class InMemoryJobStore implements IJobStore { await this.deleteJob(id); } + if (staleRunning > 0) { + logger.warn( + `[InMemoryJobStore] Reaped ${staleRunning} stale running job(s) exceeding ${this.staleJobTimeout}ms (likely crashed/hung generations)`, + ); + } + if (toDelete.length > 0) { logger.debug(`[InMemoryJobStore] Cleaned up ${toDelete.length} expired jobs`); } diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index afd6ac68a0..042ebadad7 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -181,6 +181,18 @@ export interface IJobStore { /** Cleanup expired jobs */ cleanup(): Promise; + /** + * Record generation activity for a job (e.g. a chunk was emitted), refreshing + * its "last active" timestamp so the stale-running-job failsafe does not reap a + * stream that is still producing output. + * + * In-memory: updates an internal last-activity timestamp used by cleanup(). + * Redis: no-op — the running-job TTL is already refreshed on each appendChunk. + * + * @param streamId - The stream identifier + */ + recordActivity?(streamId: string): void; + /** Get total job count */ getJobCount(): Promise;