LibreChat/packages/api/src/stream/implementations/InMemoryJobStore.ts
Danny Avila 439bc98682
⏸ refactor: Improve UX for Parallel Streams (Multi-Convo) (#11096)
* 🌊 feat: Implement multi-conversation feature with added conversation context and payload adjustments

* refactor: Replace isSubmittingFamily with isSubmitting across message components for consistency

* feat: Add loadAddedAgent and processAddedConvo for multi-conversation agent execution

* refactor: Update ContentRender usage to conditionally render PlaceholderRow based on isLast and isSubmitting

* WIP: first pass, sibling index

* feat: Enhance multi-conversation support with agent tracking and display improvements

* refactor: Introduce isEphemeralAgentId utility and update related logic for agent handling

* refactor: Implement createDualMessageContent utility for sibling message display and enhance useStepHandler for added conversations

* refactor: duplicate tools for added agent if ephemeral and primary agent is also ephemeral

* chore: remove deprecated multimessage rendering

* refactor: enhance dual message content creation and agent handling for parallel rendering

* refactor: streamline message rendering and submission handling by removing unused state and optimizing conditional logic

* refactor: adjust content handling in parallel mode to utilize existing content for improved agent display

* refactor: update @librechat/agents dependency to version 3.0.53

* refactor: update @langchain/core and @librechat/agents dependencies to latest versions

* refactor: remove deprecated @langchain/core dependency from package.json

* chore: remove unused SearchToolConfig and GetSourcesParams types from web.ts

* refactor: remove unused message properties from Message component

* refactor: enhance parallel content handling with groupId support in ContentParts and useStepHandler

* refactor: implement parallel content styling in Message, MessageRender, and ContentRender components. use explicit model name

* refactor: improve agent ID handling in createDualMessageContent for dual message display

* refactor: simplify title generation in AddedConvo by removing unused sender and preset logic

* refactor: replace string interpolation with cn utility for className in HoverButtons component

* refactor: enhance agent ID handling by adding suffix management for parallel agents and updating related components

* refactor: enhance column ordering in ContentParts by sorting agents with suffix management

* refactor: update @librechat/agents dependency to version 3.0.55

* feat: implement parallel content rendering with metadata support

- Added `ParallelContentRenderer` and `ParallelColumns` components for rendering messages in parallel based on groupId and agentId.
- Introduced `contentMetadataMap` to store metadata for each content part, allowing efficient parallel content detection.
- Updated `Message` and `ContentRender` components to utilize the new metadata structure for rendering.
- Modified `useStepHandler` to manage content indices and metadata during message processing.
- Enhanced `IJobStore` interface and its implementations to support storing and retrieving content metadata.
- Updated data schemas to include `contentMetadataMap` for messages, enabling multi-agent and parallel execution scenarios.

* refactor: update @librechat/agents dependency to version 3.0.56

* refactor: remove unused EPHEMERAL_AGENT_ID constant and simplify agent ID check

* refactor: enhance multi-agent message processing and primary agent determination

* refactor: implement branch message functionality for parallel responses

* refactor: integrate added conversation retrieval into message editing and regeneration processes

* refactor: remove unused isCard and isMultiMessage props from MessageRender and ContentRender components

* refactor: update @librechat/agents dependency to version 3.0.60

* refactor: replace usage of EPHEMERAL_AGENT_ID constant with isEphemeralAgentId function for improved clarity and consistency

* refactor: standardize agent ID format in tests for consistency

* chore: move addedConvo property to the correct position in payload construction

* refactor: rename agent_id values in loadAgent tests for clarity

* chore: reorder props in ContentParts component for improved readability

* refactor: rename variable 'content' to 'result' for clarity in RedisJobStore tests

* refactor: streamline useMessageActions by removing duplicate handleFeedback assignment

* chore: revert placeholder rendering logic in MessageRender and ContentRender components to original

* refactor: implement useContentMetadata hook for optimized content metadata handling

* refactor: remove contentMetadataMap and related logic from the codebase and revert back to agentId/groupId in content parts

- Eliminated contentMetadataMap from various components and services, simplifying the handling of message content.
- Updated functions to directly access agentId and groupId from content parts instead of relying on a separate metadata map.
- Adjusted related hooks and components to reflect the removal of contentMetadataMap, ensuring consistent handling of message content.
- Updated tests and documentation to align with the new structure of message content handling.

* refactor: remove logging from groupParallelContent function to clean up output

* refactor: remove model parameter from TBranchMessageRequest type for simplification

* refactor: enhance branch message creation by stripping metadata for standalone content

* chore: streamline branch message creation by simplifying content filtering and removing unnecessary metadata checks

* refactor: include attachments in branch message creation for improved content handling

* refactor: streamline agent content processing by consolidating primary agent identification and filtering logic

* refactor: simplify multi-agent message processing by creating a dedicated mapping method and enhancing content filtering

* refactor: remove unused parameter from loadEphemeralAgent function for cleaner code

* refactor: update groupId handling in metadata to only set when provided by the server
2025-12-25 01:43:54 -05:00

303 lines
8.1 KiB
TypeScript

import { logger } from '@librechat/data-schemas';
import type { StandardGraph } from '@librechat/agents';
import type { Agents } from 'librechat-data-provider';
import type { IJobStore, SerializableJobData, JobStatus } from '~/stream/interfaces/IJobStore';

/**
 * Content state for a job - volatile, in-memory only.
 * Uses WeakRef to allow garbage collection of graph when no longer needed.
 */
interface ContentState {
  contentParts: Agents.MessageContentComplex[];
  graphRef: WeakRef<StandardGraph> | null;
}

/**
 * In-memory implementation of IJobStore.
 * Suitable for single-instance deployments.
 * For horizontal scaling, use RedisJobStore.
 *
 * Content state is tied to jobs:
 * - Holds a WeakRef to the graph for live access to contentParts and contentData (run steps)
 * - No chunk persistence needed - same instance handles generation and reconnects
 */
export class InMemoryJobStore implements IJobStore {
  private jobs = new Map<string, SerializableJobData>();
  private contentState = new Map<string, ContentState>();
  private cleanupInterval: NodeJS.Timeout | null = null;

  /** Maps userId -> Set of streamIds (conversationIds) for active jobs */
  private userJobMap = new Map<string, Set<string>>();

  /** Time in milliseconds to keep finished jobs before cleanup (0 = remove on the next cleanup pass) */
  private ttlAfterComplete = 0;

  /** Maximum number of jobs held at once; the oldest job is evicted when the limit is reached */
  private maxJobs = 1000;

  constructor(options?: { ttlAfterComplete?: number; maxJobs?: number }) {
    // Falsy values (undefined or 0) leave the defaults in place
    if (options?.ttlAfterComplete) {
      this.ttlAfterComplete = options.ttlAfterComplete;
    }
    if (options?.maxJobs) {
      this.maxJobs = options.maxJobs;
    }
  }
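
  async initialize(): Promise<void> {
    // Idempotent: a second call does not start another timer
    if (this.cleanupInterval) {
      return;
    }

    // Run cleanup once per minute
    this.cleanupInterval = setInterval(() => {
      this.cleanup();
    }, 60000);

    // Do not let the cleanup timer keep the Node.js process alive
    if (this.cleanupInterval.unref) {
      this.cleanupInterval.unref();
    }

    logger.debug('[InMemoryJobStore] Initialized with cleanup interval');
  }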

  async createJob(
    streamId: string,
    userId: string,
    conversationId?: string,
  ): Promise<SerializableJobData> {
    // At capacity: make room by evicting the job with the earliest createdAt
    if (this.jobs.size >= this.maxJobs) {
      await this.evictOldest();
    }

    const job: SerializableJobData = {
      streamId,
      userId,
      status: 'running',
      createdAt: Date.now(),
      conversationId,
      syncSent: false,
    };

    // An existing job with the same streamId is replaced
    this.jobs.set(streamId, job);

    // Track job by userId for efficient user-scoped queries
    let userJobs = this.userJobMap.get(userId);
    if (!userJobs) {
      userJobs = new Set();
      this.userJobMap.set(userId, userJobs);
    }
    userJobs.add(streamId);

    logger.debug(`[InMemoryJobStore] Created job: ${streamId}`);
    return job;
  }

  async getJob(streamId: string): Promise<SerializableJobData | null> {
    return this.jobs.get(streamId) ?? null;
  }

  async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
    const job = this.jobs.get(streamId);
    // Updates to an unknown streamId are silently ignored
    if (!job) {
      return;
    }
    Object.assign(job, updates);
  }

  async deleteJob(streamId: string): Promise<void> {
    // userJobMap is not touched here; stale entries are pruned lazily
    // by getActiveJobIdsByUser
    this.jobs.delete(streamId);
    this.contentState.delete(streamId);
    logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
  }

  async hasJob(streamId: string): Promise<boolean> {
    return this.jobs.has(streamId);
  }

  async getRunningJobs(): Promise<SerializableJobData[]> {
    const running: SerializableJobData[] = [];
    for (const job of this.jobs.values()) {
      if (job.status === 'running') {
        running.push(job);
      }
    }
    return running;
  }

  /** Delete finished jobs whose TTL has expired; returns the number of jobs removed. */
  async cleanup(): Promise<number> {
    const now = Date.now();
    const toDelete: string[] = [];

    for (const [streamId, job] of this.jobs) {
      const isFinished = ['complete', 'error', 'aborted'].includes(job.status);
      // Finished jobs without a completedAt timestamp are skipped
      if (isFinished && job.completedAt) {
        // TTL of 0 means immediate cleanup, otherwise wait for TTL to expire
        if (this.ttlAfterComplete === 0 || now - job.completedAt > this.ttlAfterComplete) {
          toDelete.push(streamId);
        }
      }
    }

    for (const id of toDelete) {
      await this.deleteJob(id);
    }

    if (toDelete.length > 0) {
      logger.debug(`[InMemoryJobStore] Cleaned up ${toDelete.length} expired jobs`);
    }

    return toDelete.length;
  }

  /**
   * Evict the job with the earliest createdAt, regardless of status.
   * A job that is still running can be evicted.
   */
  private async evictOldest(): Promise<void> {
    let oldestId: string | null = null;
    let oldestTime = Infinity;

    for (const [streamId, job] of this.jobs) {
      if (job.createdAt < oldestTime) {
        oldestTime = job.createdAt;
        oldestId = streamId;
      }
    }

    if (oldestId) {
      logger.warn(`[InMemoryJobStore] Evicting oldest job: ${oldestId}`);
      await this.deleteJob(oldestId);
    }
  }

  /** Get job count (for monitoring) */
  async getJobCount(): Promise<number> {
    return this.jobs.size;
  }

  /** Get job count by status (for monitoring) */
  async getJobCountByStatus(status: JobStatus): Promise<number> {
    let count = 0;
    for (const job of this.jobs.values()) {
      if (job.status === status) {
        count++;
      }
    }
    return count;
  }

  async destroy(): Promise<void> {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }
    this.jobs.clear();
    this.contentState.clear();
    this.userJobMap.clear();
    logger.debug('[InMemoryJobStore] Destroyed');
  }

  /**
   * Get active job IDs for a user.
   * Returns the stream IDs (conversation IDs) of running jobs belonging to the user.
   * Also performs self-healing cleanup: removes stale entries for jobs that
   * no longer exist or are no longer running.
   */
  async getActiveJobIdsByUser(userId: string): Promise<string[]> {
    const trackedIds = this.userJobMap.get(userId);
    if (!trackedIds || trackedIds.size === 0) {
      return [];
    }

    const activeIds: string[] = [];

    for (const streamId of trackedIds) {
      const job = this.jobs.get(streamId);
      // Only include if job exists AND is still running
      if (job && job.status === 'running') {
        activeIds.push(streamId);
      } else {
        // Self-healing: job completed/deleted but mapping wasn't cleaned - fix it now
        trackedIds.delete(streamId);
      }
    }

    // Clean up empty set
    if (trackedIds.size === 0) {
      this.userJobMap.delete(userId);
    }

    return activeIds;
  }

  // ===== Content State Methods =====

  /**
   * Set the graph reference for a job.
   * Uses WeakRef to allow garbage collection when graph is no longer needed.
   */
  setGraph(streamId: string, graph: StandardGraph): void {
    const existing = this.contentState.get(streamId);
    if (existing) {
      existing.graphRef = new WeakRef(graph);
    } else {
      this.contentState.set(streamId, {
        contentParts: [],
        graphRef: new WeakRef(graph),
      });
    }
  }

  /**
   * Set content parts reference for a job.
   */
  setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
    const existing = this.contentState.get(streamId);
    if (existing) {
      existing.contentParts = contentParts;
    } else {
      this.contentState.set(streamId, { contentParts, graphRef: null });
    }
  }

  /**
   * Get content parts for a job.
   * Returns live content from stored reference.
   */
  async getContentParts(streamId: string): Promise<{
    content: Agents.MessageContentComplex[];
  } | null> {
    const state = this.contentState.get(streamId);
    if (!state?.contentParts) {
      return null;
    }
    return {
      content: state.contentParts,
    };
  }

  /**
   * Get run steps for a job from graph.contentData.
   * Uses WeakRef - may return empty if graph has been GC'd.
   */
  async getRunSteps(streamId: string): Promise<Agents.RunStep[]> {
    const state = this.contentState.get(streamId);
    if (!state?.graphRef) {
      return [];
    }

    // Dereference WeakRef - may return undefined if GC'd
    const graph = state.graphRef.deref();
    return graph?.contentData ?? [];
  }

  /**
   * No-op for in-memory - content available via graph reference.
   */
  async appendChunk(): Promise<void> {
    // No-op: content available via graph reference
  }

  /**
   * Clear content state for a job.
   */
  clearContentState(streamId: string): void {
    this.contentState.delete(streamId);
  }
}
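
// The TTL cleanup and the self-healing user index above can be sketched in
// isolation. What follows is a minimal, simplified sketch and not the store
// itself: MiniJobStore, MiniJob, complete, activeIds and trackedCount are
// hypothetical names, and the explicit `now` parameter is an assumption made
// so the example is deterministic instead of relying on Date.now().

```typescript
// Hypothetical, simplified job shape (not the real SerializableJobData).
type MiniStatus = 'running' | 'complete' | 'error' | 'aborted';

interface MiniJob {
  streamId: string;
  userId: string;
  status: MiniStatus;
  createdAt: number;
  completedAt?: number;
}

class MiniJobStore {
  private jobs = new Map<string, MiniJob>();
  private userJobMap = new Map<string, Set<string>>();
  private ttlAfterComplete: number;

  constructor(ttlAfterComplete = 0) {
    this.ttlAfterComplete = ttlAfterComplete;
  }

  createJob(streamId: string, userId: string, now: number): MiniJob {
    const job: MiniJob = { streamId, userId, status: 'running', createdAt: now };
    this.jobs.set(streamId, job);
    let ids = this.userJobMap.get(userId);
    if (!ids) {
      ids = new Set<string>();
      this.userJobMap.set(userId, ids);
    }
    ids.add(streamId);
    return job;
  }

  /** Mark a job as complete and record when it finished. */
  complete(streamId: string, now: number): void {
    const job = this.jobs.get(streamId);
    if (job) {
      job.status = 'complete';
      job.completedAt = now;
    }
  }

  /** Remove finished jobs whose TTL has elapsed; returns how many were removed. */
  cleanup(now: number): number {
    let removed = 0;
    this.jobs.forEach((job, streamId) => {
      if (job.status === 'running' || job.completedAt === undefined) {
        return;
      }
      if (this.ttlAfterComplete === 0 || now - job.completedAt > this.ttlAfterComplete) {
        this.jobs.delete(streamId); // the user index is deliberately left stale
        removed++;
      }
    });
    return removed;
  }

  /** Return running stream IDs for a user, pruning stale index entries on the way. */
  activeIds(userId: string): string[] {
    const tracked = this.userJobMap.get(userId);
    if (!tracked) {
      return [];
    }
    const active: string[] = [];
    tracked.forEach((streamId) => {
      const job = this.jobs.get(streamId);
      if (job && job.status === 'running') {
        active.push(streamId);
      } else {
        tracked.delete(streamId); // self-healing step
      }
    });
    if (tracked.size === 0) {
      this.userJobMap.delete(userId);
    }
    return active;
  }

  /** Number of stream IDs currently indexed for a user (stale entries included). */
  trackedCount(userId: string): number {
    const tracked = this.userJobMap.get(userId);
    return tracked ? tracked.size : 0;
  }
}

// Usage: two jobs for one user, one of which finishes at t=100 with a 1000 ms TTL.
const store = new MiniJobStore(1000);
store.createJob('a', 'u1', 0);
store.createJob('b', 'u1', 0);
store.complete('a', 100);
const removedEarly = store.cleanup(500); // TTL not yet elapsed
const removedLate = store.cleanup(1200); // 1200 - 100 > 1000, so 'a' is removed
const staleCount = store.trackedCount('u1'); // index still lists 'a' and 'b'
const active = store.activeIds('u1'); // prunes 'a' from the index
```

// Leaving the index stale on deletion keeps the delete path simple; the cost
// is that entries for a user linger until that user's active jobs are next
// queried.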