fix: bound peak memory of concurrent base64 attachment encoding (#14023)

* fix: bound peak memory of concurrent base64 attachment encoding

* chore: sort encode imports

---------

Co-authored-by: Danny Avila <danny@librechat.ai>
This commit is contained in:
matt burnett 2026-07-01 05:22:16 -07:00 committed by GitHub
parent 38ab4add3d
commit b20abb2593
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 395 additions and 13 deletions

View file

@ -1,6 +1,6 @@
const axios = require('axios');
const { logger } = require('@librechat/data-schemas');
const { logAxiosError, validateImage } = require('@librechat/api');
const { logAxiosError, validateImage, runGuardedEncode } = require('@librechat/api');
const {
FileSources,
VisionModes,
@ -136,9 +136,12 @@ async function encodeAndFormat(req, files, params, mode) {
if (blobStorageSources.has(source)) {
try {
const downloadStream = encodingMethods[source].getDownloadStream;
let stream = await downloadStream(req, file.filepath);
let base64Data = await streamToBase64(stream);
stream = null;
let base64Data = await runGuardedEncode(file.bytes ?? 0, async () => {
let stream = await downloadStream(req, file.filepath);
const data = await streamToBase64(stream);
stream = null;
return data;
});
promises.push([file, base64Data]);
base64Data = null;
continue;
@ -146,8 +149,11 @@ async function encodeAndFormat(req, files, params, mode) {
logger.error('Error processing image from blob storage:', error);
}
} else if (source !== FileSources.local && base64Only.has(effectiveEndpoint)) {
const [_file, imageURL] = await preparePayload(req, file);
promises.push([_file, await fetchImageToBase64(imageURL)]);
const entry = await runGuardedEncode(file.bytes ?? 0, async () => {
const [_file, imageURL] = await preparePayload(req, file);
return [_file, await fetchImageToBase64(imageURL)];
});
promises.push(entry);
continue;
}
promises.push(preparePayload(req, file));

View file

@ -0,0 +1,111 @@
const { Readable } = require('stream');
const mockRunGuardedEncode = jest.fn((_bytes, task) => task());
jest.mock('axios');
jest.mock('@librechat/api', () => ({
logAxiosError: jest.fn(({ message }) => message),
validateImage: jest.fn().mockResolvedValue({ isValid: true }),
runGuardedEncode: (...args) => mockRunGuardedEncode(...args),
}));
jest.mock('@librechat/data-schemas', () => ({
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() },
}));
const mockPrepareImagePayload = jest.fn();
const mockGetDownloadStream = jest.fn();
jest.mock('~/server/services/Files/strategies', () => ({
getStrategyFunctions: jest.fn(() => ({
prepareImagePayload: mockPrepareImagePayload,
getDownloadStream: mockGetDownloadStream,
})),
}));
const axios = require('axios');
const { FileSources } = require('librechat-data-provider');
const { encodeAndFormat } = require('./encode');
const makeReq = () => ({ body: {}, config: {} });
beforeEach(() => {
jest.clearAllMocks();
mockRunGuardedEncode.mockImplementation((_bytes, task) => task());
});
describe('encodeAndFormat - request memory guard', () => {
it('gates blob-storage byte pulls and returns [file, base64]', async () => {
mockGetDownloadStream.mockResolvedValue(Readable.from([Buffer.from('blob-image-bytes')]));
const file = {
source: FileSources.s3,
height: 10,
width: 10,
type: 'image/png',
file_id: 'f-blob',
filepath: 'bucket/a.png',
filename: 'a.png',
bytes: 4321,
};
const result = await encodeAndFormat(makeReq(), [file], { endpoint: 'openai' });
expect(mockRunGuardedEncode).toHaveBeenCalledTimes(1);
expect(mockRunGuardedEncode.mock.calls[0][0]).toBe(4321);
const expectedBase64 = Buffer.from('blob-image-bytes').toString('base64');
expect(result.image_urls).toHaveLength(1);
expect(result.image_urls[0].image_url.url).toBe(`data:image/png;base64,${expectedBase64}`);
});
it('gates base64Only URL fetches and returns [file, base64]', async () => {
mockPrepareImagePayload.mockResolvedValue([
{ source: FileSources.vectordb, type: 'image/png' },
'https://images.example/x.png',
]);
axios.get.mockResolvedValue({ data: Buffer.from('url-image-bytes') });
const file = {
source: FileSources.vectordb,
height: 10,
width: 10,
type: 'image/png',
file_id: 'f-url',
filepath: 'remote/x.png',
filename: 'x.png',
bytes: 9876,
};
const result = await encodeAndFormat(makeReq(), [file], { endpoint: 'anthropic' });
expect(mockRunGuardedEncode).toHaveBeenCalledTimes(1);
expect(mockRunGuardedEncode.mock.calls[0][0]).toBe(9876);
const expectedBase64 = Buffer.from('url-image-bytes').toString('base64');
expect(result.image_urls).toHaveLength(1);
expect(result.image_urls[0].source.data).toBe(expectedBase64);
});
it('does not gate the non-buffering local prepare path', async () => {
const localBase64 = Buffer.from('local-image').toString('base64');
mockPrepareImagePayload.mockResolvedValue([
{ source: FileSources.local, type: 'image/png' },
localBase64,
]);
const file = {
source: FileSources.local,
height: 10,
width: 10,
type: 'image/png',
file_id: 'f-local',
filepath: 'local/p.png',
filename: 'p.png',
bytes: 555,
};
const result = await encodeAndFormat(makeReq(), [file], { endpoint: 'openai' });
expect(mockRunGuardedEncode).not.toHaveBeenCalled();
expect(result.image_urls).toHaveLength(1);
expect(result.image_urls[0].image_url.url).toBe(`data:image/png;base64,${localBase64}`);
});
});

View file

@ -4,6 +4,7 @@ import type { IMongoFile } from '@librechat/data-schemas';
import type { ServerRequest, StrategyFunctions, AudioResult } from '~/types';
import { getFileStream, getConfiguredFileSizeLimit } from './utils';
import { validateAudio } from '~/files/validation';
import { runGuardedEncode } from './memoryGuard';
/**
* Encodes and formats audio files for different providers
@ -30,7 +31,11 @@ export async function encodeAndFormatAudios(
const result: AudioResult = { audios: [], files: [] };
const results = await Promise.allSettled(
files.map((file) => getFileStream(req, file, encodingMethods, getStrategyFunctions)),
files.map((file) =>
runGuardedEncode(file.bytes ?? 0, () =>
getFileStream(req, file, encodingMethods, getStrategyFunctions),
),
),
);
for (const settledResult of results) {

View file

@ -1013,4 +1013,39 @@ describe('encodeAndFormatDocuments - fileConfig integration', () => {
expect(result.files).toHaveLength(0);
});
});
describe('concurrency guard', () => {
it('bounds parallel getFileStream calls and returns all documents unchanged', async () => {
const req = createMockRequest(50) as ServerRequest;
const files = Array.from({ length: 6 }, (_, i) =>
createMockDocFile(1, 'text/plain', `doc-${i}.txt`),
);
let active = 0;
let peak = 0;
mockedGetFileStream.mockImplementation(async (_req, file) => {
active++;
peak = Math.max(peak, active);
await new Promise((resolve) => setImmediate(resolve));
await new Promise((resolve) => setImmediate(resolve));
active--;
return {
file,
content: Buffer.from(`content-${file.filename}`).toString('base64'),
metadata: file,
};
});
const result = await encodeAndFormatDocuments(
req,
files,
{ provider: Providers.OPENAI, useResponsesApi: true },
mockStrategyFunctions,
);
expect(peak).toBe(3);
expect(result.documents).toHaveLength(6);
expect(result.files).toHaveLength(6);
});
});
});

View file

@ -15,6 +15,7 @@ import type {
} from '~/types';
import { validatePdf, validateBedrockDocument } from '~/files/validation';
import { getFileStream, getConfiguredFileSizeLimit } from './utils';
import { runGuardedEncode } from './memoryGuard';
const ANTHROPIC_CITATION_TYPES = new Set([
'application/pdf',
@ -140,9 +141,11 @@ export async function encodeAndFormatDocuments(
const configuredFileSizeLimit = getConfiguredFileSizeLimit(req, { provider, endpoint });
const results = await Promise.allSettled(
processableFiles.map((file) => {
return getFileStream(req, file, encodingMethods, getStrategyFunctions);
}),
processableFiles.map((file) =>
runGuardedEncode(file.bytes ?? 0, () =>
getFileStream(req, file, encodingMethods, getStrategyFunctions),
),
),
);
for (const settledResult of results) {

View file

@ -1,3 +1,4 @@
export * from './audio';
export * from './document';
export * from './video';
export * from './memoryGuard';

View file

@ -0,0 +1,149 @@
import { InFlightBytesSemaphore, runGuardedEncode } from './memoryGuard';
const tick = (): Promise<void> => new Promise((resolve) => setImmediate(resolve));
function deferred<T = void>(): {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (error: unknown) => void;
} {
let resolve!: (value: T) => void;
let reject!: (error: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
describe('memoryGuard', () => {
describe('runGuardedEncode concurrency cap', () => {
it('never runs more than 3 tasks at once and completes all', async () => {
let active = 0;
let peak = 0;
const gates = [deferred(), deferred(), deferred(), deferred(), deferred()];
const calls = gates.map((gate, i) =>
runGuardedEncode(1, async () => {
active++;
peak = Math.max(peak, active);
await gate.promise;
active--;
return i;
}),
);
await tick();
expect(active).toBe(3);
for (const gate of gates) {
gate.resolve();
await tick();
}
await expect(Promise.all(calls)).resolves.toEqual([0, 1, 2, 3, 4]);
expect(peak).toBe(3);
});
});
describe('runGuardedEncode release on throw', () => {
it('frees the slot when a task rejects', async () => {
await expect(
runGuardedEncode(1, async () => {
throw new Error('boom');
}),
).rejects.toThrow('boom');
await expect(runGuardedEncode(1, async () => 'ok')).resolves.toBe('ok');
});
});
describe('InFlightBytesSemaphore', () => {
it('does not admit a later reservation ahead of an earlier queued one that still fits', async () => {
const sem = new InFlightBytesSemaphore(100);
const order: string[] = [];
const releaseActive = await sem.acquire(60);
void sem.acquire(60).then(() => order.push('first'));
void sem.acquire(10).then(() => order.push('second'));
await tick();
expect(order).toEqual([]);
releaseActive();
await tick();
expect(order).toEqual(['first', 'second']);
});
it('admits queued waiters in FIFO order as budget frees', async () => {
const sem = new InFlightBytesSemaphore(100);
const order: string[] = [];
const releases: Record<string, () => void> = {};
const releaseA = await sem.acquire(100);
for (const label of ['B', 'C', 'D']) {
void sem.acquire(60).then((release) => {
order.push(label);
releases[label] = release;
});
}
await tick();
expect(order).toEqual([]);
releaseA();
await tick();
expect(order).toEqual(['B']);
releases['B']();
await tick();
expect(order).toEqual(['B', 'C']);
releases['C']();
await tick();
expect(order).toEqual(['B', 'C', 'D']);
});
it('runs an oversize reservation alone without deadlocking', async () => {
const sem = new InFlightBytesSemaphore(100);
const releaseOversize = await sem.acquire(500);
let smallAdmitted = false;
void sem.acquire(10).then(() => {
smallAdmitted = true;
});
await tick();
expect(smallAdmitted).toBe(false);
releaseOversize();
await tick();
expect(smallAdmitted).toBe(true);
});
it('release is idempotent', async () => {
const sem = new InFlightBytesSemaphore(100);
const releaseFirst = await sem.acquire(100);
releaseFirst();
releaseFirst();
const releaseSecond = await sem.acquire(100);
let thirdAdmitted = false;
void sem.acquire(100).then(() => {
thirdAdmitted = true;
});
await tick();
expect(thirdAdmitted).toBe(false);
releaseSecond();
await tick();
expect(thirdAdmitted).toBe(true);
});
});
});

View file

@ -0,0 +1,65 @@
import { createConcurrencyLimiter } from '~/utils/promise';
const ENCODE_CONCURRENCY = 3;
const ENCODE_INFLIGHT_BYTES_MAX = 256 * 1024 * 1024;
export class InFlightBytesSemaphore {
private inFlight = 0;
private readonly queue: Array<{ bytes: number; resolve: () => void }> = [];
constructor(private readonly ceiling: number) {}
acquire(bytes: number): Promise<() => void> {
const reserve = Math.max(0, bytes || 0);
if (this.queue.length === 0 && this.canAdmit(reserve)) {
return Promise.resolve(this.grant(reserve));
}
return new Promise<() => void>((resolve) => {
this.queue.push({ bytes: reserve, resolve: () => resolve(this.grant(reserve)) });
});
}
private canAdmit(reserve: number): boolean {
return this.inFlight === 0 || this.inFlight + reserve <= this.ceiling;
}
private grant(reserve: number): () => void {
this.inFlight += reserve;
let released = false;
return () => {
if (released) {
return;
}
released = true;
this.inFlight -= reserve;
this.drain();
};
}
private drain(): void {
while (this.queue.length > 0) {
const head = this.queue[0];
if (!this.canAdmit(head.bytes)) {
break;
}
this.queue.shift();
head.resolve();
}
}
}
const encodeLimit = createConcurrencyLimiter(ENCODE_CONCURRENCY);
const inFlightBytes = new InFlightBytesSemaphore(ENCODE_INFLIGHT_BYTES_MAX);
export function runGuardedEncode<T>(estimatedBytes: number, task: () => Promise<T>): Promise<T> {
return encodeLimit(async () => {
const release = await inFlightBytes.acquire(estimatedBytes);
try {
return await task();
} finally {
release();
}
});
}

View file

@ -56,11 +56,13 @@ export async function getFileStream(
const { getDownloadStream } = encodingMethods[source];
const stream = await getDownloadStream(req, file.filepath);
const buffer = await getStream.buffer(stream);
let buffer: Buffer | null = await getStream.buffer(stream);
const content = buffer.toString('base64');
buffer = null;
return {
file,
content: buffer.toString('base64'),
content,
metadata: {
file_id: file.file_id,
temp_file_id: file.temp_file_id,

View file

@ -4,6 +4,7 @@ import type { IMongoFile } from '@librechat/data-schemas';
import type { ServerRequest, StrategyFunctions, VideoResult } from '~/types';
import { getFileStream, getConfiguredFileSizeLimit } from './utils';
import { validateVideo } from '~/files/validation';
import { runGuardedEncode } from './memoryGuard';
/**
* Encodes and formats video files for different providers
@ -30,7 +31,11 @@ export async function encodeAndFormatVideos(
const result: VideoResult = { videos: [], files: [] };
const results = await Promise.allSettled(
files.map((file) => getFileStream(req, file, encodingMethods, getStrategyFunctions)),
files.map((file) =>
runGuardedEncode(file.bytes ?? 0, () =>
getFileStream(req, file, encodingMethods, getStrategyFunctions),
),
),
);
for (const settledResult of results) {