diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 5993c911ff..de6232e85b 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -770,14 +770,27 @@ class GenerationJobManagerClass { if (!runtime.hasSubscriber) { runtime.hasSubscriber = true; + /** + * Pass earlyReplayCount to syncReorderBuffer so it can prune duplicate pub/sub + * entries (seqs 0..count-1) without touching live in-flight chunks. + * + * Only set when the buffer was actually replayed — those specific seqs were + * delivered via onChunk and their pub/sub copies are duplicates. + * When skipBufferReplay is true, the resume sync payload delivers aggregated + * content up to the Redis counter, so syncReorderBuffer should trust currentSeq + * as the frontier (earlyReplayCount = 0). + */ + let earlyReplayCount = 0; + if (runtime.earlyEventBuffer.length > 0) { if (options?.skipBufferReplay) { logger.debug( `[GenerationJobManager] Skipping ${runtime.earlyEventBuffer.length} buffered events for ${streamId} (skipBufferReplay)`, ); } else { + earlyReplayCount = runtime.earlyEventBuffer.length; logger.debug( - `[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`, + `[GenerationJobManager] Replaying ${earlyReplayCount} buffered events for ${streamId}`, ); for (const bufferedEvent of runtime.earlyEventBuffer) { onChunk(bufferedEvent); @@ -807,7 +820,14 @@ class GenerationJobManagerClass { onChunk(createdEvent); } - this.eventTransport.syncReorderBuffer?.(streamId); + try { + await this.eventTransport.syncReorderBuffer?.(streamId, earlyReplayCount); + } catch (err) { + logger.warn( + `[GenerationJobManager] Failed to sync reorder buffer for ${streamId}; proceeding with current nextSeq:`, + err, + ); + } } if (isFirst) { diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts 
b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts index 3e85ace56d..0c119ab8d2 100644 --- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts @@ -14,6 +14,9 @@ import { keyvRedisClientReady, } from '~/cache/redisClients'; +/** Suppress winston Console transport output (survives jest.resetModules) */ +jest.spyOn(console, 'log').mockImplementation(); + /** * Integration tests for GenerationJobManager. * @@ -800,9 +803,8 @@ describe('GenerationJobManager Integration Tests', () => { GenerationJobManager.initialize(); const received: unknown[] = []; - const subscription = await GenerationJobManager.subscribe( - streamId, - (event) => received.push(event), + const subscription = await GenerationJobManager.subscribe(streamId, (event) => + received.push(event), ); expect(subscription).not.toBeNull(); @@ -839,9 +841,8 @@ describe('GenerationJobManager Integration Tests', () => { GenerationJobManager.initialize(); const received: unknown[] = []; - const subscription = await GenerationJobManager.subscribe( - streamId, - (event) => received.push(event), + const subscription = await GenerationJobManager.subscribe(streamId, (event) => + received.push(event), ); expect(subscription).not.toBeNull(); diff --git a/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts index b5e53dfbff..e254cf66ba 100644 --- a/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts @@ -1,4 +1,8 @@ import type { Redis, Cluster } from 'ioredis'; +import { logger } from '@librechat/data-schemas'; +import { createMockPublisher } from './helpers/publisher'; + +logger.silent = true; /** * Integration tests for RedisEventTransport. 
@@ -318,9 +322,7 @@ describe('RedisEventTransport Integration Tests', () => { test('should reorder out-of-sequence messages', async () => { const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -359,9 +361,7 @@ describe('RedisEventTransport Integration Tests', () => { test('should buffer early messages and deliver when gaps are filled', async () => { const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -407,9 +407,7 @@ describe('RedisEventTransport Integration Tests', () => { const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -450,9 +448,7 @@ describe('RedisEventTransport Integration Tests', () => { test('should handle messages without sequence numbers (backward compatibility)', async () => { const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -492,9 +488,7 @@ describe('RedisEventTransport Integration Tests', () => { test('should deliver done event after all pending chunks (terminal event ordering)', async () => { const { RedisEventTransport } = await 
import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { subscribe: jest.fn().mockResolvedValue(undefined), unsubscribe: jest.fn().mockResolvedValue(undefined), @@ -547,9 +541,7 @@ describe('RedisEventTransport Integration Tests', () => { test('should deliver error event after all pending chunks (terminal event ordering)', async () => { const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { subscribe: jest.fn().mockResolvedValue(undefined), unsubscribe: jest.fn().mockResolvedValue(undefined), @@ -864,6 +856,62 @@ describe('RedisEventTransport Integration Tests', () => { }); }); + /** + * Cross-Replica Sequence Synchronization (#12575) + * + * The core cross-replica sync logic (Redis INCR counter, async GET in + * syncReorderBuffer, pruneStaleEntries flag) is verified by: + * - Unit tests with mock publishers (deterministic, no cluster timing) + * - GenerationJobManager integration tests (end-to-end with earlyEventBuffer) + * - The race-condition unit test (paused GET with injected message) + * + * Transport-level integration tests with two real Redis transports are + * inherently flaky in Redis Cluster: cluster pub/sub fan-out is async + * across nodes, so a PUBLISH acknowledged on node A can arrive at the + * subscriber on node B after a subsequent SUBSCRIBE takes effect, + * causing non-deterministic message counts. 
+ */ + describe('Cross-Replica Sequence Synchronization (#12575)', () => { + test('shared counter is cleaned up on stream cleanup', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const subscriber = (ioredisClient as Redis).duplicate(); + const transport = new RedisEventTransport(ioredisClient, subscriber); + + const streamId = `cross-replica-cleanup-${Date.now()}`; + + // Publish chunks to create the Redis counter key + for (let i = 0; i < 5; i++) { + await transport.emitChunk(streamId, { index: i }); + } + + // Verify the key exists + const key = `stream:{${streamId}}:seq`; + const valBefore = await ioredisClient.get(key); + expect(valBefore).toBe('5'); + + // Cleanup the stream + transport.cleanup(streamId); + + // Poll for the fire-and-forget DEL to complete (robust under CI load) + const start = Date.now(); + let valAfter: string | null = 'pending'; + while (valAfter !== null && Date.now() - start < 2000) { + await new Promise((resolve) => setTimeout(resolve, 10)); + valAfter = await ioredisClient.get(key); + } + expect(valAfter).toBeNull(); + + transport.destroy(); + subscriber.disconnect(); + }); + }); + describe('Cleanup', () => { test('should clean up stream resources', async () => { if (!ioredisClient) { @@ -898,9 +946,8 @@ describe('RedisEventTransport Integration Tests', () => { test('should swallow emitChunk publish errors (callers fire-and-forget)', async () => { const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')), - }; + const mockPublisher = createMockPublisher(); + mockPublisher.publish.mockRejectedValue(new Error('Redis connection lost')); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -921,12 +968,35 @@ 
describe('RedisEventTransport Integration Tests', () => { transport.destroy(); }); + test('should swallow emitChunk incr errors (sequence allocation failure)', async () => { + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const mockPublisher = createMockPublisher(); + mockPublisher.incr.mockRejectedValue(new Error('INCR failed')); + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = `error-prop-incr-${Date.now()}`; + + await expect(transport.emitChunk(streamId, { data: 'test' })).resolves.toBeUndefined(); + expect(mockPublisher.publish).not.toHaveBeenCalled(); + + transport.destroy(); + }); + test('should throw when emitDone publish fails', async () => { const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')), - }; + const mockPublisher = createMockPublisher(); + mockPublisher.publish.mockRejectedValue(new Error('Redis connection lost')); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -950,9 +1020,8 @@ describe('RedisEventTransport Integration Tests', () => { test('should throw when emitError publish fails', async () => { const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - const mockPublisher = { - publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')), - }; + const mockPublisher = createMockPublisher(); + mockPublisher.publish.mockRejectedValue(new Error('Redis connection lost')); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -973,6 +1042,52 @@ describe('RedisEventTransport Integration Tests', 
() => { transport.destroy(); }); + test('should propagate when emitDone incr fails', async () => { + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const mockPublisher = createMockPublisher(); + mockPublisher.incr.mockRejectedValue(new Error('INCR failed')); + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = `error-prop-done-incr-${Date.now()}`; + + await expect(transport.emitDone(streamId, { finished: true })).rejects.toThrow('INCR failed'); + + transport.destroy(); + }); + + test('should propagate when emitError incr fails', async () => { + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const mockPublisher = createMockPublisher(); + mockPublisher.incr.mockRejectedValue(new Error('INCR failed')); + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = `error-prop-error-incr-${Date.now()}`; + + await expect(transport.emitError(streamId, 'some error')).rejects.toThrow('INCR failed'); + + transport.destroy(); + }); + test('should still deliver events successfully when publish succeeds', async () => { if (!ioredisClient) { console.warn('Redis not available, skipping test'); diff --git a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts index a64ba11f26..d45f186dc0 100644 --- a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts +++ 
b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts @@ -3,6 +3,9 @@ import type { Agents } from 'librechat-data-provider'; import type { Redis, Cluster } from 'ioredis'; import { StandardGraph } from '@librechat/agents'; +/** Suppress winston Console transport output (survives jest.resetModules) */ +jest.spyOn(console, 'log').mockImplementation(); + /** * Integration tests for RedisJobStore. * diff --git a/packages/api/src/stream/__tests__/collectedUsage.spec.ts b/packages/api/src/stream/__tests__/collectedUsage.spec.ts index d9a9ab95fe..635eba3aa4 100644 --- a/packages/api/src/stream/__tests__/collectedUsage.spec.ts +++ b/packages/api/src/stream/__tests__/collectedUsage.spec.ts @@ -8,6 +8,9 @@ import type { UsageMetadata } from '../interfaces/IJobStore'; +/** Suppress winston Console transport output (survives jest.resetModules) */ +jest.spyOn(console, 'log').mockImplementation(); + describe('CollectedUsage - InMemoryJobStore', () => { beforeEach(() => { jest.resetModules(); diff --git a/packages/api/src/stream/__tests__/helpers/publisher.ts b/packages/api/src/stream/__tests__/helpers/publisher.ts new file mode 100644 index 0000000000..3e1799d373 --- /dev/null +++ b/packages/api/src/stream/__tests__/helpers/publisher.ts @@ -0,0 +1,31 @@ +export interface MockPublisher { + publish: jest.Mock; + incr: jest.Mock; + expire: jest.Mock; + get: jest.Mock; + del: jest.Mock; +} + +/** Mock publisher with Redis command simulation for atomic sequence counters */ +export function createMockPublisher(): MockPublisher { + const counters = new Map(); + return { + publish: jest.fn().mockResolvedValue(1), + incr: jest.fn().mockImplementation((key: string) => { + const current = (counters.get(key) ?? 0) + 1; + counters.set(key, current); + return Promise.resolve(current); + }), + expire: jest.fn().mockResolvedValue(1), + get: jest.fn().mockImplementation((key: string) => { + const val = counters.get(key); + return Promise.resolve(val != null ? 
String(val) : null); + }), + del: jest.fn().mockImplementation((...keys: string[]) => { + for (const key of keys) { + counters.delete(key); + } + return Promise.resolve(keys.length); + }), + }; +} diff --git a/packages/api/src/stream/__tests__/reconnect-reorder-desync.stream_integration.spec.ts b/packages/api/src/stream/__tests__/reconnect-reorder-desync.stream_integration.spec.ts index effb7c5c7d..6f7ad2f6eb 100644 --- a/packages/api/src/stream/__tests__/reconnect-reorder-desync.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/reconnect-reorder-desync.stream_integration.spec.ts @@ -1,13 +1,17 @@ +import { logger } from '@librechat/data-schemas'; import type { Redis, Cluster } from 'ioredis'; import { RedisEventTransport } from '~/stream/implementations/RedisEventTransport'; import { GenerationJobManagerClass } from '~/stream/GenerationJobManager'; import { createStreamServices } from '~/stream/createStreamServices'; +import { createMockPublisher } from './helpers/publisher'; import { ioredisClient as staticRedisClient, keyvRedisClient as staticKeyvClient, keyvRedisClientReady, } from '~/cache/redisClients'; +logger.silent = true; + /** * Regression tests for the reconnect reorder buffer desync bug. 
* @@ -26,9 +30,7 @@ import { describe('Reconnect Reorder Buffer Desync (Regression)', () => { describe('Callback preservation across reconnect cycles (Unit)', () => { test('allSubscribersLeft callback fires on every disconnect, not just the first', () => { - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -70,9 +72,7 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => { }); test('abort callback survives across reconnect cycles', () => { - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -125,9 +125,7 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => { * immediately regardless of how many reconnect cycles have occurred. */ test('syncReorderBuffer works correctly on third+ reconnect', async () => { - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -159,7 +157,7 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => { }); // Sync reorder buffer (as GenerationJobManager.subscribe does) - transport.syncReorderBuffer(streamId); + await transport.syncReorderBuffer(streamId); const baseSeq = cycle * 10; @@ -189,9 +187,7 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => { }); test('reorder buffer works correctly when syncReorderBuffer IS called', async () => { - const mockPublisher = { - publish: jest.fn().mockResolvedValue(1), - }; + const mockPublisher = createMockPublisher(); const mockSubscriber = { on: jest.fn(), subscribe: jest.fn().mockResolvedValue(undefined), @@ -216,8 +212,8 @@ describe('Reconnect Reorder Buffer Desync 
(Regression)', () => { onChunk: (event) => chunks.push(event), }); - // This is the critical call - sync nextSeq to match publisher - transport.syncReorderBuffer(streamId); + // This is the critical call - sync nextSeq to match publisher (reads from Redis) + await transport.syncReorderBuffer(streamId); // Deliver messages starting at seq 20 const messageHandler = mockSubscriber.on.mock.calls.find( @@ -240,6 +236,187 @@ describe('Reconnect Reorder Buffer Desync (Regression)', () => { }); }); + describe('syncReorderBuffer race: message arrives during async GET window (Unit)', () => { + test('should not drop a chunk that lands in pending while GET is in-flight', async () => { + const mockPublisher = createMockPublisher(); + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = 'race-get-window-test'; + const chunks: unknown[] = []; + + // Emit seq 0 so the Redis counter is 1 + await transport.emitChunk(streamId, { index: 0 }); + + // Subscribe (nextSeq starts at 0) + transport.subscribe(streamId, { + onChunk: (event) => chunks.push(event), + }); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1] as (channel: string, message: string) => void; + const channel = `stream:{${streamId}}:events`; + + // Pause the GET: intercept with a deferred promise + let resolveGet!: (val: string | null) => void; + mockPublisher.get.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveGet = resolve; + }), + ); + + const syncPromise = transport.syncReorderBuffer(streamId); + + // While GET is in-flight, the publisher emits seq 1 (INCR → counter=2) + // and the subscriber receives it via pub/sub → pending[1] + await transport.emitChunk(streamId, { index: 1 }); + 
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { index: 1 } })); + + // Resolve GET with counter=2 (reflects the INCR for seq 1) + resolveGet('2'); + await syncPromise; + + // seq 1 MUST be delivered — it arrived during the GET window and must not be pruned + expect(chunks.map((c) => (c as { index: number }).index)).toContain(1); + + // Subsequent chunks must also deliver immediately + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { index: 2 } })); + expect(chunks.map((c) => (c as { index: number }).index)).toContain(2); + + transport.destroy(); + }); + + test('same-replica: should not drop a live chunk when INCR advances past earlyReplayCount during GET', async () => { + const mockPublisher = createMockPublisher(); + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = 'race-same-replica-test'; + const chunks: unknown[] = []; + + // Emit seqs 0–4 (earlyEventBuffer would have held these; counter = 5) + for (let i = 0; i < 5; i++) { + await transport.emitChunk(streamId, { index: i }); + } + + // Subscribe (nextSeq starts at 0) + transport.subscribe(streamId, { + onChunk: (event) => chunks.push(event), + }); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1] as (channel: string, message: string) => void; + const channel = `stream:{${streamId}}:events`; + + // Pause the GET + let resolveGet!: (val: string | null) => void; + mockPublisher.get.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveGet = resolve; + }), + ); + + // Call syncReorderBuffer with earlyReplayCount=5 (seqs 0–4 were replayed) + const syncPromise = transport.syncReorderBuffer(streamId, 5); + + // During GET window: LLM emits seq 5 (INCR 
→ counter=6), subscriber receives it + await transport.emitChunk(streamId, { index: 5 }); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 5, data: { index: 5 } })); + + // GET returns 6 (counter advanced past seq 5) + resolveGet('6'); + await syncPromise; + + // seq 5 MUST be delivered — it's live (seq 5 >= earlyReplayCount 5), not a duplicate. + // With the old boolean pruneStaleEntries, 5 < currentSeq(6) would have pruned it. + expect(chunks.map((c) => (c as { index: number }).index)).toContain(5); + + // Subsequent chunks deliver normally + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 6, data: { index: 6 } })); + expect(chunks.map((c) => (c as { index: number }).index)).toContain(6); + + transport.destroy(); + }); + + test('same-replica: should not drop chunk whose pub/sub arrives AFTER GET resolves', async () => { + const mockPublisher = createMockPublisher(); + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = 'race-post-get-test'; + const chunks: unknown[] = []; + + // Emit seqs 0–4 (earlyEventBuffer would have held these; counter = 5) + for (let i = 0; i < 5; i++) { + await transport.emitChunk(streamId, { index: i }); + } + + transport.subscribe(streamId, { + onChunk: (event) => chunks.push(event), + }); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1] as (channel: string, message: string) => void; + const channel = `stream:{${streamId}}:events`; + + let resolveGet!: (val: string | null) => void; + mockPublisher.get.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveGet = resolve; + }), + ); + + const syncPromise = transport.syncReorderBuffer(streamId, 5); + + // LLM emits seq 5 during the GET window 
(INCR → counter=6) + // but do NOT deliver via messageHandler yet — simulates pub/sub arriving late + await transport.emitChunk(streamId, { index: 5 }); + + // GET resolves with counter=6 while pending is EMPTY (pub/sub hasn't arrived) + resolveGet('6'); + await syncPromise; + + // Now pub/sub for seq 5 arrives AFTER sync completed + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 5, data: { index: 5 } })); + + // seq 5 must be delivered — nextSeq should have been capped at earlyReplayCount (5), + // not advanced to currentSeq (6) which would have dropped it. + expect(chunks.map((c) => (c as { index: number }).index)).toContain(5); + + transport.destroy(); + }); + }); + describe('End-to-end reconnect with GenerationJobManager (Integration)', () => { let originalEnv: NodeJS.ProcessEnv; let ioredisClient: Redis | Cluster | null = null; diff --git a/packages/api/src/stream/implementations/RedisEventTransport.ts b/packages/api/src/stream/implementations/RedisEventTransport.ts index 3682a9a749..73c82aa011 100644 --- a/packages/api/src/stream/implementations/RedisEventTransport.ts +++ b/packages/api/src/stream/implementations/RedisEventTransport.ts @@ -10,6 +10,14 @@ const CHANNELS = { events: (streamId: string) => `stream:{${streamId}}:events`, }; +/** + * Redis keys for shared state (hash-tagged for cluster slot compatibility) + */ +const KEYS = { + /** Atomic sequence counter: shared across all replicas for a given stream */ + sequence: (streamId: string) => `stream:{${streamId}}:seq`, +}; + /** * Event types for pub/sub messages */ @@ -96,8 +104,6 @@ export class RedisEventTransport implements IEventTransport { private channelSubscriptions = new Map>(); /** Counter for generating unique subscriber IDs */ private subscriberIdCounter = 0; - /** Sequence counters per stream for publishing (ensures ordered delivery in cluster mode) */ - private sequenceCounters = new Map(); /** * Create a new Redis event transport. 
@@ -115,16 +121,29 @@ export class RedisEventTransport implements IEventTransport { }); } - /** Get next sequence number for a stream (0-indexed) */ - private getNextSequence(streamId: string): number { - const current = this.sequenceCounters.get(streamId) ?? 0; - this.sequenceCounters.set(streamId, current + 1); - return current; + /** Safety-net TTL (seconds) set once on first INCR. Not refreshed — prevents mid-stream resets. */ + private static readonly SEQUENCE_TTL_SECONDS = 86400; + + /** + * Get next sequence number for a stream (0-indexed, backed by Redis INCR). + * A 24-hour TTL is set on the first INCR only (val === 1) as a safety net + * for orphaned keys from crashed processes. It is never refreshed, so an + * active stream cannot have its counter reset mid-generation. + * Keys are also deleted explicitly by cleanup() on normal stream teardown. + */ + private async getNextSequence(streamId: string): Promise { + const key = KEYS.sequence(streamId); + const val = await this.publisher.incr(key); + if (val === 1) { + this.publisher.expire(key, RedisEventTransport.SEQUENCE_TTL_SECONDS).catch((err) => { + logger.warn(`[RedisEventTransport] Failed to set TTL on sequence key ${key}:`, err); + }); + } + return val - 1; } - /** Reset publish sequence counter and subscriber reorder state for a stream (full cleanup only) */ - resetSequence(streamId: string): void { - this.sequenceCounters.delete(streamId); + /** Reset subscriber reorder buffer state to initial values */ + private resetReorderBuffer(streamId: string): void { const state = this.streams.get(streamId); if (state) { if (state.reorderBuffer.flushTimeout) { @@ -136,17 +155,63 @@ export class RedisEventTransport implements IEventTransport { } } - /** Advance subscriber reorder buffer to current publisher sequence without resetting publisher (cross-replica safe) */ - syncReorderBuffer(streamId: string): void { - const currentSeq = this.sequenceCounters.get(streamId) ?? 
0; + /** + * Advance subscriber reorder buffer to the authoritative Redis sequence counter (cross-replica safe). + * + * @param earlyReplayCount - Number of events replayed from earlyEventBuffer (same-replica). + * Pending entries with seq < earlyReplayCount are duplicates and are pruned; entries at or + * above are live chunks from ongoing generation that arrived during the async GET window. + * Using the replay count (not the Redis counter) as the prune cutoff is critical: INCR can + * advance the counter past a live chunk's seq during the GET window, so currentSeq is not + * a safe proxy for "already delivered via earlyEventBuffer." + * When 0/undefined (cross-replica), all pending entries are treated as live and preserved. + */ + async syncReorderBuffer(streamId: string, earlyReplayCount = 0): Promise { + const key = KEYS.sequence(streamId); + const rawStr = await this.publisher.get(key); + const parsed = rawStr != null ? parseInt(rawStr, 10) : 0; + const currentSeq = Number.isNaN(parsed) ? 0 : parsed; const state = this.streams.get(streamId); if (state) { if (state.reorderBuffer.flushTimeout) { clearTimeout(state.reorderBuffer.flushTimeout); state.reorderBuffer.flushTimeout = null; } - state.reorderBuffer.nextSeq = currentSeq; - state.reorderBuffer.pending.clear(); + // Prune true duplicates: entries with seq < earlyReplayCount were already delivered + // via earlyEventBuffer. Entries at or above are live (possibly from ongoing generation). + if (earlyReplayCount > 0) { + for (const seq of state.reorderBuffer.pending.keys()) { + if (seq < earlyReplayCount) { + state.reorderBuffer.pending.delete(seq); + } + } + } + // Set nextSeq from remaining state. Never regress — handleOrderedChunk may have + // already advanced it during the async GET window. + if (state.reorderBuffer.pending.size === 0) { + // Same-replica: INCR precedes PUBLISH, so currentSeq may reflect allocated-but- + // not-yet-delivered events. 
Cap at earlyReplayCount to avoid skipping in-flight chunks. + // Cross-replica (earlyReplayCount=0): trust the Redis counter. + const ceiling = earlyReplayCount > 0 ? earlyReplayCount : currentSeq; + state.reorderBuffer.nextSeq = Math.max(state.reorderBuffer.nextSeq, ceiling); + } else { + let minPending = Infinity; + for (const seq of state.reorderBuffer.pending.keys()) { + if (seq < minPending) { + minPending = seq; + } + } + state.reorderBuffer.nextSeq = Math.max( + state.reorderBuffer.nextSeq, + Math.min(currentSeq, minPending), + ); + this.flushPendingMessages(streamId, state); + } + // Re-arm flush timeout if gaps remain after sync — without this, + // buffered messages could sit indefinitely if no new messages arrive. + if (state.reorderBuffer.pending.size > 0) { + this.scheduleFlushTimeout(streamId, state); + } } } @@ -442,13 +507,15 @@ export class RedisEventTransport implements IEventTransport { /** * Publish a chunk event to all subscribers across all instances. * Includes sequence number for ordered delivery in Redis Cluster mode. + * + * Performance: each emit requires two sequential Redis round-trips (INCR + PUBLISH). + * This is the unavoidable cost of cross-replica sequence coordination. 
*/ async emitChunk(streamId: string, event: unknown): Promise { - const channel = CHANNELS.events(streamId); - const seq = this.getNextSequence(streamId); - const message: PubSubMessage = { type: EventTypes.CHUNK, seq, data: event }; - try { + const channel = CHANNELS.events(streamId); + const seq = await this.getNextSequence(streamId); + const message: PubSubMessage = { type: EventTypes.CHUNK, seq, data: event }; await this.publisher.publish(channel, JSON.stringify(message)); } catch (err) { logger.error(`[RedisEventTransport] Failed to publish chunk:`, err); @@ -461,7 +528,7 @@ export class RedisEventTransport implements IEventTransport { */ async emitDone(streamId: string, event: unknown): Promise { const channel = CHANNELS.events(streamId); - const seq = this.getNextSequence(streamId); + const seq = await this.getNextSequence(streamId); const message: PubSubMessage = { type: EventTypes.DONE, seq, data: event }; try { @@ -478,7 +545,7 @@ export class RedisEventTransport implements IEventTransport { */ async emitError(streamId: string, error: string): Promise { const channel = CHANNELS.events(streamId); - const seq = this.getNextSequence(streamId); + const seq = await this.getNextSequence(streamId); const message: PubSubMessage = { type: EventTypes.ERROR, seq, error }; try { @@ -598,20 +665,19 @@ export class RedisEventTransport implements IEventTransport { const state = this.streams.get(streamId); if (state) { - // Clear flush timeout - if (state.reorderBuffer.flushTimeout) { - clearTimeout(state.reorderBuffer.flushTimeout); - state.reorderBuffer.flushTimeout = null; - } - // Clear all handlers and callbacks state.handlers.clear(); state.allSubscribersLeftCallbacks = []; state.abortCallbacks = []; - state.reorderBuffer.pending.clear(); } - // Reset sequence counter for this stream - this.resetSequence(streamId); + this.resetReorderBuffer(streamId); + + // Delete the shared sequence key — safe because cleanup() is only called + // when the stream's job is 
complete (no more publishes will happen). + const seqKey = KEYS.sequence(streamId); + this.publisher.del(seqKey).catch((err) => { + logger.error(`[RedisEventTransport] Failed to delete sequence key ${seqKey}:`, err); + }); if (this.channelSubscriptions.has(channel)) { this.subscriber.unsubscribe(channel).catch((err) => { @@ -627,7 +693,11 @@ export class RedisEventTransport implements IEventTransport { * Destroy all resources. */ destroy(): void { - // Clear all flush timeouts and buffered messages + // Clear all flush timeouts and buffered messages. + // Sequence keys are NOT deleted here — they are shared across replicas. + // A shutting-down replica must not nuke the counter for active publishers. + // cleanup() deletes keys on normal teardown; a 24h safety-net TTL (set once + // at first INCR, never refreshed) caps orphan lifetime on abnormal shutdown. for (const [, state] of this.streams) { if (state.reorderBuffer.flushTimeout) { clearTimeout(state.reorderBuffer.flushTimeout); @@ -642,7 +712,6 @@ export class RedisEventTransport implements IEventTransport { this.channelSubscriptions.clear(); this.streams.clear(); - this.sequenceCounters.clear(); try { this.subscriber.disconnect(); diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index b59eed66f8..0d07b19538 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -337,11 +337,14 @@ export interface IEventTransport { /** Listen for all subscribers leaving */ onAllSubscribersLeft(streamId: string, callback: () => void): void; - /** Reset publish sequence counter for a stream (used during full stream cleanup) */ - resetSequence?(streamId: string): void; - - /** Advance subscriber reorder buffer to match publisher sequence (cross-replica safe: doesn't reset publisher counter) */ - syncReorderBuffer?(streamId: string): void; + /** + * Advance subscriber reorder buffer to match publisher 
sequence (cross-replica safe). + * @param earlyReplayCount - Number of events replayed from earlyEventBuffer (same-replica). + * Pending entries with seq < earlyReplayCount are duplicates and are pruned; entries at or + * above are live chunks that arrived during the async GET window and are preserved. + * When 0/undefined (cross-replica), all pending entries are treated as live. + */ + syncReorderBuffer?(streamId: string, earlyReplayCount?: number): void | Promise<void>; /** Cleanup transport resources for a specific stream */ cleanup(streamId: string): void;