added deduplication and exchange stats

This commit is contained in:
Boki 2025-06-29 14:48:40 -04:00
parent 133f37e755
commit 100efb575f
7 changed files with 309 additions and 6 deletions

View file

@ -77,8 +77,8 @@
"port": 6379,
"db": 1
},
"workers": 10,
"concurrency": 4,
"workers": 1,
"concurrency": 1,
"enableScheduledJobs": true,
"defaultJobOptions": {
"attempts": 3,

View file

@ -10,4 +10,5 @@ export { schedulePriceUpdates, updatePrices } from './prices.action';
export { checkSessions, createSession } from './session.action';
export { scheduleSymbolInfoUpdates, updateSymbolInfo } from './symbol-info.action';
export { searchSymbols, spiderSymbol } from './symbol.action';
export { deduplicateSymbols, updateExchangeStats, updateExchangeStatsAndDeduplicate } from './symbol-dedup.action';

View file

@ -0,0 +1,269 @@
/**
* QM Symbol Deduplication and Exchange Statistics
* - Updates exchange statistics (symbol count, market cap)
* - Assigns priority to exchanges
* - Marks active/inactive symbols to eliminate duplicates
*/
import type { ExecutionContext } from '@stock-bot/handlers';
import type { QMHandler } from '../qm.handler';
import type { ExchangeStats } from '../shared/types';
/**
 * Exchange priority mapping.
 *
 * Maps an exchange name to a numeric priority. Higher numbers are preferred
 * when deduplication has to choose between several listings of one symbol.
 * Exchanges that are not listed here fall back to the `default` entry.
 *
 * The table is built on a null-prototype object so that an exchange name
 * which happens to match an inherited `Object.prototype` member (for example
 * `constructor` or `toString`) looks up as `undefined` rather than as a
 * function, and therefore falls through to the default priority.
 */
const EXCHANGE_PRIORITIES: Record<string, number> = Object.assign(Object.create(null), {
  // Major US exchanges
  'NYSE': 10,
  'NASDAQ': 10,
  'NYSE American': 9,
  // Other US exchanges
  'EDGX': 5,
  'BZX': 5,
  'ARCA': 5,
  // Canadian main exchanges
  'TSX': 8,
  'TSXV': 8,
  'CSE': 7,
  // Canadian quote exchanges
  'CCQ TSX': 3,
  'CCQ TSXV': 3,
  // Default for others
  'default': 1
});
/**
 * Update exchange statistics.
 *
 * For every document in `qmExchanges`, stores under `stats.*` the number of
 * `qmSymbols` documents that reference the exchange together with the total
 * and (rounded) average market cap of those symbols, where a missing
 * `marketCap` counts as 0. The exchange's `priority` (taken from
 * `EXCHANGE_PRIORITIES`, falling back to its `default` entry) and an
 * `updated_at` timestamp are written in the same update.
 *
 * @param _input - Unused.
 * @param _context - Unused.
 * @returns A summary message and the number of exchange documents an update
 *   was issued for (0 when `qmExchanges` is empty).
 * @throws Rethrows any error raised by the database calls after logging it.
 */
export async function updateExchangeStats(
  this: QMHandler,
  _input: any,
  _context: ExecutionContext
): Promise<{ message: string; exchangesUpdated: number }> {
  this.logger.info('Updating exchange statistics');
  try {
    // Get all exchanges
    const exchanges = await this.mongodb.find('qmExchanges', {});
    if (!exchanges || exchanges.length === 0) {
      return { message: 'No exchanges found', exchangesUpdated: 0 };
    }

    // Compute the figures for all exchanges in one grouped pass over qmSymbols
    // instead of issuing a separate aggregation per exchange.
    const statsRows = await this.mongodb.aggregate('qmSymbols', [
      {
        $group: {
          _id: '$exchange',
          symbolCount: { $sum: 1 },
          totalMarketCap: { $sum: { $ifNull: ['$marketCap', 0] } },
          avgMarketCap: { $avg: { $ifNull: ['$marketCap', 0] } }
        }
      }
    ]);
    const statsByExchange = new Map<unknown, ExchangeStats>();
    for (const row of statsRows) {
      statsByExchange.set(row._id, {
        symbolCount: row.symbolCount,
        totalMarketCap: row.totalMarketCap,
        avgMarketCap: row.avgMarketCap
      });
    }

    // Prepare one update per exchange
    const bulkOps = [];
    for (const exchange of exchanges) {
      // Exchanges without any symbols get zeroed statistics
      const exchangeStats: ExchangeStats = statsByExchange.get(exchange.exchange) ?? {
        symbolCount: 0,
        totalMarketCap: 0,
        avgMarketCap: 0
      };
      const priority = EXCHANGE_PRIORITIES[exchange.exchange] ?? EXCHANGE_PRIORITIES.default;
      bulkOps.push({
        updateOne: {
          filter: { exchange: exchange.exchange },
          update: {
            $set: {
              'stats.symbolCount': exchangeStats.symbolCount,
              'stats.totalMarketCap': exchangeStats.totalMarketCap,
              'stats.avgMarketCap': Math.round(exchangeStats.avgMarketCap),
              priority,
              updated_at: new Date()
            }
          }
        }
      });
    }

    // The updates are independent of each other, so run them unordered:
    // one failing write does not prevent the remaining exchanges from updating.
    const collection = this.mongodb.collection('qmExchanges');
    await collection.bulkWrite(bulkOps, { ordered: false });

    this.logger.info(`Updated statistics for ${exchanges.length} exchanges`);
    return {
      message: `Updated statistics for ${exchanges.length} exchanges`,
      exchangesUpdated: exchanges.length
    };
  } catch (error) {
    this.logger.error('Failed to update exchange statistics', { error });
    throw error;
  }
}
/**
 * Choose which of several listings of one symbol in one country stays active.
 *
 * A listing whose `qmSearchCode` has the preferred form wins outright: the
 * bare symbol for `US`, `SYMBOL:COUNTRY` for every other country. Otherwise
 * the listing with the highest exchange priority wins, then the highest market
 * cap, and finally the lowest `qmSearchCode` so that the result does not
 * depend on the order in which the listings were returned by the database.
 */
function selectActiveListing(
  symbol: string,
  country: string,
  listings: any[],
  priorityMap: Map<string, number>
): any {
  const preferredCode = country === 'US' ? symbol : `${symbol}:${country}`;
  const preferred = listings.find(listing => listing.qmSearchCode === preferredCode);
  if (preferred) {
    return preferred;
  }

  const priorityOf = (listing: any): number => priorityMap.get(listing.exchange) || 1;
  return listings.reduce((best, candidate) => {
    const priorityDelta = priorityOf(candidate) - priorityOf(best);
    if (priorityDelta !== 0) {
      return priorityDelta > 0 ? candidate : best;
    }
    const marketCapDelta = (candidate.marketCap || 0) - (best.marketCap || 0);
    if (marketCapDelta !== 0) {
      return marketCapDelta > 0 ? candidate : best;
    }
    // Full tie: fall back to a stable key so repeated runs pick the same listing
    return String(candidate.qmSearchCode) < String(best.qmSearchCode) ? candidate : best;
  });
}

/**
 * Deduplicate symbols based on country and exchange rules.
 *
 * Groups all `qmSymbols` documents by `symbol`, then by `countryCode`
 * (`'UNKNOWN'` when absent). Within each symbol/country bucket exactly one
 * document is written with `active: true` and every other one with
 * `active: false`; see `selectActiveListing` for how the winner is chosen.
 * Exchange priorities are read from the `priority` field of `qmExchanges`,
 * defaulting to 1.
 *
 * @param _input - Unused.
 * @param _context - Unused.
 * @returns A summary message, the number of symbol documents an update was
 *   issued for, and how many of those were marked inactive.
 * @throws Rethrows any error raised by the database calls after logging it.
 */
export async function deduplicateSymbols(
  this: QMHandler,
  _input: any,
  _context: ExecutionContext
): Promise<{ message: string; symbolsProcessed: number; deactivated: number }> {
  this.logger.info('Starting symbol deduplication');
  try {
    // Get all unique symbol names with their documents
    const symbolGroups = await this.mongodb.aggregate('qmSymbols', [
      {
        $group: {
          _id: '$symbol',
          symbols: {
            $push: {
              qmSearchCode: '$qmSearchCode',
              exchange: '$exchange',
              countryCode: '$countryCode',
              marketCap: { $ifNull: ['$marketCap', 0] },
              _id: '$_id'
            }
          }
        }
      }
    ]);

    // Get exchange priorities for ranking
    const exchangePriorities = await this.mongodb.find('qmExchanges', {}, {
      projection: { exchange: 1, priority: 1 }
    });
    const priorityMap = new Map<string, number>();
    for (const entry of exchangePriorities) {
      priorityMap.set(entry.exchange, entry.priority || 1);
    }

    let totalProcessed = 0;
    let totalDeactivated = 0;
    const bulkOps = [];

    for (const group of symbolGroups) {
      const symbol = group._id;

      // Group by country
      const byCountry = new Map<string, any[]>();
      for (const doc of group.symbols) {
        const country = doc.countryCode || 'UNKNOWN';
        const bucket = byCountry.get(country);
        if (bucket) {
          bucket.push(doc);
        } else {
          byCountry.set(country, [doc]);
        }
      }

      // Process each country separately
      for (const [country, countryDocs] of byCountry) {
        // A lone listing is active by definition; otherwise one must be chosen
        const activeDoc =
          countryDocs.length === 1
            ? countryDocs[0]
            : selectActiveListing(symbol, country, countryDocs, priorityMap);

        for (const doc of countryDocs) {
          // activeDoc is always one of countryDocs, so identity is sufficient
          const isActive = doc === activeDoc;
          bulkOps.push({
            updateOne: {
              filter: { _id: doc._id },
              update: { $set: { active: isActive } }
            }
          });
          totalProcessed++;
          if (!isActive) totalDeactivated++;
        }
      }
    }

    // Execute bulk update
    if (bulkOps.length > 0) {
      const collection = this.mongodb.collection('qmSymbols');
      await collection.bulkWrite(bulkOps, { ordered: false });
    }

    this.logger.info(`Deduplication complete: processed ${totalProcessed} symbols, deactivated ${totalDeactivated}`);
    return {
      message: `Processed ${totalProcessed} symbols, deactivated ${totalDeactivated} duplicates`,
      symbolsProcessed: totalProcessed,
      deactivated: totalDeactivated
    };
  } catch (error) {
    this.logger.error('Failed to deduplicate symbols', { error });
    throw error;
  }
}
/**
 * Refresh exchange statistics, then deduplicate symbols.
 *
 * The statistics step runs first because it writes the `priority` field on
 * `qmExchanges` that the deduplication step reads when ranking listings.
 *
 * @param input - Passed through unchanged to both steps.
 * @param context - Passed through unchanged to both steps.
 * @returns The two steps' summary messages joined with `". "`.
 * @throws Rethrows the first error raised by either step after logging it.
 */
export async function updateExchangeStatsAndDeduplicate(
  this: QMHandler,
  input: any,
  context: ExecutionContext
): Promise<{ message: string }> {
  this.logger.info('Running exchange stats update and symbol deduplication');
  try {
    const { message: statsMessage } = await updateExchangeStats.call(this, input, context);
    const { message: dedupMessage } = await deduplicateSymbols.call(this, input, context);
    return { message: [statsMessage, dedupMessage].join('. ') };
  } catch (failure) {
    this.logger.error('Failed to update stats and deduplicate', { error: failure });
    throw failure;
  }
}

View file

@ -9,6 +9,7 @@ import type { DataIngestionServices } from '../../types';
import {
checkSessions,
createSession,
deduplicateSymbols,
scheduleCorporateActionsUpdates,
scheduleFilingsUpdates,
scheduleFinancialsUpdates,
@ -18,6 +19,8 @@ import {
searchSymbols,
spiderSymbol,
updateCorporateActions,
updateExchangeStats,
updateExchangeStatsAndDeduplicate,
updateFilings,
updateFinancials,
updateIntradayBars,
@ -63,6 +66,25 @@ export class QMHandler extends BaseHandler<DataIngestionServices> {
})
spiderSymbol = spiderSymbol;
/**
* EXCHANGE STATS & DEDUPLICATION
*/
@Operation('update-exchange-stats')
updateExchangeStats = updateExchangeStats;
@Operation('deduplicate-symbols')
deduplicateSymbols = deduplicateSymbols;
@Operation('update-exchange-stats-and-deduplicate')
updateExchangeStatsAndDeduplicate = updateExchangeStatsAndDeduplicate;
@ScheduledOperation('schedule-exchange-stats-and-dedup', '0 1 * * *', {
priority: 7,
immediately: false,
description: 'Update exchange statistics and deduplicate symbols daily at 1 AM'
})
scheduleExchangeStatsAndDedup = updateExchangeStatsAndDeduplicate;
/**
* SYMBOL INFO
*/

View file

@ -42,8 +42,8 @@ export const QM_CONFIG = {
// Session management settings
export const SESSION_CONFIG = {
MIN_SESSIONS: 100,
MAX_SESSIONS: 100,
MIN_SESSIONS: 2,
MAX_SESSIONS: 2,
MAX_FAILED_CALLS: 3,
SESSION_TIMEOUT: 5000, // 5 seconds
API_TIMEOUT: 30000, // 30 seconds

View file

@ -217,6 +217,7 @@ export class QMOperationTracker {
})();
const filter: any = {
active: { $ne: false }, // Only active symbols (active: true or active doesn't exist)
$or: [
{ [`operations.${operationName}.lastSuccessAt`]: { $lt: cutoffDate } },
{ [`operations.${operationName}.lastSuccessAt`]: { $exists: false } },
@ -249,7 +250,9 @@ export class QMOperationTracker {
): Promise<IntradayCrawlSymbol[]> {
const { limit = 100, includeFinished = false } = options;
const filter: any = {};
const filter: any = {
active: { $ne: false } // Only active symbols
};
if (!includeFinished) {
filter[`operations.${operationName}.crawlState.finished`] = { $ne: true };
}
@ -308,7 +311,9 @@ export class QMOperationTracker {
} = {}
): Promise<Array<{ qmSearchCode: string; lastRecordDate?: Date }>> {
const { limit = 500 } = options;
const filter: any = {};
const filter: any = {
active: { $ne: false } // Only active symbols
};
if (options.neverRun) {
filter[`operations.${operationName}`] = { $exists: false };

View file

@ -91,3 +91,9 @@ export interface IntradayCrawlSymbol {
lastCrawlDirection?: 'forward' | 'backward';
};
}
/**
 * Aggregate figures for the symbols of a single exchange, as produced by the
 * `$group` stage in `updateExchangeStats`.
 */
export interface ExchangeStats {
/** Number of symbol documents counted for the exchange. */
symbolCount: number;
/** Sum of the symbols' `marketCap`, with a missing value counted as 0. */
totalMarketCap: number;
/** Mean of the symbols' `marketCap`, with a missing value counted as 0. */
avgMarketCap: number;
}