mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-07-02 12:22:22 +00:00
The host advertises serverResources, and the ext-apps bridge treats resources/read, resources/list, and resources/templates/list as one proxied set. Only the first two were wired, so an app that sent resources/templates/list received a method-not-found. Register an onlistresourcetemplates handler backed by a new MCPManager.listResourceTemplates and a /api/mcp/resources/templates/list route, mirroring the existing resources/list path. Tool listing is left out deliberately: the App Bridge has no app-to-host tools/list request, and serverTools covers only tool calls. Make app follow-up requests fail closed when scoped config resolution errors. resolveConfigServers gains an opt-in throwOnError so the app path rejects instead of degrading to an empty set, which previously let a transient failure fall back to the base config for the same server name and proxy the iframe request to the wrong server.
1070 lines
36 KiB
JavaScript
1070 lines
36 KiB
JavaScript
const { tool } = require('@librechat/agents/langchain/tools');
|
|
const { logger, getTenantId } = require('@librechat/data-schemas');
|
|
const { Providers, Constants: AgentConstants } = require('@librechat/agents');
|
|
const {
|
|
sendEvent,
|
|
PENDING_STALE_MS,
|
|
MCPOAuthHandler,
|
|
isMCPDomainAllowed,
|
|
normalizeServerName,
|
|
normalizeJsonSchema,
|
|
GenerationJobManager,
|
|
resolveJsonSchemaRefs,
|
|
sanitizeGeminiSchema,
|
|
buildMCPAuthStepId,
|
|
buildMCPAuthToolCall,
|
|
processMCPEnv,
|
|
buildMCPAuthRunStepEvent,
|
|
buildMCPAuthRunStepDeltaEvent,
|
|
buildMCPAuthRunStepEndDeltaEvent,
|
|
isUserSourced,
|
|
checkAccessWithRequestCache,
|
|
requiresEphemeralUserConnection,
|
|
containsGraphTokenPlaceholder,
|
|
} = require('@librechat/api');
|
|
const {
|
|
Time,
|
|
CacheKeys,
|
|
Constants,
|
|
Permissions,
|
|
PermissionTypes,
|
|
isAssistantsEndpoint,
|
|
} = require('librechat-data-provider');
|
|
const {
|
|
getOAuthReconnectionManager,
|
|
getMCPServersRegistry,
|
|
getFlowStateManager,
|
|
getMCPManager,
|
|
} = require('~/config');
|
|
const db = require('~/models');
|
|
const { findToken, createToken, updateToken, deleteTokens } = db;
|
|
const { getGraphApiToken } = require('./GraphTokenService');
|
|
const { exchangeOboToken } = require('./OboTokenService');
|
|
const { createOboTrustChecker } = require('./OboPolicyService');
|
|
const { reinitMCPServer } = require('./Tools/mcp');
|
|
const { getAppConfig } = require('./Config');
|
|
const { getLogStores } = require('~/cache');
|
|
|
|
/** Entry count above which `evictStale` starts pruning expired entries from a cache map. */
const MAX_CACHE_SIZE = 1_000;

/** Time (ms since epoch) of the last reconnect attempt, keyed by `${userId}:${serverName}`. */
const lastReconnectAttempts = new Map();
/** Minimum gap, in milliseconds, between two reconnect attempts for the same key. */
const RECONNECT_THROTTLE_MS = 10 * 1_000;

/** Time (ms since epoch) at which a tool key was last recorded as missing after a reconnect. */
const missingToolCache = new Map();
/** How long, in milliseconds, a missing-tool record short-circuits to the unavailable stub. */
const MISSING_TOOL_TTL_MS = 10 * 1_000;
|
|
|
|
/**
 * Reports whether a user may use MCP servers.
 *
 * A user lacking either an `id` or a `role` is denied without running the permission check.
 * A permission check that throws is logged and treated as a denial.
 * @param {{ id?: string, role?: string } | null | undefined} user - The user to check.
 * @param {import('express').Request} [req] - Request forwarded to `checkAccessWithRequestCache`.
 * @returns {Promise<boolean>} The outcome of the permission check, or `false` on failure.
 */
async function userCanUseMCPServers(user, req) {
  const hasIdentity = Boolean(user?.id && user?.role);
  if (!hasIdentity) {
    return false;
  }

  try {
    const allowed = await checkAccessWithRequestCache({
      req,
      user,
      permissionType: PermissionTypes.MCP_SERVERS,
      permissions: [Permissions.USE],
      getRoleByName: db.getRoleByName,
    });
    return allowed;
  } catch (err) {
    logger.error(`[MCP][User: ${user.id}] Failed MCP permission check`, err);
    return false;
  }
}
|
|
|
|
/**
 * Builds a permission context bound to one request.
 * @param {import('express').Request} [req] - Request whose `user` is the default subject.
 * @returns {{ canUseServers: (user?: object) => Promise<boolean> }} `canUseServers` checks the
 * given user, or `req.user` when called without an argument, via `userCanUseMCPServers`.
 */
function createMCPPermissionContext(req) {
  const canUseServers = (user = req?.user) => userCanUseMCPServers(user, req);
  return { canUseServers };
}
|
|
|
|
/**
 * Prunes expired entries from a timestamp map once it has grown past `MAX_CACHE_SIZE`.
 *
 * Entries are visited in the map's iteration order and removed when their timestamp is at least
 * `ttl` milliseconds old. Pruning stops as soon as the map is back within the limit. Entries
 * that are still fresh are never removed, so the map may stay above the limit.
 * @param {Map<string, number>} map - Key to timestamp (ms since epoch).
 * @param {number} ttl - Age in milliseconds at which an entry counts as stale.
 * @returns {void}
 */
