🧮 fix: Atomize Redis Event Sequence Counters for Multi-Replica Deployments (#12578)

* fix: atomize Redis event sequence counters for multi-replica deployments

Replace in-memory sequenceCounters Map with shared atomic Redis counter
(INCR via Lua script) so all replicas share an authoritative sequence
source. Make syncReorderBuffer async to read current sequence from Redis
instead of defaulting to 0 on non-publishing replicas.

Closes #12575
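The shared-counter idea can be sketched with an in-memory stand-in for Redis (the Map, the helper names, and the 0-based seq arithmetic are illustrative; the commit's actual implementation wraps the INCR in a Lua script so it can also set a TTL):

```typescript
// Minimal model: a Map stands in for Redis, shared by all "replicas".
const redis = new Map<string, number>();

// INCR semantics: atomically bump and return the new value.
function incr(key: string): number {
  const next = (redis.get(key) ?? 0) + 1;
  redis.set(key, next);
  return next;
}

// Every replica allocates from the same key; INCR returns the new value,
// so subtracting 1 keeps sequences 0-based.
function getNextSequence(streamId: string): number {
  return incr(`stream:{${streamId}}:seq`) - 1;
}

// Replica A emits two chunks, replica B emits one: no collisions, no resets.
const a1 = getNextSequence('s1'); // 0
const a2 = getNextSequence('s1'); // 1
const b1 = getNextSequence('s1'); // 2
console.log(a1, a2, b1);
```

A subscriber on any replica can then GET the same key to learn the authoritative frontier instead of defaulting to 0.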

* fix: harden emitChunk error handling and syncReorderBuffer race safety

Wrap getNextSequence inside emitChunk's try/catch so a transient Redis
failure is logged-and-swallowed, preserving the non-fatal chunk emission
contract. In syncReorderBuffer, replace pending.clear() with selective
discard of stale entries (seq < currentSeq) followed by
flushPendingMessages, preventing loss of chunks that arrived during the
async GET window.
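The non-fatal emission contract can be sketched as follows (helper names and the logger shape are illustrative, not the PR's exact code): a failed sequence allocation is logged and swallowed, and the publish is skipped, so emitChunk never rejects on its fire-and-forget callers.

```typescript
// Hedged sketch: allocateSeq stands in for getNextSequence, log for the
// winston logger. Both the allocation and the publish are inside one
// try/catch, matching the "logged-and-swallowed" contract described above.
async function emitChunkSketch(
  allocateSeq: () => Promise<number>,
  publish: (seq: number) => Promise<void>,
  log: (msg: string) => void,
): Promise<void> {
  try {
    const seq = await allocateSeq();
    await publish(seq);
  } catch (err) {
    // Transient Redis failure: record it, drop the chunk, do not throw.
    log(`emitChunk failed (non-fatal): ${(err as Error).message}`);
  }
}

// A transient Redis failure resolves quietly; the error only reaches the log.
const logs: string[] = [];
emitChunkSketch(
  () => Promise.reject(new Error('Redis connection lost')),
  () => Promise.resolve(),
  (msg) => logs.push(msg),
);
```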

* fix: address review findings — harden error paths, validate types, DRY tests

- Wrap syncReorderBuffer in try/catch in GenerationJobManager.subscribe()
  so a Redis GET failure degrades gracefully instead of crashing SSE reconnect.
- Validate eval return type in getNextSequence; throw on unexpected type
  instead of silently computing NaN/-1 and dropping all messages.
- Add NaN guard to parseInt in syncReorderBuffer for corrupted Redis values.
- Consolidate two streams loops in destroy() into a single pass.
- Extract createMockPublisher to shared helpers/publisher.ts (DRY).
- Add tests for eval failure and unexpected eval return type in emitChunk.
- Document performance tradeoff (2x RTT per emit) and TTL rationale.
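The NaN guard from the list above can be sketched like this (the function name and value shapes are assumptions for illustration):

```typescript
// parseSequence is an illustrative name for the guard around the Redis GET
// result: a missing key means nothing was emitted yet, a well-formed value is
// the counter, and corruption throws instead of silently desyncing delivery.
function parseSequence(raw: string | null): number {
  if (raw == null) {
    return 0; // key absent: the stream has not emitted anything
  }
  const parsed = parseInt(raw, 10);
  if (Number.isNaN(parsed)) {
    throw new Error(`Corrupted sequence value in Redis: ${raw}`);
  }
  return parsed;
}

console.log(parseSequence('7'), parseSequence(null)); // 7 0
```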

* test: add cross-replica integration tests for sequence desync fix (#12575)

Three real-Redis tests that reproduce the exact multi-replica failure:

- Late subscriber on a different replica syncs to the shared counter
  and receives chunks immediately (no 500ms force-flush).
- Multiple subscribe/unsubscribe cycles across replicas maintain
  correct sequence alignment on every reconnect.
- Shared counter key is cleaned up when the stream is destroyed.

* fix: close syncReorderBuffer race, stop destroy() from nuking shared keys

- Replace selective prune with Math.min(currentSeq, minPendingSeq) so
  chunks that arrive in pending during the async GET window are preserved
  instead of incorrectly pruned. Since unsubscribe() already clears
  pending, any entries at sync time are live messages from the current
  subscription and must not be discarded.
- Remove DEL from destroy() — the sequence key is shared across replicas
  and a shutting-down instance must not nuke it for active publishers.
  Keys expire naturally via their 1-hour TTL; cleanup() handles
  single-stream lifecycle teardown.
- Add race-condition test: pauses GET, injects a message into pending
  during the window, resolves GET, asserts the chunk is delivered.
- Add emitDone/emitError eval failure tests to cover the asymmetric
  error contract (chunk swallows, done/error propagates).
- Add explicit MockPublisher return type to helpers/publisher.ts.
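The Math.min frontier choice above can be sketched as follows (illustrative names; an explicit loop finds the minimum pending seq rather than spreading into Math.min):

```typescript
// If any pending entry predates the Redis counter, start delivery at that
// entry instead of skipping it: entries in pending at sync time are live
// messages from the current subscription.
function resolveNextSeq(currentSeq: number, pendingSeqs: number[]): number {
  let min = Infinity;
  for (const seq of pendingSeqs) {
    if (seq < min) min = seq;
  }
  return pendingSeqs.length > 0 ? Math.min(currentSeq, min) : currentSeq;
}

// A chunk with seq 5 arrived during the async GET window; GET returned 6.
console.log(resolveNextSeq(6, [5])); // 5: the live chunk is not discarded
console.log(resolveNextSeq(6, []));  // 6: nothing pending, trust the counter
```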

* fix: context-aware syncReorderBuffer to prevent duplicate delivery, silence test logs

The Math.min(currentSeq, minPending) fix from the prior commit correctly
handles the cross-replica race (chunk arriving during async GET), but
causes duplicate delivery in same-replica mode: onAbort subscribes to
the Redis channel during createJob, so chunks published before
subscribe() may arrive via pub/sub AND earlyEventBuffer. The Math.min
logic then flushes the pub/sub copy as a "live" message.

Fix: add a clearPending parameter to syncReorderBuffer. The manager
passes true when earlyEventBuffer was replayed (same-replica: pending
entries are duplicates → clear them) and false/undefined when it was
not (cross-replica: pending entries are live → preserve via Math.min).

- Silence winston logger in stream test files via logger.silent = true,
  following the existing pattern in data-schemas/prompt.spec.ts.
- Remove sequence key DEL from destroy() — shared across replicas,
  a shutting-down instance must not nuke the counter for active
  publishers. Keys expire via TTL; cleanup() handles stream teardown.
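The context-aware branch described above can be sketched like this (illustrative shapes; later commits in this PR rename the flag to pruneStaleEntries and refine the pruning further):

```typescript
// clearPending=true: same-replica, pending entries duplicate the replayed
// earlyEventBuffer, so drop them and trust the counter.
// clearPending falsy: cross-replica, pending entries are live, so the
// frontier is Math.min(currentSeq, minPendingSeq).
function syncFrontier(
  pending: Map<number, unknown>,
  currentSeq: number,
  clearPending?: boolean,
): number {
  if (clearPending) {
    pending.clear();
    return currentSeq;
  }
  let min = Infinity;
  for (const seq of pending.keys()) {
    if (seq < min) min = seq;
  }
  return pending.size > 0 ? Math.min(currentSeq, min) : currentSeq;
}

const sameReplica = new Map<number, unknown>([[4, 'duplicate of replay']]);
console.log(syncFrontier(sameReplica, 5, true), sameReplica.size); // 5 0
const crossReplica = new Map<number, unknown>([[5, 'live chunk']]);
console.log(syncFrontier(crossReplica, 6), crossReplica.size); // 5 1
```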

* fix: share publisher client in cross-replica tests for Redis Cluster compat

The cross-replica tests created publisherB via (ioredisClient as Redis)
.duplicate(), which produces a plain Redis connection to a single node.
In Redis Cluster, GET/EVAL on this client can't follow MOVED redirects,
so syncReorderBuffer reads null → nextSeq=0 → chunks are buffered.

Fix: share ioredisClient as the publisher for both replicas. It's the
correct Cluster-aware client and handles slot routing automatically.
Only subscriber connections need to be separate (pub/sub requirement).

* fix: increase cross-replica test timeouts and use polling for CI cluster

Replace fixed 300ms delivery waits with polling (50ms intervals, 2s max)
to handle Redis Cluster's cross-node pub/sub broadcast latency in CI.
Increase subscription activation wait from 100ms to 500ms to match the
pattern used by existing same-instance tests.
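The polling pattern can be sketched as a small helper (the name and exact shape are assumptions, not the PR's code):

```typescript
// Poll a predicate at a fixed interval until it holds or the deadline passes,
// instead of sleeping a fixed 300ms and hoping delivery beat the timer.
async function pollUntil(
  predicate: () => boolean,
  intervalMs = 50,
  timeoutMs = 2000,
): Promise<boolean> {
  const start = Date.now();
  while (!predicate()) {
    if (Date.now() - start >= timeoutMs) return false;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  return true;
}

// Usage: wait for cross-node pub/sub delivery, bounded by the 2s cap.
const received: number[] = [];
setTimeout(() => received.push(1), 100);
pollUntil(() => received.length === 1).then((delivered) => {
  console.log(delivered); // true once the chunk lands, well under the cap
});
```

Fast runs finish on the first check; slow CI cluster runs simply take more iterations instead of failing.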

* chore: silence winston logs in remaining stream test files

Add logger.silent = true to GenerationJobManager, RedisJobStore, and
collectedUsage test files, matching the pattern already applied to
RedisEventTransport and reconnect-reorder-desync tests.

* fix: remove flaky cluster pub/sub tests, fix log suppression for resetModules

Remove two cross-replica transport-level integration tests that are
inherently non-deterministic in Redis Cluster: cluster pub/sub fan-out
is async across nodes, so pre-subscribe PUBLISHes can arrive at the
subscriber's node after SUBSCRIBE takes effect, causing random +/- 1
message counts. The core logic is already covered by deterministic unit
tests (mock publisher) and GenerationJobManager integration tests
(end-to-end with earlyEventBuffer). The cleanup test is retained.

Switch log suppression from logger.silent (which doesn't survive
jest.resetModules) to jest.spyOn(console, 'log').mockImplementation()
for files that use resetModules (GenerationJobManager, RedisJobStore,
collectedUsage). The logger.silent approach remains for files that
don't use resetModules (RedisEventTransport, reconnect-reorder-desync).

* fix: replace Lua EVAL+TTL with plain INCR, preserve live chunks during same-replica sync

P1: syncReorderBuffer with clearPending=true was unconditionally clearing
pending, dropping NEW chunks (seq >= currentSeq) from ongoing generation
that arrived via pub/sub during the async GET. Fix: selectively prune
only entries with seq < currentSeq (duplicates of earlyEventBuffer) and
flush remaining live entries.
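The P1 fix can be sketched as follows (illustrative names; deliver stands in for the onChunk path and the loop for flushPendingMessages):

```typescript
// Prune only entries below currentSeq (duplicates of the replayed
// earlyEventBuffer), then flush contiguous live entries from there,
// instead of clearing pending wholesale.
function pruneAndFlush(
  pending: Map<number, unknown>,
  currentSeq: number,
  deliver: (event: unknown) => void,
): number {
  for (const seq of [...pending.keys()]) {
    if (seq < currentSeq) pending.delete(seq); // already delivered via replay
  }
  let nextSeq = currentSeq;
  while (pending.has(nextSeq)) { // flushPendingMessages equivalent
    deliver(pending.get(nextSeq));
    pending.delete(nextSeq);
    nextSeq++;
  }
  return nextSeq;
}

// Seqs 0-4 were replayed (currentSeq 5); seq 5 arrived via pub/sub during GET.
const pendingEntries = new Map<number, unknown>([[3, 'dup'], [5, 'live']]);
const delivered: unknown[] = [];
console.log(pruneAndFlush(pendingEntries, 5, (e) => delivered.push(e))); // 6
```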

P2: The 1-hour TTL on sequence keys could expire mid-stream during long
quiet periods (e.g., slow tool calls), restarting the counter from zero
and causing subscribers to silently drop all subsequent messages. Fix:
remove the Lua EVAL+EXPIRE script entirely and use plain Redis INCR —
no TTL. Keys are cleaned up explicitly by cleanup()/resetSequence()
when streams end. Orphaned keys from crashed processes are a few bytes
each, negligible compared to the production risk of TTL expiry.

- Update mock publisher helper: eval → incr
- Remove "unexpected eval type" tests (not applicable to incr)
- Update remaining eval error tests to reference incr

* fix: re-arm flush timeout after syncReorderBuffer when gaps remain

syncReorderBuffer clears flushTimeout before processing, but if pending
still has gaps after flushPendingMessages, no timeout is re-armed.
Without new messages arriving to trigger scheduleFlushTimeout, those
buffered entries sit indefinitely — stalling the stream after reconnect.
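The re-arm rule can be sketched as a small post-sync step (illustrative; scheduleFlushTimeout stands in for the transport's force-flush timer):

```typescript
// After a sync pass, any remaining gap in pending must re-schedule the
// force-flush timer; otherwise buffered chunks wait for an unrelated message
// to trigger the next scheduleFlushTimeout.
function maybeRearm(pendingSize: number, scheduleFlushTimeout: () => void): boolean {
  if (pendingSize === 0) return false;
  scheduleFlushTimeout(); // gaps remain: arm the fallback
  return true;
}

let armed = false;
console.log(maybeRearm(2, () => { armed = true; }), armed); // true true
```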

* chore: address final review — fix stale docs, rename clearPending, tighten tests

Fix all 10 findings from final review pass:

F1: Fix stale TTL comment in destroy() — no TTL exists after EVAL removal
F2: Fix stale EVAL reference in emitChunk JSDoc → INCR + PUBLISH
F3: Fix syncReorderBuffer JSDoc — describes old broken behavior, not
    the current selective prune
F4: Rename clearPending → pruneStaleEntries for clarity at call sites
F5: Add publish-not-called assertion to INCR failure test
F6: Replace Math.min(...spread) with explicit loop to avoid heap alloc
F7: Remove resetSequence from IEventTransport interface (no external
    callers; implementation detail only)
F8: Consolidate cleanup/resetSequence overlap — cleanup() owns the DEL,
    resetReorderBuffer() (now private) handles buffer state only
F9: Fix import order in reconnect test (value before type imports)
F10: Export MockPublisher interface from helpers/publisher.ts

* chore: fix stale resetSequence references and hadBufferReplay JSDoc

Two comments referenced resetSequence() which was removed in the F7/F8
refactor (now private resetReorderBuffer(), which doesn't DEL the key).
Only cleanup() deletes the Redis key. Also update hadBufferReplay JSDoc
to say "prune stale entries" instead of the pre-refactor "clear pending".

* chore: grammar

* fix: use earlyReplayCount as prune cutoff instead of Redis counter

The boolean pruneStaleEntries flag used currentSeq (from Redis GET) as
the prune threshold, but INCR can advance the counter past a live
chunk's seq during the GET window. Example: earlyEventBuffer held seqs
0-4, generation emits seq 5 during GET, INCR advances counter to 6,
GET returns 6, prune condition 5 < 6 deletes the live chunk.

Fix: pass the earlyEventBuffer replay count (5) as the prune cutoff.
Entries with seq < earlyReplayCount are true duplicates; entries at or
above are live regardless of what the Redis counter reads. After
pruning, the unified Math.min(currentSeq, minPending) logic handles
both same-replica and cross-replica paths correctly.

Add a targeted test that exercises this exact race: pauses GET, emits
seq 5 during the window, resolves GET with counter=6, asserts seq 5
is preserved (would have been dropped with the old boolean approach).
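The corrected cutoff can be sketched like this (illustrative names): prune by what was actually replayed, then let the Math.min frontier logic handle the rest.

```typescript
// Entries below earlyReplayCount are true duplicates of the replayed buffer;
// everything at or above is live regardless of what the counter reads, since
// INCR may have advanced past a chunk whose pub/sub copy is still in flight.
function syncWithReplayCount(
  pending: Map<number, unknown>,
  currentSeq: number,
  earlyReplayCount: number,
): number {
  for (const seq of [...pending.keys()]) {
    if (seq < earlyReplayCount) pending.delete(seq); // true duplicate
  }
  let min = Infinity;
  for (const seq of pending.keys()) {
    if (seq < min) min = seq;
  }
  return pending.size > 0 ? Math.min(currentSeq, min) : currentSeq;
}

// Buffer held seqs 0-4 (count 5); seq 5 landed during GET; GET returned 6.
// Cutoff 5 keeps seq 5, and Math.min sets the frontier to 5, not 6.
const pendingChunks = new Map<number, unknown>([[1, 'dup'], [5, 'live']]);
console.log(syncWithReplayCount(pendingChunks, 6, 5)); // 5
```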

* fix: pass earlyReplayCount for skipBufferReplay path to prune pub/sub duplicates

When skipBufferReplay is true (resume scenario), earlyEventBuffer events
are delivered via the resume sync payload, not replayed directly. But
earlyReplayCount was left at 0, so syncReorderBuffer treated all pending
entries as live — meaning delayed pub/sub copies of those buffered
events could be delivered again, duplicating content.

Fix: capture earlyEventBuffer.length before the skip/replay branch so
the count is always passed to syncReorderBuffer regardless of delivery
method. Seqs 0..earlyReplayCount-1 are pruned as duplicates whether
they were replayed or delivered via sync payload.

* fix: keep nextSeq monotonic in syncReorderBuffer after async GET

handleOrderedChunk can deliver in-order messages and advance nextSeq
during the async GET window. If those messages leave pending empty,
the unconditional nextSeq = currentSeq could regress nextSeq below
its already-advanced value, reopening a delivered gap and causing
subsequent messages to be buffered until force-flush.

Fix: wrap both nextSeq assignments with Math.max(nextSeq, ...) so
syncReorderBuffer never moves the delivery frontier backward.
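The monotonic guard reduces to a one-line merge (illustrative name):

```typescript
// The frontier never moves backward: a stale counter read cannot reopen a
// gap that in-order delivery already closed during the async GET window.
function mergeFrontier(nextSeq: number, currentSeq: number): number {
  return Math.max(nextSeq, currentSeq);
}

console.log(mergeFrontier(8, 6)); // 8: delivery already advanced past the read
console.log(mergeFrontier(3, 6)); // 6: the counter is ahead, catch up to it
```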

* fix: cap nextSeq at earlyReplayCount, add 24h safety TTL, add post-GET race test

Finding 1 (CRITICAL): When earlyReplayCount > 0 and pending is empty,
nextSeq was set to currentSeq — but INCR can advance the counter past
a live chunk whose pub/sub hasn't arrived yet. Cap at earlyReplayCount
instead (what was actually delivered), so in-flight chunks are not
skipped. Adds test for the message-arrives-AFTER-GET-resolves scenario.

Finding 3: Add a 24-hour safety-net TTL set once on first INCR only
(val === 1), never refreshed. This caps orphan lifetime from crashed
processes without risking mid-stream counter resets.
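The first-INCR TTL can be sketched with in-memory stand-ins (the 24-hour figure is from the commit; the Maps and helper name are illustrative):

```typescript
// EXPIRE fires only when INCR returns 1 (first allocation for the key), so a
// live stream never refreshes its TTL and the TTL is never reset mid-stream.
const store = new Map<string, number>();
const ttls = new Map<string, number>();

function incrWithSafetyTtl(key: string): number {
  const val = (store.get(key) ?? 0) + 1;
  store.set(key, val);
  if (val === 1) {
    ttls.set(key, 24 * 60 * 60); // fire-and-forget EXPIRE in the real code
  }
  return val;
}

incrWithSafetyTtl('stream:{s1}:seq');
incrWithSafetyTtl('stream:{s1}:seq');
console.log(store.get('stream:{s1}:seq'), ttls.get('stream:{s1}:seq')); // 2 86400
```

Orphans from crashed processes thus live at most a day, while healthy long-running streams keep a single untouched TTL set at birth.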

Finding 6: Replace setTimeout(100) in cleanup test with polling loop.

Finding 9: Fix variadic DEL mock + add expire mock for the new TTL.

Revert P2 fix (earlyReplayCount for skipBufferReplay path) — when
skipBufferReplay is true, the resume sync payload delivers everything
up to currentSeq, so syncReorderBuffer should trust the Redis counter
as the frontier, not the buffer length.

* chore: log expire failures consistently with other fire-and-forget errors

Danny Avila 2026-04-09 09:57:54 -04:00, committed by GitHub
commit 452af50eff (parent 632ffbcb87), GPG key ID: B5690EEEBB952194
9 changed files with 509 additions and 87 deletions


@@ -770,14 +770,27 @@ class GenerationJobManagerClass {
if (!runtime.hasSubscriber) {
runtime.hasSubscriber = true;
/**
* Pass earlyReplayCount to syncReorderBuffer so it can prune duplicate pub/sub
* entries (seqs 0..count-1) without touching live in-flight chunks.
*
* Only set when the buffer was actually replayed: those specific seqs were
* delivered via onChunk and their pub/sub copies are duplicates.
* When skipBufferReplay is true, the resume sync payload delivers aggregated
* content up to the Redis counter, so syncReorderBuffer should trust currentSeq
* as the frontier (earlyReplayCount = 0).
*/
let earlyReplayCount = 0;
if (runtime.earlyEventBuffer.length > 0) {
if (options?.skipBufferReplay) {
logger.debug(
`[GenerationJobManager] Skipping ${runtime.earlyEventBuffer.length} buffered events for ${streamId} (skipBufferReplay)`,
);
} else {
earlyReplayCount = runtime.earlyEventBuffer.length;
logger.debug(
`[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`,
`[GenerationJobManager] Replaying ${earlyReplayCount} buffered events for ${streamId}`,
);
for (const bufferedEvent of runtime.earlyEventBuffer) {
onChunk(bufferedEvent);
@@ -807,7 +820,14 @@ class GenerationJobManagerClass {
onChunk(createdEvent);
}
this.eventTransport.syncReorderBuffer?.(streamId);
try {
await this.eventTransport.syncReorderBuffer?.(streamId, earlyReplayCount);
} catch (err) {
logger.warn(
`[GenerationJobManager] Failed to sync reorder buffer for ${streamId}; proceeding with current nextSeq:`,
err,
);
}
}
if (isFirst) {


@@ -14,6 +14,9 @@ import {
keyvRedisClientReady,
} from '~/cache/redisClients';
/** Suppress winston Console transport output (survives jest.resetModules) */
jest.spyOn(console, 'log').mockImplementation();
/**
* Integration tests for GenerationJobManager.
*
@@ -800,9 +803,8 @@ describe('GenerationJobManager Integration Tests', () => {
GenerationJobManager.initialize();
const received: unknown[] = [];
const subscription = await GenerationJobManager.subscribe(
streamId,
(event) => received.push(event),
const subscription = await GenerationJobManager.subscribe(streamId, (event) =>
received.push(event),
);
expect(subscription).not.toBeNull();
@@ -839,9 +841,8 @@ describe('GenerationJobManager Integration Tests', () => {
GenerationJobManager.initialize();
const received: unknown[] = [];
const subscription = await GenerationJobManager.subscribe(
streamId,
(event) => received.push(event),
const subscription = await GenerationJobManager.subscribe(streamId, (event) =>
received.push(event),
);
expect(subscription).not.toBeNull();


@@ -1,4 +1,8 @@
import type { Redis, Cluster } from 'ioredis';
import { logger } from '@librechat/data-schemas';
import { createMockPublisher } from './helpers/publisher';
logger.silent = true;
/**
* Integration tests for RedisEventTransport.
@@ -318,9 +322,7 @@ describe('RedisEventTransport Integration Tests', () => {
test('should reorder out-of-sequence messages', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -359,9 +361,7 @@ describe('RedisEventTransport Integration Tests', () => {
test('should buffer early messages and deliver when gaps are filled', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -407,9 +407,7 @@
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -450,9 +448,7 @@
test('should handle messages without sequence numbers (backward compatibility)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -492,9 +488,7 @@
test('should deliver done event after all pending chunks (terminal event ordering)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
@@ -547,9 +541,7 @@
test('should deliver error event after all pending chunks (terminal event ordering)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
@@ -864,6 +856,62 @@ describe('RedisEventTransport Integration Tests', () => {
});
});
/**
* Cross-Replica Sequence Synchronization (#12575)
*
* The core cross-replica sync logic (Redis INCR counter, async GET in
* syncReorderBuffer, pruneStaleEntries flag) is verified by:
* - Unit tests with mock publishers (deterministic, no cluster timing)
* - GenerationJobManager integration tests (end-to-end with earlyEventBuffer)
* - The race-condition unit test (paused GET with injected message)
*
* Transport-level integration tests with two real Redis transports are
* inherently flaky in Redis Cluster: cluster pub/sub fan-out is async
* across nodes, so a PUBLISH acknowledged on node A can arrive at the
* subscriber on node B after a subsequent SUBSCRIBE takes effect,
* causing non-deterministic message counts.
*/
describe('Cross-Replica Sequence Synchronization (#12575)', () => {
test('shared counter is cleaned up on stream cleanup', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const subscriber = (ioredisClient as Redis).duplicate();
const transport = new RedisEventTransport(ioredisClient, subscriber);
const streamId = `cross-replica-cleanup-${Date.now()}`;
// Publish chunks to create the Redis counter key
for (let i = 0; i < 5; i++) {
await transport.emitChunk(streamId, { index: i });
}
// Verify the key exists
const key = `stream:{${streamId}}:seq`;
const valBefore = await ioredisClient.get(key);
expect(valBefore).toBe('5');
// Cleanup the stream
transport.cleanup(streamId);
// Poll for the fire-and-forget DEL to complete (robust under CI load)
const start = Date.now();
let valAfter: string | null = 'pending';
while (valAfter !== null && Date.now() - start < 2000) {
await new Promise((resolve) => setTimeout(resolve, 10));
valAfter = await ioredisClient.get(key);
}
expect(valAfter).toBeNull();
transport.destroy();
subscriber.disconnect();
});
});
describe('Cleanup', () => {
test('should clean up stream resources', async () => {
if (!ioredisClient) {
@@ -898,9 +946,8 @@ describe('RedisEventTransport Integration Tests', () => {
test('should swallow emitChunk publish errors (callers fire-and-forget)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
};
const mockPublisher = createMockPublisher();
mockPublisher.publish.mockRejectedValue(new Error('Redis connection lost'));
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -921,12 +968,35 @@ describe('RedisEventTransport Integration Tests', () => {
transport.destroy();
});
test('should swallow emitChunk incr errors (sequence allocation failure)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = createMockPublisher();
mockPublisher.incr.mockRejectedValue(new Error('INCR failed'));
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = `error-prop-incr-${Date.now()}`;
await expect(transport.emitChunk(streamId, { data: 'test' })).resolves.toBeUndefined();
expect(mockPublisher.publish).not.toHaveBeenCalled();
transport.destroy();
});
test('should throw when emitDone publish fails', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
};
const mockPublisher = createMockPublisher();
mockPublisher.publish.mockRejectedValue(new Error('Redis connection lost'));
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -950,9 +1020,8 @@ describe('RedisEventTransport Integration Tests', () => {
test('should throw when emitError publish fails', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
};
const mockPublisher = createMockPublisher();
mockPublisher.publish.mockRejectedValue(new Error('Redis connection lost'));
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -973,6 +1042,52 @@ describe('RedisEventTransport Integration Tests', () => {
transport.destroy();
});
test('should propagate when emitDone incr fails', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = createMockPublisher();
mockPublisher.incr.mockRejectedValue(new Error('INCR failed'));
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = `error-prop-done-incr-${Date.now()}`;
await expect(transport.emitDone(streamId, { finished: true })).rejects.toThrow('INCR failed');
transport.destroy();
});
test('should propagate when emitError incr fails', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = createMockPublisher();
mockPublisher.incr.mockRejectedValue(new Error('INCR failed'));
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = `error-prop-error-incr-${Date.now()}`;
await expect(transport.emitError(streamId, 'some error')).rejects.toThrow('INCR failed');
transport.destroy();
});
test('should still deliver events successfully when publish succeeds', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');


@@ -3,6 +3,9 @@ import type { Agents } from 'librechat-data-provider';
import type { Redis, Cluster } from 'ioredis';
import { StandardGraph } from '@librechat/agents';
/** Suppress winston Console transport output (survives jest.resetModules) */
jest.spyOn(console, 'log').mockImplementation();
/**
* Integration tests for RedisJobStore.
*


@@ -8,6 +8,9 @@
import type { UsageMetadata } from '../interfaces/IJobStore';
/** Suppress winston Console transport output (survives jest.resetModules) */
jest.spyOn(console, 'log').mockImplementation();
describe('CollectedUsage - InMemoryJobStore', () => {
beforeEach(() => {
jest.resetModules();


@@ -0,0 +1,31 @@
export interface MockPublisher {
publish: jest.Mock;
incr: jest.Mock;
expire: jest.Mock;
get: jest.Mock;
del: jest.Mock;
}
/** Mock publisher with Redis command simulation for atomic sequence counters */
export function createMockPublisher(): MockPublisher {
const counters = new Map<string, number>();
return {
publish: jest.fn().mockResolvedValue(1),
incr: jest.fn().mockImplementation((key: string) => {
const current = (counters.get(key) ?? 0) + 1;
counters.set(key, current);
return Promise.resolve(current);
}),
expire: jest.fn().mockResolvedValue(1),
get: jest.fn().mockImplementation((key: string) => {
const val = counters.get(key);
return Promise.resolve(val != null ? String(val) : null);
}),
del: jest.fn().mockImplementation((...keys: string[]) => {
for (const key of keys) {
counters.delete(key);
}
return Promise.resolve(keys.length);
}),
};
}


@@ -1,13 +1,17 @@
import { logger } from '@librechat/data-schemas';
import type { Redis, Cluster } from 'ioredis';
import { RedisEventTransport } from '~/stream/implementations/RedisEventTransport';
import { GenerationJobManagerClass } from '~/stream/GenerationJobManager';
import { createStreamServices } from '~/stream/createStreamServices';
import { createMockPublisher } from './helpers/publisher';
import {
ioredisClient as staticRedisClient,
keyvRedisClient as staticKeyvClient,
keyvRedisClientReady,
} from '~/cache/redisClients';
logger.silent = true;
/**
* Regression tests for the reconnect reorder buffer desync bug.
*
@@ -26,9 +30,7 @@ import {
describe('Reconnect Reorder Buffer Desync (Regression)', () => {
describe('Callback preservation across reconnect cycles (Unit)', () => {
test('allSubscribersLeft callback fires on every disconnect, not just the first', () => {
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -70,9 +72,7 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => {
});
test('abort callback survives across reconnect cycles', () => {
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -125,9 +125,7 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => {
* immediately regardless of how many reconnect cycles have occurred.
*/
test('syncReorderBuffer works correctly on third+ reconnect', async () => {
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -159,7 +157,7 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => {
});
// Sync reorder buffer (as GenerationJobManager.subscribe does)
transport.syncReorderBuffer(streamId);
await transport.syncReorderBuffer(streamId);
const baseSeq = cycle * 10;
@@ -189,9 +187,7 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => {
});
test('reorder buffer works correctly when syncReorderBuffer IS called', async () => {
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
@@ -216,8 +212,8 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => {
onChunk: (event) => chunks.push(event),
});
// This is the critical call - sync nextSeq to match publisher
transport.syncReorderBuffer(streamId);
// This is the critical call - sync nextSeq to match publisher (reads from Redis)
await transport.syncReorderBuffer(streamId);
// Deliver messages starting at seq 20
const messageHandler = mockSubscriber.on.mock.calls.find(
@@ -240,6 +236,187 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => {
});
});
describe('syncReorderBuffer race: message arrives during async GET window (Unit)', () => {
test('should not drop a chunk that lands in pending while GET is in-flight', async () => {
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = 'race-get-window-test';
const chunks: unknown[] = [];
// Emit seq 0 so the Redis counter is 1
await transport.emitChunk(streamId, { index: 0 });
// Subscribe (nextSeq starts at 0)
transport.subscribe(streamId, {
onChunk: (event) => chunks.push(event),
});
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1] as (channel: string, message: string) => void;
const channel = `stream:{${streamId}}:events`;
// Pause the GET: intercept with a deferred promise
let resolveGet!: (val: string | null) => void;
mockPublisher.get.mockImplementationOnce(
() =>
new Promise<string | null>((resolve) => {
resolveGet = resolve;
}),
);
const syncPromise = transport.syncReorderBuffer(streamId);
// While GET is in-flight, the publisher emits seq 1 (INCR → counter=2)
// and the subscriber receives it via pub/sub → pending[1]
await transport.emitChunk(streamId, { index: 1 });
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { index: 1 } }));
// Resolve GET with counter=2 (reflects the INCR for seq 1)
resolveGet('2');
await syncPromise;
// seq 1 MUST be delivered — it arrived during the GET window and must not be pruned
expect(chunks.map((c) => (c as { index: number }).index)).toContain(1);
// Subsequent chunks must also deliver immediately
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { index: 2 } }));
expect(chunks.map((c) => (c as { index: number }).index)).toContain(2);
transport.destroy();
});
test('same-replica: should not drop a live chunk when INCR advances past earlyReplayCount during GET', async () => {
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = 'race-same-replica-test';
const chunks: unknown[] = [];
// Emit seqs 0-4 (earlyEventBuffer would have held these; counter = 5)
for (let i = 0; i < 5; i++) {
await transport.emitChunk(streamId, { index: i });
}
// Subscribe (nextSeq starts at 0)
transport.subscribe(streamId, {
onChunk: (event) => chunks.push(event),
});
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1] as (channel: string, message: string) => void;
const channel = `stream:{${streamId}}:events`;
// Pause the GET
let resolveGet!: (val: string | null) => void;
mockPublisher.get.mockImplementationOnce(
() =>
new Promise<string | null>((resolve) => {
resolveGet = resolve;
}),
);
// Call syncReorderBuffer with earlyReplayCount=5 (seqs 0–4 were replayed)
const syncPromise = transport.syncReorderBuffer(streamId, 5);
// During GET window: LLM emits seq 5 (INCR → counter=6), subscriber receives it
await transport.emitChunk(streamId, { index: 5 });
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 5, data: { index: 5 } }));
// GET returns 6 (counter advanced past seq 5)
resolveGet('6');
await syncPromise;
// seq 5 MUST be delivered — it's live (seq 5 >= earlyReplayCount 5), not a duplicate.
// With the old boolean pruneStaleEntries, 5 < currentSeq(6) would have pruned it.
expect(chunks.map((c) => (c as { index: number }).index)).toContain(5);
// Subsequent chunks deliver normally
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 6, data: { index: 6 } }));
expect(chunks.map((c) => (c as { index: number }).index)).toContain(6);
transport.destroy();
});
test('same-replica: should not drop chunk whose pub/sub arrives AFTER GET resolves', async () => {
const mockPublisher = createMockPublisher();
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = 'race-post-get-test';
const chunks: unknown[] = [];
// Emit seqs 0–4 (earlyEventBuffer would have held these; counter = 5)
for (let i = 0; i < 5; i++) {
await transport.emitChunk(streamId, { index: i });
}
transport.subscribe(streamId, {
onChunk: (event) => chunks.push(event),
});
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1] as (channel: string, message: string) => void;
const channel = `stream:{${streamId}}:events`;
let resolveGet!: (val: string | null) => void;
mockPublisher.get.mockImplementationOnce(
() =>
new Promise<string | null>((resolve) => {
resolveGet = resolve;
}),
);
const syncPromise = transport.syncReorderBuffer(streamId, 5);
// LLM emits seq 5 during the GET window (INCR → counter=6)
// but do NOT deliver via messageHandler yet — simulates pub/sub arriving late
await transport.emitChunk(streamId, { index: 5 });
// GET resolves with counter=6 while pending is EMPTY (pub/sub hasn't arrived)
resolveGet('6');
await syncPromise;
// Now pub/sub for seq 5 arrives AFTER sync completed
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 5, data: { index: 5 } }));
// seq 5 must be delivered — nextSeq should have been capped at earlyReplayCount (5),
// not advanced to currentSeq (6) which would have dropped it.
expect(chunks.map((c) => (c as { index: number }).index)).toContain(5);
transport.destroy();
});
});
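The review notes mention extracting `createMockPublisher` into a shared `helpers/publisher.ts`. The tests above rely on jest mocks (e.g. `get.mockImplementationOnce`), so the real helper presumably returns `jest.fn()` stubs; the dependency-free sketch below only illustrates the behavioral contract being stubbed: the subset of the ioredis surface the transport touches (`incr`, `expire`, `get`, `publish`, `del`). All names and return values here are assumptions.

```typescript
// Hypothetical sketch of the shared helper's contract (not the jest version).
export function createMockPublisherSketch() {
  let counter = 0;
  return {
    // INCR returns the post-increment value, matching Redis semantics.
    incr: async (_key: string): Promise<number> => ++counter,
    // First INCR sets a safety-net TTL; the stub just acknowledges it.
    expire: async (_key: string, _seconds: number): Promise<number> => 1,
    // GET returns the counter as a string, or null for a missing key.
    get: async (_key: string): Promise<string | null> =>
      counter > 0 ? String(counter) : null,
    publish: async (_channel: string, _message: string): Promise<number> => 1,
    del: async (_key: string): Promise<number> => 1,
  };
}
```

Keeping `incr` and `get` backed by one shared counter matters for the race tests above: a subscriber's GET must observe the same value a publisher's INCR produced.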
describe('End-to-end reconnect with GenerationJobManager (Integration)', () => {
let originalEnv: NodeJS.ProcessEnv;
let ioredisClient: Redis | Cluster | null = null;


@@ -10,6 +10,14 @@ const CHANNELS = {
events: (streamId: string) => `stream:{${streamId}}:events`,
};
/**
* Redis keys for shared state (hash-tagged for cluster slot compatibility)
*/
const KEYS = {
/** Atomic sequence counter: shared across all replicas for a given stream */
sequence: (streamId: string) => `stream:{${streamId}}:seq`,
};
/**
* Event types for pub/sub messages
*/
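The hash tag in these key names is what makes the design cluster-safe: Redis Cluster hashes only the substring between the first `{` and the following `}` when assigning a key to a slot, so the `…:events` channel and the `…:seq` counter for the same stream always land on the same node. A minimal illustration (the `hashTag` helper is just for demonstration):

```typescript
// Both names embed the identical {streamId} hash tag, so Redis Cluster
// assigns them to the same slot (CLUSTER KEYSLOT returns the same value).
const streamId = 'abc123';
const eventsChannel = `stream:{${streamId}}:events`;
const seqKey = `stream:{${streamId}}:seq`;

// Extract the tag the same way the cluster does: the text between the
// first '{' and the next '}'.
const hashTag = (key: string): string =>
  key.slice(key.indexOf('{') + 1, key.indexOf('}'));

console.log(hashTag(eventsChannel) === hashTag(seqKey)); // true
```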
@@ -96,8 +104,6 @@ export class RedisEventTransport implements IEventTransport {
private channelSubscriptions = new Map<string, Promise<void>>();
/** Counter for generating unique subscriber IDs */
private subscriberIdCounter = 0;
/** Sequence counters per stream for publishing (ensures ordered delivery in cluster mode) */
private sequenceCounters = new Map<string, number>();
/**
* Create a new Redis event transport.
@@ -115,16 +121,29 @@ export class RedisEventTransport implements IEventTransport {
});
}
/** Get next sequence number for a stream (0-indexed) */
private getNextSequence(streamId: string): number {
const current = this.sequenceCounters.get(streamId) ?? 0;
this.sequenceCounters.set(streamId, current + 1);
return current;
/** Safety-net TTL (seconds) set once on first INCR. Not refreshed — prevents mid-stream resets. */
private static readonly SEQUENCE_TTL_SECONDS = 86400;
/**
* Get next sequence number for a stream (0-indexed, backed by Redis INCR).
* A 24-hour TTL is set on the first INCR only (val === 1) as a safety net
* for orphaned keys from crashed processes. It is never refreshed, so an
* active stream cannot have its counter reset mid-generation.
* Keys are also deleted explicitly by cleanup() on normal stream teardown.
*/
private async getNextSequence(streamId: string): Promise<number> {
const key = KEYS.sequence(streamId);
const val = await this.publisher.incr(key);
if (val === 1) {
this.publisher.expire(key, RedisEventTransport.SEQUENCE_TTL_SECONDS).catch((err) => {
logger.warn(`[RedisEventTransport] Failed to set TTL on sequence key ${key}:`, err);
});
}
return val - 1;
}
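The commit message describes the counter as "INCR via Lua script", while the method above issues a plain `incr` followed by a best-effort `expire`. For reference, a fused variant that performs both in one atomic round-trip could look like the sketch below; `RedisLike` and `getNextSequenceAtomic` are illustrative names, not part of this diff.

```typescript
// Minimal stand-in for the ioredis eval signature used here.
type RedisLike = {
  eval(script: string, numKeys: number, ...args: (string | number)[]): Promise<unknown>;
};

// INCR and the first-call EXPIRE execute atomically inside Redis.
const INCR_WITH_TTL_LUA = `
local val = redis.call('INCR', KEYS[1])
if val == 1 then
  redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return val`;

async function getNextSequenceAtomic(
  redis: RedisLike,
  key: string,
  ttlSeconds: number,
): Promise<number> {
  const raw = await redis.eval(INCR_WITH_TTL_LUA, 1, key, ttlSeconds);
  // Mirror the review fix: reject unexpected eval return types rather than
  // letting NaN propagate and silently drop every subsequent message.
  if (typeof raw !== 'number') {
    throw new Error(`Unexpected eval return type for ${key}: ${typeof raw}`);
  }
  return raw - 1; // INCR is 1-indexed; sequences are 0-indexed
}
```

The fused form also closes the small window in the diff's version where a crash between `incr` and `expire` would leave the key with no TTL.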
/** Reset publish sequence counter and subscriber reorder state for a stream (full cleanup only) */
resetSequence(streamId: string): void {
this.sequenceCounters.delete(streamId);
/** Reset subscriber reorder buffer state to initial values */
private resetReorderBuffer(streamId: string): void {
const state = this.streams.get(streamId);
if (state) {
if (state.reorderBuffer.flushTimeout) {
@@ -136,17 +155,63 @@ export class RedisEventTransport implements IEventTransport {
}
}
/** Advance subscriber reorder buffer to current publisher sequence without resetting publisher (cross-replica safe) */
syncReorderBuffer(streamId: string): void {
const currentSeq = this.sequenceCounters.get(streamId) ?? 0;
/**
* Advance subscriber reorder buffer to the authoritative Redis sequence counter (cross-replica safe).
*
* @param earlyReplayCount - Number of events replayed from earlyEventBuffer (same-replica).
* Pending entries with seq < earlyReplayCount are duplicates and are pruned; entries at or
* above are live chunks from ongoing generation that arrived during the async GET window.
* Using the replay count (not the Redis counter) as the prune cutoff is critical: INCR can
* advance the counter past a live chunk's seq during the GET window, so currentSeq is not
* a safe proxy for "already delivered via earlyEventBuffer."
* When 0/undefined (cross-replica), all pending entries are treated as live and preserved.
*/
async syncReorderBuffer(streamId: string, earlyReplayCount = 0): Promise<void> {
const key = KEYS.sequence(streamId);
const rawStr = await this.publisher.get(key);
const parsed = rawStr != null ? parseInt(rawStr, 10) : 0;
const currentSeq = Number.isNaN(parsed) ? 0 : parsed;
const state = this.streams.get(streamId);
if (state) {
if (state.reorderBuffer.flushTimeout) {
clearTimeout(state.reorderBuffer.flushTimeout);
state.reorderBuffer.flushTimeout = null;
}
state.reorderBuffer.nextSeq = currentSeq;
state.reorderBuffer.pending.clear();
// Prune true duplicates: entries with seq < earlyReplayCount were already delivered
// via earlyEventBuffer. Entries at or above are live (possibly from ongoing generation).
if (earlyReplayCount > 0) {
for (const seq of state.reorderBuffer.pending.keys()) {
if (seq < earlyReplayCount) {
state.reorderBuffer.pending.delete(seq);
}
}
}
// Set nextSeq from remaining state. Never regress — handleOrderedChunk may have
// already advanced it during the async GET window.
if (state.reorderBuffer.pending.size === 0) {
// Same-replica: INCR precedes PUBLISH, so currentSeq may reflect allocated-but-
// not-yet-delivered events. Cap at earlyReplayCount to avoid skipping in-flight chunks.
// Cross-replica (earlyReplayCount=0): trust the Redis counter.
const ceiling = earlyReplayCount > 0 ? earlyReplayCount : currentSeq;
state.reorderBuffer.nextSeq = Math.max(state.reorderBuffer.nextSeq, ceiling);
} else {
let minPending = Infinity;
for (const seq of state.reorderBuffer.pending.keys()) {
if (seq < minPending) {
minPending = seq;
}
}
state.reorderBuffer.nextSeq = Math.max(
state.reorderBuffer.nextSeq,
Math.min(currentSeq, minPending),
);
this.flushPendingMessages(streamId, state);
}
// Re-arm flush timeout if gaps remain after sync — without this,
// buffered messages could sit indefinitely if no new messages arrive.
if (state.reorderBuffer.pending.size > 0) {
this.scheduleFlushTimeout(streamId, state);
}
}
}
@@ -442,13 +507,15 @@ export class RedisEventTransport implements IEventTransport {
/**
* Publish a chunk event to all subscribers across all instances.
* Includes sequence number for ordered delivery in Redis Cluster mode.
*
* Performance: each emit requires two sequential Redis round-trips (INCR + PUBLISH).
* This is the unavoidable cost of cross-replica sequence coordination.
*/
async emitChunk(streamId: string, event: unknown): Promise<void> {
const channel = CHANNELS.events(streamId);
const seq = this.getNextSequence(streamId);
const message: PubSubMessage = { type: EventTypes.CHUNK, seq, data: event };
try {
const channel = CHANNELS.events(streamId);
const seq = await this.getNextSequence(streamId);
const message: PubSubMessage = { type: EventTypes.CHUNK, seq, data: event };
await this.publisher.publish(channel, JSON.stringify(message));
} catch (err) {
logger.error(`[RedisEventTransport] Failed to publish chunk:`, err);
@@ -461,7 +528,7 @@ export class RedisEventTransport implements IEventTransport {
*/
async emitDone(streamId: string, event: unknown): Promise<void> {
const channel = CHANNELS.events(streamId);
const seq = this.getNextSequence(streamId);
const seq = await this.getNextSequence(streamId);
const message: PubSubMessage = { type: EventTypes.DONE, seq, data: event };
try {
@@ -478,7 +545,7 @@ export class RedisEventTransport implements IEventTransport {
*/
async emitError(streamId: string, error: string): Promise<void> {
const channel = CHANNELS.events(streamId);
const seq = this.getNextSequence(streamId);
const seq = await this.getNextSequence(streamId);
const message: PubSubMessage = { type: EventTypes.ERROR, seq, error };
try {
@@ -598,20 +665,19 @@ export class RedisEventTransport implements IEventTransport {
const state = this.streams.get(streamId);
if (state) {
// Clear flush timeout
if (state.reorderBuffer.flushTimeout) {
clearTimeout(state.reorderBuffer.flushTimeout);
state.reorderBuffer.flushTimeout = null;
}
// Clear all handlers and callbacks
state.handlers.clear();
state.allSubscribersLeftCallbacks = [];
state.abortCallbacks = [];
state.reorderBuffer.pending.clear();
}
// Reset sequence counter for this stream
this.resetSequence(streamId);
this.resetReorderBuffer(streamId);
// Delete the shared sequence key — safe because cleanup() is only called
// when the stream's job is complete (no more publishes will happen).
const seqKey = KEYS.sequence(streamId);
this.publisher.del(seqKey).catch((err) => {
logger.error(`[RedisEventTransport] Failed to delete sequence key ${seqKey}:`, err);
});
if (this.channelSubscriptions.has(channel)) {
this.subscriber.unsubscribe(channel).catch((err) => {
@@ -627,7 +693,11 @@ export class RedisEventTransport implements IEventTransport {
* Destroy all resources.
*/
destroy(): void {
// Clear all flush timeouts and buffered messages
// Clear all flush timeouts and buffered messages.
// Sequence keys are NOT deleted here — they are shared across replicas.
// A shutting-down replica must not nuke the counter for active publishers.
// cleanup() deletes keys on normal teardown; a 24h safety-net TTL (set once
// at first INCR, never refreshed) caps orphan lifetime on abnormal shutdown.
for (const [, state] of this.streams) {
if (state.reorderBuffer.flushTimeout) {
clearTimeout(state.reorderBuffer.flushTimeout);
@@ -642,7 +712,6 @@ export class RedisEventTransport implements IEventTransport {
this.channelSubscriptions.clear();
this.streams.clear();
this.sequenceCounters.clear();
try {
this.subscriber.disconnect();


@@ -337,11 +337,14 @@ export interface IEventTransport {
/** Listen for all subscribers leaving */
onAllSubscribersLeft(streamId: string, callback: () => void): void;
/** Reset publish sequence counter for a stream (used during full stream cleanup) */
resetSequence?(streamId: string): void;
/** Advance subscriber reorder buffer to match publisher sequence (cross-replica safe: doesn't reset publisher counter) */
syncReorderBuffer?(streamId: string): void;
/**
* Advance subscriber reorder buffer to match publisher sequence (cross-replica safe).
* @param earlyReplayCount - Number of events replayed from earlyEventBuffer (same-replica).
* Pending entries with seq < earlyReplayCount are duplicates and are pruned; entries at or
* above are live chunks that arrived during the async GET window and are preserved.
* When 0/undefined (cross-replica), all pending entries are treated as live.
*/
syncReorderBuffer?(streamId: string, earlyReplayCount?: number): void | Promise<void>;
/** Cleanup transport resources for a specific stream */
cleanup(streamId: string): void;
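Per the review notes, GenerationJobManager.subscribe() wraps its syncReorderBuffer call so a transient Redis failure degrades gracefully instead of crashing an SSE reconnect. A hedged sketch of such a call site follows; `syncSafely`, the parameter shapes, and the log wording are all assumptions, not the actual manager code.

```typescript
// Subset of IEventTransport the sketch needs (syncReorderBuffer is optional).
type SyncCapableTransport = {
  syncReorderBuffer?(streamId: string, earlyReplayCount?: number): void | Promise<void>;
};

async function syncSafely(
  transport: SyncCapableTransport,
  streamId: string,
  earlyReplayCount: number,
  warn: (msg: string, err?: unknown) => void,
): Promise<void> {
  if (!transport.syncReorderBuffer) return; // optional interface member
  try {
    await transport.syncReorderBuffer(streamId, earlyReplayCount);
  } catch (err) {
    // Degrade gracefully: the reorder buffer's flush timeout still delivers
    // buffered chunks even when the sync read fails.
    warn(`syncReorderBuffer failed for ${streamId}; continuing without sync`, err);
  }
}
```

If the sync read fails, delivery falls back to the flush timeout, so skipping the sync is safe, just slower.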