🩹 fix: Clear Stale Redis Job Usage; Live-Tap Legacy Streams; Share Fetched Config

- DEL the Redis job hash before re-creating it so a reused streamId can't
  inherit a prior run's contextUsage/tokenUsage and backfill stale usage
- tap the legacy {message,text} stream branch (non-agent OpenAI/Anthropic
  streams) into the live estimate, not just the content path
- copy a deduped fetch's token config to every sibling endpoint sharing the
  baseURL/key/headers, so /token-config resolves each by its own name
This commit is contained in:
Danny Avila 2026-06-13 14:25:47 -04:00
parent a5f9ae3351
commit 2bfce0c34b
4 changed files with 41 additions and 6 deletions

View file

@ -813,6 +813,9 @@ export default function useResumableSSE(
parentMessageId: data.parentMessageId,
messageId: data.messageId,
};
/** Legacy non-agent streams send cumulative text here — feed the
* live estimate like the content path above */
tapContent(text, { ...currentSubmission, userMessage });
messageHandler(text, { ...currentSubmission, userMessage, initialResponse });
}
} catch (error) {

View file

@ -161,6 +161,9 @@ export default function useSSE(
};
if (data.message != null) {
/** Legacy non-agent streams (handleText) send cumulative text here,
* not via the content path — feed it to the live estimate too */
tapContent(text, { ...submission, userMessage });
messageHandler(text, { ...submission, userMessage, initialResponse });
}
}

View file

@ -12,6 +12,7 @@ import type { ServerRequest, GetUserKeyValuesFunction, UserKeyValues } from '~/t
import type { FetchModelsParams } from '~/endpoints/models';
import { fetchModels as defaultFetchModels } from '~/endpoints/models';
import { getTokenConfigKey } from '~/endpoints/custom/initialize';
import { tokenConfigCache } from '~/cache';
import { isUserProvided } from '~/utils';
/**
@ -97,6 +98,9 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) {
const fetchPromisesMap: Record<string, Promise<string[]>> = {};
const uniqueKeyToEndpointsMap: Record<string, string[]> = {};
/** tokenKey the deduped fetch cached its token config under, so siblings
* sharing the fetch can be backfilled with the same config afterward */
const uniqueKeyToTokenKey: Record<string, string> = {};
const endpointsMap: Record<string, TEndpoint> = {};
const resolved: ResolvedEndpoint[] = [];
@ -175,9 +179,12 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) {
const uniqueKey = `${BASE_URL}__${API_KEY}__${headersFingerprint(endpointHeaders)}`;
if (models?.fetch && !apiKeyIsUserProvided && !baseURLIsUserProvided) {
fetchPromisesMap[uniqueKey] =
fetchPromisesMap[uniqueKey] ||
fetchModels({
if (!fetchPromisesMap[uniqueKey]) {
/** User-scoped when configured headers resolve per user — the
* derived token config must not be cached under the shared name */
const tokenKey = getTokenConfigKey(endpoint, name, req.user?.id ?? '');
uniqueKeyToTokenKey[uniqueKey] = tokenKey;
fetchPromisesMap[uniqueKey] = fetchModels({
name,
apiKey: API_KEY,
baseURL: BASE_URL,
@ -186,10 +193,9 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) {
headers: endpointHeaders,
direct: endpoint.directEndpoint,
userIdQuery: models.userIdQuery,
/** User-scoped when configured headers resolve per user — the
* derived token config must not be cached under the shared name */
tokenKey: getTokenConfigKey(endpoint, name, req.user?.id ?? ''),
tokenKey,
});
}
uniqueKeyToEndpointsMap[uniqueKey] = uniqueKeyToEndpointsMap[uniqueKey] || [];
uniqueKeyToEndpointsMap[uniqueKey].push(name);
continue;
@ -255,6 +261,23 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) {
);
modelsConfig[name] = !modelData?.length ? defaults : modelData;
}
/** A shared fetch caches token config under one endpoint's tokenKey;
* copy it to the siblings so /token-config resolves each by its own
* name (the query is staleTime:Infinity and won't recover otherwise) */
const winnerTokenKey = uniqueKeyToTokenKey[currentKey];
if (settled.status === 'fulfilled' && winnerTokenKey != null && associatedNames.length > 1) {
const cache = tokenConfigCache();
const config = await cache.get(winnerTokenKey);
if (config != null) {
for (const name of associatedNames) {
const siblingKey = getTokenConfigKey(endpointsMap[name], name, req.user?.id ?? '');
if (siblingKey !== winnerTokenKey) {
await cache.set(siblingKey, config);
}
}
}
}
}
return modelsConfig;

View file

@ -164,6 +164,10 @@ export class RedisJobStore implements IJobStore {
// For cluster mode, we can't pipeline keys on different slots
// The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots
if (this.isCluster) {
// DEL before HSET: a reused streamId (retry/superseded run) would
// otherwise inherit the prior job's contextUsage/tokenUsage fields,
// since HSET only overwrites the keys present in the fresh job.
await this.redis.del(key);
await this.redis.hset(key, this.serializeJob(job));
await this.redis.expire(key, this.ttl.running);
await this.redis.sadd(KEYS.runningJobs, streamId);
@ -173,6 +177,8 @@ export class RedisJobStore implements IJobStore {
}
} else {
const pipeline = this.redis.pipeline();
// See cluster branch: clear stale fields before writing the fresh job.
pipeline.del(key);
pipeline.hset(key, this.serializeJob(job));
pipeline.expire(key, this.ttl.running);
pipeline.sadd(KEYS.runningJobs, streamId);