diff --git a/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts b/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts index 7d2942448f..c383b4d8d8 100644 --- a/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts +++ b/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts @@ -1,6 +1,7 @@ import { renderHook, act } from '@testing-library/react'; import { Constants, LocalStorageKeys } from 'librechat-data-provider'; -import type { TSubmission } from 'librechat-data-provider'; +import type { TMessage, TSubmission } from 'librechat-data-provider'; +import { logger } from '~/utils'; type SSEEventListener = (e: Partial & { responseCode?: number }) => void; @@ -76,6 +77,15 @@ const mockErrorHandler = jest.fn(); const mockSetIsSubmitting = jest.fn(); const mockClearStepMaps = jest.fn(); +jest.mock('~/utils', () => ({ + ...jest.requireActual('~/utils'), + logger: { + log: jest.fn(), + }, +})); + +const mockLoggerLog = logger.log as jest.Mock; + jest.mock('~/hooks/SSE/useEventHandlers', () => jest.fn(() => ({ errorHandler: mockErrorHandler, @@ -152,7 +162,7 @@ const buildSubmission = (overrides: Partial = {}): TSubmissio const buildChatHelpers = () => ({ setMessages: jest.fn(), - getMessages: jest.fn(() => []), + getMessages: jest.fn((): TMessage[] => []), setConversation: jest.fn(), setIsSubmitting: mockSetIsSubmitting, newConversation: jest.fn(), @@ -165,6 +175,150 @@ const getLastSSE = (): MockSSEInstance => { return sse; }; +describe('useResumableSSE - resume sync latest message alignment', () => { + beforeEach(() => { + mockSSEInstances.length = 0; + jest.clearAllMocks(); + }); + + const renderResumeScenario = async (chatHelpers: ReturnType) => { + const submission = { + ...buildSubmission(), + resumeStreamId: 'stream-resume-123', + } as TSubmission & { resumeStreamId: string }; + + const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers)); + + await act(async () => { + await Promise.resolve(); + }); + + return { sse: getLastSSE(), submission, unmount }; + }; + + const emitResumeSync = async ( + sse: MockSSEInstance, + resumeState: { + aggregatedContent: Array<{ type: string; text: string }>; + responseMessageId?: string; + }, + ) => { + await act(async () => { + sse._emit('message', { + data: JSON.stringify({ + sync: true, + resumeState, + pendingEvents: [], + }), + }); + }); + }; + + it('sets latestMessage before replacing an existing resumed response', async () => { + const userMessage = { + messageId: 'msg-1', + conversationId: CONV_ID, + text: 'Hello', + isCreatedByUser: true, + } as TMessage; + const responseMessage = { + messageId: 'resp-server', + parentMessageId: 'msg-1', + conversationId: CONV_ID, + text: '', + content: [{ type: 'text', text: 'old content' }], + isCreatedByUser: false, + } as TMessage; + const chatHelpers = buildChatHelpers(); + chatHelpers.getMessages.mockReturnValue([userMessage, responseMessage]); + + const { sse, unmount } = await renderResumeScenario(chatHelpers); + chatHelpers.setLatestMessage.mockClear(); + mockLoggerLog.mockClear(); + + await emitResumeSync(sse, { + responseMessageId: 'resp-server', + aggregatedContent: [{ type: 'text', text: 'resumed content' }], + }); + + const updatedMessages = chatHelpers.setMessages.mock.calls[0][0] as TMessage[]; + const updatedResponse = updatedMessages[1]; + + expect(updatedResponse).toEqual({ + ...responseMessage, + content: [{ type: 'text', text: 'resumed content' }], + }); + expect(chatHelpers.setLatestMessage).toHaveBeenCalledWith(updatedResponse); + expect(chatHelpers.setLatestMessage.mock.invocationCallOrder[0]).toBeLessThan( + chatHelpers.setMessages.mock.invocationCallOrder[0], + ); + expect(mockLoggerLog).toHaveBeenCalledWith( + 'latest_message', + 'useResumableSSE.sync: setting latest message', + ); + unmount(); + }); + + it('sets latestMessage before appending a missing resumed response', async () => { + const userMessage = { + messageId: 'msg-1', + conversationId: CONV_ID, + text: 'Hello', + isCreatedByUser: true, + } as TMessage; + const chatHelpers = buildChatHelpers(); + chatHelpers.getMessages.mockReturnValue([userMessage]); + + const { sse, unmount } = await renderResumeScenario(chatHelpers); + chatHelpers.setLatestMessage.mockClear(); + mockLoggerLog.mockClear(); + + await emitResumeSync(sse, { + responseMessageId: 'resp-server', + aggregatedContent: [{ type: 'text', text: 'resumed content' }], + }); + + const updatedMessages = chatHelpers.setMessages.mock.calls[0][0] as TMessage[]; + const newResponse = updatedMessages[1]; + + expect(updatedMessages).toEqual([userMessage, newResponse]); + expect(newResponse).toEqual({ + messageId: 'resp-server', + parentMessageId: 'msg-1', + conversationId: CONV_ID, + text: '', + content: [{ type: 'text', text: 'resumed content' }], + isCreatedByUser: false, + }); + expect(chatHelpers.setLatestMessage).toHaveBeenCalledWith(newResponse); + expect(chatHelpers.setLatestMessage.mock.invocationCallOrder[0]).toBeLessThan( + chatHelpers.setMessages.mock.invocationCallOrder[0], + ); + expect(mockLoggerLog).toHaveBeenCalledWith( + 'latest_message', + 'useResumableSSE.sync: setting latest message', + ); + unmount(); + }); + + it('seeds latestMessage from the resume submission after marking the stream submitting', async () => { + const chatHelpers = buildChatHelpers(); + + const { submission, unmount } = await renderResumeScenario(chatHelpers); + + expect(mockSetIsSubmitting).toHaveBeenCalledWith(true); + expect(chatHelpers.setLatestMessage).toHaveBeenCalledWith(submission.initialResponse); + expect(mockSetIsSubmitting.mock.invocationCallOrder[0]).toBeLessThan( + chatHelpers.setLatestMessage.mock.invocationCallOrder[0], + ); + expect(mockLoggerLog).toHaveBeenCalledWith( + 'latest_message', + 'useResumableSSE.resume: seeding latest message', + ); + unmount(); + }); +}); + describe('useResumableSSE - 404 error path', () => { beforeEach(() => { mockSSEInstances.length = 0; diff --git a/client/src/hooks/SSE/__tests__/useResumeOnLoad.spec.ts b/client/src/hooks/SSE/__tests__/useResumeOnLoad.spec.ts new file mode 100644 index 0000000000..f1bcb114cf --- /dev/null +++ b/client/src/hooks/SSE/__tests__/useResumeOnLoad.spec.ts @@ -0,0 +1,100 @@ +import { renderHook, waitFor } from '@testing-library/react'; +import type { TConversation, TMessage } from 'librechat-data-provider'; +import useResumeOnLoad from '~/hooks/SSE/useResumeOnLoad'; + +const mockSetSubmission = jest.fn(); +const mockSetLatestMessage = jest.fn(); +const mockUseStreamStatus = jest.fn(); + +jest.mock('recoil', () => ({ + ...jest.requireActual('recoil'), + useSetRecoilState: jest.fn((atom: string) => { + if (atom === 'latestMessageFamily-2') { + return mockSetLatestMessage; + } + + return mockSetSubmission; + }), + useRecoilValue: jest.fn((atom: string) => { + if (atom === 'conversationByIndex-2') { + return { endpoint: 'openAI' } as TConversation; + } + + return null; + }), +})); + +jest.mock('~/store', () => ({ + __esModule: true, + default: { + submissionByIndex: jest.fn((index: number) => `submissionByIndex-${index}`), + conversationByIndex: jest.fn((index: number) => `conversationByIndex-${index}`), + latestMessageFamily: jest.fn((index: number) => `latestMessageFamily-${index}`), + }, +})); + +jest.mock('~/data-provider', () => ({ + useStreamStatus: (...args: unknown[]) => mockUseStreamStatus(...args), +})); + +const CONVERSATION_ID = 'conversation-resume'; + +const userMessage = { + messageId: 'user-message', + parentMessageId: '00000000-0000-0000-0000-000000000000', + conversationId: CONVERSATION_ID, + text: 'continue this', + isCreatedByUser: true, +} as TMessage; + +const responseMessage = { + messageId: 'assistant-message', + parentMessageId: 'user-message', + conversationId: CONVERSATION_ID, + text: '', + content: [{ type: 'text', text: 'stale content' }], + isCreatedByUser: false, +} as TMessage; + +describe('useResumeOnLoad latest-message seed', () => { + beforeEach(() => { + jest.clearAllMocks(); + mockUseStreamStatus.mockReturnValue({ + data: { + active: true, + streamId: 'stream-resume', + status: 'active', + resumeState: { + userMessage, + responseMessageId: responseMessage.messageId, + aggregatedContent: [{ type: 'text', text: 'fresh resumed content' }], + }, + }, + isSuccess: true, + isFetching: false, + }); + }); + + it('builds the resume submission without seeding latestMessage before streaming is active', async () => { + const getMessages = jest.fn(() => [userMessage, responseMessage]); + + renderHook(() => useResumeOnLoad(CONVERSATION_ID, getMessages, 2, true)); + + await waitFor(() => { + expect(mockSetSubmission).toHaveBeenCalledTimes(1); + }); + + const submission = mockSetSubmission.mock.calls[0][0]; + + expect(submission.initialResponse).toEqual( + expect.objectContaining({ + messageId: responseMessage.messageId, + parentMessageId: responseMessage.parentMessageId, + conversationId: CONVERSATION_ID, + content: [{ type: 'text', text: 'fresh resumed content' }], + isCreatedByUser: false, + }), + ); + expect(mockSetLatestMessage).not.toHaveBeenCalled(); + }); +}); diff --git a/client/src/hooks/SSE/useResumableSSE.ts b/client/src/hooks/SSE/useResumableSSE.ts index 3729f4dbcc..6e386d13e4 100644 --- a/client/src/hooks/SSE/useResumableSSE.ts +++ b/client/src/hooks/SSE/useResumableSSE.ts @@ -25,7 +25,7 @@ import { import type { ActiveJobsResponse } from '~/data-provider'; import { useAuthContext } from '~/hooks/AuthContext'; import useEventHandlers from './useEventHandlers'; -import { clearAllDrafts } from '~/utils'; +import { clearAllDrafts, logger } from '~/utils'; import store from '~/store'; type ChatHelpers = Pick< @@ -275,6 +275,8 @@ export default function useResumableSSE( oldContentLength: Array.isArray(oldContent) ? oldContent.length : 0, newContentLength: data.resumeState.aggregatedContent?.length, }); + logger.log('latest_message', 'useResumableSSE.sync: setting latest message'); + setLatestMessage(updated[responseIdx]); setMessages(updated); resetContentHandler(); syncStepMessage(updated[responseIdx]); @@ -289,6 +291,8 @@ export default function useResumableSSE( content: data.resumeState.aggregatedContent, isCreatedByUser: false, } as TMessage; + logger.log('latest_message', 'useResumableSSE.sync: setting latest message'); + setLatestMessage(newMessage); setMessages([...messages, newMessage]); resetContentHandler(); syncStepMessage(newMessage); @@ -550,6 +554,7 @@ export default function useResumableSSE( setIsSubmitting, getMessages, setMessages, + setLatestMessage, startupConfig?.balance?.enabled, balanceQuery, removeActiveJob, @@ -659,6 +664,10 @@ export default function useResumableSSE( if (resumeStreamId) { // Resume: just subscribe to existing stream, don't start new generation console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId); + if (submission.initialResponse) { + logger.log('latest_message', 'useResumableSSE.resume: seeding latest message'); + setLatestMessage(submission.initialResponse); + } setStreamId(resumeStreamId); // Optimistically add to active jobs (in case it's not already there) addActiveJob(resumeStreamId);