diff --git a/client/src/hooks/SSE/useResumableSSE.ts b/client/src/hooks/SSE/useResumableSSE.ts index 893017afad..c861b8aab2 100644 --- a/client/src/hooks/SSE/useResumableSSE.ts +++ b/client/src/hooks/SSE/useResumableSSE.ts @@ -813,6 +813,9 @@ export default function useResumableSSE( parentMessageId: data.parentMessageId, messageId: data.messageId, }; + /** Legacy non-agent streams send cumulative text here — feed the + * live estimate like the content path above */ + tapContent(text, { ...currentSubmission, userMessage }); messageHandler(text, { ...currentSubmission, userMessage, initialResponse }); } } catch (error) { diff --git a/client/src/hooks/SSE/useSSE.ts b/client/src/hooks/SSE/useSSE.ts index 2e4e4b60cd..745b268417 100644 --- a/client/src/hooks/SSE/useSSE.ts +++ b/client/src/hooks/SSE/useSSE.ts @@ -161,6 +161,9 @@ export default function useSSE( }; if (data.message != null) { + /** Legacy non-agent streams (handleText) send cumulative text here, + * not via the content path — feed it to the live estimate too */ + tapContent(text, { ...submission, userMessage }); messageHandler(text, { ...submission, userMessage, initialResponse }); } } diff --git a/packages/api/src/endpoints/config/models.ts b/packages/api/src/endpoints/config/models.ts index 28ce2a3d81..30dfa8767d 100644 --- a/packages/api/src/endpoints/config/models.ts +++ b/packages/api/src/endpoints/config/models.ts @@ -12,6 +12,7 @@ import type { ServerRequest, GetUserKeyValuesFunction, UserKeyValues } from '~/t import type { FetchModelsParams } from '~/endpoints/models'; import { fetchModels as defaultFetchModels } from '~/endpoints/models'; import { getTokenConfigKey } from '~/endpoints/custom/initialize'; +import { tokenConfigCache } from '~/cache'; import { isUserProvided } from '~/utils'; /** @@ -97,6 +98,9 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) { const fetchPromisesMap: Record> = {}; const uniqueKeyToEndpointsMap: Record = {}; + /** tokenKey the deduped fetch cached its token config under, so siblings + * sharing the fetch can be backfilled with the same config afterward */ + const uniqueKeyToTokenKey: Record = {}; const endpointsMap: Record = {}; const resolved: ResolvedEndpoint[] = []; @@ -175,9 +179,12 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) { const uniqueKey = `${BASE_URL}__${API_KEY}__${headersFingerprint(endpointHeaders)}`; if (models?.fetch && !apiKeyIsUserProvided && !baseURLIsUserProvided) { - fetchPromisesMap[uniqueKey] = - fetchPromisesMap[uniqueKey] || - fetchModels({ + if (!fetchPromisesMap[uniqueKey]) { + /** User-scoped when configured headers resolve per user — the + * derived token config must not be cached under the shared name */ + const tokenKey = getTokenConfigKey(endpoint, name, req.user?.id ?? ''); + uniqueKeyToTokenKey[uniqueKey] = tokenKey; + fetchPromisesMap[uniqueKey] = fetchModels({ name, apiKey: API_KEY, baseURL: BASE_URL, @@ -186,10 +193,9 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) { headers: endpointHeaders, direct: endpoint.directEndpoint, userIdQuery: models.userIdQuery, - /** User-scoped when configured headers resolve per user — the - * derived token config must not be cached under the shared name */ - tokenKey: getTokenConfigKey(endpoint, name, req.user?.id ?? ''), + tokenKey, }); + } uniqueKeyToEndpointsMap[uniqueKey] = uniqueKeyToEndpointsMap[uniqueKey] || []; uniqueKeyToEndpointsMap[uniqueKey].push(name); continue; @@ -255,6 +261,23 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) { ); modelsConfig[name] = !modelData?.length ? defaults : modelData; } + + /** A shared fetch caches token config under one endpoint's tokenKey; + * copy it to the siblings so /token-config resolves each by its own + * name (the query is staleTime:Infinity and won't recover otherwise) */ + const winnerTokenKey = uniqueKeyToTokenKey[currentKey]; + if (settled.status === 'fulfilled' && winnerTokenKey != null && associatedNames.length > 1) { + const cache = tokenConfigCache(); + const config = await cache.get(winnerTokenKey); + if (config != null) { + for (const name of associatedNames) { + const siblingKey = getTokenConfigKey(endpointsMap[name], name, req.user?.id ?? ''); + if (siblingKey !== winnerTokenKey) { + await cache.set(siblingKey, config); + } + } + } + } } return modelsConfig; diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index a00eeba6b4..1cd26b107f 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -164,6 +164,10 @@ export class RedisJobStore implements IJobStore { // For cluster mode, we can't pipeline keys on different slots // The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots if (this.isCluster) { + // DEL before HSET: a reused streamId (retry/superseded run) would + // otherwise inherit the prior job's contextUsage/tokenUsage fields, + // since HSET only overwrites the keys present in the fresh job. + await this.redis.del(key); await this.redis.hset(key, this.serializeJob(job)); await this.redis.expire(key, this.ttl.running); await this.redis.sadd(KEYS.runningJobs, streamId); @@ -173,6 +177,8 @@ export class RedisJobStore implements IJobStore { } } else { const pipeline = this.redis.pipeline(); + // See cluster branch: clear stale fields before writing the fresh job. + pipeline.del(key); pipeline.hset(key, this.serializeJob(job)); pipeline.expire(key, this.ttl.running); pipeline.sadd(KEYS.runningJobs, streamId);