From ab0b7a53856dcb424d73a33737dc1bc2a3b9a941 Mon Sep 17 00:00:00 2001 From: Boki Date: Sat, 21 Jun 2025 10:33:03 -0400 Subject: [PATCH] refactoring handlers --- .../src/handlers/proxy/proxy.handler.ts | 16 +- .../qm/operations/exchanges.operations.ts | 42 +++ .../qm/operations/session.operations.ts | 185 ++++++++++++ .../qm/operations/spider.operations.ts | 266 ++++++++++++++++++ .../qm/operations/symbols.operations.ts | 196 +++++++++++++ .../src/handlers/qm/qm.handler.ts | 34 +-- ...{qm.operations.ts => qm.operations.ts.old} | 0 .../src/handlers/qm/shared/config.ts | 43 +++ .../src/handlers/qm/shared/session-manager.ts | 136 +++++++++ .../src/handlers/qm/shared/types.ts | 32 +++ libs/utils/src/index.ts | 1 + libs/utils/src/operation-context.ts | 172 +++++++++++ 12 files changed, 1098 insertions(+), 25 deletions(-) create mode 100644 apps/data-service/src/handlers/qm/operations/exchanges.operations.ts create mode 100644 apps/data-service/src/handlers/qm/operations/session.operations.ts create mode 100644 apps/data-service/src/handlers/qm/operations/spider.operations.ts create mode 100644 apps/data-service/src/handlers/qm/operations/symbols.operations.ts rename apps/data-service/src/handlers/qm/{qm.operations.ts => qm.operations.ts.old} (100%) create mode 100644 apps/data-service/src/handlers/qm/shared/config.ts create mode 100644 apps/data-service/src/handlers/qm/shared/session-manager.ts create mode 100644 apps/data-service/src/handlers/qm/shared/types.ts create mode 100644 libs/utils/src/operation-context.ts diff --git a/apps/data-service/src/handlers/proxy/proxy.handler.ts b/apps/data-service/src/handlers/proxy/proxy.handler.ts index f0a5fc5..eb59569 100644 --- a/apps/data-service/src/handlers/proxy/proxy.handler.ts +++ b/apps/data-service/src/handlers/proxy/proxy.handler.ts @@ -5,11 +5,11 @@ import { ProxyInfo } from '@stock-bot/http'; import { getLogger } from '@stock-bot/logger'; import { handlerRegistry, createJobHandler, type HandlerConfigWithSchedule } from '@stock-bot/queue'; -const logger = getLogger('proxy-provider'); +const handlerLogger = getLogger('proxy-handler'); // Initialize and register the Proxy provider export function initializeProxyProvider() { - logger.debug('Registering proxy provider with scheduled jobs...'); + handlerLogger.debug('Registering proxy provider with scheduled jobs...'); const proxyProviderConfig: HandlerConfigWithSchedule = { name: 'proxy', @@ -17,16 +17,16 @@ export function initializeProxyProvider() { operations: { 'fetch-from-sources': createJobHandler(async () => { // Fetch proxies from all configured sources - logger.info('Processing fetch proxies from sources request'); + handlerLogger.info('Processing fetch proxies from sources request'); const { fetchProxiesFromSources } = await import('./proxy.operations'); const { processItems } = await import('@stock-bot/queue'); // Fetch all proxies from sources const proxies = await fetchProxiesFromSources(); - logger.info('Fetched proxies from sources', { count: proxies.length }); + handlerLogger.info('Fetched proxies from sources', { count: proxies.length }); if (proxies.length === 0) { - logger.warn('No proxies fetched from sources'); + handlerLogger.warn('No proxies fetched from sources'); return { processed: 0, successful: 0 }; } @@ -44,7 +44,7 @@ export function initializeProxyProvider() { removeOnFail: 3, }); - logger.info('Batch proxy validation completed', { + handlerLogger.info('Batch proxy validation completed', { totalProxies: proxies.length, jobsCreated: batchResult.jobsCreated, mode: batchResult.mode, @@ -62,7 +62,7 @@ export function initializeProxyProvider() { 'check-proxy': createJobHandler(async (payload: ProxyInfo) => { // payload is now the raw proxy info object - logger.debug('Processing proxy check request', { + handlerLogger.debug('Processing proxy check request', { proxy: `${payload.host}:${payload.port}`, }); const { checkProxy } = await import('./proxy.operations'); @@ -82,5 +82,5 @@ export function initializeProxyProvider() { }; handlerRegistry.registerWithSchedule(proxyProviderConfig); - logger.debug('Proxy provider registered successfully with scheduled jobs'); + handlerLogger.debug('Proxy provider registered successfully with scheduled jobs'); } diff --git a/apps/data-service/src/handlers/qm/operations/exchanges.operations.ts b/apps/data-service/src/handlers/qm/operations/exchanges.operations.ts new file mode 100644 index 0000000..be25191 --- /dev/null +++ b/apps/data-service/src/handlers/qm/operations/exchanges.operations.ts @@ -0,0 +1,42 @@ +/** + * QM Exchanges Operations - Exchange fetching functionality + */ + +import { OperationContext } from '@stock-bot/utils'; +import type { Logger } from '@stock-bot/logger'; + +import { initializeQMResources } from './session.operations'; + +export async function fetchExchanges(parentLogger?: Logger): Promise { + const ctx = OperationContext.create('qm', 'exchanges', parentLogger); + + try { + // Ensure resources are initialized + const { QMSessionManager } = await import('../shared/session-manager'); + const sessionManager = QMSessionManager.getInstance(); + + if (!sessionManager.getInitialized()) { + await initializeQMResources(parentLogger); + } + + ctx.logger.info('QM exchanges fetch - not implemented yet'); + + // Cache the "not implemented" status + await ctx.cache.set('fetch-status', { + implemented: false, + message: 'QM exchanges fetching not yet implemented', + timestamp: new Date().toISOString() + }, { ttl: 3600 }); + + // TODO: Implement QM exchanges fetching logic + // This could involve: + // 1. Querying existing exchanges from MongoDB + // 2. Making API calls to discover new exchanges + // 3. Processing and storing exchange metadata + + return null; + } catch (error) { + ctx.logger.error('Failed to fetch QM exchanges', { error }); + return null; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/qm/operations/session.operations.ts b/apps/data-service/src/handlers/qm/operations/session.operations.ts new file mode 100644 index 0000000..0a94d40 --- /dev/null +++ b/apps/data-service/src/handlers/qm/operations/session.operations.ts @@ -0,0 +1,185 @@ +/** + * QM Session Operations - Session creation and management + */ + +import { OperationContext } from '@stock-bot/utils'; +import { isShutdownSignalReceived } from '@stock-bot/shutdown'; +import { getRandomProxy } from '@stock-bot/utils'; +import type { Logger } from '@stock-bot/logger'; + +import { QMSessionManager } from '../shared/session-manager'; +import { QM_SESSION_IDS, QM_CONFIG, SESSION_CONFIG, getQmHeaders } from '../shared/config'; +import type { QMSession } from '../shared/types'; + +export async function createSessions(parentLogger?: Logger): Promise { + const ctx = OperationContext.create('qm', 'session', parentLogger); + + try { + ctx.logger.info('Creating QM sessions...'); + + // Get session manager instance + const sessionManager = QMSessionManager.getInstance(); + + // Check if already initialized + if (!sessionManager.getInitialized()) { + await initializeQMResources(parentLogger); + } + + // Clean up failed sessions first + const removedCount = sessionManager.cleanupFailedSessions(); + if (removedCount > 0) { + ctx.logger.info(`Cleaned up ${removedCount} failed sessions`); + } + + // Cache session creation stats + const initialStats = sessionManager.getStats(); + await ctx.cache.set('pre-creation-stats', initialStats, { ttl: 300 }); + + // Create sessions for each session ID that needs them + for (const [sessionKey, sessionId] of Object.entries(QM_SESSION_IDS)) { + if (sessionManager.isAtCapacity(sessionId)) { + ctx.logger.debug(`Session ID ${sessionKey} is at capacity, skipping`); + continue; + } + + while (sessionManager.needsMoreSessions(sessionId)) { + if (isShutdownSignalReceived()) { + ctx.logger.info('Shutting down, skipping session creation'); + return; + } + + await createSingleSession(sessionId, sessionKey, ctx); + } + } + + // Cache final stats and session count + const finalStats = sessionManager.getStats(); + const totalSessions = sessionManager.getSessionCount(); + + await ctx.cache.set('post-creation-stats', finalStats, { ttl: 3600 }); + await ctx.cache.set('session-count', totalSessions, { ttl: 900 }); + await ctx.cache.set('last-session-creation', new Date().toISOString()); + + ctx.logger.info('QM session creation completed', { + totalSessions, + sessionStats: finalStats + }); + + } catch (error) { + ctx.logger.error('Failed to create QM sessions', { error }); + throw error; + } +} + +async function createSingleSession( + sessionId: string, + sessionKey: string, + ctx: OperationContext +): Promise { + ctx.logger.debug(`Creating new session for ${sessionKey}`, { sessionId }); + + const proxyInfo = await getRandomProxy(); + if (!proxyInfo) { + ctx.logger.error('No proxy available for QM session creation'); + return; + } + + // Convert ProxyInfo to string format + const auth = proxyInfo.username && proxyInfo.password ? + `${proxyInfo.username}:${proxyInfo.password}@` : ''; + const proxy = `${proxyInfo.protocol}://${auth}${proxyInfo.host}:${proxyInfo.port}`; + + const newSession: QMSession = { + proxy: proxy, + headers: getQmHeaders(), + successfulCalls: 0, + failedCalls: 0, + lastUsed: new Date(), + }; + + try { + const sessionResponse = await fetch( + `${QM_CONFIG.BASE_URL}${QM_CONFIG.AUTH_PATH}/${sessionId}`, + { + method: 'GET', + headers: newSession.headers, + signal: AbortSignal.timeout(SESSION_CONFIG.SESSION_TIMEOUT), + } + ); + + ctx.logger.debug('Session response received', { + status: sessionResponse.status, + sessionKey, + }); + + if (!sessionResponse.ok) { + ctx.logger.error('Failed to create QM session', { + sessionKey, + sessionId, + status: sessionResponse.status, + statusText: sessionResponse.statusText, + }); + return; + } + + const sessionData = await sessionResponse.json(); + + // Add token to headers + newSession.headers['Datatool-Token'] = sessionData.token; + + // Add session to manager + const sessionManager = QMSessionManager.getInstance(); + sessionManager.addSession(sessionId, newSession); + + // Cache successful session creation + await ctx.cache.set( + `successful-session:${sessionKey}:${Date.now()}`, + { sessionId, proxy, tokenExists: !!sessionData.token }, + { ttl: 300 } + ); + + ctx.logger.info('QM session created successfully', { + sessionKey, + sessionId, + proxy: newSession.proxy, + sessionCount: sessionManager.getSessions(sessionId).length, + hasToken: !!sessionData.token + }); + + } catch (error) { + if (error.name === 'TimeoutError') { + ctx.logger.warn('QM session creation timed out', { sessionKey, sessionId }); + } else { + ctx.logger.error('Error creating QM session', { sessionKey, sessionId, error }); + } + + // Cache failed session attempt for debugging + await ctx.cache.set( + `failed-session:${sessionKey}:${Date.now()}`, + { sessionId, proxy, error: error.message }, + { ttl: 300 } + ); + } +} + +export async function initializeQMResources(parentLogger?: Logger): Promise { + const ctx = OperationContext.create('qm', 'init', parentLogger); + + // Check if already initialized + const alreadyInitialized = await ctx.cache.get('initialized'); + if (alreadyInitialized) { + ctx.logger.debug('QM resources already initialized'); + return; + } + + ctx.logger.debug('Initializing QM resources...'); + + // Mark as initialized in cache and session manager + await ctx.cache.set('initialized', true, { ttl: 3600 }); + await ctx.cache.set('initialization-time', new Date().toISOString()); + + const sessionManager = QMSessionManager.getInstance(); + sessionManager.setInitialized(true); + + ctx.logger.info('QM resources initialized successfully'); +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/qm/operations/spider.operations.ts b/apps/data-service/src/handlers/qm/operations/spider.operations.ts new file mode 100644 index 0000000..6b8b145 --- /dev/null +++ b/apps/data-service/src/handlers/qm/operations/spider.operations.ts @@ -0,0 +1,266 @@ +/** + * QM Spider Operations - Symbol spider search functionality + */ + +import { OperationContext } from '@stock-bot/utils'; +import { QueueManager } from '@stock-bot/queue'; +import type { Logger } from '@stock-bot/logger'; + +import { QMSessionManager } from '../shared/session-manager'; +import { QM_SESSION_IDS } from '../shared/config'; +import type { SymbolSpiderJob, SpiderResult } from '../shared/types'; +import { initializeQMResources } from './session.operations'; +import { searchQMSymbolsAPI } from './symbols.operations'; + +export async function spiderSymbolSearch( + payload: SymbolSpiderJob, + parentLogger?: Logger +): Promise { + const ctx = OperationContext.create('qm', 'spider', parentLogger); + + try { + const { prefix, depth, source = 'qm', maxDepth = 4 } = payload; + + ctx.logger.info('Starting spider search', { + prefix: prefix || 'ROOT', + depth, + source, + maxDepth + }); + + // Check cache for recent results + const cacheKey = `search-result:${prefix || 'ROOT'}:${depth}`; + const cachedResult = await ctx.cache.get(cacheKey); + if (cachedResult) { + ctx.logger.debug('Using cached spider search result', { prefix, depth }); + return cachedResult; + } + + // Ensure resources are initialized + const sessionManager = QMSessionManager.getInstance(); + if (!sessionManager.getInitialized()) { + await initializeQMResources(parentLogger); + } + + let result: SpiderResult; + + // Root job: Create A-Z jobs + if (prefix === null || prefix === undefined || prefix === '') { + result = await createAlphabetJobs(source, maxDepth, ctx); + } else { + // Leaf job: Search for symbols with this prefix + result = await searchAndSpawnJobs(prefix, depth, source, maxDepth, ctx); + } + + // Cache the result + await ctx.cache.set(cacheKey, result, { ttl: 3600 }); + + // Store spider operation metrics in PostgreSQL + if (ctx.postgres) { + try { + await ctx.postgres.query( + 'INSERT INTO spider_stats (handler, operation, prefix, depth, symbols_found, jobs_created, search_time) VALUES ($1, $2, $3, $4, $5, $6, $7)', + ['qm', 'spider', prefix || 'ROOT', depth, result.symbolsFound, result.jobsCreated, new Date()] + ); + } catch (error) { + ctx.logger.warn('Failed to store spider stats in PostgreSQL', { error }); + } + } + + ctx.logger.info('Spider search completed', { + prefix: prefix || 'ROOT', + depth, + success: result.success, + symbolsFound: result.symbolsFound, + jobsCreated: result.jobsCreated + }); + + return result; + + } catch (error) { + ctx.logger.error('Spider symbol search failed', { error, payload }); + const failedResult = { success: false, symbolsFound: 0, jobsCreated: 0 }; + + // Cache failed result for a shorter time + const cacheKey = `search-result:${payload.prefix || 'ROOT'}:${payload.depth}`; + await ctx.cache.set(cacheKey, failedResult, { ttl: 300 }); + + return failedResult; + } +} + +async function createAlphabetJobs( + source: string, + maxDepth: number, + ctx: OperationContext +): Promise { + try { + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue('qm'); + let jobsCreated = 0; + + ctx.logger.info('Creating alphabet jobs (A-Z)'); + + // Create jobs for A-Z + for (let i = 0; i < 26; i++) { + const letter = String.fromCharCode(65 + i); // A=65, B=66, etc. + + const job: SymbolSpiderJob = { + prefix: letter, + depth: 1, + source, + maxDepth, + }; + + await queue.add( + 'spider-symbol-search', + { + handler: 'qm', + operation: 'spider-symbol-search', + payload: job, + }, + { + priority: 5, + delay: i * 100, // Stagger jobs by 100ms + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + } + ); + + jobsCreated++; + } + + // Cache alphabet job creation + await ctx.cache.set('alphabet-jobs-created', { + count: jobsCreated, + timestamp: new Date().toISOString(), + source, + maxDepth + }, { ttl: 3600 }); + + ctx.logger.info(`Created ${jobsCreated} alphabet jobs (A-Z)`); + return { success: true, symbolsFound: 0, jobsCreated }; + + } catch (error) { + ctx.logger.error('Failed to create alphabet jobs', { error }); + return { success: false, symbolsFound: 0, jobsCreated: 0 }; + } +} + +async function searchAndSpawnJobs( + prefix: string, + depth: number, + source: string, + maxDepth: number, + ctx: OperationContext +): Promise { + try { + // Ensure sessions exist for symbol search + const sessionManager = QMSessionManager.getInstance(); + const lookupSession = sessionManager.getSession(QM_SESSION_IDS.LOOKUP); + + if (!lookupSession) { + ctx.logger.info('No lookup sessions available, creating sessions first...'); + const { createSessions } = await import('./session.operations'); + await createSessions(ctx.logger); + + // Wait a bit for session creation + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + // Search for symbols with this prefix + const symbols = await searchQMSymbolsAPI(prefix, ctx.logger); + const symbolCount = symbols.length; + + ctx.logger.info(`Prefix "${prefix}" returned ${symbolCount} symbols`); + + let jobsCreated = 0; + + // Store symbols in MongoDB + if (ctx.mongodb && symbols.length > 0) { + try { + const updatedSymbols = symbols.map((symbol: Record) => ({ + ...symbol, + qmSearchCode: symbol.symbol, + symbol: (symbol.symbol as string)?.split(':')[0], + searchPrefix: prefix, + searchDepth: depth, + discoveredAt: new Date() + })); + + await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']); + ctx.logger.debug('Stored symbols in MongoDB', { count: symbols.length }); + } catch (error) { + ctx.logger.warn('Failed to store symbols in MongoDB', { error }); + } + } + + // If we have 50+ symbols and haven't reached max depth, spawn sub-jobs + if (symbolCount >= 50 && depth < maxDepth) { + const queueManager = QueueManager.getInstance(); + const queue = queueManager.getQueue('qm'); + + ctx.logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`); + + // Create jobs for prefixA, prefixB, prefixC... prefixZ + for (let i = 0; i < 26; i++) { + const letter = String.fromCharCode(65 + i); + const newPrefix = prefix + letter; + + const job: SymbolSpiderJob = { + prefix: newPrefix, + depth: depth + 1, + source, + maxDepth, + }; + + await queue.add( + 'spider-symbol-search', + { + handler: 'qm', + operation: 'spider-symbol-search', + payload: job, + }, + { + priority: Math.max(1, 6 - depth), // Higher priority for deeper jobs + delay: i * 50, // Stagger sub-jobs by 50ms + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + } + ); + + jobsCreated++; + } + + // Cache sub-job creation info + await ctx.cache.set(`sub-jobs:${prefix}`, { + parentPrefix: prefix, + depth, + symbolCount, + jobsCreated, + timestamp: new Date().toISOString() + }, { ttl: 3600 }); + + ctx.logger.info(`Created ${jobsCreated} sub-jobs for prefix "${prefix}"`); + } else { + // Terminal case: save symbols (already done above) + ctx.logger.info(`Terminal case for prefix "${prefix}": ${symbolCount} symbols saved`); + + // Cache terminal case info + await ctx.cache.set(`terminal:${prefix}`, { + prefix, + depth, + symbolCount, + isTerminal: true, + reason: symbolCount < 50 ? 'insufficient_symbols' : 'max_depth_reached', + timestamp: new Date().toISOString() + }, { ttl: 3600 }); + } + + return { success: true, symbolsFound: symbolCount, jobsCreated }; + + } catch (error) { + ctx.logger.error(`Failed to search and spawn jobs for prefix "${prefix}"`, { error, depth }); + return { success: false, symbolsFound: 0, jobsCreated: 0 }; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/qm/operations/symbols.operations.ts b/apps/data-service/src/handlers/qm/operations/symbols.operations.ts new file mode 100644 index 0000000..26f5c67 --- /dev/null +++ b/apps/data-service/src/handlers/qm/operations/symbols.operations.ts @@ -0,0 +1,196 @@ +/** + * QM Symbols Operations - Symbol fetching and API interactions + */ + +import { OperationContext } from '@stock-bot/utils'; +import { getRandomProxy } from '@stock-bot/utils'; +import type { Logger } from '@stock-bot/logger'; + +import { QMSessionManager } from '../shared/session-manager'; +import { QM_SESSION_IDS, QM_CONFIG, SESSION_CONFIG } from '../shared/config'; +import type { SymbolSpiderJob, Exchange } from '../shared/types'; +import { initializeQMResources } from './session.operations'; +import { spiderSymbolSearch } from './spider.operations'; + +export async function fetchSymbols(parentLogger?: Logger): Promise { + const ctx = OperationContext.create('qm', 'symbols', parentLogger); + + try { + const sessionManager = QMSessionManager.getInstance(); + if (!sessionManager.getInitialized()) { + await initializeQMResources(parentLogger); + } + + ctx.logger.info('Starting QM spider-based symbol search...'); + + // Check if we have a recent symbol fetch + const lastFetch = await ctx.cache.get('last-symbol-fetch'); + if (lastFetch) { + ctx.logger.info('Recent symbol fetch found, using spider search'); + } + + // Start the spider process with root job + const rootJob: SymbolSpiderJob = { + prefix: null, // Root job creates A-Z jobs + depth: 0, + source: 'qm', + maxDepth: 4, + }; + + const result = await spiderSymbolSearch(rootJob, parentLogger); + + if (result.success) { + // Cache successful fetch info + await ctx.cache.set('last-symbol-fetch', { + timestamp: new Date().toISOString(), + jobsCreated: result.jobsCreated, + success: true + }, { ttl: 3600 }); + + ctx.logger.info( + `QM spider search initiated successfully. Created ${result.jobsCreated} initial jobs` + ); + return [`Spider search initiated with ${result.jobsCreated} jobs`]; + } else { + ctx.logger.error('Failed to initiate QM spider search'); + return null; + } + } catch (error) { + ctx.logger.error('Failed to start QM spider symbol search', { error }); + return null; + } +} + +export async function searchQMSymbolsAPI(query: string, parentLogger?: Logger): Promise { + const ctx = OperationContext.create('qm', 'api-search', parentLogger); + + const proxyInfo = await getRandomProxy(); + if (!proxyInfo) { + throw new Error('No proxy available for QM API call'); + } + + const sessionManager = QMSessionManager.getInstance(); + const session = sessionManager.getSession(QM_SESSION_IDS.LOOKUP); + + if (!session) { + throw new Error(`No active session found for QM API with ID: ${QM_SESSION_IDS.LOOKUP}`); + } + + try { + ctx.logger.debug('Searching QM symbols API', { query, proxy: session.proxy }); + + // Check cache for recent API results + const cacheKey = `api-search:${query}`; + const cachedResult = await ctx.cache.get(cacheKey); + if (cachedResult) { + ctx.logger.debug('Using cached API search result', { query }); + return cachedResult; + } + + // QM lookup endpoint for symbol search + const searchParams = new URLSearchParams({ + marketType: 'equity', + pathName: '/demo/portal/company-summary.php', + q: query, + qmodTool: 'SmartSymbolLookup', + searchType: 'symbol', + showFree: 'false', + showHisa: 'false', + webmasterId: '500' + }); + + const apiUrl = `${QM_CONFIG.LOOKUP_URL}?${searchParams.toString()}`; + + const response = await fetch(apiUrl, { + method: 'GET', + headers: session.headers, + signal: AbortSignal.timeout(SESSION_CONFIG.API_TIMEOUT), + }); + + if (!response.ok) { + throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); + } + + const symbols = await response.json(); + + // Update session stats + session.successfulCalls++; + session.lastUsed = new Date(); + + // Process symbols and extract exchanges + if (ctx.mongodb && symbols.length > 0) { + try { + const updatedSymbols = symbols.map((symbol: Record) => ({ + ...symbol, + qmSearchCode: symbol.symbol, + symbol: (symbol.symbol as string)?.split(':')[0], + searchQuery: query, + fetchedAt: new Date() + })); + + await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']); + + // Extract and store unique exchanges + const exchanges: Exchange[] = []; + for (const symbol of symbols) { + if (!exchanges.some(ex => ex.exchange === symbol.exchange)) { + exchanges.push({ + exchange: symbol.exchange, + exchangeCode: symbol.exchangeCode, + exchangeShortName: symbol.exchangeShortName, + countryCode: symbol.countryCode, + source: 'qm', + }); + } + } + + if (exchanges.length > 0) { + await ctx.mongodb.batchUpsert('qmExchanges', exchanges, ['exchange']); + ctx.logger.debug('Stored exchanges in MongoDB', { count: exchanges.length }); + } + + } catch (error) { + ctx.logger.warn('Failed to store symbols/exchanges in MongoDB', { error }); + } + } + + // Cache the result + await ctx.cache.set(cacheKey, symbols, { ttl: 1800 }); // 30 minutes + + // Store API call stats + await ctx.cache.set(`api-stats:${query}:${Date.now()}`, { + query, + symbolCount: symbols.length, + proxy: session.proxy, + success: true, + timestamp: new Date().toISOString() + }, { ttl: 3600 }); + + ctx.logger.info( + `QM API returned ${symbols.length} symbols for query: ${query}`, + { proxy: session.proxy, symbolCount: symbols.length } + ); + + return symbols; + + } catch (error) { + // Update session failure stats + session.failedCalls++; + session.lastUsed = new Date(); + + // Cache failed API call info + await ctx.cache.set(`api-failure:${query}:${Date.now()}`, { + query, + error: error.message, + proxy: session.proxy, + timestamp: new Date().toISOString() + }, { ttl: 600 }); + + ctx.logger.error(`Error searching QM symbols for query "${query}"`, { + error: error.message, + proxy: session.proxy + }); + + throw error; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/qm/qm.handler.ts b/apps/data-service/src/handlers/qm/qm.handler.ts index f71945c..8a95c28 100644 --- a/apps/data-service/src/handlers/qm/qm.handler.ts +++ b/apps/data-service/src/handlers/qm/qm.handler.ts @@ -4,31 +4,31 @@ import { handlerRegistry, type HandlerConfigWithSchedule } from '@stock-bot/queue'; -import type { SymbolSpiderJob } from './qm.operations'; +import type { SymbolSpiderJob } from './shared/types'; -const logger = getLogger('qm-provider'); +const handlerLogger = getLogger('qm-handler'); // Initialize and register the QM provider export function initializeQMProvider() { - logger.debug('Registering QM provider with scheduled jobs...'); + handlerLogger.debug('Registering QM provider with scheduled jobs...'); const qmProviderConfig: HandlerConfigWithSchedule = { name: 'qm', operations: { 'create-sessions': createJobHandler(async () => { - logger.debug('Creating QM sessions...'); - const { createSessions } = await import('./qm.operations'); - await createSessions(); - logger.debug('QM sessions created successfully'); + handlerLogger.debug('Creating QM sessions...'); + const { createSessions } = await import('./operations/session.operations'); + await createSessions(handlerLogger); + handlerLogger.debug('QM sessions created successfully'); return { success: true, message: 'QM sessions created successfully' }; }), 'search-symbols': createJobHandler(async () => { - logger.info('Starting QM symbol search...'); - const { fetchSymbols } = await import('./qm.operations'); - const symbols = await fetchSymbols(); + handlerLogger.info('Starting QM symbol search...'); + const { fetchSymbols } = await import('./operations/symbols.operations'); + const symbols = await fetchSymbols(handlerLogger); if (symbols && symbols.length > 0) { - logger.info('QM symbol search completed successfully', { count: symbols.length }); + handlerLogger.info('QM symbol search completed successfully', { count: symbols.length }); return { success: true, message: 'QM symbol search completed successfully', @@ -36,7 +36,7 @@ export function initializeQMProvider() { symbols: symbols.slice(0, 10), // Return first 10 symbols as sample }; } else { - logger.warn('QM symbol search returned no results'); + handlerLogger.warn('QM symbol search returned no results'); return { success: false, message: 'No symbols found', @@ -45,11 +45,11 @@ export function initializeQMProvider() { } }), 'spider-symbol-search': createJobHandler(async (payload: SymbolSpiderJob) => { - logger.debug('Processing spider symbol search job', { payload }); - const { spiderSymbolSearch } = await import('./qm.operations'); - const result = await spiderSymbolSearch(payload); + handlerLogger.debug('Processing spider symbol search job', { payload }); + const { spiderSymbolSearch } = await import('./operations/spider.operations'); + const result = await spiderSymbolSearch(payload, handlerLogger); - logger.debug('Spider search job completed', { + handlerLogger.debug('Spider search job completed', { success: result.success, symbolsFound: result.symbolsFound, }); @@ -85,5 +85,5 @@ export function initializeQMProvider() { }; handlerRegistry.registerWithSchedule(qmProviderConfig); - logger.debug('IB provider registered successfully with scheduled jobs'); + handlerLogger.debug('QM provider registered successfully with scheduled jobs'); } diff --git a/apps/data-service/src/handlers/qm/qm.operations.ts b/apps/data-service/src/handlers/qm/qm.operations.ts.old similarity index 100% rename from apps/data-service/src/handlers/qm/qm.operations.ts rename to apps/data-service/src/handlers/qm/qm.operations.ts.old diff --git a/apps/data-service/src/handlers/qm/shared/config.ts b/apps/data-service/src/handlers/qm/shared/config.ts new file mode 100644 index 0000000..3710ebf --- /dev/null +++ b/apps/data-service/src/handlers/qm/shared/config.ts @@ -0,0 +1,43 @@ +/** + * Shared configuration for QM operations + */ + +import { getRandomUserAgent } from '@stock-bot/http'; + +// QM Session IDs for different endpoints +export const QM_SESSION_IDS = { + LOOKUP: 'dc8c9930437f65d30f6597768800957017bac203a0a50342932757c8dfa158d6', // lookup endpoint + // Add other session IDs as needed +} as const; + +// QM API Configuration +export const QM_CONFIG = { + BASE_URL: 'https://app.quotemedia.com', + AUTH_PATH: '/auth/g/authenticate/dataTool/v0/500', + LOOKUP_URL: 'https://app.quotemedia.com/datatool/lookup.json', + ORIGIN: 'https://www.quotemedia.com', + REFERER: 'https://www.quotemedia.com/', +} as const; + +// Session management settings +export const SESSION_CONFIG = { + MIN_SESSIONS: 5, + MAX_SESSIONS: 10, + MAX_FAILED_CALLS: 10, + SESSION_TIMEOUT: 10000, // 10 seconds + API_TIMEOUT: 15000, // 15 seconds +} as const; + +/** + * Generate standard QM headers + */ +export function getQmHeaders(): Record { + return { + 'User-Agent': getRandomUserAgent(), + Accept: '*/*', + 'Accept-Language': 'en', + 'Sec-Fetch-Mode': 'cors', + Origin: QM_CONFIG.ORIGIN, + Referer: QM_CONFIG.REFERER, + }; +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/qm/shared/session-manager.ts b/apps/data-service/src/handlers/qm/shared/session-manager.ts new file mode 100644 index 0000000..b274e7c --- /dev/null +++ b/apps/data-service/src/handlers/qm/shared/session-manager.ts @@ -0,0 +1,136 @@ +/** + * QM Session Manager - Centralized session state management + */ + +import type { QMSession } from './types'; +import { QM_SESSION_IDS, SESSION_CONFIG } from './config'; + +export class QMSessionManager { + private static instance: QMSessionManager | null = null; + private sessionCache: Record = {}; + private isInitialized = false; + + private constructor() { + // Initialize session cache with known session IDs + Object.values(QM_SESSION_IDS).forEach(sessionId => { + this.sessionCache[sessionId] = []; + }); + } + + static getInstance(): QMSessionManager { + if (!QMSessionManager.instance) { + QMSessionManager.instance = new QMSessionManager(); + } + return QMSessionManager.instance; + } + + /** + * Get a random session for the given session ID + */ + getSession(sessionId: string): QMSession | null { + const sessions = this.sessionCache[sessionId]; + if (!sessions || sessions.length === 0) { + return null; + } + + // Filter out sessions with excessive failures + const validSessions = sessions.filter(session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS); + if (validSessions.length === 0) { + return null; + } + + return validSessions[Math.floor(Math.random() * validSessions.length)]; + } + + /** + * Add a session to the cache + */ + addSession(sessionId: string, session: QMSession): void { + if (!this.sessionCache[sessionId]) { + this.sessionCache[sessionId] = []; + } + this.sessionCache[sessionId].push(session); + } + + /** + * Get all sessions for a session ID + */ + getSessions(sessionId: string): QMSession[] { + return this.sessionCache[sessionId] || []; + } + + /** + * Get session count for all session IDs + */ + getSessionCount(): number { + return Object.values(this.sessionCache).reduce((total, sessions) => total + sessions.length, 0); + } + + /** + * Clean up failed sessions + */ + cleanupFailedSessions(): number { + let removedCount = 0; + + Object.keys(this.sessionCache).forEach(sessionId => { + const initialCount = this.sessionCache[sessionId].length; + this.sessionCache[sessionId] = this.sessionCache[sessionId].filter( + session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS + ); + removedCount += initialCount - this.sessionCache[sessionId].length; + }); + + return removedCount; + } + + /** + * Check if more sessions are needed for a session ID + */ + needsMoreSessions(sessionId: string): boolean { + const sessions = this.sessionCache[sessionId] || []; + const validSessions = sessions.filter(session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS); + return validSessions.length < SESSION_CONFIG.MIN_SESSIONS; + } + + /** + * Check if session ID is at capacity + */ + isAtCapacity(sessionId: string): boolean { + const sessions = this.sessionCache[sessionId] || []; + return sessions.length >= SESSION_CONFIG.MAX_SESSIONS; + } + + /** + * Get session cache statistics + */ + getStats() { + const stats: Record = {}; + + Object.entries(this.sessionCache).forEach(([sessionId, sessions]) => { + const validSessions = sessions.filter(session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS); + const failedSessions = sessions.filter(session => session.failedCalls > SESSION_CONFIG.MAX_FAILED_CALLS); + + stats[sessionId] = { + total: sessions.length, + valid: validSessions.length, + failed: failedSessions.length + }; + }); + + return stats; + } + + /** + * Mark manager as initialized + */ + setInitialized(initialized: boolean = true): void { + this.isInitialized = initialized; + } + + /** + * Check if manager is initialized + */ + getInitialized(): boolean { + return this.isInitialized; + } +} \ No newline at end of file diff --git a/apps/data-service/src/handlers/qm/shared/types.ts b/apps/data-service/src/handlers/qm/shared/types.ts new file mode 100644 index 0000000..9897459 --- /dev/null +++ b/apps/data-service/src/handlers/qm/shared/types.ts @@ -0,0 +1,32 @@ +/** + * Shared types for QM operations + */ + +export interface QMSession { + proxy: string; + headers: Record; + successfulCalls: number; + failedCalls: number; + lastUsed: Date; +} + +export interface SymbolSpiderJob { + prefix: string | null; // null = root job (A-Z) + depth: number; // 1=A, 2=AA, 3=AAA, etc. + source: string; // 'qm' + maxDepth?: number; // optional max depth limit +} + +export interface Exchange { + exchange: string; + exchangeCode: string; + exchangeShortName: string; + countryCode: string; + source: string; +} + +export interface SpiderResult { + success: boolean; + symbolsFound: number; + jobsCreated: number; +} \ No newline at end of file diff --git a/libs/utils/src/index.ts b/libs/utils/src/index.ts index 430bafa..a552313 100644 --- a/libs/utils/src/index.ts +++ b/libs/utils/src/index.ts @@ -2,4 +2,5 @@ export * from './calculations/index'; export * from './common'; export * from './dateUtils'; export * from './generic-functions'; +export * from './operation-context'; export * from './proxy'; diff --git a/libs/utils/src/operation-context.ts b/libs/utils/src/operation-context.ts new file mode 100644 index 0000000..38ae757 --- /dev/null +++ b/libs/utils/src/operation-context.ts @@ -0,0 +1,172 @@ +/** + * OperationContext - Unified context for handler operations + * + * Provides streamlined access to: + * - Child loggers with hierarchical context + * - Database clients (MongoDB, PostgreSQL) + * - Contextual cache with automatic key prefixing + * - Shared resource management + */ + +import { createCache, type CacheProvider } from '@stock-bot/cache'; +import { getLogger, type Logger } from '@stock-bot/logger'; +import { getDatabaseConfig } from '@stock-bot/config'; + +export class OperationContext { + public readonly logger: Logger; + public readonly mongodb: any; // MongoDB client - imported dynamically + public readonly postgres: any; // PostgreSQL client - imported dynamically + + private static sharedCache: CacheProvider | null = null; + private static parentLoggers = new Map(); + private static databaseConfig: any = null; + + constructor( + public readonly handlerName: string, + public readonly operationName: string, + parentLogger?: Logger + ) { + // Create child logger from parent or create handler parent + const parent = parentLogger || this.getOrCreateParentLogger(); + this.logger = parent.child(operationName, { + handler: handlerName, + operation: operationName + }); + + // Set up database access + this.mongodb = this.getDatabaseClient('mongodb'); + this.postgres = this.getDatabaseClient('postgres'); + } + + private getDatabaseClient(type: 'mongodb' | 'postgres'): any { + try { + if (type === 'mongodb') { + // Dynamic import to avoid TypeScript issues during build + const { getMongoDBClient } = require('@stock-bot/mongodb-client'); + return getMongoDBClient(); + } else { + // Dynamic import to avoid TypeScript issues during build + const { getPostgreSQLClient } = require('@stock-bot/postgres-client'); + return getPostgreSQLClient(); + } + } catch (error) { + this.logger.warn(`${type} client not initialized, operations may fail`, { error }); + return null; + } + } + + private getOrCreateParentLogger(): Logger { + const parentKey = `${this.handlerName}-handler`; + + if (!OperationContext.parentLoggers.has(parentKey)) { + const parentLogger = getLogger(parentKey); + OperationContext.parentLoggers.set(parentKey, parentLogger); + } + + return OperationContext.parentLoggers.get(parentKey)!; + } + + /** + * Get contextual cache with automatic key prefixing + * Keys are automatically prefixed as: "operations:handlerName:operationName:key" + */ + get cache(): CacheProvider { + if (!OperationContext.sharedCache) { + // Get Redis configuration from database config + if (!OperationContext.databaseConfig) { + OperationContext.databaseConfig = getDatabaseConfig(); + } + + const redisConfig = OperationContext.databaseConfig.dragonfly || { + host: 'localhost', + port: 6379, + db: 1 + }; + + OperationContext.sharedCache = createCache({ + keyPrefix: 'operations:', + shared: true, // Use singleton Redis connection + enableMetrics: true, + ttl: 3600, // Default 1 hour TTL + redisConfig + }); + } + return this.createContextualCache(); + } + + private createContextualCache(): CacheProvider { + const contextPrefix = `${this.handlerName}:${this.operationName}:`; + + // Return a proxy that automatically prefixes keys with context + return { + async get(key: string): Promise { + return OperationContext.sharedCache!.get(`${contextPrefix}${key}`); + }, + + async set(key: string, value: T, options?: any): Promise { + return OperationContext.sharedCache!.set(`${contextPrefix}${key}`, value, options); + }, + + async del(key: string): Promise { + return OperationContext.sharedCache!.del(`${contextPrefix}${key}`); + }, + + async exists(key: string): Promise { + return OperationContext.sharedCache!.exists(`${contextPrefix}${key}`); + }, + + async clear(): Promise { + // Not implemented for contextual cache - use del() for specific keys + throw new Error('clear() not implemented for contextual cache - use del() for specific keys'); + }, + + async keys(pattern: string): Promise { + const fullPattern = `${contextPrefix}${pattern}`; + return OperationContext.sharedCache!.keys(fullPattern); + }, + + getStats() { + return OperationContext.sharedCache!.getStats(); + }, + + async health(): Promise { + return OperationContext.sharedCache!.health(); + }, + + async waitForReady(timeout?: number): Promise { + return OperationContext.sharedCache!.waitForReady(timeout); + }, + + isReady(): boolean { + return OperationContext.sharedCache!.isReady(); + } + } as CacheProvider; + } + + /** + * Factory method to create OperationContext + */ + static create(handlerName: string, operationName: string, parentLogger?: Logger): OperationContext { + return new OperationContext(handlerName, operationName, parentLogger); + } + + /** + * Get cache key prefix for this operation context + */ + getCacheKeyPrefix(): string { + return `operations:${this.handlerName}:${this.operationName}:`; + } + + /** + * Create a child context for sub-operations + */ + createChild(subOperationName: string): OperationContext { + return new OperationContext( + this.handlerName, + `${this.operationName}:${subOperationName}`, + this.logger + ); + } +} + +export default OperationContext; \ No newline at end of file