stock-bot/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts

216 lines
6.8 KiB
TypeScript

import {
BaseHandler,
Handler,
Operation,
ScheduledOperation,
type IServiceContainer
} from '@stock-bot/handlers';
import { getRandomUserAgent } from '@stock-bot/utils';
@Handler('ceo')
export class CeoHandler extends BaseHandler {
constructor(services: IServiceContainer) {
super(services); // Handler name read from @Handler decorator
}
@ScheduledOperation('update-ceo-channels', '0 */15 * * *', {
priority: 7,
immediately: true,
description: 'Get all CEO symbols and exchanges'
})
async updateCeoChannels(payload: number | undefined, handler: BaseHandler): Promise<unknown> {
const proxy = this.proxy?.getProxy();
if(!proxy) {
this.logger.warn('No proxy available for CEO channels update');
return;
}
let page;
if(payload === undefined) {
page = 1
}else{
page = payload;
}
this.logger.info(`Fetching CEO channels for page ${page} with proxy ${proxy}`);
const response = await this.http.get('https://api.ceo.ca/api/home?exchange=all&sort_by=symbol&sector=All&tab=companies&page='+page, {
proxy: proxy,
headers: {
'User-Agent': getRandomUserAgent()
}
})
const results = await response.json();
const channels = results.channel_categories[0].channels;
const totalChannels = results.channel_categories[0].total_channels;
const totalPages = Math.ceil(totalChannels / channels.length);
const exchanges: {exchange: string, countryCode: string}[] = []
const symbols = channels.map((channel: any) =>{
// check if exchange is in the exchanges array object
if(!exchanges.find((e: any) => e.exchange === channel.exchange)) {
exchanges.push({
exchange: channel.exchange,
countryCode: 'CA'
});
}
const details = channel.company_details || {};
return {
symbol: channel.symbol,
exchange: channel.exchange,
name: channel.title,
type: channel.type,
ceoId: channel.channel,
marketCap: details.market_cap,
volumeRatio: details.volume_ratio,
avgVolume: details.avg_volume,
stockType: details.stock_type,
issueType: details.issue_type,
sharesOutstanding: details.shares_outstanding,
float: details.float,
}
})
await this.mongodb.batchUpsert('ceoSymbols', symbols, ['symbol', 'exchange']);
await this.mongodb.batchUpsert('ceoExchanges', exchanges, ['exchange']);
if(page === 1) {
for( let i = 2; i <= totalPages; i++) {
this.logger.info(`Scheduling page ${i} of ${totalPages} for CEO channels`);
this.scheduleOperation('update-ceo-channels', i)
}
}
this.logger.info(`Fetched CEO channels for page ${page}/${totalPages}`);
return { page, totalPages };
}
@Operation('process-unique-symbols')
@ScheduledOperation('process-unique-symbols', '0 */30 * * *', {
priority: 5,
immediately: false,
description: 'Process unique CEO symbols and schedule individual jobs'
})
async processUniqueSymbols(_payload: unknown, _context: any): Promise<unknown> {
this.logger.info('Starting process to get unique CEO symbols by ceoId');
try {
// Get unique ceoId values from the ceoSymbols collection
const uniqueCeoIds = await this.mongodb.collection('ceoSymbols').distinct('ceoId');
this.logger.info(`Found ${uniqueCeoIds.length} unique CEO IDs`);
// Get detailed records for each unique ceoId (latest/first record)
const uniqueSymbols = [];
for (const ceoId of uniqueCeoIds) {
const symbol = await this.mongodb.collection('ceoSymbols')
.findOne({ ceoId }, { sort: { _id: -1 } }); // Get latest record
if (symbol) {
uniqueSymbols.push(symbol);
}
}
this.logger.info(`Retrieved ${uniqueSymbols.length} unique symbol records`);
// Schedule individual jobs for each unique symbol
let scheduledJobs = 0;
for (const symbol of uniqueSymbols) {
// Schedule a job to process this individual symbol
await this.scheduleOperation('process-individual-symbol', {
ceoId: symbol.ceoId,
symbol: symbol.symbol,
exchange: symbol.exchange,
name: symbol.name
});
scheduledJobs++;
// Add small delay to avoid overwhelming the queue
if (scheduledJobs % 10 === 0) {
this.logger.debug(`Scheduled ${scheduledJobs} jobs so far`);
}
}
this.logger.info(`Successfully scheduled ${scheduledJobs} individual symbol processing jobs`);
// Cache the results for monitoring
await this.cacheSet('unique-symbols-last-run', {
timestamp: new Date().toISOString(),
totalUniqueIds: uniqueCeoIds.length,
totalRecords: uniqueSymbols.length,
scheduledJobs
}, 1800); // Cache for 30 minutes
return {
success: true,
uniqueCeoIds: uniqueCeoIds.length,
uniqueRecords: uniqueSymbols.length,
scheduledJobs,
timestamp: new Date().toISOString()
};
} catch (error) {
this.logger.error('Failed to process unique CEO symbols', { error });
throw error;
}
}
@Operation('process-individual-symbol')
async processIndividualSymbol(payload: any, _context: any): Promise<unknown> {
const { ceoId, symbol, exchange, name } = payload;
this.logger.debug('Processing individual CEO symbol', {
ceoId,
symbol,
exchange,
name
});
try {
// Here you can add specific processing logic for each symbol
// For now, just log and potentially fetch additional data
// Example: Get all historical records for this ceoId
const allRecords = await this.mongodb.collection('ceoSymbols')
.find({ ceoId })
.sort({ _id: -1 })
.toArray();
this.logger.debug(`Found ${allRecords.length} records for CEO ID ${ceoId}`);
// Example: Update processing status
await this.mongodb.collection('ceoSymbols').updateMany(
{ ceoId },
{
$set: {
lastProcessed: new Date(),
processedBy: 'individual-symbol-processor'
}
}
);
// Cache individual symbol data
await this.cacheSet(`symbol-${ceoId}`, {
symbol,
exchange,
name,
recordCount: allRecords.length,
lastProcessed: new Date().toISOString()
}, 3600); // Cache for 1 hour
return {
success: true,
ceoId,
symbol,
recordsProcessed: allRecords.length,
timestamp: new Date().toISOString()
};
} catch (error) {
this.logger.error('Failed to process individual symbol', {
error,
ceoId,
symbol
});
throw error;
}
}
}