function evictStale(map, ttl) {
  const isWithinLimit = () => map.size <= MAX_CACHE_SIZE;
  if (isWithinLimit()) {
    return;
  }

  const checkedAt = Date.now();
  for (const [entryKey, recordedAt] of map) {
    const age = checkedAt - recordedAt;
    if (age >= ttl) {
      map.delete(entryKey);
    }
    if (isWithinLimit()) {
      return;
    }
  }
}
|
|
|
|
/** Text used both as the result and as the description of the stub for an unavailable MCP tool. */
const unavailableMsg = "This tool's MCP server is temporarily unavailable. Please try again shortly.";
|
|
|
|
/**
 * Builds the OAuth flow ID for a user/server pair via `MCPOAuthHandler.generateFlowId`.
 * The tenant ID is passed along only when it is truthy.
 * @param {string} userId
 * @param {string} serverName
 * @param {string} [tenantId] - Defaults to the value returned by `getTenantId()`.
 * @returns {string} Whatever `MCPOAuthHandler.generateFlowId` returns.
 */
function getOAuthFlowId(userId, serverName, tenantId = getTenantId()) {
  const flowIdArgs = tenantId ? [userId, serverName, tenantId] : [userId, serverName];
  return MCPOAuthHandler.generateFlowId(...flowIdArgs);
}
|
|
|
|
/**
 * Loads the app config for the user attached to a request.
 * @param {import('express').Request} [req] - Request whose `user` (if any) scopes the lookup.
 * @returns {Promise<object>} The result of `getAppConfigForUser` for `req.user`.
 */
async function getAppConfigForRequest(req) {
  const { user } = req ?? {};
  const appConfig = await getAppConfigForUser(user?.id, user);
  return appConfig;
}
|
|
|
|
/**
 * Loads the app config for a user, scoped by the user's role and the current tenant.
 * @param {string} [userId]
 * @param {{ role?: string }} [user] - Only `role` is read from this object.
 * @returns {Promise<object>} The result of `getAppConfig`.
 */
async function getAppConfigForUser(userId, user) {
  const scope = { role: user?.role, tenantId: getTenantId(), userId };
  const appConfig = await getAppConfig(scope);
  return appConfig;
}
|
|
|
|
/**
 * Resolves the config-source MCP servers (admin Config overrides) that apply to the current
 * request and returns the parsed configs keyed by server name.
 *
 * By default a resolution failure is logged and an empty object is returned. With
 * `throwOnError` the failure is rethrown instead. Callers that route a request to one specific
 * server (app follow-up requests) must use it so that a transient error cannot silently fall
 * back to the base config registered under the same server name.
 * @param {import('express').Request} req - Express request with user context
 * @param {{ throwOnError?: boolean }} [options]
 * @returns {Promise<Record<string, import('@librechat/api').ParsedServerConfig>>}
 */
async function resolveConfigServers(req, { throwOnError = false } = {}) {
  try {
    const registry = getMCPServersRegistry();
    const { mcpConfig } = (await getAppConfigForRequest(req)) ?? {};
    const configServers = await registry.ensureConfigServers(mcpConfig || {});
    return configServers;
  } catch (err) {
    if (!throwOnError) {
      logger.warn(
        '[resolveConfigServers] Failed to resolve config servers, degrading to empty:',
        err,
      );
      return {};
    }
    throw err;
  }
}
|
|
|
|
/**
 * Lists the names of the operator-managed MCP servers defined by admin Config overrides for the
 * current request. The result is a request-time snapshot used for DB server creation. It is not
 * a cross-process lock.
 * @throws Propagates app config lookup errors so DB server creation stays fail-closed.
 * @param {import('express').Request} req - Express request with user context
 * @returns {Promise<string[]>}
 */
async function resolveMcpConfigNames(req) {
  const { mcpConfig } = (await getAppConfigForRequest(req)) ?? {};
  return Object.keys(mcpConfig || {});
}
|
|
|
|
/**
 * Resolves config-source servers and merges every server config (YAML + config + user DB) for
 * the given user. Shared helper for controllers that need the full merged config.
 *
 * A failure while resolving config-source servers is logged and the merge continues with none.
 * The user's role is passed to the registry only when it is set.
 * @param {string} userId
 * @param {{ id?: string, role?: string }} [user]
 * @returns {Promise<Record<string, import('@librechat/api').ParsedServerConfig>>}
 */
async function resolveAllMcpConfigs(userId, user) {
  const registry = getMCPServersRegistry();
  const appConfig = await getAppConfigForUser(userId, user);

  const loadConfigServers = async () => {
    try {
      return await registry.ensureConfigServers(appConfig?.mcpConfig || {});
    } catch (err) {
      logger.warn(
        '[resolveAllMcpConfigs] Config server resolution failed, continuing without:',
        err,
      );
      return {};
    }
  };

  const configServers = await loadConfigServers();
  const lookupArgs = user?.role ? [userId, configServers, user.role] : [userId, configServers];
  return await registry.getAllServerConfigs(...lookupArgs);
}
|
|
|
|
/**
 * Looks up the custom user variables stored for one server in the user's MCP auth map.
 * Entries are keyed by `Constants.mcp_prefix` followed by the server name.
 * @param {Record<string, Record<string, string>> | null | undefined} userMCPAuthMap
 * @param {string} serverName
 * @returns {Record<string, string> | undefined} `undefined` when the map or the entry is absent.
 */
function getServerCustomUserVars(userMCPAuthMap, serverName) {
  if (userMCPAuthMap == null) {
    return undefined;
  }
  const authKey = `${Constants.mcp_prefix}${serverName}`;
  return userMCPAuthMap[authKey];
}
|
|
|
|
/**
 * Best-effort early domain gate. The authoritative check is
 * `assertResolvedRuntimeConfigAllowed` in `@librechat/api`, whose resolution this must mirror.
 *
 * The server config is first resolved with `processMCPEnv`. Graph placeholders resolve later
 * (async), so a resolved URL that still carries one is let through here and left to the
 * authoritative check. Every other config is decided by `isMCPDomainAllowed`.
 * @param {object} params
 * @param {import('@librechat/api').ParsedServerConfig} params.serverConfig
 * @param {object} [params.user]
 * @param {object} [params.requestBody]
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @param {string} params.serverName
 * @param {string[]} [params.allowedDomains]
 * @param {string[]} [params.allowedAddresses]
 * @returns {Promise<boolean>}
 */
