mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-11 10:37:22 +00:00
* Add OBO (On-Behalf-Of) token exchange support for MCP server connections

Enables transparent authentication to Entra ID-backed MCP servers using the logged-in user's federated token via the OAuth 2.0 jwt-bearer grant. Configured via obo.scopes in librechat.yaml server config.

- Extract generic OboTokenService from GraphTokenService (jwt-bearer grant + cache)
- Refactor GraphTokenService to thin wrapper delegating to OboTokenService
- Add obo schema field to BaseOptionsSchema in data-provider
- Add resolveOboToken in packages/api/src/mcp/oauth/obo.ts (validates federated token, calls resolver, returns MCPOAuthTokens)
- Wire oboTokenResolver through MCPConnectionFactory, MCPManager, UserConnectionManager
- OBO tokens injected via request headers (not OAuth transport), refreshed on each tool call
- Explicit error on OBO failure (no fallthrough to standard OAuth redirect)
- Add unit tests for both resolveOboToken (9 tests) and exchangeOboToken (14 tests)

* Add OBO authentication option to MCP server UI configuration

Enable users to configure On-Behalf-Of (OBO) token exchange for MCP servers created via the UI (MongoDB-stored), in addition to the existing YAML-based configuration.

- Add "On-Behalf-Of (OBO)" radio option to MCP server auth section with scopes input field
- Remove obo from omitServerManagedFields so the field passes UI schema validation
- Add OBO to AuthTypeEnum, obo_scopes to AuthConfig, and OBO handling in form defaults and submission
- Add .min(1) validation on obo.scopes to reject empty strings
- Add English localization keys: com_ui_obo, com_ui_obo_scopes, com_ui_obo_scopes_description
- Add 5 schema validation tests for OBO field acceptance, transport compatibility, and edge cases

* 🧊 fix: Add obo to safe properties in redactServerSecrets.

Fixes the OBO configuration not showing up in the MCP UI after app restart

* Address linter errors

* 🧊 fix: fail closed on OBO refresh errors and retry transient token exchange failures

- stop tool calls from falling back to stale Authorization headers when per-call OBO refresh fails
- add one-time retry for transient Entra OBO exchange failures (network/429/5xx)
- preserve structured OBO failure reasons and retryability in resolveOboToken
- improve OBO auth error messaging for connection setup and tool execution
- add tests for transient vs permanent OBO failure paths

* Addressing linting errors / warnings

* 🧊 fix: isolate OBO MCP auth to user-scoped connections

- block OBO-enabled servers from app-level shared MCP connections
- bypass shared connection lookup for OBO servers in MCPManager.getConnection
- add regressions covering OBO connection scoping and preserve non-OBO app connection reuse

* 🛠️ refactor: centralize MCP user-scoped connection policy

- add shared requiresUserScopedConnection helper for OAuth, OBO, and customUserVars
- use the shared predicate in MCPManager and ConnectionsRepository
- add utils coverage for user-scoped connection policy

* 🧊 fix: restrict MCP OBO config to header-capable transports

- Move OBO configuration out of the shared MCP base options schema and allow it only on SSE and streamable-http transports, where request headers are applied.
- Explicitly reject OBO on stdio and websocket configs to avoid accepted-but-nonfunctional server definitions. Add schema coverage for admin/config parsing and user-input websocket validation.

* 🧊 fix: single-flight concurrent OBO token exchanges

Concurrent tool calls that arrive on a cache miss were each issuing their own jwt-bearer request to the IdP. Under that fan-out, Entra intermittently returned errors that the retry classifier saw as non-retryable, surfacing as:

"The identity provider rejected the OBO token exchange. Cannot execute tool <name>. Re-authenticate the user or verify the configured OBO scopes and retry."

A user retry then hit the populated cache and succeeded, which matches the observed flakiness: the cache was empty at the moment of fan-out but populated by the time the user clicked retry.

- Coalesce concurrent exchanges in `OboTokenService.exchangeOboToken` keyed by `${openidId}:${scopes}`. Callers that arrive while an exchange is in flight share the same upstream request and receive the same result. `fromCache=false` continues to force a fresh, independent exchange (and is not joined by `fromCache=true` callers). The IdP call, single-retry path, and cache write are unchanged; they were moved into a `performOboExchange` helper so the coalescing wrapper stays small.
- Tests cover: coalescing on the same key, isolation between different keys, cleanup on success, cleanup on failure, and the `fromCache=false` bypass.

* 🔒 feat: gate MCP OBO config behind MCP_SERVERS.CONFIGURE_OBO permission

OBO silently mints per-user delegated tokens from the caller's federated access token and forwards them to whatever URL the server config points at. Previously, anyone with MCP_SERVERS.CREATE could configure obo.scopes, so if server creation is ever delegated beyond admins, a user could stand up an attacker-controlled server, attach it to a shared agent, and exfiltrate other users' downstream tokens on tool invocation.

Add a dedicated MCP_SERVERS.CONFIGURE_OBO permission (ADMIN: true, USER: false by default) and enforce it at three layers so the safety property no longer depends on CREATE staying admin-only:

- Create/update: POST/PATCH /api/mcp/servers returns 403 when the body carries `obo` and the caller's role lacks the permission.
- Runtime fail-closed: for DB-sourced configs, MCPConnectionFactory and MCPManager.callTool re-check the original author's role before each OBO exchange. If the author has been downgraded, the exchange is skipped (factory) or refused (callTool); retained configs lose their privileges automatically.
- UI: the OBO option is hidden in the MCP server dialog for users without the permission; a CONFIGURE_OBO toggle is exposed in the MCP admin role editor.

Existing role docs receive the new sub-key via the permission backfill in updateInterfacePermissions on next startup, preserving any operator-set values. YAML/Config-sourced server configs are unaffected since they're admin-controlled at the deployment level.

* 🧊 fix: wire OBO machinery for servers with requiresOAuth: false

The discovery and user-connection paths gated OAuth wiring (flow manager, token methods, oboTokenResolver, oboTrustChecker) behind isOAuthServer(), which only considers requiresOAuth/oauth fields. A DB-stored OBO server with requiresOAuth: false therefore landed in the non-OAuth branch, never received an oboTokenResolver, and the factory's usesObo getter evaluated to false, sending a bare request that the upstream rejected with invalid_token.

Add requiresOAuthMachinery() (OAuth OR OBO) and use it at those two gates. isOAuthServer remains for the OAuth-handshake-only check (shouldInitiateOAuthBeforeConnect), where OBO must not initiate a handshake. Plumb the OBO resolver/trust-checker through ToolDiscoveryOptions so reinitMCPServer can pass them on the discovery path.

* 🧊 fix: lock all OBO-target fields (URL, proxy, headers, auth) without CONFIGURE_OBO

The CONFIGURE_OBO permission was meant to gate control of the endpoint that receives OBO-minted per-user delegated tokens and the scopes that are requested. The previous frontend lock + backend gate only covered obo.scopes and the auth section, leaving url/proxy/headers/etc. editable by anyone with UPDATE, meaning a non-permission user could still redirect an existing OBO server's token flow to an attacker endpoint.

Switch to an allowlist policy: when editing an OBO server without CONFIGURE_OBO, only title/description/iconPath are mutable. Backend rejects any other field change with 403; frontend disables the non-allowlist sections (URL, transport, auth, trust) via fieldset. The comparison surface (MCP_USER_INPUT_FIELDS) is derived from MCPServerUserInputSchema's union members so it stays in sync with the schema. New schema fields land in the locked set by default; adding to the allowlist is the only way to unlock them, which preserves the security-review boundary.

* 🧊 fix: skip unauthenticated MCP inspection for OBO-only servers

MCPServerInspector.inspectServer() ran an unauthenticated temp connection unless the config had requiresOAuth or customUserVars set. For OBO-only servers without standard MCP OAuth advertisement, this caused MCPConnectionFactory.create to attempt the connection without a user or oboTokenResolver, failing on servers that reject the MCP initialize handshake without a valid bearer token, which surfaced as MCP_INSPECTION_FAILED on create/update. Add `obo` to the skip list alongside requiresOAuth and customUserVars, matching the existing pattern for user-scoped auth modes.

* Addressed linting error: watchedTitle is declared but never referenced (the auto-fill logic at line 156 uses getValues('title') instead). Deleted constant.
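The single-flight coalescing described in the commit message above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in `OboTokenService`: `createSingleFlight`, `inFlight`, `exchangeOnce`, and the stubbed `exchange` callback are hypothetical names, and only the `${openidId}:${scopes}` key shape and the `fromCache=false` bypass are taken from the commit message.

```javascript
// Hypothetical single-flight wrapper; names are illustrative, not LibreChat's API.
function createSingleFlight(exchange) {
  /** @type {Map<string, Promise<unknown>>} */
  const inFlight = new Map();

  return function exchangeOnce({ openidId, scopes, fromCache = true }) {
    // fromCache=false forces a fresh, independent exchange and is never joined.
    if (!fromCache) {
      return exchange({ openidId, scopes });
    }

    const key = `${openidId}:${scopes}`;
    const pending = inFlight.get(key);
    if (pending) {
      // Join the exchange that is already running for this key.
      return pending;
    }

    const promise = Promise.resolve()
      .then(() => exchange({ openidId, scopes }))
      .finally(() => {
        // Clean up on success and on failure so the next miss starts fresh.
        inFlight.delete(key);
      });
    inFlight.set(key, promise);
    return promise;
  };
}

// Usage: three concurrent callers on the same key share one upstream call.
async function demo() {
  let upstreamCalls = 0;
  const exchangeOnce = createSingleFlight(async ({ openidId }) => {
    upstreamCalls += 1;
    await new Promise((resolve) => setTimeout(resolve, 10));
    return { access_token: `token-for-${openidId}` };
  });

  const request = { openidId: 'user-1', scopes: 'api://example/.default' };
  const results = await Promise.all([
    exchangeOnce(request),
    exchangeOnce(request),
    exchangeOnce(request),
  ]);
  return { upstreamCalls, results };
}
```

Deleting the map entry in `finally` is what gives the "cleanup on success, cleanup on failure" property listed in the tests: a rejected exchange is not remembered, so the next caller triggers a new request instead of receiving a stale failure.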
955 lines
31 KiB
JavaScript
const { tool } = require('@librechat/agents/langchain/tools');
const { logger, getTenantId } = require('@librechat/data-schemas');
const {
  Providers,
  StepTypes,
  GraphEvents,
  Constants: AgentConstants,
} = require('@librechat/agents');
const {
  sendEvent,
  MCPOAuthHandler,
  isMCPDomainAllowed,
  normalizeServerName,
  normalizeJsonSchema,
  GenerationJobManager,
  resolveJsonSchemaRefs,
  buildOAuthToolCallName,
  checkAccessWithRequestCache,
} = require('@librechat/api');
const {
  Time,
  CacheKeys,
  Constants,
  Permissions,
  PermissionTypes,
  isAssistantsEndpoint,
} = require('librechat-data-provider');
const {
  getOAuthReconnectionManager,
  getMCPServersRegistry,
  getFlowStateManager,
  getMCPManager,
} = require('~/config');
const db = require('~/models');
const { findToken, createToken, updateToken, deleteTokens } = db;
const { getGraphApiToken } = require('./GraphTokenService');
const { exchangeOboToken } = require('./OboTokenService');
const { createOboTrustChecker } = require('./OboPolicyService');
const { reinitMCPServer } = require('./Tools/mcp');
const { getAppConfig } = require('./Config');
const { getLogStores } = require('~/cache');

const MAX_CACHE_SIZE = 1000;
const lastReconnectAttempts = new Map();
const RECONNECT_THROTTLE_MS = 10_000;

const missingToolCache = new Map();
const MISSING_TOOL_TTL_MS = 10_000;

async function userCanUseMCPServers(user, req) {
  if (!user?.id || !user?.role) {
    return false;
  }

  try {
    return await checkAccessWithRequestCache({
      req,
      user,
      permissionType: PermissionTypes.MCP_SERVERS,
      permissions: [Permissions.USE],
      getRoleByName: db.getRoleByName,
    });
  } catch (error) {
    logger.error(`[MCP][User: ${user.id}] Failed MCP permission check`, error);
    return false;
  }
}

function createMCPPermissionContext(req) {
  return {
    canUseServers: (user = req?.user) => userCanUseMCPServers(user, req),
  };
}

function evictStale(map, ttl) {
  if (map.size <= MAX_CACHE_SIZE) {
    return;
  }
  const now = Date.now();
  for (const [key, timestamp] of map) {
    if (now - timestamp >= ttl) {
      map.delete(key);
    }
    if (map.size <= MAX_CACHE_SIZE) {
      return;
    }
  }
}

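To make the eviction behaviour of `evictStale` easier to see, here is a small sketch using a variant with the size cap and the clock passed in as arguments. `evictStaleEntries`, `maxSize`, and the sample keys are hypothetical and exist only for this illustration; the helper above reads `MAX_CACHE_SIZE` and `Date.now()` directly.

```javascript
// Illustrative variant of evictStale with the cap and clock injected.
// The name and the extra parameters are assumptions made for this example.
function evictStaleEntries(map, ttl, maxSize, now) {
  if (map.size <= maxSize) {
    return;
  }
  for (const [key, timestamp] of map) {
    if (now - timestamp >= ttl) {
      map.delete(key);
    }
    if (map.size <= maxSize) {
      return;
    }
  }
}

const now = 100_000;
const attempts = new Map([
  ['user-1:server-a', now - 30_000], // stale
  ['user-2:server-a', now - 20_000], // stale
  ['user-3:server-a', now - 1_000], // fresh
  ['user-4:server-a', now], // fresh
]);

// Cap of 3 with a 10s TTL: the first stale entry is removed, then the loop
// stops because the map is back at the cap. The second stale entry survives.
evictStaleEntries(attempts, 10_000, 3, now);
const remainingKeys = [...attempts.keys()];
```

Two properties follow from this shape: nothing is evicted while the map is at or below the cap, and fresh entries are never removed, so a map holding only fresh entries can stay above the cap until some of them age past the TTL.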
const unavailableMsg =
  "This tool's MCP server is temporarily unavailable. Please try again shortly.";

async function getAppConfigForRequest(req) {
  const user = req?.user;
  return await getAppConfigForUser(user?.id, user);
}

async function getAppConfigForUser(userId, user) {
  return await getAppConfig({ role: user?.role, tenantId: getTenantId(), userId });
}

/**
 * Resolves config-source MCP servers from admin Config overrides for the current
 * request context. Returns the parsed configs keyed by server name.
 * @param {import('express').Request} req - Express request with user context
 * @returns {Promise<Record<string, import('@librechat/api').ParsedServerConfig>>}
 */
async function resolveConfigServers(req) {
  try {
    const registry = getMCPServersRegistry();
    const appConfig = await getAppConfigForRequest(req);
    return await registry.ensureConfigServers(appConfig?.mcpConfig || {});
  } catch (error) {
    logger.warn(
      '[resolveConfigServers] Failed to resolve config servers, degrading to empty:',
      error,
    );
    return {};
  }
}

/**
 * Resolves operator-managed MCP server names from admin Config overrides for the current request.
 * Returns a request-time snapshot for DB server creation, not a cross-process lock.
 * @throws Propagates app config lookup errors to keep DB server creation fail-closed.
 * @param {import('express').Request} req - Express request with user context
 * @returns {Promise<string[]>}
 */
async function resolveMcpConfigNames(req) {
  const appConfig = await getAppConfigForRequest(req);
  return Object.keys(appConfig?.mcpConfig || {});
}

/**
 * Resolves config-source servers and merges all server configs (YAML + config + user DB)
 * for the given user context. Shared helper for controllers needing the full merged config.
 * @param {string} userId
 * @param {{ id?: string, role?: string }} [user]
 * @returns {Promise<Record<string, import('@librechat/api').ParsedServerConfig>>}
 */
async function resolveAllMcpConfigs(userId, user) {
  const registry = getMCPServersRegistry();
  const appConfig = await getAppConfigForUser(userId, user);
  let configServers = {};
  try {
    configServers = await registry.ensureConfigServers(appConfig?.mcpConfig || {});
  } catch (error) {
    logger.warn(
      '[resolveAllMcpConfigs] Config server resolution failed, continuing without:',
      error,
    );
  }
  if (user?.role) {
    return await registry.getAllServerConfigs(userId, configServers, user.role);
  }

  return await registry.getAllServerConfigs(userId, configServers);
}

/**
 * @param {string} toolName
 * @param {string} serverName
 */
function createUnavailableToolStub(toolName, serverName) {
  const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
  const _call = async () => [unavailableMsg, null];
  const toolInstance = tool(_call, {
    schema: {
      type: 'object',
      properties: {
        input: { type: 'string', description: 'Input for the tool' },
      },
      required: [],
    },
    name: normalizedToolKey,
    description: unavailableMsg,
    responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
  });
  toolInstance.mcp = true;
  toolInstance.mcpRawServerName = serverName;
  return toolInstance;
}

function isEmptyObjectSchema(jsonSchema) {
  return (
    jsonSchema != null &&
    typeof jsonSchema === 'object' &&
    jsonSchema.type === 'object' &&
    (jsonSchema.properties == null || Object.keys(jsonSchema.properties).length === 0) &&
    !jsonSchema.additionalProperties
  );
}

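A few sample inputs may help show what `isEmptyObjectSchema` accepts. The sketch below repeats the predicate so it runs on its own; the sample schemas and the `cases` and `results` names are made up for this illustration. In `createToolInstance` further down, a `true` result for a Google or Vertex AI provider causes the schema to be replaced with a placeholder that has a single `input` string property.

```javascript
// Copy of the predicate above so the example is self-contained.
function isEmptyObjectSchema(jsonSchema) {
  return (
    jsonSchema != null &&
    typeof jsonSchema === 'object' &&
    jsonSchema.type === 'object' &&
    (jsonSchema.properties == null || Object.keys(jsonSchema.properties).length === 0) &&
    !jsonSchema.additionalProperties
  );
}

// Each pair is [schema, expected result]. The schemas are invented samples.
const cases = [
  [{ type: 'object' }, true], // no properties key at all
  [{ type: 'object', properties: {} }, true], // empty properties
  [{ type: 'object', properties: {}, additionalProperties: true }, false], // open object
  [{ type: 'object', properties: { q: { type: 'string' } } }, false], // has a property
  [{ type: 'string' }, false], // not an object schema
  [null, false], // missing schema
];

const results = cases.map(([schema]) => isEmptyObjectSchema(schema));
```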
/**
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 */
function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
  /**
   * @param {string} authURL - The URL to redirect the user for OAuth authentication.
   * @returns {Promise<void>}
   */
  return async function (authURL) {
    /** @type {{ id: string; delta: AgentToolCallDelta }} */
    const data = {
      id: stepId,
      delta: {
        type: StepTypes.TOOL_CALLS,
        tool_calls: [{ ...toolCall, args: '' }],
        auth: authURL,
        expires_at: Date.now() + Time.TWO_MINUTES,
      },
    };
    const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
    if (streamId) {
      await GenerationJobManager.emitChunk(streamId, eventData);
    } else {
      sendEvent(res, eventData);
    }
  };
}

/**
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.runId - The Run ID, i.e. message ID
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {number} [params.index]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns {() => Promise<void>}
 */
function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) {
  return async function () {
    /** @type {import('@librechat/agents').RunStep} */
    const data = {
      runId: runId ?? Constants.USE_PRELIM_RESPONSE_MESSAGE_ID,
      id: stepId,
      type: StepTypes.TOOL_CALLS,
      index: index ?? 0,
      stepDetails: {
        type: StepTypes.TOOL_CALLS,
        tool_calls: [toolCall],
      },
    };
    const eventData = { event: GraphEvents.ON_RUN_STEP, data };
    if (streamId) {
      await GenerationJobManager.emitChunk(streamId, eventData);
    } else {
      sendEvent(res, eventData);
    }
  };
}

/**
 * Creates a function used to ensure the flow handler is only invoked once
 * @param {object} params
 * @param {string} params.flowId - The ID of the login flow.
 * @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
 * @param {(authURL: string) => void} [params.callback]
 */
function createOAuthStart({ flowId, flowManager, callback }) {
  /**
   * Creates a function to handle OAuth login requests.
   * @param {string} authURL - The URL to redirect the user for OAuth authentication.
   * @returns {Promise<boolean>} Returns true to indicate the event was sent successfully.
   */
  return async function (authURL) {
    await flowManager.createFlowWithHandler(flowId, 'oauth_login', async () => {
      callback?.(authURL);
      logger.debug('Sent OAuth login request to client');
      return true;
    });
  };
}

/**
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 */
function createOAuthEnd({ res, stepId, toolCall, streamId = null }) {
  return async function () {
    /** @type {{ id: string; delta: AgentToolCallDelta }} */
    const data = {
      id: stepId,
      delta: {
        type: StepTypes.TOOL_CALLS,
        tool_calls: [{ ...toolCall }],
      },
    };
    const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
    if (streamId) {
      await GenerationJobManager.emitChunk(streamId, eventData);
    } else {
      sendEvent(res, eventData);
    }
    logger.debug('Sent OAuth login success to client');
  };
}

/**
 * @param {object} params
 * @param {string} params.userId - The ID of the user.
 * @param {string} params.serverName - The name of the server.
 * @param {string} params.toolName - The name of the tool.
 * @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
 */
function createAbortHandler({ userId, serverName, toolName, flowManager }) {
  return function () {
    logger.info(`[MCP][User: ${userId}][${serverName}][${toolName}] Tool call aborted`);
    const flowId = MCPOAuthHandler.generateFlowId(userId, serverName);
    // Clean up both mcp_oauth and mcp_get_tokens flows
    flowManager.failFlow(flowId, 'mcp_oauth', new Error('Tool call aborted'));
    flowManager.failFlow(flowId, 'mcp_get_tokens', new Error('Tool call aborted'));
  };
}

/**
 * @param {Object} params
 * @param {() => void} params.runStepEmitter
 * @param {(authURL: string) => void} params.runStepDeltaEmitter
 * @returns {(authURL: string) => void}
 */
function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) {
  return function (authURL) {
    runStepEmitter();
    runStepDeltaEmitter(authURL);
  };
}

/**
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {IUser} params.user - The user from the request object.
 * @param {string} params.serverName
 * @param {AbortSignal} params.signal
 * @param {string} params.model
 * @param {number} [params.index]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
 */
async function reconnectServer({
  res,
  user,
  index,
  signal,
  serverName,
  configServers,
  userMCPAuthMap,
  streamId = null,
}) {
  logger.debug(
    `[MCP][reconnectServer] serverName: ${serverName}, user: ${user?.id}, hasUserMCPAuthMap: ${!!userMCPAuthMap}`,
  );

  const throttleKey = `${user.id}:${serverName}`;
  const now = Date.now();
  const lastAttempt = lastReconnectAttempts.get(throttleKey) ?? 0;
  if (now - lastAttempt < RECONNECT_THROTTLE_MS) {
    logger.debug(`[MCP][reconnectServer] Throttled reconnect for ${serverName}`);
    return null;
  }
  lastReconnectAttempts.set(throttleKey, now);
  evictStale(lastReconnectAttempts, RECONNECT_THROTTLE_MS);

  const runId = Constants.USE_PRELIM_RESPONSE_MESSAGE_ID;
  const flowId = `${user.id}:${serverName}:${Date.now()}`;
  const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
  const stepId = 'step_oauth_login_' + serverName;
  const toolCall = {
    id: flowId,
    name: buildOAuthToolCallName(serverName),
    type: 'tool_call_chunk',
  };

  // Set up abort handler to clean up OAuth flows if request is aborted
  const oauthFlowId = MCPOAuthHandler.generateFlowId(user.id, serverName);
  const abortHandler = () => {
    logger.info(
      `[MCP][User: ${user.id}][${serverName}] Tool loading aborted, cleaning up OAuth flows`,
    );
    // Clean up both mcp_oauth and mcp_get_tokens flows
    flowManager.failFlow(oauthFlowId, 'mcp_oauth', new Error('Tool loading aborted'));
    flowManager.failFlow(oauthFlowId, 'mcp_get_tokens', new Error('Tool loading aborted'));
  };

  if (signal) {
    signal.addEventListener('abort', abortHandler, { once: true });
  }

  try {
    const runStepEmitter = createRunStepEmitter({
      res,
      index,
      runId,
      stepId,
      toolCall,
      streamId,
    });
    const runStepDeltaEmitter = createRunStepDeltaEmitter({
      res,
      stepId,
      toolCall,
      streamId,
    });
    const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter });
    const oauthStart = createOAuthStart({
      res,
      flowId,
      callback,
      flowManager,
    });
    return await reinitMCPServer({
      user,
      signal,
      serverName,
      configServers,
      oauthStart,
      flowManager,
      userMCPAuthMap,
      forceNew: true,
      returnOnOAuth: false,
      connectionTimeout: Time.THIRTY_SECONDS,
    });
  } finally {
    // Clean up abort handler to prevent memory leaks
    if (signal) {
      signal.removeEventListener('abort', abortHandler);
    }
  }
}

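The per-user, per-server throttle at the top of `reconnectServer` can be isolated into a small sketch. `createThrottle` and `tryAcquire` are hypothetical names, and the clock is passed in as an argument so the behaviour is deterministic; the function above uses a module-level map and `Date.now()` instead.

```javascript
// Hypothetical helper that mirrors the throttle check in reconnectServer.
function createThrottle(windowMs) {
  /** @type {Map<string, number>} */
  const lastAttempts = new Map();

  return function tryAcquire(key, now = Date.now()) {
    const lastAttempt = lastAttempts.get(key) ?? 0;
    if (now - lastAttempt < windowMs) {
      return false; // still inside the window: caller should skip the reconnect
    }
    lastAttempts.set(key, now);
    return true;
  };
}

const tryReconnect = createThrottle(10_000);

// Timestamps are arbitrary millisecond values chosen for the example.
const first = tryReconnect('user-1:github', 50_000); // allowed
const second = tryReconnect('user-1:github', 55_000); // throttled, 5s after the first
const otherUser = tryReconnect('user-2:github', 55_000); // allowed, different key
const later = tryReconnect('user-1:github', 60_000); // allowed, window has elapsed
```

Because the key combines the user ID and the server name, one user's repeated failures do not delay reconnect attempts for other users of the same server.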
/**
 * Creates all tools from the specified MCP Server via `toolKey`.
 *
 * This function assumes tools could not be aggregated from the cache of tool definitions,
 * i.e. `availableTools`, and will reinitialize the MCP server to ensure all tools are generated.
 *
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {{ canUseServers: (user?: IUser) => Promise<boolean> }} [params.mcpPermissionContext] - Request-scoped MCP permission context.
 * @param {IUser} params.user - The user from the request object.
 * @param {string} params.serverName
 * @param {string} params.model
 * @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
 * @param {number} [params.index]
 * @param {AbortSignal} [params.signal]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @param {import('@librechat/api').ParsedServerConfig} [params.config]
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
 */
async function createMCPTools({
  res,
  mcpPermissionContext,
  user,
  index,
  signal,
  config,
  provider,
  serverName,
  configServers,
  userMCPAuthMap,
  streamId = null,
}) {
  const serverConfig =
    config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id, configServers));
  if (serverConfig?.url) {
    const appConfig = await getAppConfig({
      role: user?.role,
      tenantId: user?.tenantId,
      userId: user?.id,
    });
    const allowedDomains = appConfig?.mcpSettings?.allowedDomains;
    const allowedAddresses = appConfig?.mcpSettings?.allowedAddresses;
    const isDomainAllowed = await isMCPDomainAllowed(
      serverConfig,
      allowedDomains,
      allowedAddresses,
    );
    if (!isDomainAllowed) {
      logger.warn(`[MCP][${serverName}] Domain not allowed, skipping all tools`);
      return [];
    }
  }

  const result = await reconnectServer({
    res,
    user,
    index,
    signal,
    serverName,
    configServers,
    userMCPAuthMap,
    streamId,
  });
  if (result === null) {
    logger.debug(`[MCP][${serverName}] Reconnect throttled, skipping tool creation.`);
    return [];
  }
  if (!result || !result.tools) {
    logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`);
    return [];
  }

  const serverTools = [];
  for (const tool of result.tools) {
    const toolInstance = await createMCPTool({
      res,
      mcpPermissionContext,
      user,
      provider,
      userMCPAuthMap,
      configServers,
      streamId,
      availableTools: result.availableTools,
      toolKey: `${tool.name}${Constants.mcp_delimiter}${serverName}`,
      config: serverConfig,
    });
    if (toolInstance) {
      serverTools.push(toolInstance);
    }
  }

  return serverTools;
}

/**
 * Creates a single tool from the specified MCP Server via `toolKey`.
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {{ canUseServers: (user?: IUser) => Promise<boolean> }} [params.mcpPermissionContext] - Request-scoped MCP permission context.
 * @param {IUser} params.user - The user from the request object.
 * @param {string} params.toolKey - The toolKey for the tool.
 * @param {string} params.model - The model for the tool.
 * @param {number} [params.index]
 * @param {AbortSignal} [params.signal]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
 * @param {LCAvailableTools} [params.availableTools]
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @param {import('@librechat/api').ParsedServerConfig} [params.config]
 * @returns { Promise<typeof tool | { _call: (toolInput: Object | string) => unknown}> } An object with `_call` method to execute the tool input.
 */
async function createMCPTool({
  res,
  mcpPermissionContext,
  user,
  index,
  signal,
  toolKey,
  provider,
  userMCPAuthMap,
  availableTools,
  config,
  configServers,
  streamId = null,
}) {
  const [toolName, serverName] = toolKey.split(Constants.mcp_delimiter);

  const serverConfig =
    config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id, configServers));
  if (serverConfig?.url) {
    const appConfig = await getAppConfig({
      role: user?.role,
      tenantId: user?.tenantId,
      userId: user?.id,
    });
    const allowedDomains = appConfig?.mcpSettings?.allowedDomains;
    const allowedAddresses = appConfig?.mcpSettings?.allowedAddresses;
    const isDomainAllowed = await isMCPDomainAllowed(
      serverConfig,
      allowedDomains,
      allowedAddresses,
    );
    if (!isDomainAllowed) {
      logger.warn(`[MCP][${serverName}] Domain no longer allowed, skipping tool: ${toolName}`);
      return undefined;
    }
  }

  /** @type {LCTool | undefined} */
  let toolDefinition = availableTools?.[toolKey]?.function;
  if (!toolDefinition) {
    const cachedAt = missingToolCache.get(toolKey);
    if (cachedAt && Date.now() - cachedAt < MISSING_TOOL_TTL_MS) {
      logger.debug(
        `[MCP][${serverName}][${toolName}] Tool in negative cache, returning unavailable stub.`,
      );
      return createUnavailableToolStub(toolName, serverName);
    }

    logger.warn(
      `[MCP][${serverName}][${toolName}] Requested tool not found in available tools, re-initializing MCP server.`,
    );
    const result = await reconnectServer({
      res,
      user,
      index,
      signal,
      serverName,
      configServers,
      userMCPAuthMap,
      streamId,
    });
    toolDefinition = result?.availableTools?.[toolKey]?.function;

    if (!toolDefinition) {
      missingToolCache.set(toolKey, Date.now());
      evictStale(missingToolCache, MISSING_TOOL_TTL_MS);
    }
  }

  if (!toolDefinition) {
    logger.warn(
      `[MCP][${serverName}][${toolName}] Tool definition not found, returning unavailable stub.`,
    );
    return createUnavailableToolStub(toolName, serverName);
  }

  return createToolInstance({
    res,
    mcpPermissionContext,
    user,
    provider,
    toolName,
    serverName,
    serverConfig,
    toolDefinition,
    streamId,
  });
}

function createToolInstance({
|
|
res,
|
|
mcpPermissionContext,
|
|
user: capturedUser = null,
|
|
toolName,
|
|
serverName,
|
|
serverConfig: capturedServerConfig,
|
|
toolDefinition,
|
|
provider: capturedProvider,
|
|
streamId = null,
|
|
}) {
|
|
/** @type {LCTool} */
|
|
const { description, parameters } = toolDefinition;
|
|
const isGoogle = capturedProvider === Providers.VERTEXAI || capturedProvider === Providers.GOOGLE;
|
|
|
|
let schema = parameters ? normalizeJsonSchema(resolveJsonSchemaRefs(parameters)) : null;
|
|
|
|
if (!schema || (isGoogle && isEmptyObjectSchema(schema))) {
|
|
schema = {
|
|
type: 'object',
|
|
properties: {
|
|
input: { type: 'string', description: 'Input for the tool' },
|
|
},
|
|
required: [],
|
|
};
|
|
}
|
|
|
|
const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
|
|
|
|
  /** @type {(toolArguments: Object | string, config?: GraphRunnableConfig) => Promise<unknown>} */
  const _call = async (toolArguments, config) => {
    const permissionUser = config?.configurable?.user ?? capturedUser;
    const userId =
      config?.configurable?.user?.id || config?.configurable?.user_id || capturedUser?.id;
    /** @type {ReturnType<typeof createAbortHandler>} */
    let abortHandler = null;
    /** @type {AbortSignal} */
    let derivedSignal = null;

    try {
      const provider = (config?.metadata?.provider || capturedProvider)?.toLowerCase();
      const canUseMCP = mcpPermissionContext
        ? await mcpPermissionContext.canUseServers(permissionUser)
        : await userCanUseMCPServers(permissionUser);
      if (!canUseMCP) {
        throw new Error('Forbidden: Insufficient MCP server permissions');
      }
      const flowsCache = getLogStores(CacheKeys.FLOWS);
      const flowManager = getFlowStateManager(flowsCache);
      derivedSignal = config?.signal ? AbortSignal.any([config.signal]) : undefined;
      const mcpManager = getMCPManager(userId);

      const { args: _args, stepId, ...toolCall } = config.toolCall ?? {};
      const flowId = `${serverName}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`;
      const runStepDeltaEmitter = createRunStepDeltaEmitter({
        res,
        stepId,
        toolCall,
        streamId,
      });
      const oauthStart = createOAuthStart({
        flowId,
        flowManager,
        callback: runStepDeltaEmitter,
      });
      const oauthEnd = createOAuthEnd({
        res,
        stepId,
        toolCall,
        streamId,
      });

      if (derivedSignal) {
        abortHandler = createAbortHandler({ userId, serverName, toolName, flowManager });
        derivedSignal.addEventListener('abort', abortHandler, { once: true });
      }

      const customUserVars =
        config?.configurable?.userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];

      const result = await mcpManager.callTool({
        serverName,
        serverConfig: capturedServerConfig,
        toolName,
        provider,
        toolArguments,
        options: {
          signal: derivedSignal,
        },
        user: config?.configurable?.user,
        requestBody: config?.configurable?.requestBody,
        customUserVars,
        flowManager,
        tokenMethods: {
          findToken,
          createToken,
          updateToken,
          deleteTokens,
        },
        oauthStart,
        oauthEnd,
        graphTokenResolver: getGraphApiToken,
        oboTokenResolver: exchangeOboToken,
        oboTrustChecker: createOboTrustChecker(),
      });

      if (isAssistantsEndpoint(provider) && Array.isArray(result)) {
        return result[0];
      }
      return result;
    } catch (error) {
      logger.error(
        `[MCP][${serverName}][${toolName}][User: ${userId}] Error calling MCP tool:`,
        error,
      );

      /** OAuth error, provide a helpful message */
      const isOAuthError =
        error.message?.includes('401') ||
        error.message?.includes('OAuth') ||
        error.message?.includes('authentication') ||
        error.message?.includes('Non-200 status code (401)');

      if (isOAuthError) {
        throw new Error(
          `[MCP][${serverName}][${toolName}] OAuth authentication required. Please check the server logs for the authentication URL.`,
        );
      }

      throw new Error(
        `[MCP][${serverName}][${toolName}] tool call failed${error?.message ? `: ${error?.message}` : '.'}`,
      );
    } finally {
      // Clean up abort handler to prevent memory leaks
      if (abortHandler && derivedSignal) {
        derivedSignal.removeEventListener('abort', abortHandler);
      }
    }
  };

  const toolInstance = tool(_call, {
    schema,
    name: normalizedToolKey,
    description: description || '',
    responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
  });
  toolInstance.mcp = true;
  toolInstance.mcpRawServerName = serverName;
  toolInstance.mcpJsonSchema = parameters;
  return toolInstance;
}
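
// Illustrative sketch only: the substring heuristic that `_call` uses to detect
// OAuth failures, restated as a standalone predicate. `sketchIsOAuthError` is a
// hypothetical name and is not exported. The 'Non-200 status code (401)' check
// in the catch block is already covered by the plain '401' check, so three
// markers are enough for this sketch.
function sketchIsOAuthError(error) {
  const message = typeof error?.message === 'string' ? error.message : '';
  return ['401', 'OAuth', 'authentication'].some((marker) => message.includes(marker));
}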

/**
 * Get MCP setup data including config, connections, and OAuth servers.
 * Resolves config-source servers from admin Config overrides when tenant context is available.
 * @param {string} userId - The user ID
 * @param {{ role?: string, tenantId?: string }} [options] - Optional role/tenant context
 * @returns {Object} Object containing mcpConfig, appConnections, userConnections, and oauthServers
 */
async function getMCPSetupData(userId, options = {}) {
  const registry = getMCPServersRegistry();
  const { role, tenantId } = options;

  const appConfig = await getAppConfig({ role, tenantId, userId });
  const configServers = await registry.ensureConfigServers(appConfig?.mcpConfig || {});
  const mcpConfig = role
    ? await registry.getAllServerConfigs(userId, configServers, role)
    : await registry.getAllServerConfigs(userId, configServers);
  const mcpManager = getMCPManager(userId);
  /** @type {Map<string, import('@librechat/api').MCPConnection>} */
  let appConnections = new Map();
  try {
    // Use getLoaded() instead of getAll() to avoid forcing connection creation.
    // getAll() creates connections for all servers, which is problematic for servers
    // that require user context (e.g., those with {{LIBRECHAT_USER_ID}} placeholders).
    appConnections = (await mcpManager.appConnections?.getLoaded()) || new Map();
  } catch (error) {
    logger.error(`[MCP][User: ${userId}] Error getting app connections:`, error);
  }
  const userConnections = mcpManager.getUserConnections(userId) || new Map();
  const oauthServers = new Set(
    Object.entries(mcpConfig)
      .filter(([, config]) => config.requiresOAuth)
      .map(([name]) => name),
  );

  return {
    mcpConfig,
    oauthServers,
    appConnections,
    userConnections,
  };
}

/**
 * Check OAuth flow status for a user and server
 * @param {string} userId - The user ID
 * @param {string} serverName - The server name
 * @returns {Object} Object containing hasActiveFlow and hasFailedFlow flags
 */
async function checkOAuthFlowStatus(userId, serverName) {
  const flowsCache = getLogStores(CacheKeys.FLOWS);
  const flowManager = getFlowStateManager(flowsCache);
  const flowId = MCPOAuthHandler.generateFlowId(userId, serverName);

  try {
    const flowState = await flowManager.getFlowState(flowId, 'mcp_oauth');
    if (!flowState) {
      return { hasActiveFlow: false, hasFailedFlow: false };
    }

    const flowAge = Date.now() - flowState.createdAt;
    const flowTTL = flowState.ttl || 180000; // Default 3 minutes

    if (flowState.status === 'FAILED' || flowAge > flowTTL) {
      const wasCancelled = flowState.error && flowState.error.includes('cancelled');

      if (wasCancelled) {
        logger.debug(`[MCP Connection Status] Found cancelled OAuth flow for ${serverName}`, {
          flowId,
          status: flowState.status,
          error: flowState.error,
        });
        return { hasActiveFlow: false, hasFailedFlow: false };
      } else {
        logger.debug(`[MCP Connection Status] Found failed OAuth flow for ${serverName}`, {
          flowId,
          status: flowState.status,
          flowAge,
          flowTTL,
          timedOut: flowAge > flowTTL,
          error: flowState.error,
        });
        return { hasActiveFlow: false, hasFailedFlow: true };
      }
    }

    if (flowState.status === 'PENDING') {
      logger.debug(`[MCP Connection Status] Found active OAuth flow for ${serverName}`, {
        flowId,
        flowAge,
        flowTTL,
      });
      return { hasActiveFlow: true, hasFailedFlow: false };
    }

    return { hasActiveFlow: false, hasFailedFlow: false };
  } catch (error) {
    logger.error(`[MCP Connection Status] Error checking OAuth flows for ${serverName}:`, error);
    return { hasActiveFlow: false, hasFailedFlow: false };
  }
}
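
// Illustrative sketch only: the branching in checkOAuthFlowStatus restated as a
// pure function, which may make the cases easier to test without a flow
// manager. `sketchClassifyFlowState` is a hypothetical name and is not
// exported; the `flowState` shape (status, createdAt, ttl, error) is assumed
// from how the function above reads it.
function sketchClassifyFlowState(flowState, now = Date.now()) {
  if (!flowState) {
    return { hasActiveFlow: false, hasFailedFlow: false };
  }

  const flowAge = now - flowState.createdAt;
  const flowTTL = flowState.ttl || 180000; // Same 3-minute default as above

  if (flowState.status === 'FAILED' || flowAge > flowTTL) {
    // A cancelled flow is treated as neither active nor failed.
    const wasCancelled = Boolean(flowState.error && flowState.error.includes('cancelled'));
    return { hasActiveFlow: false, hasFailedFlow: !wasCancelled };
  }

  if (flowState.status === 'PENDING') {
    return { hasActiveFlow: true, hasFailedFlow: false };
  }

  return { hasActiveFlow: false, hasFailedFlow: false };
}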

/**
 * Get connection status for a specific MCP server
 * @param {string} userId - The user ID
 * @param {string} serverName - The server name
 * @param {import('@librechat/api').ParsedServerConfig} config - The server configuration
 * @param {Map<string, import('@librechat/api').MCPConnection>} appConnections - App-level connections
 * @param {Map<string, import('@librechat/api').MCPConnection>} userConnections - User-level connections
 * @param {Set} oauthServers - Set of OAuth servers
 * @returns {Object} Object containing requiresOAuth and connectionState
 */
async function getServerConnectionStatus(
  userId,
  serverName,
  config,
  appConnections,
  userConnections,
  oauthServers,
) {
  const connection = appConnections.get(serverName) || userConnections.get(serverName);
  const isStaleOrDoNotExist = connection ? connection?.isStale(config.updatedAt) : true;

  const baseConnectionState = isStaleOrDoNotExist
    ? 'disconnected'
    : connection?.connectionState || 'disconnected';
  let finalConnectionState = baseConnectionState;

  // connection state overrides specific to OAuth servers
  if (baseConnectionState === 'disconnected' && oauthServers.has(serverName)) {
    // check if server is actively being reconnected
    const oauthReconnectionManager = getOAuthReconnectionManager();
    if (oauthReconnectionManager.isReconnecting(userId, serverName)) {
      finalConnectionState = 'connecting';
    } else {
      const { hasActiveFlow, hasFailedFlow } = await checkOAuthFlowStatus(userId, serverName);

      if (hasFailedFlow) {
        finalConnectionState = 'error';
      } else if (hasActiveFlow) {
        finalConnectionState = 'connecting';
      }
    }
  }

  return {
    requiresOAuth: oauthServers.has(serverName),
    connectionState: finalConnectionState,
  };
}
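
// Illustrative sketch only: the OAuth-specific state overrides in
// getServerConnectionStatus written as a pure decision function. The name
// `sketchResolveConnectionState` and its flat boolean inputs are hypothetical;
// the real function derives them from the reconnection manager and
// checkOAuthFlowStatus.
function sketchResolveConnectionState({
  baseState,
  isOAuthServer,
  isReconnecting,
  hasActiveFlow,
  hasFailedFlow,
}) {
  // Overrides apply only to disconnected OAuth servers.
  if (baseState !== 'disconnected' || !isOAuthServer) {
    return baseState;
  }
  if (isReconnecting) {
    return 'connecting';
  }
  if (hasFailedFlow) {
    return 'error';
  }
  if (hasActiveFlow) {
    return 'connecting';
  }
  return baseState;
}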

module.exports = {
  createMCPTool,
  createMCPTools,
  createMCPPermissionContext,
  userCanUseMCPServers,
  getMCPSetupData,
  resolveConfigServers,
  resolveMcpConfigNames,
  resolveAllMcpConfigs,
  checkOAuthFlowStatus,
  getServerConnectionStatus,
  createUnavailableToolStub,
};