diff --git a/api/server/index.js b/api/server/index.js index d96180cc4e..20fa0f9bf1 100644 --- a/api/server/index.js +++ b/api/server/index.js @@ -56,6 +56,30 @@ const trusted_proxy = Number(TRUST_PROXY) || 1; /* trust first proxy by default const app = express(); let serverReady = false; +const SERVER_NOT_READY_CODE = 'SERVER_NOT_READY'; +const CHAT_START_RETRY_AFTER_SECONDS = '1'; + +const rejectChatStartsUntilReady = (req, res, next) => { + if (serverReady || req.method !== 'POST' || req.path === '/abort') { + return next(); + } + + res.set('Retry-After', CHAT_START_RETRY_AFTER_SECONDS); + return res.status(503).json({ + code: SERVER_NOT_READY_CODE, + error: 'Server is still starting. Please retry shortly.', + }); +}; + +const configureGenerationStreams = () => { + const streamServices = createStreamServices(); + GenerationJobManager.configure({ + ...streamServices, + cleanupOnComplete: !isEnabled(process.env.STREAM_KEEP_COMPLETED_JOBS), + }); + GenerationJobManager.initialize(); +}; + const startServer = async () => { const { metricsMiddleware, metricsRouter } = createMetrics(); if (!process.env.METRICS_SECRET) { @@ -214,6 +238,7 @@ const startServer = async () => { app.use('/images/', createValidateImageRequest(appConfig.secureImageLinks), routes.staticRoute); app.use('/api/share', preAuthTenantMiddleware, routes.share); app.use('/api/roles', routes.roles); + app.use('/api/agents/chat', rejectChatStartsUntilReady); app.use('/api/agents', routes.agents); app.use('/api/banner', routes.banner); app.use('/api/memories', routes.memories); @@ -251,6 +276,8 @@ const startServer = async () => { /** Error handler (must be last - Express identifies error middleware by its 4-arg signature) */ app.use(ErrorController); + configureGenerationStreams(); + const server = app.listen(port, host, async (err) => { if (err) { logger.error('Failed to start server:', err); @@ -280,14 +307,6 @@ const startServer = async () => { }); await checkMigrations(); - // Configure stream services (auto-detects Redis from USE_REDIS env var) - const streamServices = createStreamServices(); - GenerationJobManager.configure({ - ...streamServices, - cleanupOnComplete: !isEnabled(process.env.STREAM_KEEP_COMPLETED_JOBS), - }); - GenerationJobManager.initialize(); - const inspectFlags = process.execArgv.some((arg) => arg.startsWith('--inspect')); if (inspectFlags || isEnabled(process.env.MEM_DIAG)) { memoryDiagnostics.start(); diff --git a/api/server/index.spec.js b/api/server/index.spec.js index 573770e282..20bc5b5a79 100644 --- a/api/server/index.spec.js +++ b/api/server/index.spec.js @@ -83,6 +83,33 @@ describe('Telemetry wiring', () => { }); }); +describe('Startup readiness wiring', () => { + const source = fs.readFileSync(path.join(__dirname, 'index.js'), 'utf8'); + + it('configures generation streams before the server accepts requests', () => { + const streamConfigIndex = source.indexOf('configureGenerationStreams();'); + const listenIndex = source.indexOf('const server = app.listen'); + const postListenMcpIndex = source.indexOf('await initializeMCPs();'); + + expect(streamConfigIndex).toBeGreaterThan(-1); + expect(listenIndex).toBeGreaterThan(-1); + expect(postListenMcpIndex).toBeGreaterThan(-1); + expect(streamConfigIndex).toBeLessThan(listenIndex); + expect(streamConfigIndex).toBeLessThan(postListenMcpIndex); + }); + + it('mounts the chat-start readiness gate before agent routes', () => { + const readinessGateIndex = source.indexOf( + "app.use('/api/agents/chat', rejectChatStartsUntilReady);", + ); + const agentsRouteIndex = source.indexOf("app.use('/api/agents', routes.agents);"); + + expect(readinessGateIndex).toBeGreaterThan(-1); + expect(agentsRouteIndex).toBeGreaterThan(-1); + expect(readinessGateIndex).toBeLessThan(agentsRouteIndex); + }); +}); + describe('Server Configuration', () => { // Increase the default timeout to allow for Mongo cleanup jest.setTimeout(30_000); diff --git a/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts b/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts index e9ee6a6b0a..c1b8e3fc6c 100644 --- a/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts +++ b/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts @@ -1,5 +1,5 @@ import { renderHook, act } from '@testing-library/react'; -import { Constants, LocalStorageKeys, QueryKeys } from 'librechat-data-provider'; +import { Constants, LocalStorageKeys, QueryKeys, request } from 'librechat-data-provider'; import type { TSubmission } from 'librechat-data-provider'; type SSEEventListener = (e: Partial & { responseCode?: number }) => void; @@ -174,6 +174,28 @@ const getLastSSE = (): MockSSEInstance => { return sse; }; +const serverNotReadyError = (retryAfter = '0') => ({ + response: { + status: 503, + data: { code: 'SERVER_NOT_READY' }, + headers: { 'retry-after': retryAfter }, + }, +}); + +const flushMicrotasks = async () => { + await act(async () => { + await Promise.resolve(); + }); +}; + +const advanceRetryTimer = async (ms: number) => { + await act(async () => { + jest.advanceTimersByTime(ms); + await Promise.resolve(); + }); + await flushMicrotasks(); +}; + describe('useResumableSSE - 404 error path', () => { beforeEach(() => { mockSSEInstances.length = 0; @@ -189,6 +211,12 @@ describe('useResumableSSE - 404 error path', () => { mockInvalidateQueries.mockClear(); mockRemoveQueries.mockClear(); mockFindAll.mockClear(); + (request.post as jest.Mock).mockReset(); + (request.post as jest.Mock).mockResolvedValue({ streamId: 'stream-123' }); + }); + + afterEach(() => { + jest.useRealTimers(); }); const seedDraft = (conversationId: string) => { @@ -462,6 +490,53 @@ describe('useResumableSSE - 404 error path', () => { unmount(); }); + it('continues retrying chat start while the server reports startup readiness pending', async () => { + jest.useFakeTimers(); + for (let i = 0; i < 9; i++) { + (request.post as jest.Mock).mockRejectedValueOnce(serverNotReadyError('1')); + } + (request.post as jest.Mock).mockResolvedValueOnce({ streamId: 'stream-ready' }); + + const submission = buildSubmission(); + const chatHelpers = buildChatHelpers(); + + const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers)); + + await flushMicrotasks(); + + for (let i = 0; i < 9; i++) { + await advanceRetryTimer(1000); + } + + expect(request.post).toHaveBeenCalledTimes(10); + expect(mockSSEInstances).toHaveLength(1); + expect(mockSSEInstances[0].stream).toHaveBeenCalledTimes(1); + expect(mockErrorHandler).not.toHaveBeenCalled(); + unmount(); + jest.useRealTimers(); + }); + + it('cancels startup readiness retries on cleanup before opening a stream', async () => { + jest.useFakeTimers(); + (request.post as jest.Mock) + .mockRejectedValueOnce(serverNotReadyError('1')) + .mockResolvedValueOnce({ streamId: 'stale-stream' }); + + const submission = buildSubmission(); + const chatHelpers = buildChatHelpers(); + + const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers)); + + await flushMicrotasks(); + unmount(); + await advanceRetryTimer(1000); + + expect(request.post).toHaveBeenCalledTimes(1); + expect(mockSSEInstances).toHaveLength(0); + expect(mockErrorHandler).not.toHaveBeenCalled(); + jest.useRealTimers(); + }); + it('replays title events from resume state sync', async () => { const submission = buildSubmission(); const chatHelpers = buildChatHelpers(); diff --git a/client/src/hooks/SSE/useResumableSSE.ts b/client/src/hooks/SSE/useResumableSSE.ts index f10a7f16c9..0bbbca8878 100644 --- a/client/src/hooks/SSE/useResumableSSE.ts +++ b/client/src/hooks/SSE/useResumableSSE.ts @@ -40,6 +40,75 @@ type ChatHelpers = Pick< >; const MAX_RETRIES = 5; +const START_GENERATION_NETWORK_RETRIES = 3; +const START_GENERATION_READINESS_TIMEOUT_MS = 120000; +const SERVER_NOT_READY_CODE = 'SERVER_NOT_READY'; + +type StartGenerationError = { + code?: string; + response?: { + status?: number; + data?: { + code?: string; + }; + headers?: Record; + }; +}; + +const toStartGenerationError = (error: unknown): StartGenerationError | undefined => + error != null && typeof error === 'object' ? (error as StartGenerationError) : undefined; + +const isRetryableNetworkError = (error: unknown) => { + if (!(error instanceof Error)) { + return false; + } + + const { code } = toStartGenerationError(error) ?? {}; + return code === 'ERR_NETWORK' || code === 'ERR_INTERNET_DISCONNECTED'; +}; + +const isServerNotReadyError = (error: unknown) => { + const candidate = toStartGenerationError(error); + return ( + candidate?.response?.status === 503 && candidate.response?.data?.code === SERVER_NOT_READY_CODE + ); +}; + +const getRetryAfterDelay = (error: unknown, fallbackDelay: number) => { + const headers = toStartGenerationError(error)?.response?.headers; + const rawValue = headers?.['retry-after'] ?? headers?.['Retry-After']; + const retryAfter = Array.isArray(rawValue) ? rawValue[0] : rawValue; + const seconds = typeof retryAfter === 'number' ? retryAfter : Number(retryAfter); + + if (!Number.isFinite(seconds) || seconds < 0) { + return fallbackDelay; + } + + return Math.min(seconds * 1000, 30000); +}; + +const waitForRetryDelay = (delay: number, signal?: AbortSignal): Promise => + new Promise((resolve) => { + if (signal?.aborted) { + resolve(false); + return; + } + + function cleanup() { + signal?.removeEventListener('abort', onAbort); + } + function onAbort() { + clearTimeout(timeout); + cleanup(); + resolve(false); + } + const timeout = setTimeout(() => { + cleanup(); + resolve(true); + }, delay); + + signal?.addEventListener('abort', onAbort, { once: true }); + }); const hasConcreteConversationId = (conversationId?: string | null) => !!conversationId && @@ -703,10 +772,11 @@ export default function useResumableSSE( /** * Start generation (POST request that returns streamId) * Uses request.post which has axios interceptors for automatic token refresh. - * Retries up to 3 times on network errors with exponential backoff. + * Retries transient network failures and startup readiness responses. + * Readiness retries honor Retry-After until cleanup or the readiness window expires. */ const startGeneration = useCallback( - async (currentSubmission: TSubmission): Promise => { + async (currentSubmission: TSubmission, signal?: AbortSignal): Promise => { const payloadData = createPayload(currentSubmission); let { payload } = payloadData; payload = removeNullishValues(payload) as TPayload; @@ -715,29 +785,54 @@ export default function useResumableSSE( const url = payloadData.server; - const maxRetries = 3; let lastError: unknown = null; + let requestAttempts = 0; + let networkAttempts = 0; + let readinessAttempts = 0; + const readinessDeadline = Date.now() + START_GENERATION_READINESS_TIMEOUT_MS; - for (let attempt = 1; attempt <= maxRetries; attempt++) { + while (!signal?.aborted) { + requestAttempts += 1; try { // Use request.post which handles auth token refresh via axios interceptors const data = (await request.post(url, payload)) as { streamId: string }; + if (signal?.aborted) { + return null; + } console.log('[ResumableSSE] Generation started:', { streamId: data.streamId }); return data.streamId; } catch (error) { - lastError = error; - // Check if it's a network error (retry) vs server error (don't retry) - const isNetworkError = - error instanceof Error && - 'code' in error && - (error.code === 'ERR_NETWORK' || error.code === 'ERR_INTERNET_DISCONNECTED'); + if (signal?.aborted) { + return null; + } - if (isNetworkError && attempt < maxRetries) { - const delay = Math.min(1000 * Math.pow(2, attempt - 1), 8000); + lastError = error; + const isNetworkError = isRetryableNetworkError(error); + const isServerNotReady = isServerNotReadyError(error); + const remainingReadinessMs = readinessDeadline - Date.now(); + const shouldRetryNetwork = + isNetworkError && networkAttempts < START_GENERATION_NETWORK_RETRIES - 1; + const shouldRetryServerNotReady = isServerNotReady && remainingReadinessMs > 0; + + if (shouldRetryNetwork || shouldRetryServerNotReady) { + networkAttempts += isNetworkError ? 1 : 0; + readinessAttempts += isServerNotReady ? 1 : 0; + const fallbackDelay = Math.min(1000 * Math.pow(2, requestAttempts - 1), 8000); + const retryDelay = isServerNotReady + ? Math.min(getRetryAfterDelay(error, fallbackDelay), remainingReadinessMs) + : fallbackDelay; + const reason = isServerNotReady ? 'Server not ready' : 'Network error'; + const attempt = isServerNotReady ? readinessAttempts : networkAttempts; + const limit = isServerNotReady + ? `${Math.ceil(START_GENERATION_READINESS_TIMEOUT_MS / 1000)}s readiness window` + : `${START_GENERATION_NETWORK_RETRIES}`; console.log( - `[ResumableSSE] Network error starting generation, retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`, + `[ResumableSSE] ${reason} starting generation, retrying in ${retryDelay}ms (attempt ${attempt}/${limit})`, ); - await new Promise((resolve) => setTimeout(resolve, delay)); + const shouldContinue = await waitForRetryDelay(retryDelay, signal); + if (!shouldContinue) { + return null; + } continue; } @@ -746,6 +841,10 @@ export default function useResumableSSE( } } + if (signal?.aborted) { + return null; + } + console.error('[ResumableSSE] Error starting generation:', lastError); const axiosError = lastError as { response?: { data?: Record } }; @@ -794,12 +893,21 @@ export default function useResumableSSE( }); submissionRef.current = submission; + const startController = new AbortController(); + const { signal } = startController; const initStream = async () => { + if (signal.aborted) { + return; + } + setIsSubmitting(true); setShowStopButton(true); if (resumeStreamId) { + if (signal.aborted) { + return; + } // Resume: just subscribe to existing stream, don't start new generation console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId); setStreamId(resumeStreamId); @@ -809,7 +917,10 @@ export default function useResumableSSE( } else { // New generation: start and then subscribe console.log('[ResumableSSE] Starting NEW generation'); - const newStreamId = await startGeneration(submission); + const newStreamId = await startGeneration(submission, signal); + if (signal.aborted) { + return; + } if (newStreamId) { setStreamId(newStreamId); // Optimistically add to active jobs @@ -837,6 +948,7 @@ export default function useResumableSSE( return () => { console.log('[ResumableSSE] Cleanup - closing SSE, resetting UI state'); + startController.abort(); // Cleanup on unmount/navigation - close connection but DO NOT abort backend // Reset UI state so it doesn't leak to other conversations // If user returns to this conversation, useResumeOnLoad will restore the state