From b767689470f08330a7efc8de82fa13989760cce9 Mon Sep 17 00:00:00 2001 From: Boki Date: Thu, 26 Jun 2025 22:38:53 -0400 Subject: [PATCH] created qm session check --- .../src/handlers/qm/actions/session.action.ts | 164 +++++--------- .../src/handlers/qm/shared/config.ts | 17 +- .../src/handlers/qm/shared/session-manager.ts | 205 ++++++++++++++---- .../src/handlers/qm/shared/types.ts | 4 +- 4 files changed, 235 insertions(+), 155 deletions(-) diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts index b195a8f..f24cd65 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts @@ -3,9 +3,10 @@ */ import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; -import { BunRequestInit, getRandomUserAgent } from '@stock-bot/utils'; -import { QM_CONFIG, QM_SESSION_IDS, SESSION_CONFIG } from '../shared/config'; +import { BunRequestInit } from '@stock-bot/utils'; +import { getQmHeaders, QM_CONFIG, QM_SESSION_IDS, SESSION_CONFIG } from '../shared/config'; import { QMSessionManager } from '../shared/session-manager'; +import { QMSession } from '../shared/types'; /** * Check existing sessions and queue creation jobs for needed sessions @@ -24,29 +25,22 @@ export async function checkSessions( const sessionManager = QMSessionManager.getInstance(); - // Set cache provider if not already set - if (this.cache) { - sessionManager.setCacheProvider(this.cache); - } + // Initialize with cache provider and logger + sessionManager.initialize(this.cache, this.logger); - // Load sessions from cache if not initialized - if (!sessionManager.getInitialized()) { - await sessionManager.loadFromCache(); - sessionManager.setInitialized(true); - } + // Always load fresh data from cache (don't rely on initialization flag) + await sessionManager.loadFromCache(); - const cleanedCount = sessionManager.cleanupFailedSessions(); - - // Sync after cleanup - await sessionManager.syncToCache(); + // Cleanup failed sessions (this now handles its own cache sync) + const cleanedCount = await sessionManager.cleanupFailedSessions(); // Check which session IDs need more sessions and queue creation jobs let queuedCount = 0; for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { this.logger.debug(`Checking session ID: ${sessionId}`); - if (sessionManager.needsMoreSessions(sessionId)) { + if (await sessionManager.needsMoreSessions(sessionId)) { const currentCount = sessionManager.getSessions(sessionId).length; - const neededSessions = SESSION_CONFIG.MIN_SESSIONS - currentCount; + const neededSessions = SESSION_CONFIG.MAX_SESSIONS - currentCount; // Queue up to 10 at a time to avoid overwhelming the system const toQueue = Math.min(neededSessions, 10); @@ -60,7 +54,7 @@ export async function checkSessions( this.logger.info(`Queued ${toQueue} jobs to create sessions for ${sessionType}`, { currentCount, - targetCount: SESSION_CONFIG.MIN_SESSIONS, + targetCount: SESSION_CONFIG.MAX_SESSIONS, }); } } @@ -88,7 +82,7 @@ interface CreateSessionInput { export async function createSession( this: BaseHandler, input: CreateSessionInput -): Promise<{ sessionId: string; status: string; sessionType: string }> { +): Promise<{ sessionId: string; status: string; sessionType: string, session?: QMSession }> { const { sessionId, sessionType = 'LOOKUP' } = input || {}; const sessionManager = QMSessionManager.getInstance(); @@ -99,10 +93,8 @@ export async function createSession( throw new Error(`Invalid session type: ${sessionType}`); } - // Set cache provider if not already set - if (this.cache) { - sessionManager.setCacheProvider(this.cache); - } + // Initialize with cache provider and logger + sessionManager.initialize(this.cache, this.logger); try { // Get proxy from proxy service @@ -112,95 +104,59 @@ export async function createSession( throw new Error(`No proxy available for session type ${sessionType}`); } - const userAgent = getRandomUserAgent(); - this.logger.debug(`Using User-Agent: ${userAgent}, proxy: ${proxyUrl || 'none'}`); - // Authenticate with QM API inline - const authUrl = `${QM_CONFIG.BASE_URL}${QM_CONFIG.SESSION_PATH}`; + const sessionUrl = `${QM_CONFIG.BASE_URL}${QM_CONFIG.SESSION_PATH}/${sessionId}`; // Build request options - const requestOptions: BunRequestInit = { - method: 'GET', + const sessionRequest: BunRequestInit = { proxy: proxyUrl || undefined, - headers: { - 'User-Agent': userAgent, - Accept: '*/*', - 'Accept-Language': 'en', - 'Sec-Fetch-Mode': 'cors', - Origin: 'https://www.quotemedia.com', - Referer: 'https://www.quotemedia.com/', - }, - redirect: 'manual', // Don't follow redirects automatically + headers: getQmHeaders(), }; - this.logger.debug('Authenticating with QM API', { authUrl }); + this.logger.debug('Authenticating with QM API', { sessionUrl, sessionRequest }); - const response = await fetch(authUrl, requestOptions); + const sessionResponse = await fetch(sessionUrl, sessionRequest); - // Extract cookies from response headers - // const cookies: string[] = []; - // const setCookieHeaders = response.headers.getSetCookie(); + // Check if authentication was successful + if (sessionResponse.status === 200 || sessionResponse.status === 302) { + this.logger.info('QM authentication successful', { + status: sessionResponse.status, + }); + }else{ + this.logger.warn('QM authentication failed', { + status: sessionResponse.status, + statusText: sessionResponse.statusText, + }); + throw new Error(`QM authentication failed with status ${sessionResponse.status}`); + } + + + const sessionData = await sessionResponse.json(); - // if (setCookieHeaders && setCookieHeaders.length > 0) { - // cookies.push(...setCookieHeaders); - // } + // Add token to headers + sessionRequest.headers['Datatool-Token'] = sessionData.token; - // // Check if authentication was successful - // if (response.status === 200 || response.status === 302) { - // this.logger.info('QM authentication successful', { - // status: response.status, - // cookieCount: cookies.length, - // }); + // Create session object with unique ID + const session: QMSession = { + uuid: `${sessionType.toLowerCase()}_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`, + proxy: proxyUrl, + headers: sessionRequest.headers, + successfulCalls: 0, + failedCalls: 0, + lastUsed: new Date(), + createdAt: new Date(), + }; - // // Build headers with cookies - // const headers = sessionManager.getQmHeaders(); - // if (cookies.length > 0) { - // headers['Cookie'] = buildCookieString(cookies); - // } + // Add session to manager (this now handles cache sync) + await sessionManager.addSession(actualSessionId, session); - // // Create session object - // const session: QMSession = { - // proxy: proxyUrl || '', - // headers, - // successfulCalls: 0, - // failedCalls: 0, - // lastUsed: new Date(), - // }; - - // // Add session to manager - // sessionManager.addSession(actualSessionId, session); - - // // Sync to cache - // await sessionManager.syncToCache(); - - // this.logger.info(`Successfully created session for ${sessionType}`, { - // sessionId: actualSessionId, - // hasProxy: !!proxyUrl, - // hasCookies: cookies.length > 0, - // }); - - // return { - // sessionId: actualSessionId, - // status: 'created', - // sessionType, - // }; - // } else { - // this.logger.warn('QM authentication failed', { - // status: response.status, - // statusText: response.statusText, - // }); - - // return { - // sessionId: actualSessionId, - // status: 'failed', - // sessionType, - // }; - // } + this.logger.info(`Successfully created session for ${sessionType}`, { session }); return { - sessionId: 'test',//actualSessionId, + sessionId: actualSessionId, status: 'created', sessionType, + session, }; } catch (error) { this.logger.error(`Failed to create session for ${sessionType}`, { error }); @@ -210,18 +166,4 @@ export async function createSession( sessionType, }; } -} - -/** - * Build cookie string from array of set-cookie headers - */ -function buildCookieString(cookies: string[]): string { - return cookies - .map(cookie => { - // Extract just the name=value part, ignore attributes - const match = cookie.match(/^([^=]+=[^;]+)/); - return match ? match[1] : ''; - }) - .filter(Boolean) - .join('; '); -} +} \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index c15e6a6..29d8d57 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -2,6 +2,8 @@ * Shared configuration for QM operations */ +import { getRandomUserAgent } from "@stock-bot/utils"; + // QM Session IDs for different endpoints export const QM_SESSION_IDS = { LOOKUP: 'dc8c9930437f65d30f6597768800957017bac203a0a50342932757c8dfa158d6', // lookup endpoint @@ -30,9 +32,20 @@ export const QM_CONFIG = { // Session management settings export const SESSION_CONFIG = { - MIN_SESSIONS: 15, - MAX_SESSIONS: 50, + MIN_SESSIONS: 2, + MAX_SESSIONS: 5, MAX_FAILED_CALLS: 3, SESSION_TIMEOUT: 5000, // 10 seconds API_TIMEOUT: 30000, // 15 seconds } as const; + +export function getQmHeaders(): Record { + return { + 'User-Agent': getRandomUserAgent(), + Accept: '*/*', + 'Accept-Language': 'en', + 'Sec-Fetch-Mode': 'cors', + Origin: 'https://www.quotemedia.com', + Referer: 'https://www.quotemedia.com/', + }; +} diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts index 520f435..9e11962 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts @@ -3,7 +3,7 @@ */ import type { CacheProvider } from '@stock-bot/cache'; -import { getRandomUserAgent } from '@stock-bot/utils'; +import type { Logger } from '@stock-bot/types'; import { QM_SESSION_IDS, SESSION_CONFIG } from './config'; import type { CachedSession, QMSession } from './types'; @@ -12,6 +12,7 @@ export class QMSessionManager { private sessionCache: Record = {}; private isInitialized = false; private cacheProvider: CacheProvider | null = null; + private logger: Logger | null = null; private constructor() { // Initialize session cache with known session IDs @@ -26,6 +27,16 @@ export class QMSessionManager { } return QMSessionManager.instance; } + + /** + * Reset the singleton instance (for testing only) + */ + static resetInstance(): void { + if (QMSessionManager.instance?.logger) { + QMSessionManager.instance.logger.warn('Resetting QMSessionManager instance - this should only be used for testing'); + } + QMSessionManager.instance = null; + } /** * Set the cache provider for persistence @@ -34,10 +45,33 @@ export class QMSessionManager { this.cacheProvider = cache; } + /** + * Set the logger + */ + setLogger(logger: Logger): void { + this.logger = logger; + this.logger.trace('Logger set for QMSessionManager'); + } + + /** + * Initialize with cache provider and logger + */ + initialize(cache?: CacheProvider, logger?: Logger): void { + if (cache) { + this.setCacheProvider(cache); + } + if (logger) { + this.setLogger(logger); + } + } + /** * Get a random session for the given session ID */ - getSession(sessionId: string): QMSession | null { + async getSession(sessionId: string): Promise { + // Always load fresh data from cache + await this.loadFromCache(); + const sessions = this.sessionCache[sessionId]; if (!sessions || sessions.length === 0) { return null; @@ -54,14 +88,33 @@ export class QMSessionManager { return validSessions[Math.floor(Math.random() * validSessions.length)]; } + /** + * Get a specific session by UUID + */ + getSessionByUuid(sessionId: string, uuid: string): QMSession | null { + const sessions = this.sessionCache[sessionId] || []; + return sessions.find(s => s.uuid === uuid) || null; + } + /** * Add a session to the cache */ - addSession(sessionId: string, session: QMSession): void { + async addSession(sessionId: string, session: QMSession): Promise { + // Load latest from cache first to avoid overwriting other service's changes + await this.loadFromCache(); + if (!this.sessionCache[sessionId]) { this.sessionCache[sessionId] = []; } this.sessionCache[sessionId].push(session); + + this.logger?.debug(`Added session ${session.uuid} to ${sessionId}`, { + sessionId, + uuid: session.uuid + }); + + // Sync to cache immediately + await this.syncToCache(); } /** @@ -81,7 +134,10 @@ export class QMSessionManager { /** * Clean up failed sessions */ - cleanupFailedSessions(): number { + async cleanupFailedSessions(): Promise { + // Always load latest from cache first + await this.loadFromCache(); + let removedCount = 0; Object.keys(this.sessionCache).forEach(sessionId => { @@ -92,29 +148,24 @@ export class QMSessionManager { removedCount += initialCount - this.sessionCache[sessionId].length; }); + // Sync back to cache after cleanup + await this.syncToCache(); + return removedCount; } - getQmHeaders(): Record { - return { - 'User-Agent': getRandomUserAgent(), - Accept: '*/*', - 'Accept-Language': 'en', - 'Sec-Fetch-Mode': 'cors', - Origin: 'https://www.quotemedia.com', - Referer: 'https://www.quotemedia.com/', - }; - } - /** * Check if more sessions are needed for a session ID */ - needsMoreSessions(sessionId: string): boolean { + async needsMoreSessions(sessionId: string): Promise { + // Always load fresh data from cache + await this.loadFromCache(); + const sessions = this.sessionCache[sessionId] || []; const validSessions = sessions.filter( session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS ); - return validSessions.length < SESSION_CONFIG.MIN_SESSIONS; + return validSessions.length < SESSION_CONFIG.MAX_SESSIONS; } /** @@ -150,14 +201,14 @@ export class QMSessionManager { } /** - * Mark manager as initialized + * Mark manager as initialized (deprecated - we always load from cache now) */ setInitialized(initialized: boolean = true): void { this.isInitialized = initialized; } /** - * Check if manager is initialized + * Check if manager is initialized (deprecated - we always load from cache now) */ getInitialized(): boolean { return this.isInitialized; @@ -168,15 +219,20 @@ export class QMSessionManager { */ async loadFromCache(): Promise { if (!this.cacheProvider) { + this.logger?.warn('No cache provider available for loading sessions'); return; } try { + this.logger?.trace('Loading sessions from cache...'); + // Load sessions for each session type for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; const sessionIds = await this.cacheProvider.get(listKey); + this.logger?.trace(`Loading ${sessionType} sessions`, { sessionIds }); + if (sessionIds && Array.isArray(sessionIds)) { const sessions: QMSession[] = []; @@ -185,21 +241,34 @@ export class QMSessionManager { const cachedSession = await this.cacheProvider.get(sessionKey); if (cachedSession) { - sessions.push({ + const session = { + uuid: cachedSession.uuid, proxy: cachedSession.proxy, headers: cachedSession.headers, - successfulCalls: cachedSession.successfulCalls, - failedCalls: cachedSession.failedCalls, + successfulCalls: cachedSession.successfulCalls || 0, + failedCalls: cachedSession.failedCalls || 0, lastUsed: new Date(cachedSession.lastUsed), + createdAt: cachedSession.createdAt ? new Date(cachedSession.createdAt) : new Date(), + }; + + this.logger?.trace(`Loaded session ${id}`, { + uuid: session.uuid, + successfulCalls: session.successfulCalls, + failedCalls: session.failedCalls }); + + sessions.push(session); } } this.sessionCache[sessionId] = sessions; + this.logger?.debug(`Loaded ${sessions.length} sessions for ${sessionType}`); + } else { + this.logger?.trace(`No sessions found for ${sessionType}`); } } } catch (error) { - console.error('Failed to load sessions from cache:', error); + this.logger?.error('Failed to load sessions from cache', { error }); } } @@ -216,54 +285,108 @@ export class QMSessionManager { const sessions = this.sessionCache[sessionId] || []; const sessionIds: string[] = []; - // Store each session + // Clear old sessions first + const oldListKey = `qm:sessions:${sessionType.toLowerCase()}:list`; + const oldSessionIds = await this.cacheProvider.get(oldListKey); + if (oldSessionIds && Array.isArray(oldSessionIds)) { + // Delete old session entries + for (const oldId of oldSessionIds) { + const oldSessionKey = `qm:sessions:${sessionType.toLowerCase()}:${oldId}`; + await this.cacheProvider.del(oldSessionKey); + } + } + + // Store each session with stable IDs for (let i = 0; i < sessions.length; i++) { const session = sessions[i]; - const id = `${sessionType.toLowerCase()}_${i}_${Date.now()}`; + const id = `${sessionType.toLowerCase()}_${i}`; const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`; const cachedSession: CachedSession = { ...session, + lastUsed: session.lastUsed instanceof Date ? session.lastUsed.toISOString() : session.lastUsed, + createdAt: session.createdAt instanceof Date ? session.createdAt.toISOString() : session.createdAt, id, sessionType, - }; + } as any; + + this.logger?.trace(`Saving session ${id}`, { + uuid: cachedSession.uuid, + successfulCalls: cachedSession.successfulCalls, + failedCalls: cachedSession.failedCalls + }); await this.cacheProvider.set(sessionKey, cachedSession, 86400); // 24 hour TTL sessionIds.push(id); } // Store the list of session IDs - const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; - await this.cacheProvider.set(listKey, sessionIds, 86400); + await this.cacheProvider.set(oldListKey, sessionIds, 86400); } // Store stats const statsKey = 'qm:sessions:stats'; await this.cacheProvider.set(statsKey, this.getStats(), 3600); + + this.logger?.trace('Session sync to cache completed'); } catch (error) { - console.error('Failed to sync sessions to cache:', error); + this.logger?.error('Failed to sync sessions to cache', { error }); } } /** * Increment failed calls for a session */ - async incrementFailedCalls(sessionId: string, session: QMSession): Promise { - session.failedCalls++; - session.lastUsed = new Date(); - - // Sync to cache after update - await this.syncToCache(); + async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise { + // Load latest from cache first + await this.loadFromCache(); + + // Find session by UUID + const sessions = this.sessionCache[sessionId] || []; + const session = sessions.find(s => s.uuid === sessionUuid); + + if (session) { + session.failedCalls++; + session.lastUsed = new Date(); + + this.logger?.debug(`Incremented failed calls for session`, { + sessionUuid, + failedCalls: session.failedCalls, + sessionId + }); + + // Sync to cache after update + await this.syncToCache(); + } else { + this.logger?.warn(`Session not found`, { sessionUuid, sessionId }); + } } /** * Increment successful calls for a session */ - async incrementSuccessfulCalls(sessionId: string, session: QMSession): Promise { - session.successfulCalls++; - session.lastUsed = new Date(); - - // Sync to cache after update - await this.syncToCache(); + async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise { + // Load latest from cache first + await this.loadFromCache(); + + // Find session by UUID + const sessions = this.sessionCache[sessionId] || []; + const session = sessions.find(s => s.uuid === sessionUuid); + + if (session) { + session.successfulCalls++; + session.lastUsed = new Date(); + + this.logger?.debug(`Incremented successful calls for session`, { + sessionUuid, + successfulCalls: session.successfulCalls, + sessionId + }); + + // Sync to cache after update + await this.syncToCache(); + } else { + this.logger?.warn(`Session not found`, { sessionUuid, sessionId }); + } } } diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts index b0b2514..fcbe3d1 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts @@ -3,11 +3,13 @@ */ export interface QMSession { + uuid?: string; // Unique identifier for the session proxy: string; - headers: Record; + headers: HeadersInit; successfulCalls: number; failedCalls: number; lastUsed: Date; + createdAt: Date; } export interface SymbolSpiderJob {