From 5009ccbedaa9213b980ee5b07d38f976ce5460a1 Mon Sep 17 00:00:00 2001 From: Boki Date: Sun, 22 Jun 2025 13:26:29 -0400 Subject: [PATCH] removed old working on ceo handler --- .../src/handlers/ceo/ceo.handler.ts | 132 ++++++++++++ apps/data-ingestion/src/handlers/index.ts | 1 + apps/data-ingestion/test-ceo-operations.ts | 101 +++++++++ libs/core/di/src/index.ts | 1 - libs/core/di/src/operation-context.ts | 5 +- libs/core/di/src/service-container.ts | 201 ------------------ libs/core/handlers/src/base/BaseHandler.ts | 12 +- libs/core/handlers/tsconfig.json | 3 +- 8 files changed, 243 insertions(+), 213 deletions(-) create mode 100755 apps/data-ingestion/test-ceo-operations.ts delete mode 100644 libs/core/di/src/service-container.ts diff --git a/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts b/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts index d6d039c..d5bf30f 100644 --- a/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts +++ b/apps/data-ingestion/src/handlers/ceo/ceo.handler.ts @@ -1,6 +1,7 @@ import { BaseHandler, Handler, + Operation, ScheduledOperation, type IServiceContainer } from '@stock-bot/handlers'; @@ -81,4 +82,135 @@ export class CeoHandler extends BaseHandler { this.logger.info(`Fetched CEO channels for page ${page}/${totalPages}`); return { page, totalPages }; } + + @Operation('process-unique-symbols') + @ScheduledOperation('process-unique-symbols', '0 */30 * * *', { + priority: 5, + immediately: false, + description: 'Process unique CEO symbols and schedule individual jobs' + }) + async processUniqueSymbols(_payload: unknown, _context: any): Promise { + this.logger.info('Starting process to get unique CEO symbols by ceoId'); + + try { + // Get unique ceoId values from the ceoSymbols collection + const uniqueCeoIds = await this.mongodb.collection('ceoSymbols').distinct('ceoId'); + + this.logger.info(`Found ${uniqueCeoIds.length} unique CEO IDs`); + + // Get detailed records for each unique ceoId (latest/first record) + const uniqueSymbols = []; + for (const ceoId of uniqueCeoIds) { + const symbol = await this.mongodb.collection('ceoSymbols') + .findOne({ ceoId }, { sort: { _id: -1 } }); // Get latest record + + if (symbol) { + uniqueSymbols.push(symbol); + } + } + + this.logger.info(`Retrieved ${uniqueSymbols.length} unique symbol records`); + + // Schedule individual jobs for each unique symbol + let scheduledJobs = 0; + for (const symbol of uniqueSymbols) { + // Schedule a job to process this individual symbol + await this.scheduleOperation('process-individual-symbol', { + ceoId: symbol.ceoId, + symbol: symbol.symbol, + exchange: symbol.exchange, + name: symbol.name + }); + scheduledJobs++; + + // Add small delay to avoid overwhelming the queue + if (scheduledJobs % 10 === 0) { + this.logger.debug(`Scheduled ${scheduledJobs} jobs so far`); + } + } + + this.logger.info(`Successfully scheduled ${scheduledJobs} individual symbol processing jobs`); + + // Cache the results for monitoring + await this.cacheSet('unique-symbols-last-run', { + timestamp: new Date().toISOString(), + totalUniqueIds: uniqueCeoIds.length, + totalRecords: uniqueSymbols.length, + scheduledJobs + }, 1800); // Cache for 30 minutes + + return { + success: true, + uniqueCeoIds: uniqueCeoIds.length, + uniqueRecords: uniqueSymbols.length, + scheduledJobs, + timestamp: new Date().toISOString() + }; + + } catch (error) { + this.logger.error('Failed to process unique CEO symbols', { error }); + throw error; + } + } + + @Operation('process-individual-symbol') + async processIndividualSymbol(payload: any, _context: any): Promise { + const { ceoId, symbol, exchange, name } = payload; + + this.logger.debug('Processing individual CEO symbol', { + ceoId, + symbol, + exchange, + name + }); + + try { + // Here you can add specific processing logic for each symbol + // For now, just log and potentially fetch additional data + + // Example: Get all historical records for this ceoId + const allRecords = await this.mongodb.collection('ceoSymbols') + .find({ ceoId }) + .sort({ _id: -1 }) + .toArray(); + + this.logger.debug(`Found ${allRecords.length} records for CEO ID ${ceoId}`); + + // Example: Update processing status + await this.mongodb.collection('ceoSymbols').updateMany( + { ceoId }, + { + $set: { + lastProcessed: new Date(), + processedBy: 'individual-symbol-processor' + } + } + ); + + // Cache individual symbol data + await this.cacheSet(`symbol-${ceoId}`, { + symbol, + exchange, + name, + recordCount: allRecords.length, + lastProcessed: new Date().toISOString() + }, 3600); // Cache for 1 hour + + return { + success: true, + ceoId, + symbol, + recordsProcessed: allRecords.length, + timestamp: new Date().toISOString() + }; + + } catch (error) { + this.logger.error('Failed to process individual symbol', { + error, + ceoId, + symbol + }); + throw error; + } + } } diff --git a/apps/data-ingestion/src/handlers/index.ts b/apps/data-ingestion/src/handlers/index.ts index e248e11..01c94cc 100644 --- a/apps/data-ingestion/src/handlers/index.ts +++ b/apps/data-ingestion/src/handlers/index.ts @@ -10,6 +10,7 @@ import { getLogger } from '@stock-bot/logger'; // Import handlers for bundling (ensures they're included in the build) import './qm/qm.handler'; import './webshare/webshare.handler'; +import './ceo/ceo.handler'; // Add more handler imports as needed const logger = getLogger('handler-init'); diff --git a/apps/data-ingestion/test-ceo-operations.ts b/apps/data-ingestion/test-ceo-operations.ts new file mode 100755 index 0000000..dd63bf7 --- /dev/null +++ b/apps/data-ingestion/test-ceo-operations.ts @@ -0,0 +1,101 @@ +#!/usr/bin/env bun + +/** + * Test script for CEO handler operations + */ + +import { initializeServiceConfig } from '@stock-bot/config'; +import { createServiceContainer, initializeServices } from '@stock-bot/di'; +import { getLogger } from '@stock-bot/logger'; + +const logger = getLogger('test-ceo-operations'); + +async function testCeoOperations() { + logger.info('Testing CEO handler operations...'); + + try { + // Initialize config + const config = initializeServiceConfig(); + + // Create Awilix container + const awilixConfig = { + redis: { + host: config.database.dragonfly.host, + port: config.database.dragonfly.port, + db: config.database.dragonfly.db, + }, + mongodb: { + uri: config.database.mongodb.uri, + database: config.database.mongodb.database, + }, + postgres: { + host: config.database.postgres.host, + port: config.database.postgres.port, + database: config.database.postgres.database, + user: config.database.postgres.user, + password: config.database.postgres.password, + }, + questdb: { + enabled: false, + host: config.database.questdb.host, + httpPort: config.database.questdb.httpPort, + pgPort: config.database.questdb.pgPort, + influxPort: config.database.questdb.ilpPort, + database: config.database.questdb.database, + }, + }; + + const container = createServiceContainer(awilixConfig); + await initializeServices(container); + + const serviceContainer = container.resolve('serviceContainer'); + + // Import and create CEO handler + const { CeoHandler } = await import('./src/handlers/ceo/ceo.handler'); + const ceoHandler = new CeoHandler(serviceContainer); + + // Test 1: Check if there are any CEO symbols in the database + logger.info('Checking for existing CEO symbols...'); + const collection = serviceContainer.mongodb.collection('ceoSymbols'); + const count = await collection.countDocuments(); + logger.info(`Found ${count} CEO symbols in database`); + + if (count > 0) { + // Test 2: Run process-unique-symbols operation + logger.info('Testing process-unique-symbols operation...'); + const result = await ceoHandler.processUniqueSymbols(undefined, {}); + logger.info('Process unique symbols result:', result); + + // Test 3: Test individual symbol processing + logger.info('Testing process-individual-symbol operation...'); + const sampleSymbol = await collection.findOne({}); + if (sampleSymbol) { + const individualResult = await ceoHandler.processIndividualSymbol({ + ceoId: sampleSymbol.ceoId, + symbol: sampleSymbol.symbol, + exchange: sampleSymbol.exchange, + name: sampleSymbol.name, + }, {}); + logger.info('Process individual symbol result:', individualResult); + } + } else { + logger.warn('No CEO symbols found. Run the service to populate data first.'); + } + + // Clean up + await serviceContainer.mongodb.disconnect(); + await serviceContainer.postgres.disconnect(); + if (serviceContainer.cache) { + await serviceContainer.cache.disconnect(); + } + + logger.info('Test completed successfully!'); + process.exit(0); + } catch (error) { + logger.error('Test failed:', error); + process.exit(1); + } +} + +// Run the test +testCeoOperations(); \ No newline at end of file diff --git a/libs/core/di/src/index.ts b/libs/core/di/src/index.ts index 7e3607b..4964ba6 100644 --- a/libs/core/di/src/index.ts +++ b/libs/core/di/src/index.ts @@ -1,5 +1,4 @@ // Export all dependency injection components -export * from './service-container'; export * from './operation-context'; export * from './pool-size-calculator'; export * from './types'; diff --git a/libs/core/di/src/operation-context.ts b/libs/core/di/src/operation-context.ts index 682e47f..796abc7 100644 --- a/libs/core/di/src/operation-context.ts +++ b/libs/core/di/src/operation-context.ts @@ -3,7 +3,10 @@ */ import { getLogger, type Logger } from '@stock-bot/logger'; -import type { ServiceResolver } from './service-container'; +interface ServiceResolver { + resolve(serviceName: string): T; + resolveAsync(serviceName: string): Promise; +} export interface OperationContextOptions { handlerName: string; diff --git a/libs/core/di/src/service-container.ts b/libs/core/di/src/service-container.ts deleted file mode 100644 index 4b8175e..0000000 --- a/libs/core/di/src/service-container.ts +++ /dev/null @@ -1,201 +0,0 @@ -import { getLogger, type Logger } from '@stock-bot/logger'; -import type { ConnectionFactory } from './connection-factory'; - -export interface ServiceRegistration { - name: string; - factory: () => T | Promise; - singleton?: boolean; - dispose?: (instance: T) => Promise; -} - -export interface ServiceResolver { - resolve(name: string, options?: any): T; - resolveAsync(name: string, options?: any): Promise; -} - -export class ServiceContainer implements ServiceResolver { - private readonly logger: Logger; - private readonly registrations = new Map(); - private readonly instances = new Map(); - private readonly scopedInstances = new Map(); - private readonly parent?: ServiceContainer; - - constructor(name: string, parent?: ServiceContainer) { - this.logger = getLogger(`service-container:${name}`); - this.parent = parent; - } - - register(registration: ServiceRegistration): void { - this.registrations.set(registration.name, registration); - this.logger.debug('Service registered', { name: registration.name, singleton: registration.singleton }); - } - - resolve(name: string, _options?: any): T { - // Check scoped instances first - if (this.scopedInstances.has(name)) { - return this.scopedInstances.get(name); - } - - // Check singleton instances - if (this.instances.has(name)) { - return this.instances.get(name); - } - - // Get registration from this container or parent - const registration = this.getRegistration(name); - if (!registration) { - throw new Error(`Service ${name} not registered`); - } - - // Create instance synchronously - const instance = registration.factory(); - - // Check if factory returned a promise - if (instance instanceof Promise) { - throw new Error(`Service ${name} is async. Use resolveAsync() instead.`); - } - - // Store based on singleton flag - if (registration.singleton) { - this.instances.set(name, instance); - } else { - this.scopedInstances.set(name, instance); - } - - return instance as T; - } - - async resolveAsync(name: string, _options?: any): Promise { - // Check scoped instances first - if (this.scopedInstances.has(name)) { - return this.scopedInstances.get(name); - } - - // Check singleton instances - if (this.instances.has(name)) { - return this.instances.get(name); - } - - // Get registration from this container or parent - const registration = this.getRegistration(name); - if (!registration) { - throw new Error(`Service ${name} not registered`); - } - - // Create instance - const instance = await Promise.resolve(registration.factory()); - - // Store based on singleton flag - if (registration.singleton) { - this.instances.set(name, instance); - } else { - this.scopedInstances.set(name, instance); - } - - return instance as T; - } - - createScope(): ServiceContainer { - return new ServiceContainer('scoped', this); - } - - async dispose(): Promise { - // Dispose scoped instances - for (const [name, instance] of this.scopedInstances.entries()) { - const registration = this.getRegistration(name); - if (registration?.dispose) { - await registration.dispose(instance); - } - } - this.scopedInstances.clear(); - - // Only dispose singletons if this is the root container - if (!this.parent) { - for (const [name, instance] of this.instances.entries()) { - const registration = this.registrations.get(name); - if (registration?.dispose) { - await registration.dispose(instance); - } - } - this.instances.clear(); - } - } - - private getRegistration(name: string): ServiceRegistration | undefined { - return this.registrations.get(name) || this.parent?.getRegistration(name); - } -} - -// Enhanced service container factory with infrastructure services -export function createServiceContainer( - serviceName: string, - connectionFactory: ConnectionFactory, - config?: any -): ServiceContainer { - const container = new ServiceContainer(serviceName); - - // Register configuration if provided - if (config) { - container.register({ - name: 'config', - factory: () => config, - singleton: true, - }); - } - - // Register connection factories - container.register({ - name: 'mongodb', - factory: async () => { - const pool = await connectionFactory.createMongoDB({ - name: 'default', - config: {} as any, // Config injected by factory - }); - return pool.client; - }, - singleton: true, - }); - - container.register({ - name: 'postgres', - factory: async () => { - const pool = await connectionFactory.createPostgreSQL({ - name: 'default', - config: {} as any, // Config injected by factory - }); - return pool.client; - }, - singleton: true, - }); - - container.register({ - name: 'cache', - factory: async () => { - const pool = await connectionFactory.createCache({ - name: 'default', - config: {} as any, // Config injected by factory - }); - return pool.client; - }, - singleton: true, - }); - - container.register({ - name: 'queue', - factory: async () => { - const pool = await connectionFactory.createQueue({ - name: 'default', - config: {} as any, // Config injected by factory - }); - return pool.client; - }, - singleton: true, - }); - - // Note: Additional services can be registered by individual applications as needed: - // - ProxyManager: container.register({ name: 'proxyManager', factory: () => ProxyManager.getInstance() }) - // - Browser: container.register({ name: 'browser', factory: () => Browser }) - // - HttpClient: container.register({ name: 'httpClient', factory: () => createHttpClient(...) }) - - return container; -} \ No newline at end of file diff --git a/libs/core/handlers/src/base/BaseHandler.ts b/libs/core/handlers/src/base/BaseHandler.ts index 06bf6c7..3efa521 100644 --- a/libs/core/handlers/src/base/BaseHandler.ts +++ b/libs/core/handlers/src/base/BaseHandler.ts @@ -90,12 +90,6 @@ export abstract class BaseHandler implements IHandler { await queue.add(operation, jobData, { delay }); } - /** - * Helper method to schedule an operation with delay in seconds - */ - async scheduleIn(operation: string, payload: unknown, delaySeconds: number): Promise { - return this.scheduleOperation(operation, payload, delaySeconds * 1000); - } /** * Create execution context for operations @@ -118,11 +112,11 @@ export abstract class BaseHandler implements IHandler { /** * Get a MongoDB collection with type safety */ - protected collection(name: string): Collection { + protected collection(name: string): Collection { if (!this.mongodb) { throw new Error('MongoDB service is not available'); } - return this.mongodb.collection(name); + return this.mongodb.collection(name); } /** @@ -142,7 +136,7 @@ export abstract class BaseHandler implements IHandler { if (!this.cache) { return null; } - return this.cache.get(`${this.handlerName}:${key}`); + return this.cache.get(`${this.handlerName}:${key}`); } /** diff --git a/libs/core/handlers/tsconfig.json b/libs/core/handlers/tsconfig.json index 5dee02f..565be8c 100644 --- a/libs/core/handlers/tsconfig.json +++ b/libs/core/handlers/tsconfig.json @@ -9,6 +9,7 @@ "references": [ { "path": "../config" }, { "path": "../logger" }, - { "path": "../di" } + { "path": "../di" }, + { "path": "../../utils" } ] } \ No newline at end of file