diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json
index 255bbcc..dde869a 100644
--- a/apps/stock/config/config/default.json
+++ b/apps/stock/config/config/default.json
@@ -77,8 +77,8 @@
       "port": 6379,
       "db": 1
     },
-    "workers": 10,
-    "concurrency": 4,
+    "workers": 1,
+    "concurrency": 1,
     "enableScheduledJobs": true,
     "defaultJobOptions": {
       "attempts": 3,
diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts
index ebae778..c9eedd1 100644
--- a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts
+++ b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts
@@ -10,4 +10,5 @@ export { schedulePriceUpdates, updatePrices } from './prices.action';
 export { checkSessions, createSession } from './session.action';
 export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action';
 export { searchSymbols, spiderSymbol } from './symbol.action';
+export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action';
 
diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-dedup.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-dedup.action.ts
new file mode 100644
index 0000000..b4cc6ac
--- /dev/null
+++ b/apps/stock/data-ingestion/src/handlers/qm/actions/symbol-dedup.action.ts
@@ -0,0 +1,297 @@
+/**
+ * QM Symbol Deduplication and Exchange Statistics
+ * - Updates exchange statistics (symbol count, market cap)
+ * - Assigns priority to exchanges
+ * - Marks active/inactive symbols to eliminate duplicates
+ */
+
+import type { ExecutionContext } from '@stock-bot/handlers';
+import type { QMHandler } from '../qm.handler';
+import type { ExchangeStats } from '../shared/types';
+
+/**
+ * Exchange priority mapping (higher value wins when choosing between duplicates).
+ * Exchanges not listed here fall back to the 'default' entry.
+ */
+const EXCHANGE_PRIORITIES: Record<string, number> = {
+  // Major US exchanges
+  'NYSE': 10,
+  'NASDAQ': 10,
+  'NYSE American': 9,
+
+  // Other US exchanges
+  'EDGX': 5,
+  'BZX': 5,
+  'ARCA': 5,
+
+  // Canadian main exchanges
+  'TSX': 8,
+  'TSXV': 8,
+  'CSE': 7,
+
+  // Canadian quote exchanges
+  'CCQ TSX': 3,
+  'CCQ TSXV': 3,
+
+  // Default for others
+  'default': 1
+};
+
+/**
+ * Recompute per-exchange symbol statistics and priority.
+ *
+ * For every document in 'qmExchanges' this aggregates the matching 'qmSymbols'
+ * documents (count, total and average market cap, missing market caps counted
+ * as 0) and writes the results plus the exchange priority back in one bulk write.
+ *
+ * @returns a summary message and the number of exchanges processed
+ * @throws rethrows any error raised by the database calls after logging it
+ */
+export async function updateExchangeStats(
+  this: QMHandler,
+  _input: any,
+  _context: ExecutionContext
+): Promise<{ message: string; exchangesUpdated: number }> {
+  this.logger.info('Updating exchange statistics');
+
+  try {
+    // Get all exchanges
+    const exchanges = await this.mongodb.find('qmExchanges', {});
+
+    if (!exchanges || exchanges.length === 0) {
+      return { message: 'No exchanges found', exchangesUpdated: 0 };
+    }
+
+    // Calculate stats for each exchange
+    const bulkOps = [];
+
+    for (const exchange of exchanges) {
+      // Get symbol statistics for this exchange
+      const stats = await this.mongodb.aggregate('qmSymbols', [
+        { $match: { exchange: exchange.exchange } },
+        {
+          $group: {
+            _id: null,
+            symbolCount: { $sum: 1 },
+            totalMarketCap: { $sum: { $ifNull: ['$marketCap', 0] } },
+            avgMarketCap: { $avg: { $ifNull: ['$marketCap', 0] } }
+          }
+        }
+      ]);
+
+      const exchangeStats: ExchangeStats = stats[0] || {
+        symbolCount: 0,
+        totalMarketCap: 0,
+        avgMarketCap: 0
+      };
+
+      // Get priority for this exchange
+      const priority = EXCHANGE_PRIORITIES[exchange.exchange] || EXCHANGE_PRIORITIES.default;
+
+      // Prepare update
+      bulkOps.push({
+        updateOne: {
+          filter: { exchange: exchange.exchange },
+          update: {
+            $set: {
+              'stats.symbolCount': exchangeStats.symbolCount,
+              'stats.totalMarketCap': exchangeStats.totalMarketCap,
+              'stats.avgMarketCap': Math.round(exchangeStats.avgMarketCap),
+              priority,
+              updated_at: new Date()
+            }
+          }
+        }
+      });
+    }
+
+    // Execute bulk update
+    if (bulkOps.length > 0) {
+      const collection = this.mongodb.collection('qmExchanges');
+      await collection.bulkWrite(bulkOps);
+    }
+
+
+    this.logger.info(`Updated statistics for ${exchanges.length} exchanges`);
+    return {
+      message: `Updated statistics for ${exchanges.length} exchanges`,
+      exchangesUpdated: exchanges.length
+    };
+
+  } catch (error) {
+    this.logger.error('Failed to update exchange statistics', { error });
+    throw error;
+  }
+}
+
+/**
+ * Deduplicate symbols based on country and exchange rules.
+ *
+ * Symbols are grouped by ticker and then by country code ('UNKNOWN' when the
+ * country is missing). Within each group exactly one document is marked active:
+ * - US: the document whose qmSearchCode equals the ticker
+ * - other countries: the document whose qmSearchCode is 'TICKER:COUNTRY'
+ * - otherwise: highest exchange priority, then highest market cap
+ * All other documents in the group are marked inactive.
+ *
+ * @returns a summary message plus the processed and deactivated counts
+ * @throws rethrows any error raised by the database calls after logging it
+ */
+export async function deduplicateSymbols(
+  this: QMHandler,
+  _input: any,
+  _context: ExecutionContext
+): Promise<{ message: string; symbolsProcessed: number; deactivated: number }> {
+  this.logger.info('Starting symbol deduplication');
+
+  try {
+    // Get all unique symbol names with their documents
+    const symbolGroups = await this.mongodb.aggregate('qmSymbols', [
+      {
+        $group: {
+          _id: '$symbol',
+          symbols: {
+            $push: {
+              qmSearchCode: '$qmSearchCode',
+              exchange: '$exchange',
+              countryCode: '$countryCode',
+              marketCap: { $ifNull: ['$marketCap', 0] },
+              _id: '$_id'
+            }
+          }
+        }
+      }
+    ]);
+
+    let totalProcessed = 0;
+    let totalDeactivated = 0;
+    const bulkOps = [];
+
+    // Get exchange priorities for sorting
+    const exchangePriorities = await this.mongodb.find('qmExchanges', {}, {
+      projection: { exchange: 1, priority: 1 }
+    });
+    const priorityMap = new Map(exchangePriorities.map(e => [e.exchange, e.priority || 1]));
+
+    for (const group of symbolGroups) {
+      const symbol = group._id;
+      const documents = group.symbols;
+
+      // Documents without a symbol all land in the null group; they are not
+      // duplicates of each other, so leave them untouched
+      if (symbol == null) continue;
+
+      // Group by country
+      const byCountry = new Map();
+      for (const doc of documents) {
+        const country = doc.countryCode || 'UNKNOWN';
+        if (!byCountry.has(country)) {
+          byCountry.set(country, []);
+        }
+        byCountry.get(country)!.push(doc);
+      }
+
+      // Process each country separately
+      for (const [country, countryDocs] of byCountry) {
+        if (countryDocs.length === 1) {
+          // Only one symbol in this country, mark as active
+          bulkOps.push({
+            updateOne: {
+              filter: { _id: countryDocs[0]._id },
+              update: { $set: { active: true } }
+            }
+          });
+          totalProcessed++;
+          continue;
+        }
+
+        // Multiple symbols in same country - need to choose one
+        let activeDoc = null;
+
+        if (country === 'US') {
+          // For US, prefer where qmSearchCode === symbol
+          activeDoc = countryDocs.find(doc => doc.qmSearchCode === symbol);
+        } else {
+          // For non-US, prefer symbol:COUNTRYCODE format
+          const expectedFormat = `${symbol}:${country}`;
+          activeDoc = countryDocs.find(doc => doc.qmSearchCode === expectedFormat);
+        }
+
+        // If no preferred format found, use highest priority exchange
+        if (!activeDoc) {
+          // Sort by exchange priority (desc) and market cap (desc)
+          countryDocs.sort((a, b) => {
+            const priorityA = priorityMap.get(a.exchange) || 1;
+            const priorityB = priorityMap.get(b.exchange) || 1;
+            if (priorityA !== priorityB) return priorityB - priorityA;
+            return (b.marketCap || 0) - (a.marketCap || 0);
+          });
+          activeDoc = countryDocs[0];
+        }
+
+        // Mark active and inactive documents
+        for (const doc of countryDocs) {
+          const isActive = doc._id.equals(activeDoc._id);
+          bulkOps.push({
+            updateOne: {
+              filter: { _id: doc._id },
+              update: { $set: { active: isActive } }
+            }
+          });
+          totalProcessed++;
+          if (!isActive) totalDeactivated++;
+        }
+      }
+    }
+
+    // Execute bulk update
+    if (bulkOps.length > 0) {
+      const collection = this.mongodb.collection('qmSymbols');
+      await collection.bulkWrite(bulkOps, { ordered: false });
+    }
+
+
+    this.logger.info(`Deduplication complete: processed ${totalProcessed} symbols, deactivated ${totalDeactivated}`);
+    return {
+      message: `Processed ${totalProcessed} symbols, deactivated ${totalDeactivated} duplicates`,
+      symbolsProcessed: totalProcessed,
+      deactivated: totalDeactivated
+    };
+
+  } catch (error) {
+    this.logger.error('Failed to deduplicate symbols', { error });
+    throw error;
+  }
+}
+
+/**
+ * Run both exchange stats and symbol deduplication.
+ *
+ * The stats update runs first so that the exchange priorities stored in
+ * 'qmExchanges' are written before deduplication reads them.
+ *
+ * @returns the two step messages joined into one
+ * @throws rethrows any error raised by either step after logging it
+ */
+export async function updateExchangeStatsAndDeduplicate(
+  this: QMHandler,
+  input: any,
+  context: ExecutionContext
+): Promise<{ message: string }> {
+  this.logger.info('Running exchange stats update and symbol deduplication');
+
+  try {
+    // First update exchange stats
+    const statsResult = await updateExchangeStats.call(this, input, context);
+
+    // Then deduplicate symbols
+    const dedupResult = await deduplicateSymbols.call(this, input, context);
+
+    return {
+      message: `${statsResult.message}. ${dedupResult.message}`
+    };
+  } catch (error) {
+    this.logger.error('Failed to update stats and deduplicate', { error });
+    throw error;
+  }
+}
diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts
index 75133a7..9e910af 100644
--- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts
+++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts
@@ -9,6 +9,7 @@ import type { DataIngestionServices } from '../../types';
 import {
   checkSessions,
   createSession,
+  deduplicateSymbols,
   scheduleCorporateActionsUpdates,
   scheduleFilingsUpdates,
   scheduleFinancialsUpdates,
@@ -18,6 +19,8 @@ import {
   searchSymbols,
   spiderSymbol,
   updateCorporateActions,
+  updateExchangeStats,
+  updateExchangeStatsAndDeduplicate,
   updateFilings,
   updateFinancials,
   updateIntradayBars,
@@ -63,6 +66,25 @@ export class QMHandler extends BaseHandler {
   })
   spiderSymbol = spiderSymbol;
 
+  /**
+   * EXCHANGE STATS & DEDUPLICATION
+   */
+  @Operation('update-exchange-stats')
+  updateExchangeStats = updateExchangeStats;
+
+  @Operation('deduplicate-symbols')
+  deduplicateSymbols = deduplicateSymbols;
+
+  @Operation('update-exchange-stats-and-deduplicate')
+  updateExchangeStatsAndDeduplicate = updateExchangeStatsAndDeduplicate;
+
+  @ScheduledOperation('schedule-exchange-stats-and-dedup', '0 1 * * *', {
+    priority: 7,
+    immediately: false,
+    description: 'Update exchange statistics and deduplicate symbols daily at 1 AM'
+  })
+  scheduleExchangeStatsAndDedup = updateExchangeStatsAndDeduplicate;
+
   /**
    * SYMBOL INFO
    */
diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts
index 80faa35..00c9864 100644
--- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts
+++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts
@@ -42,8 +42,8 @@ export const QM_CONFIG = {
 
 // Session management settings
 export const SESSION_CONFIG = {
-  MIN_SESSIONS: 100,
-  MAX_SESSIONS: 100,
+  MIN_SESSIONS: 2,
+  MAX_SESSIONS: 2,
   MAX_FAILED_CALLS: 3,
-  SESSION_TIMEOUT: 5000, // 10 seconds
-  API_TIMEOUT: 30000, // 15 seconds
+  SESSION_TIMEOUT: 5000, // 5 seconds
+  API_TIMEOUT: 30000, // 30 seconds
diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts
index 1ab6f72..5edb761 100644
--- a/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts
+++ b/apps/stock/data-ingestion/src/handlers/qm/shared/operation-tracker.ts
@@ -217,6 +217,7 @@ export class QMOperationTracker {
     })();
 
     const filter: any = {
+      active: { $ne: false }, // Only active symbols (active: true or active doesn't exist)
       $or: [
         { [`operations.${operationName}.lastSuccessAt`]: { $lt: cutoffDate } },
         { [`operations.${operationName}.lastSuccessAt`]: { $exists: false } },
@@ -249,7 +250,9 @@ export class QMOperationTracker {
   ): Promise {
     const { limit = 100, includeFinished = false } = options;
 
-    const filter: any = {};
+    const filter: any = {
+      active: { $ne: false } // Only active symbols
+    };
     if (!includeFinished) {
       filter[`operations.${operationName}.crawlState.finished`] = { $ne: true };
     }
@@ -308,7 +311,9 @@ export class QMOperationTracker {
     } = {}
   ): Promise> {
     const { limit = 500 } = options;
-    const filter: any = {};
+    const filter: any = {
+      active: { $ne: false } // Only active symbols
+    };
 
     if (options.neverRun) {
       filter[`operations.${operationName}`] = { $exists: false };
diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts
index eba642b..7848f39 100644
--- a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts
+++ b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts
@@ -91,3 +91,9 @@ export interface IntradayCrawlSymbol {
     lastCrawlDirection?: 'forward' | 'backward';
   };
 }
+
+export interface ExchangeStats {
+  symbolCount: number;
+  totalMarketCap: number;
+  avgMarketCap: number;
+}