diff --git a/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl b/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl index 11457a0..59ee04e 100644 Binary files a/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl and b/.serena/cache/typescript/document_symbols_cache_v20-05-25.pkl differ diff --git a/apps/stock/data-pipeline/src/handlers/exchanges/exchanges.handler.ts b/apps/stock/data-pipeline/src/handlers/exchanges/exchanges.handler.ts index 55a1c18..c1f7800 100644 --- a/apps/stock/data-pipeline/src/handlers/exchanges/exchanges.handler.ts +++ b/apps/stock/data-pipeline/src/handlers/exchanges/exchanges.handler.ts @@ -1,74 +1,111 @@ -import { getLogger } from '@stock-bot/logger'; -import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue'; -import type { IServiceContainer } from '@stock-bot/handlers'; -import { exchangeOperations } from './operations'; - -const logger = getLogger('exchanges-handler'); - -const HANDLER_NAME = 'exchanges'; - -const exchangesHandlerConfig: HandlerConfig = { - concurrency: 1, - maxAttempts: 3, - scheduledJobs: [ - { - operation: 'sync-all-exchanges', - cronPattern: '0 0 * * 0', // Weekly on Sunday at midnight - payload: { clearFirst: true }, - priority: 10, - immediately: false, - } as ScheduledJobConfig, - { - operation: 'sync-qm-exchanges', - cronPattern: '0 1 * * *', // Daily at 1 AM - payload: {}, - priority: 5, - immediately: false, - } as ScheduledJobConfig, - { - operation: 'sync-ib-exchanges', - cronPattern: '0 3 * * *', // Daily at 3 AM - payload: {}, - priority: 3, - immediately: false, - } as ScheduledJobConfig, - { - operation: 'sync-qm-provider-mappings', - cronPattern: '0 3 * * *', // Daily at 3 AM - payload: {}, - priority: 7, - immediately: false, - } as ScheduledJobConfig, - ], - operations: { - 'sync-all-exchanges': exchangeOperations.syncAllExchanges, - 'sync-qm-exchanges': exchangeOperations.syncQMExchanges, - 'sync-ib-exchanges': exchangeOperations.syncIBExchanges, - 'sync-qm-provider-mappings': exchangeOperations.syncQMProviderMappings, - 'clear-postgresql-data': exchangeOperations.clearPostgreSQLData, - 'get-exchange-stats': exchangeOperations.getExchangeStats, - 'get-provider-mapping-stats': exchangeOperations.getProviderMappingStats, - 'enhanced-sync-status': exchangeOperations['enhanced-sync-status'], - }, -}; - -export function initializeExchangesHandler(container: IServiceContainer) { - logger.info('Registering exchanges handler...'); - - // Update operations to use container - const containerAwareOperations = Object.entries(exchangeOperations).reduce((acc, [key, operation]) => { - acc[key] = createJobHandler(async (payload: any) => { - return operation(payload, container); - }); - return acc; - }, {} as Record); - - const exchangesHandlerConfigWithContainer: HandlerConfig = { - ...exchangesHandlerConfig, - operations: containerAwareOperations, - }; - - handlerRegistry.register(HANDLER_NAME, exchangesHandlerConfigWithContainer); - logger.info('Exchanges handler registered successfully'); -} - +import { + BaseHandler, + Handler, + Operation, + ScheduledOperation, + type IServiceContainer, +} from '@stock-bot/handlers'; +import { clearPostgreSQLData } from './operations/clear-postgresql-data.operations'; +import { getSyncStatus } from './operations/enhanced-sync-status.operations'; +import { getExchangeStats } from './operations/exchange-stats.operations'; +import { getProviderMappingStats } from './operations/provider-mapping-stats.operations'; +import { syncQMExchanges } from './operations/qm-exchanges.operations'; +import { syncAllExchanges } from './operations/sync-all-exchanges.operations'; +import { syncIBExchanges } from './operations/sync-ib-exchanges.operations'; +import { syncQMProviderMappings } from './operations/sync-qm-provider-mappings.operations'; + +@Handler('exchanges') +export class ExchangesHandler extends BaseHandler { + constructor(services: IServiceContainer) { + super(services); + } + + /** + * Sync all exchanges - weekly full sync + */ + @Operation('sync-all-exchanges') + @ScheduledOperation('sync-all-exchanges', '0 0 * * 0', { + priority: 10, + description: 'Weekly full exchange sync on Sunday at midnight', + }) + async syncAllExchanges(payload?: { clearFirst?: boolean }): Promise { + const finalPayload = payload || { clearFirst: true }; + this.log('info', 'Starting sync of all exchanges', finalPayload); + return syncAllExchanges(finalPayload, this.services); + } + + /** + * Sync exchanges from QuestionsAndMethods + */ + @Operation('sync-qm-exchanges') + @ScheduledOperation('sync-qm-exchanges', '0 1 * * *', { + priority: 5, + description: 'Daily sync of QM exchanges at 1 AM', + }) + async syncQMExchanges(): Promise { + this.log('info', 'Starting QM exchanges sync...'); + return syncQMExchanges({}, this.services); + } + + /** + * Sync exchanges from Interactive Brokers + */ + @Operation('sync-ib-exchanges') + @ScheduledOperation('sync-ib-exchanges', '0 3 * * *', { + priority: 3, + description: 'Daily sync of IB exchanges at 3 AM', + }) + async syncIBExchanges(): Promise { + this.log('info', 'Starting IB exchanges sync...'); + return syncIBExchanges({}, this.services); + } + + /** + * Sync provider mappings from QuestionsAndMethods + */ + @Operation('sync-qm-provider-mappings') + @ScheduledOperation('sync-qm-provider-mappings', '0 3 * * *', { + priority: 7, + description: 'Daily sync of QM provider mappings at 3 AM', + }) + async syncQMProviderMappings(): Promise { + this.log('info', 'Starting QM provider mappings sync...'); + return syncQMProviderMappings({}, this.services); + } + + /** + * Clear PostgreSQL data - maintenance operation + */ + @Operation('clear-postgresql-data') + async clearPostgreSQLData(payload: { type?: 'exchanges' | 'provider_mappings' | 'all' }): Promise { + this.log('warn', 'Clearing PostgreSQL data', payload); + return clearPostgreSQLData(payload, this.services); + } + + /** + * Get exchange statistics + */ + @Operation('get-exchange-stats') + async getExchangeStats(): Promise { + this.log('info', 'Getting exchange statistics...'); + return getExchangeStats({}, this.services); + } + + /** + * Get provider mapping statistics + */ + @Operation('get-provider-mapping-stats') + async getProviderMappingStats(): Promise { + this.log('info', 'Getting provider mapping statistics...'); + return getProviderMappingStats({}, this.services); + } + + /** + * Get enhanced sync status + */ + @Operation('enhanced-sync-status') + async getEnhancedSyncStatus(): Promise { + this.log('info', 'Getting enhanced sync status...'); + return getSyncStatus({}, this.services); + } +} \ No newline at end of file diff --git a/apps/stock/data-pipeline/src/handlers/index.ts b/apps/stock/data-pipeline/src/handlers/index.ts index a596fb4..754e8d2 100644 --- a/apps/stock/data-pipeline/src/handlers/index.ts +++ b/apps/stock/data-pipeline/src/handlers/index.ts @@ -1,33 +1,42 @@ /** - * Handler initialization for data pipeline service - * Registers all handlers with the service container + * Handler auto-registration for data pipeline service + * Automatically discovers and registers all handlers */ -import type { ServiceContainer } from '@stock-bot/di'; +import type { IServiceContainer } from '@stock-bot/handlers'; +import { autoRegisterHandlers } from '@stock-bot/handlers'; import { getLogger } from '@stock-bot/logger'; -import { initializeExchangesHandler } from './exchanges/exchanges.handler'; -import { initializeSymbolsHandler } from './symbols/symbols.handler'; + +// Import handlers for bundling (ensures they're included in the build) +import './exchanges/exchanges.handler'; +import './symbols/symbols.handler'; const logger = getLogger('pipeline-handler-init'); /** - * Initialize all handlers with the service container + * Initialize and register all handlers automatically */ -export async function initializeAllHandlers(container: ServiceContainer): Promise { +export async function initializeAllHandlers(container: IServiceContainer): Promise { logger.info('Initializing data pipeline handlers...'); try { - // Initialize exchanges handler with container - initializeExchangesHandler(container); - logger.debug('Exchanges handler initialized'); - - // Initialize symbols handler with container - initializeSymbolsHandler(container); - logger.debug('Symbols handler initialized'); - - logger.info('All pipeline handlers initialized successfully'); + // Auto-register all handlers in this directory + const result = await autoRegisterHandlers(__dirname, container, { + pattern: '.handler.', + exclude: ['test', 'spec', '.old'], + dryRun: false, + }); + + logger.info('Handler auto-registration complete', { + registered: result.registered, + failed: result.failed, + }); + + if (result.failed.length > 0) { + logger.error('Some handlers failed to register', { failed: result.failed }); + } } catch (error) { - logger.error('Failed to initialize handlers', { error }); + logger.error('Handler auto-registration failed', { error }); throw error; } } \ No newline at end of file diff --git a/apps/stock/data-pipeline/src/handlers/symbols/symbols.handler.ts b/apps/stock/data-pipeline/src/handlers/symbols/symbols.handler.ts index 3cd384b..5ab6d0a 100644 --- a/apps/stock/data-pipeline/src/handlers/symbols/symbols.handler.ts +++ b/apps/stock/data-pipeline/src/handlers/symbols/symbols.handler.ts @@ -1,56 +1,68 @@ -import { getLogger } from '@stock-bot/logger'; -import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue'; -import type { ServiceContainer } from '@stock-bot/di'; -import { symbolOperations } from './operations'; - -const logger = getLogger('symbols-handler'); - -const HANDLER_NAME = 'symbols'; - -const symbolsHandlerConfig: HandlerConfig = { - concurrency: 1, - maxAttempts: 3, - scheduledJobs: [ - { - operation: 'sync-qm-symbols', - cronPattern: '0 2 * * *', // Daily at 2 AM - payload: {}, - priority: 5, - immediately: false, - } as ScheduledJobConfig, - { - operation: 'sync-symbols-qm', - cronPattern: '0 4 * * *', // Daily at 4 AM - payload: { provider: 'qm', clearFirst: false }, - priority: 5, - immediately: false, - } as ScheduledJobConfig, - ], - operations: { - 'sync-qm-symbols': symbolOperations.syncQMSymbols, - 'sync-symbols-qm': symbolOperations.syncSymbolsFromProvider, - 'sync-symbols-eod': symbolOperations.syncSymbolsFromProvider, - 'sync-symbols-ib': symbolOperations.syncSymbolsFromProvider, - 'sync-status': symbolOperations.getSyncStatus, - }, -}; - -export function initializeSymbolsHandler(container: ServiceContainer): void { - logger.info('Registering symbols handler...'); - - // Update operations to use container - const containerAwareOperations = Object.entries(symbolOperations).reduce((acc, [key, operation]) => { - acc[key] = createJobHandler(async (payload: any) => { - return operation(payload, container); - }); - return acc; - }, {} as Record); - - const symbolsHandlerConfigWithContainer: HandlerConfig = { - ...symbolsHandlerConfig, - operations: containerAwareOperations, - }; - - handlerRegistry.register(HANDLER_NAME, symbolsHandlerConfigWithContainer); - logger.info('Symbols handler registered successfully'); -} +import { + BaseHandler, + Handler, + Operation, + ScheduledOperation, + type IServiceContainer, +} from '@stock-bot/handlers'; +import { syncQMSymbols } from './operations/qm-symbols.operations'; +import { syncSymbolsFromProvider } from './operations/sync-symbols-from-provider.operations'; +import { getSyncStatus } from './operations/sync-status.operations'; + +@Handler('symbols') +export class SymbolsHandler extends BaseHandler { + constructor(services: IServiceContainer) { + super(services); + } + + /** + * Sync symbols from QuestionsAndMethods API + */ + @ScheduledOperation('sync-qm-symbols', '0 2 * * *', { + priority: 5, + description: 'Daily sync of QM symbols at 2 AM', + }) + async syncQMSymbols(): Promise<{ processed: number; created: number; updated: number }> { + this.log('info', 'Starting QM symbols sync...'); + return syncQMSymbols({}, this.services); + } + + /** + * Sync symbols from specific provider + */ + @Operation('sync-symbols-qm') + @ScheduledOperation('sync-symbols-qm', '0 4 * * *', { + priority: 5, + description: 'Daily sync of symbols from QM provider at 4 AM', + }) + async syncSymbolsQM(): Promise { + return this.syncSymbolsFromProvider({ provider: 'qm', clearFirst: false }); + } + + @Operation('sync-symbols-eod') + async syncSymbolsEOD(payload: { provider: string; clearFirst?: boolean }): Promise { + return this.syncSymbolsFromProvider({ ...payload, provider: 'eod' }); + } + + @Operation('sync-symbols-ib') + async syncSymbolsIB(payload: { provider: string; clearFirst?: boolean }): Promise { + return this.syncSymbolsFromProvider({ ...payload, provider: 'ib' }); + } + + /** + * Get sync status for symbols + */ + @Operation('sync-status') + async getSyncStatus(): Promise { + this.log('info', 'Getting symbol sync status...'); + return getSyncStatus({}, this.services); + } + + /** + * Internal method to sync symbols from a provider + */ + private async syncSymbolsFromProvider(payload: { provider: string; clearFirst?: boolean }): Promise { + this.log('info', 'Syncing symbols from provider', { provider: payload.provider }); + return syncSymbolsFromProvider(payload, this.services); + } +} \ No newline at end of file diff --git a/libs/services/queue/src/types.ts b/libs/services/queue/src/types.ts index 0ca8312..9b94bce 100644 --- a/libs/services/queue/src/types.ts +++ b/libs/services/queue/src/types.ts @@ -1,10 +1,7 @@ // Re-export handler types from shared types package export type { - JobHandler, - TypedJobHandler, HandlerConfig, - HandlerConfigWithSchedule, - ScheduledJob, + HandlerConfigWithSchedule, JobHandler, ScheduledJob, TypedJobHandler } from '@stock-bot/types'; // Types for queue operations @@ -110,7 +107,6 @@ export interface QueueConfig extends QueueManagerConfig { enableMetrics?: boolean; } -// Re-export createJobHandler from shared types package export { createJobHandler } from '@stock-bot/types'; export interface BatchJobData {