diff --git a/api/server/services/Files/images/encode.js b/api/server/services/Files/images/encode.js index 2f075229aa..25110566ee 100644 --- a/api/server/services/Files/images/encode.js +++ b/api/server/services/Files/images/encode.js @@ -1,6 +1,6 @@ const axios = require('axios'); const { logger } = require('@librechat/data-schemas'); -const { logAxiosError, validateImage } = require('@librechat/api'); +const { logAxiosError, validateImage, runGuardedEncode } = require('@librechat/api'); const { FileSources, VisionModes, @@ -136,9 +136,12 @@ async function encodeAndFormat(req, files, params, mode) { if (blobStorageSources.has(source)) { try { const downloadStream = encodingMethods[source].getDownloadStream; - let stream = await downloadStream(req, file.filepath); - let base64Data = await streamToBase64(stream); - stream = null; + let base64Data = await runGuardedEncode(file.bytes ?? 0, async () => { + let stream = await downloadStream(req, file.filepath); + const data = await streamToBase64(stream); + stream = null; + return data; + }); promises.push([file, base64Data]); base64Data = null; continue; @@ -146,8 +149,11 @@ async function encodeAndFormat(req, files, params, mode) { logger.error('Error processing image from blob storage:', error); } } else if (source !== FileSources.local && base64Only.has(effectiveEndpoint)) { - const [_file, imageURL] = await preparePayload(req, file); - promises.push([_file, await fetchImageToBase64(imageURL)]); + const entry = await runGuardedEncode(file.bytes ?? 0, async () => { + const [_file, imageURL] = await preparePayload(req, file); + return [_file, await fetchImageToBase64(imageURL)]; + }); + promises.push(entry); continue; } promises.push(preparePayload(req, file)); diff --git a/api/server/services/Files/images/encode.spec.js b/api/server/services/Files/images/encode.spec.js new file mode 100644 index 0000000000..700f930d32 --- /dev/null +++ b/api/server/services/Files/images/encode.spec.js @@ -0,0 +1,111 @@ +const { Readable } = require('stream'); + +const mockRunGuardedEncode = jest.fn((_bytes, task) => task()); + +jest.mock('axios'); +jest.mock('@librechat/api', () => ({ + logAxiosError: jest.fn(({ message }) => message), + validateImage: jest.fn().mockResolvedValue({ isValid: true }), + runGuardedEncode: (...args) => mockRunGuardedEncode(...args), +})); +jest.mock('@librechat/data-schemas', () => ({ + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() }, +})); + +const mockPrepareImagePayload = jest.fn(); +const mockGetDownloadStream = jest.fn(); +jest.mock('~/server/services/Files/strategies', () => ({ + getStrategyFunctions: jest.fn(() => ({ + prepareImagePayload: mockPrepareImagePayload, + getDownloadStream: mockGetDownloadStream, + })), +})); + +const axios = require('axios'); +const { FileSources } = require('librechat-data-provider'); +const { encodeAndFormat } = require('./encode'); + +const makeReq = () => ({ body: {}, config: {} }); + +beforeEach(() => { + jest.clearAllMocks(); + mockRunGuardedEncode.mockImplementation((_bytes, task) => task()); +}); + +describe('encodeAndFormat - request memory guard', () => { + it('gates blob-storage byte pulls and returns [file, base64]', async () => { + mockGetDownloadStream.mockResolvedValue(Readable.from([Buffer.from('blob-image-bytes')])); + const file = { + source: FileSources.s3, + height: 10, + width: 10, + type: 'image/png', + file_id: 'f-blob', + filepath: 'bucket/a.png', + filename: 'a.png', + bytes: 4321, + }; + + const result = await encodeAndFormat(makeReq(), [file], { endpoint: 'openai' }); + + expect(mockRunGuardedEncode).toHaveBeenCalledTimes(1); + expect(mockRunGuardedEncode.mock.calls[0][0]).toBe(4321); + + const expectedBase64 = Buffer.from('blob-image-bytes').toString('base64'); + expect(result.image_urls).toHaveLength(1); + expect(result.image_urls[0].image_url.url).toBe(`data:image/png;base64,${expectedBase64}`); + }); + + it('gates base64Only URL fetches and returns [file, base64]', async () => { + mockPrepareImagePayload.mockResolvedValue([ + { source: FileSources.vectordb, type: 'image/png' }, + 'https://images.example/x.png', + ]); + axios.get.mockResolvedValue({ data: Buffer.from('url-image-bytes') }); + + const file = { + source: FileSources.vectordb, + height: 10, + width: 10, + type: 'image/png', + file_id: 'f-url', + filepath: 'remote/x.png', + filename: 'x.png', + bytes: 9876, + }; + + const result = await encodeAndFormat(makeReq(), [file], { endpoint: 'anthropic' }); + + expect(mockRunGuardedEncode).toHaveBeenCalledTimes(1); + expect(mockRunGuardedEncode.mock.calls[0][0]).toBe(9876); + + const expectedBase64 = Buffer.from('url-image-bytes').toString('base64'); + expect(result.image_urls).toHaveLength(1); + expect(result.image_urls[0].source.data).toBe(expectedBase64); + }); + + it('does not gate the non-buffering local prepare path', async () => { + const localBase64 = Buffer.from('local-image').toString('base64'); + mockPrepareImagePayload.mockResolvedValue([ + { source: FileSources.local, type: 'image/png' }, + localBase64, + ]); + + const file = { + source: FileSources.local, + height: 10, + width: 10, + type: 'image/png', + file_id: 'f-local', + filepath: 'local/p.png', + filename: 'p.png', + bytes: 555, + }; + + const result = await encodeAndFormat(makeReq(), [file], { endpoint: 'openai' }); + + expect(mockRunGuardedEncode).not.toHaveBeenCalled(); + expect(result.image_urls).toHaveLength(1); + expect(result.image_urls[0].image_url.url).toBe(`data:image/png;base64,${localBase64}`); + }); +}); diff --git a/packages/api/src/files/encode/audio.ts b/packages/api/src/files/encode/audio.ts index d29163d868..89f4f31df8 100644 --- a/packages/api/src/files/encode/audio.ts +++ b/packages/api/src/files/encode/audio.ts @@ -4,6 +4,7 @@ import type { IMongoFile } from '@librechat/data-schemas'; import type { ServerRequest, StrategyFunctions, AudioResult } from '~/types'; import { getFileStream, getConfiguredFileSizeLimit } from './utils'; import { validateAudio } from '~/files/validation'; +import { runGuardedEncode } from './memoryGuard'; /** * Encodes and formats audio files for different providers @@ -30,7 +31,11 @@ export async function encodeAndFormatAudios( const result: AudioResult = { audios: [], files: [] }; const results = await Promise.allSettled( - files.map((file) => getFileStream(req, file, encodingMethods, getStrategyFunctions)), + files.map((file) => + runGuardedEncode(file.bytes ?? 0, () => + getFileStream(req, file, encodingMethods, getStrategyFunctions), + ), + ), ); for (const settledResult of results) { diff --git a/packages/api/src/files/encode/document.spec.ts b/packages/api/src/files/encode/document.spec.ts index 846b61eb88..3a955312c2 100644 --- a/packages/api/src/files/encode/document.spec.ts +++ b/packages/api/src/files/encode/document.spec.ts @@ -1013,4 +1013,39 @@ describe('encodeAndFormatDocuments - fileConfig integration', () => { expect(result.files).toHaveLength(0); }); }); + + describe('concurrency guard', () => { + it('bounds parallel getFileStream calls and returns all documents unchanged', async () => { + const req = createMockRequest(50) as ServerRequest; + const files = Array.from({ length: 6 }, (_, i) => + createMockDocFile(1, 'text/plain', `doc-${i}.txt`), + ); + + let active = 0; + let peak = 0; + mockedGetFileStream.mockImplementation(async (_req, file) => { + active++; + peak = Math.max(peak, active); + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + active--; + return { + file, + content: Buffer.from(`content-${file.filename}`).toString('base64'), + metadata: file, + }; + }); + + const result = await encodeAndFormatDocuments( + req, + files, + { provider: Providers.OPENAI, useResponsesApi: true }, + mockStrategyFunctions, + ); + + expect(peak).toBe(3); + expect(result.documents).toHaveLength(6); + expect(result.files).toHaveLength(6); + }); + }); }); diff --git a/packages/api/src/files/encode/document.ts b/packages/api/src/files/encode/document.ts index 9f595fee1e..e83abe30db 100644 --- a/packages/api/src/files/encode/document.ts +++ b/packages/api/src/files/encode/document.ts @@ -15,6 +15,7 @@ import type { } from '~/types'; import { validatePdf, validateBedrockDocument } from '~/files/validation'; import { getFileStream, getConfiguredFileSizeLimit } from './utils'; +import { runGuardedEncode } from './memoryGuard'; const ANTHROPIC_CITATION_TYPES = new Set([ 'application/pdf', @@ -140,9 +141,11 @@ export async function encodeAndFormatDocuments( const configuredFileSizeLimit = getConfiguredFileSizeLimit(req, { provider, endpoint }); const results = await Promise.allSettled( - processableFiles.map((file) => { - return getFileStream(req, file, encodingMethods, getStrategyFunctions); - }), + processableFiles.map((file) => + runGuardedEncode(file.bytes ?? 0, () => + getFileStream(req, file, encodingMethods, getStrategyFunctions), + ), + ), ); for (const settledResult of results) { diff --git a/packages/api/src/files/encode/index.ts b/packages/api/src/files/encode/index.ts index a0708596f3..ca4949cbf0 100644 --- a/packages/api/src/files/encode/index.ts +++ b/packages/api/src/files/encode/index.ts @@ -1,3 +1,4 @@ export * from './audio'; export * from './document'; export * from './video'; +export * from './memoryGuard'; diff --git a/packages/api/src/files/encode/memoryGuard.spec.ts b/packages/api/src/files/encode/memoryGuard.spec.ts new file mode 100644 index 0000000000..68b673f25e --- /dev/null +++ b/packages/api/src/files/encode/memoryGuard.spec.ts @@ -0,0 +1,149 @@ +import { InFlightBytesSemaphore, runGuardedEncode } from './memoryGuard'; + +const tick = (): Promise => new Promise((resolve) => setImmediate(resolve)); + +function deferred(): { + promise: Promise; + resolve: (value: T) => void; + reject: (error: unknown) => void; +} { + let resolve!: (value: T) => void; + let reject!: (error: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +describe('memoryGuard', () => { + describe('runGuardedEncode concurrency cap', () => { + it('never runs more than 3 tasks at once and completes all', async () => { + let active = 0; + let peak = 0; + const gates = [deferred(), deferred(), deferred(), deferred(), deferred()]; + + const calls = gates.map((gate, i) => + runGuardedEncode(1, async () => { + active++; + peak = Math.max(peak, active); + await gate.promise; + active--; + return i; + }), + ); + + await tick(); + expect(active).toBe(3); + + for (const gate of gates) { + gate.resolve(); + await tick(); + } + + await expect(Promise.all(calls)).resolves.toEqual([0, 1, 2, 3, 4]); + expect(peak).toBe(3); + }); + }); + + describe('runGuardedEncode release on throw', () => { + it('frees the slot when a task rejects', async () => { + await expect( + runGuardedEncode(1, async () => { + throw new Error('boom'); + }), + ).rejects.toThrow('boom'); + + await expect(runGuardedEncode(1, async () => 'ok')).resolves.toBe('ok'); + }); + }); + + describe('InFlightBytesSemaphore', () => { + it('does not admit a later reservation ahead of an earlier queued one that still fits', async () => { + const sem = new InFlightBytesSemaphore(100); + const order: string[] = []; + + const releaseActive = await sem.acquire(60); + + void sem.acquire(60).then(() => order.push('first')); + void sem.acquire(10).then(() => order.push('second')); + + await tick(); + expect(order).toEqual([]); + + releaseActive(); + await tick(); + expect(order).toEqual(['first', 'second']); + }); + + it('admits queued waiters in FIFO order as budget frees', async () => { + const sem = new InFlightBytesSemaphore(100); + const order: string[] = []; + const releases: Record void> = {}; + + const releaseA = await sem.acquire(100); + + for (const label of ['B', 'C', 'D']) { + void sem.acquire(60).then((release) => { + order.push(label); + releases[label] = release; + }); + } + + await tick(); + expect(order).toEqual([]); + + releaseA(); + await tick(); + expect(order).toEqual(['B']); + + releases['B'](); + await tick(); + expect(order).toEqual(['B', 'C']); + + releases['C'](); + await tick(); + expect(order).toEqual(['B', 'C', 'D']); + }); + + it('runs an oversize reservation alone without deadlocking', async () => { + const sem = new InFlightBytesSemaphore(100); + + const releaseOversize = await sem.acquire(500); + + let smallAdmitted = false; + void sem.acquire(10).then(() => { + smallAdmitted = true; + }); + + await tick(); + expect(smallAdmitted).toBe(false); + + releaseOversize(); + await tick(); + expect(smallAdmitted).toBe(true); + }); + + it('release is idempotent', async () => { + const sem = new InFlightBytesSemaphore(100); + + const releaseFirst = await sem.acquire(100); + releaseFirst(); + releaseFirst(); + + const releaseSecond = await sem.acquire(100); + + let thirdAdmitted = false; + void sem.acquire(100).then(() => { + thirdAdmitted = true; + }); + + await tick(); + expect(thirdAdmitted).toBe(false); + + releaseSecond(); + await tick(); + expect(thirdAdmitted).toBe(true); + }); + }); +}); diff --git a/packages/api/src/files/encode/memoryGuard.ts b/packages/api/src/files/encode/memoryGuard.ts new file mode 100644 index 0000000000..227a6d4544 --- /dev/null +++ b/packages/api/src/files/encode/memoryGuard.ts @@ -0,0 +1,65 @@ +import { createConcurrencyLimiter } from '~/utils/promise'; + +const ENCODE_CONCURRENCY = 3; +const ENCODE_INFLIGHT_BYTES_MAX = 256 * 1024 * 1024; + +export class InFlightBytesSemaphore { + private inFlight = 0; + private readonly queue: Array<{ bytes: number; resolve: () => void }> = []; + + constructor(private readonly ceiling: number) {} + + acquire(bytes: number): Promise<() => void> { + const reserve = Math.max(0, bytes || 0); + + if (this.queue.length === 0 && this.canAdmit(reserve)) { + return Promise.resolve(this.grant(reserve)); + } + + return new Promise<() => void>((resolve) => { + this.queue.push({ bytes: reserve, resolve: () => resolve(this.grant(reserve)) }); + }); + } + + private canAdmit(reserve: number): boolean { + return this.inFlight === 0 || this.inFlight + reserve <= this.ceiling; + } + + private grant(reserve: number): () => void { + this.inFlight += reserve; + let released = false; + return () => { + if (released) { + return; + } + released = true; + this.inFlight -= reserve; + this.drain(); + }; + } + + private drain(): void { + while (this.queue.length > 0) { + const head = this.queue[0]; + if (!this.canAdmit(head.bytes)) { + break; + } + this.queue.shift(); + head.resolve(); + } + } +} + +const encodeLimit = createConcurrencyLimiter(ENCODE_CONCURRENCY); +const inFlightBytes = new InFlightBytesSemaphore(ENCODE_INFLIGHT_BYTES_MAX); + +export function runGuardedEncode(estimatedBytes: number, task: () => Promise): Promise { + return encodeLimit(async () => { + const release = await inFlightBytes.acquire(estimatedBytes); + try { + return await task(); + } finally { + release(); + } + }); +} diff --git a/packages/api/src/files/encode/utils.ts b/packages/api/src/files/encode/utils.ts index 9afa627cc7..899f3592a2 100644 --- a/packages/api/src/files/encode/utils.ts +++ b/packages/api/src/files/encode/utils.ts @@ -56,11 +56,13 @@ export async function getFileStream( const { getDownloadStream } = encodingMethods[source]; const stream = await getDownloadStream(req, file.filepath); - const buffer = await getStream.buffer(stream); + let buffer: Buffer | null = await getStream.buffer(stream); + const content = buffer.toString('base64'); + buffer = null; return { file, - content: buffer.toString('base64'), + content, metadata: { file_id: file.file_id, temp_file_id: file.temp_file_id, diff --git a/packages/api/src/files/encode/video.ts b/packages/api/src/files/encode/video.ts index b0d9bb8c2d..26c8d007d5 100644 --- a/packages/api/src/files/encode/video.ts +++ b/packages/api/src/files/encode/video.ts @@ -4,6 +4,7 @@ import type { IMongoFile } from '@librechat/data-schemas'; import type { ServerRequest, StrategyFunctions, VideoResult } from '~/types'; import { getFileStream, getConfiguredFileSizeLimit } from './utils'; import { validateVideo } from '~/files/validation'; +import { runGuardedEncode } from './memoryGuard'; /** * Encodes and formats video files for different providers @@ -30,7 +31,11 @@ export async function encodeAndFormatVideos( const result: VideoResult = { videos: [], files: [] }; const results = await Promise.allSettled( - files.map((file) => getFileStream(req, file, encodingMethods, getStrategyFunctions)), + files.map((file) => + runGuardedEncode(file.bytes ?? 0, () => + getFileStream(req, file, encodingMethods, getStrategyFunctions), + ), + ), ); for (const settledResult of results) {