async function isEarlyDomainAllowed({
  serverConfig,
  user,
  requestBody,
  userMCPAuthMap,
  serverName,
  allowedDomains,
  allowedAddresses,
}) {
  const dbSourced = isUserSourced(serverConfig);
  const customUserVars = getServerCustomUserVars(userMCPAuthMap, serverName);
  const resolvedConfig = processMCPEnv({
    user,
    body: requestBody,
    dbSourced,
    options: serverConfig,
    customUserVars,
  });

  const resolvedUrl = resolvedConfig?.url;
  const awaitsGraphToken =
    typeof resolvedUrl === 'string' && containsGraphTokenPlaceholder(resolvedUrl);
  if (awaitsGraphToken) {
    return true;
  }

  return await isMCPDomainAllowed(resolvedConfig, allowedDomains, allowedAddresses);
}
|
|
|
|
/**
 * Builds a placeholder tool for an MCP tool whose server cannot be reached. The stub keeps the
 * normalized tool key as its name and answers every call with the unavailable message.
 * @param {string} toolName
 * @param {string} serverName - Raw server name. It is normalized for the tool key and kept
 * verbatim on `mcpRawServerName`.
 * @returns {object} The tool instance, flagged with `mcp = true`.
 */
function createUnavailableToolStub(toolName, serverName) {
  const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
  const respondUnavailable = async () => [unavailableMsg, null];

  const stub = tool(respondUnavailable, {
    schema: {
      type: 'object',
      properties: {
        input: { type: 'string', description: 'Input for the tool' },
      },
      required: [],
    },
    name: normalizedToolKey,
    description: unavailableMsg,
    responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
  });

  return Object.assign(stub, { mcp: true, mcpRawServerName: serverName });
}
|
|
|
|
/**
 * Tells whether a JSON schema describes an object that accepts no properties at all: its
 * `type` is `'object'`, it declares no `properties`, and `additionalProperties` is falsy.
 * @param {unknown} jsonSchema
 * @returns {boolean} `false` for `null`, `undefined` and non-object values.
 */
function isEmptyObjectSchema(jsonSchema) {
  if (jsonSchema == null || typeof jsonSchema !== 'object') {
    return false;
  }
  if (jsonSchema.type !== 'object') {
    return false;
  }

  const declared = jsonSchema.properties;
  if (declared != null && Object.keys(declared).length > 0) {
    return false;
  }

  return !jsonSchema.additionalProperties;
}
|
|
|
|
/**
 * Creates an emitter that sends the MCP auth run-step delta event (carrying the OAuth URL) to
 * the client.
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode. When set, the
 * event is emitted through `GenerationJobManager` instead of being written to `res`.
 * @returns {(authURL: string, options?: { expiresAt?: number }) => Promise<void>}
 */
function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
  return async (authURL, options) => {
    const payload = buildMCPAuthRunStepDeltaEvent({ authURL, stepId, toolCall, options });
    if (!streamId) {
      sendEvent(res, payload);
      return;
    }
    await GenerationJobManager.emitChunk(streamId, payload);
  };
}
|
|
|
|
/**
 * Creates an emitter that sends the MCP auth run-step event to the client.
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.runId - The Run ID, i.e. message ID
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {number} [params.index]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode. When set, the
 * event is emitted through `GenerationJobManager` instead of being written to `res`.
 * @returns {() => Promise<void>}
 */
function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) {
  return async () => {
    const payload = buildMCPAuthRunStepEvent({ runId, stepId, toolCall, index });
    if (!streamId) {
      sendEvent(res, payload);
      return;
    }
    await GenerationJobManager.emitChunk(streamId, payload);
  };
}
|
|
|
|
/**
 * Creates the handler that forwards an OAuth login request to the client, going through the
 * flow manager so the flow handler is only invoked once.
 *
 * The returned function always notifies the client exactly once per call:
 * - when an `oauth_login` flow already exists for `flowId`, the request is re-sent;
 * - otherwise the flow is created with a handler that sends it;
 * - if `createFlowWithHandler` settles without having run that handler, the request is re-sent.
 * @param {object} params
 * @param {string} params.flowId - The ID of the login flow.
 * @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
 * @param {(authURL: string, options?: { expiresAt?: number }) => void | Promise<void>} [params.callback]
 * Receives `options` only when the caller supplied a truthy value.
 * @returns {(authURL: string, options?: { expiresAt?: number }) => Promise<boolean>} Resolves to
 * `true` once the event has been sent.
 */
function createOAuthStart({ flowId, flowManager, callback }) {
  const FLOW_TYPE = 'oauth_login';
  const SENT = 'Sent OAuth login request to client';
  const RESENT = 'Re-sent OAuth login request to client';

  return async (authURL, options) => {
    const callbackArgs = options ? [authURL, options] : [authURL];
    let emitted = false;

    const notifyClient = async (logMessage) => {
      await callback?.(...callbackArgs);
      emitted = true;
      logger.debug(logMessage);
    };

    if (await flowManager.getFlowState(flowId, FLOW_TYPE)) {
      await notifyClient(RESENT);
      return true;
    }

    await flowManager.createFlowWithHandler(flowId, FLOW_TYPE, async () => {
      await notifyClient(SENT);
      return true;
    });

    if (!emitted) {
      await notifyClient(RESENT);
    }

    return true;
  };
}
|
|
|
|
/**
 * Creates the handler that tells the client the OAuth login step has finished.
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode. When set, the
 * event is emitted through `GenerationJobManager` instead of being written to `res`.
 * @returns {() => Promise<void>}
 */
