diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index dde869a..6c28cf5 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -77,8 +77,8 @@ "port": 6379, "db": 1 }, - "workers": 1, - "concurrency": 1, + "workers": 5, + "concurrency": 5, "enableScheduledJobs": true, "defaultJobOptions": { "attempts": 3, diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts index e7789fa..d4473cc 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/corporate-actions.action.ts @@ -49,7 +49,7 @@ export async function updateCorporateActions( this.logger.info('Fetching corporate actions', { symbol, symbolId }); const sessionManager = QMSessionManager.getInstance(); - sessionManager.initialize(this.cache, this.logger); + await sessionManager.initialize(this.cache, this.logger); // Get a session - you'll need to add the appropriate session ID const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts index 94b2064..ed31a0d 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/filings.action.ts @@ -44,7 +44,7 @@ export async function updateFilings( this.logger.info('Fetching filings', { symbol, symbolId }); const sessionManager = QMSessionManager.getInstance(); - sessionManager.initialize(this.cache, this.logger); + await sessionManager.initialize(this.cache, this.logger); // Get a session - you'll need to add the appropriate session ID for filings const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts index a1eb31d..d276eda 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/financials.action.ts @@ -44,7 +44,7 @@ export async function updateFinancials( this.logger.info('Fetching financials', { symbol, symbolId }); const sessionManager = QMSessionManager.getInstance(); - sessionManager.initialize(this.cache, this.logger); + await sessionManager.initialize(this.cache, this.logger); // Get a session - you'll need to add the appropriate session ID for financials const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts index 75900d3..6445190 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/intraday.action.ts @@ -46,7 +46,7 @@ export async function updateIntradayBars( this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate }); const sessionManager = QMSessionManager.getInstance(); - sessionManager.initialize(this.cache, this.logger); + await sessionManager.initialize(this.cache, this.logger); // Get a session - you'll need to add the appropriate session ID for intraday const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts index 8c6dd72..34e421e 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/prices.action.ts @@ -45,7 +45,7 @@ export async function updatePrices( this.logger.info(`Fetching daily prices ${qmSearchCode}`, { qmSearchCode }); const sessionManager = QMSessionManager.getInstance(); - sessionManager.initialize(this.cache, this.logger); + await sessionManager.initialize(this.cache, this.logger); // Get a session - you'll need to add the appropriate session ID for prices const sessionId = QM_SESSION_IDS.PRICES; @@ -86,11 +86,17 @@ export async function updatePrices( // Update session success stats await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid); - + // Update symbol to track last price update + const tracker = await getOperationTracker(this); + const priceData = responseData.results?.history[0].eoddata || []; if(!priceData || priceData.length === 0) { this.logger.warn(`No price data found for symbol ${qmSearchCode}`); + await tracker.updateSymbolOperation(qmSearchCode, 'price_update', { + status: 'success', + recordCount: priceData.length + }); return { success: false, qmSearchCode, @@ -121,8 +127,7 @@ export async function updatePrices( new Date(0) ); - // Update symbol to track last price update - const tracker = await getOperationTracker(this); + await tracker.updateSymbolOperation(qmSearchCode, 'price_update', { status: 'success', lastRecordDate: latestDate, @@ -193,7 +198,7 @@ export async function schedulePriceUpdates( symbolsQueued: number; errors: number; }> { - const { limit = 100, forceUpdate = false } = input; + const { limit = 50000, forceUpdate = false } = input; const tracker = await getOperationTracker(this); this.logger.info('Scheduling price updates', { limit, forceUpdate }); @@ -236,7 +241,7 @@ export async function schedulePriceUpdates( exchange: doc.exchange }, { priority: 7, // High priority for price data - delay: queued * 500 // 0.5 seconds between jobs + delay: queued * 100 // 0.1 seconds between jobs }); queued++; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts index 842fd79..a67bb9a 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts @@ -26,10 +26,7 @@ export async function checkSessions( const sessionManager = QMSessionManager.getInstance(); // Initialize with cache provider and logger - sessionManager.initialize(this.cache, this.logger); - - // Always load fresh data from cache (don't rely on initialization flag) - await sessionManager.loadFromCache(); + await sessionManager.initialize(this.cache, this.logger); // Cleanup failed sessions (this now handles its own cache sync) const cleanedCount = await sessionManager.cleanupFailedSessions(); @@ -39,15 +36,15 @@ export async function checkSessions( for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { this.logger.debug(`Checking session ID: ${sessionId}`); if (await sessionManager.needsMoreSessions(sessionId)) { - const currentCount = sessionManager.getSessions(sessionId).length; + const currentCount = await sessionManager.getSessionCount(sessionId); const neededSessions = SESSION_CONFIG.MAX_SESSIONS - currentCount; // Queue up to 10 at a time to avoid overwhelming the system - const toQueue = Math.min(neededSessions, 20); + const toQueue = Math.min(neededSessions, 40); for (let i = 0; i < toQueue; i++) { await this.scheduleOperation('create-session', { sessionId, sessionType }, { - delay: i * 2000, // Stagger creation by 2 seconds + // delay: i * 2000, // Stagger creation by 2 seconds }); queuedCount++; } @@ -94,7 +91,7 @@ export async function createSession( } // Initialize with cache provider and logger - sessionManager.initialize(this.cache, this.logger); + await sessionManager.initialize(this.cache, this.logger); try { // Get proxy from proxy service diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-dedup.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-dedup.action.ts index b4cc6ac..64d8a42 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-dedup.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-dedup.action.ts @@ -201,7 +201,7 @@ export async function deduplicateSymbols( countryDocs.sort((a, b) => { const priorityA = priorityMap.get(a.exchange) || 1; const priorityB = priorityMap.get(b.exchange) || 1; - if (priorityA !== priorityB) return priorityB - priorityA; + if (priorityA !== priorityB) { return priorityB - priorityA } return (b.marketCap || 0) - (a.marketCap || 0); }); activeDoc = countryDocs[0]; @@ -217,7 +217,7 @@ export async function deduplicateSymbols( } }); totalProcessed++; - if (!isActive) totalDeactivated++; + if (!isActive) { totalDeactivated++ } } } } diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts index 20b63f3..e25e61a 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-info.action.ts @@ -43,7 +43,7 @@ export async function updateSymbolInfo( this.logger.info(`Fetching symbol info ${qmSearchCode}`, { qmSearchCode }); const sessionManager = QMSessionManager.getInstance(); - sessionManager.initialize(this.cache, this.logger); + await sessionManager.initialize(this.cache, this.logger); // Get a session const sessionId = QM_SESSION_IDS.SYMBOL; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts index 1a3330d..cab3ef5 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts @@ -145,7 +145,7 @@ export async function searchSymbols( this.logger.debug('Searching QM symbols', { query }); const sessionManager = QMSessionManager.getInstance(); - sessionManager.initialize(this.cache, this.logger); + await sessionManager.initialize(this.cache, this.logger); // Get a session const sessionId = QM_SESSION_IDS.LOOKUP; diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index 9e910af..ef5a123 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -42,10 +42,10 @@ export class QMHandler extends BaseHandler { /** * SESSIONS */ - @ScheduledOperation('check-sessions', '*/2 * * * *', { + @ScheduledOperation('check-sessions', '*/1 * * * *', { priority: 8, - immediately: false, - description: 'Check and maintain QM sessions every 2 minutes', + immediately: true, + description: 'Check and maintain QM sessions every 1 minutes', }) checkSessions = checkSessions; diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index 00c9864..36b6151 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -42,9 +42,9 @@ export const QM_CONFIG = { // Session management settings export const SESSION_CONFIG = { - MIN_SESSIONS: 2, - MAX_SESSIONS: 2, - MAX_FAILED_CALLS: 3, + MIN_SESSIONS: 20, + MAX_SESSIONS: 100, + MAX_FAILED_CALLS: 5, SESSION_TIMEOUT: 5000, // 10 seconds API_TIMEOUT: 30000, // 15 seconds } as const; diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/index.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/index.ts index b391d81..6e5b075 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/index.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/index.ts @@ -1,4 +1,6 @@ export * from './config'; export * from './session-manager'; +export * from './session-manager-redis'; +export * from './session-manager-wrapper'; export * from './types'; export * from './operation-tracker'; \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager-redis.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager-redis.ts new file mode 100644 index 0000000..5ca1f32 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager-redis.ts @@ -0,0 +1,382 @@ +/** + * QM Session Manager - Redis-based implementation for cluster support + * Uses Redis as the single source of truth for session management + */ + +import type { CacheProvider } from '@stock-bot/cache'; +import type { Logger } from '@stock-bot/types'; +import { QM_SESSION_IDS, SESSION_CONFIG } from './config'; +import type { QMSession } from './types'; + +export class QMSessionManagerRedis { + private static instance: QMSessionManagerRedis | null = null; + private cacheProvider: CacheProvider | null = null; + private logger: Logger | null = null; + private readonly SESSION_TTL = 86400; // 24 hours + + private constructor() {} + + static getInstance(): QMSessionManagerRedis { + if (!QMSessionManagerRedis.instance) { + QMSessionManagerRedis.instance = new QMSessionManagerRedis(); + } + return QMSessionManagerRedis.instance; + } + + /** + * Initialize with cache provider and logger + */ + initialize(cache?: CacheProvider, logger?: Logger): void { + if (cache) { + this.cacheProvider = cache; + } + if (logger) { + this.logger = logger; + this.logger.trace('Redis-based SessionManager initialized'); + } + } + + /** + * Get a random valid session for the given session ID + */ + async getSession(sessionId: string): Promise { + if (!this.cacheProvider) { + this.logger?.error('No cache provider available'); + return null; + } + + try { + // Get all session UUIDs for this sessionId + const sessionUuids = await this.getSessionUuids(sessionId); + + if (sessionUuids.length === 0) { + this.logger?.debug(`No sessions found for ${sessionId}`); + return null; + } + + // Shuffle and try to find a valid session + const shuffled = [...sessionUuids].sort(() => Math.random() - 0.5); + + for (const uuid of shuffled) { + const session = await this.getSessionByUuid(sessionId, uuid); + if (session && (session.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS) { + this.logger?.trace(`Selected session`, { + uuid: session.uuid, + failedCalls: session.failedCalls, + successfulCalls: session.successfulCalls, + }); + return session; + } + } + + this.logger?.warn(`No valid sessions found for ${sessionId}`); + return null; + } catch (error) { + this.logger?.error('Failed to get session', { error, sessionId }); + return null; + } + } + + /** + * Get a specific session by UUID + */ + async getSessionByUuid(sessionId: string, uuid: string): Promise { + if (!this.cacheProvider) { + return null; + } + + const sessionKey = this.getSessionKey(sessionId, uuid); + try { + const session = await this.cacheProvider.get(sessionKey); + if (session) { + // Ensure dates are Date objects + session.lastUsed = new Date(session.lastUsed); + session.createdAt = new Date(session.createdAt); + } + return session; + } catch (error) { + this.logger?.error('Failed to get session by UUID', { error, sessionId, uuid }); + return null; + } + } + + /** + * Add a session + */ + async addSession(sessionId: string, session: QMSession): Promise { + if (!this.cacheProvider) { + throw new Error('No cache provider available'); + } + + const sessionKey = this.getSessionKey(sessionId, session.uuid); + const listKey = this.getSessionListKey(sessionId); + + try { + // Store the session data + await this.cacheProvider.set(sessionKey, session, this.SESSION_TTL); + + // Add UUID to the session list (using SET to simulate SADD) + const uuids = await this.getSessionUuids(sessionId); + if (!uuids.includes(session.uuid)) { + uuids.push(session.uuid); + await this.cacheProvider.set(listKey, uuids, this.SESSION_TTL); + } + + this.logger?.debug(`Added session ${session.uuid} to ${sessionId}`); + } catch (error) { + this.logger?.error('Failed to add session', { error, sessionId, uuid: session.uuid }); + throw error; + } + } + + /** + * Remove a session + */ + async removeSession(sessionId: string, uuid: string): Promise { + if (!this.cacheProvider) { + return; + } + + const sessionKey = this.getSessionKey(sessionId, uuid); + const listKey = this.getSessionListKey(sessionId); + + try { + // Remove session data + await this.cacheProvider.del(sessionKey); + + // Remove UUID from list + const uuids = await this.getSessionUuids(sessionId); + const filtered = uuids.filter(u => u !== uuid); + if (filtered.length !== uuids.length) { + await this.cacheProvider.set(listKey, filtered, this.SESSION_TTL); + } + + this.logger?.debug(`Removed session ${uuid} from ${sessionId}`); + } catch (error) { + this.logger?.error('Failed to remove session', { error, sessionId, uuid }); + } + } + + /** + * Get all sessions for a session ID + */ + async getSessions(sessionId: string): Promise { + if (!this.cacheProvider) { + return []; + } + + const uuids = await this.getSessionUuids(sessionId); + const sessions: QMSession[] = []; + + for (const uuid of uuids) { + const session = await this.getSessionByUuid(sessionId, uuid); + if (session) { + sessions.push(session); + } + } + + return sessions; + } + + /** + * Get session count + */ + async getSessionCount(sessionId?: string): Promise { + if (!this.cacheProvider) { + return 0; + } + + if (sessionId) { + const uuids = await this.getSessionUuids(sessionId); + return uuids.length; + } + + // Count all sessions + let total = 0; + for (const sid of Object.values(QM_SESSION_IDS)) { + const uuids = await this.getSessionUuids(sid); + total += uuids.length; + } + return total; + } + + /** + * Clean up failed sessions + */ + async cleanupFailedSessions(): Promise { + if (!this.cacheProvider) { + return 0; + } + + let removedCount = 0; + + for (const sessionId of Object.values(QM_SESSION_IDS)) { + const sessions = await this.getSessions(sessionId); + + for (const session of sessions) { + if (session.failedCalls > SESSION_CONFIG.MAX_FAILED_CALLS) { + await this.removeSession(sessionId, session.uuid); + removedCount++; + } + } + } + + this.logger?.info(`Cleaned up ${removedCount} failed sessions`); + return removedCount; + } + + /** + * Check if more sessions are needed + */ + async needsMoreSessions(sessionId: string): Promise { + const sessions = await this.getSessions(sessionId); + const validSessions = sessions.filter( + s => (s.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS + ); + return validSessions.length < SESSION_CONFIG.MAX_SESSIONS; + } + + /** + * Get statistics + */ + async getStats(): Promise> { + const stats: Record = {}; + + for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { + const sessions = await this.getSessions(sessionId); + const validSessions = sessions.filter( + s => (s.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS + ); + const failedSessions = sessions.filter( + s => s.failedCalls > SESSION_CONFIG.MAX_FAILED_CALLS + ); + + stats[sessionId] = { + total: sessions.length, + valid: validSessions.length, + failed: failedSessions.length, + }; + } + + return stats; + } + + /** + * Increment failed calls for a session + */ + async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise { + const session = await this.getSessionByUuid(sessionId, sessionUuid); + + if (session) { + session.failedCalls = (session.failedCalls || 0) + 1; + session.lastUsed = new Date(); + + const sessionKey = this.getSessionKey(sessionId, sessionUuid); + await this.cacheProvider?.set(sessionKey, session, this.SESSION_TTL); + + this.logger?.debug(`Incremented failed calls for session`, { + sessionUuid, + failedCalls: session.failedCalls, + sessionId + }); + } else { + this.logger?.warn(`Session not found`, { sessionUuid, sessionId }); + } + } + + /** + * Increment successful calls for a session + */ + async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise { + const session = await this.getSessionByUuid(sessionId, sessionUuid); + + if (session) { + session.successfulCalls = (session.successfulCalls || 0) + 1; + session.lastUsed = new Date(); + + const sessionKey = this.getSessionKey(sessionId, sessionUuid); + await this.cacheProvider?.set(sessionKey, session, this.SESSION_TTL); + + this.logger?.debug(`Incremented successful calls for session`, { + sessionUuid, + successfulCalls: session.successfulCalls, + sessionId + }); + } else { + this.logger?.warn(`Session not found`, { sessionUuid, sessionId }); + } + } + + /** + * Helper: Get session UUIDs from cache + */ + private async getSessionUuids(sessionId: string): Promise { + if (!this.cacheProvider) { + return []; + } + + const listKey = this.getSessionListKey(sessionId); + try { + const uuids = await this.cacheProvider.get(listKey); + return uuids || []; + } catch (error) { + this.logger?.error('Failed to get session UUIDs', { error, sessionId }); + return []; + } + } + + /** + * Helper: Generate cache keys + */ + private getSessionKey(sessionId: string, uuid: string): string { + return `qm:sessions:${sessionId}:session:${uuid}`; + } + + private getSessionListKey(sessionId: string): string { + return `qm:sessions:${sessionId}:list`; + } + + /** + * Migrate from old session manager (for backward compatibility) + */ + async migrateFromOldFormat(): Promise { + if (!this.cacheProvider) { + return; + } + + this.logger?.info('Starting migration from old session format'); + + for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { + const oldListKey = `qm:sessions:${sessionType.toLowerCase()}:list`; + const oldSessionIds = await this.cacheProvider.get(oldListKey); + + if (oldSessionIds && Array.isArray(oldSessionIds)) { + this.logger?.info(`Migrating ${oldSessionIds.length} sessions for ${sessionType}`); + + for (const id of oldSessionIds) { + const oldSessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`; + const oldSession = await this.cacheProvider.get(oldSessionKey); + + if (oldSession && oldSession.uuid) { + const session: QMSession = { + uuid: oldSession.uuid, + proxy: oldSession.proxy, + headers: oldSession.headers, + successfulCalls: oldSession.successfulCalls || 0, + failedCalls: oldSession.failedCalls || 0, + lastUsed: new Date(oldSession.lastUsed), + createdAt: new Date(oldSession.createdAt || new Date()), + }; + + await this.addSession(sessionId, session); + await this.cacheProvider.del(oldSessionKey); + } + } + + await this.cacheProvider.del(oldListKey); + } + } + + this.logger?.info('Migration completed'); + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager-wrapper.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager-wrapper.ts new file mode 100644 index 0000000..9418af8 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager-wrapper.ts @@ -0,0 +1,128 @@ +/** + * Session Manager Wrapper - Provides a unified interface that can switch between implementations + */ + +import type { CacheProvider } from '@stock-bot/cache'; +import type { Logger } from '@stock-bot/types'; +import { QMSessionManager } from './session-manager'; +import { QMSessionManagerRedis } from './session-manager-redis'; +import type { QMSession } from './types'; + +/** + * Wrapper that delegates to either in-memory or Redis-based implementation + * Set REDIS_SESSION_MANAGER=true to use Redis implementation + */ +export class SessionManagerWrapper { + private implementation: QMSessionManager | QMSessionManagerRedis; + private static instance: SessionManagerWrapper | null = null; + + private constructor() { + // Use Redis implementation if env var is set or if in production + const useRedis = process.env.REDIS_SESSION_MANAGER === 'true' || + process.env.NODE_ENV === 'production'; + + if (useRedis) { + this.implementation = QMSessionManagerRedis.getInstance(); + } else { + this.implementation = QMSessionManager.getInstance(); + } + } + + static getInstance(): SessionManagerWrapper { + if (!SessionManagerWrapper.instance) { + SessionManagerWrapper.instance = new SessionManagerWrapper(); + } + return SessionManagerWrapper.instance; + } + + /** + * Reset instance (for testing) + */ + static resetInstance(): void { + SessionManagerWrapper.instance = null; + } + + /** + * Initialize with cache provider and logger + */ + initialize(cache?: CacheProvider, logger?: Logger): void { + this.implementation.initialize(cache, logger); + } + + /** + * Get the implementation type + */ + getImplementationType(): string { + return this.implementation instanceof QMSessionManagerRedis ? 'redis' : 'memory'; + } + + // Delegate all methods to the implementation + async getSession(sessionId: string): Promise { + return this.implementation.getSession(sessionId); + } + + async getSessionByUuid(sessionId: string, uuid: string): Promise { + if (this.implementation instanceof QMSessionManagerRedis) { + return this.implementation.getSessionByUuid(sessionId, uuid); + } else { + return this.implementation.getSessionByUuid(sessionId, uuid); + } + } + + async addSession(sessionId: string, session: QMSession): Promise { + return this.implementation.addSession(sessionId, session); + } + + async getSessions(sessionId: string): Promise { + if (this.implementation instanceof QMSessionManagerRedis) { + return this.implementation.getSessions(sessionId); + } else { + return this.implementation.getSessions(sessionId); + } + } + + async getSessionCount(sessionId?: string): Promise { + if (this.implementation instanceof QMSessionManagerRedis) { + return this.implementation.getSessionCount(sessionId); + } else { + return this.implementation.getSessionCount(); + } + } + + async cleanupFailedSessions(): Promise { + return this.implementation.cleanupFailedSessions(); + } + + async needsMoreSessions(sessionId: string): Promise { + return this.implementation.needsMoreSessions(sessionId); + } + + async getStats(): Promise> { + return this.implementation.getStats(); + } + + async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise { + return this.implementation.incrementFailedCalls(sessionId, sessionUuid); + } + + async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise { + return this.implementation.incrementSuccessfulCalls(sessionId, sessionUuid); + } + + // Additional methods for memory implementation + isAtCapacity(sessionId: string): boolean { + if (this.implementation instanceof QMSessionManager) { + return this.implementation.isAtCapacity(sessionId); + } + // For Redis, we'd need to implement this async + throw new Error('isAtCapacity not available for Redis implementation'); + } + + // Migration helper + async migrateToRedis(): Promise { + if (this.implementation instanceof QMSessionManagerRedis) { + return this.implementation.migrateFromOldFormat(); + } + throw new Error('Migration only available for Redis implementation'); + } +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts index 8d0d25c..acfca30 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts @@ -9,37 +9,29 @@ import type { CachedSession, QMSession } from './types'; export class QMSessionManager { private static instance: QMSessionManager | null = null; - private sessionCache: Record = {}; - private isInitialized = false; - private cacheProvider: CacheProvider | null = null; - private logger: Logger | null = null; + private cacheProvider?: CacheProvider; + private logger?: Logger; - private constructor() { - // Initialize session cache with known session IDs - Object.values(QM_SESSION_IDS).forEach(sessionId => { - this.sessionCache[sessionId] = []; - }); - } + private constructor() {} static getInstance(): QMSessionManager { - if (!QMSessionManager.instance) { - QMSessionManager.instance = new QMSessionManager(); + if (!this.instance) { + this.instance = new QMSessionManager(); } - return QMSessionManager.instance; - } - - /** - * Reset the singleton instance (for testing only) - */ - static resetInstance(): void { - if (QMSessionManager.instance?.logger) { - QMSessionManager.instance.logger.warn('Resetting QMSessionManager instance - this should only be used for testing'); - } - QMSessionManager.instance = null; + return this.instance; } /** - * Set the cache provider for persistence + * Reset the singleton instance (mainly for testing) + */ + static resetInstance(): void { + if (this.instance) { + this.instance = null; + } + } + + /** + * Set the cache provider */ setCacheProvider(cache: CacheProvider): void { this.cacheProvider = cache; @@ -50,13 +42,12 @@ export class QMSessionManager { */ setLogger(logger: Logger): void { this.logger = logger; - this.logger.trace('Logger set for QMSessionManager'); } /** - * Initialize with cache provider and logger + * Initialize the session manager with cache and logger */ - initialize(cache?: CacheProvider, logger?: Logger): void { + async initialize(cache?: CacheProvider, logger?: Logger): Promise { if (cache) { this.setCacheProvider(cache); } @@ -66,164 +57,215 @@ export class QMSessionManager { } /** - * Get a random session for the given session ID + * Get an active session for the given session ID + * Implements round-robin selection with health checks */ async getSession(sessionId: string): Promise { - let retries = 3; - let session: QMSession | null = null; - - while (retries > 0 && !session) { - // Always load fresh data from cache - await this.loadFromCache(); - - const sessions = this.sessionCache[sessionId]; - - if (!sessions || sessions.length === 0) { - retries--; - if (retries > 0) { - this.logger?.debug(`No sessions found for ${sessionId}, retrying... (${retries} attempts left)`); - await new Promise(resolve => setTimeout(resolve, 500)); - continue; - } - - this.logger?.error(`No sessions found for sessionId: ${sessionId}`, { - availableSessionIds: Object.keys(this.sessionCache), - sessionCounts: Object.entries(this.sessionCache).map(([id, s]) => ({ id, count: s.length })), - }); - return null; - } + if (!this.cacheProvider) { + this.logger?.error('No cache provider available'); + return null; + } - // Filter out sessions with excessive failures + try { + // Get all sessions for this session type + const sessions = await this.getSessions(sessionId); + + // Filter out failed sessions const validSessions = sessions.filter( session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS ); - + + this.logger?.trace(`Found ${validSessions.length} valid sessions for ${sessionId}`, { + sessionId, + totalSessions: sessions.length, + validSessions: validSessions.length, + }); + if (validSessions.length === 0) { - retries--; - if (retries > 0) { - this.logger?.debug(`No valid sessions after filtering, retrying... (${retries} attempts left)`); - await new Promise(resolve => setTimeout(resolve, 500)); - continue; - } - - this.logger?.error(`No valid sessions after filtering for sessionId: ${sessionId}`, { + this.logger?.warn(`No valid sessions available for ${sessionId}`, { + sessionId, totalSessions: sessions.length, maxFailedCalls: SESSION_CONFIG.MAX_FAILED_CALLS, }); return null; } - session = validSessions[Math.floor(Math.random() * validSessions.length)] || null; - this.logger?.trace(`Selected session`, { - uuid: session?.uuid || 'null', - failedCalls: session?.failedCalls || 'null', - successfulCalls: session?.successfulCalls || 'null', - }); + // Sort by least recently used + validSessions.sort((a, b) => + new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime() + ); + + // Return the least recently used session + return validSessions[0] || null; + } catch (error) { + this.logger?.error('Failed to get session', { error, sessionId }); + return null; } - - return session; } /** - * Get a specific session by UUID + * Get a session by its UUID */ - getSessionByUuid(sessionId: string, uuid: string): QMSession | null { - const sessions = this.sessionCache[sessionId] || []; + async getSessionByUuid(sessionId: string, uuid: string): Promise { + const sessions = await this.getSessions(sessionId); return sessions.find(s => s.uuid === uuid) || null; } /** - * Add a session to the cache + * Add a new session */ async addSession(sessionId: string, session: QMSession): Promise { - // Load latest from cache first to avoid overwriting other service's changes - await this.loadFromCache(); - - if (!this.sessionCache[sessionId]) { - this.sessionCache[sessionId] = []; + if (!this.cacheProvider) { + this.logger?.error('No cache provider available'); + return; + } + + try { + // Get session type from session ID + const sessionType = Object.entries(QM_SESSION_IDS).find(([_, id]) => id === sessionId)?.[0] || 'UNKNOWN'; + + // Store the session with a unique key + const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${session.uuid}`; + const cachedSession: CachedSession = { + ...session, + lastUsed: session.lastUsed instanceof Date ? session.lastUsed.toISOString() : session.lastUsed, + createdAt: session.createdAt instanceof Date ? session.createdAt.toISOString() : session.createdAt, + id: session.uuid, + sessionType, + } as any; + + await this.cacheProvider.set(sessionKey, cachedSession, 86400); // 24 hour TTL + + // Update the session list + const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; + const sessionIds = await this.cacheProvider.get(listKey) || []; + if (!sessionIds.includes(session.uuid)) { + sessionIds.push(session.uuid); + await this.cacheProvider.set(listKey, sessionIds, 86400); + } + + this.logger?.info(`Added new session`, { sessionId, uuid: session.uuid }); + } catch (error) { + this.logger?.error('Failed to add session', { error, sessionId }); } - this.sessionCache[sessionId].push(session); - - this.logger?.debug(`Added session ${session.uuid} to ${sessionId}`, { - sessionId, - uuid: session.uuid - }); - - // Sync to cache immediately - await this.syncToCache(); } /** * Get all sessions for a session ID */ - getSessions(sessionId: string): QMSession[] { - return this.sessionCache[sessionId] || []; - } + async getSessions(sessionId: string): Promise { + if (!this.cacheProvider) { + return []; + } - /** - * Get session count for all session IDs - */ - getSessionCount(): number { - return Object.values(this.sessionCache).reduce((total, sessions) => total + sessions.length, 0); - } + try { + const sessionType = Object.entries(QM_SESSION_IDS).find(([_, id]) => id === sessionId)?.[0] || 'UNKNOWN'; + const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; + const sessionIds = await this.cacheProvider.get(listKey) || []; - /** - * Clean up failed sessions - */ - async cleanupFailedSessions(): Promise { - // Always load latest from cache first - await this.loadFromCache(); - - let removedCount = 0; - - Object.keys(this.sessionCache).forEach(sessionId => { - const initialCount = this.sessionCache[sessionId]?.length; - if (!initialCount || this.sessionCache[sessionId] === undefined) { - this.logger?.trace(`No sessions to clean up for ${sessionId}`); - return; + const sessions: QMSession[] = []; + for (const uuid of sessionIds) { + const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${uuid}`; + const cachedSession = await this.cacheProvider.get(sessionKey); + + if (cachedSession) { + sessions.push({ + uuid: cachedSession.uuid, + proxy: cachedSession.proxy, + headers: cachedSession.headers, + successfulCalls: cachedSession.successfulCalls || 0, + failedCalls: cachedSession.failedCalls || 0, + lastUsed: new Date(cachedSession.lastUsed), + createdAt: cachedSession.createdAt ? new Date(cachedSession.createdAt) : new Date(), + }); + } } - this.sessionCache[sessionId] = this.sessionCache[sessionId]?.filter( - session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS - ); - removedCount += initialCount - this.sessionCache[sessionId].length; - }); + return sessions; + } catch (error) { + this.logger?.error('Failed to get sessions', { error, sessionId }); + return []; + } + } + + /** + * Get session count for a session ID + */ + async getSessionCount(sessionId: string): Promise { + const sessions = await this.getSessions(sessionId); + return sessions.filter(s => (s.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS).length; + } + + /** + * Cleanup failed sessions + */ + async cleanupFailedSessions(): Promise { + if (!this.cacheProvider) { + return 0; + } + + let removedCount = 0; + + try { + for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { + const sessions = await this.getSessions(sessionId); + const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; + const updatedList: string[] = []; + + for (const session of sessions) { + if ((session.failedCalls || 0) > SESSION_CONFIG.MAX_FAILED_CALLS) { + // Delete the failed session + const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${session.uuid}`; + await this.cacheProvider.del(sessionKey); + removedCount++; + this.logger?.info(`Removed failed session`, { + sessionType, + uuid: session.uuid, + failedCalls: session.failedCalls + }); + } else { + updatedList.push(session.uuid); + } + } + + // Update the session list + if (updatedList.length > 0) { + await this.cacheProvider.set(listKey, updatedList, 86400); + } else { + await this.cacheProvider.del(listKey); + } + } + } catch (error) { + this.logger?.error('Failed to cleanup sessions', { error }); + } - // Sync back to cache after cleanup - await this.syncToCache(); - return removedCount; } /** - * Check if more sessions are needed for a session ID + * Check if we need more sessions for a session ID */ async needsMoreSessions(sessionId: string): Promise { - // Always load fresh data from cache - await this.loadFromCache(); - - const sessions = this.sessionCache[sessionId] || []; - const validSessions = sessions.filter( - session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS - ); - return validSessions.length < SESSION_CONFIG.MAX_SESSIONS; + const validCount = await this.getSessionCount(sessionId); + return validCount < SESSION_CONFIG.MAX_SESSIONS; } /** - * Check if session ID is at capacity + * Check if we're at capacity for a session ID */ - isAtCapacity(sessionId: string): boolean { - const sessions = this.sessionCache[sessionId] || []; + async isAtCapacity(sessionId: string): Promise { + const sessions = await this.getSessions(sessionId); return sessions.length >= SESSION_CONFIG.MAX_SESSIONS; } /** - * Get session cache statistics + * Get statistics about sessions */ - getStats() { - const stats: Record = {}; - - Object.entries(this.sessionCache).forEach(([sessionId, sessions]) => { + async getStats(): Promise> { + const stats: Record = {}; + + for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { + const sessions = await this.getSessions(sessionId); const validSessions = sessions.filter( session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS ); @@ -231,185 +273,47 @@ export class QMSessionManager { session => session.failedCalls > SESSION_CONFIG.MAX_FAILED_CALLS ); - stats[sessionId] = { + stats[sessionType] = { total: sessions.length, valid: validSessions.length, failed: failedSessions.length, + successfulCalls: sessions.reduce((sum, s) => sum + s.successfulCalls, 0), + failedCalls: sessions.reduce((sum, s) => sum + s.failedCalls, 0), }; - }); + } return stats; } - - /** - * Mark manager as initialized (deprecated - we always load from cache now) - */ - setInitialized(initialized: boolean = true): void { - this.isInitialized = initialized; - } - - /** - * Check if manager is initialized (deprecated - we always load from cache now) - */ - getInitialized(): boolean { - return this.isInitialized; - } - - /** - * Load sessions from cache - */ - async loadFromCache(): Promise { - if (!this.cacheProvider) { - this.logger?.warn('No cache provider available for loading sessions'); - return; - } - - try { - this.logger?.trace('Loading sessions from cache...'); - - // Load sessions for each session type - for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { - const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; - const sessionIds = await this.cacheProvider.get(listKey); - - this.logger?.trace(`Loading ${sessionType} sessions`, { - sessionType, - sessionId, - listKey, - sessionIds - }); - - if (sessionIds && Array.isArray(sessionIds)) { - const sessions: QMSession[] = []; - - for (const id of sessionIds) { - const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`; - const cachedSession = await this.cacheProvider.get(sessionKey); - - if (cachedSession) { - const session = { - uuid: cachedSession.uuid, - proxy: cachedSession.proxy, - headers: cachedSession.headers, - successfulCalls: cachedSession.successfulCalls || 0, - failedCalls: cachedSession.failedCalls || 0, - lastUsed: new Date(cachedSession.lastUsed), - createdAt: cachedSession.createdAt ? new Date(cachedSession.createdAt) : new Date(), - }; - - this.logger?.trace(`Loaded session ${id}`, { - uuid: session.uuid, - successfulCalls: session.successfulCalls, - failedCalls: session.failedCalls - }); - - sessions.push(session); - } - } - - this.sessionCache[sessionId] = sessions; - this.logger?.debug(`Loaded ${sessions.length} sessions for ${sessionType}`); - } else { - this.logger?.trace(`No sessions found for ${sessionType}`); - } - } - } catch (error) { - this.logger?.error('Failed to load sessions from cache', { error }); - } - } - - /** - * Sync sessions to cache - */ - async syncToCache(): Promise { - if (!this.cacheProvider) { - return; - } - - try { - for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { - const sessions = this.sessionCache[sessionId] || []; - const sessionIds: string[] = []; - - // Clear old sessions first - const oldListKey = `qm:sessions:${sessionType.toLowerCase()}:list`; - const oldSessionIds = await this.cacheProvider.get(oldListKey); - if (oldSessionIds && Array.isArray(oldSessionIds)) { - // Delete old session entries - for (const oldId of oldSessionIds) { - const oldSessionKey = `qm:sessions:${sessionType.toLowerCase()}:${oldId}`; - await this.cacheProvider.del(oldSessionKey); - } - } - - // Store each session with stable IDs - for (let i = 0; i < sessions.length; i++) { - const session = sessions[i]; - const id = `${sessionType.toLowerCase()}_${i}`; - const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`; - - if(!session){ - this.logger?.warn(`Skipping empty session at index ${i} for ${sessionType}`); - continue; - } - const cachedSession: CachedSession = { - ...session, - lastUsed: session.lastUsed instanceof Date ? session.lastUsed.toISOString() : session.lastUsed, - createdAt: session.createdAt instanceof Date ? session.createdAt.toISOString() : session.createdAt, - id, - sessionType, - } as any; - - this.logger?.trace(`Saving session ${id}`, { - uuid: cachedSession.uuid, - successfulCalls: cachedSession.successfulCalls, - failedCalls: cachedSession.failedCalls - }); - - await this.cacheProvider.set(sessionKey, cachedSession, 86400); // 24 hour TTL - sessionIds.push(id); - } - - // Store the list of session IDs - await this.cacheProvider.set(oldListKey, sessionIds, 86400); - } - - // Store stats - const statsKey = 'qm:sessions:stats'; - await this.cacheProvider.set(statsKey, this.getStats(), 3600); - - this.logger?.trace('Session sync to cache completed'); - } catch (error) { - this.logger?.error('Failed to sync sessions to cache', { error }); - } - } - /** * Increment failed calls for a session */ async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise { - // Load latest from cache first - await this.loadFromCache(); - - // Find session by UUID - const sessions = this.sessionCache[sessionId] || []; - const session = sessions.find(s => s.uuid === sessionUuid); - - if (session) { - session.failedCalls++; - session.lastUsed = new Date(); + if (!this.cacheProvider) { + return; + } + + try { + const sessionType = Object.entries(QM_SESSION_IDS).find(([_, id]) => id === sessionId)?.[0] || 'UNKNOWN'; + const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${sessionUuid}`; + const cachedSession = await this.cacheProvider.get(sessionKey); - this.logger?.debug(`Incremented failed calls for session`, { - sessionUuid, - failedCalls: session.failedCalls, - sessionId - }); - - // Sync to cache after update - await this.syncToCache(); - } else { - this.logger?.warn(`Session not found`, { sessionUuid, sessionId }); + if (cachedSession) { + cachedSession.failedCalls = (cachedSession.failedCalls || 0) + 1; + cachedSession.lastUsed = new Date().toISOString(); + + await this.cacheProvider.set(sessionKey, cachedSession, 86400); + + this.logger?.debug(`Incremented failed calls for session`, { + sessionUuid, + failedCalls: cachedSession.failedCalls, + sessionId + }); + } else { + this.logger?.warn(`Session not found`, { sessionUuid, sessionId }); + } + } catch (error) { + this.logger?.error('Failed to increment failed calls', { error, sessionUuid, sessionId }); } } @@ -417,27 +321,31 @@ export class QMSessionManager { * Increment successful calls for a session */ async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise { - // Load latest from cache first - await this.loadFromCache(); - - // Find session by UUID - const sessions = this.sessionCache[sessionId] || []; - const session = sessions.find(s => s.uuid === sessionUuid); - - if (session) { - session.successfulCalls++; - session.lastUsed = new Date(); + if (!this.cacheProvider) { + return; + } + + try { + const sessionType = Object.entries(QM_SESSION_IDS).find(([_, id]) => id === sessionId)?.[0] || 'UNKNOWN'; + const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${sessionUuid}`; + const cachedSession = await this.cacheProvider.get(sessionKey); - this.logger?.debug(`Incremented successful calls for session`, { - sessionUuid, - successfulCalls: session.successfulCalls, - sessionId - }); - - // Sync to cache after update - await this.syncToCache(); - } else { - this.logger?.warn(`Session not found`, { sessionUuid, sessionId }); + if (cachedSession) { + cachedSession.successfulCalls = (cachedSession.successfulCalls || 0) + 1; + cachedSession.lastUsed = new Date().toISOString(); + + await this.cacheProvider.set(sessionKey, cachedSession, 86400); + + this.logger?.debug(`Incremented successful calls for session`, { + sessionUuid, + successfulCalls: cachedSession.successfulCalls, + sessionId + }); + } else { + this.logger?.warn(`Session not found`, { sessionUuid, sessionId }); + } + } catch (error) { + this.logger?.error('Failed to increment successful calls', { error, sessionUuid, sessionId }); } } }