fixed pipeline handlers
This commit is contained in:
parent
26d9b9ab3f
commit
50e5b5cbed
5 changed files with 206 additions and 152 deletions
Binary file not shown.
|
|
@ -1,74 +1,111 @@
|
||||||
import { getLogger } from '@stock-bot/logger';
|
import {
|
||||||
import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue';
|
BaseHandler,
|
||||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
Handler,
|
||||||
import { exchangeOperations } from './operations';
|
Operation,
|
||||||
|
ScheduledOperation,
|
||||||
|
type IServiceContainer,
|
||||||
|
} from '@stock-bot/handlers';
|
||||||
|
import { clearPostgreSQLData } from './operations/clear-postgresql-data.operations';
|
||||||
|
import { getSyncStatus } from './operations/enhanced-sync-status.operations';
|
||||||
|
import { getExchangeStats } from './operations/exchange-stats.operations';
|
||||||
|
import { getProviderMappingStats } from './operations/provider-mapping-stats.operations';
|
||||||
|
import { syncQMExchanges } from './operations/qm-exchanges.operations';
|
||||||
|
import { syncAllExchanges } from './operations/sync-all-exchanges.operations';
|
||||||
|
import { syncIBExchanges } from './operations/sync-ib-exchanges.operations';
|
||||||
|
import { syncQMProviderMappings } from './operations/sync-qm-provider-mappings.operations';
|
||||||
|
|
||||||
const logger = getLogger('exchanges-handler');
|
@Handler('exchanges')
|
||||||
|
export class ExchangesHandler extends BaseHandler {
|
||||||
|
constructor(services: IServiceContainer) {
|
||||||
|
super(services);
|
||||||
|
}
|
||||||
|
|
||||||
const HANDLER_NAME = 'exchanges';
|
/**
|
||||||
|
* Sync all exchanges - weekly full sync
|
||||||
const exchangesHandlerConfig: HandlerConfig = {
|
*/
|
||||||
concurrency: 1,
|
@Operation('sync-all-exchanges')
|
||||||
maxAttempts: 3,
|
@ScheduledOperation('sync-all-exchanges', '0 0 * * 0', {
|
||||||
scheduledJobs: [
|
|
||||||
{
|
|
||||||
operation: 'sync-all-exchanges',
|
|
||||||
cronPattern: '0 0 * * 0', // Weekly on Sunday at midnight
|
|
||||||
payload: { clearFirst: true },
|
|
||||||
priority: 10,
|
priority: 10,
|
||||||
immediately: false,
|
description: 'Weekly full exchange sync on Sunday at midnight',
|
||||||
} as ScheduledJobConfig,
|
})
|
||||||
{
|
async syncAllExchanges(payload?: { clearFirst?: boolean }): Promise<unknown> {
|
||||||
operation: 'sync-qm-exchanges',
|
const finalPayload = payload || { clearFirst: true };
|
||||||
cronPattern: '0 1 * * *', // Daily at 1 AM
|
this.log('info', 'Starting sync of all exchanges', finalPayload);
|
||||||
payload: {},
|
return syncAllExchanges(finalPayload, this.services);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sync exchanges from QuestionsAndMethods
|
||||||
|
*/
|
||||||
|
@Operation('sync-qm-exchanges')
|
||||||
|
@ScheduledOperation('sync-qm-exchanges', '0 1 * * *', {
|
||||||
priority: 5,
|
priority: 5,
|
||||||
immediately: false,
|
description: 'Daily sync of QM exchanges at 1 AM',
|
||||||
} as ScheduledJobConfig,
|
})
|
||||||
{
|
async syncQMExchanges(): Promise<unknown> {
|
||||||
operation: 'sync-ib-exchanges',
|
this.log('info', 'Starting QM exchanges sync...');
|
||||||
cronPattern: '0 3 * * *', // Daily at 3 AM
|
return syncQMExchanges({}, this.services);
|
||||||
payload: {},
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sync exchanges from Interactive Brokers
|
||||||
|
*/
|
||||||
|
@Operation('sync-ib-exchanges')
|
||||||
|
@ScheduledOperation('sync-ib-exchanges', '0 3 * * *', {
|
||||||
priority: 3,
|
priority: 3,
|
||||||
immediately: false,
|
description: 'Daily sync of IB exchanges at 3 AM',
|
||||||
} as ScheduledJobConfig,
|
})
|
||||||
{
|
async syncIBExchanges(): Promise<unknown> {
|
||||||
operation: 'sync-qm-provider-mappings',
|
this.log('info', 'Starting IB exchanges sync...');
|
||||||
cronPattern: '0 3 * * *', // Daily at 3 AM
|
return syncIBExchanges({}, this.services);
|
||||||
payload: {},
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sync provider mappings from QuestionsAndMethods
|
||||||
|
*/
|
||||||
|
@Operation('sync-qm-provider-mappings')
|
||||||
|
@ScheduledOperation('sync-qm-provider-mappings', '0 3 * * *', {
|
||||||
priority: 7,
|
priority: 7,
|
||||||
immediately: false,
|
description: 'Daily sync of QM provider mappings at 3 AM',
|
||||||
} as ScheduledJobConfig,
|
})
|
||||||
],
|
async syncQMProviderMappings(): Promise<unknown> {
|
||||||
operations: {
|
this.log('info', 'Starting QM provider mappings sync...');
|
||||||
'sync-all-exchanges': exchangeOperations.syncAllExchanges,
|
return syncQMProviderMappings({}, this.services);
|
||||||
'sync-qm-exchanges': exchangeOperations.syncQMExchanges,
|
}
|
||||||
'sync-ib-exchanges': exchangeOperations.syncIBExchanges,
|
|
||||||
'sync-qm-provider-mappings': exchangeOperations.syncQMProviderMappings,
|
|
||||||
'clear-postgresql-data': exchangeOperations.clearPostgreSQLData,
|
|
||||||
'get-exchange-stats': exchangeOperations.getExchangeStats,
|
|
||||||
'get-provider-mapping-stats': exchangeOperations.getProviderMappingStats,
|
|
||||||
'enhanced-sync-status': exchangeOperations['enhanced-sync-status'],
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
export function initializeExchangesHandler(container: IServiceContainer) {
|
/**
|
||||||
logger.info('Registering exchanges handler...');
|
* Clear PostgreSQL data - maintenance operation
|
||||||
|
*/
|
||||||
|
@Operation('clear-postgresql-data')
|
||||||
|
async clearPostgreSQLData(payload: { type?: 'exchanges' | 'provider_mappings' | 'all' }): Promise<unknown> {
|
||||||
|
this.log('warn', 'Clearing PostgreSQL data', payload);
|
||||||
|
return clearPostgreSQLData(payload, this.services);
|
||||||
|
}
|
||||||
|
|
||||||
// Update operations to use container
|
/**
|
||||||
const containerAwareOperations = Object.entries(exchangeOperations).reduce((acc, [key, operation]) => {
|
* Get exchange statistics
|
||||||
acc[key] = createJobHandler(async (payload: any) => {
|
*/
|
||||||
return operation(payload, container);
|
@Operation('get-exchange-stats')
|
||||||
});
|
async getExchangeStats(): Promise<unknown> {
|
||||||
return acc;
|
this.log('info', 'Getting exchange statistics...');
|
||||||
}, {} as Record<string, any>);
|
return getExchangeStats({}, this.services);
|
||||||
|
}
|
||||||
|
|
||||||
const exchangesHandlerConfigWithContainer: HandlerConfig = {
|
/**
|
||||||
...exchangesHandlerConfig,
|
* Get provider mapping statistics
|
||||||
operations: containerAwareOperations,
|
*/
|
||||||
};
|
@Operation('get-provider-mapping-stats')
|
||||||
|
async getProviderMappingStats(): Promise<unknown> {
|
||||||
|
this.log('info', 'Getting provider mapping statistics...');
|
||||||
|
return getProviderMappingStats({}, this.services);
|
||||||
|
}
|
||||||
|
|
||||||
handlerRegistry.register(HANDLER_NAME, exchangesHandlerConfigWithContainer);
|
/**
|
||||||
logger.info('Exchanges handler registered successfully');
|
* Get enhanced sync status
|
||||||
|
*/
|
||||||
|
@Operation('enhanced-sync-status')
|
||||||
|
async getEnhancedSyncStatus(): Promise<unknown> {
|
||||||
|
this.log('info', 'Getting enhanced sync status...');
|
||||||
|
return getSyncStatus({}, this.services);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,33 +1,42 @@
|
||||||
/**
|
/**
|
||||||
* Handler initialization for data pipeline service
|
* Handler auto-registration for data pipeline service
|
||||||
* Registers all handlers with the service container
|
* Automatically discovers and registers all handlers
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import type { ServiceContainer } from '@stock-bot/di';
|
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||||
|
import { autoRegisterHandlers } from '@stock-bot/handlers';
|
||||||
import { getLogger } from '@stock-bot/logger';
|
import { getLogger } from '@stock-bot/logger';
|
||||||
import { initializeExchangesHandler } from './exchanges/exchanges.handler';
|
|
||||||
import { initializeSymbolsHandler } from './symbols/symbols.handler';
|
// Import handlers for bundling (ensures they're included in the build)
|
||||||
|
import './exchanges/exchanges.handler';
|
||||||
|
import './symbols/symbols.handler';
|
||||||
|
|
||||||
const logger = getLogger('pipeline-handler-init');
|
const logger = getLogger('pipeline-handler-init');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize all handlers with the service container
|
* Initialize and register all handlers automatically
|
||||||
*/
|
*/
|
||||||
export async function initializeAllHandlers(container: ServiceContainer): Promise<void> {
|
export async function initializeAllHandlers(container: IServiceContainer): Promise<void> {
|
||||||
logger.info('Initializing data pipeline handlers...');
|
logger.info('Initializing data pipeline handlers...');
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Initialize exchanges handler with container
|
// Auto-register all handlers in this directory
|
||||||
initializeExchangesHandler(container);
|
const result = await autoRegisterHandlers(__dirname, container, {
|
||||||
logger.debug('Exchanges handler initialized');
|
pattern: '.handler.',
|
||||||
|
exclude: ['test', 'spec', '.old'],
|
||||||
|
dryRun: false,
|
||||||
|
});
|
||||||
|
|
||||||
// Initialize symbols handler with container
|
logger.info('Handler auto-registration complete', {
|
||||||
initializeSymbolsHandler(container);
|
registered: result.registered,
|
||||||
logger.debug('Symbols handler initialized');
|
failed: result.failed,
|
||||||
|
});
|
||||||
|
|
||||||
logger.info('All pipeline handlers initialized successfully');
|
if (result.failed.length > 0) {
|
||||||
|
logger.error('Some handlers failed to register', { failed: result.failed });
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Failed to initialize handlers', { error });
|
logger.error('Handler auto-registration failed', { error });
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1,56 +1,68 @@
|
||||||
import { getLogger } from '@stock-bot/logger';
|
import {
|
||||||
import { handlerRegistry, createJobHandler, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue';
|
BaseHandler,
|
||||||
import type { ServiceContainer } from '@stock-bot/di';
|
Handler,
|
||||||
import { symbolOperations } from './operations';
|
Operation,
|
||||||
|
ScheduledOperation,
|
||||||
|
type IServiceContainer,
|
||||||
|
} from '@stock-bot/handlers';
|
||||||
|
import { syncQMSymbols } from './operations/qm-symbols.operations';
|
||||||
|
import { syncSymbolsFromProvider } from './operations/sync-symbols-from-provider.operations';
|
||||||
|
import { getSyncStatus } from './operations/sync-status.operations';
|
||||||
|
|
||||||
const logger = getLogger('symbols-handler');
|
@Handler('symbols')
|
||||||
|
export class SymbolsHandler extends BaseHandler {
|
||||||
|
constructor(services: IServiceContainer) {
|
||||||
|
super(services);
|
||||||
|
}
|
||||||
|
|
||||||
const HANDLER_NAME = 'symbols';
|
/**
|
||||||
|
* Sync symbols from QuestionsAndMethods API
|
||||||
const symbolsHandlerConfig: HandlerConfig = {
|
*/
|
||||||
concurrency: 1,
|
@ScheduledOperation('sync-qm-symbols', '0 2 * * *', {
|
||||||
maxAttempts: 3,
|
|
||||||
scheduledJobs: [
|
|
||||||
{
|
|
||||||
operation: 'sync-qm-symbols',
|
|
||||||
cronPattern: '0 2 * * *', // Daily at 2 AM
|
|
||||||
payload: {},
|
|
||||||
priority: 5,
|
priority: 5,
|
||||||
immediately: false,
|
description: 'Daily sync of QM symbols at 2 AM',
|
||||||
} as ScheduledJobConfig,
|
})
|
||||||
{
|
async syncQMSymbols(): Promise<{ processed: number; created: number; updated: number }> {
|
||||||
operation: 'sync-symbols-qm',
|
this.log('info', 'Starting QM symbols sync...');
|
||||||
cronPattern: '0 4 * * *', // Daily at 4 AM
|
return syncQMSymbols({}, this.services);
|
||||||
payload: { provider: 'qm', clearFirst: false },
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sync symbols from specific provider
|
||||||
|
*/
|
||||||
|
@Operation('sync-symbols-qm')
|
||||||
|
@ScheduledOperation('sync-symbols-qm', '0 4 * * *', {
|
||||||
priority: 5,
|
priority: 5,
|
||||||
immediately: false,
|
description: 'Daily sync of symbols from QM provider at 4 AM',
|
||||||
} as ScheduledJobConfig,
|
})
|
||||||
],
|
async syncSymbolsQM(): Promise<unknown> {
|
||||||
operations: {
|
return this.syncSymbolsFromProvider({ provider: 'qm', clearFirst: false });
|
||||||
'sync-qm-symbols': symbolOperations.syncQMSymbols,
|
}
|
||||||
'sync-symbols-qm': symbolOperations.syncSymbolsFromProvider,
|
|
||||||
'sync-symbols-eod': symbolOperations.syncSymbolsFromProvider,
|
|
||||||
'sync-symbols-ib': symbolOperations.syncSymbolsFromProvider,
|
|
||||||
'sync-status': symbolOperations.getSyncStatus,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
export function initializeSymbolsHandler(container: ServiceContainer): void {
|
@Operation('sync-symbols-eod')
|
||||||
logger.info('Registering symbols handler...');
|
async syncSymbolsEOD(payload: { provider: string; clearFirst?: boolean }): Promise<unknown> {
|
||||||
|
return this.syncSymbolsFromProvider({ ...payload, provider: 'eod' });
|
||||||
|
}
|
||||||
|
|
||||||
// Update operations to use container
|
@Operation('sync-symbols-ib')
|
||||||
const containerAwareOperations = Object.entries(symbolOperations).reduce((acc, [key, operation]) => {
|
async syncSymbolsIB(payload: { provider: string; clearFirst?: boolean }): Promise<unknown> {
|
||||||
acc[key] = createJobHandler(async (payload: any) => {
|
return this.syncSymbolsFromProvider({ ...payload, provider: 'ib' });
|
||||||
return operation(payload, container);
|
}
|
||||||
});
|
|
||||||
return acc;
|
|
||||||
}, {} as Record<string, any>);
|
|
||||||
|
|
||||||
const symbolsHandlerConfigWithContainer: HandlerConfig = {
|
/**
|
||||||
...symbolsHandlerConfig,
|
* Get sync status for symbols
|
||||||
operations: containerAwareOperations,
|
*/
|
||||||
};
|
@Operation('sync-status')
|
||||||
|
async getSyncStatus(): Promise<unknown> {
|
||||||
|
this.log('info', 'Getting symbol sync status...');
|
||||||
|
return getSyncStatus({}, this.services);
|
||||||
|
}
|
||||||
|
|
||||||
handlerRegistry.register(HANDLER_NAME, symbolsHandlerConfigWithContainer);
|
/**
|
||||||
logger.info('Symbols handler registered successfully');
|
* Internal method to sync symbols from a provider
|
||||||
|
*/
|
||||||
|
private async syncSymbolsFromProvider(payload: { provider: string; clearFirst?: boolean }): Promise<unknown> {
|
||||||
|
this.log('info', 'Syncing symbols from provider', { provider: payload.provider });
|
||||||
|
return syncSymbolsFromProvider(payload, this.services);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1,10 +1,7 @@
|
||||||
// Re-export handler types from shared types package
|
// Re-export handler types from shared types package
|
||||||
export type {
|
export type {
|
||||||
JobHandler,
|
|
||||||
TypedJobHandler,
|
|
||||||
HandlerConfig,
|
HandlerConfig,
|
||||||
HandlerConfigWithSchedule,
|
HandlerConfigWithSchedule, JobHandler, ScheduledJob, TypedJobHandler
|
||||||
ScheduledJob,
|
|
||||||
} from '@stock-bot/types';
|
} from '@stock-bot/types';
|
||||||
|
|
||||||
// Types for queue operations
|
// Types for queue operations
|
||||||
|
|
@ -110,7 +107,6 @@ export interface QueueConfig extends QueueManagerConfig {
|
||||||
enableMetrics?: boolean;
|
enableMetrics?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-export createJobHandler from shared types package
|
|
||||||
export { createJobHandler } from '@stock-bot/types';
|
export { createJobHandler } from '@stock-bot/types';
|
||||||
|
|
||||||
export interface BatchJobData {
|
export interface BatchJobData {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue