mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-05-13 16:07:30 +00:00
fix: honor assistant versions in retention sweeps
This commit is contained in:
parent
8e12588b95
commit
ebdc4ea644
5 changed files with 164 additions and 30 deletions
|
|
@ -140,8 +140,27 @@ if (cluster.isMaster) {
|
|||
logger.info(`Spawning ${workers} workers to simulate multi-pod environment`);
|
||||
|
||||
let activeWorkers = 0;
|
||||
let retentionSweepWorkerId = null;
|
||||
const startTime = Date.now();
|
||||
|
||||
const assignRetentionSweepWorker = () => {
|
||||
if (retentionSweepWorkerId && cluster.workers[retentionSweepWorkerId]) {
|
||||
return;
|
||||
}
|
||||
|
||||
const availableWorkers = Object.values(cluster.workers).filter(Boolean);
|
||||
const retentionSweepWorker = availableWorkers[availableWorkers.length - 1];
|
||||
if (!retentionSweepWorker) {
|
||||
return;
|
||||
}
|
||||
|
||||
retentionSweepWorkerId = retentionSweepWorker.id;
|
||||
logger.info(
|
||||
wrapLogMessage(`Worker ${retentionSweepWorker.process.pid} assigned to file-retention sweep`),
|
||||
);
|
||||
retentionSweepWorker.send({ type: 'file-retention-sweep-worker' });
|
||||
};
|
||||
|
||||
/** Flush Redis cache before starting workers */
|
||||
flushRedisCache()
|
||||
.then(() => {
|
||||
|
|
@ -163,19 +182,23 @@ if (cluster.isMaster) {
|
|||
`Worker ${worker.process.pid} is online (${activeWorkers}/${workers}) after ${uptime}s`,
|
||||
);
|
||||
|
||||
/** Notify the last worker to perform one-time initialization tasks */
|
||||
/** Assign one worker for process-wide background jobs */
|
||||
if (activeWorkers === workers) {
|
||||
const allWorkers = Object.values(cluster.workers);
|
||||
const lastWorker = allWorkers[allWorkers.length - 1];
|
||||
if (lastWorker) {
|
||||
logger.info(wrapLogMessage(`All ${workers} workers are online`));
|
||||
lastWorker.send({ type: 'last-worker' });
|
||||
}
|
||||
logger.info(wrapLogMessage(`All ${workers} workers are online`));
|
||||
}
|
||||
});
|
||||
|
||||
cluster.on('listening', () => {
|
||||
if (activeWorkers === workers) {
|
||||
assignRetentionSweepWorker();
|
||||
}
|
||||
});
|
||||
|
||||
cluster.on('exit', (worker, code, signal) => {
|
||||
activeWorkers--;
|
||||
if (worker.id === retentionSweepWorkerId) {
|
||||
retentionSweepWorkerId = null;
|
||||
}
|
||||
logger.error(
|
||||
`Worker ${worker.process.pid} died (${activeWorkers}/${workers}). Code: ${code}, Signal: ${signal}`,
|
||||
);
|
||||
|
|
@ -203,6 +226,27 @@ if (cluster.isMaster) {
|
|||
* Each worker runs a full Express server instance
|
||||
*/
|
||||
const app = express();
|
||||
let shouldStartExpiredFileSweep = false;
|
||||
let expiredFileSweepOptions = null;
|
||||
let expiredFileSweepStarted = false;
|
||||
|
||||
const startExpiredFileSweepOnce = () => {
|
||||
if (!shouldStartExpiredFileSweep || expiredFileSweepStarted || !expiredFileSweepOptions) {
|
||||
return;
|
||||
}
|
||||
|
||||
expiredFileSweepStarted = true;
|
||||
startExpiredFileSweep(expiredFileSweepOptions);
|
||||
};
|
||||
|
||||
/** Handle inter-process messages from master */
|
||||
process.on('message', (msg) => {
|
||||
if (msg.type === 'file-retention-sweep-worker') {
|
||||
shouldStartExpiredFileSweep = true;
|
||||
logger.info(wrapLogMessage(`Worker ${process.pid} is assigned file-retention sweep`));
|
||||
startExpiredFileSweepOnce();
|
||||
}
|
||||
});
|
||||
|
||||
const startServer = async () => {
|
||||
logger.info(`Worker ${process.pid} initializing...`);
|
||||
|
|
@ -234,7 +278,8 @@ if (cluster.isMaster) {
|
|||
/** Initialize app configuration */
|
||||
const appConfig = await getAppConfig();
|
||||
initializeFileStorage(appConfig);
|
||||
startExpiredFileSweep({ appConfig, loadAppConfig: getAppConfig });
|
||||
expiredFileSweepOptions = { appConfig, loadAppConfig: getAppConfig };
|
||||
startExpiredFileSweepOnce();
|
||||
await performStartupChecks(appConfig);
|
||||
await updateInterfacePerms({ appConfig, getRoleByName, updateAccessPermissions });
|
||||
|
||||
|
|
@ -392,19 +437,6 @@ if (cluster.isMaster) {
|
|||
process.exit(1);
|
||||
}
|
||||
});
|
||||
|
||||
/** Handle inter-process messages from master */
|
||||
process.on('message', async (msg) => {
|
||||
if (msg.type === 'last-worker') {
|
||||
logger.info(
|
||||
wrapLogMessage(
|
||||
`Worker ${process.pid} is the last worker and can perform special initialization tasks`,
|
||||
),
|
||||
);
|
||||
/** Add any one-time initialization tasks here */
|
||||
/** For example: scheduled jobs, cleanup tasks, etc. */
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
startServer().catch((err) => {
|
||||
|
|
|
|||
|
|
@ -296,16 +296,38 @@ function hasExpiredFileEndpointConfig(appConfig, source) {
|
|||
return Boolean(appConfig?.endpoints?.[EModelEndpoint.assistants]);
|
||||
}
|
||||
|
||||
function getExpiredFileAssistantVersion(endpoint) {
|
||||
return String(
|
||||
defaultAssistantsVersion[endpoint] ?? defaultAssistantsVersion.assistants ?? 2,
|
||||
).replace(/^v/, '');
|
||||
function getConfiguredExpiredFileAssistantVersion({ appConfig, source, endpoint }) {
|
||||
const endpointVersion = appConfig?.endpoints?.[endpoint]?.version;
|
||||
if (endpointVersion != null) {
|
||||
return endpointVersion;
|
||||
}
|
||||
|
||||
if (source === FileSources.azure) {
|
||||
const azureAssistantsConfig = appConfig?.endpoints?.[EModelEndpoint.azureOpenAI]?.assistants;
|
||||
if (typeof azureAssistantsConfig === 'object' && azureAssistantsConfig?.version != null) {
|
||||
return azureAssistantsConfig.version;
|
||||
}
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function getExpiredFileAssistantVersion({ appConfig, source, endpoint }) {
|
||||
const configuredVersion = getConfiguredExpiredFileAssistantVersion({
|
||||
appConfig,
|
||||
source,
|
||||
endpoint,
|
||||
});
|
||||
const fallbackVersion =
|
||||
defaultAssistantsVersion[endpoint] ?? defaultAssistantsVersion.assistants ?? 2;
|
||||
|
||||
return String(configuredVersion ?? fallbackVersion).replace(/^v/, '');
|
||||
}
|
||||
|
||||
function createExpiredFileSweepRequest({ appConfig, file, userId }) {
|
||||
const source = file.source ?? FileSources.local;
|
||||
const endpoint = getExpiredFileEndpoint(source);
|
||||
const version = getExpiredFileAssistantVersion(endpoint);
|
||||
const version = getExpiredFileAssistantVersion({ appConfig, source, endpoint });
|
||||
const baseUrl = `/api/assistants/v${version}`;
|
||||
|
||||
return {
|
||||
|
|
|
|||
|
|
@ -679,12 +679,20 @@ describe('sweepExpiredFiles', () => {
|
|||
});
|
||||
|
||||
test.each([
|
||||
[FileSources.openai, EModelEndpoint.assistants, { [EModelEndpoint.assistants]: {} }, '2'],
|
||||
[
|
||||
FileSources.openai,
|
||||
EModelEndpoint.assistants,
|
||||
{ [EModelEndpoint.assistants]: { version: 'v3' } },
|
||||
'3',
|
||||
],
|
||||
[
|
||||
FileSources.azure,
|
||||
EModelEndpoint.azureAssistants,
|
||||
{ [EModelEndpoint.azureOpenAI]: { assistants: true } },
|
||||
'1',
|
||||
{
|
||||
[EModelEndpoint.azureOpenAI]: { assistants: true },
|
||||
[EModelEndpoint.azureAssistants]: { version: 4 },
|
||||
},
|
||||
'4',
|
||||
],
|
||||
])(
|
||||
'passes assistant request context when deleting %s expired files',
|
||||
|
|
|
|||
|
|
@ -52,6 +52,10 @@ export function assistantsConfigSetup(
|
|||
|
||||
return {
|
||||
...prevConfig,
|
||||
version:
|
||||
assistantsConfig?.version != null
|
||||
? parsedConfig.version
|
||||
: (prevConfig.version ?? parsedConfig.version),
|
||||
retrievalModels: parsedConfig.retrievalModels,
|
||||
disableBuilder: parsedConfig.disableBuilder,
|
||||
pollIntervalMs: parsedConfig.pollIntervalMs,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import type { DeepPartial, TCustomConfig } from 'librechat-data-provider';
|
||||
import { loadSummarizationConfig } from './service';
|
||||
import { EModelEndpoint, defaultAssistantsVersion } from 'librechat-data-provider';
|
||||
import { AppService, loadSummarizationConfig } from './service';
|
||||
import logger from '~/config/winston';
|
||||
|
||||
jest.mock('~/config/winston', () => ({
|
||||
|
|
@ -78,3 +79,70 @@ describe('loadSummarizationConfig', () => {
|
|||
expect(String(warnSpy.mock.calls[0][0])).toContain('Invalid summarization config');
|
||||
});
|
||||
});
|
||||
|
||||
describe('AppService assistants config', () => {
|
||||
it('preserves configured Assistants API versions', async () => {
|
||||
const config = {
|
||||
endpoints: {
|
||||
[EModelEndpoint.assistants]: {
|
||||
version: 'v3',
|
||||
},
|
||||
[EModelEndpoint.azureOpenAI]: {
|
||||
assistants: true,
|
||||
groups: [
|
||||
{
|
||||
group: 'azure-assistants-test',
|
||||
apiKey: 'test-key',
|
||||
instanceName: 'azure-assistants-test',
|
||||
assistants: true,
|
||||
version: '2024-02-15-preview',
|
||||
models: {
|
||||
'gpt-4': {
|
||||
deploymentName: 'gpt-4',
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
[EModelEndpoint.azureAssistants]: {
|
||||
version: 4,
|
||||
},
|
||||
},
|
||||
} as DeepPartial<TCustomConfig>;
|
||||
|
||||
const result = await AppService({ config });
|
||||
|
||||
expect(result.endpoints?.[EModelEndpoint.assistants]?.version).toBe('v3');
|
||||
expect(result.endpoints?.[EModelEndpoint.azureAssistants]?.version).toBe(4);
|
||||
});
|
||||
|
||||
it('keeps Azure Assistants default version when only Azure OpenAI enables assistants', async () => {
|
||||
const config = {
|
||||
endpoints: {
|
||||
[EModelEndpoint.azureOpenAI]: {
|
||||
assistants: true,
|
||||
groups: [
|
||||
{
|
||||
group: 'azure-assistants-test',
|
||||
apiKey: 'test-key',
|
||||
instanceName: 'azure-assistants-test',
|
||||
assistants: true,
|
||||
version: '2024-02-15-preview',
|
||||
models: {
|
||||
'gpt-4': {
|
||||
deploymentName: 'gpt-4',
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
} as DeepPartial<TCustomConfig>;
|
||||
|
||||
const result = await AppService({ config });
|
||||
|
||||
expect(result.endpoints?.[EModelEndpoint.azureAssistants]?.version).toBe(
|
||||
defaultAssistantsVersion.azureAssistants,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue