mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-09 17:31:19 +00:00
🚦 fix: Gate Chat Starts During Readiness (#13502)
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
GitNexus Index / index (push) Waiting to run
GitNexus Index / post-index (push) Blocked by required conditions
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions
Sync Helm Chart Tags / Ignore non-main push (push) Waiting to run
Sync Helm Chart Tags / Sync chart tags (push) Waiting to run
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
GitNexus Index / index (push) Waiting to run
GitNexus Index / post-index (push) Blocked by required conditions
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions
Sync Helm Chart Tags / Ignore non-main push (push) Waiting to run
Sync Helm Chart Tags / Sync chart tags (push) Waiting to run
* fix: guard chat starts during server readiness
* style: format readiness retry condition
* fix: clarify chat start retry diagnostics
* fix: cancel stale chat start retries
* style: use const for retry timeout
This commit is contained in:
parent
1da789bac0
commit
15072467b1
4 changed files with 257 additions and 24 deletions
|
|
@ -56,6 +56,30 @@ const trusted_proxy = Number(TRUST_PROXY) || 1; /* trust first proxy by default
|
|||
const app = express();
|
||||
/**
 * Startup readiness flag consulted by the chat-start gate below.
 * NOTE(review): presumably set to `true` once startup has finished — the
 * assignment is not visible in this excerpt; confirm in `startServer`.
 */
let serverReady = false;

/** Machine-readable code placed in the 503 body of a gated request. */
const SERVER_NOT_READY_CODE = 'SERVER_NOT_READY';
/** Value of the `Retry-After` header sent with the 503 (a string, in seconds). */
const CHAT_START_RETRY_AFTER_SECONDS = '1';

/**
 * Express middleware that answers POST requests with `503` while the server
 * is not ready yet.
 *
 * Requests pass straight through to `next()` when the server is ready, when
 * the method is not POST, or when the path (relative to the mount point) is
 * exactly `/abort`.
 *
 * @param {object} req - Express request; only `method` and `path` are read.
 * @param {object} res - Express response used to send the 503 payload.
 * @param {Function} next - Continues the middleware chain.
 * @returns {*} Whatever `next()` or `res.json()` returns.
 */
const rejectChatStartsUntilReady = (req, res, next) => {
  const isGated = !serverReady && req.method === 'POST' && req.path !== '/abort';
  if (!isGated) {
    return next();
  }

  const body = {
    code: SERVER_NOT_READY_CODE,
    error: 'Server is still starting. Please retry shortly.',
  };

  // Tell the client how long to wait before repeating the request.
  res.set('Retry-After', CHAT_START_RETRY_AFTER_SECONDS);
  return res.status(503).json(body);
};
|
||||
|
||||
/**
 * Configures and initializes `GenerationJobManager`.
 *
 * The manager receives everything returned by `createStreamServices()` plus
 * `cleanupOnComplete`, which is turned off only when the
 * `STREAM_KEEP_COMPLETED_JOBS` environment variable is enabled.
 */
const configureGenerationStreams = () => {
  const services = createStreamServices();
  const managerOptions = {
    ...services,
    cleanupOnComplete: !isEnabled(process.env.STREAM_KEEP_COMPLETED_JOBS),
  };

  GenerationJobManager.configure(managerOptions);
  GenerationJobManager.initialize();
};
|
||||
|
||||
const startServer = async () => {
|
||||
const { metricsMiddleware, metricsRouter } = createMetrics();
|
||||
if (!process.env.METRICS_SECRET) {
|
||||
|
|
@ -214,6 +238,7 @@ const startServer = async () => {
|
|||
app.use('/images/', createValidateImageRequest(appConfig.secureImageLinks), routes.staticRoute);
|
||||
app.use('/api/share', preAuthTenantMiddleware, routes.share);
|
||||
app.use('/api/roles', routes.roles);
|
||||
app.use('/api/agents/chat', rejectChatStartsUntilReady);
|
||||
app.use('/api/agents', routes.agents);
|
||||
app.use('/api/banner', routes.banner);
|
||||
app.use('/api/memories', routes.memories);
|
||||
|
|
@ -251,6 +276,8 @@ const startServer = async () => {
|
|||
/** Error handler (must be last - Express identifies error middleware by its 4-arg signature) */
|
||||
app.use(ErrorController);
|
||||
|
||||
configureGenerationStreams();
|
||||
|
||||
const server = app.listen(port, host, async (err) => {
|
||||
if (err) {
|
||||
logger.error('Failed to start server:', err);
|
||||
|
|
@ -280,14 +307,6 @@ const startServer = async () => {
|
|||
});
|
||||
await checkMigrations();
|
||||
|
||||
// Configure stream services (auto-detects Redis from USE_REDIS env var)
|
||||
const streamServices = createStreamServices();
|
||||
GenerationJobManager.configure({
|
||||
...streamServices,
|
||||
cleanupOnComplete: !isEnabled(process.env.STREAM_KEEP_COMPLETED_JOBS),
|
||||
});
|
||||
GenerationJobManager.initialize();
|
||||
|
||||
const inspectFlags = process.execArgv.some((arg) => arg.startsWith('--inspect'));
|
||||
if (inspectFlags || isEnabled(process.env.MEM_DIAG)) {
|
||||
memoryDiagnostics.start();
|
||||
|
|
|
|||
|
|
@ -83,6 +83,33 @@ describe('Telemetry wiring', () => {
|
|||
});
|
||||
});
|
||||
|
||||
/**
 * Static wiring checks: these tests read the text of `index.js` and compare
 * the positions of key statements instead of booting the server.
 */
describe('Startup readiness wiring', () => {
  const source = fs.readFileSync(path.join(__dirname, 'index.js'), 'utf8');

  /** Offset of the first occurrence of `snippet` in the source, or -1. */
  const locate = (snippet) => source.indexOf(snippet);

  it('configures generation streams before the server accepts requests', () => {
    const streamConfigIndex = locate('configureGenerationStreams();');
    const listenIndex = locate('const server = app.listen');
    const postListenMcpIndex = locate('await initializeMCPs();');

    // Every statement has to exist before relative order means anything.
    for (const position of [streamConfigIndex, listenIndex, postListenMcpIndex]) {
      expect(position).toBeGreaterThan(-1);
    }

    expect(streamConfigIndex).toBeLessThan(listenIndex);
    expect(streamConfigIndex).toBeLessThan(postListenMcpIndex);
  });

  it('mounts the chat-start readiness gate before agent routes', () => {
    const readinessGateIndex = locate(
      "app.use('/api/agents/chat', rejectChatStartsUntilReady);",
    );
    const agentsRouteIndex = locate("app.use('/api/agents', routes.agents);");

    expect(readinessGateIndex).toBeGreaterThan(-1);
    expect(agentsRouteIndex).toBeGreaterThan(-1);
    expect(readinessGateIndex).toBeLessThan(agentsRouteIndex);
  });
});
|
||||
|
||||
describe('Server Configuration', () => {
|
||||
// Increase the default timeout to allow for Mongo cleanup
|
||||
jest.setTimeout(30_000);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { renderHook, act } from '@testing-library/react';
|
||||
import { Constants, LocalStorageKeys, QueryKeys } from 'librechat-data-provider';
|
||||
import { Constants, LocalStorageKeys, QueryKeys, request } from 'librechat-data-provider';
|
||||
import type { TSubmission } from 'librechat-data-provider';
|
||||
|
||||
type SSEEventListener = (e: Partial<MessageEvent> & { responseCode?: number }) => void;
|
||||
|
|
@ -174,6 +174,28 @@ const getLastSSE = (): MockSSEInstance => {
|
|||
return sse;
|
||||
};
|
||||
|
||||
/**
 * Builds a rejection value shaped like the error the client sees when the
 * server answers 503 with the `SERVER_NOT_READY` code.
 *
 * @param retryAfter - value placed in the lower-case `retry-after` header.
 */
const serverNotReadyError = (retryAfter = '0') => {
  const response = {
    status: 503,
    data: { code: 'SERVER_NOT_READY' },
    headers: { 'retry-after': retryAfter },
  };
  return { response };
};
|
||||
|
||||
/**
 * Awaits one already-resolved promise inside `act()` so pending promise
 * callbacks get a chance to run before the test continues.
 */
async function flushMicrotasks() {
  await act(async () => {
    await Promise.resolve();
  });
}
|
||||
|
||||
/**
 * Moves Jest's fake clock forward by `ms` inside `act()`, then flushes
 * microtasks so work scheduled by the fired timers can proceed.
 *
 * @param ms - number of fake milliseconds to advance.
 */
async function advanceRetryTimer(ms: number) {
  await act(async () => {
    jest.advanceTimersByTime(ms);
    await Promise.resolve();
  });
  await flushMicrotasks();
}
|
||||
|
||||
describe('useResumableSSE - 404 error path', () => {
|
||||
beforeEach(() => {
|
||||
mockSSEInstances.length = 0;
|
||||
|
|
@ -189,6 +211,12 @@ describe('useResumableSSE - 404 error path', () => {
|
|||
mockInvalidateQueries.mockClear();
|
||||
mockRemoveQueries.mockClear();
|
||||
mockFindAll.mockClear();
|
||||
(request.post as jest.Mock).mockReset();
|
||||
(request.post as jest.Mock).mockResolvedValue({ streamId: 'stream-123' });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
const seedDraft = (conversationId: string) => {
|
||||
|
|
@ -462,6 +490,53 @@ describe('useResumableSSE - 404 error path', () => {
|
|||
unmount();
|
||||
});
|
||||
|
||||
it('continues retrying chat start while the server reports startup readiness pending', async () => {
  const postMock = request.post as jest.Mock;
  const notReadyResponses = 9;

  jest.useFakeTimers();
  // Nine "server not ready" rejections (Retry-After: 1) followed by one success.
  for (let queued = 0; queued < notReadyResponses; queued += 1) {
    postMock.mockRejectedValueOnce(serverNotReadyError('1'));
  }
  postMock.mockResolvedValueOnce({ streamId: 'stream-ready' });

  const submission = buildSubmission();
  const chatHelpers = buildChatHelpers();
  const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers));

  await flushMicrotasks();

  // One second of fake time per rejected attempt.
  for (let tick = 0; tick < notReadyResponses; tick += 1) {
    await advanceRetryTimer(1000);
  }

  // Ten POSTs in total, exactly one stream opened, and no error surfaced.
  expect(request.post).toHaveBeenCalledTimes(10);
  expect(mockSSEInstances).toHaveLength(1);
  expect(mockSSEInstances[0].stream).toHaveBeenCalledTimes(1);
  expect(mockErrorHandler).not.toHaveBeenCalled();

  unmount();
  jest.useRealTimers();
});
|
||||
|
||||
it('cancels startup readiness retries on cleanup before opening a stream', async () => {
  const postMock = request.post as jest.Mock;

  jest.useFakeTimers();
  // First attempt is rejected as "not ready"; a second attempt would succeed if it ran.
  postMock.mockRejectedValueOnce(serverNotReadyError('1'));
  postMock.mockResolvedValueOnce({ streamId: 'stale-stream' });

  const submission = buildSubmission();
  const chatHelpers = buildChatHelpers();
  const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers));

  await flushMicrotasks();
  // Unmount before the retry delay elapses, then let the delay pass.
  unmount();
  await advanceRetryTimer(1000);

  // No second POST, no stream opened, no error reported.
  expect(request.post).toHaveBeenCalledTimes(1);
  expect(mockSSEInstances).toHaveLength(0);
  expect(mockErrorHandler).not.toHaveBeenCalled();

  jest.useRealTimers();
});
|
||||
|
||||
it('replays title events from resume state sync', async () => {
|
||||
const submission = buildSubmission();
|
||||
const chatHelpers = buildChatHelpers();
|
||||
|
|
|
|||
|
|
@ -40,6 +40,75 @@ type ChatHelpers = Pick<
|
|||
>;
|
||||
|
||||
const MAX_RETRIES = 5;

/** Upper bound on start-generation attempts that fail with a network error. */
const START_GENERATION_NETWORK_RETRIES = 3;

/** How long start-generation keeps retrying "server not ready" responses (2 minutes). */
const START_GENERATION_READINESS_TIMEOUT_MS = 2 * 60 * 1000;

/** Error code in a 503 response body that marks the server as still starting. */
const SERVER_NOT_READY_CODE = 'SERVER_NOT_READY';
|
||||
|
||||
/** The parts of an HTTP error response that the start-generation retry logic reads. */
type StartGenerationErrorResponse = {
  status?: number;
  data?: {
    code?: string;
  };
  headers?: Record<string, string | number | string[] | undefined>;
};

/** Loose shape of an error thrown while starting a generation. */
type StartGenerationError = {
  code?: string;
  response?: StartGenerationErrorResponse;
};

/**
 * Views an unknown thrown value as a `StartGenerationError`.
 *
 * @returns the same reference when `error` is a non-null object, otherwise
 *   `undefined`. This is a type cast only — no fields are validated.
 */
const toStartGenerationError = (error: unknown): StartGenerationError | undefined => {
  if (typeof error !== 'object' || error === null) {
    return undefined;
  }
  return error as StartGenerationError;
};
|
||||
|
||||
/**
 * Reports whether `error` is an `Error` instance whose `code` is
 * `ERR_NETWORK` or `ERR_INTERNET_DISCONNECTED`.
 */
const isRetryableNetworkError = (error: unknown) => {
  if (error instanceof Error) {
    const code = toStartGenerationError(error)?.code;
    return code === 'ERR_NETWORK' || code === 'ERR_INTERNET_DISCONNECTED';
  }
  // Values that are not Error instances are never treated as network failures.
  return false;
};
|
||||
|
||||
/**
 * Reports whether `error` carries a 503 response whose body code equals
 * `SERVER_NOT_READY_CODE`.
 */
const isServerNotReadyError = (error: unknown) => {
  const response = toStartGenerationError(error)?.response;
  if (response?.status !== 503) {
    return false;
  }
  return response?.data?.code === SERVER_NOT_READY_CODE;
};
|
||||
|
||||
/**
 * Converts the `Retry-After` header of a failed response into a delay in
 * milliseconds.
 *
 * The header is looked up as `retry-after` first, then `Retry-After`; when the
 * value is an array only its first entry is used.
 *
 * @param error - value thrown by the start request; header is read from `error.response.headers`.
 * @param fallbackDelay - delay in ms returned when the header is missing or unusable.
 * @returns `fallbackDelay` when the header is absent, blank, non-numeric (for
 *   example an HTTP-date), not finite, or negative; otherwise the header's
 *   seconds converted to milliseconds and capped at 30000.
 */
const getRetryAfterDelay = (error: unknown, fallbackDelay: number) => {
  const headers = toStartGenerationError(error)?.response?.headers;
  const rawValue = headers?.['retry-after'] ?? headers?.['Retry-After'];
  const retryAfter = Array.isArray(rawValue) ? rawValue[0] : rawValue;

  // Number('') and Number('   ') are 0, so without this guard a blank header
  // would mean "retry immediately" instead of using the fallback backoff.
  if (typeof retryAfter === 'string' && retryAfter.trim() === '') {
    return fallbackDelay;
  }

  const seconds = typeof retryAfter === 'number' ? retryAfter : Number(retryAfter);

  if (!Number.isFinite(seconds) || seconds < 0) {
    return fallbackDelay;
  }

  // Never wait longer than 30 seconds, whatever the server asks for.
  return Math.min(seconds * 1000, 30000);
};
|
||||
|
||||
/**
 * Waits for `delay` milliseconds unless `signal` aborts first.
 *
 * @param delay - time to wait, in milliseconds.
 * @param signal - optional abort signal that cancels the wait.
 * @returns a promise resolving to `true` when the full delay elapsed, or to
 *   `false` when the signal was already aborted or aborted during the wait.
 *   The promise never rejects.
 */
const waitForRetryDelay = (delay: number, signal?: AbortSignal): Promise<boolean> =>
  new Promise<boolean>((resolve) => {
    // Already cancelled: do not start a timer at all.
    if (signal?.aborted) {
      resolve(false);
      return;
    }

    const detach = () => {
      signal?.removeEventListener('abort', handleAbort);
    };

    const handleAbort = () => {
      clearTimeout(timer);
      detach();
      resolve(false);
    };

    const timer = setTimeout(() => {
      // Timer won the race; drop the listener so it cannot fire later.
      detach();
      resolve(true);
    }, delay);

    signal?.addEventListener('abort', handleAbort, { once: true });
  });
|
||||
|
||||
const hasConcreteConversationId = (conversationId?: string | null) =>
|
||||
!!conversationId &&
|
||||
|
|
@ -703,10 +772,11 @@ export default function useResumableSSE(
|
|||
/**
|
||||
* Start generation (POST request that returns streamId)
|
||||
* Uses request.post which has axios interceptors for automatic token refresh.
|
||||
* Retries up to 3 times on network errors with exponential backoff.
|
||||
* Retries transient network failures and startup readiness responses.
|
||||
* Readiness retries honor Retry-After until cleanup or the readiness window expires.
|
||||
*/
|
||||
const startGeneration = useCallback(
|
||||
async (currentSubmission: TSubmission): Promise<string | null> => {
|
||||
async (currentSubmission: TSubmission, signal?: AbortSignal): Promise<string | null> => {
|
||||
const payloadData = createPayload(currentSubmission);
|
||||
let { payload } = payloadData;
|
||||
payload = removeNullishValues(payload) as TPayload;
|
||||
|
|
@ -715,29 +785,54 @@ export default function useResumableSSE(
|
|||
|
||||
const url = payloadData.server;
|
||||
|
||||
const maxRetries = 3;
|
||||
let lastError: unknown = null;
|
||||
let requestAttempts = 0;
|
||||
let networkAttempts = 0;
|
||||
let readinessAttempts = 0;
|
||||
const readinessDeadline = Date.now() + START_GENERATION_READINESS_TIMEOUT_MS;
|
||||
|
||||
for (let attempt = 1; attempt <= maxRetries; attempt++) {
|
||||
while (!signal?.aborted) {
|
||||
requestAttempts += 1;
|
||||
try {
|
||||
// Use request.post which handles auth token refresh via axios interceptors
|
||||
const data = (await request.post(url, payload)) as { streamId: string };
|
||||
if (signal?.aborted) {
|
||||
return null;
|
||||
}
|
||||
console.log('[ResumableSSE] Generation started:', { streamId: data.streamId });
|
||||
return data.streamId;
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
// Check if it's a network error (retry) vs server error (don't retry)
|
||||
const isNetworkError =
|
||||
error instanceof Error &&
|
||||
'code' in error &&
|
||||
(error.code === 'ERR_NETWORK' || error.code === 'ERR_INTERNET_DISCONNECTED');
|
||||
if (signal?.aborted) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (isNetworkError && attempt < maxRetries) {
|
||||
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 8000);
|
||||
lastError = error;
|
||||
const isNetworkError = isRetryableNetworkError(error);
|
||||
const isServerNotReady = isServerNotReadyError(error);
|
||||
const remainingReadinessMs = readinessDeadline - Date.now();
|
||||
const shouldRetryNetwork =
|
||||
isNetworkError && networkAttempts < START_GENERATION_NETWORK_RETRIES - 1;
|
||||
const shouldRetryServerNotReady = isServerNotReady && remainingReadinessMs > 0;
|
||||
|
||||
if (shouldRetryNetwork || shouldRetryServerNotReady) {
|
||||
networkAttempts += isNetworkError ? 1 : 0;
|
||||
readinessAttempts += isServerNotReady ? 1 : 0;
|
||||
const fallbackDelay = Math.min(1000 * Math.pow(2, requestAttempts - 1), 8000);
|
||||
const retryDelay = isServerNotReady
|
||||
? Math.min(getRetryAfterDelay(error, fallbackDelay), remainingReadinessMs)
|
||||
: fallbackDelay;
|
||||
const reason = isServerNotReady ? 'Server not ready' : 'Network error';
|
||||
const attempt = isServerNotReady ? readinessAttempts : networkAttempts;
|
||||
const limit = isServerNotReady
|
||||
? `${Math.ceil(START_GENERATION_READINESS_TIMEOUT_MS / 1000)}s readiness window`
|
||||
: `${START_GENERATION_NETWORK_RETRIES}`;
|
||||
console.log(
|
||||
`[ResumableSSE] Network error starting generation, retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`,
|
||||
`[ResumableSSE] ${reason} starting generation, retrying in ${retryDelay}ms (attempt ${attempt}/${limit})`,
|
||||
);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
const shouldContinue = await waitForRetryDelay(retryDelay, signal);
|
||||
if (!shouldContinue) {
|
||||
return null;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -746,6 +841,10 @@ export default function useResumableSSE(
|
|||
}
|
||||
}
|
||||
|
||||
if (signal?.aborted) {
|
||||
return null;
|
||||
}
|
||||
|
||||
console.error('[ResumableSSE] Error starting generation:', lastError);
|
||||
|
||||
const axiosError = lastError as { response?: { data?: Record<string, unknown> } };
|
||||
|
|
@ -794,12 +893,21 @@ export default function useResumableSSE(
|
|||
});
|
||||
|
||||
submissionRef.current = submission;
|
||||
const startController = new AbortController();
|
||||
const { signal } = startController;
|
||||
|
||||
const initStream = async () => {
|
||||
if (signal.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
setIsSubmitting(true);
|
||||
setShowStopButton(true);
|
||||
|
||||
if (resumeStreamId) {
|
||||
if (signal.aborted) {
|
||||
return;
|
||||
}
|
||||
// Resume: just subscribe to existing stream, don't start new generation
|
||||
console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId);
|
||||
setStreamId(resumeStreamId);
|
||||
|
|
@ -809,7 +917,10 @@ export default function useResumableSSE(
|
|||
} else {
|
||||
// New generation: start and then subscribe
|
||||
console.log('[ResumableSSE] Starting NEW generation');
|
||||
const newStreamId = await startGeneration(submission);
|
||||
const newStreamId = await startGeneration(submission, signal);
|
||||
if (signal.aborted) {
|
||||
return;
|
||||
}
|
||||
if (newStreamId) {
|
||||
setStreamId(newStreamId);
|
||||
// Optimistically add to active jobs
|
||||
|
|
@ -837,6 +948,7 @@ export default function useResumableSSE(
|
|||
|
||||
return () => {
|
||||
console.log('[ResumableSSE] Cleanup - closing SSE, resetting UI state');
|
||||
startController.abort();
|
||||
// Cleanup on unmount/navigation - close connection but DO NOT abort backend
|
||||
// Reset UI state so it doesn't leak to other conversations
|
||||
// If user returns to this conversation, useResumeOnLoad will restore the state
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue