fix: align resumed latest message

This commit is contained in:
Danny Avila 2026-05-24 14:49:38 -04:00
parent 9c4049b1c9
commit e4149ae6ba
3 changed files with 266 additions and 3 deletions

View file

@ -1,6 +1,7 @@
import { renderHook, act } from '@testing-library/react';
import { Constants, LocalStorageKeys } from 'librechat-data-provider';
import type { TSubmission } from 'librechat-data-provider';
import type { TMessage, TSubmission } from 'librechat-data-provider';
import { logger } from '~/utils';
type SSEEventListener = (e: Partial<MessageEvent> & { responseCode?: number }) => void;
@ -76,6 +77,15 @@ const mockErrorHandler = jest.fn();
const mockSetIsSubmitting = jest.fn();
const mockClearStepMaps = jest.fn();
jest.mock('~/utils', () => ({
...jest.requireActual('~/utils'),
logger: {
log: jest.fn(),
},
}));
const mockLoggerLog = logger.log as jest.Mock;
jest.mock('~/hooks/SSE/useEventHandlers', () =>
jest.fn(() => ({
errorHandler: mockErrorHandler,
@ -152,7 +162,7 @@ const buildSubmission = (overrides: Partial<PartialSubmission> = {}): TSubmissio
const buildChatHelpers = () => ({
setMessages: jest.fn(),
getMessages: jest.fn(() => []),
getMessages: jest.fn((): TMessage[] => []),
setConversation: jest.fn(),
setIsSubmitting: mockSetIsSubmitting,
newConversation: jest.fn(),
@ -165,6 +175,150 @@ const getLastSSE = (): MockSSEInstance => {
return sse;
};
describe('useResumableSSE - resume sync latest message alignment', () => {
beforeEach(() => {
mockSSEInstances.length = 0;
jest.clearAllMocks();
});
const renderResumeScenario = async (chatHelpers: ReturnType<typeof buildChatHelpers>) => {
const submission = {
...buildSubmission(),
resumeStreamId: 'stream-resume-123',
} as TSubmission & { resumeStreamId: string };
const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers));
await act(async () => {
await Promise.resolve();
});
return { sse: getLastSSE(), submission, unmount };
};
const emitResumeSync = async (
sse: MockSSEInstance,
resumeState: {
aggregatedContent: Array<{ type: string; text: string }>;
responseMessageId?: string;
},
) => {
await act(async () => {
sse._emit('message', {
data: JSON.stringify({
sync: true,
resumeState,
pendingEvents: [],
}),
});
});
};
it('sets latestMessage before replacing an existing resumed response', async () => {
const userMessage = {
messageId: 'msg-1',
conversationId: CONV_ID,
text: 'Hello',
isCreatedByUser: true,
} as TMessage;
const responseMessage = {
messageId: 'resp-server',
parentMessageId: 'msg-1',
conversationId: CONV_ID,
text: '',
content: [{ type: 'text', text: 'old content' }],
isCreatedByUser: false,
} as TMessage;
const chatHelpers = buildChatHelpers();
chatHelpers.getMessages.mockReturnValue([userMessage, responseMessage]);
const { sse, unmount } = await renderResumeScenario(chatHelpers);
chatHelpers.setLatestMessage.mockClear();
mockLoggerLog.mockClear();
await emitResumeSync(sse, {
responseMessageId: 'resp-server',
aggregatedContent: [{ type: 'text', text: 'resumed content' }],
});
const updatedMessages = chatHelpers.setMessages.mock.calls[0][0] as TMessage[];
const updatedResponse = updatedMessages[1];
expect(updatedResponse).toEqual({
...responseMessage,
content: [{ type: 'text', text: 'resumed content' }],
});
expect(chatHelpers.setLatestMessage).toHaveBeenCalledWith(updatedResponse);
expect(chatHelpers.setLatestMessage.mock.invocationCallOrder[0]).toBeLessThan(
chatHelpers.setMessages.mock.invocationCallOrder[0],
);
expect(mockLoggerLog).toHaveBeenCalledWith(
'latest_message',
'useResumableSSE.sync: setting latest message',
);
unmount();
});
it('sets latestMessage before appending a missing resumed response', async () => {
const userMessage = {
messageId: 'msg-1',
conversationId: CONV_ID,
text: 'Hello',
isCreatedByUser: true,
} as TMessage;
const chatHelpers = buildChatHelpers();
chatHelpers.getMessages.mockReturnValue([userMessage]);
const { sse, unmount } = await renderResumeScenario(chatHelpers);
chatHelpers.setLatestMessage.mockClear();
mockLoggerLog.mockClear();
await emitResumeSync(sse, {
responseMessageId: 'resp-server',
aggregatedContent: [{ type: 'text', text: 'resumed content' }],
});
const updatedMessages = chatHelpers.setMessages.mock.calls[0][0] as TMessage[];
const newResponse = updatedMessages[1];
expect(updatedMessages).toEqual([userMessage, newResponse]);
expect(newResponse).toEqual({
messageId: 'resp-server',
parentMessageId: 'msg-1',
conversationId: CONV_ID,
text: '',
content: [{ type: 'text', text: 'resumed content' }],
isCreatedByUser: false,
});
expect(chatHelpers.setLatestMessage).toHaveBeenCalledWith(newResponse);
expect(chatHelpers.setLatestMessage.mock.invocationCallOrder[0]).toBeLessThan(
chatHelpers.setMessages.mock.invocationCallOrder[0],
);
expect(mockLoggerLog).toHaveBeenCalledWith(
'latest_message',
'useResumableSSE.sync: setting latest message',
);
unmount();
});
it('seeds latestMessage from the resume submission after marking the stream submitting', async () => {
const chatHelpers = buildChatHelpers();
const { submission, unmount } = await renderResumeScenario(chatHelpers);
expect(mockSetIsSubmitting).toHaveBeenCalledWith(true);
expect(chatHelpers.setLatestMessage).toHaveBeenCalledWith(submission.initialResponse);
expect(mockSetIsSubmitting.mock.invocationCallOrder[0]).toBeLessThan(
chatHelpers.setLatestMessage.mock.invocationCallOrder[0],
);
expect(mockLoggerLog).toHaveBeenCalledWith(
'latest_message',
'useResumableSSE.resume: seeding latest message',
);
unmount();
});
});
describe('useResumableSSE - 404 error path', () => {
beforeEach(() => {
mockSSEInstances.length = 0;

View file

@ -0,0 +1,100 @@
import { renderHook, waitFor } from '@testing-library/react';
import type { TConversation, TMessage } from 'librechat-data-provider';
import useResumeOnLoad from '~/hooks/SSE/useResumeOnLoad';
const mockSetSubmission = jest.fn();
const mockSetLatestMessage = jest.fn();
const mockUseStreamStatus = jest.fn();
jest.mock('recoil', () => ({
...jest.requireActual('recoil'),
useSetRecoilState: jest.fn((atom: string) => {
if (atom === 'latestMessageFamily-2') {
return mockSetLatestMessage;
}
return mockSetSubmission;
}),
useRecoilValue: jest.fn((atom: string) => {
if (atom === 'conversationByIndex-2') {
return { endpoint: 'openAI' } as TConversation;
}
return null;
}),
}));
jest.mock('~/store', () => ({
__esModule: true,
default: {
submissionByIndex: jest.fn((index: number) => `submissionByIndex-${index}`),
conversationByIndex: jest.fn((index: number) => `conversationByIndex-${index}`),
latestMessageFamily: jest.fn((index: number) => `latestMessageFamily-${index}`),
},
}));
jest.mock('~/data-provider', () => ({
useStreamStatus: (...args: unknown[]) => mockUseStreamStatus(...args),
}));
const CONVERSATION_ID = 'conversation-resume';
const userMessage = {
messageId: 'user-message',
parentMessageId: '00000000-0000-0000-0000-000000000000',
conversationId: CONVERSATION_ID,
text: 'continue this',
isCreatedByUser: true,
} as TMessage;
const responseMessage = {
messageId: 'assistant-message',
parentMessageId: 'user-message',
conversationId: CONVERSATION_ID,
text: '',
content: [{ type: 'text', text: 'stale content' }],
isCreatedByUser: false,
} as TMessage;
describe('useResumeOnLoad latest-message seed', () => {
beforeEach(() => {
jest.clearAllMocks();
mockUseStreamStatus.mockReturnValue({
data: {
active: true,
streamId: 'stream-resume',
status: 'active',
resumeState: {
userMessage,
responseMessageId: responseMessage.messageId,
aggregatedContent: [{ type: 'text', text: 'fresh resumed content' }],
},
},
isSuccess: true,
isFetching: false,
});
});
it('builds the resume submission without seeding latestMessage before streaming is active', async () => {
const getMessages = jest.fn(() => [userMessage, responseMessage]);
renderHook(() => useResumeOnLoad(CONVERSATION_ID, getMessages, 2, true));
await waitFor(() => {
expect(mockSetSubmission).toHaveBeenCalledTimes(1);
});
const submission = mockSetSubmission.mock.calls[0][0];
expect(submission.initialResponse).toEqual(
expect.objectContaining({
messageId: responseMessage.messageId,
parentMessageId: responseMessage.parentMessageId,
conversationId: CONVERSATION_ID,
content: [{ type: 'text', text: 'fresh resumed content' }],
isCreatedByUser: false,
}),
);
expect(mockSetLatestMessage).not.toHaveBeenCalled();
});
});

View file

@ -25,7 +25,7 @@ import {
import type { ActiveJobsResponse } from '~/data-provider';
import { useAuthContext } from '~/hooks/AuthContext';
import useEventHandlers from './useEventHandlers';
import { clearAllDrafts } from '~/utils';
import { clearAllDrafts, logger } from '~/utils';
import store from '~/store';
type ChatHelpers = Pick<
@ -275,6 +275,8 @@ export default function useResumableSSE(
oldContentLength: Array.isArray(oldContent) ? oldContent.length : 0,
newContentLength: data.resumeState.aggregatedContent?.length,
});
logger.log('latest_message', 'useResumableSSE.sync: setting latest message');
setLatestMessage(updated[responseIdx]);
setMessages(updated);
resetContentHandler();
syncStepMessage(updated[responseIdx]);
@ -289,6 +291,8 @@ export default function useResumableSSE(
content: data.resumeState.aggregatedContent,
isCreatedByUser: false,
} as TMessage;
logger.log('latest_message', 'useResumableSSE.sync: setting latest message');
setLatestMessage(newMessage);
setMessages([...messages, newMessage]);
resetContentHandler();
syncStepMessage(newMessage);
@ -550,6 +554,7 @@ export default function useResumableSSE(
setIsSubmitting,
getMessages,
setMessages,
setLatestMessage,
startupConfig?.balance?.enabled,
balanceQuery,
removeActiveJob,
@ -659,6 +664,10 @@ export default function useResumableSSE(
if (resumeStreamId) {
// Resume: just subscribe to existing stream, don't start new generation
console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId);
if (submission.initialResponse) {
logger.log('latest_message', 'useResumableSSE.resume: seeding latest message');
setLatestMessage(submission.initialResponse);
}
setStreamId(resumeStreamId);
// Optimistically add to active jobs (in case it's not already there)
addActiveJob(resumeStreamId);