diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index 34374b8..621545c 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -77,8 +77,8 @@ "port": 6379, "db": 1 }, - "workers": 5, - "concurrency": 2, + "workers": 10, + "concurrency": 5, "enableScheduledJobs": true, "defaultJobOptions": { "attempts": 3, diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts index a943a8a..9f091c9 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol.action.ts @@ -23,7 +23,7 @@ export async function spiderSymbol( }> { const { prefix, depth = 0, maxDepth = 4 } = input || {}; - this.logger.info('Spider symbol search', { prefix, depth, maxDepth }); + this.logger.info(`Spider symbol search ${prefix}`, { prefix, depth, maxDepth }); if (!prefix) { // Root job - create A-Z jobs @@ -60,9 +60,9 @@ export async function spiderSymbol( }; } - await this.mongodb.batchUpsert('qm_symbols', symbols, ['qmSearchCode']); + await this.mongodb.batchUpsert('qmSymbols', symbols, ['qmSearchCode']); - this.logger.info('Stored symbols from spider search', { + this.logger.info(`Stored symbols from spider search ${prefix} - ${symbols.length}`, { prefix, count: symbols.length }); @@ -82,7 +82,7 @@ export async function spiderSymbol( } if (exchanges.length > 0) { - await this.mongodb.batchUpsert('qm_exchanges', exchanges, ['exchange']); + await this.mongodb.batchUpsert('qmExchanges', exchanges, ['exchange']); this.logger.debug('Stored exchanges from spider search', { count: exchanges.length }); @@ -122,7 +122,7 @@ export async function spiderSymbol( }; } catch (error) { - this.logger.error('Spider search failed', { prefix, error }); + this.logger.error(`Spider search failed ${prefix}`, { prefix, error }); return { message: `Spider search failed for prefix: ${prefix}`, symbolsFound: 0 @@ -190,7 +190,7 @@ export async function searchSymbols( symbol: (symbol.symbol as string)?.split(':')[0] || '', })) : []; - this.logger.info('QM API returned symbols', { + this.logger.debug('QM API returned symbols ${query} - ${processedSymbols.length}', { query, count: processedSymbols.length }); diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index 6e9e7ac..7a7c1a7 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -12,6 +12,9 @@ export class QMHandler extends BaseHandler { super(services); // Handler name read from @Handler decorator } + /** + * SESSIONS + */ @ScheduledOperation('check-sessions', '*/2 * * * *', { priority: 8, immediately: false, @@ -22,7 +25,10 @@ export class QMHandler extends BaseHandler { @Operation('create-session') createSession = createSession; - @ScheduledOperation('spider-symbols', '* * * * *', { + /** + * SYMBOLS + */ + @ScheduledOperation('spider-symbols', '0 0 * * 0', { priority: 9, immediately: false, description: 'Weekly comprehensive symbol search using QM API spider - runs every Saturday at midnight' diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index 29d8d57..eac56f7 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -32,8 +32,8 @@ export const QM_CONFIG = { // Session management settings export const SESSION_CONFIG = { - MIN_SESSIONS: 2, - MAX_SESSIONS: 5, + MIN_SESSIONS: 50, + MAX_SESSIONS: 100, MAX_FAILED_CALLS: 3, SESSION_TIMEOUT: 5000, // 10 seconds API_TIMEOUT: 30000, // 15 seconds diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts index 9e11962..e28d4ad 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts @@ -69,23 +69,59 @@ export class QMSessionManager { * Get a random session for the given session ID */ async getSession(sessionId: string): Promise { - // Always load fresh data from cache - await this.loadFromCache(); + let retries = 3; + let session: QMSession | null = null; - const sessions = this.sessionCache[sessionId]; - if (!sessions || sessions.length === 0) { - return null; - } + while (retries > 0 && !session) { + // Always load fresh data from cache + await this.loadFromCache(); + + const sessions = this.sessionCache[sessionId]; + + if (!sessions || sessions.length === 0) { + retries--; + if (retries > 0) { + this.logger?.debug(`No sessions found for ${sessionId}, retrying... (${retries} attempts left)`); + await new Promise(resolve => setTimeout(resolve, 500)); + continue; + } + + this.logger?.error(`No sessions found for sessionId: ${sessionId}`, { + availableSessionIds: Object.keys(this.sessionCache), + sessionCounts: Object.entries(this.sessionCache).map(([id, s]) => ({ id, count: s.length })), + }); + return null; + } - // Filter out sessions with excessive failures - const validSessions = sessions.filter( - session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS - ); - if (validSessions.length === 0) { - return null; - } + // Filter out sessions with excessive failures + const validSessions = sessions.filter( + session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS + ); + + if (validSessions.length === 0) { + retries--; + if (retries > 0) { + this.logger?.debug(`No valid sessions after filtering, retrying... (${retries} attempts left)`); + await new Promise(resolve => setTimeout(resolve, 500)); + continue; + } + + this.logger?.error(`No valid sessions after filtering for sessionId: ${sessionId}`, { + totalSessions: sessions.length, + maxFailedCalls: SESSION_CONFIG.MAX_FAILED_CALLS, + }); + return null; + } - return validSessions[Math.floor(Math.random() * validSessions.length)]; + session = validSessions[Math.floor(Math.random() * validSessions.length)]; + this.logger?.trace(`Selected session`, { + uuid: session.uuid, + failedCalls: session.failedCalls, + successfulCalls: session.successfulCalls, + }); + } + + return session; } /** @@ -200,6 +236,7 @@ export class QMSessionManager { return stats; } + /** * Mark manager as initialized (deprecated - we always load from cache now) */ @@ -231,7 +268,12 @@ export class QMSessionManager { const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; const sessionIds = await this.cacheProvider.get(listKey); - this.logger?.trace(`Loading ${sessionType} sessions`, { sessionIds }); + this.logger?.trace(`Loading ${sessionType} sessions`, { + sessionType, + sessionId, + listKey, + sessionIds + }); if (sessionIds && Array.isArray(sessionIds)) { const sessions: QMSession[] = []; diff --git a/libs/data/mongodb/src/client.ts b/libs/data/mongodb/src/client.ts index fbe19bc..9086573 100644 --- a/libs/data/mongodb/src/client.ts +++ b/libs/data/mongodb/src/client.ts @@ -1,6 +1,6 @@ +import type { Logger } from '@stock-bot/core/logger'; import type { OptionalUnlessRequiredId } from 'mongodb'; import { Collection, Db, MongoClient } from 'mongodb'; -import type { Logger } from '@stock-bot/core/logger'; import type { ConnectionEvents, DocumentBase, @@ -227,7 +227,7 @@ export class MongoDBClient { let totalUpdated = 0; const errors: unknown[] = []; - this.logger.info( + this.logger.debug( `Starting batch upsert operation [${collectionName}-${documents.length}][${operationId}]`, { database: dbName, @@ -312,7 +312,7 @@ export class MongoDBClient { } } - this.logger.info(`Batch upsert completed [${operationId}]`, { + this.logger.debug(`Batch upsert completed [${collectionName}-${documents.length}][${operationId}]`, { database: dbName, collection: collectionName, totalRecords: documents.length,