335 lines
9.9 KiB
TypeScript
335 lines
9.9 KiB
TypeScript
/**
|
|
* Pipeline Service
|
|
* Manages data pipeline operations by queuing jobs for the data-pipeline service
|
|
*/
|
|
|
|
import type { IServiceContainer } from '@stock-bot/handlers';
|
|
import { getLogger } from '@stock-bot/logger';
|
|
|
|
const logger = getLogger('pipeline-service');
|
|
|
|
export interface PipelineJobResult {
|
|
success: boolean;
|
|
jobId?: string;
|
|
message?: string;
|
|
error?: string;
|
|
data?: any;
|
|
}
|
|
|
|
export interface PipelineStatsResult {
|
|
success: boolean;
|
|
data?: any;
|
|
error?: string;
|
|
}
|
|
|
|
export class PipelineService {
|
|
constructor(private container: IServiceContainer) {}
|
|
|
|
/**
|
|
* Queue a job to sync symbols from QuestionsAndMethods
|
|
*/
|
|
async syncQMSymbols(): Promise<PipelineJobResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const symbolsQueue = queueManager.getQueue('symbols');
|
|
const job = await symbolsQueue.addJob('sync-qm-symbols', {
|
|
handler: 'symbols',
|
|
operation: 'sync-qm-symbols',
|
|
payload: {},
|
|
});
|
|
|
|
logger.info('QM symbols sync job queued', { jobId: job.id });
|
|
return { success: true, jobId: job.id, message: 'QM symbols sync job queued' };
|
|
} catch (error) {
|
|
logger.error('Failed to queue QM symbols sync job', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Queue a job to sync exchanges from QuestionsAndMethods
|
|
*/
|
|
async syncQMExchanges(): Promise<PipelineJobResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const exchangesQueue = queueManager.getQueue('exchanges');
|
|
const job = await exchangesQueue.addJob('sync-qm-exchanges', {
|
|
handler: 'exchanges',
|
|
operation: 'sync-qm-exchanges',
|
|
payload: {},
|
|
});
|
|
|
|
logger.info('QM exchanges sync job queued', { jobId: job.id });
|
|
return { success: true, jobId: job.id, message: 'QM exchanges sync job queued' };
|
|
} catch (error) {
|
|
logger.error('Failed to queue QM exchanges sync job', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Queue a job to sync symbols from a specific provider
|
|
*/
|
|
async syncProviderSymbols(provider: string): Promise<PipelineJobResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const symbolsQueue = queueManager.getQueue('symbols');
|
|
const job = await symbolsQueue.addJob('sync-symbols-from-provider', {
|
|
handler: 'symbols',
|
|
operation: 'sync-symbols-from-provider',
|
|
payload: { provider },
|
|
});
|
|
|
|
logger.info(`${provider} symbols sync job queued`, { jobId: job.id, provider });
|
|
return {
|
|
success: true,
|
|
jobId: job.id,
|
|
message: `${provider} symbols sync job queued`,
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to queue provider symbols sync job', { error, provider });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Queue a job to sync all exchanges
|
|
*/
|
|
async syncAllExchanges(clearFirst: boolean = false): Promise<PipelineJobResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const exchangesQueue = queueManager.getQueue('exchanges');
|
|
const job = await exchangesQueue.addJob('sync-all-exchanges', {
|
|
handler: 'exchanges',
|
|
operation: 'sync-all-exchanges',
|
|
payload: { clearFirst },
|
|
});
|
|
|
|
logger.info('Enhanced exchanges sync job queued', { jobId: job.id, clearFirst });
|
|
return {
|
|
success: true,
|
|
jobId: job.id,
|
|
message: 'Enhanced exchanges sync job queued',
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to queue enhanced exchanges sync job', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Queue a job to sync QM provider mappings
|
|
*/
|
|
async syncQMProviderMappings(): Promise<PipelineJobResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const exchangesQueue = queueManager.getQueue('exchanges');
|
|
const job = await exchangesQueue.addJob('sync-qm-provider-mappings', {
|
|
handler: 'exchanges',
|
|
operation: 'sync-qm-provider-mappings',
|
|
payload: {},
|
|
});
|
|
|
|
logger.info('QM provider mappings sync job queued', { jobId: job.id });
|
|
return {
|
|
success: true,
|
|
jobId: job.id,
|
|
message: 'QM provider mappings sync job queued',
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to queue QM provider mappings sync job', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Queue a job to sync IB exchanges
|
|
*/
|
|
async syncIBExchanges(): Promise<PipelineJobResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const exchangesQueue = queueManager.getQueue('exchanges');
|
|
const job = await exchangesQueue.addJob('sync-ib-exchanges', {
|
|
handler: 'exchanges',
|
|
operation: 'sync-ib-exchanges',
|
|
payload: {},
|
|
});
|
|
|
|
logger.info('IB exchanges sync job queued', { jobId: job.id });
|
|
return {
|
|
success: true,
|
|
jobId: job.id,
|
|
message: 'IB exchanges sync job queued',
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to queue IB exchanges sync job', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to queue sync job',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get sync status
|
|
*/
|
|
async getSyncStatus(): Promise<PipelineJobResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const symbolsQueue = queueManager.getQueue('symbols');
|
|
const job = await symbolsQueue.addJob('sync-status', {
|
|
handler: 'symbols',
|
|
operation: 'sync-status',
|
|
payload: {},
|
|
});
|
|
|
|
logger.info('Sync status job queued', { jobId: job.id });
|
|
return {
|
|
success: true,
|
|
jobId: job.id,
|
|
message: 'Sync status job queued',
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to queue sync status job', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to queue status job',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Clear PostgreSQL data
|
|
*/
|
|
async clearPostgreSQLData(
|
|
dataType: 'exchanges' | 'provider_mappings' | 'all' = 'all'
|
|
): Promise<PipelineJobResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const exchangesQueue = queueManager.getQueue('exchanges');
|
|
const job = await exchangesQueue.addJob('clear-postgresql-data', {
|
|
handler: 'exchanges',
|
|
operation: 'clear-postgresql-data',
|
|
payload: { dataType },
|
|
});
|
|
|
|
logger.info('PostgreSQL data clear job queued', { jobId: job.id, dataType });
|
|
return {
|
|
success: true,
|
|
jobId: job.id,
|
|
message: 'PostgreSQL data clear job queued',
|
|
};
|
|
} catch (error) {
|
|
logger.error('Failed to queue PostgreSQL clear job', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to queue clear job',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get exchange statistics (waits for result)
|
|
*/
|
|
async getExchangeStats(): Promise<PipelineStatsResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const exchangesQueue = queueManager.getQueue('exchanges');
|
|
const job = await exchangesQueue.addJob('get-exchange-stats', {
|
|
handler: 'exchanges',
|
|
operation: 'get-exchange-stats',
|
|
payload: {},
|
|
});
|
|
|
|
// Wait for job to complete and return result
|
|
const result = await job.waitUntilFinished();
|
|
return { success: true, data: result };
|
|
} catch (error) {
|
|
logger.error('Failed to get exchange stats', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to get stats',
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get provider mapping statistics (waits for result)
|
|
*/
|
|
async getProviderMappingStats(): Promise<PipelineStatsResult> {
|
|
try {
|
|
const queueManager = this.container.queue;
|
|
if (!queueManager) {
|
|
return { success: false, error: 'Queue manager not available' };
|
|
}
|
|
|
|
const exchangesQueue = queueManager.getQueue('exchanges');
|
|
const job = await exchangesQueue.addJob('get-provider-mapping-stats', {
|
|
handler: 'exchanges',
|
|
operation: 'get-provider-mapping-stats',
|
|
payload: {},
|
|
});
|
|
|
|
// Wait for job to complete and return result
|
|
const result = await job.waitUntilFinished();
|
|
return { success: true, data: result };
|
|
} catch (error) {
|
|
logger.error('Failed to get provider mapping stats', { error });
|
|
return {
|
|
success: false,
|
|
error: error instanceof Error ? error.message : 'Failed to get stats',
|
|
};
|
|
}
|
|
}
|
|
}
|