work on ceo

This commit is contained in:
Boki 2025-06-24 18:09:32 -04:00
parent c8dcd697c9
commit b25222778e
18 changed files with 391 additions and 110 deletions

View file

@ -0,0 +1,84 @@
import { getRandomUserAgent } from '@stock-bot/utils';
import type { CeoHandler } from '../ceo.handler';
export async function getChannels(
this: CeoHandler,
payload: number | undefined
): Promise<unknown> {
const proxy = this.proxy?.getProxy();
if (!proxy) {
this.logger.warn('No proxy available for CEO channels update');
return;
}
let page;
if (payload === undefined) {
page = 1;
} else {
page = payload;
}
try {
this.logger.info(`Fetching CEO channels for page ${page} with proxy ${proxy}`);
const response = await this.http.get(
'https://api.ceo.ca/api/home?exchange=all&sort_by=symbol&sector=All&tab=companies&page=' + page,
{
proxy: proxy,
headers: {
'User-Agent': getRandomUserAgent(),
},
}
);
if (!response.ok) {
this.logger.debug(`Response status: ${response.status}`);
throw new Error(`Failed to fetch CEO channels: ${response.statusText}`);
}
const results = await response.json();
const channels = results.channel_categories[0].channels;
const totalChannels = results.channel_categories[0].total_channels;
const totalPages = Math.ceil(totalChannels / channels.length);
const exchanges: { exchange: string; countryCode: string }[] = [];
const symbols = channels.map((channel: any) => {
// check if exchange is in the exchanges array object
if (!exchanges.find((e: any) => e.exchange === channel.exchange)) {
exchanges.push({
exchange: channel.exchange,
countryCode: 'CA',
});
}
const details = channel.company_details || {};
return {
symbol: channel.symbol,
exchange: channel.exchange,
name: channel.title,
type: channel.type,
ceoId: channel.channel,
marketCap: details.market_cap,
volumeRatio: details.volume_ratio,
avgVolume: details.avg_volume,
stockType: details.stock_type,
issueType: details.issue_type,
sharesOutstanding: details.shares_outstanding,
float: details.float,
lastSeen: new Date(),
active: true,
};
});
await this.mongodb.batchUpsert('ceoSymbols', symbols, ['symbol', 'exchange']);
await this.mongodb.batchUpsert('ceoExchanges', exchanges, ['exchange']);
if (page === 1) {
for (let i = 2; i <= totalPages; i++) {
this.logger.info(`Scheduling page ${i} of ${totalPages} for CEO channels`);
await this.scheduleOperation('get-channels', i);
}
}
this.logger.info(`Fetched CEO channels for page ${page}/${totalPages}`);
return { page, totalPages };
}catch (error) {
this.logger.error(`Error fetching CEO channels for page ${page} with proxy ${proxy}:`, error);
throw new Error(`Failed to fetch CEO channels: ${error.message}`);
}
}

View file

@ -1,12 +1,12 @@
import { getRandomUserAgent } from '@stock-bot/utils';
import type { CeoHandler } from '../ceo.handler';
export async function processIndividualSymbol(
export async function getPosts(
this: CeoHandler,
payload: any,
_context: any
): Promise<unknown> {
const { ceoId, symbol, timestamp } = payload;
const { ceoId, symbol, timestamp, untilTimestamp } = payload;
const proxy = this.proxy?.getProxy();
if (!proxy) {
this.logger.warn('No proxy available for processing individual CEO symbol');
@ -31,6 +31,7 @@ export async function processIndividualSymbol(
);
if (!response.ok) {
this.logger.debug(`Response status: ${response.status}`);
throw new Error(`Failed to fetch details for ceoId ${ceoId}: ${response.statusText}`);
}
@ -88,14 +89,28 @@ export async function processIndividualSymbol(
);
this.logger.info(`Fetched ${spielCount} spiels for ceoId ${ceoId}`);
await this.scheduleOperation(
'process-individual-symbol',
{
ceoId: ceoId,
timestamp: latestSpielTime,
},
{ priority: 0 }
);
// If untilTimestamp is not provider keep going to the end
// Otherwise keep going until the lastSpiel is before the untilTimestamp
if( untilTimestamp === undefined || (untilTimestamp && latestSpielTime <= untilTimestamp)) {
await this.scheduleOperation(
'get-posts',
{
ceoId: ceoId,
timestamp: latestSpielTime,
},
{ priority: 0 }
);
}
// If timestamp is not provided, run the short positions update
if (!timestamp) {
await this.scheduleOperation(
'get-shorts',
{
symbol: symbol,
},
);
}
this.logger.info(
`Successfully processed channel ${ceoId} and added channel ${ceoId} at timestamp ${latestSpielTime}`
@ -108,6 +123,6 @@ export async function processIndividualSymbol(
ceoId,
timestamp,
});
throw error;
throw new Error(`Failed to process individual symbol ${symbol}`, error.message);
}
}

View file

@ -1,7 +1,7 @@
import { getRandomUserAgent } from '@stock-bot/utils';
import type { CeoHandler } from '../ceo.handler';
export async function processIndividualSymbol(
export async function getShorts(
this: CeoHandler,
payload: any,
_context: any

View file

@ -1,3 +1,5 @@
export { updateCeoChannels } from './update-ceo-channels.action';
export { getChannels } from './get-channels.action';
export { getPosts } from './get-posts.action';
export { getShorts } from './get-shorts.action';
export { updateUniqueSymbols } from './update-unique-symbols.action';
export { processIndividualSymbol } from './process-individual-symbol.action';

View file

@ -1,72 +0,0 @@
import { getRandomUserAgent } from '@stock-bot/utils';
import type { CeoHandler } from '../ceo.handler';
export async function updateCeoChannels(
this: CeoHandler,
payload: number | undefined
): Promise<unknown> {
const proxy = this.proxy?.getProxy();
if (!proxy) {
this.logger.warn('No proxy available for CEO channels update');
return;
}
let page;
if (payload === undefined) {
page = 1;
} else {
page = payload;
}
this.logger.info(`Fetching CEO channels for page ${page} with proxy ${proxy}`);
const response = await this.http.get(
'https://api.ceo.ca/api/home?exchange=all&sort_by=symbol&sector=All&tab=companies&page=' + page,
{
proxy: proxy,
headers: {
'User-Agent': getRandomUserAgent(),
},
}
);
const results = await response.json();
const channels = results.channel_categories[0].channels;
const totalChannels = results.channel_categories[0].total_channels;
const totalPages = Math.ceil(totalChannels / channels.length);
const exchanges: { exchange: string; countryCode: string }[] = [];
const symbols = channels.map((channel: any) => {
// check if exchange is in the exchanges array object
if (!exchanges.find((e: any) => e.exchange === channel.exchange)) {
exchanges.push({
exchange: channel.exchange,
countryCode: 'CA',
});
}
const details = channel.company_details || {};
return {
symbol: channel.symbol,
exchange: channel.exchange,
name: channel.title,
type: channel.type,
ceoId: channel.channel,
marketCap: details.market_cap,
volumeRatio: details.volume_ratio,
avgVolume: details.avg_volume,
stockType: details.stock_type,
issueType: details.issue_type,
sharesOutstanding: details.shares_outstanding,
float: details.float,
};
});
await this.mongodb.batchUpsert('ceoSymbols', symbols, ['symbol', 'exchange']);
await this.mongodb.batchUpsert('ceoExchanges', exchanges, ['exchange']);
if (page === 1) {
for (let i = 2; i <= totalPages; i++) {
this.logger.info(`Scheduling page ${i} of ${totalPages} for CEO channels`);
await this.scheduleOperation('update-ceo-channels', i);
}
}
this.logger.info(`Fetched CEO channels for page ${page}/${totalPages}`);
return { page, totalPages };
}

View file

@ -32,13 +32,14 @@ export async function updateUniqueSymbols(
for (const symbol of uniqueSymbols) {
// Schedule a job to process this individual symbol
await this.scheduleOperation(
'process-individual-symbol',
'get-posts',
{
ceoId: symbol.ceoId,
symbol: symbol.symbol,
},
{ priority: 10 }
);
scheduledJobs++;
// Add small delay to avoid overwhelming the queue

View file

@ -5,7 +5,7 @@ import {
ScheduledOperation,
type IServiceContainer,
} from '@stock-bot/handlers';
import { processIndividualSymbol, updateCeoChannels, updateUniqueSymbols } from './actions';
import { getChannels, getPosts, getShorts, updateUniqueSymbols } from './actions';
@Handler('ceo')
// @Disabled()
@ -14,12 +14,12 @@ export class CeoHandler extends BaseHandler {
super(services); // Handler name read from @Handler decorator
}
@ScheduledOperation('update-ceo-channels', '0 */15 * * *', {
@ScheduledOperation('get-channels', '0 */15 * * *', {
priority: 7,
immediately: false,
description: 'Get all CEO symbols and exchanges',
})
updateCeoChannels = updateCeoChannels;
getChannels = getChannels;
@Operation('update-unique-symbols')
@ScheduledOperation('update-unique-symbols', '0 0 1 * *', {
@ -29,6 +29,9 @@ export class CeoHandler extends BaseHandler {
})
updateUniqueSymbols = updateUniqueSymbols;
@Operation('process-individual-symbol')
processIndividualSymbol = processIndividualSymbol;
@Operation('get-posts')
getPosts = getPosts;
@Operation('get-shorts')
getShorts = getShorts;
}

View file

@ -1,10 +1,10 @@
/**
* Market data routes
*/
import { Hono } from 'hono';
import type { IServiceContainer } from '@stock-bot/handlers';
import { getLogger } from '@stock-bot/logger';
import { processItems } from '@stock-bot/queue';
import { Hono } from 'hono';
const logger = getLogger('market-data-routes');
@ -122,8 +122,8 @@ export function createMarketDataRoutes(container: IServiceContainer) {
batchSize,
priority: 2,
retries: 2,
removeOnComplete: 5,
removeOnFail: 10,
removeOnComplete: 100,
removeOnFail: 100,
},
queueManager
);