🧟 fix: Reap Hung In-Memory Generations for Redis Failsafe Parity (#13396)
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
GitNexus Index / index (push) Waiting to run
GitNexus Index / post-index (push) Blocked by required conditions

* ♻️ fix: Reap Stale In-Memory Generation Jobs to Prevent Heap OOM

InMemoryJobStore only reaped terminal jobs, so a generation that hung
without reaching completeJob() stayed "running" forever, retaining its
full message context. Abandoned jobs accumulated until the V8 heap was
exhausted (#13391). RedisJobStore already guards this with a 20-minute
running-job TTL; the in-memory store had no equivalent failsafe.

- Add a configurable staleJobTimeout (default 20m) to InMemoryJobStore;
  cleanup() now reaps running jobs older than the timeout.
- Abort a pending generation in GenerationJobManager.cleanup() when its
  job has been reaped, releasing client/graph references for GC.
- Abort the previous generation in createJob() when a job is replaced
  for the same stream, closing an untracked-orphan leak.
- Forward staleJobTimeout through createStreamServices.

* 🩹 fix: Remove same-stream replacement abort (codex P1)

The createJob replacement-abort could let a stale, replaced request take
the abort-during-initialization path and complete/error the replacement
job via the shared streamId, and was a no-op across Redis replicas.
Removed it; the reported OOM is handled by the running-job failsafe and
the orphan-loop abort, which only fires when no job holds the streamId.

* 🩹 fix: Reap stale jobs on inactivity rather than age (codex P2)

Age-based reaping would drop a legitimately long but actively-streaming
generation at the timeout. Track a last-activity timestamp (refreshed on
each emitted chunk via recordActivity) and reap on inactivity instead,
mirroring RedisJobStore refreshing the running TTL on each appendChunk.

* 🩹 fix: Notify reaped streams and reset activity on replacement (codex P2)

- Emit a terminal error to any client still attached when a stale job is
  reaped, so the SSE connection closes instead of hanging open with no
  final/error event.
- Clear lastActivity in createJob so a replacement reusing the same
  streamId falls back to its fresh createdAt and isn't reaped immediately
  on the previous generation's stale activity timestamp.
This commit is contained in:
Danny Avila 2026-05-29 15:07:13 -04:00 committed by GitHub
parent 71a7c9ce7b
commit a14344ded7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 318 additions and 1 deletions

View file

@ -20,6 +20,9 @@ import type { GenerationJobStore } from '~/app/metrics';
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
import { InMemoryJobStore } from './implementations/InMemoryJobStore';
/** Error surfaced to any client still attached when a stale/hung job is reaped. */
const REAPED_JOB_ERROR = 'Generation timed out';
/**
* Configuration options for GenerationJobManager
*/
@ -952,6 +955,11 @@ class GenerationJobManagerClass {
return;
}
// Refresh job activity so the store's stale-job failsafe reaps on inactivity
// (a hung generation), not on age (a long but live stream). Parity with
// RedisJobStore refreshing the running TTL on each appendChunk.
this.jobStore.recordActivity?.(streamId);
await this.trackUserMessage(streamId, event);
// For Redis mode, persist chunk for later reconstruction (fire-and-forget for resumability)
@ -1234,6 +1242,26 @@ class GenerationJobManagerClass {
// Cleanup runtime state for deleted jobs
for (const streamId of this.runtimeState.keys()) {
if (!(await this.jobStore.hasJob(streamId))) {
/**
* Abort any still-pending generation whose job has been reaped (e.g. a
* stale "running" job removed by the store's failsafe timeout). This
* unwinds the hung in-flight work so its client/graph references can be
* garbage collected, rather than leaking via the pending promise.
*/
const runtime = this.runtimeState.get(streamId);
if (runtime && !runtime.abortController.signal.aborted) {
runtime.abortController.abort();
}
// If a client is still attached when the job is reaped, send a terminal
// error first so the SSE connection closes instead of hanging open with no
// final/done event (the route only ends the response from onDone/onError).
if (this.eventTransport.getSubscriberCount(streamId) > 0) {
try {
await this.eventTransport.emitError(streamId, REAPED_JOB_ERROR);
} catch (err) {
logger.error(`[GenerationJobManager] Failed to notify reaped stream ${streamId}:`, err);
}
}
this.runtimeState.delete(streamId);
runningJobsChanged = this.runningJobs.delete(streamId) || runningJobsChanged;
this.runStepBuffers?.delete(streamId);

View file

@ -0,0 +1,220 @@
/**
* Tests for the stale running-job failsafe that prevents abandoned/hung
* generations from leaking their content state in memory until the process OOMs.
*
* - InMemoryJobStore reaps jobs stuck in "running" past `staleJobTimeout`
* (mirrors RedisJobStore's running-job TTL).
* - GenerationJobManager aborts the in-flight generation when its job is reaped
* or replaced, so client/graph references can be garbage collected.
*
* @see https://github.com/danny-avila/LibreChat/issues/13391
*/
/** Suppress winston Console transport output (survives jest.resetModules) */
jest.spyOn(console, 'log').mockImplementation();
describe('InMemoryJobStore - stale running-job failsafe', () => {
beforeEach(() => {
jest.resetModules();
});
it('reaps a running job older than staleJobTimeout', async () => {
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const store = new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 1000 });
await store.initialize();
await store.createJob('s1', 'u1', 's1');
await store.updateJob('s1', { createdAt: Date.now() - 5000 });
const removed = await store.cleanup();
expect(removed).toBe(1);
expect(await store.hasJob('s1')).toBe(false);
await store.destroy();
});
it('does not reap a running job with recent activity even if created long ago', async () => {
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const store = new InMemoryJobStore({ staleJobTimeout: 1000 });
await store.initialize();
await store.createJob('s1', 'u1', 's1');
await store.updateJob('s1', { createdAt: Date.now() - 5000 });
// Actively streaming: a long but live generation must not be reaped.
store.recordActivity('s1');
const removed = await store.cleanup();
expect(removed).toBe(0);
expect(await store.hasJob('s1')).toBe(true);
await store.destroy();
});
it('does not reap a replacement job that reuses a stale stream id', async () => {
jest.useFakeTimers();
try {
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const store = new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 1000 });
await store.initialize();
await store.createJob('s1', 'u1', 's1');
store.recordActivity('s1'); // old generation's activity
await jest.advanceTimersByTimeAsync(5000); // ...goes stale
// Replacement reuses the same streamId (old job never terminated).
await store.createJob('s1', 'u1', 's1');
const removed = await store.cleanup();
expect(removed).toBe(0); // fresh replacement must not be reaped immediately
expect(await store.hasJob('s1')).toBe(true);
await store.destroy();
} finally {
jest.useRealTimers();
}
});
it('does not reap a running job within the staleJobTimeout', async () => {
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const store = new InMemoryJobStore({ staleJobTimeout: 60000 });
await store.initialize();
await store.createJob('s1', 'u1', 's1');
const removed = await store.cleanup();
expect(removed).toBe(0);
expect(await store.hasJob('s1')).toBe(true);
await store.destroy();
});
it('treats staleJobTimeout=0 as disabling the running-job failsafe', async () => {
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const store = new InMemoryJobStore({ staleJobTimeout: 0 });
await store.initialize();
await store.createJob('s1', 'u1', 's1');
await store.updateJob('s1', { createdAt: Date.now() - 3_600_000 });
const removed = await store.cleanup();
expect(removed).toBe(0);
expect(await store.hasJob('s1')).toBe(true);
await store.destroy();
});
it('removes per-user tracking when reaping a stale running job', async () => {
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const store = new InMemoryJobStore({ staleJobTimeout: 1000 });
await store.initialize();
await store.createJob('s1', 'u1', 's1');
expect(await store.getActiveJobIdsByUser('u1')).toEqual(['s1']);
await store.updateJob('s1', { createdAt: Date.now() - 5000 });
await store.cleanup();
expect(await store.getActiveJobIdsByUser('u1')).toEqual([]);
expect(await store.getJobCount()).toBe(0);
await store.destroy();
});
it('reaps terminal jobs while leaving fresh running jobs intact', async () => {
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const store = new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 60000 });
await store.initialize();
await store.createJob('done', 'u1', 'done');
await store.updateJob('done', { status: 'complete', completedAt: Date.now() });
await store.createJob('live', 'u1', 'live');
const removed = await store.cleanup();
expect(removed).toBe(1);
expect(await store.hasJob('done')).toBe(false);
expect(await store.hasJob('live')).toBe(true);
await store.destroy();
});
});
describe('GenerationJobManager - generation abort on reaping', () => {
beforeEach(() => {
jest.resetModules();
});
it('aborts and cleans up a hung running job once the store reaps it', async () => {
const { GenerationJobManagerClass } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
jest.useFakeTimers();
try {
const manager = new GenerationJobManagerClass();
manager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 1000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
});
manager.initialize();
const job = await manager.createJob('conv-2', 'user-1', 'conv-2');
expect(await manager.hasJob('conv-2')).toBe(true);
expect(job.abortController.signal.aborted).toBe(false);
// Advance past the stale timeout + the 60s cleanup interval.
await jest.advanceTimersByTimeAsync(61000);
expect(job.abortController.signal.aborted).toBe(true);
expect(await manager.hasJob('conv-2')).toBe(false);
expect(manager.getRuntimeStats().runtimeStateSize).toBe(0);
expect(manager.getRuntimeStats().eventTransportStreams).toBe(0);
await manager.destroy();
} finally {
jest.useRealTimers();
}
});
it('sends a terminal error to a still-connected client when its job is reaped', async () => {
const { GenerationJobManagerClass } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
jest.useFakeTimers();
try {
const manager = new GenerationJobManagerClass();
manager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 0, staleJobTimeout: 1000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
});
manager.initialize();
await manager.createJob('conv-3', 'user-1', 'conv-3');
const errors: string[] = [];
const subscription = await manager.subscribe(
'conv-3',
() => undefined,
() => undefined,
(error) => errors.push(error),
);
// Hung generation: no chunks emitted; advance past the stale timeout + cleanup tick.
await jest.advanceTimersByTimeAsync(61000);
expect(errors).toContain('Generation timed out');
expect(await manager.hasJob('conv-3')).toBe(false);
subscription?.unsubscribe();
await manager.destroy();
} finally {
jest.useRealTimers();
}
});
});

