diff --git a/apps/data-ingestion/src/handlers/qm/actions/exchanges.action.ts b/apps/data-ingestion/src/handlers/qm/actions/exchanges.action.ts new file mode 100644 index 0000000..101d55d --- /dev/null +++ b/apps/data-ingestion/src/handlers/qm/actions/exchanges.action.ts @@ -0,0 +1,21 @@ +/** + * QM Exchanges Operations - Simple exchange data fetching + */ + +import type { IServiceContainer } from '@stock-bot/handlers'; + +export async function fetchExchanges(services: IServiceContainer): Promise { + // Get exchanges from MongoDB + const exchanges = await services.mongodb.collection('qm_exchanges') + .find({}).toArray(); + + return exchanges; +} + +export async function getExchangeByCode(services: IServiceContainer, code: string): Promise { + // Get specific exchange by code + const exchange = await services.mongodb.collection('qm_exchanges') + .findOne({ code }); + + return exchange; +} \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/actions/session.action.ts b/apps/data-ingestion/src/handlers/qm/actions/session.action.ts new file mode 100644 index 0000000..c1badd7 --- /dev/null +++ b/apps/data-ingestion/src/handlers/qm/actions/session.action.ts @@ -0,0 +1,81 @@ +/** + * QM Session Actions - Session management and creation + */ + +import type { IServiceContainer } from '@stock-bot/handlers'; +import { QM_SESSION_IDS, SESSION_CONFIG } from '../shared/config'; +import { QMSessionManager } from '../shared/session-manager'; + +/** + * Check existing sessions and queue creation jobs for needed sessions + */ +export async function checkSessions(services: IServiceContainer): Promise<{ + cleaned: number; + queued: number; + message: string; +}> { + const sessionManager = QMSessionManager.getInstance(); + const cleanedCount = sessionManager.cleanupFailedSessions(); + // Check which session IDs need more sessions and queue creation jobs + let queuedCount = 0; + for (const sessionId of Object.values(QM_SESSION_IDS)) { + if (sessionManager.needsMoreSessions(sessionId)) { + const currentCount = sessionManager.getSessions(sessionId).length; + const neededSessions = SESSION_CONFIG.MAX_SESSIONS - currentCount; + for (let i = 0; i < neededSessions; i++) { + await services.queue.getQueue('qm').add('create-session', { sessionId }); + services.logger.log(`Queued job to create session for ${sessionId}`); + queuedCount++; + } + } + } + + return { + cleaned: cleanedCount, + queued: queuedCount, + message: `Session check completed: cleaned ${cleanedCount}, queued ${queuedCount}` + }; +} + +/** + * Create a single session for a specific session ID + */ +export async function createSingleSession( + services: IServiceContainer, + input: any +): Promise<{ sessionId: string; status: string; sessionType: string }> { + + const { sessionId: sessionType = 'default' } = input || {}; + const sessionManager = QMSessionManager.getInstance(); + + // Check if we're at capacity for this session type + if (sessionManager.isAtCapacity(sessionType)) { + return { + sessionId: '', + status: 'skipped', + sessionType, + }; + } + + // TODO: Get actual proxy and headers from proxy service + const session = { + proxy: 'http://proxy:8080', // Placeholder + headers: { + 'User-Agent': 'Mozilla/5.0 (compatible; QMBot/1.0)', + 'Accept': 'application/json' + }, + successfulCalls: 0, + failedCalls: 0, + lastUsed: new Date() + }; + + // Add session to manager + sessionManager.addSession(sessionType, session); + + return { + sessionId: sessionType, + status: 'created', + sessionType + }; +} + diff --git a/apps/data-ingestion/src/handlers/qm/actions/spider.action.ts b/apps/data-ingestion/src/handlers/qm/actions/spider.action.ts new file mode 100644 index 0000000..4dc41ce --- /dev/null +++ b/apps/data-ingestion/src/handlers/qm/actions/spider.action.ts @@ -0,0 +1,34 @@ +/** + * QM Spider Operations - Simple symbol discovery + */ + +import type { IServiceContainer } from '@stock-bot/handlers'; +import type { SymbolSpiderJob } from '../shared/types'; + +export async function spiderSymbolSearch( + services: IServiceContainer, + config: SymbolSpiderJob +): Promise<{ foundSymbols: number; depth: number }> { + + // Simple spider implementation + // TODO: Implement actual API calls to discover symbols + + // For now, just return mock results + const foundSymbols = Math.floor(Math.random() * 10) + 1; + + return { + foundSymbols, + depth: config.depth + }; +} + +export async function queueSymbolDiscovery( + services: IServiceContainer, + searchTerms: string[] +): Promise { + // Queue symbol discovery jobs + for (const term of searchTerms) { + // TODO: Queue actual discovery jobs + await services.cache.set(`discovery:${term}`, { queued: true }, 3600); + } +} \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/actions/symbols.action.ts b/apps/data-ingestion/src/handlers/qm/actions/symbols.action.ts new file mode 100644 index 0000000..311ec57 --- /dev/null +++ b/apps/data-ingestion/src/handlers/qm/actions/symbols.action.ts @@ -0,0 +1,21 @@ +/** + * QM Symbols Operations - Simple symbol fetching + */ + +import type { IServiceContainer } from '@stock-bot/handlers'; + +export async function searchSymbols(services: IServiceContainer): Promise { + // Get symbols from MongoDB + const symbols = await services.mongodb.collection('qm_symbols') + .find({}).limit(50).toArray(); + + return symbols; +} + +export async function fetchSymbolData(services: IServiceContainer, symbol: string): Promise { + // Fetch data for a specific symbol + const symbolData = await services.mongodb.collection('qm_symbols') + .findOne({ symbol }); + + return symbolData; +} \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/operations/exchanges.operations.ts b/apps/data-ingestion/src/handlers/qm/operations/exchanges.operations.ts deleted file mode 100644 index be1eeb6..0000000 --- a/apps/data-ingestion/src/handlers/qm/operations/exchanges.operations.ts +++ /dev/null @@ -1,44 +0,0 @@ -/** - * QM Exchanges Operations - Exchange fetching functionality - */ - -import { OperationContext } from '@stock-bot/di'; -import type { ServiceContainer } from '@stock-bot/di'; - -import { initializeQMResources } from './session.operations'; - -export async function fetchExchanges(container: ServiceContainer): Promise { - const ctx = OperationContext.create('qm', 'exchanges', { container }); - - try { - // Ensure resources are initialized - const { QMSessionManager } = await import('../shared/session-manager'); - const sessionManager = QMSessionManager.getInstance(); - - if (!sessionManager.getInitialized()) { - await initializeQMResources(container); - } - - ctx.logger.info('QM exchanges fetch - not implemented yet'); - - // Cache the "not implemented" status - await ctx.cache.set('fetch-status', { - implemented: false, - message: 'QM exchanges fetching not yet implemented', - timestamp: new Date().toISOString() - }, { ttl: 3600 }); - - // TODO: Implement QM exchanges fetching logic - // This could involve: - // 1. Querying existing exchanges from MongoDB - // 2. Making API calls to discover new exchanges - // 3. Processing and storing exchange metadata - - return null; - } catch (error) { - ctx.logger.error('Failed to fetch QM exchanges', { error }); - return null; - } finally { - await ctx.dispose(); - } -} \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts b/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts deleted file mode 100644 index eb9a6b7..0000000 --- a/apps/data-ingestion/src/handlers/qm/operations/session.operations.ts +++ /dev/null @@ -1,199 +0,0 @@ -/** - * QM Session Operations - Session creation and management - */ - -import type { ServiceContainer } from '@stock-bot/di'; -import { OperationContext } from '@stock-bot/di'; -import { isShutdownSignalReceived } from '@stock-bot/shutdown'; -import { getRandomProxy } from '@stock-bot/utils'; - -import { QM_CONFIG, QM_SESSION_IDS, SESSION_CONFIG, getQmHeaders } from '../shared/config'; -import { QMSessionManager } from '../shared/session-manager'; -import type { QMSession } from '../shared/types'; - -export async function createSessions(container: ServiceContainer): Promise { - const ctx = OperationContext.create('qm-handler', 'create-sessions', {container}); - - try { - ctx.logger.info('Creating QM sessions...'); - - // Get session manager instance - const sessionManager = QMSessionManager.getInstance(); - - // Check if already initialized - if (!sessionManager.getInitialized()) { - await initializeQMResources(container); - } - - // Clean up failed sessions first - const removedCount = sessionManager.cleanupFailedSessions(); - if (removedCount > 0) { - ctx.logger.info(`Cleaned up ${removedCount} failed sessions`); - } - - // Cache session creation stats - const initialStats = sessionManager.getStats(); - const cache = ctx.resolve('cache'); - await cache.set('pre-creation-stats', initialStats, { ttl: 300 }); - - // Create sessions for each session ID that needs them - for (const [sessionKey, sessionId] of Object.entries(QM_SESSION_IDS)) { - if (sessionManager.isAtCapacity(sessionId)) { - ctx.logger.debug(`Session ID ${sessionKey} is at capacity, skipping`); - continue; - } - - while (sessionManager.needsMoreSessions(sessionId)) { - if (isShutdownSignalReceived()) { - ctx.logger.info('Shutting down, skipping session creation'); - return; - } - - await createSingleSession(sessionId, sessionKey, ctx); - } - } - - // Cache final stats and session count - const finalStats = sessionManager.getStats(); - const totalSessions = sessionManager.getSessionCount(); - - await cache.set('post-creation-stats', finalStats, { ttl: 3600 }); - await cache.set('session-count', totalSessions, { ttl: 900 }); - await cache.set('last-session-creation', new Date().toISOString()); - - ctx.logger.info('QM session creation completed', { - totalSessions, - sessionStats: finalStats - }); - - } catch (error) { - ctx.logger.error('Failed to create QM sessions', { error }); - throw error; - } -} - -async function createSingleSession( - sessionId: string, - sessionKey: string, - ctx: OperationContext -): Promise { - ctx.logger.debug(`Creating new session for ${sessionKey}`, { sessionId }); - - const proxyInfo = await getRandomProxy(); - if (!proxyInfo) { - ctx.logger.error('No proxy available for QM session creation'); - return; - } - - // Convert ProxyInfo to string format - const auth = proxyInfo.username && proxyInfo.password ? - `${proxyInfo.username}:${proxyInfo.password}@` : ''; - const proxy = `${proxyInfo.protocol}://${auth}${proxyInfo.host}:${proxyInfo.port}`; - - const newSession: QMSession = { - proxy: proxy, - headers: getQmHeaders(), - successfulCalls: 0, - failedCalls: 0, - lastUsed: new Date(), - }; - - try { - const sessionResponse = await fetch( - `${QM_CONFIG.BASE_URL}${QM_CONFIG.AUTH_PATH}/${sessionId}`, - { - method: 'GET', - headers: newSession.headers, - signal: AbortSignal.timeout(SESSION_CONFIG.SESSION_TIMEOUT), - } - ); - - ctx.logger.debug('Session response received', { - status: sessionResponse.status, - sessionKey, - }); - - if (!sessionResponse.ok) { - ctx.logger.error('Failed to create QM session', { - sessionKey, - sessionId, - status: sessionResponse.status, - statusText: sessionResponse.statusText, - }); - return; - } - - const sessionData = await sessionResponse.json(); - - // Add token to headers - newSession.headers['Datatool-Token'] = sessionData.token; - - // Add session to manager - const sessionManager = QMSessionManager.getInstance(); - sessionManager.addSession(sessionId, newSession); - - // Cache successful session creation - const cacheService = ctx.resolve('cache'); - await cacheService.set( - `successful-session:${sessionKey}:${Date.now()}`, - { sessionId, proxy, tokenExists: !!sessionData.token }, - { ttl: 300 } - ); - - ctx.logger.info('QM session created successfully', { - sessionKey, - sessionId, - proxy: newSession.proxy, - sessionCount: sessionManager.getSessions(sessionId).length, - hasToken: !!sessionData.token - }); - - } catch (error) { - if (error.name === 'TimeoutError') { - ctx.logger.warn('QM session creation timed out', { sessionKey, sessionId }); - } else { - ctx.logger.error('Error creating QM session', { sessionKey, sessionId, error }); - } - - // Cache failed session attempt for debugging - const cacheService = ctx.resolve('cache'); - await cacheService.set( - `failed-session:${sessionKey}:${Date.now()}`, - { sessionId, proxy, error: error.message }, - { ttl: 300 } - ); - } -} - -export async function initializeQMResources(container?: ServiceContainer): Promise { - if (!container) { - throw new Error('Service container is required for QM resource initialization'); - } - - const ctx = new OperationContext('qm-handler', 'initialize-resources', container); - - try { - const cache = ctx.resolve('cache'); - - // Check if already initialized - const alreadyInitialized = await cache.get('initialized'); - if (alreadyInitialized) { - ctx.logger.debug('QM resources already initialized'); - return; - } - - ctx.logger.debug('Initializing QM resources...'); - - // Mark as initialized in cache and session manager - await cache.set('initialized', true, { ttl: 3600 }); - await cache.set('initialization-time', new Date().toISOString()); - - const sessionManager = QMSessionManager.getInstance(); - sessionManager.setInitialized(true); - - ctx.logger.info('QM resources initialized successfully'); - } catch (error) { - ctx.logger.error('Failed to initialize QM resources', { error }); - throw error; - } -} \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/operations/spider.operations.ts b/apps/data-ingestion/src/handlers/qm/operations/spider.operations.ts deleted file mode 100644 index b0a42be..0000000 --- a/apps/data-ingestion/src/handlers/qm/operations/spider.operations.ts +++ /dev/null @@ -1,273 +0,0 @@ -/** - * QM Spider Operations - Symbol spider search functionality - */ - -import { OperationContext } from '@stock-bot/di'; -import { QueueManager } from '@stock-bot/queue'; - -import { QMSessionManager } from '../shared/session-manager'; -import { QM_SESSION_IDS } from '../shared/config'; -import type { ServiceContainer } from '@stock-bot/di'; -import type { SymbolSpiderJob, SpiderResult } from '../shared/types'; -import { initializeQMResources } from './session.operations'; -import { searchQMSymbolsAPI } from './symbols.operations'; - -export async function spiderSymbolSearch( - payload: SymbolSpiderJob, - container: ServiceContainer -): Promise { - const ctx = OperationContext.create('qm', 'spider', { container }); - - try { - const { prefix, depth, source = 'qm', maxDepth = 4 } = payload; - - ctx.logger.info('Starting spider search', { - prefix: prefix || 'ROOT', - depth, - source, - maxDepth - }); - - // Check cache for recent results - const cacheKey = `search-result:${prefix || 'ROOT'}:${depth}`; - const cachedResult = await ctx.cache.get(cacheKey); - if (cachedResult) { - ctx.logger.debug('Using cached spider search result', { prefix, depth }); - return cachedResult; - } - - // Ensure resources are initialized - const sessionManager = QMSessionManager.getInstance(); - if (!sessionManager.getInitialized()) { - await initializeQMResources(container); - } - - let result: SpiderResult; - - // Root job: Create A-Z jobs - if (prefix === null || prefix === undefined || prefix === '') { - result = await createAlphabetJobs(source, maxDepth, ctx); - } else { - // Leaf job: Search for symbols with this prefix - result = await searchAndSpawnJobs(prefix, depth, source, maxDepth, ctx, container); - } - - // Cache the result - await ctx.cache.set(cacheKey, result, { ttl: 3600 }); - - // Store spider operation metrics in cache instead of PostgreSQL for now - try { - const statsKey = `spider-stats:${prefix || 'ROOT'}:${depth}:${Date.now()}`; - await ctx.cache.set(statsKey, { - handler: 'qm', - operation: 'spider', - prefix: prefix || 'ROOT', - depth, - symbolsFound: result.symbolsFound, - jobsCreated: result.jobsCreated, - searchTime: new Date().toISOString() - }, { ttl: 86400 }); // Keep for 24 hours - } catch (error) { - ctx.logger.debug('Failed to store spider stats in cache', { error }); - } - - ctx.logger.info('Spider search completed', { - prefix: prefix || 'ROOT', - depth, - success: result.success, - symbolsFound: result.symbolsFound, - jobsCreated: result.jobsCreated - }); - - return result; - - } catch (error) { - ctx.logger.error('Spider symbol search failed', { error, payload }); - const failedResult = { success: false, symbolsFound: 0, jobsCreated: 0 }; - - // Cache failed result for a shorter time - const cacheKey = `search-result:${payload.prefix || 'ROOT'}:${payload.depth}`; - await ctx.cache.set(cacheKey, failedResult, { ttl: 300 }); - - return failedResult; - } finally { - await ctx.dispose(); - } -} - -async function createAlphabetJobs( - source: string, - maxDepth: number, - ctx: OperationContext -): Promise { - try { - const queueManager = QueueManager.getInstance(); - const queue = queueManager.getQueue('qm'); - let jobsCreated = 0; - - ctx.logger.info('Creating alphabet jobs (A-Z)'); - - // Create jobs for A-Z - for (let i = 0; i < 26; i++) { - const letter = String.fromCharCode(65 + i); // A=65, B=66, etc. - - const job: SymbolSpiderJob = { - prefix: letter, - depth: 1, - source, - maxDepth, - }; - - await queue.add( - 'spider-symbol-search', - { - handler: 'qm', - operation: 'spider-symbol-search', - payload: job, - }, - { - priority: 5, - delay: i * 100, // Stagger jobs by 100ms - attempts: 3, - backoff: { type: 'exponential', delay: 2000 }, - } - ); - - jobsCreated++; - } - - // Cache alphabet job creation - await ctx.cache.set('alphabet-jobs-created', { - count: jobsCreated, - timestamp: new Date().toISOString(), - source, - maxDepth - }, { ttl: 3600 }); - - ctx.logger.info(`Created ${jobsCreated} alphabet jobs (A-Z)`); - return { success: true, symbolsFound: 0, jobsCreated }; - - } catch (error) { - ctx.logger.error('Failed to create alphabet jobs', { error }); - return { success: false, symbolsFound: 0, jobsCreated: 0 }; - } -} - -async function searchAndSpawnJobs( - prefix: string, - depth: number, - source: string, - maxDepth: number, - ctx: OperationContext, - container: ServiceContainer -): Promise { - try { - // Ensure sessions exist for symbol search - const sessionManager = QMSessionManager.getInstance(); - const lookupSession = sessionManager.getSession(QM_SESSION_IDS.LOOKUP); - - if (!lookupSession) { - ctx.logger.info('No lookup sessions available, creating sessions first...'); - const { createSessions } = await import('./session.operations'); - await createSessions(container); - - // Wait a bit for session creation - await new Promise(resolve => setTimeout(resolve, 1000)); - } - - // Search for symbols with this prefix - const symbols = await searchQMSymbolsAPI(prefix, container); - const symbolCount = symbols.length; - - ctx.logger.info(`Prefix "${prefix}" returned ${symbolCount} symbols`); - - let jobsCreated = 0; - - // Store symbols in MongoDB - if (ctx.mongodb && symbols.length > 0) { - try { - const updatedSymbols = symbols.map((symbol: Record) => ({ - ...symbol, - qmSearchCode: symbol.symbol, - symbol: (symbol.symbol as string)?.split(':')[0], - searchPrefix: prefix, - searchDepth: depth, - discoveredAt: new Date() - })); - - await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']); - ctx.logger.debug('Stored symbols in MongoDB', { count: symbols.length }); - } catch (error) { - ctx.logger.warn('Failed to store symbols in MongoDB', { error }); - } - } - - // If we have 50+ symbols and haven't reached max depth, spawn sub-jobs - if (symbolCount >= 50 && depth < maxDepth) { - const queueManager = QueueManager.getInstance(); - const queue = queueManager.getQueue('qm'); - - ctx.logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`); - - // Create jobs for prefixA, prefixB, prefixC... prefixZ - for (let i = 0; i < 26; i++) { - const letter = String.fromCharCode(65 + i); - const newPrefix = prefix + letter; - - const job: SymbolSpiderJob = { - prefix: newPrefix, - depth: depth + 1, - source, - maxDepth, - }; - - await queue.add( - 'spider-symbol-search', - { - handler: 'qm', - operation: 'spider-symbol-search', - payload: job, - }, - { - priority: Math.max(1, 6 - depth), // Higher priority for deeper jobs - delay: i * 50, // Stagger sub-jobs by 50ms - attempts: 3, - backoff: { type: 'exponential', delay: 2000 }, - } - ); - - jobsCreated++; - } - - // Cache sub-job creation info - await ctx.cache.set(`sub-jobs:${prefix}`, { - parentPrefix: prefix, - depth, - symbolCount, - jobsCreated, - timestamp: new Date().toISOString() - }, { ttl: 3600 }); - - ctx.logger.info(`Created ${jobsCreated} sub-jobs for prefix "${prefix}"`); - } else { - // Terminal case: save symbols (already done above) - ctx.logger.info(`Terminal case for prefix "${prefix}": ${symbolCount} symbols saved`); - - // Cache terminal case info - await ctx.cache.set(`terminal:${prefix}`, { - prefix, - depth, - symbolCount, - isTerminal: true, - reason: symbolCount < 50 ? 'insufficient_symbols' : 'max_depth_reached', - timestamp: new Date().toISOString() - }, { ttl: 3600 }); - } - - return { success: true, symbolsFound: symbolCount, jobsCreated }; - - } catch (error) { - ctx.logger.error(`Failed to search and spawn jobs for prefix "${prefix}"`, { error, depth }); - return { success: false, symbolsFound: 0, jobsCreated: 0 }; - } -} \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/operations/symbols.operations.ts b/apps/data-ingestion/src/handlers/qm/operations/symbols.operations.ts deleted file mode 100644 index 995bcbf..0000000 --- a/apps/data-ingestion/src/handlers/qm/operations/symbols.operations.ts +++ /dev/null @@ -1,200 +0,0 @@ -/** - * QM Symbols Operations - Symbol fetching and API interactions - */ - -import { OperationContext } from '@stock-bot/di'; -import { getRandomProxy } from '@stock-bot/utils'; -import type { ServiceContainer } from '@stock-bot/di'; - -import { QMSessionManager } from '../shared/session-manager'; -import { QM_SESSION_IDS, QM_CONFIG, SESSION_CONFIG } from '../shared/config'; -import type { SymbolSpiderJob, Exchange } from '../shared/types'; -import { initializeQMResources } from './session.operations'; -import { spiderSymbolSearch } from './spider.operations'; - -export async function fetchSymbols(container: ServiceContainer): Promise { - const ctx = OperationContext.create('qm', 'symbols', { container }); - - try { - const sessionManager = QMSessionManager.getInstance(); - if (!sessionManager.getInitialized()) { - await initializeQMResources(container); - } - - ctx.logger.info('Starting QM spider-based symbol search...'); - - // Check if we have a recent symbol fetch - const lastFetch = await ctx.cache.get('last-symbol-fetch'); - if (lastFetch) { - ctx.logger.info('Recent symbol fetch found, using spider search'); - } - - // Start the spider process with root job - const rootJob: SymbolSpiderJob = { - prefix: null, // Root job creates A-Z jobs - depth: 0, - source: 'qm', - maxDepth: 4, - }; - - const result = await spiderSymbolSearch(rootJob); - - if (result.success) { - // Cache successful fetch info - await ctx.cache.set('last-symbol-fetch', { - timestamp: new Date().toISOString(), - jobsCreated: result.jobsCreated, - success: true - }, { ttl: 3600 }); - - ctx.logger.info( - `QM spider search initiated successfully. Created ${result.jobsCreated} initial jobs` - ); - return [`Spider search initiated with ${result.jobsCreated} jobs`]; - } else { - ctx.logger.error('Failed to initiate QM spider search'); - return null; - } - } catch (error) { - ctx.logger.error('Failed to start QM spider symbol search', { error }); - return null; - } finally { - await ctx.dispose(); - } -} - -export async function searchQMSymbolsAPI(query: string, container: ServiceContainer): Promise { - const ctx = OperationContext.create('qm', 'api-search', { container }); - - const proxyInfo = await getRandomProxy(); - if (!proxyInfo) { - throw new Error('No proxy available for QM API call'); - } - - const sessionManager = QMSessionManager.getInstance(); - const session = sessionManager.getSession(QM_SESSION_IDS.LOOKUP); - - if (!session) { - throw new Error(`No active session found for QM API with ID: ${QM_SESSION_IDS.LOOKUP}`); - } - - try { - ctx.logger.debug('Searching QM symbols API', { query, proxy: session.proxy }); - - // Check cache for recent API results - const cacheKey = `api-search:${query}`; - const cachedResult = await ctx.cache.get(cacheKey); - if (cachedResult) { - ctx.logger.debug('Using cached API search result', { query }); - return cachedResult; - } - - // QM lookup endpoint for symbol search - const searchParams = new URLSearchParams({ - marketType: 'equity', - pathName: '/demo/portal/company-summary.php', - q: query, - qmodTool: 'SmartSymbolLookup', - searchType: 'symbol', - showFree: 'false', - showHisa: 'false', - webmasterId: '500' - }); - - const apiUrl = `${QM_CONFIG.LOOKUP_URL}?${searchParams.toString()}`; - - const response = await fetch(apiUrl, { - method: 'GET', - headers: session.headers, - signal: AbortSignal.timeout(SESSION_CONFIG.API_TIMEOUT), - }); - - if (!response.ok) { - throw new Error(`QM API request failed: ${response.status} ${response.statusText}`); - } - - const symbols = await response.json(); - - // Update session stats - session.successfulCalls++; - session.lastUsed = new Date(); - - // Process symbols and extract exchanges - if (ctx.mongodb && symbols.length > 0) { - try { - const updatedSymbols = symbols.map((symbol: Record) => ({ - ...symbol, - qmSearchCode: symbol.symbol, - symbol: (symbol.symbol as string)?.split(':')[0], - searchQuery: query, - fetchedAt: new Date() - })); - - await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']); - - // Extract and store unique exchanges - const exchanges: Exchange[] = []; - for (const symbol of symbols) { - if (!exchanges.some(ex => ex.exchange === symbol.exchange)) { - exchanges.push({ - exchange: symbol.exchange, - exchangeCode: symbol.exchangeCode, - exchangeShortName: symbol.exchangeShortName, - countryCode: symbol.countryCode, - source: 'qm', - }); - } - } - - if (exchanges.length > 0) { - await ctx.mongodb.batchUpsert('qmExchanges', exchanges, ['exchange']); - ctx.logger.debug('Stored exchanges in MongoDB', { count: exchanges.length }); - } - - } catch (error) { - ctx.logger.warn('Failed to store symbols/exchanges in MongoDB', { error }); - } - } - - // Cache the result - await ctx.cache.set(cacheKey, symbols, { ttl: 1800 }); // 30 minutes - - // Store API call stats - await ctx.cache.set(`api-stats:${query}:${Date.now()}`, { - query, - symbolCount: symbols.length, - proxy: session.proxy, - success: true, - timestamp: new Date().toISOString() - }, { ttl: 3600 }); - - ctx.logger.info( - `QM API returned ${symbols.length} symbols for query: ${query}`, - { proxy: session.proxy, symbolCount: symbols.length } - ); - - return symbols; - - } catch (error) { - // Update session failure stats - session.failedCalls++; - session.lastUsed = new Date(); - - // Cache failed API call info - await ctx.cache.set(`api-failure:${query}:${Date.now()}`, { - query, - error: error.message, - proxy: session.proxy, - timestamp: new Date().toISOString() - }, { ttl: 600 }); - - ctx.logger.error(`Error searching QM symbols for query "${query}"`, { - error: error.message, - proxy: session.proxy - }); - - throw error; - } finally { - await ctx.dispose(); - } -} \ No newline at end of file diff --git a/apps/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/data-ingestion/src/handlers/qm/qm.handler.ts index e1384a0..07db41d 100644 --- a/apps/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/data-ingestion/src/handlers/qm/qm.handler.ts @@ -14,60 +14,23 @@ export class QMHandler extends BaseHandler { super(services); // Handler name read from @Handler decorator } - @Operation('create-sessions') + @Operation('check-sessions') @QueueSchedule('0 */15 * * *', { priority: 7, immediately: true, - description: 'Create and maintain QM sessions' + description: 'Check and maintain QM sessions' }) - async createSessions(input: unknown, context: ExecutionContext): Promise { - this.logger.info('Creating QM sessions...'); - - try { - // Check existing sessions in cache - const sessionKey = 'qm:sessions:active'; - const existingSessions = await this.cache.get(sessionKey) || []; - - this.logger.info('Current QM sessions', { - existing: existingSessions.length, - action: 'creating_new_sessions' - }); - - // Create new session - const newSession = { - id: `qm-session-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, - createdAt: new Date().toISOString(), - status: 'active', - provider: 'quotemedia', - // Add other session properties as needed - }; - - // Add to existing sessions - const updatedSessions = [...existingSessions, newSession]; - - // Store sessions in cache with 24 hour TTL (sessions are temporary) - await this.cache.set(sessionKey, updatedSessions, 86400); // 24 hours - - // Store session stats for monitoring - await this.cache.set('qm:sessions:count', updatedSessions.length, 3600); - await this.cache.set('qm:sessions:last-created', new Date().toISOString(), 1800); - - this.logger.info('QM session created', { - sessionId: newSession.id, - totalSessions: updatedSessions.length - }); - - return { - success: true, - sessionId: newSession.id, - totalSessions: updatedSessions.length, - message: 'QM session created successfully' - }; - - } catch (error) { - this.logger.error('Failed to create QM sessions', { error }); - throw error; - } + async checkSessions(input: unknown, context: ExecutionContext): Promise { + // Call the session maintenance action + const { checkSessions } = await import('./actions/session.action'); + return await checkSessions(this.services); + } + + @Operation('create-session') + async createSession(input: unknown, context: ExecutionContext): Promise { + // Call the individual session creation action + const { createSingleSession } = await import('./actions/session.action'); + return await createSingleSession(this.services, input); } @Operation('search-symbols')