/**
 * QM Session Manager - Redis-based implementation for cluster support
 * Uses Redis as the single source of truth for session management
 */

import type { CacheProvider } from '@stock-bot/cache';
import type { Logger } from '@stock-bot/types';
import { QM_SESSION_IDS, SESSION_CONFIG } from './config';
import type { QMSession } from './types';

/**
 * Singleton session manager that keeps every session in the injected cache
 * provider instead of in process memory.
 *
 * Storage layout (see the key helpers at the bottom of the class):
 * - `qm:sessions:<sessionId>:session:<uuid>` holds one session record.
 * - `qm:sessions:<sessionId>:list` holds the array of UUIDs for that session ID.
 *
 * NOTE(review): the UUID list and the call counters are updated with a
 * read-modify-write sequence (get, change, set). Concurrent writers can
 * overwrite each other's update; confirm whether that is acceptable for the
 * deployment or whether the cache provider offers atomic operations.
 */
export class QMSessionManagerRedis {
  private static instance: QMSessionManagerRedis | null = null;

  private cacheProvider: CacheProvider | null = null;
  private logger: Logger | null = null;
  private readonly SESSION_TTL = 86400; // 24 hours

  private constructor() {}

  /**
   * Return the process-wide instance, creating it on first use.
   */
  static getInstance(): QMSessionManagerRedis {
    if (!QMSessionManagerRedis.instance) {
      QMSessionManagerRedis.instance = new QMSessionManagerRedis();
    }
    return QMSessionManagerRedis.instance;
  }

  /**
   * Initialize with cache provider and logger.
   *
   * Either argument may be omitted; an omitted argument leaves the previously
   * stored value untouched.
   */
  initialize(cache?: CacheProvider, logger?: Logger): void {
    if (cache) {
      this.cacheProvider = cache;
    }
    if (logger) {
      this.logger = logger;
      this.logger.trace('Redis-based SessionManager initialized');
    }
  }

  /**
   * Get a random valid session for the given session ID.
   *
   * @returns A session whose failed-call count does not exceed
   *   `SESSION_CONFIG.MAX_FAILED_CALLS`, or `null` when no cache provider is
   *   set, no UUIDs are registered, no candidate qualifies, or an error occurs.
   */
  async getSession(sessionId: string): Promise<QMSession | null> {
    if (!this.cacheProvider) {
      this.logger?.error('No cache provider available');
      return null;
    }

    try {
      // Get all session UUIDs for this sessionId
      const sessionUuids = await this.getSessionUuids(sessionId);
      if (sessionUuids.length === 0) {
        this.logger?.debug(`No sessions found for ${sessionId}`);
        return null;
      }

      // Visit candidates in uniformly random order and stop at the first usable one.
      for (const uuid of QMSessionManagerRedis.shuffle(sessionUuids)) {
        const session = await this.getSessionByUuid(sessionId, uuid);
        if (session && this.isUsable(session)) {
          this.logger?.trace(`Selected session`, {
            uuid: session.uuid,
            failedCalls: session.failedCalls,
            successfulCalls: session.successfulCalls,
          });
          return session;
        }
      }

      this.logger?.warn(`No valid sessions found for ${sessionId}`);
      return null;
    } catch (error) {
      this.logger?.error('Failed to get session', { error, sessionId });
      return null;
    }
  }

  /**
   * Get a specific session by UUID.
   *
   * @returns The stored session with `lastUsed` and `createdAt` converted to
   *   `Date` objects, or `null` when there is no cache provider, nothing is
   *   stored under the key, or the read fails (the failure is logged).
   */
  async getSessionByUuid(sessionId: string, uuid: string): Promise<QMSession | null> {
    if (!this.cacheProvider) {
      return null;
    }

    const sessionKey = this.getSessionKey(sessionId, uuid);

    try {
      const session = await this.cacheProvider.get<QMSession>(sessionKey);
      if (session) {
        // Ensure dates are Date objects
        session.lastUsed = new Date(session.lastUsed);
        session.createdAt = new Date(session.createdAt);
      }
      return session;
    } catch (error) {
      this.logger?.error('Failed to get session by UUID', { error, sessionId, uuid });
      return null;
    }
  }

  /**
   * Add a session.
   *
   * Stores the session record and registers its UUID in the list for
   * `sessionId` (if not already present). Both writes use `SESSION_TTL`.
   *
   * @throws Error when no cache provider is set; cache errors are logged and rethrown.
   */
  async addSession(sessionId: string, session: QMSession): Promise<void> {
    if (!this.cacheProvider) {
      throw new Error('No cache provider available');
    }

    const sessionKey = this.getSessionKey(sessionId, session.uuid);
    const listKey = this.getSessionListKey(sessionId);

    try {
      // Store the session data
      await this.cacheProvider.set(sessionKey, session, this.SESSION_TTL);

      // Add UUID to the session list (using SET to simulate SADD)
      const uuids = await this.getSessionUuids(sessionId);
      if (!uuids.includes(session.uuid)) {
        await this.cacheProvider.set(listKey, [...uuids, session.uuid], this.SESSION_TTL);
      }

      this.logger?.debug(`Added session ${session.uuid} to ${sessionId}`);
    } catch (error) {
      this.logger?.error('Failed to add session', { error, sessionId, uuid: session.uuid });
      throw error;
    }
  }

