mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-05-13 16:07:30 +00:00
fix: harden data retention semantics
This commit is contained in:
parent
1fdd81449d
commit
e23c3ea53d
32 changed files with 688 additions and 69 deletions
|
|
@ -7,6 +7,28 @@ const { Tool } = require('@librechat/agents/langchain/tools');
|
|||
const { getImageBasename, extractBaseURL } = require('@librechat/api');
|
||||
const { FileContext, ContentTypes } = require('librechat-data-provider');
|
||||
|
||||
/**
 * Builds a minimal, detached copy of the request that carries only the fields
 * read when applying data-retention metadata, so the tool instance does not
 * have to hold on to the full request object.
 *
 * @param {object} [req] - Incoming request; may be absent.
 * @returns {{ user?: { id: *, tenantId: * }, body: object, config: object } | undefined}
 *   `undefined` when no request was supplied, otherwise the trimmed copy.
 */
const getRetentionRequest = (req) => {
  if (!req) {
    return undefined;
  }

  const { user, body, config } = req;
  // Only the identity fields of the user are carried over.
  const minimalUser = user ? { id: user.id, tenantId: user.tenantId } : undefined;

  return {
    user: minimalUser,
    body: {
      conversationId: body?.conversationId,
      isTemporary: body?.isTemporary,
    },
    config: {
      interfaceConfig: config?.interfaceConfig,
    },
  };
};
|
||||
|
||||
const dalle3JsonSchema = {
|
||||
type: 'object',
|
||||
properties: {
|
||||
|
|
@ -49,6 +71,7 @@ class DALLE3 extends Tool {
|
|||
|
||||
this.userId = fields.userId;
|
||||
this.tenantId = fields.req?.user?.tenantId;
|
||||
this.retentionRequest = getRetentionRequest(fields.req);
|
||||
this.fileStrategy = fields.fileStrategy;
|
||||
/** @type {boolean} */
|
||||
this.isAgent = fields.isAgent;
|
||||
|
|
@ -230,6 +253,7 @@ Error Message: ${error.message}`);
|
|||
fileStrategy: this.fileStrategy,
|
||||
context: FileContext.image_generation,
|
||||
tenantId: this.tenantId,
|
||||
req: this.retentionRequest,
|
||||
});
|
||||
|
||||
if (this.returnMetadata) {
|
||||
|
|
|
|||
|
|
@ -6,6 +6,28 @@ const { HttpsProxyAgent } = require('https-proxy-agent');
|
|||
const { Tool } = require('@librechat/agents/langchain/tools');
|
||||
const { FileContext, ContentTypes } = require('librechat-data-provider');
|
||||
|
||||
/**
 * Reduces a request to the few fields consulted for data retention
 * (user identity, conversation id / temporary flag, interface config).
 * Everything else on the request is left behind.
 *
 * @param {object} [req] - Incoming request; may be absent.
 * @returns {object | undefined} Trimmed request copy, or `undefined` when `req` is falsy.
 */
const getRetentionRequest = (req) =>
  req
    ? {
        user: req.user ? { id: req.user.id, tenantId: req.user.tenantId } : undefined,
        body: {
          conversationId: req.body?.conversationId,
          isTemporary: req.body?.isTemporary,
        },
        config: {
          interfaceConfig: req.config?.interfaceConfig,
        },
      }
    : undefined;
|
||||
|
||||
const fluxApiJsonSchema = {
|
||||
type: 'object',
|
||||
properties: {
|
||||
|
|
@ -110,6 +132,7 @@ class FluxAPI extends Tool {
|
|||
|
||||
this.userId = fields.userId;
|
||||
this.tenantId = fields.req?.user?.tenantId;
|
||||
this.retentionRequest = getRetentionRequest(fields.req);
|
||||
this.fileStrategy = fields.fileStrategy;
|
||||
|
||||
/** @type {boolean} **/
|
||||
|
|
@ -343,6 +366,7 @@ class FluxAPI extends Tool {
|
|||
basePath: 'images',
|
||||
context: FileContext.image_generation,
|
||||
tenantId: this.tenantId,
|
||||
req: this.retentionRequest,
|
||||
});
|
||||
|
||||
logger.debug('[FluxAPI] Image saved to path:', result.filepath);
|
||||
|
|
@ -574,6 +598,7 @@ class FluxAPI extends Tool {
|
|||
basePath: 'images',
|
||||
context: FileContext.image_generation,
|
||||
tenantId: this.tenantId,
|
||||
req: this.retentionRequest,
|
||||
});
|
||||
|
||||
logger.debug('[FluxAPI] Finetuned image saved to path:', result.filepath);
|
||||
|
|
|
|||
|
|
@ -100,11 +100,21 @@ describe('image tools - agent mode ToolMessage format', () => {
|
|||
});
|
||||
|
||||
it('keeps tenant context without retaining the request object', () => {
|
||||
const req = { user: { tenantId: 'tenant-a' }, socket: {} };
|
||||
const req = {
|
||||
user: { id: 'user-1', tenantId: 'tenant-a' },
|
||||
body: { conversationId: 'convo-1', isTemporary: 'true' },
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
socket: {},
|
||||
};
|
||||
const dalle = new DALLE3({ isAgent: false, processFileURL: jest.fn(), req });
|
||||
|
||||
expect(dalle.tenantId).toBe('tenant-a');
|
||||
expect(dalle.req).toBeUndefined();
|
||||
expect(dalle.retentionRequest).toEqual({
|
||||
user: { id: 'user-1', tenantId: 'tenant-a' },
|
||||
body: { conversationId: 'convo-1', isTemporary: 'true' },
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
});
|
||||
});
|
||||
|
||||
it('invoke() returns ToolMessage with base64 in artifact, not serialized in content', async () => {
|
||||
|
|
@ -181,11 +191,90 @@ describe('image tools - agent mode ToolMessage format', () => {
|
|||
});
|
||||
|
||||
it('keeps tenant context without retaining the request object', () => {
|
||||
const req = { user: { tenantId: 'tenant-a' }, socket: {} };
|
||||
const req = {
|
||||
user: { id: 'user-1', tenantId: 'tenant-a' },
|
||||
body: { conversationId: 'convo-1', isTemporary: 'true' },
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
socket: {},
|
||||
};
|
||||
const flux = new FluxAPI({ isAgent: false, processFileURL: jest.fn(), req });
|
||||
|
||||
expect(flux.tenantId).toBe('tenant-a');
|
||||
expect(flux.req).toBeUndefined();
|
||||
expect(flux.retentionRequest).toEqual({
|
||||
user: { id: 'user-1', tenantId: 'tenant-a' },
|
||||
body: { conversationId: 'convo-1', isTemporary: 'true' },
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
});
|
||||
});
|
||||
|
||||
it('passes minimal retention context when saving generated images', async () => {
|
||||
const processFileURL = jest.fn().mockResolvedValue({ filepath: '/images/generated.png' });
|
||||
const req = {
|
||||
user: { id: 'user-1', tenantId: 'tenant-a' },
|
||||
body: { conversationId: 'convo-1', isTemporary: 'true' },
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
socket: {},
|
||||
};
|
||||
const flux = new FluxAPI({
|
||||
isAgent: false,
|
||||
processFileURL,
|
||||
req,
|
||||
userId: 'user-1',
|
||||
fileStrategy: 'local',
|
||||
});
|
||||
const invokePromise = flux.invoke(
|
||||
makeToolCall('flux', { prompt: 'a box', endpoint: '/v1/flux-dev' }),
|
||||
);
|
||||
await jest.runAllTimersAsync();
|
||||
await invokePromise;
|
||||
|
||||
expect(processFileURL).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
req: {
|
||||
user: { id: 'user-1', tenantId: 'tenant-a' },
|
||||
body: { conversationId: 'convo-1', isTemporary: 'true' },
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('passes minimal retention context when saving finetuned generated images', async () => {
|
||||
const processFileURL = jest.fn().mockResolvedValue({ filepath: '/images/generated.png' });
|
||||
const req = {
|
||||
user: { id: 'user-1', tenantId: 'tenant-a' },
|
||||
body: { conversationId: 'convo-1', isTemporary: 'true' },
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
socket: {},
|
||||
};
|
||||
const flux = new FluxAPI({
|
||||
isAgent: false,
|
||||
processFileURL,
|
||||
req,
|
||||
userId: 'user-1',
|
||||
fileStrategy: 'local',
|
||||
});
|
||||
const invokePromise = flux.invoke(
|
||||
makeToolCall('flux', {
|
||||
action: 'generate_finetuned',
|
||||
prompt: 'a box',
|
||||
finetune_id: 'ft-abc123',
|
||||
endpoint: '/v1/flux-pro-finetuned',
|
||||
}),
|
||||
);
|
||||
await jest.runAllTimersAsync();
|
||||
await invokePromise;
|
||||
|
||||
expect(processFileURL).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
req: {
|
||||
user: { id: 'user-1', tenantId: 'tenant-a' },
|
||||
body: { conversationId: 'convo-1', isTemporary: 'true' },
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('invoke() returns ToolMessage with base64 in artifact, not serialized in content', async () => {
|
||||
|
|
|
|||
|
|
@ -2,6 +2,16 @@ const { logger } = require('@librechat/data-schemas');
|
|||
|
||||
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
/**
 * Mongo filter selecting the documents whose Meili flag may be reset.
 *
 * - `isTemporary: false` — explicitly non-temporary documents, whether or not
 *   they carry an `expiredAt` (e.g. all-data retention).
 * - `isTemporary: null` + `expiredAt: null` — legacy documents without a usable
 *   temporary flag and without an expiry. In MongoDB `{ field: null }` matches
 *   both a missing field and an explicit `null`, so documents that stored
 *   `isTemporary: null` are no longer skipped.
 */
const meiliIndexableFilter = {
  $or: [{ isTemporary: false }, { isTemporary: null, expiredAt: null }],
};
|
||||
|
||||
/**
|
||||
* Batch update documents in chunks to avoid timeouts on weak instances
|
||||
* @param {mongoose.Collection} collection - MongoDB collection
|
||||
|
|
@ -26,7 +36,7 @@ async function batchResetMeiliFlags(collection) {
|
|||
try {
|
||||
while (hasMore) {
|
||||
const docs = await collection
|
||||
.find({ expiredAt: null, _meiliIndex: { $ne: false } }, { projection: { _id: 1 } })
|
||||
.find({ ...meiliIndexableFilter, _meiliIndex: { $ne: false } }, { projection: { _id: 1 } })
|
||||
.limit(BATCH_SIZE)
|
||||
.toArray();
|
||||
|
||||
|
|
|
|||
|
|
@ -83,6 +83,33 @@ describe('batchResetMeiliFlags', () => {
|
|||
expect(expiredDoc._meiliIndex).toBe(true);
|
||||
});
|
||||
|
||||
it('should reset non-temporary documents with expiredAt set for all-data retention', async () => {
|
||||
const retentionDate = new Date();
|
||||
await testCollection.insertMany([
|
||||
{
|
||||
_id: new mongoose.Types.ObjectId(),
|
||||
isTemporary: false,
|
||||
expiredAt: retentionDate,
|
||||
_meiliIndex: true,
|
||||
},
|
||||
{
|
||||
_id: new mongoose.Types.ObjectId(),
|
||||
isTemporary: true,
|
||||
expiredAt: retentionDate,
|
||||
_meiliIndex: true,
|
||||
},
|
||||
]);
|
||||
|
||||
const result = await batchResetMeiliFlags(testCollection);
|
||||
|
||||
expect(result).toBe(1);
|
||||
|
||||
const retainedDoc = await testCollection.findOne({ isTemporary: false });
|
||||
const temporaryDoc = await testCollection.findOne({ isTemporary: true });
|
||||
expect(retainedDoc._meiliIndex).toBe(false);
|
||||
expect(temporaryDoc._meiliIndex).toBe(true);
|
||||
});
|
||||
|
||||
it('should not modify documents with _meiliIndex: false', async () => {
|
||||
await testCollection.insertMany([
|
||||
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: false },
|
||||
|
|
|
|||
|
|
@ -1,16 +1,19 @@
|
|||
const { nanoid } = require('nanoid');
|
||||
const { logger } = require('@librechat/data-schemas');
|
||||
const { checkAccess, loadWebSearchAuth, createTempChatExpirationDate } = require('@librechat/api');
|
||||
const { checkAccess, loadWebSearchAuth } = require('@librechat/api');
|
||||
const {
|
||||
Tools,
|
||||
AuthType,
|
||||
Permissions,
|
||||
ToolCallTypes,
|
||||
RetentionMode,
|
||||
PermissionTypes,
|
||||
} = require('librechat-data-provider');
|
||||
const { getRoleByName, createToolCall, getToolCallsByConvo, getMessage } = require('~/models');
|
||||
const { processFileURL, uploadImageBuffer } = require('~/server/services/Files/process');
|
||||
const {
|
||||
getRetentionExpiry,
|
||||
processFileURL,
|
||||
uploadImageBuffer,
|
||||
} = require('~/server/services/Files/process');
|
||||
const { processCodeOutput, runPreviewFinalize } = require('~/server/services/Files/Code/process');
|
||||
const { loadAuthValues } = require('~/server/services/Tools/credentials');
|
||||
const { loadTools } = require('~/app/clients/tools/util');
|
||||
|
|
@ -170,14 +173,7 @@ const callTool = async (req, res) => {
|
|||
user: req.user.id,
|
||||
};
|
||||
|
||||
if (req?.body?.isTemporary || appConfig?.interfaceConfig?.retentionMode === RetentionMode.ALL) {
|
||||
try {
|
||||
toolCallData.expiredAt = createTempChatExpirationDate(appConfig?.interfaceConfig);
|
||||
} catch (err) {
|
||||
logger.error('Error creating tool call expiration date:', err);
|
||||
toolCallData.expiredAt = null;
|
||||
}
|
||||
}
|
||||
Object.assign(toolCallData, await getRetentionExpiry(req));
|
||||
|
||||
if (!artifact || !artifact.files || toolId !== Tools.execute_code) {
|
||||
createToolCall(toolCallData).catch((error) => {
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ const {
|
|||
const { connectDb, indexSync } = require('~/db');
|
||||
const initializeOAuthReconnectManager = require('./services/initializeOAuthReconnectManager');
|
||||
const createValidateImageRequest = require('./middleware/validateImageRequest');
|
||||
const { startExpiredFileSweep } = require('./services/Files/process');
|
||||
const { jwtLogin, ldapLogin, passportLogin } = require('~/strategies');
|
||||
const { updateInterfacePermissions: updateInterfacePerms } = require('@librechat/api');
|
||||
const {
|
||||
|
|
@ -233,6 +234,7 @@ if (cluster.isMaster) {
|
|||
/** Initialize app configuration */
|
||||
const appConfig = await getAppConfig();
|
||||
initializeFileStorage(appConfig);
|
||||
startExpiredFileSweep({ appConfig });
|
||||
await performStartupChecks(appConfig);
|
||||
await updateInterfacePerms({ appConfig, getRoleByName, updateAccessPermissions });
|
||||
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ const {
|
|||
} = require('~/models');
|
||||
const { capabilityContextMiddleware } = require('./middleware/roles/capabilities');
|
||||
const createValidateImageRequest = require('./middleware/validateImageRequest');
|
||||
const { startExpiredFileSweep } = require('./services/Files/process');
|
||||
const { jwtLogin, ldapLogin, passportLogin } = require('~/strategies');
|
||||
const { checkMigrations } = require('./services/start/migration');
|
||||
const initializeMCPs = require('./services/initializeMCPs');
|
||||
|
|
@ -83,6 +84,7 @@ const startServer = async () => {
|
|||
});
|
||||
const appConfig = await getAppConfig({ baseOnly: true });
|
||||
initializeFileStorage(appConfig);
|
||||
startExpiredFileSweep({ appConfig });
|
||||
await runAsSystem(async () => {
|
||||
await performStartupChecks(appConfig);
|
||||
await updateInterfacePermissions({ appConfig, getRoleByName, updateAccessPermissions });
|
||||
|
|
|
|||
|
|
@ -271,6 +271,7 @@ router.post(
|
|||
filepath: req.file.path,
|
||||
requestUserId: req.user.id,
|
||||
userRole: req.user.role,
|
||||
interfaceConfig: req.config?.interfaceConfig,
|
||||
});
|
||||
res.status(201).json({ message: 'Conversation(s) imported successfully' });
|
||||
} catch (error) {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
const mongoose = require('mongoose');
|
||||
const express = require('express');
|
||||
const { isEnabled, createTempChatExpirationDate } = require('@librechat/api');
|
||||
const { logger } = require('@librechat/data-schemas');
|
||||
const { isEnabled } = require('@librechat/api');
|
||||
const { logger, createTempChatExpirationDate } = require('@librechat/data-schemas');
|
||||
const { RetentionMode } = require('librechat-data-provider');
|
||||
const {
|
||||
getSharedMessages,
|
||||
|
|
|
|||
|
|
@ -464,7 +464,7 @@ const processCodeOutput = async ({
|
|||
source: appConfig.fileStrategy,
|
||||
context: FileContext.execute_code,
|
||||
metadata: { codeEnvRef },
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
};
|
||||
await createFile(file, true);
|
||||
return { file: Object.assign(file, { messageId, toolCallId }) };
|
||||
|
|
@ -567,7 +567,7 @@ const processCodeOutput = async ({
|
|||
context: FileContext.execute_code,
|
||||
usage: isUpdate ? (claimed.usage ?? 0) + 1 : 1,
|
||||
createdAt: isUpdate ? claimed.createdAt : formattedDate,
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
};
|
||||
|
||||
if (expectsPreview) {
|
||||
|
|
|
|||
|
|
@ -19,12 +19,11 @@ const {
|
|||
getEndpointFileConfig,
|
||||
documentParserMimeTypes,
|
||||
} = require('librechat-data-provider');
|
||||
const { logger } = require('@librechat/data-schemas');
|
||||
const { logger, runAsSystem, createTempChatExpirationDate } = require('@librechat/data-schemas');
|
||||
const {
|
||||
sanitizeFilename,
|
||||
parseText,
|
||||
processAudioFile,
|
||||
createTempChatExpirationDate,
|
||||
getStorageMetadata,
|
||||
} = require('@librechat/api');
|
||||
const {
|
||||
|
|
@ -47,18 +46,41 @@ const db = require('~/models');
|
|||
* Returns `{ expiredAt }` when the request indicates data retention applies, otherwise `{}`.
|
||||
* Spread into file data objects before calling createFile.
|
||||
* @param {ServerRequest} req
|
||||
* @returns {{ expiredAt?: Date }}
|
||||
* @returns {Promise<{ expiredAt?: Date | null }>}
|
||||
*/
|
||||
async function getRetentionExpiry(req) {
  if (!(await shouldApplyRetention(req))) {
    return {};
  }

  try {
    return { expiredAt: createTempChatExpirationDate(req.config?.interfaceConfig) };
  } catch (err) {
    // Retention applies but no date could be computed: record an explicit null.
    logger.error('[getRetentionExpiry] Error creating file expiration date:', err);
    return { expiredAt: null };
  }
}

/**
 * Accepts the boolean `true` or the string `'true'`.
 * @param {*} value
 * @returns {boolean}
 */
const isTruthy = (value) => value === true || value === 'true';

/**
 * Decides whether retention metadata should be attached for this request.
 *
 * Order of precedence:
 * 1. `retentionMode === RetentionMode.ALL` in the interface config always applies retention.
 * 2. If the request names a conversation and a user, and that conversation is found,
 *    its stored state decides: `isTemporary === true`, or no `isTemporary` value
 *    together with a non-null `expiredAt`.
 * 3. Otherwise (no conversation found, or the lookup failed) the request body's
 *    `isTemporary` flag is used.
 *
 * @param {ServerRequest} req
 * @returns {Promise<boolean>}
 */
async function shouldApplyRetention(req) {
  if (req?.config?.interfaceConfig?.retentionMode === RetentionMode.ALL) {
    return true;
  }

  const conversationId = req?.body?.conversationId;
  if (conversationId && req?.user?.id) {
    try {
      const convo = await db.getConvo(req.user.id, conversationId);
      if (convo) {
        return convo.isTemporary === true || (convo.isTemporary == null && convo.expiredAt != null);
      }
    } catch (err) {
      // Lookup failures are logged and fall through to the request-body flag.
      logger.error('[shouldApplyRetention] Error checking conversation retention:', err);
    }
  }

  return isTruthy(req?.body?.isTemporary);
}
|
||||
|
||||
/**
|
||||
|
|
@ -253,6 +275,88 @@ const processDeleteRequest = async ({ req, files }) => {
|
|||
}
|
||||
};
|
||||
|
||||
/**
 * Reads the expired-file sweep interval from `FILE_RETENTION_SWEEP_INTERVAL_MS`.
 *
 * - Unset, empty, whitespace-only, non-numeric, non-finite or negative values
 *   fall back to the one-hour default. (`Number('')` is `0`, so an empty value
 *   must be rejected before conversion or it would read as "disabled".)
 * - `0` is returned as-is.
 * - Values above the largest delay Node timers accept are clamped; Node
 *   otherwise resets such a delay to 1 ms.
 *
 * @returns {number} Interval in milliseconds.
 */
function getFileRetentionSweepInterval() {
  const DEFAULT_INTERVAL_MS = 60 * 60 * 1000;
  const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;

  const raw = process.env.FILE_RETENTION_SWEEP_INTERVAL_MS;
  if (raw == null || raw.trim() === '') {
    return DEFAULT_INTERVAL_MS;
  }

  const value = Number(raw);
  if (!Number.isFinite(value) || value < 0) {
    return DEFAULT_INTERVAL_MS;
  }
  return Math.min(value, MAX_TIMER_DELAY_MS);
}
|
||||
|
||||
/**
 * Removes expired files one at a time: storage first (via `processDeleteRequest`),
 * then verifies the File record is gone.
 *
 * Mongo TTL indexes delete only the metadata document, so file retention relies
 * on this application-level sweep for records carrying `expiredAt`.
 *
 * @param {object} params
 * @param {AppConfig} params.appConfig - Used as `req.config` for the synthetic delete request.
 * @param {number} [params.limit] - Maximum number of expired files fetched per sweep.
 * @returns {Promise<{ scanned: number, deleted: number, failed: number }>}
 */
async function sweepExpiredFiles({ appConfig, limit = 100 } = {}) {
  const expiredFiles = (await db.getExpiredFiles(limit)) ?? [];
  const tally = { deleted: 0, failed: 0 };

  for (const expired of expiredFiles) {
    const ownerId = expired.user?.toString?.() ?? expired.user;
    if (!ownerId) {
      // Without an owner no delete request can be built; count it as a failure.
      logger.warn(`[sweepExpiredFiles] Skipping expired file without user: ${expired.file_id}`);
      tally.failed += 1;
      continue;
    }

    // Synthetic request standing in for the owning user.
    const req = {
      config: appConfig,
      body: {},
      user: { id: ownerId, tenantId: expired.tenantId },
    };

    try {
      await processDeleteRequest({ req, files: [expired] });
      // A record that is still present means the delete did not complete.
      const stillPresent = await db.findFileById(expired.file_id);
      tally[stillPresent ? 'failed' : 'deleted'] += 1;
    } catch (error) {
      tally.failed += 1;
      logger.error(`[sweepExpiredFiles] Error deleting expired file ${expired.file_id}:`, error);
    }
  }

  const { deleted, failed } = tally;
  if (deleted > 0 || failed > 0) {
    logger.info(
      `[sweepExpiredFiles] Processed ${expiredFiles.length} expired files: ${deleted} deleted, ${failed} failed`,
    );
  }

  return { scanned: expiredFiles.length, deleted, failed };
}
|
||||
|
||||
/**
 * Starts the background expired-file sweep: one run immediately, then one per
 * configured interval. The timer is unref'd when the runtime supports it.
 *
 * A tick that fires while the previous sweep is still running is skipped, so
 * two sweeps never work on the same batch of expired files at once.
 *
 * @param {object} [params]
 * @param {AppConfig} [params.appConfig] - Forwarded to `sweepExpiredFiles`.
 * @returns {NodeJS.Timeout | null} The interval handle, or `null` when the sweep is
 *   disabled (interval of 0).
 */
function startExpiredFileSweep({ appConfig } = {}) {
  const intervalMs = getFileRetentionSweepInterval();
  if (intervalMs === 0) {
    logger.info('[sweepExpiredFiles] Disabled by FILE_RETENTION_SWEEP_INTERVAL_MS=0');
    return null;
  }

  let sweepInFlight = false;

  const runSweep = async () => {
    if (sweepInFlight) {
      logger.debug('[sweepExpiredFiles] Previous sweep still running; skipping this tick');
      return;
    }

    sweepInFlight = true;
    try {
      await runAsSystem(() => sweepExpiredFiles({ appConfig }));
    } catch (error) {
      // Covers both rejected sweeps and synchronous throws; never escapes the timer.
      logger.error('[sweepExpiredFiles] Background sweep failed:', error);
    } finally {
      sweepInFlight = false;
    }
  };

  runSweep();
  const interval = setInterval(runSweep, intervalMs);
  interval.unref?.();
  return interval;
}
|
||||
|
||||
/**
|
||||
* Processes a file URL using a specified file handling strategy. This function accepts a strategy name,
|
||||
* fetches the corresponding file processing functions (for saving and retrieving file URLs), and then
|
||||
|
|
@ -271,6 +375,7 @@ const processDeleteRequest = async ({ req, files }) => {
|
|||
* @param {string} params.basePath - The base path or directory where the file will be saved or retrieved from.
|
||||
* @param {FileContext} params.context - The context of the file (e.g., 'avatar', 'image_generation', etc.)
|
||||
* @param {string} [params.tenantId] - Optional tenant identifier for tenant-prefixed storage paths.
|
||||
* @param {ServerRequest} [params.req] - Request context used to apply data retention metadata.
|
||||
* @returns {Promise<MongoFile>} A promise that resolves to the DB representation (MongoFile)
|
||||
* of the processed file. It throws an error if the file processing fails at any stage.
|
||||
*/
|
||||
|
|
@ -282,6 +387,7 @@ const processFileURL = async ({
|
|||
basePath,
|
||||
context,
|
||||
tenantId,
|
||||
req,
|
||||
}) => {
|
||||
const { saveURL, getFileURL } = getStrategyFunctions(fileStrategy);
|
||||
try {
|
||||
|
|
@ -325,6 +431,7 @@ const processFileURL = async ({
|
|||
source: fileStrategy,
|
||||
type,
|
||||
context,
|
||||
...(await getRetentionExpiry(req)),
|
||||
tenantId,
|
||||
width: dimensions.width,
|
||||
height: dimensions.height,
|
||||
|
|
@ -375,7 +482,7 @@ const processImageFile = async ({ req, res, metadata, returnFile = false }) => {
|
|||
context: FileContext.message_attachment,
|
||||
source,
|
||||
type: `image/${appConfig.imageOutputType}`,
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
width,
|
||||
height,
|
||||
tenantId: req.user.tenantId,
|
||||
|
|
@ -436,7 +543,7 @@ const uploadImageBuffer = async ({ req, context, metadata = {}, resize = true })
|
|||
source,
|
||||
type,
|
||||
width,
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
height,
|
||||
tenantId: req.user.tenantId,
|
||||
},
|
||||
|
|
@ -539,7 +646,7 @@ const processFileUpload = async ({ req, res, metadata }) => {
|
|||
context: isAssistantUpload ? FileContext.assistants : FileContext.message_attachment,
|
||||
model: isAssistantUpload ? req.body.model : undefined,
|
||||
type: file.mimetype,
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
embedded,
|
||||
source,
|
||||
height,
|
||||
|
|
@ -666,7 +773,7 @@ const processAgentFileUpload = async ({ req, res, metadata }) => {
|
|||
filename: file.originalname,
|
||||
model: messageAttachment ? undefined : req.body.model,
|
||||
context: messageAttachment ? FileContext.message_attachment : FileContext.agents,
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
tenantId: req.user.tenantId,
|
||||
});
|
||||
|
||||
|
|
@ -865,7 +972,7 @@ const processAgentFileUpload = async ({ req, res, metadata }) => {
|
|||
source,
|
||||
height,
|
||||
width,
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
tenantId: req.user.tenantId,
|
||||
});
|
||||
|
||||
|
|
@ -912,7 +1019,7 @@ const processOpenAIFile = async ({
|
|||
source,
|
||||
model: openai.req.body.model,
|
||||
filename: originalName ?? file_id,
|
||||
...getRetentionExpiry(openai.req),
|
||||
...(await getRetentionExpiry(openai.req)),
|
||||
tenantId: openai.req?.user?.tenantId,
|
||||
};
|
||||
|
||||
|
|
@ -957,7 +1064,7 @@ const processOpenAIImageOutput = async ({ req, buffer, file_id, filename, fileEx
|
|||
context: FileContext.assistants_output,
|
||||
file_id,
|
||||
filename,
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
tenantId: req.user.tenantId,
|
||||
};
|
||||
db.createFile(file, true);
|
||||
|
|
@ -1118,7 +1225,7 @@ async function saveBase64Image(
|
|||
user: req.user.id,
|
||||
bytes: image.bytes,
|
||||
width: image.width,
|
||||
...getRetentionExpiry(req),
|
||||
...(await getRetentionExpiry(req)),
|
||||
height: image.height,
|
||||
tenantId: req.user.tenantId,
|
||||
},
|
||||
|
|
@ -1211,6 +1318,8 @@ module.exports = {
|
|||
saveBase64Image,
|
||||
processImageFile,
|
||||
uploadImageBuffer,
|
||||
sweepExpiredFiles,
|
||||
startExpiredFileSweep,
|
||||
processFileUpload,
|
||||
processDeleteRequest,
|
||||
processAgentFileUpload,
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
jest.mock('uuid', () => ({ v4: jest.fn(() => 'mock-uuid') }));
|
||||
|
||||
jest.mock('@librechat/data-schemas', () => ({
|
||||
logger: { warn: jest.fn(), debug: jest.fn(), error: jest.fn() },
|
||||
logger: { warn: jest.fn(), debug: jest.fn(), error: jest.fn(), info: jest.fn() },
|
||||
createTempChatExpirationDate: jest.fn(() => new Date('2030-01-01T00:00:00.000Z')),
|
||||
}));
|
||||
|
||||
jest.mock('@librechat/agents', () => ({}));
|
||||
|
|
@ -41,8 +42,12 @@ jest.mock('~/models', () => ({
|
|||
createFile: jest.fn().mockResolvedValue({ file_id: 'created-file-id' }),
|
||||
updateFileUsage: jest.fn(),
|
||||
deleteFiles: jest.fn(),
|
||||
findFileById: jest.fn(),
|
||||
getConvo: jest.fn(),
|
||||
getExpiredFiles: jest.fn(),
|
||||
addAgentResourceFile: jest.fn().mockResolvedValue({}),
|
||||
removeAgentResourceFiles: jest.fn(),
|
||||
removeAgentResourceFilesFromAllAgents: jest.fn(),
|
||||
}));
|
||||
|
||||
jest.mock('~/server/utils/getFileStrategy', () => ({
|
||||
|
|
@ -79,7 +84,7 @@ const { mergeFileConfig } = require('librechat-data-provider');
|
|||
const { checkCapability } = require('~/server/services/Config');
|
||||
const { getStrategyFunctions } = require('~/server/services/Files/strategies');
|
||||
const db = require('~/models');
|
||||
const { processAgentFileUpload, processFileURL } = require('./process');
|
||||
const { processAgentFileUpload, processFileURL, sweepExpiredFiles } = require('./process');
|
||||
|
||||
const PDF_MIME = 'application/pdf';
|
||||
const DOCX_MIME = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document';
|
||||
|
|
@ -534,6 +539,38 @@ describe('processFileURL', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('applies retention metadata for generated images when retention mode is all', async () => {
|
||||
const saveURL = jest.fn().mockResolvedValue({
|
||||
filepath: 'https://cdn.example.com/t/tenant-a/images/user-123/image.png',
|
||||
bytes: 512,
|
||||
type: 'image/png',
|
||||
});
|
||||
const getFileURL = jest.fn();
|
||||
getStrategyFunctions.mockReturnValue({ saveURL, getFileURL });
|
||||
|
||||
await processFileURL({
|
||||
fileStrategy: FileSources.cloudfront,
|
||||
userId: 'user-123',
|
||||
URL: 'https://example.com/image.png',
|
||||
fileName: 'image.png',
|
||||
basePath: 'images',
|
||||
context: FileContext.image_generation,
|
||||
tenantId: 'tenant-a',
|
||||
req: {
|
||||
user: { id: 'user-123', tenantId: 'tenant-a' },
|
||||
body: {},
|
||||
config: { interfaceConfig: { retentionMode: 'all' } },
|
||||
},
|
||||
});
|
||||
|
||||
expect(db.createFile).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
expiredAt: new Date('2030-01-01T00:00:00.000Z'),
|
||||
}),
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
it('falls back to getFileURL with user and tenant context when metadata lacks filepath', async () => {
|
||||
const saveURL = jest.fn().mockResolvedValue({
|
||||
bytes: 256,
|
||||
|
|
@ -602,3 +639,39 @@ describe('processFileURL', () => {
|
|||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('sweepExpiredFiles', () => {
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
it('deletes expired file storage before removing file records', async () => {
|
||||
const deleteFile = jest.fn().mockResolvedValue(undefined);
|
||||
getStrategyFunctions.mockReturnValue({ deleteFile });
|
||||
db.getExpiredFiles.mockResolvedValue([
|
||||
{
|
||||
file_id: 'expired-file',
|
||||
filepath: '/images/user-123/expired.png',
|
||||
source: FileSources.local,
|
||||
user: 'user-123',
|
||||
tenantId: 'tenant-a',
|
||||
},
|
||||
]);
|
||||
db.findFileById.mockResolvedValue(null);
|
||||
db.deleteFiles.mockResolvedValue({ deletedCount: 1 });
|
||||
|
||||
const result = await sweepExpiredFiles({
|
||||
appConfig: { paths: { publicPath: '/tmp/public', uploads: '/tmp/uploads' } },
|
||||
limit: 1,
|
||||
});
|
||||
|
||||
expect(deleteFile).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
user: { id: 'user-123', tenantId: 'tenant-a' },
|
||||
}),
|
||||
expect.objectContaining({ file_id: 'expired-file' }),
|
||||
);
|
||||
expect(db.deleteFiles).toHaveBeenCalledWith(['expired-file']);
|
||||
expect(result).toEqual({ scanned: 1, deleted: 1, failed: 0 });
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,16 +1,22 @@
|
|||
const { v4: uuidv4 } = require('uuid');
|
||||
const { logger } = require('@librechat/data-schemas');
|
||||
const { EModelEndpoint, Constants, openAISettings } = require('librechat-data-provider');
|
||||
const { logger, createTempChatExpirationDate } = require('@librechat/data-schemas');
|
||||
const {
|
||||
EModelEndpoint,
|
||||
Constants,
|
||||
RetentionMode,
|
||||
openAISettings,
|
||||
} = require('librechat-data-provider');
|
||||
const { bulkIncrementTagCounts, bulkSaveConvos, bulkSaveMessages } = require('~/models');
|
||||
const { FALLBACK_MODEL_BY_ENDPOINT } = require('./defaults');
|
||||
|
||||
/**
 * Factory function for creating an instance of ImportBatchBuilder.
 * @param {string} requestUserId - The ID of the user making the request.
 * @param {object} [interfaceConfig] - Runtime interface config for import retention.
 * @returns {ImportBatchBuilder} - The newly created ImportBatchBuilder instance.
 */
function createImportBatchBuilder(requestUserId, interfaceConfig) {
  return new ImportBatchBuilder(requestUserId, interfaceConfig);
}
|
||||
|
||||
/**
|
||||
|
|
@ -20,13 +26,31 @@ class ImportBatchBuilder {
|
|||
/**
|
||||
* Creates an instance of ImportBatchBuilder.
|
||||
* @param {string} requestUserId - The ID of the user making the import request.
|
||||
* @param {object} [interfaceConfig] - Runtime interface config for import retention.
|
||||
*/
|
||||
constructor(requestUserId) {
|
||||
constructor(requestUserId, interfaceConfig) {
|
||||
this.requestUserId = requestUserId;
|
||||
this.interfaceConfig = interfaceConfig;
|
||||
this.conversations = [];
|
||||
this.messages = [];
|
||||
}
|
||||
|
||||
getRetentionFields() {
|
||||
if (this.interfaceConfig?.retentionMode !== RetentionMode.ALL) {
|
||||
return {};
|
||||
}
|
||||
|
||||
try {
|
||||
return {
|
||||
isTemporary: false,
|
||||
expiredAt: createTempChatExpirationDate(this.interfaceConfig),
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('[ImportBatchBuilder] Error creating import expiration date:', error);
|
||||
return { isTemporary: false, expiredAt: null };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts a new conversation in the batch.
|
||||
* @param {string} [endpoint=EModelEndpoint.openAI] - The endpoint for the conversation. Defaults to EModelEndpoint.openAI.
|
||||
|
|
@ -89,6 +113,7 @@ class ImportBatchBuilder {
|
|||
overrideTimestamp: true,
|
||||
endpoint: this.endpoint,
|
||||
model: originalConvo.model ?? fallbackModel,
|
||||
...this.getRetentionFields(),
|
||||
};
|
||||
convo._id && delete convo._id;
|
||||
this.conversations.push(convo);
|
||||
|
|
@ -161,6 +186,7 @@ class ImportBatchBuilder {
|
|||
error: false,
|
||||
sender,
|
||||
text,
|
||||
...this.getRetentionFields(),
|
||||
};
|
||||
message._id && delete message._id;
|
||||
this.lastMessageId = newMessageId;
|
||||
|
|
|
|||
|
|
@ -2,15 +2,16 @@ const fs = require('fs').promises;
|
|||
const { resolveImportMaxFileSize } = require('@librechat/api');
|
||||
const { logger } = require('@librechat/data-schemas');
|
||||
const { getImporter } = require('./importers');
|
||||
const { createImportBatchBuilder } = require('./importBatchBuilder');
|
||||
|
||||
const maxFileSize = resolveImportMaxFileSize();
|
||||
|
||||
/**
|
||||
* Job definition for importing a conversation.
|
||||
* @param {{ filepath: string, requestUserId: string, userRole?: string }} job
|
||||
* @param {{ filepath: string, requestUserId: string, userRole?: string, interfaceConfig?: object }} job
|
||||
*/
|
||||
const importConversations = async (job) => {
|
||||
const { filepath, requestUserId, userRole } = job;
|
||||
const { filepath, requestUserId, userRole, interfaceConfig } = job;
|
||||
try {
|
||||
logger.debug(`user: ${requestUserId} | Importing conversation(s) from file...`);
|
||||
|
||||
|
|
@ -24,7 +25,12 @@ const importConversations = async (job) => {
|
|||
const fileData = await fs.readFile(filepath, 'utf8');
|
||||
const jsonData = JSON.parse(fileData);
|
||||
const importer = getImporter(jsonData);
|
||||
await importer(jsonData, requestUserId, undefined, userRole);
|
||||
await importer(
|
||||
jsonData,
|
||||
requestUserId,
|
||||
(userId) => createImportBatchBuilder(userId, interfaceConfig),
|
||||
userRole,
|
||||
);
|
||||
logger.debug(`user: ${requestUserId} | Finished importing conversations`);
|
||||
} catch (error) {
|
||||
logger.error(`user: ${requestUserId} | Failed to import conversation: `, error);
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ const path = require('path');
|
|||
const {
|
||||
EModelEndpoint,
|
||||
Constants,
|
||||
RetentionMode,
|
||||
openAISettings,
|
||||
anthropicSettings,
|
||||
} = require('librechat-data-provider');
|
||||
|
|
@ -28,6 +29,7 @@ jest.mock('~/server/controllers/ModelController', () => ({
|
|||
jest.mock('~/models', () => ({
|
||||
bulkSaveConvos: jest.fn(),
|
||||
bulkSaveMessages: jest.fn(),
|
||||
bulkIncrementTagCounts: jest.fn(),
|
||||
}));
|
||||
|
||||
afterEach(() => {
|
||||
|
|
@ -1046,6 +1048,22 @@ describe('importLibreChatConvo', () => {
|
|||
expect(result.conversation.endpoint).toBe(EModelEndpoint.openAI);
|
||||
expect(result.conversation.model).toBe(openAISettings.model.default);
|
||||
});
|
||||
|
||||
// Imports built under RetentionMode.ALL must stay non-temporary but carry an expiry date.
it('applies all-data retention to imported conversations and messages', () => {
  const interfaceConfig = {
    retentionMode: RetentionMode.ALL,
    temporaryChatRetention: 24,
  };
  const batchBuilder = new ImportBatchBuilder('user-123', interfaceConfig);

  batchBuilder.startConversation(EModelEndpoint.openAI);
  const userMessage = batchBuilder.addUserMessage('Retained import');
  const finished = batchBuilder.finishConversation('Imported retained chat');

  // Same checks, in the same order: message first, then conversation.
  for (const record of [userMessage, finished.conversation]) {
    expect(record.isTemporary).toBe(false);
    expect(record.expiredAt).toBeInstanceOf(Date);
  }
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -212,6 +212,7 @@ describe('useFileHandling', () => {
|
|||
const formData: FormData = mockMutate.mock.calls[0][0];
|
||||
expect(formData.get('endpoint')).toBe(EModelEndpoint.agents);
|
||||
expect(formData.get('endpointType')).toBe(EModelEndpoint.agents);
|
||||
expect(formData.get('conversationId')).toBe('convo-1');
|
||||
});
|
||||
|
||||
it('does not enter assistants upload path when override is agents', async () => {
|
||||
|
|
@ -287,6 +288,7 @@ describe('useFileHandling', () => {
|
|||
expect(mockMutate).toHaveBeenCalledTimes(1);
|
||||
const formData: FormData = mockMutate.mock.calls[0][0];
|
||||
expect(formData.get('endpoint')).toBe('default');
|
||||
expect(formData.get('conversationId')).toBeNull();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -193,6 +193,9 @@ const useFileHandlingCore = (params: UseFileHandling | undefined, fileState: Fil
|
|||
formData.append('endpointType', endpointType ?? '');
|
||||
formData.append('file', extendedFile.file as File, encodeURIComponent(filename));
|
||||
formData.append('file_id', extendedFile.file_id);
|
||||
if (conversation?.conversationId && conversation.conversationId !== Constants.NEW_CONVO) {
|
||||
formData.append('conversationId', conversation.conversationId);
|
||||
}
|
||||
if (isTemporary) {
|
||||
formData.append('isTemporary', 'true');
|
||||
}
|
||||
|
|
|
|||
|
|
@ -648,6 +648,8 @@ export const tMessageSchema = z.object({
|
|||
/** @deprecated */
|
||||
generation: z.string().nullable().optional(),
|
||||
isCreatedByUser: z.boolean(),
|
||||
isTemporary: z.boolean().optional(),
|
||||
expiredAt: z.string().nullable().optional(),
|
||||
error: z.boolean().optional(),
|
||||
clientTimestamp: z.string().optional(),
|
||||
createdAt: z
|
||||
|
|
|
|||
|
|
@ -263,7 +263,7 @@ describe('Conversation Operations', () => {
|
|||
const result = await saveConvo(mockCtx, mockConversationData);
|
||||
|
||||
expect(result?.conversationId).toBe(mockConversationData.conversationId);
|
||||
expect(result?.expiredAt).toBeNull();
|
||||
expect(result?.expiredAt).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should use custom retention period from config', async () => {
|
||||
|
|
@ -401,6 +401,21 @@ describe('Conversation Operations', () => {
|
|||
);
|
||||
});
|
||||
|
||||
// A follow-up save that omits isTemporary must not clear the temporary flag or its expiry.
it('should preserve temporary retention when saving without isTemporary', async () => {
  mockCtx.interfaceConfig = { temporaryChatRetention: 24 };
  mockCtx.isTemporary = true;
  const initial = await saveConvo(mockCtx, mockConversationData);
  const expectedExpiry = initial?.expiredAt;

  // Second save: the context no longer states whether the chat is temporary.
  mockCtx.isTemporary = undefined;
  const updated = await saveConvo(mockCtx, {
    ...mockConversationData,
    title: 'Updated Title',
  });

  expect(updated?.title).toBe('Updated Title');
  expect(updated?.isTemporary).toBe(true);
  expect(updated?.expiredAt).toEqual(expectedExpiry);
});
|
||||
|
||||
it('should not set expiredAt when updating non-temporary conversation', async () => {
|
||||
// First save a non-temporary conversation
|
||||
mockCtx.isTemporary = false;
|
||||
|
|
|
|||
|
|
@ -186,9 +186,8 @@ export function createConversationMethods(
|
|||
update.conversationId = newConversationId;
|
||||
}
|
||||
|
||||
update.isTemporary = isTemporary === true;
|
||||
|
||||
if (isTemporary || interfaceConfig?.retentionMode === RetentionMode.ALL) {
|
||||
if (interfaceConfig?.retentionMode === RetentionMode.ALL) {
|
||||
update.isTemporary = isTemporary === true;
|
||||
try {
|
||||
update.expiredAt = createTempChatExpirationDate(interfaceConfig);
|
||||
} catch (err) {
|
||||
|
|
@ -196,7 +195,17 @@ export function createConversationMethods(
|
|||
logger.info(`---\`saveConvo\` context: ${metadata?.context}`);
|
||||
update.expiredAt = null;
|
||||
}
|
||||
} else {
|
||||
} else if (isTemporary === true) {
|
||||
update.isTemporary = true;
|
||||
try {
|
||||
update.expiredAt = createTempChatExpirationDate(interfaceConfig);
|
||||
} catch (err) {
|
||||
logger.error('Error creating temporary chat expiration date:', err);
|
||||
logger.info(`---\`saveConvo\` context: ${metadata?.context}`);
|
||||
update.expiredAt = null;
|
||||
}
|
||||
} else if (isTemporary === false) {
|
||||
update.isTemporary = false;
|
||||
update.expiredAt = null;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -233,6 +233,69 @@ describe('File Methods', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('getExpiredFiles', () => {
  it('returns only files whose expiredAt date has passed', async () => {
    const userId = new mongoose.Types.ObjectId();
    const now = new Date('2030-01-01T00:00:00.000Z');
    const expiredFileId = uuidv4();
    const futureFileId = uuidv4();
    const permanentFileId = uuidv4();
    const missingExpiryFileId = uuidv4();

    // Fields shared by every fixture file.
    const sharedFields = { user: userId, type: 'text/plain', bytes: 100 };

    // One fixture per expiry scenario: already expired, expiring later,
    // explicitly never expiring (null), and no expiredAt field at all.
    const fixtures = [
      {
        file_id: expiredFileId,
        filename: 'expired.txt',
        filepath: '/uploads/expired.txt',
        expiredAt: new Date('2029-12-31T23:59:59.000Z'),
      },
      {
        file_id: futureFileId,
        filename: 'future.txt',
        filepath: '/uploads/future.txt',
        expiredAt: new Date('2030-01-01T00:00:01.000Z'),
      },
      {
        file_id: permanentFileId,
        filename: 'permanent.txt',
        filepath: '/uploads/permanent.txt',
        expiredAt: null,
      },
      {
        file_id: missingExpiryFileId,
        filename: 'missing-expiry.txt',
        filepath: '/uploads/missing-expiry.txt',
      },
    ];

    // Created one at a time, in the same order as listed above.
    for (const fixture of fixtures) {
      await fileMethods.createFile({ ...sharedFields, ...fixture }, true);
    }

    const expired = await fileMethods.getExpiredFiles(100, now);
    const returnedIds = expired.map((file) => file.file_id);

    expect(returnedIds).toEqual([expiredFileId]);
  });
});
|
||||
|
||||
describe('getToolFilesByIds', () => {
|
||||
it('should retrieve files for file_search tool (embedded files)', async () => {
|
||||
const userId = new mongoose.Types.ObjectId();
|
||||
|
|
|
|||
|
|
@ -47,6 +47,14 @@ export function createFileMethods(mongoose: typeof import('mongoose')) {
|
|||
return await query.sort(sortOptions).lean<IMongoFile[]>();
|
||||
}
|
||||
|
||||
/**
 * Retrieves files whose `expiredAt` is set and is at or before `now`.
 * Results are ordered by ascending `expiredAt` and capped at `limit`.
 *
 * @param limit - Maximum number of files to return (defaults to 100).
 * @param now - Reference time used for the expiry comparison (defaults to the current time).
 * @returns Lean file documents matching the expiry filter.
 */
async function getExpiredFiles(limit = 100, now = new Date()): Promise<IMongoFile[]> {
  const FileModel = mongoose.models.File as Model<IMongoFile>;
  const expiredFilter = { expiredAt: { $ne: null, $lte: now } };
  const expiredQuery = FileModel.find(expiredFilter).sort({ expiredAt: 1 }).limit(limit);
  return await expiredQuery.lean<IMongoFile[]>();
}
|
||||
|
||||
/**
|
||||
* Retrieves tool files (files that are embedded or have a fileIdentifier) from an array of file IDs.
|
||||
* Note: execute_code files are handled separately by getCodeGeneratedFiles.
|
||||
|
|
@ -457,6 +465,7 @@ export function createFileMethods(mongoose: typeof import('mongoose')) {
|
|||
return {
|
||||
findFileById,
|
||||
getFiles,
|
||||
getExpiredFiles,
|
||||
getToolFilesByIds,
|
||||
getCodeGeneratedFiles,
|
||||
getUserCodeFiles,
|
||||
|
|
|
|||
|
|
@ -404,7 +404,7 @@ describe('Message Operations', () => {
|
|||
const result = await saveMessage(mockCtx, mockMessageData);
|
||||
|
||||
expect(result?.messageId).toBe('msg123');
|
||||
expect(result?.expiredAt).toBeNull();
|
||||
expect(result?.expiredAt).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should use custom retention period from config', async () => {
|
||||
|
|
@ -593,6 +593,22 @@ describe('Message Operations', () => {
|
|||
);
|
||||
});
|
||||
|
||||
// A follow-up save that omits isTemporary must not clear the temporary flag or its expiry.
it('should preserve temporary retention when saving without isTemporary', async () => {
  mockCtx.interfaceConfig = { temporaryChatRetention: 24 };

  mockCtx.isTemporary = true;
  const initial = await saveMessage(mockCtx, mockMessageData);
  const expectedExpiry = initial?.expiredAt;

  // Second save: the context no longer states whether the message is temporary.
  mockCtx.isTemporary = undefined;
  const updated = await saveMessage(mockCtx, {
    ...mockMessageData,
    text: 'Updated text',
  });

  expect(updated?.text).toBe('Updated text');
  expect(updated?.isTemporary).toBe(true);
  expect(updated?.expiredAt).toEqual(expectedExpiry);
});
|
||||
|
||||
it('should handle bulk operations with temporary messages', async () => {
|
||||
// This test verifies bulkSaveMessages doesn't interfere with expiredAt
|
||||
const messages = [
|
||||
|
|
|
|||
|
|
@ -92,7 +92,8 @@ export function createMessageMethods(mongoose: typeof import('mongoose')): Messa
|
|||
messageId: params.newMessageId || params.messageId,
|
||||
};
|
||||
|
||||
if (isTemporary || interfaceConfig?.retentionMode === RetentionMode.ALL) {
|
||||
if (interfaceConfig?.retentionMode === RetentionMode.ALL) {
|
||||
update.isTemporary = isTemporary === true;
|
||||
try {
|
||||
update.expiredAt = createTempChatExpirationDate(interfaceConfig);
|
||||
} catch (err) {
|
||||
|
|
@ -100,7 +101,17 @@ export function createMessageMethods(mongoose: typeof import('mongoose')): Messa
|
|||
logger.info(`---\`saveMessage\` context: ${metadata?.context}`);
|
||||
update.expiredAt = null;
|
||||
}
|
||||
} else {
|
||||
} else if (isTemporary === true) {
|
||||
update.isTemporary = true;
|
||||
try {
|
||||
update.expiredAt = createTempChatExpirationDate(interfaceConfig);
|
||||
} catch (err) {
|
||||
logger.error('Error creating temporary chat expiration date:', err);
|
||||
logger.info(`---\`saveMessage\` context: ${metadata?.context}`);
|
||||
update.expiredAt = null;
|
||||
}
|
||||
} else if (isTemporary === false) {
|
||||
update.isTemporary = false;
|
||||
update.expiredAt = null;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -90,12 +90,25 @@ describe('Meilisearch Mongoose plugin', () => {
|
|||
expect(mockAddDocuments).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('saving TTL conversation does NOT index w/ meilisearch', async () => {
|
||||
test('saving retained non-temporary conversation indexes w/ meilisearch', async () => {
|
||||
await createConversationModel(mongoose).create({
|
||||
conversationId: new mongoose.Types.ObjectId(),
|
||||
user: new mongoose.Types.ObjectId(),
|
||||
title: 'Test Conversation',
|
||||
endpoint: EModelEndpoint.openAI,
|
||||
isTemporary: false,
|
||||
expiredAt: new Date(),
|
||||
});
|
||||
expect(mockAddDocuments).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('saving temporary conversation does NOT index w/ meilisearch', async () => {
|
||||
await createConversationModel(mongoose).create({
|
||||
conversationId: new mongoose.Types.ObjectId(),
|
||||
user: new mongoose.Types.ObjectId(),
|
||||
title: 'Test Conversation',
|
||||
endpoint: EModelEndpoint.openAI,
|
||||
isTemporary: true,
|
||||
expiredAt: new Date(),
|
||||
});
|
||||
expect(mockAddDocuments).not.toHaveBeenCalled();
|
||||
|
|
@ -125,12 +138,25 @@ describe('Meilisearch Mongoose plugin', () => {
|
|||
expect(mockAddDocuments).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('saving TTL messages does NOT index w/ meilisearch', async () => {
|
||||
test('saving retained non-temporary messages indexes w/ meilisearch', async () => {
|
||||
await createMessageModel(mongoose).create({
|
||||
messageId: new mongoose.Types.ObjectId(),
|
||||
conversationId: new mongoose.Types.ObjectId(),
|
||||
user: new mongoose.Types.ObjectId(),
|
||||
isCreatedByUser: true,
|
||||
isTemporary: false,
|
||||
expiredAt: new Date(),
|
||||
});
|
||||
expect(mockAddDocuments).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('saving temporary messages does NOT index w/ meilisearch', async () => {
|
||||
await createMessageModel(mongoose).create({
|
||||
messageId: new mongoose.Types.ObjectId(),
|
||||
conversationId: new mongoose.Types.ObjectId(),
|
||||
user: new mongoose.Types.ObjectId(),
|
||||
isCreatedByUser: true,
|
||||
isTemporary: true,
|
||||
expiredAt: new Date(),
|
||||
});
|
||||
expect(mockAddDocuments).not.toHaveBeenCalled();
|
||||
|
|
@ -224,6 +250,7 @@ describe('Meilisearch Mongoose plugin', () => {
|
|||
user: new mongoose.Types.ObjectId(),
|
||||
title: 'Test Conversation',
|
||||
endpoint: EModelEndpoint.openAI,
|
||||
isTemporary: true,
|
||||
expiredAt: new Date(),
|
||||
});
|
||||
|
||||
|
|
@ -335,6 +362,7 @@ describe('Meilisearch Mongoose plugin', () => {
|
|||
conversationId: new mongoose.Types.ObjectId(),
|
||||
user: new mongoose.Types.ObjectId(),
|
||||
isCreatedByUser: true,
|
||||
isTemporary: true,
|
||||
expiredAt: new Date(),
|
||||
});
|
||||
|
||||
|
|
@ -343,6 +371,7 @@ describe('Meilisearch Mongoose plugin', () => {
|
|||
conversationId: new mongoose.Types.ObjectId(),
|
||||
user: new mongoose.Types.ObjectId(),
|
||||
isCreatedByUser: false,
|
||||
isTemporary: true,
|
||||
expiredAt: new Date(),
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ interface SyncProgress {
|
|||
|
||||
interface _DocumentWithMeiliIndex extends Document {
|
||||
_meiliIndex?: boolean;
|
||||
isTemporary?: boolean;
|
||||
expiredAt?: Date | null;
|
||||
preprocessObjectForIndex?: () => Record<string, unknown>;
|
||||
addObjectToMeili?: (next: CallbackWithoutResultAndOptionalError) => Promise<void>;
|
||||
updateObjectToMeili?: (next: CallbackWithoutResultAndOptionalError) => Promise<void>;
|
||||
|
|
@ -90,6 +92,30 @@ const getSyncConfig = () => ({
|
|||
delayMs: parseInt(process.env.MEILI_SYNC_DELAY_MS || '100', 10),
|
||||
});
|
||||
|
||||
/**
 * Reports whether `path` is an own key of the schema's definition object (`schema.obj`).
 * Inherited properties are not counted.
 */
const hasSchemaPath = (schema: Schema, path: string): boolean => {
  const definition = schema.obj;
  return Object.prototype.hasOwnProperty.call(definition, path);
};
|
||||
|
||||
/**
 * Builds the Mongo filter selecting documents that should be indexed,
 * based on which retention fields the schema definition declares.
 *
 * - Schema declares `isTemporary`: match documents with `isTemporary: false`,
 *   or documents that lack `isTemporary` and have a null or missing `expiredAt`.
 * - Schema declares only `expiredAt`: match documents with a null or missing `expiredAt`.
 * - Schema declares neither: empty filter.
 */
const buildIndexableQuery = (schema: Schema): FilterQuery<unknown> => {
  // Fresh objects on every call so the returned filters never share references.
  const withoutExpiry = () => ({
    $or: [{ expiredAt: null }, { expiredAt: { $exists: false } }],
  });

  if (hasSchemaPath(schema, 'isTemporary')) {
    return {
      $or: [{ isTemporary: false }, { isTemporary: { $exists: false }, ...withoutExpiry() }],
    };
  }

  if (hasSchemaPath(schema, 'expiredAt')) {
    return withoutExpiry();
  }

  return {};
};
|
||||
|
||||
/**
 * Decides whether a single document should be indexed: true when
 * `isTemporary` is exactly `false`, or when `isTemporary` is null/undefined
 * and `expiredAt` is nil.
 */
const isIndexableDocument = (doc: DocumentWithMeiliIndex): boolean => {
  if (doc.isTemporary === false) {
    return true;
  }
  return doc.isTemporary == null && _.isNil(doc.expiredAt);
};
|
||||
|
||||
/**
|
||||
* Validates the required options for configuring the mongoMeili plugin.
|
||||
*/
|
||||
|
|
@ -136,11 +162,13 @@ const processBatch = async <T>(
|
|||
*/
|
||||
const createMeiliMongooseModel = ({
|
||||
index,
|
||||
indexableQuery,
|
||||
attributesToIndex,
|
||||
primaryKey,
|
||||
syncOptions,
|
||||
}: {
|
||||
index: Index<MeiliIndexable>;
|
||||
indexableQuery: FilterQuery<unknown>;
|
||||
attributesToIndex: string[];
|
||||
primaryKey: string;
|
||||
syncOptions: { batchSize: number; delayMs: number };
|
||||
|
|
@ -152,8 +180,11 @@ const createMeiliMongooseModel = ({
|
|||
* Get the current sync progress
|
||||
*/
|
||||
static async getSyncProgress(this: SchemaWithMeiliMethods): Promise<SyncProgress> {
|
||||
const totalDocuments = await this.countDocuments({ expiredAt: null });
|
||||
const indexedDocuments = await this.countDocuments({ expiredAt: null, _meiliIndex: true });
|
||||
const totalDocuments = await this.countDocuments(indexableQuery);
|
||||
const indexedDocuments = await this.countDocuments({
|
||||
...indexableQuery,
|
||||
_meiliIndex: true,
|
||||
});
|
||||
|
||||
return {
|
||||
totalProcessed: indexedDocuments,
|
||||
|
|
@ -164,8 +195,7 @@ const createMeiliMongooseModel = ({
|
|||
|
||||
/**
|
||||
* Synchronizes data between the MongoDB collection and the MeiliSearch index by
|
||||
* incrementally indexing only documents where `expiredAt` is `null` and `_meiliIndex` is not `true`
|
||||
* (i.e., non-expired documents that have not yet been indexed, including those with missing or null `_meiliIndex`).
|
||||
* incrementally indexing only non-temporary documents where `_meiliIndex` is not `true`.
|
||||
* */
|
||||
static async syncWithMeili(this: SchemaWithMeiliMethods): Promise<void> {
|
||||
const startTime = Date.now();
|
||||
|
|
@ -197,7 +227,7 @@ const createMeiliMongooseModel = ({
|
|||
|
||||
while (hasMore) {
|
||||
const query: FilterQuery<unknown> = {
|
||||
expiredAt: null,
|
||||
...indexableQuery,
|
||||
_meiliIndex: { $ne: true },
|
||||
};
|
||||
|
||||
|
|
@ -299,8 +329,9 @@ const createMeiliMongooseModel = ({
|
|||
const query: Record<string, unknown> = {};
|
||||
query[primaryKey] = { $in: meiliIds };
|
||||
|
||||
// Find which documents exist in MongoDB
|
||||
const existingDocs = await this.find(query).select(primaryKey).lean();
|
||||
const existingDocs = await this.find({ ...query, ...indexableQuery })
|
||||
.select(primaryKey)
|
||||
.lean();
|
||||
|
||||
const existingIds = new Set(
|
||||
existingDocs.map((doc: Record<string, unknown>) => doc[primaryKey]),
|
||||
|
|
@ -413,8 +444,7 @@ const createMeiliMongooseModel = ({
|
|||
this: DocumentWithMeiliIndex,
|
||||
next: CallbackWithoutResultAndOptionalError,
|
||||
): Promise<void> {
|
||||
// If this conversation or message has a TTL, don't index it
|
||||
if (!_.isNil(this.expiredAt)) {
|
||||
if (!isIndexableDocument(this)) {
|
||||
return next();
|
||||
}
|
||||
|
||||
|
|
@ -459,6 +489,15 @@ const createMeiliMongooseModel = ({
|
|||
next: CallbackWithoutResultAndOptionalError,
|
||||
): Promise<void> {
|
||||
try {
|
||||
if (!isIndexableDocument(this)) {
|
||||
await index.deleteDocument(String(this[primaryKey as keyof DocumentWithMeiliIndex]));
|
||||
await this.collection.updateOne(
|
||||
{ _id: this._id as Types.ObjectId },
|
||||
{ $set: { _meiliIndex: false } },
|
||||
);
|
||||
return next();
|
||||
}
|
||||
|
||||
const object = this.preprocessObjectForIndex!();
|
||||
await index.updateDocuments([object], { primaryKey });
|
||||
next();
|
||||
|
|
@ -644,7 +683,15 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions):
|
|||
logger.debug(`[mongoMeili] Added 'user' field to ${indexName} index attributes`);
|
||||
}
|
||||
|
||||
schema.loadClass(createMeiliMongooseModel({ index, attributesToIndex, primaryKey, syncOptions }));
|
||||
schema.loadClass(
|
||||
createMeiliMongooseModel({
|
||||
index,
|
||||
indexableQuery: buildIndexableQuery(schema),
|
||||
attributesToIndex,
|
||||
primaryKey,
|
||||
syncOptions,
|
||||
}),
|
||||
);
|
||||
|
||||
// Register Mongoose hooks
|
||||
schema.post('save', function (doc: DocumentWithMeiliIndex, next) {
|
||||
|
|
|
|||
|
|
@ -54,6 +54,6 @@ convoSchema.index({ conversationId: 1, user: 1, tenantId: 1 }, { unique: true })
|
|||
|
||||
convoSchema.index({ user: 1, isTemporary: 1 });
|
||||
// index for MeiliSearch sync operations
|
||||
convoSchema.index({ _meiliIndex: 1, expiredAt: 1 });
|
||||
convoSchema.index({ _meiliIndex: 1, isTemporary: 1, expiredAt: 1 });
|
||||
|
||||
export default convoSchema;
|
||||
|
|
|
|||
|
|
@ -153,7 +153,7 @@ const file: Schema<IMongoFile> = new Schema(
|
|||
},
|
||||
);
|
||||
|
||||
file.index({ expiredAt: 1 }, { expireAfterSeconds: 0 });
|
||||
file.index({ expiredAt: 1 });
|
||||
file.index({ createdAt: 1, updatedAt: 1 });
|
||||
file.index(
|
||||
{ filename: 1, conversationId: 1, context: 1, tenantId: 1 },
|
||||
|
|
|
|||
|
|
@ -63,6 +63,10 @@ const messageSchema: Schema<IMessage> = new Schema(
|
|||
required: true,
|
||||
default: false,
|
||||
},
|
||||
isTemporary: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
unfinished: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
|
|
@ -180,6 +184,6 @@ messageSchema.index({ createdAt: 1 });
|
|||
messageSchema.index({ messageId: 1, user: 1, tenantId: 1 }, { unique: true });
|
||||
|
||||
// index for MeiliSearch sync operations
|
||||
messageSchema.index({ _meiliIndex: 1, expiredAt: 1 });
|
||||
messageSchema.index({ _meiliIndex: 1, isTemporary: 1, expiredAt: 1 });
|
||||
|
||||
export default messageSchema;
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ export interface IMongoFile extends Omit<Document, 'model'> {
|
|||
codeEnvRef?: CodeEnvRef;
|
||||
};
|
||||
expiresAt?: Date;
|
||||
expiredAt?: Date;
|
||||
expiredAt?: Date | null;
|
||||
createdAt?: Date;
|
||||
updatedAt?: Date;
|
||||
tenantId?: string;
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ export interface IMessage extends Document {
|
|||
text?: string;
|
||||
summary?: string;
|
||||
isCreatedByUser: boolean;
|
||||
isTemporary?: boolean;
|
||||
unfinished?: boolean;
|
||||
error?: boolean;
|
||||
finish_reason?: string;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue