From 3821431737b44e6cc8dbcfdd5e7e2ca41629ef36 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 22 Jun 2025 16:10:44 -0400 Subject: [PATCH] finished up initial ceo handler --- .env | 2 +- .../src/handlers/ceo/ceo.handler.ts | 157 ++++++++++++------ 2 files changed, 105 insertions(+), 54 deletions(-) diff --git a/.env b/.env index a029ae7..8923e13 100644 --- a/.env +++ b/.env @@ -5,7 +5,7 @@ # Core Application Settings NODE_ENV=development LOG_LEVEL=trace -LOG_HIDE_OBJECT=true +LOG_HIDE_OBJECT=false # Data Service Configuration DATA_SERVICE_PORT=2001 diff --git a/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts b/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts index d5bf30f..d67f907 100644 --- a/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts +++ b/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts @@ -1,5 +1,6 @@ import { BaseHandler, + Disabled, Handler, Operation, ScheduledOperation, @@ -7,7 +8,10 @@ import { } from '@stock-bot/handlers'; import { getRandomUserAgent } from '@stock-bot/utils'; + + @Handler('ceo') +@Disabled() export class CeoHandler extends BaseHandler { constructor(services: IServiceContainer) { super(services); // Handler name read from @Handler decorator @@ -15,10 +19,10 @@ export class CeoHandler extends BaseHandler { @ScheduledOperation('update-ceo-channels', '0 */15 * * *', { priority: 7, - immediately: true, + immediately: false, description: 'Get all CEO symbols and exchanges' }) - async updateCeoChannels(payload: number | undefined, handler: BaseHandler): Promise { + async updateCeoChannels(payload: number | undefined): Promise { const proxy = this.proxy?.getProxy(); if(!proxy) { this.logger.warn('No proxy available for CEO channels update'); @@ -75,7 +79,7 @@ export class CeoHandler extends BaseHandler { if(page === 1) { for( let i = 2; i <= totalPages; i++) { this.logger.info(`Scheduling page ${i} of ${totalPages} for CEO channels`); - this.scheduleOperation('update-ceo-channels', i) + await this.scheduleOperation('update-ceo-channels', i) } } @@ -83,14 +87,14 @@ export class CeoHandler extends BaseHandler { return { page, totalPages }; } - @Operation('process-unique-symbols') - @ScheduledOperation('process-unique-symbols', '0 */30 * * *', { + @Operation('update-unique-symbols') + @ScheduledOperation('process-unique-symbols', '0 0 1 * *', {//'0 */30 * * *', { priority: 5, immediately: false, description: 'Process unique CEO symbols and schedule individual jobs' }) - async processUniqueSymbols(_payload: unknown, _context: any): Promise { - this.logger.info('Starting process to get unique CEO symbols by ceoId'); + async updateUniqueSymbols(_payload: unknown, _context: any): Promise { + this.logger.info('Starting update to get unique CEO symbols by ceoId'); try { // Get unique ceoId values from the ceoSymbols collection @@ -118,8 +122,6 @@ export class CeoHandler extends BaseHandler { await this.scheduleOperation('process-individual-symbol', { ceoId: symbol.ceoId, symbol: symbol.symbol, - exchange: symbol.exchange, - name: symbol.name }); scheduledJobs++; @@ -129,7 +131,7 @@ export class CeoHandler extends BaseHandler { } } - this.logger.info(`Successfully scheduled ${scheduledJobs} individual symbol processing jobs`); + this.logger.info(`Successfully scheduled ${scheduledJobs} individual symbol update jobs`); // Cache the results for monitoring await this.cacheSet('unique-symbols-last-run', { @@ -148,67 +150,116 @@ export class CeoHandler extends BaseHandler { }; } catch (error) { - this.logger.error('Failed to process unique CEO symbols', { error }); + this.logger.error('Failed to update unique CEO symbols', { error }); throw error; } } @Operation('process-individual-symbol') async processIndividualSymbol(payload: any, _context: any): Promise { - const { ceoId, symbol, exchange, name } = payload; - + const { ceoId, symbol, timestamp } = payload; + const proxy = this.proxy?.getProxy(); + if(!proxy) { + this.logger.warn('No proxy available for processing individual CEO symbol'); + return; + } + this.logger.debug('Processing individual CEO symbol', { ceoId, - symbol, - exchange, - name + timestamp, }); - try { - // Here you can add specific processing logic for each symbol - // For now, just log and potentially fetch additional data - - // Example: Get all historical records for this ceoId - const allRecords = await this.mongodb.collection('ceoSymbols') - .find({ ceoId }) - .sort({ _id: -1 }) - .toArray(); - - this.logger.debug(`Found ${allRecords.length} records for CEO ID ${ceoId}`); - - // Example: Update processing status - await this.mongodb.collection('ceoSymbols').updateMany( - { ceoId }, - { - $set: { - lastProcessed: new Date(), - processedBy: 'individual-symbol-processor' + // Fetch detailed information for the individual symbol + const response = await this.http.get(`https://api.ceo.ca/api/get_spiels?channel=${ceoId}&load_more=top` + + (timestamp ? `&until=${timestamp}` : ''), + { + proxy: proxy, + headers: { + + 'User-Agent': getRandomUserAgent() } - } - ); + }); - // Cache individual symbol data - await this.cacheSet(`symbol-${ceoId}`, { - symbol, - exchange, - name, - recordCount: allRecords.length, - lastProcessed: new Date().toISOString() - }, 3600); // Cache for 1 hour + if (!response.ok) { + throw new Error(`Failed to fetch details for ceoId ${ceoId}: ${response.statusText}`); + } - return { - success: true, + const data = await response.json(); + + const spielCount = data.spiels.length; + if(spielCount === 0) { + this.logger.warn(`No spiels found for ceoId ${ceoId}`); + return null; // No data to process + } + const latestSpielTime = data.spiels[0]?.timestamp; + const posts = data.spiels.map((spiel: any) => ({ ceoId, - symbol, - recordsProcessed: allRecords.length, - timestamp: new Date().toISOString() - }; + spiel: spiel.spiel, + spielReplyToId: spiel.spiel_reply_to_id, + spielReplyTo: spiel.spiel_reply_to, + spielReplyToName: spiel.spiel_reply_to_name, + spielReplyToEdited: spiel.spiel_reply_to_edited, + userId: spiel.user_id, + name: spiel.name, + timestamp: spiel.timestamp, + spielId: spiel.spiel_id, + color: spiel.color, + parentId: spiel.parent_id, + publicId: spiel.public_id, + parentChannel: spiel.parent_channel, + parentTimestamp: spiel.parent_timestamp, + votes: spiel.votes, + editable: spiel.editable, + edited: spiel.edited, + featured: spiel.featured, + verified: spiel.verified, + fake: spiel.fake, + bot: spiel.bot, + voted: spiel.voted, + flagged: spiel.flagged, + ownSpiel: spiel.own_spiel, + score: spiel.score, + savedId: spiel.saved_id, + savedTimestamp: spiel.saved_timestamp, + poll: spiel.poll, + votedInPoll: spiel.voted_in_poll + })); + + await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']); + this.logger.info(`Fetched ${spielCount} spiels for ceoId ${ceoId}`); + + // Update Shorts + const shortRes = await this.http.get(`https://api.ceo.ca/api/short_positions/one?symbol=${symbol}`, + { + proxy: proxy, + headers: { + 'User-Agent': getRandomUserAgent() + } + }); + + if (shortRes.ok) { + const shortData = await shortRes.json(); + if(shortData && shortData.positions) { + await this.mongodb.batchUpsert('ceoShorts', shortData.positions, ['id']); + } + + await this.scheduleOperation('process-individual-symbol', { + ceoId: ceoId, + timestamp: latestSpielTime + }); + } + + + + this.logger.info(`Successfully processed channel ${ceoId} and added channel ${ceoId} at timestamp ${latestSpielTime}`); + + return { ceoId, spielCount, timestamp }; } catch (error) { this.logger.error('Failed to process individual symbol', { error, - ceoId, - symbol + ceoId, + timestamp }); throw error; }