  /**
   * Remove a session.
   *
   * Deletes the session record and drops its UUID from the list. This is
   * best-effort: cache errors are logged and not rethrown, and the call is a
   * no-op when no cache provider is set.
   */
  async removeSession(sessionId: string, uuid: string): Promise<void> {
    if (!this.cacheProvider) {
      return;
    }

    const sessionKey = this.getSessionKey(sessionId, uuid);
    const listKey = this.getSessionListKey(sessionId);

    try {
      // Remove session data
      await this.cacheProvider.del(sessionKey);

      // Remove UUID from list; only rewrite the list when something changed.
      const uuids = await this.getSessionUuids(sessionId);
      const remaining = uuids.filter(candidate => candidate !== uuid);
      if (remaining.length !== uuids.length) {
        await this.cacheProvider.set(listKey, remaining, this.SESSION_TTL);
      }

      this.logger?.debug(`Removed session ${uuid} from ${sessionId}`);
    } catch (error) {
      this.logger?.error('Failed to remove session', { error, sessionId, uuid });
    }
  }

  /**
   * Get all sessions for a session ID.
   *
   * UUIDs that are listed but whose record cannot be loaded are skipped.
   * The result preserves the order of the stored UUID list.
   */
  async getSessions(sessionId: string): Promise<QMSession[]> {
    if (!this.cacheProvider) {
      return [];
    }

    const uuids = await this.getSessionUuids(sessionId);

    // The lookups are independent reads and getSessionByUuid never rejects
    // (it logs and returns null), so they can safely run concurrently.
    const loaded = await Promise.all(uuids.map(uuid => this.getSessionByUuid(sessionId, uuid)));

    const sessions: QMSession[] = [];
    for (const session of loaded) {
      if (session) {
        sessions.push(session);
      }
    }
    return sessions;
  }

  /**
   * Get session count.
   *
   * Counts entries in the UUID list(s); it does not load the session records.
   *
   * @param sessionId When given, count only that session ID; otherwise sum
   *   over every value in `QM_SESSION_IDS`.
   */
  async getSessionCount(sessionId?: string): Promise<number> {
    if (!this.cacheProvider) {
      return 0;
    }

    if (sessionId) {
      const uuids = await this.getSessionUuids(sessionId);
      return uuids.length;
    }

    // Count all sessions
    const lists = await Promise.all(
      Object.values(QM_SESSION_IDS).map(sid => this.getSessionUuids(sid))
    );
    return lists.reduce((total, uuids) => total + uuids.length, 0);
  }

  /**
   * Clean up failed sessions.
   *
   * Calls removeSession for every session whose failed-call count exceeds
   * `SESSION_CONFIG.MAX_FAILED_CALLS`.
   *
   * @returns The number of sessions for which removal was attempted
   *   (removeSession logs failures instead of throwing).
   */
  async cleanupFailedSessions(): Promise<number> {
    if (!this.cacheProvider) {
      return 0;
    }

    let removedCount = 0;

    for (const sessionId of Object.values(QM_SESSION_IDS)) {
      const sessions = await this.getSessions(sessionId);

      // Removals stay sequential: each one rewrites the shared UUID list.
      for (const session of sessions) {
        if (!this.isUsable(session)) {
          await this.removeSession(sessionId, session.uuid);
          removedCount++;
        }
      }
    }

    this.logger?.info(`Cleaned up ${removedCount} failed sessions`);
    return removedCount;
  }

  /**
   * Check if more sessions are needed.
   *
   * @returns `true` when the number of usable sessions is below
   *   `SESSION_CONFIG.MAX_SESSIONS`.
   */
  async needsMoreSessions(sessionId: string): Promise<boolean> {
    const sessions = await this.getSessions(sessionId);
    const validCount = sessions.filter(s => this.isUsable(s)).length;
    return validCount < SESSION_CONFIG.MAX_SESSIONS;
  }

  /**
   * Get statistics.
   *
   * @returns An object keyed by session ID (the values of `QM_SESSION_IDS`)
   *   with the total, valid and failed session counts for each.
   */
  async getStats(): Promise<Record<string, { total: number; valid: number; failed: number }>> {
    const stats: Record<string, { total: number; valid: number; failed: number }> = {};

    for (const sessionId of Object.values(QM_SESSION_IDS)) {
      const sessions = await this.getSessions(sessionId);
      const valid = sessions.filter(s => this.isUsable(s)).length;

      stats[sessionId] = {
        total: sessions.length,
        valid,
        // Every loaded session is either usable or failed.
        failed: sessions.length - valid,
      };
    }

    return stats;
  }