function createOAuthEnd({ res, stepId, toolCall, streamId = null }) {
  return async () => {
    const payload = buildMCPAuthRunStepEndDeltaEvent({ stepId, toolCall });
    if (!streamId) {
      sendEvent(res, payload);
    } else {
      await GenerationJobManager.emitChunk(streamId, payload);
    }
    logger.debug('Sent OAuth login success to client');
  };
}
|
|
|
|
/**
 * Creates the listener that cleans up pending OAuth flows when a tool call is aborted.
 *
 * The listener fails both the `mcp_oauth` and the `mcp_get_tokens` flow for the user/server
 * pair. Each cleanup is isolated: a `failFlow` call that throws or rejects is logged as a
 * warning instead of escaping the abort listener, and it never prevents the other cleanup.
 * @param {object} params
 * @param {string} params.userId - The ID of the user.
 * @param {string} params.serverName - The name of the server.
 * @param {string} params.toolName - The name of the tool.
 * @param {string} [params.tenantId] - The tenant ID for the current request.
 * @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
 * @returns {() => void}
 */
function createAbortHandler({ userId, serverName, toolName, tenantId, flowManager }) {
  const logPrefix = `[MCP][User: ${userId}][${serverName}][${toolName}]`;

  return function () {
    logger.info(`${logPrefix} Tool call aborted`);
    const flowId = getOAuthFlowId(userId, serverName, tenantId);

    // `failFlow` is invoked synchronously (before the first await), so both cleanups are still
    // started in order during the abort event; only their completion is awaited here.
    const failFlowSafely = async (type) => {
      try {
        await flowManager.failFlow(flowId, type, new Error('Tool call aborted'));
      } catch (error) {
        logger.warn(`${logPrefix} Failed to clean up ${type} flow after abort`, error);
      }
    };

    // Clean up both mcp_oauth and mcp_get_tokens flows
    void failFlowSafely('mcp_oauth');
    void failFlowSafely('mcp_get_tokens');
  };
}
|
|
|
|
/**
 * Combines the two emitters into one OAuth callback: the run step is emitted first, then the
 * delta carrying the auth URL.
 * @param {Object} params
 * @param {() => Promise<void>} params.runStepEmitter
 * @param {(authURL: string, options?: { expiresAt?: number }) => Promise<void>} params.runStepDeltaEmitter
 * @returns {(authURL: string, options?: { expiresAt?: number }) => Promise<void>}
 */
function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) {
  const announceAuthURL = async (authURL, options) => {
    await runStepEmitter();
    await runStepDeltaEmitter(authURL, options);
  };
  return announceAuthURL;
}
|
|
|
|
/**
 * Re-initializes the connection to one MCP server on behalf of a user, relaying any OAuth
 * login request to the client while the connection is being established.
 *
 * Servers that do not require an ephemeral (request-scoped) connection are throttled per
 * `${user.id}:${serverName}`: another attempt within `RECONNECT_THROTTLE_MS` returns `null`
 * without calling `reinitMCPServer`.
 *
 * While the call is in flight, aborting `signal` fails the pending `mcp_oauth` and
 * `mcp_get_tokens` flows. Cleanup failures are logged and never escape the abort listener.
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {IUser} params.user - The user from the request object.
 * @param {number} [params.index]
 * @param {AbortSignal} [params.signal]
 * @param {string} params.serverName
 * @param {import('@librechat/api').ParsedServerConfig} [params.serverConfig] - Used to bypass reconnect throttling for request-scoped servers.
 * @param {Record<string, import('@librechat/api').ParsedServerConfig>} [params.configServers] - Forwarded to `reinitMCPServer`.
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @param {import('@librechat/api').RequestBody} [params.requestBody]
 * @param {import('@librechat/api').RequestScopedMCPConnectionStore} [params.requestScopedConnections]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns {Promise<Awaited<ReturnType<typeof reinitMCPServer>> | null>} `null` when the attempt
 * was throttled, otherwise whatever `reinitMCPServer` resolves to.
 */
async function reconnectServer({
  res,
  user,
  index,
  signal,
  serverName,
  serverConfig,
  configServers,
  userMCPAuthMap,
  requestBody,
  requestScopedConnections,
  streamId = null,
}) {
  logger.debug(
    `[MCP][reconnectServer] serverName: ${serverName}, user: ${user?.id}, hasUserMCPAuthMap: ${!!userMCPAuthMap}`,
  );

  // Request-scoped servers reconnect on every message by design; throttling them
  // would stub out healthy tools for messages sent within the throttle window.
  const requestScoped = serverConfig ? requiresEphemeralUserConnection(serverConfig) : false;
  if (!requestScoped) {
    const throttleKey = `${user.id}:${serverName}`;
    const now = Date.now();
    const lastAttempt = lastReconnectAttempts.get(throttleKey) ?? 0;
    if (now - lastAttempt < RECONNECT_THROTTLE_MS) {
      logger.debug(`[MCP][reconnectServer] Throttled reconnect for ${serverName}`);
      return null;
    }
    lastReconnectAttempts.set(throttleKey, now);
    evictStale(lastReconnectAttempts, RECONNECT_THROTTLE_MS);
  }

  const runId = Constants.USE_PRELIM_RESPONSE_MESSAGE_ID;
  const flowId = `${user.id}:${serverName}:${Date.now()}`;
  const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
  const stepId = buildMCPAuthStepId(serverName);
  const toolCall = buildMCPAuthToolCall({
    id: flowId,
    serverName,
  });

  // Set up abort handler to clean up OAuth flows if request is aborted
  const tenantId = user?.tenantId ?? getTenantId();
  const oauthFlowId = getOAuthFlowId(user.id, serverName, tenantId);

  // `failFlow` is invoked synchronously (before the first await), so both cleanups are still
  // started in order during the abort event. A throw or rejection is logged rather than
  // surfacing as an uncaught error from the abort listener.
  const failFlowSafely = async (type) => {
    try {
      await flowManager.failFlow(oauthFlowId, type, new Error('Tool loading aborted'));
    } catch (error) {
      logger.warn(
        `[MCP][User: ${user.id}][${serverName}] Failed to clean up ${type} flow after abort`,
        error,
      );
    }
  };
  const abortHandler = () => {
    logger.info(
      `[MCP][User: ${user.id}][${serverName}] Tool loading aborted, cleaning up OAuth flows`,
    );
    // Clean up both mcp_oauth and mcp_get_tokens flows
    void failFlowSafely('mcp_oauth');
    void failFlowSafely('mcp_get_tokens');
  };

  if (signal) {
    signal.addEventListener('abort', abortHandler, { once: true });
  }

  try {
    const runStepEmitter = createRunStepEmitter({
      res,
      index,
      runId,
      stepId,
      toolCall,
      streamId,
    });
    const runStepDeltaEmitter = createRunStepDeltaEmitter({
      res,
      stepId,
      toolCall,
      streamId,
    });
    const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter });
    const oauthStart = createOAuthStart({
      res,
      flowId,
      callback,
      flowManager,
    });
    return await reinitMCPServer({
      user,
      signal,
      serverName,
      configServers,
      oauthStart,
      flowManager,
      userMCPAuthMap,
      requestBody,
      requestScopedConnections,
      forceNew: true,
      returnOnOAuth: false,
      connectionTimeout: Time.THIRTY_SECONDS,
    });
  } finally {
    // Clean up abort handler to prevent memory leaks
    if (signal) {
      signal.removeEventListener('abort', abortHandler);
    }
  }
}
|
|
|
|
/**
 * Creates all tools from the specified MCP Server.
 *
 * This function assumes tools could not be aggregated from the cache of tool definitions,
 * i.e. `availableTools`, and will reinitialize the MCP server to ensure all tools are generated.
 * An empty array is returned when the server's domain is not allowed, when the reconnect was
 * throttled, or when re-initialization yields no tools.
 *
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {{ canUseServers: (user?: IUser) => Promise<boolean> }} [params.mcpPermissionContext] - Request-scoped MCP permission context.
 * @param {IUser} params.user - The user from the request object.
 * @param {number} [params.index]
 * @param {AbortSignal} [params.signal]
 * @param {import('@librechat/api').ParsedServerConfig} [params.config] - Looked up in the registry when omitted.
 * @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
 * @param {string} params.serverName
 * @param {Record<string, import('@librechat/api').ParsedServerConfig>} [params.configServers]
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @param {import('@librechat/api').RequestBody} [params.requestBody]
 * @param {import('@librechat/api').RequestScopedMCPConnectionStore} [params.requestScopedConnections]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } The tools created for the server.
 */
