mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-20 03:55:44 +00:00
🩹 fix: Clear Stale Redis Job Usage; Live-Tap Legacy Streams; Share Fetched Config
- DEL the Redis job hash before re-creating it so a reused streamId can't
inherit a prior run's contextUsage/tokenUsage and backfill stale usage
- tap the legacy {message,text} stream branch (non-agent OpenAI/Anthropic
streams) into the live estimate, not just the content path
- copy a deduped fetch's token config to every sibling endpoint sharing the
baseURL/key/headers, so /token-config resolves each by its own name
This commit is contained in:
parent
a5f9ae3351
commit
2bfce0c34b
4 changed files with 41 additions and 6 deletions
|
|
@ -813,6 +813,9 @@ export default function useResumableSSE(
|
|||
parentMessageId: data.parentMessageId,
|
||||
messageId: data.messageId,
|
||||
};
|
||||
/** Legacy non-agent streams send cumulative text here — feed the
|
||||
* live estimate like the content path above */
|
||||
tapContent(text, { ...currentSubmission, userMessage });
|
||||
messageHandler(text, { ...currentSubmission, userMessage, initialResponse });
|
||||
}
|
||||
} catch (error) {
|
||||
|
|
|
|||
|
|
@ -161,6 +161,9 @@ export default function useSSE(
|
|||
};
|
||||
|
||||
if (data.message != null) {
|
||||
/** Legacy non-agent streams (handleText) send cumulative text here,
|
||||
* not via the content path — feed it to the live estimate too */
|
||||
tapContent(text, { ...submission, userMessage });
|
||||
messageHandler(text, { ...submission, userMessage, initialResponse });
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import type { ServerRequest, GetUserKeyValuesFunction, UserKeyValues } from '~/t
|
|||
import type { FetchModelsParams } from '~/endpoints/models';
|
||||
import { fetchModels as defaultFetchModels } from '~/endpoints/models';
|
||||
import { getTokenConfigKey } from '~/endpoints/custom/initialize';
|
||||
import { tokenConfigCache } from '~/cache';
|
||||
import { isUserProvided } from '~/utils';
|
||||
|
||||
/**
|
||||
|
|
@ -97,6 +98,9 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) {
|
|||
|
||||
const fetchPromisesMap: Record<string, Promise<string[]>> = {};
|
||||
const uniqueKeyToEndpointsMap: Record<string, string[]> = {};
|
||||
/** tokenKey the deduped fetch cached its token config under, so siblings
|
||||
* sharing the fetch can be backfilled with the same config afterward */
|
||||
const uniqueKeyToTokenKey: Record<string, string> = {};
|
||||
const endpointsMap: Record<string, TEndpoint> = {};
|
||||
|
||||
const resolved: ResolvedEndpoint[] = [];
|
||||
|
|
@ -175,9 +179,12 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) {
|
|||
const uniqueKey = `${BASE_URL}__${API_KEY}__${headersFingerprint(endpointHeaders)}`;
|
||||
|
||||
if (models?.fetch && !apiKeyIsUserProvided && !baseURLIsUserProvided) {
|
||||
fetchPromisesMap[uniqueKey] =
|
||||
fetchPromisesMap[uniqueKey] ||
|
||||
fetchModels({
|
||||
if (!fetchPromisesMap[uniqueKey]) {
|
||||
/** User-scoped when configured headers resolve per user — the
|
||||
* derived token config must not be cached under the shared name */
|
||||
const tokenKey = getTokenConfigKey(endpoint, name, req.user?.id ?? '');
|
||||
uniqueKeyToTokenKey[uniqueKey] = tokenKey;
|
||||
fetchPromisesMap[uniqueKey] = fetchModels({
|
||||
name,
|
||||
apiKey: API_KEY,
|
||||
baseURL: BASE_URL,
|
||||
|
|
@ -186,10 +193,9 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) {
|
|||
headers: endpointHeaders,
|
||||
direct: endpoint.directEndpoint,
|
||||
userIdQuery: models.userIdQuery,
|
||||
/** User-scoped when configured headers resolve per user — the
|
||||
* derived token config must not be cached under the shared name */
|
||||
tokenKey: getTokenConfigKey(endpoint, name, req.user?.id ?? ''),
|
||||
tokenKey,
|
||||
});
|
||||
}
|
||||
uniqueKeyToEndpointsMap[uniqueKey] = uniqueKeyToEndpointsMap[uniqueKey] || [];
|
||||
uniqueKeyToEndpointsMap[uniqueKey].push(name);
|
||||
continue;
|
||||
|
|
@ -255,6 +261,23 @@ export function createLoadConfigModels(deps: LoadConfigModelsDeps) {
|
|||
);
|
||||
modelsConfig[name] = !modelData?.length ? defaults : modelData;
|
||||
}
|
||||
|
||||
/** A shared fetch caches token config under one endpoint's tokenKey;
|
||||
* copy it to the siblings so /token-config resolves each by its own
|
||||
* name (the query is staleTime:Infinity and won't recover otherwise) */
|
||||
const winnerTokenKey = uniqueKeyToTokenKey[currentKey];
|
||||
if (settled.status === 'fulfilled' && winnerTokenKey != null && associatedNames.length > 1) {
|
||||
const cache = tokenConfigCache();
|
||||
const config = await cache.get(winnerTokenKey);
|
||||
if (config != null) {
|
||||
for (const name of associatedNames) {
|
||||
const siblingKey = getTokenConfigKey(endpointsMap[name], name, req.user?.id ?? '');
|
||||
if (siblingKey !== winnerTokenKey) {
|
||||
await cache.set(siblingKey, config);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return modelsConfig;
|
||||
|
|
|
|||
|
|
@ -164,6 +164,10 @@ export class RedisJobStore implements IJobStore {
|
|||
// For cluster mode, we can't pipeline keys on different slots
|
||||
// The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots
|
||||
if (this.isCluster) {
|
||||
// DEL before HSET: a reused streamId (retry/superseded run) would
|
||||
// otherwise inherit the prior job's contextUsage/tokenUsage fields,
|
||||
// since HSET only overwrites the keys present in the fresh job.
|
||||
await this.redis.del(key);
|
||||
await this.redis.hset(key, this.serializeJob(job));
|
||||
await this.redis.expire(key, this.ttl.running);
|
||||
await this.redis.sadd(KEYS.runningJobs, streamId);
|
||||
|
|
@ -173,6 +177,8 @@ export class RedisJobStore implements IJobStore {
|
|||
}
|
||||
} else {
|
||||
const pipeline = this.redis.pipeline();
|
||||
// See cluster branch: clear stale fields before writing the fresh job.
|
||||
pipeline.del(key);
|
||||
pipeline.hset(key, this.serializeJob(job));
|
||||
pipeline.expire(key, this.ttl.running);
|
||||
pipeline.sadd(KEYS.runningJobs, streamId);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue