finished up initial ceo handler

This commit is contained in:
Boki 2025-06-22 16:10:44 -04:00
parent 5009ccbeda
commit 3821431737
2 changed files with 105 additions and 54 deletions

2
.env
View file

@ -5,7 +5,7 @@
# Core Application Settings
NODE_ENV=development
LOG_LEVEL=trace
LOG_HIDE_OBJECT=true
LOG_HIDE_OBJECT=false
# Data Service Configuration
DATA_SERVICE_PORT=2001

View file

@ -1,5 +1,6 @@
import {
BaseHandler,
Disabled,
Handler,
Operation,
ScheduledOperation,
@ -7,7 +8,10 @@ import {
} from '@stock-bot/handlers';
import { getRandomUserAgent } from '@stock-bot/utils';
@Handler('ceo')
@Disabled()
export class CeoHandler extends BaseHandler {
constructor(services: IServiceContainer) {
super(services); // Handler name read from @Handler decorator
@ -15,10 +19,10 @@ export class CeoHandler extends BaseHandler {
@ScheduledOperation('update-ceo-channels', '0 */15 * * *', {
priority: 7,
immediately: true,
immediately: false,
description: 'Get all CEO symbols and exchanges'
})
async updateCeoChannels(payload: number | undefined, handler: BaseHandler): Promise<unknown> {
async updateCeoChannels(payload: number | undefined): Promise<unknown> {
const proxy = this.proxy?.getProxy();
if(!proxy) {
this.logger.warn('No proxy available for CEO channels update');
@ -75,7 +79,7 @@ export class CeoHandler extends BaseHandler {
if(page === 1) {
for( let i = 2; i <= totalPages; i++) {
this.logger.info(`Scheduling page ${i} of ${totalPages} for CEO channels`);
this.scheduleOperation('update-ceo-channels', i)
await this.scheduleOperation('update-ceo-channels', i)
}
}
@ -83,14 +87,14 @@ export class CeoHandler extends BaseHandler {
return { page, totalPages };
}
@Operation('process-unique-symbols')
@ScheduledOperation('process-unique-symbols', '0 */30 * * *', {
@Operation('update-unique-symbols')
@ScheduledOperation('process-unique-symbols', '0 0 1 * *', {//'0 */30 * * *', {
priority: 5,
immediately: false,
description: 'Process unique CEO symbols and schedule individual jobs'
})
async processUniqueSymbols(_payload: unknown, _context: any): Promise<unknown> {
this.logger.info('Starting process to get unique CEO symbols by ceoId');
async updateUniqueSymbols(_payload: unknown, _context: any): Promise<unknown> {
this.logger.info('Starting update to get unique CEO symbols by ceoId');
try {
// Get unique ceoId values from the ceoSymbols collection
@ -118,8 +122,6 @@ export class CeoHandler extends BaseHandler {
await this.scheduleOperation('process-individual-symbol', {
ceoId: symbol.ceoId,
symbol: symbol.symbol,
exchange: symbol.exchange,
name: symbol.name
});
scheduledJobs++;
@ -129,7 +131,7 @@ export class CeoHandler extends BaseHandler {
}
}
this.logger.info(`Successfully scheduled ${scheduledJobs} individual symbol processing jobs`);
this.logger.info(`Successfully scheduled ${scheduledJobs} individual symbol update jobs`);
// Cache the results for monitoring
await this.cacheSet('unique-symbols-last-run', {
@ -148,67 +150,116 @@ export class CeoHandler extends BaseHandler {
};
} catch (error) {
this.logger.error('Failed to process unique CEO symbols', { error });
this.logger.error('Failed to update unique CEO symbols', { error });
throw error;
}
}
@Operation('process-individual-symbol')
async processIndividualSymbol(payload: any, _context: any): Promise<unknown> {
const { ceoId, symbol, exchange, name } = payload;
const { ceoId, symbol, timestamp } = payload;
const proxy = this.proxy?.getProxy();
if(!proxy) {
this.logger.warn('No proxy available for processing individual CEO symbol');
return;
}
this.logger.debug('Processing individual CEO symbol', {
ceoId,
symbol,
exchange,
name
timestamp,
});
try {
// Here you can add specific processing logic for each symbol
// For now, just log and potentially fetch additional data
// Example: Get all historical records for this ceoId
const allRecords = await this.mongodb.collection('ceoSymbols')
.find({ ceoId })
.sort({ _id: -1 })
.toArray();
this.logger.debug(`Found ${allRecords.length} records for CEO ID ${ceoId}`);
// Example: Update processing status
await this.mongodb.collection('ceoSymbols').updateMany(
{ ceoId },
{
$set: {
lastProcessed: new Date(),
processedBy: 'individual-symbol-processor'
// Fetch detailed information for the individual symbol
const response = await this.http.get(`https://api.ceo.ca/api/get_spiels?channel=${ceoId}&load_more=top`
+ (timestamp ? `&until=${timestamp}` : ''),
{
proxy: proxy,
headers: {
'User-Agent': getRandomUserAgent()
}
}
);
});
// Cache individual symbol data
await this.cacheSet(`symbol-${ceoId}`, {
symbol,
exchange,
name,
recordCount: allRecords.length,
lastProcessed: new Date().toISOString()
}, 3600); // Cache for 1 hour
if (!response.ok) {
throw new Error(`Failed to fetch details for ceoId ${ceoId}: ${response.statusText}`);
}
return {
success: true,
const data = await response.json();
const spielCount = data.spiels.length;
if(spielCount === 0) {
this.logger.warn(`No spiels found for ceoId ${ceoId}`);
return null; // No data to process
}
const latestSpielTime = data.spiels[0]?.timestamp;
const posts = data.spiels.map((spiel: any) => ({
ceoId,
symbol,
recordsProcessed: allRecords.length,
timestamp: new Date().toISOString()
};
spiel: spiel.spiel,
spielReplyToId: spiel.spiel_reply_to_id,
spielReplyTo: spiel.spiel_reply_to,
spielReplyToName: spiel.spiel_reply_to_name,
spielReplyToEdited: spiel.spiel_reply_to_edited,
userId: spiel.user_id,
name: spiel.name,
timestamp: spiel.timestamp,
spielId: spiel.spiel_id,
color: spiel.color,
parentId: spiel.parent_id,
publicId: spiel.public_id,
parentChannel: spiel.parent_channel,
parentTimestamp: spiel.parent_timestamp,
votes: spiel.votes,
editable: spiel.editable,
edited: spiel.edited,
featured: spiel.featured,
verified: spiel.verified,
fake: spiel.fake,
bot: spiel.bot,
voted: spiel.voted,
flagged: spiel.flagged,
ownSpiel: spiel.own_spiel,
score: spiel.score,
savedId: spiel.saved_id,
savedTimestamp: spiel.saved_timestamp,
poll: spiel.poll,
votedInPoll: spiel.voted_in_poll
}));
await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']);
this.logger.info(`Fetched ${spielCount} spiels for ceoId ${ceoId}`);
// Update Shorts
const shortRes = await this.http.get(`https://api.ceo.ca/api/short_positions/one?symbol=${symbol}`,
{
proxy: proxy,
headers: {
'User-Agent': getRandomUserAgent()
}
});
if (shortRes.ok) {
const shortData = await shortRes.json();
if(shortData && shortData.positions) {
await this.mongodb.batchUpsert('ceoShorts', shortData.positions, ['id']);
}
await this.scheduleOperation('process-individual-symbol', {
ceoId: ceoId,
timestamp: latestSpielTime
});
}
this.logger.info(`Successfully processed channel ${ceoId} and added channel ${ceoId} at timestamp ${latestSpielTime}`);
return { ceoId, spielCount, timestamp };
} catch (error) {
this.logger.error('Failed to process individual symbol', {
error,
ceoId,
symbol
ceoId,
timestamp
});
throw error;
}