async function createMCPTools({
  res,
  mcpPermissionContext,
  user,
  index,
  signal,
  config,
  provider,
  serverName,
  configServers,
  userMCPAuthMap,
  requestBody,
  requestScopedConnections,
  streamId = null,
}) {
  const serverConfig =
    config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id, configServers));

  if (serverConfig?.url) {
    const appConfig = await getAppConfig({
      role: user?.role,
      tenantId: user?.tenantId,
      userId: user?.id,
    });
    const { allowedDomains, allowedAddresses } = appConfig?.mcpSettings ?? {};
    const domainAllowed = await isEarlyDomainAllowed({
      serverConfig,
      user,
      requestBody,
      userMCPAuthMap,
      serverName,
      allowedDomains,
      allowedAddresses,
    });
    if (!domainAllowed) {
      logger.warn(`[MCP][${serverName}] Domain not allowed, skipping all tools`);
      return [];
    }
  }

  const reconnected = await reconnectServer({
    res,
    user,
    index,
    signal,
    serverName,
    serverConfig,
    configServers,
    userMCPAuthMap,
    requestBody,
    requestScopedConnections,
    streamId,
  });
  if (reconnected === null) {
    logger.debug(`[MCP][${serverName}] Reconnect throttled, skipping tool creation.`);
    return [];
  }
  if (!reconnected?.tools) {
    logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`);
    return [];
  }

  // Everything except the tool key is identical for each tool of this server.
  const sharedToolOptions = {
    res,
    mcpPermissionContext,
    user,
    provider,
    userMCPAuthMap,
    configServers,
    streamId,
    availableTools: reconnected.availableTools,
    requestBody,
    requestScopedConnections,
    config: serverConfig,
  };

  const serverTools = [];
  for (const { name: remoteToolName } of reconnected.tools) {
    const instance = await createMCPTool({
      ...sharedToolOptions,
      toolKey: `${remoteToolName}${Constants.mcp_delimiter}${serverName}`,
    });
    if (instance) {
      serverTools.push(instance);
    }
  }

  return serverTools;
}
|
|
|
|
/**
 * Creates a single tool from the specified MCP Server via `toolKey`.
 *
 * `toolKey` has the form `<toolName><Constants.mcp_delimiter><serverName>`. It is split on the
 * LAST delimiter, so a tool whose own name contains the delimiter still resolves to the server
 * it was built for. (This assumes server names do not themselves contain the delimiter.)
 *
 * When the tool definition is not in `availableTools`, the server is re-initialized once. For
 * servers that do not need a request-scoped connection, a tool that is still missing afterwards
 * is remembered for `MISSING_TOOL_TTL_MS` so repeated requests get the unavailable stub without
 * another reconnect.
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {{ canUseServers: (user?: IUser) => Promise<boolean> }} [params.mcpPermissionContext] - Request-scoped MCP permission context.
 * @param {IUser} params.user - The user from the request object.
 * @param {number} [params.index]
 * @param {AbortSignal} [params.signal]
 * @param {string} params.toolKey - The toolKey for the tool.
 * @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @param {LCAvailableTools} [params.availableTools]
 * @param {import('@librechat/api').RequestBody} [params.requestBody]
 * @param {import('@librechat/api').RequestScopedMCPConnectionStore} [params.requestScopedConnections]
 * @param {import('@librechat/api').ParsedServerConfig} [params.config] - Looked up in the registry when omitted.
 * @param {Record<string, import('@librechat/api').ParsedServerConfig>} [params.configServers]
 * @param {(availableTools: LCAvailableTools) => void} [params.onAvailableTools] - Called with the tools discovered by a reconnect.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns { Promise<typeof tool | { _call: (toolInput: Object | string) => unknown} | undefined> } The
 * tool, an unavailable stub when no definition could be found, or `undefined` when the server's
 * domain is not allowed.
 */
async function createMCPTool({
  res,
  mcpPermissionContext,
  user,
  index,
  signal,
  toolKey,
  provider,
  userMCPAuthMap,
  availableTools,
  requestBody,
  requestScopedConnections,
  config,
  configServers,
  onAvailableTools,
  streamId = null,
}) {
  // Split on the last delimiter: tool names come from the remote server and may contain it.
  const delimiter = Constants.mcp_delimiter;
  const delimiterIndex = toolKey.lastIndexOf(delimiter);
  const hasDelimiter = delimiterIndex !== -1;
  const toolName = hasDelimiter ? toolKey.slice(0, delimiterIndex) : toolKey;
  const serverName = hasDelimiter ? toolKey.slice(delimiterIndex + delimiter.length) : undefined;

  const serverConfig =
    config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id, configServers));
  const requestScopedTools = serverConfig ? requiresEphemeralUserConnection(serverConfig) : false;
  // Request-scoped servers reconnect per request, so a miss is never cached for them.
  const useMissingToolCache = !requestScopedTools;

  if (serverConfig?.url) {
    const appConfig = await getAppConfig({
      role: user?.role,
      tenantId: user?.tenantId,
      userId: user?.id,
    });
    const allowedDomains = appConfig?.mcpSettings?.allowedDomains;
    const allowedAddresses = appConfig?.mcpSettings?.allowedAddresses;
    const isDomainAllowed = await isEarlyDomainAllowed({
      serverConfig,
      user,
      requestBody,
      userMCPAuthMap,
      serverName,
      allowedDomains,
      allowedAddresses,
    });
    if (!isDomainAllowed) {
      logger.warn(`[MCP][${serverName}] Domain no longer allowed, skipping tool: ${toolName}`);
      return undefined;
    }
  }

  /** @type {LCTool | undefined} */
  let toolDefinition = availableTools?.[toolKey]?.function;
  if (!toolDefinition) {
    const cachedAt = useMissingToolCache ? missingToolCache.get(toolKey) : undefined;
    if (cachedAt && Date.now() - cachedAt < MISSING_TOOL_TTL_MS) {
      logger.debug(
        `[MCP][${serverName}][${toolName}] Tool in negative cache, returning unavailable stub.`,
      );
      return createUnavailableToolStub(toolName, serverName);
    }

    logger.warn(
      `[MCP][${serverName}][${toolName}] Requested tool not found in available tools, re-initializing MCP server.`,
    );
    const result = await reconnectServer({
      res,
      user,
      index,
      signal,
      serverName,
      serverConfig,
      configServers,
      userMCPAuthMap,
      requestBody,
      requestScopedConnections,
      streamId,
    });
    if (result?.availableTools) {
      onAvailableTools?.(result.availableTools);
    }
    toolDefinition = result?.availableTools?.[toolKey]?.function;

    if (!toolDefinition && useMissingToolCache) {
      missingToolCache.set(toolKey, Date.now());
      evictStale(missingToolCache, MISSING_TOOL_TTL_MS);
    }
  }

  if (!toolDefinition) {
    logger.warn(
      `[MCP][${serverName}][${toolName}] Tool definition not found, returning unavailable stub.`,
    );
    return createUnavailableToolStub(toolName, serverName);
  }

  return createToolInstance({
    res,
    mcpPermissionContext,
    user,
    requestBody,
    requestScopedConnections,
    provider,
    toolName,
    serverName,
    serverConfig,
    toolDefinition,
    streamId,
  });
}
|
|
|
|
/**
 * Wraps one MCP tool definition in a tool instance whose call is proxied to the MCP manager.
 *
 * The tool's JSON schema is normalized (and sanitized for Google/Vertex AI providers). A tool
 * without a schema, or a Google tool with an empty object schema, gets a single optional
 * string `input` property instead.
 *
 * At call time, values found on the runnable `config` (user, request body, request-scoped
 * connections, provider) take precedence over the ones captured at creation. `config` itself is
 * optional: without it the captured values are used and no abort handling is attached.
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {{ canUseServers: (user?: IUser) => Promise<boolean> }} [params.mcpPermissionContext] - Request-scoped
 * MCP permission context; `userCanUseMCPServers` is used when omitted.
 * @param {IUser | null} [params.user] - User captured at creation time.
 * @param {import('@librechat/api').RequestBody} [params.requestBody]
 * @param {import('@librechat/api').RequestScopedMCPConnectionStore} [params.requestScopedConnections]
 * @param {string} params.toolName
 * @param {string} params.serverName - Raw server name, also stored on `mcpRawServerName`.
 * @param {import('@librechat/api').ParsedServerConfig} [params.serverConfig]
 * @param {LCTool} params.toolDefinition
 * @param {Providers | EModelEndpoint} params.provider
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns {object} The tool instance. Calls reject with an `Error` whose `cause` is the
 * underlying failure.
 */
function createToolInstance({
  res,
  mcpPermissionContext,
  user: capturedUser = null,
  requestBody: capturedRequestBody,
  requestScopedConnections: capturedRequestScopedConnections,
  toolName,
  serverName,
  serverConfig: capturedServerConfig,
  toolDefinition,
  provider: capturedProvider,
  streamId = null,
}) {
  /** @type {LCTool} */
  const { description, parameters } = toolDefinition;
  const isGoogle = capturedProvider === Providers.VERTEXAI || capturedProvider === Providers.GOOGLE;

  let schema = parameters ? normalizeJsonSchema(resolveJsonSchemaRefs(parameters)) : null;

  if (schema && isGoogle) {
    // Gemini/Vertex AI accept only a subset of JSON Schema; sanitize so MCP tools with
    // unions, non-string enums, etc. don't 400 (they work as-is on OpenAI/Claude).
    schema = sanitizeGeminiSchema(schema);
  }

  if (!schema || (isGoogle && isEmptyObjectSchema(schema))) {
    schema = {
      type: 'object',
      properties: {
        input: { type: 'string', description: 'Input for the tool' },
      },
      required: [],
    };
  }

  const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;

  /** @type {(toolArguments: Object | string, config?: GraphRunnableConfig) => Promise<unknown>} */
  const _call = async (toolArguments, config) => {
    const effectiveUser = config?.configurable?.user ?? capturedUser;
    const userId = effectiveUser?.id || config?.configurable?.user_id || capturedUser?.id;
    /** @type {ReturnType<typeof createAbortHandler>} */
    let abortHandler = null;
    /** @type {AbortSignal} */
    let derivedSignal = null;

    try {
      const provider = (config?.metadata?.provider || capturedProvider)?.toLowerCase();
      const canUseMCP = mcpPermissionContext
        ? await mcpPermissionContext.canUseServers(effectiveUser)
        : await userCanUseMCPServers(effectiveUser);
      if (!canUseMCP) {
        throw new Error('Forbidden: Insufficient MCP server permissions');
      }
      const flowsCache = getLogStores(CacheKeys.FLOWS);
      const flowManager = getFlowStateManager(flowsCache);
      derivedSignal = config?.signal ? AbortSignal.any([config.signal]) : undefined;
      const mcpManager = getMCPManager(userId);

      // `config` is optional, so every read from it is guarded; `args` is pulled out only to
      // keep it off the tool call that is echoed back in auth events.
      const { args: _args, stepId, ...toolCall } = config?.toolCall ?? {};
      const flowId = `${serverName}:oauth_login:${config?.metadata?.thread_id}:${config?.metadata?.run_id}`;
      const runStepDeltaEmitter = createRunStepDeltaEmitter({
        res,
        stepId,
        toolCall,
        streamId,
      });
      const oauthStart = createOAuthStart({
        flowId,
        flowManager,
        callback: runStepDeltaEmitter,
      });
      const oauthEnd = createOAuthEnd({
        res,
        stepId,
        toolCall,
        streamId,
      });

      if (derivedSignal) {
        const tenantId = config?.configurable?.user?.tenantId ?? getTenantId();
        abortHandler = createAbortHandler({ userId, serverName, toolName, tenantId, flowManager });
        derivedSignal.addEventListener('abort', abortHandler, { once: true });
      }

      const customUserVars =
        config?.configurable?.userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];

      const result = await mcpManager.callTool({
        serverName,
        serverConfig: capturedServerConfig,
        toolName,
        provider,
        toolArguments,
        options: {
          signal: derivedSignal,
        },
        user: effectiveUser,
        requestBody: config?.configurable?.requestBody ?? capturedRequestBody,
        requestScopedConnections:
          config?.configurable?.requestScopedConnections ?? capturedRequestScopedConnections,
        customUserVars,
        flowManager,
        tokenMethods: {
          findToken,
          createToken,
          updateToken,
          deleteTokens,
        },
        oauthStart,
        oauthEnd,
        graphTokenResolver: getGraphApiToken,
        oboTokenResolver: exchangeOboToken,
        oboTrustChecker: createOboTrustChecker(),
      });

      if (isAssistantsEndpoint(provider) && Array.isArray(result)) {
        return result[0];
      }
      return result;
    } catch (error) {
      logger.error(
        `[MCP][${serverName}][${toolName}][User: ${userId}] Error calling MCP tool:`,
        error,
      );

      /** OAuth error, provide a helpful message */
      const errorMessage = typeof error?.message === 'string' ? error.message : '';
      const isOAuthError = ['401', 'OAuth', 'authentication'].some((marker) =>
        errorMessage.includes(marker),
      );

      if (isOAuthError) {
        throw new Error(
          `[MCP][${serverName}][${toolName}] OAuth authentication required. Please check the server logs for the authentication URL.`,
          { cause: error },
        );
      }

      throw new Error(
        `[MCP][${serverName}][${toolName}] tool call failed${error?.message ? `: ${error?.message}` : '.'}`,
        { cause: error },
      );
    } finally {
      // Clean up abort handler to prevent memory leaks
      if (abortHandler && derivedSignal) {
        derivedSignal.removeEventListener('abort', abortHandler);
      }
    }
  };

  const toolInstance = tool(_call, {
    schema,
    name: normalizedToolKey,
    description: description || '',
    responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
  });
  toolInstance.mcp = true;
  toolInstance.mcpRawServerName = serverName;
  // On Google/Vertex, propagate the union-flattened schema so definitions extracted
  // from this instance don't reach the Gemini converter with unsupported unions.
  toolInstance.mcpJsonSchema = isGoogle ? schema : parameters;
  return toolInstance;
}
|
|
|
|
/**
 * Collect everything needed to describe a user's MCP setup: the resolved server
 * configs, the currently loaded app-level connections, the user's own connections,
 * and the names of the servers that require OAuth.
 *
 * Config-source servers are resolved from the app config fetched for the given
 * role/tenant/user, so admin Config overrides apply when that context is supplied.
 *
 * @param {string} userId - The user ID
 * @param {{ role?: string, tenantId?: string }} [options] - Optional role/tenant context
 * @returns {Promise<{
 *   mcpConfig: Object,
 *   oauthServers: Set<string>,
 *   appConnections: Map<string, import('@librechat/api').MCPConnection>,
 *   userConnections: Map<string, import('@librechat/api').MCPConnection>,
 * }>} The resolved config plus connection maps and the OAuth server name set
 */
async function getMCPSetupData(userId, options = {}) {
  const serversRegistry = getMCPServersRegistry();
  const { role, tenantId } = options;

  const appConfig = await getAppConfig({ role, tenantId, userId });
  const configServers = await serversRegistry.ensureConfigServers(appConfig?.mcpConfig || {});

  // Only forward `role` when one was provided, so the registry sees the same
  // argument list as a role-less lookup otherwise.
  const lookupArgs = role ? [userId, configServers, role] : [userId, configServers];
  const mcpConfig = await serversRegistry.getAllServerConfigs(...lookupArgs);

  const manager = getMCPManager(userId);

  /** @type {Map<string, import('@librechat/api').MCPConnection>} */
  let appConnections;
  try {
    // getLoaded() rather than getAll(): getAll() forces connection creation for every
    // server, which is problematic for servers that need user context
    // (e.g., those with {{LIBRECHAT_USER_ID}} placeholders).
    const loaded = await manager.appConnections?.getLoaded();
    appConnections = loaded || new Map();
  } catch (error) {
    // Best effort: a failure here is logged and treated as "no app connections".
    logger.error(`[MCP][User: ${userId}] Error getting app connections:`, error);
    appConnections = new Map();
  }

  const userConnections = manager.getUserConnections(userId) || new Map();

  const oauthServers = new Set();
  for (const [name, serverConfig] of Object.entries(mcpConfig)) {
    if (serverConfig.requiresOAuth) {
      oauthServers.add(name);
    }
  }

  return {
    mcpConfig,
    oauthServers,
    appConnections,
    userConnections,
  };
}
|
|
|
|
/**
 * Inspect the stored `mcp_oauth` flow state for a user/server pair and classify it.
 *
 * - No stored flow, a cancelled flow, or any lookup error: neither flag is set.
 * - A `FAILED` flow, or one older than its TTL, that was not cancelled: `hasFailedFlow`.
 * - A `PENDING` flow still inside its TTL: `hasActiveFlow`.
 *
 * @param {string} userId - The user ID
 * @param {string} serverName - The server name
 * @param {string} [tenantId] - The tenant ID for the current request; defaults to `getTenantId()`.
 * @returns {Promise<{ hasActiveFlow: boolean, hasFailedFlow: boolean }>} The flow status flags
 */
async function checkOAuthFlowStatus(userId, serverName, tenantId = getTenantId()) {
  const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
  const flowId = getOAuthFlowId(userId, serverName, tenantId);

  try {
    const flowState = await flowManager.getFlowState(flowId, 'mcp_oauth');
    if (!flowState) {
      return { hasActiveFlow: false, hasFailedFlow: false };
    }

    const { status, error: flowError } = flowState;
    const flowAge = Date.now() - flowState.createdAt;
    // A flow counts as active only while it is still usable (the handling/reuse window),
    // not for the whole Keyv retention TTL. Otherwise the UI would keep showing
    // "connecting" for a flow the initiate/callback paths already reject, which hides
    // the connect button.
    const flowTTL = flowState.ttl || PENDING_STALE_MS;
    const timedOut = flowAge > flowTTL;

    if (status === 'FAILED' || timedOut) {
      // A user-cancelled flow is not surfaced as a failure.
      if (flowError && flowError.includes('cancelled')) {
        logger.debug(`[MCP Connection Status] Found cancelled OAuth flow for ${serverName}`, {
          flowId,
          status,
          error: flowError,
        });
        return { hasActiveFlow: false, hasFailedFlow: false };
      }

      logger.debug(`[MCP Connection Status] Found failed OAuth flow for ${serverName}`, {
        flowId,
        status,
        flowAge,
        flowTTL,
        timedOut,
        error: flowError,
      });
      return { hasActiveFlow: false, hasFailedFlow: true };
    }

    if (status === 'PENDING') {
      logger.debug(`[MCP Connection Status] Found active OAuth flow for ${serverName}`, {
        flowId,
        flowAge,
        flowTTL,
      });
      return { hasActiveFlow: true, hasFailedFlow: false };
    }

    return { hasActiveFlow: false, hasFailedFlow: false };
  } catch (err) {
    // Lookup problems are logged and reported as "no flow" rather than propagated.
    logger.error(`[MCP Connection Status] Error checking OAuth flows for ${serverName}:`, err);
    return { hasActiveFlow: false, hasFailedFlow: false };
  }
}
|
|
|
|
/**
 * Work out the connection state to report for a single MCP server.
 *
 * The app-level connection is preferred over the user-level one. A missing or stale
 * connection is reported as `'disconnected'`. For OAuth servers in that state, the
 * result is refined to `'connecting'` or `'error'` based on the reconnection manager
 * and the stored OAuth flow status.
 *
 * @param {string} userId - The user ID
 * @param {string} serverName - The server name
 * @param {import('@librechat/api').ParsedServerConfig} config - The server configuration
 * @param {Map<string, import('@librechat/api').MCPConnection>} appConnections - App-level connections
 * @param {Map<string, import('@librechat/api').MCPConnection>} userConnections - User-level connections
 * @param {Set} oauthServers - Set of OAuth servers
 * @returns {Promise<{ requiresOAuth: boolean, connectionState: string }>} OAuth requirement and resolved state
 */
async function getServerConnectionStatus(
  userId,
  serverName,
  config,
  appConnections,
  userConnections,
  oauthServers,
) {
  const activeConnection = appConnections.get(serverName) || userConnections.get(serverName);

  // Start from 'disconnected'; only a present, non-stale connection can report otherwise.
  let connectionState = 'disconnected';
  if (activeConnection && !activeConnection.isStale(config.updatedAt)) {
    connectionState = activeConnection.connectionState || 'disconnected';
  }

  // Connection state overrides specific to OAuth servers.
  if (connectionState === 'disconnected' && oauthServers.has(serverName)) {
    const reconnectionManager = getOAuthReconnectionManager();

    if (reconnectionManager.isReconnecting(userId, serverName)) {
      // The server is actively being reconnected.
      connectionState = 'connecting';
    } else {
      const { hasActiveFlow, hasFailedFlow } = await checkOAuthFlowStatus(userId, serverName);

      // A failed flow takes precedence over an active one.
      if (hasFailedFlow) {
        connectionState = 'error';
      } else if (hasActiveFlow) {
        connectionState = 'connecting';
      }
    }
  }

  return {
    requiresOAuth: oauthServers.has(serverName),
    connectionState,
  };
}
|
|
|
|
module.exports = {
|
|
createMCPTool,
|
|
createMCPTools,
|
|
createMCPPermissionContext,
|
|
userCanUseMCPServers,
|
|
getMCPSetupData,
|
|
resolveConfigServers,
|
|
resolveMcpConfigNames,
|
|
resolveAllMcpConfigs,
|
|
createOAuthStart,
|
|
checkOAuthFlowStatus,
|
|
getServerConnectionStatus,
|
|
createUnavailableToolStub,
|
|
};
|