View file

@ -34,6 +34,7 @@ export interface StreamServicesConfig {
inMemoryOptions?: {
ttlAfterComplete?: number;
maxJobs?: number;
staleJobTimeout?: number;
};
}
@ -119,6 +120,8 @@ function createInMemoryServices(options?: StreamServicesConfig['inMemoryOptions'
const jobStore = new InMemoryJobStore({
ttlAfterComplete: options?.ttlAfterComplete ?? 300000, // 5 minutes
maxJobs: options?.maxJobs ?? 1000,
// Failsafe for crashed/hung generations (mirrors RedisJobStore's running-job TTL).
staleJobTimeout: options?.staleJobTimeout ?? 1_200_000, // 20 minutes
});
const eventTransport = new InMemoryEventTransport();

View file

@ -35,19 +35,38 @@ export class InMemoryJobStore implements IJobStore {
/** Maps userId -> Set of streamIds (conversationIds) for active jobs */
private userJobMap = new Map<string, Set<string>>();
/**
* Maps streamId -> last generation-activity timestamp. Refreshed via
* recordActivity() on each emitted chunk so the stale-job failsafe reaps on
* inactivity (a hung generation) rather than age (a long but live stream).
*/
private lastActivity = new Map<string, number>();
/** Time to keep completed jobs before cleanup (0 = immediate) */
private ttlAfterComplete = 0;
/** Maximum number of concurrent jobs */
private maxJobs = 1000;
constructor(options?: { ttlAfterComplete?: number; maxJobs?: number }) {
/**
* Failsafe timeout (ms) for jobs stuck in "running" status. Mirrors
* RedisJobStore's running-job TTL: a crashed or hung generation that never
* reaches a terminal state would otherwise retain its content state forever,
* leaking the full message context until the process runs out of memory.
* 0 disables the failsafe. Default: 20 minutes.
*/
private staleJobTimeout = 1_200_000;
constructor(options?: { ttlAfterComplete?: number; maxJobs?: number; staleJobTimeout?: number }) {
if (options?.ttlAfterComplete) {
this.ttlAfterComplete = options.ttlAfterComplete;
}
if (options?.maxJobs) {
this.maxJobs = options.maxJobs;
}
if (options?.staleJobTimeout !== undefined) {
this.staleJobTimeout = options.staleJobTimeout;
}
}
async initialize(): Promise<void> {
@ -87,6 +106,10 @@ export class InMemoryJobStore implements IJobStore {
};
this.jobs.set(streamId, job);
// Clear any prior activity timestamp so a replacement reusing this streamId
// (the controller handles job replacement) falls back to the fresh createdAt
// and isn't reaped on the previous generation's stale last-activity time.
this.lastActivity.delete(streamId);
// Track job by userId (tenant-qualified when available) for efficient user-scoped queries
const userKey = tenantId ? `${tenantId}:${userId}` : userId;
@ -117,9 +140,21 @@ export class InMemoryJobStore implements IJobStore {
async deleteJob(streamId: string): Promise<void> {
this.jobs.delete(streamId);
this.contentState.delete(streamId);
this.lastActivity.delete(streamId);
logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
}
/**
* Refresh a job's last-activity timestamp (called on each emitted chunk) so the
* stale-job failsafe in cleanup() reaps on inactivity rather than total age,
* mirroring RedisJobStore refreshing the running TTL on each appendChunk.
*/
recordActivity(streamId: string): void {
if (this.jobs.has(streamId)) {
this.lastActivity.set(streamId, Date.now());
}
}
async hasJob(streamId: string): Promise<boolean> {
return this.jobs.has(streamId);
}
@ -137,6 +172,7 @@ export class InMemoryJobStore implements IJobStore {
async cleanup(): Promise<number> {
const now = Date.now();
const toDelete: string[] = [];
let staleRunning = 0;
for (const [streamId, job] of this.jobs) {
const isFinished = ['complete', 'error', 'aborted'].includes(job.status);
@ -145,6 +181,18 @@ export class InMemoryJobStore implements IJobStore {
if (this.ttlAfterComplete === 0 || now - job.completedAt > this.ttlAfterComplete) {
toDelete.push(streamId);
}
} else if (this.staleJobTimeout > 0 && job.status === 'running') {
// Failsafe: reap jobs stuck in "running" with no generation activity for
// longer than the stale timeout. These are crashed/hung generations that
// never reached a terminal state; without this they accumulate their
// content state in memory until the process OOMs. Reaping keys off last
// activity (not creation time) so a long but live stream is never reaped,
// mirroring RedisJobStore refreshing the running TTL on each chunk.
const lastActive = this.lastActivity.get(streamId) ?? job.createdAt;
if (now - lastActive > this.staleJobTimeout) {
toDelete.push(streamId);
staleRunning++;
}
}
}
@ -163,6 +211,12 @@ export class InMemoryJobStore implements IJobStore {
await this.deleteJob(id);
}
if (staleRunning > 0) {
logger.warn(
`[InMemoryJobStore] Reaped ${staleRunning} stale running job(s) exceeding ${this.staleJobTimeout}ms (likely crashed/hung generations)`,
);
}
if (toDelete.length > 0) {
logger.debug(`[InMemoryJobStore] Cleaned up ${toDelete.length} expired jobs`);
}

View file

@ -181,6 +181,18 @@ export interface IJobStore {
/** Cleanup expired jobs */
cleanup(): Promise<number>;
/**
* Record generation activity for a job (e.g. a chunk was emitted), refreshing
* its "last active" timestamp so the stale-running-job failsafe does not reap a
* stream that is still producing output.
*
* In-memory: updates an internal last-activity timestamp used by cleanup().
* Redis: no-op the running-job TTL is already refreshed on each appendChunk.
*
* @param streamId - The stream identifier
*/
recordActivity?(streamId: string): void;
/** Get total job count */
getJobCount(): Promise<number>;