diff --git a/api/server/experimental.js b/api/server/experimental.js index ce3ac2762a..d8b4126031 100644 --- a/api/server/experimental.js +++ b/api/server/experimental.js @@ -140,8 +140,27 @@ if (cluster.isMaster) { logger.info(`Spawning ${workers} workers to simulate multi-pod environment`); let activeWorkers = 0; + let retentionSweepWorkerId = null; const startTime = Date.now(); + const assignRetentionSweepWorker = () => { + if (retentionSweepWorkerId && cluster.workers[retentionSweepWorkerId]) { + return; + } + + const availableWorkers = Object.values(cluster.workers).filter(Boolean); + const retentionSweepWorker = availableWorkers[availableWorkers.length - 1]; + if (!retentionSweepWorker) { + return; + } + + retentionSweepWorkerId = retentionSweepWorker.id; + logger.info( + wrapLogMessage(`Worker ${retentionSweepWorker.process.pid} assigned to file-retention sweep`), + ); + retentionSweepWorker.send({ type: 'file-retention-sweep-worker' }); + }; + /** Flush Redis cache before starting workers */ flushRedisCache() .then(() => { @@ -163,19 +182,23 @@ if (cluster.isMaster) { `Worker ${worker.process.pid} is online (${activeWorkers}/${workers}) after ${uptime}s`, ); - /** Notify the last worker to perform one-time initialization tasks */ + /** Assign one worker for process-wide background jobs */ if (activeWorkers === workers) { - const allWorkers = Object.values(cluster.workers); - const lastWorker = allWorkers[allWorkers.length - 1]; - if (lastWorker) { - logger.info(wrapLogMessage(`All ${workers} workers are online`)); - lastWorker.send({ type: 'last-worker' }); - } + logger.info(wrapLogMessage(`All ${workers} workers are online`)); + } + }); + + cluster.on('listening', () => { + if (activeWorkers === workers) { + assignRetentionSweepWorker(); } }); cluster.on('exit', (worker, code, signal) => { activeWorkers--; + if (worker.id === retentionSweepWorkerId) { + retentionSweepWorkerId = null; + } logger.error( `Worker ${worker.process.pid} died (${activeWorkers}/${workers}). Code: ${code}, Signal: ${signal}`, ); @@ -203,6 +226,27 @@ if (cluster.isMaster) { * Each worker runs a full Express server instance */ const app = express(); + let shouldStartExpiredFileSweep = false; + let expiredFileSweepOptions = null; + let expiredFileSweepStarted = false; + + const startExpiredFileSweepOnce = () => { + if (!shouldStartExpiredFileSweep || expiredFileSweepStarted || !expiredFileSweepOptions) { + return; + } + + expiredFileSweepStarted = true; + startExpiredFileSweep(expiredFileSweepOptions); + }; + + /** Handle inter-process messages from master */ + process.on('message', (msg) => { + if (msg.type === 'file-retention-sweep-worker') { + shouldStartExpiredFileSweep = true; + logger.info(wrapLogMessage(`Worker ${process.pid} is assigned file-retention sweep`)); + startExpiredFileSweepOnce(); + } + }); const startServer = async () => { logger.info(`Worker ${process.pid} initializing...`); @@ -234,7 +278,8 @@ if (cluster.isMaster) { /** Initialize app configuration */ const appConfig = await getAppConfig(); initializeFileStorage(appConfig); - startExpiredFileSweep({ appConfig, loadAppConfig: getAppConfig }); + expiredFileSweepOptions = { appConfig, loadAppConfig: getAppConfig }; + startExpiredFileSweepOnce(); await performStartupChecks(appConfig); await updateInterfacePerms({ appConfig, getRoleByName, updateAccessPermissions }); @@ -392,19 +437,6 @@ if (cluster.isMaster) { process.exit(1); } }); - - /** Handle inter-process messages from master */ - process.on('message', async (msg) => { - if (msg.type === 'last-worker') { - logger.info( - wrapLogMessage( - `Worker ${process.pid} is the last worker and can perform special initialization tasks`, - ), - ); - /** Add any one-time initialization tasks here */ - /** For example: scheduled jobs, cleanup tasks, etc. */ - } - }); }; startServer().catch((err) => { diff --git a/api/server/services/Files/process.js b/api/server/services/Files/process.js index d5c0a24d66..75da4f27c9 100644 --- a/api/server/services/Files/process.js +++ b/api/server/services/Files/process.js @@ -296,16 +296,38 @@ function hasExpiredFileEndpointConfig(appConfig, source) { return Boolean(appConfig?.endpoints?.[EModelEndpoint.assistants]); } -function getExpiredFileAssistantVersion(endpoint) { - return String( - defaultAssistantsVersion[endpoint] ?? defaultAssistantsVersion.assistants ?? 2, - ).replace(/^v/, ''); +function getConfiguredExpiredFileAssistantVersion({ appConfig, source, endpoint }) { + const endpointVersion = appConfig?.endpoints?.[endpoint]?.version; + if (endpointVersion != null) { + return endpointVersion; + } + + if (source === FileSources.azure) { + const azureAssistantsConfig = appConfig?.endpoints?.[EModelEndpoint.azureOpenAI]?.assistants; + if (typeof azureAssistantsConfig === 'object' && azureAssistantsConfig?.version != null) { + return azureAssistantsConfig.version; + } + } + + return undefined; +} + +function getExpiredFileAssistantVersion({ appConfig, source, endpoint }) { + const configuredVersion = getConfiguredExpiredFileAssistantVersion({ + appConfig, + source, + endpoint, + }); + const fallbackVersion = + defaultAssistantsVersion[endpoint] ?? defaultAssistantsVersion.assistants ?? 2; + + return String(configuredVersion ?? fallbackVersion).replace(/^v/, ''); } function createExpiredFileSweepRequest({ appConfig, file, userId }) { const source = file.source ?? FileSources.local; const endpoint = getExpiredFileEndpoint(source); - const version = getExpiredFileAssistantVersion(endpoint); + const version = getExpiredFileAssistantVersion({ appConfig, source, endpoint }); const baseUrl = `/api/assistants/v${version}`; return { diff --git a/api/server/services/Files/process.spec.js b/api/server/services/Files/process.spec.js index 8f3a4498ea..c81a6dff17 100644 --- a/api/server/services/Files/process.spec.js +++ b/api/server/services/Files/process.spec.js @@ -679,12 +679,20 @@ describe('sweepExpiredFiles', () => { }); test.each([ - [FileSources.openai, EModelEndpoint.assistants, { [EModelEndpoint.assistants]: {} }, '2'], + [ + FileSources.openai, + EModelEndpoint.assistants, + { [EModelEndpoint.assistants]: { version: 'v3' } }, + '3', + ], [ FileSources.azure, EModelEndpoint.azureAssistants, - { [EModelEndpoint.azureOpenAI]: { assistants: true } }, - '1', + { + [EModelEndpoint.azureOpenAI]: { assistants: true }, + [EModelEndpoint.azureAssistants]: { version: 4 }, + }, + '4', ], ])( 'passes assistant request context when deleting %s expired files', diff --git a/packages/data-schemas/src/app/assistants.ts b/packages/data-schemas/src/app/assistants.ts index c41a8d603e..ddc56cf495 100644 --- a/packages/data-schemas/src/app/assistants.ts +++ b/packages/data-schemas/src/app/assistants.ts @@ -52,6 +52,10 @@ export function assistantsConfigSetup( return { ...prevConfig, + version: + assistantsConfig?.version != null + ? parsedConfig.version + : (prevConfig.version ?? parsedConfig.version), retrievalModels: parsedConfig.retrievalModels, disableBuilder: parsedConfig.disableBuilder, pollIntervalMs: parsedConfig.pollIntervalMs, diff --git a/packages/data-schemas/src/app/service.spec.ts b/packages/data-schemas/src/app/service.spec.ts index 80298b3e18..2a2a8ffd1d 100644 --- a/packages/data-schemas/src/app/service.spec.ts +++ b/packages/data-schemas/src/app/service.spec.ts @@ -1,5 +1,6 @@ import type { DeepPartial, TCustomConfig } from 'librechat-data-provider'; -import { loadSummarizationConfig } from './service'; +import { EModelEndpoint, defaultAssistantsVersion } from 'librechat-data-provider'; +import { AppService, loadSummarizationConfig } from './service'; import logger from '~/config/winston'; jest.mock('~/config/winston', () => ({ @@ -78,3 +79,70 @@ describe('loadSummarizationConfig', () => { expect(String(warnSpy.mock.calls[0][0])).toContain('Invalid summarization config'); }); }); + +describe('AppService assistants config', () => { + it('preserves configured Assistants API versions', async () => { + const config = { + endpoints: { + [EModelEndpoint.assistants]: { + version: 'v3', + }, + [EModelEndpoint.azureOpenAI]: { + assistants: true, + groups: [ + { + group: 'azure-assistants-test', + apiKey: 'test-key', + instanceName: 'azure-assistants-test', + assistants: true, + version: '2024-02-15-preview', + models: { + 'gpt-4': { + deploymentName: 'gpt-4', + }, + }, + }, + ], + }, + [EModelEndpoint.azureAssistants]: { + version: 4, + }, + }, + } as DeepPartial; + + const result = await AppService({ config }); + + expect(result.endpoints?.[EModelEndpoint.assistants]?.version).toBe('v3'); + expect(result.endpoints?.[EModelEndpoint.azureAssistants]?.version).toBe(4); + }); + + it('keeps Azure Assistants default version when only Azure OpenAI enables assistants', async () => { + const config = { + endpoints: { + [EModelEndpoint.azureOpenAI]: { + assistants: true, + groups: [ + { + group: 'azure-assistants-test', + apiKey: 'test-key', + instanceName: 'azure-assistants-test', + assistants: true, + version: '2024-02-15-preview', + models: { + 'gpt-4': { + deploymentName: 'gpt-4', + }, + }, + }, + ], + }, + }, + } as DeepPartial; + + const result = await AppService({ config }); + + expect(result.endpoints?.[EModelEndpoint.azureAssistants]?.version).toBe( + defaultAssistantsVersion.azureAssistants, + ); + }); +});