  /**
   * Increment failed calls for a session.
   *
   * Also refreshes `lastUsed`. Logs a warning when the session cannot be loaded.
   */
  async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise<void> {
    const session = await this.getSessionByUuid(sessionId, sessionUuid);
    if (!session) {
      this.logger?.warn(`Session not found`, { sessionUuid, sessionId });
      return;
    }

    session.failedCalls = (session.failedCalls || 0) + 1;
    await this.persistSession(sessionId, session);

    this.logger?.debug(`Incremented failed calls for session`, {
      sessionUuid,
      failedCalls: session.failedCalls,
      sessionId,
    });
  }

  /**
   * Increment successful calls for a session.
   *
   * Also refreshes `lastUsed`. Logs a warning when the session cannot be loaded.
   */
  async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise<void> {
    const session = await this.getSessionByUuid(sessionId, sessionUuid);
    if (!session) {
      this.logger?.warn(`Session not found`, { sessionUuid, sessionId });
      return;
    }

    session.successfulCalls = (session.successfulCalls || 0) + 1;
    await this.persistSession(sessionId, session);

    this.logger?.debug(`Incremented successful calls for session`, {
      sessionUuid,
      successfulCalls: session.successfulCalls,
      sessionId,
    });
  }

  /**
   * Helper: stamp `lastUsed` with the current time and write the session back
   * under its own key with `SESSION_TTL`. Does nothing beyond the timestamp
   * update when no cache provider is set.
   */
  private async persistSession(sessionId: string, session: QMSession): Promise<void> {
    session.lastUsed = new Date();
    const sessionKey = this.getSessionKey(sessionId, session.uuid);
    await this.cacheProvider?.set(sessionKey, session, this.SESSION_TTL);
  }

  /**
   * Helper: a session is usable while its failed-call count (missing counts
   * as 0) does not exceed `SESSION_CONFIG.MAX_FAILED_CALLS`.
   */
  private isUsable(session: QMSession): boolean {
    return (session.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS;
  }

  /**
   * Helper: return a uniformly shuffled copy of `items` (Fisher-Yates).
   *
   * Sorting with a random comparator is avoided because the comparator is
   * inconsistent and the resulting order is biased.
   */
  private static shuffle<T>(items: readonly T[]): T[] {
    const result = [...items];
    for (let i = result.length - 1; i > 0; i--) {
      const j = Math.floor(Math.random() * (i + 1));
      const swapped = result[i] as T;
      result[i] = result[j] as T;
      result[j] = swapped;
    }
    return result;
  }

  /**
   * Helper: Get session UUIDs from cache.
   *
   * @returns The stored UUID array, or an empty array when there is no cache
   *   provider, nothing (or a non-array value) is stored, or the read fails.
   */
  private async getSessionUuids(sessionId: string): Promise<string[]> {
    if (!this.cacheProvider) {
      return [];
    }

    const listKey = this.getSessionListKey(sessionId);

    try {
      const uuids = await this.cacheProvider.get<string[]>(listKey);
      return Array.isArray(uuids) ? uuids : [];
    } catch (error) {
      this.logger?.error('Failed to get session UUIDs', { error, sessionId });
      return [];
    }
  }

  /**
   * Helper: Generate cache keys
   */
  private getSessionKey(sessionId: string, uuid: string): string {
    return `qm:sessions:${sessionId}:session:${uuid}`;
  }

  private getSessionListKey(sessionId: string): string {
    return `qm:sessions:${sessionId}:list`;
  }

  /**
   * Migrate from old session manager (for backward compatibility).
   *
   * For each entry of `QM_SESSION_IDS`, reads the old list stored under the
   * lower-cased entry name, re-adds every old session that has a `uuid`
   * through addSession, and deletes the old keys afterwards. Errors from the
   * cache or from addSession propagate to the caller.
   */
  async migrateFromOldFormat(): Promise<void> {
    if (!this.cacheProvider) {
      return;
    }

    this.logger?.info('Starting migration from old session format');

    for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
      const oldPrefix = `qm:sessions:${sessionType.toLowerCase()}`;
      const oldListKey = `${oldPrefix}:list`;
      const oldSessionIds = await this.cacheProvider.get<string[]>(oldListKey);

      if (oldSessionIds && Array.isArray(oldSessionIds)) {
        this.logger?.info(`Migrating ${oldSessionIds.length} sessions for ${sessionType}`);

        for (const id of oldSessionIds) {
          const oldSessionKey = `${oldPrefix}:${id}`;
          const oldSession = await this.cacheProvider.get<QMSession>(oldSessionKey);

          if (oldSession && oldSession.uuid) {
            const session: QMSession = {
              uuid: oldSession.uuid,
              proxy: oldSession.proxy,
              headers: oldSession.headers,
              successfulCalls: oldSession.successfulCalls || 0,
              failedCalls: oldSession.failedCalls || 0,
              // Fall back to "now" for missing timestamps so an Invalid Date is never stored.
              lastUsed: new Date(oldSession.lastUsed || new Date()),
              createdAt: new Date(oldSession.createdAt || new Date()),
            };

            await this.addSession(sessionId, session);
            await this.cacheProvider.del(oldSessionKey);
          }
        }

        await this.cacheProvider.del(oldListKey);
      }
    }

    this.logger?.info('Migration completed');
  }
}