diff --git a/apps/data-service/config/default.json b/apps/data-service/config/default.json index f86570b..df7cdac 100644 --- a/apps/data-service/config/default.json +++ b/apps/data-service/config/default.json @@ -11,5 +11,21 @@ "origin": "*", "credentials": false } + }, + "queue": { + "redis": { + "host": "localhost", + "port": 6379, + "db": 1 + }, + "defaultJobOptions": { + "attempts": 3, + "backoff": { + "type": "exponential", + "delay": 1000 + }, + "removeOnComplete": true, + "removeOnFail": false + } } } \ No newline at end of file diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index d9833b4..1a749d7 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -10,7 +10,7 @@ import { initializeServiceConfig } from '@stock-bot/config'; import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; import { createAndConnectMongoDBClient, MongoDBClient } from '@stock-bot/mongodb-client'; import { createAndConnectPostgreSQLClient, PostgreSQLClient } from '@stock-bot/postgres-client'; -import { getQueue, initializeQueueSystem, shutdownAllQueues } from '@stock-bot/queue'; +import { QueueManager, handlerRegistry, type QueueManagerConfig } from '@stock-bot/queue'; import { Shutdown } from '@stock-bot/shutdown'; // Local imports import { exchangeRoutes, healthRoutes, queueRoutes } from './routes'; @@ -50,7 +50,7 @@ const PORT = serviceConfig.port; let server: ReturnType | null = null; let postgresClient: PostgreSQLClient | null = null; let mongoClient: MongoDBClient | null = null; -// Queue system will be initialized globally +let queueManager: QueueManager | null = null; // Initialize shutdown manager const shutdown = Shutdown.getInstance({ timeout: 15000 }); @@ -100,14 +100,37 @@ async function initializeServices() { // Initialize queue system logger.info('Initializing queue system...'); - await initializeQueueSystem({ + const queueManagerConfig: QueueManagerConfig = { redis: queueConfig.redis, - defaultJobOptions: queueConfig.defaultJobOptions, - workers: 5, - concurrency: 20, - }); + defaultQueueOptions: { + defaultJobOptions: queueConfig.defaultJobOptions, + workers: 5, + concurrency: 20, + enableMetrics: true, + enableDLQ: true, + }, + enableScheduledJobs: true, + }; + + queueManager = QueueManager.getInstance(queueManagerConfig); + await queueManager.initialize(); logger.info('Queue system initialized'); + // Initialize providers (register handlers and scheduled jobs) + logger.info('Initializing data providers...'); + const { initializeExchangeSyncProvider } = await import('./providers/exchange-sync.provider'); + const { initializeIBProvider } = await import('./providers/ib.provider'); + const { initializeProxyProvider } = await import('./providers/proxy.provider'); + const { initializeQMProvider } = await import('./providers/qm.provider'); + const { initializeWebShareProvider } = await import('./providers/webshare.provider'); + + initializeExchangeSyncProvider(); + initializeIBProvider(); + initializeProxyProvider(); + initializeQMProvider(); + initializeWebShareProvider(); + logger.info('Data providers initialized'); + logger.info('All services initialized successfully'); } catch (error) { logger.error('Failed to initialize services', { error }); @@ -144,7 +167,9 @@ shutdown.onShutdown(async () => { shutdown.onShutdown(async () => { logger.info('Shutting down queue system...'); try { - await shutdownAllQueues(); + if (queueManager) { + await queueManager.shutdown(); + } logger.info('Queue system shut down'); } catch (error) { logger.error('Error shutting down queue system', { error }); @@ -183,5 +208,5 @@ startServer().catch(error => { logger.info('Data service startup initiated'); -// Export queue functions for providers -export { getQueue }; +// Export queue manager for providers +export { queueManager }; diff --git a/apps/data-service/src/providers/exchange-sync.provider.ts b/apps/data-service/src/providers/exchange-sync.provider.ts index c522085..40eeebc 100644 --- a/apps/data-service/src/providers/exchange-sync.provider.ts +++ b/apps/data-service/src/providers/exchange-sync.provider.ts @@ -3,15 +3,15 @@ */ import { getLogger } from '@stock-bot/logger'; import type { MasterExchange } from '@stock-bot/mongodb-client'; -import type { ProviderConfigWithSchedule } from '@stock-bot/queue'; -import { providerRegistry } from '@stock-bot/queue'; +import type { HandlerConfigWithSchedule } from '@stock-bot/queue'; +import { handlerRegistry } from '@stock-bot/queue'; const logger = getLogger('exchange-sync'); export function initializeExchangeSyncProvider() { logger.info('Registering exchange sync provider...'); - const exchangeSyncConfig: ProviderConfigWithSchedule = { + const exchangeSyncConfig: HandlerConfigWithSchedule = { name: 'exchange-sync', operations: { @@ -40,7 +40,7 @@ export function initializeExchangeSyncProvider() { ], }; - providerRegistry.registerWithSchedule(exchangeSyncConfig); + handlerRegistry.registerWithSchedule(exchangeSyncConfig); logger.info('Exchange sync provider registered successfully'); } diff --git a/apps/data-service/src/providers/ib.provider.ts b/apps/data-service/src/providers/ib.provider.ts index 3225935..699488d 100644 --- a/apps/data-service/src/providers/ib.provider.ts +++ b/apps/data-service/src/providers/ib.provider.ts @@ -2,8 +2,8 @@ * Interactive Brokers Provider for new queue system */ import { getLogger } from '@stock-bot/logger'; -import type { ProviderConfigWithSchedule } from '@stock-bot/queue'; -import { providerRegistry } from '@stock-bot/queue'; +import type { HandlerConfigWithSchedule } from '@stock-bot/queue'; +import { handlerRegistry } from '@stock-bot/queue'; const logger = getLogger('ib-provider'); @@ -11,7 +11,7 @@ const logger = getLogger('ib-provider'); export function initializeIBProvider() { logger.info('Registering IB provider with scheduled jobs...'); - const ibProviderConfig: ProviderConfigWithSchedule = { + const ibProviderConfig: HandlerConfigWithSchedule = { name: 'ib', operations: { 'fetch-session': async _payload => { @@ -77,6 +77,6 @@ export function initializeIBProvider() { ], }; - providerRegistry.registerWithSchedule(ibProviderConfig); + handlerRegistry.registerWithSchedule(ibProviderConfig); logger.info('IB provider registered successfully with scheduled jobs'); } diff --git a/apps/data-service/src/providers/proxy.provider.ts b/apps/data-service/src/providers/proxy.provider.ts index 40ddf64..1a9c0d3 100644 --- a/apps/data-service/src/providers/proxy.provider.ts +++ b/apps/data-service/src/providers/proxy.provider.ts @@ -3,8 +3,8 @@ */ import { ProxyInfo } from '@stock-bot/http'; import { getLogger } from '@stock-bot/logger'; -import type { ProviderConfigWithSchedule } from '@stock-bot/queue'; -import { providerRegistry } from '@stock-bot/queue'; +import type { HandlerConfigWithSchedule } from '@stock-bot/queue'; +import { handlerRegistry } from '@stock-bot/queue'; const logger = getLogger('proxy-provider'); @@ -12,7 +12,7 @@ const logger = getLogger('proxy-provider'); export function initializeProxyProvider() { logger.info('Registering proxy provider with scheduled jobs...'); - const proxyProviderConfig: ProviderConfigWithSchedule = { + const proxyProviderConfig: HandlerConfigWithSchedule = { name: 'proxy', operations: { @@ -20,7 +20,7 @@ export function initializeProxyProvider() { // Fetch proxies from all configured sources logger.info('Processing fetch proxies from sources request'); const { fetchProxiesFromSources } = await import('./proxy.tasks'); - const { processItems, queueManager } = await import('../index'); + const { processItems } = await import('@stock-bot/queue'); // Fetch all proxies from sources const proxies = await fetchProxiesFromSources(); @@ -32,8 +32,8 @@ export function initializeProxyProvider() { } // Batch process the proxies through check-proxy operation - const batchResult = await processItems(proxies, queueManager, { - provider: 'proxy', + const batchResult = await processItems(proxies, 'proxy', { + handler: 'proxy', operation: 'check-proxy', totalDelayHours: 0.083, // 5 minutes (5/60 hours) batchSize: 50, // Process 50 proxies per batch @@ -83,6 +83,6 @@ export function initializeProxyProvider() { ], }; - providerRegistry.registerWithSchedule(proxyProviderConfig); + handlerRegistry.registerWithSchedule(proxyProviderConfig); logger.info('Proxy provider registered successfully with scheduled jobs'); } diff --git a/apps/data-service/src/providers/proxy.tasks.ts b/apps/data-service/src/providers/proxy.tasks.ts index 7ed6f15..e78063f 100644 --- a/apps/data-service/src/providers/proxy.tasks.ts +++ b/apps/data-service/src/providers/proxy.tasks.ts @@ -310,9 +310,12 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean): Promise // Individual task functions export async function queueProxyFetch(): Promise { const { queueManager } = await import('../index'); - const job = await queueManager.add('proxy-fetch', { - type: 'proxy-fetch', - provider: 'proxy-service', + if (!queueManager) { + throw new Error('Queue manager not initialized'); + } + const queue = queueManager.getQueue('proxy'); + const job = await queue.add('proxy-fetch', { + handler: 'proxy', operation: 'fetch-and-check', payload: {}, priority: 5, @@ -325,9 +328,12 @@ export async function queueProxyFetch(): Promise { export async function queueProxyCheck(proxies: ProxyInfo[]): Promise { const { queueManager } = await import('../index'); - const job = await queueManager.add('proxy-check', { - type: 'proxy-check', - provider: 'proxy-service', + if (!queueManager) { + throw new Error('Queue manager not initialized'); + } + const queue = queueManager.getQueue('proxy'); + const job = await queue.add('proxy-check', { + handler: 'proxy', operation: 'check-specific', payload: { proxies }, priority: 3, diff --git a/apps/data-service/src/providers/qm.provider.ts b/apps/data-service/src/providers/qm.provider.ts index 363b170..140921b 100644 --- a/apps/data-service/src/providers/qm.provider.ts +++ b/apps/data-service/src/providers/qm.provider.ts @@ -1,5 +1,5 @@ import { getLogger } from '@stock-bot/logger'; -import { providerRegistry, type ProviderConfigWithSchedule } from '@stock-bot/queue'; +import { handlerRegistry, type HandlerConfigWithSchedule } from '@stock-bot/queue'; import type { SymbolSpiderJob } from './qm.tasks'; const logger = getLogger('qm-provider'); @@ -8,7 +8,7 @@ const logger = getLogger('qm-provider'); export function initializeQMProvider() { logger.info('Registering IB provider with scheduled jobs...'); - const qmProviderConfig: ProviderConfigWithSchedule = { + const qmProviderConfig: HandlerConfigWithSchedule = { name: 'qm', operations: { 'create-sessions': async () => { @@ -77,6 +77,6 @@ export function initializeQMProvider() { ], }; - providerRegistry.registerWithSchedule(qmProviderConfig); + handlerRegistry.registerWithSchedule(qmProviderConfig); logger.info('IB provider registered successfully with scheduled jobs'); } diff --git a/apps/data-service/src/providers/qm.tasks.ts b/apps/data-service/src/providers/qm.tasks.ts index 711fa29..27185ea 100644 --- a/apps/data-service/src/providers/qm.tasks.ts +++ b/apps/data-service/src/providers/qm.tasks.ts @@ -172,6 +172,10 @@ async function createAlphabetJobs( ): Promise<{ success: boolean; symbolsFound: number; jobsCreated: number }> { try { const { queueManager } = await import('../index'); + if (!queueManager) { + throw new Error('Queue manager not initialized'); + } + const queue = queueManager.getQueue('qm'); let jobsCreated = 0; // Create jobs for A-Z @@ -185,10 +189,10 @@ async function createAlphabetJobs( maxDepth, }; - await queueManager.add( - 'qm', + await queue.add( + 'spider-symbol-search', { - provider: 'qm', + handler: 'qm', operation: 'spider-symbol-search', payload: job, }, @@ -239,6 +243,10 @@ async function searchAndSpawnJobs( // If we have 50+ symbols and haven't reached max depth, spawn sub-jobs if (symbolCount >= 50 && depth < maxDepth) { const { queueManager } = await import('../index'); + if (!queueManager) { + throw new Error('Queue manager not initialized'); + } + const queue = queueManager.getQueue('qm'); logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`); @@ -254,10 +262,10 @@ async function searchAndSpawnJobs( maxDepth, }; - await queueManager.add( - 'qm', + await queue.add( + 'spider-symbol-search', { - provider: 'qm', + handler: 'qm', operation: 'spider-symbol-search', payload: job, }, diff --git a/apps/data-service/src/providers/webshare.provider.ts b/apps/data-service/src/providers/webshare.provider.ts index 2b6f6f5..6dcfb8f 100644 --- a/apps/data-service/src/providers/webshare.provider.ts +++ b/apps/data-service/src/providers/webshare.provider.ts @@ -2,8 +2,8 @@ * WebShare Provider for proxy management */ import { getLogger } from '@stock-bot/logger'; -import type { ProviderConfigWithSchedule } from '@stock-bot/queue'; -import { providerRegistry } from '@stock-bot/queue'; +import type { HandlerConfigWithSchedule } from '@stock-bot/queue'; +import { handlerRegistry } from '@stock-bot/queue'; const logger = getLogger('webshare-provider'); @@ -26,7 +26,7 @@ export function getProxy(): string | null { export function initializeWebShareProvider() { logger.info('Registering WebShare provider with scheduled jobs...'); - const webShareProviderConfig: ProviderConfigWithSchedule = { + const webShareProviderConfig: HandlerConfigWithSchedule = { name: 'webshare', operations: { @@ -83,7 +83,7 @@ export function initializeWebShareProvider() { }; // Register the provider - providerRegistry.registerWithSchedule(webShareProviderConfig); + handlerRegistry.registerWithSchedule(webShareProviderConfig); logger.info('WebShare provider registered successfully'); } diff --git a/apps/data-service/src/routes/queue.routes.ts b/apps/data-service/src/routes/queue.routes.ts index cd02cc2..20a8d4d 100644 --- a/apps/data-service/src/routes/queue.routes.ts +++ b/apps/data-service/src/routes/queue.routes.ts @@ -1,5 +1,6 @@ import { Hono } from 'hono'; import { getLogger } from '@stock-bot/logger'; +import { QueueManager } from '@stock-bot/queue'; const logger = getLogger('queue-routes'); const queue = new Hono(); @@ -7,16 +8,13 @@ const queue = new Hono(); // Queue status endpoint queue.get('/status', async c => { try { - // TODO: Implement queue management + const queueManager = QueueManager.getInstance(); + const globalStats = await queueManager.getGlobalStats(); + return c.json({ status: 'success', - data: { - active: 0, - waiting: 0, - completed: 0, - failed: 0 - }, - message: 'Queue management will be implemented' + data: globalStats, + message: 'Queue status retrieved successfully' }); } catch (error) { logger.error('Failed to get queue status', { error }); diff --git a/bun.lock b/bun.lock index 5817b62..4b99ab8 100644 --- a/bun.lock +++ b/bun.lock @@ -309,22 +309,6 @@ "typescript": "^5.0.0", }, }, - "libs/strategy-engine": { - "name": "@stock-bot/strategy-engine", - "version": "1.0.0", - "dependencies": { - "@stock-bot/event-bus": "*", - "@stock-bot/logger": "*", - "@stock-bot/utils": "*", - "commander": "^14.0.0", - "eventemitter3": "^5.0.1", - }, - "devDependencies": { - "@types/node": "^20.11.0", - "bun-types": "^1.2.15", - "typescript": "^5.3.0", - }, - }, "libs/types": { "name": "@stock-bot/types", "version": "1.0.0", @@ -347,19 +331,6 @@ "typescript": "^5.3.0", }, }, - "libs/vector-engine": { - "name": "@stock-bot/vector-engine", - "version": "1.0.0", - "dependencies": { - "@stock-bot/logger": "*", - "@stock-bot/utils": "*", - }, - "devDependencies": { - "@types/node": "^20.11.0", - "bun-types": "^1.2.15", - "typescript": "^5.3.0", - }, - }, }, "trustedDependencies": [ "esbuild", @@ -813,14 +784,10 @@ "@stock-bot/shutdown": ["@stock-bot/shutdown@workspace:libs/shutdown"], - "@stock-bot/strategy-engine": ["@stock-bot/strategy-engine@workspace:libs/strategy-engine"], - "@stock-bot/types": ["@stock-bot/types@workspace:libs/types"], "@stock-bot/utils": ["@stock-bot/utils@workspace:libs/utils"], - "@stock-bot/vector-engine": ["@stock-bot/vector-engine@workspace:libs/vector-engine"], - "@stock-bot/web-api": ["@stock-bot/web-api@workspace:apps/web-api"], "@stock-bot/web-app": ["@stock-bot/web-app@workspace:apps/web-app"], @@ -1109,7 +1076,7 @@ "combined-stream": ["combined-stream@1.0.8", "", { "dependencies": { "delayed-stream": "~1.0.0" } }, "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg=="], - "commander": ["commander@14.0.0", "", {}, "sha512-2uM9rYjPvyq39NwLRqaiLtWHyDC1FvryJDa2ATTVims5YAS4PupsEQsDvP14FqhFr0P49CYDugi59xaxJlTXRA=="], + "commander": ["commander@4.1.1", "", {}, "sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA=="], "commondir": ["commondir@1.0.1", "", {}, "sha512-W9pAhw0ja1Edb5GVdIF1mjZw/ASI0AlShXM83UUGe2DVr5TdAPEA1OA8m/g8zWp9x6On7gqufY+FatDbC3MDQg=="], @@ -2383,14 +2350,10 @@ "@stock-bot/shutdown/@types/node": ["@types/node@20.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-jJD50LtlD2dodAEO653i3YF04NWak6jN3ky+Ri3Em3mGR39/glWiboM/IePaRbgwSfqM1TpGXfAg8ohn/4dTgA=="], - "@stock-bot/strategy-engine/@types/node": ["@types/node@20.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-jJD50LtlD2dodAEO653i3YF04NWak6jN3ky+Ri3Em3mGR39/glWiboM/IePaRbgwSfqM1TpGXfAg8ohn/4dTgA=="], - "@stock-bot/types/@types/node": ["@types/node@20.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-jJD50LtlD2dodAEO653i3YF04NWak6jN3ky+Ri3Em3mGR39/glWiboM/IePaRbgwSfqM1TpGXfAg8ohn/4dTgA=="], "@stock-bot/utils/@types/node": ["@types/node@20.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-jJD50LtlD2dodAEO653i3YF04NWak6jN3ky+Ri3Em3mGR39/glWiboM/IePaRbgwSfqM1TpGXfAg8ohn/4dTgA=="], - "@stock-bot/vector-engine/@types/node": ["@types/node@20.19.1", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-jJD50LtlD2dodAEO653i3YF04NWak6jN3ky+Ri3Em3mGR39/glWiboM/IePaRbgwSfqM1TpGXfAg8ohn/4dTgA=="], - "@stock-bot/web-app/@typescript-eslint/eslint-plugin": ["@typescript-eslint/eslint-plugin@6.21.0", "", { "dependencies": { "@eslint-community/regexpp": "^4.5.1", "@typescript-eslint/scope-manager": "6.21.0", "@typescript-eslint/type-utils": "6.21.0", "@typescript-eslint/utils": "6.21.0", "@typescript-eslint/visitor-keys": "6.21.0", "debug": "^4.3.4", "graphemer": "^1.4.0", "ignore": "^5.2.4", "natural-compare": "^1.4.0", "semver": "^7.5.4", "ts-api-utils": "^1.0.1" }, "peerDependencies": { "@typescript-eslint/parser": "^6.0.0 || ^6.0.0-alpha", "eslint": "^7.0.0 || ^8.0.0" } }, "sha512-oy9+hTPCUFpngkEZUSzbf9MxI65wbKFoQYsgPdILTfbUldp5ovUuphZVe4i30emU9M/kP+T64Di0mxl7dSw3MA=="], "@stock-bot/web-app/@typescript-eslint/parser": ["@typescript-eslint/parser@6.21.0", "", { "dependencies": { "@typescript-eslint/scope-manager": "6.21.0", "@typescript-eslint/types": "6.21.0", "@typescript-eslint/typescript-estree": "6.21.0", "@typescript-eslint/visitor-keys": "6.21.0", "debug": "^4.3.4" }, "peerDependencies": { "eslint": "^7.0.0 || ^8.0.0" } }, "sha512-tbsV1jPne5CkFQCgPBcDOt30ItF7aJoZL997JSF7MhGQqOeT3svWRYxiqlfA5RUdlHN6Fi+EI9bxqbdyAUZjYQ=="], @@ -2509,8 +2472,6 @@ "ssh-remote-port-forward/@types/ssh2": ["@types/ssh2@0.5.52", "", { "dependencies": { "@types/node": "*", "@types/ssh2-streams": "*" } }, "sha512-lbLLlXxdCZOSJMCInKH2+9V/77ET2J6NPQHpFI0kda61Dd1KglJs+fPQBchizmzYSOJBgdTajhPqBO1xxLywvg=="], - "sucrase/commander": ["commander@4.1.1", "", {}, "sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA=="], - "tailwindcss/object-hash": ["object-hash@3.0.0", "", {}, "sha512-RSn9F68PjH9HqtltsSnqYC1XXoWe9Bju5+213R98cNGttag9q9yAOTzdbsqvIa7aNm5WffBZFpWYr2aWrklWAw=="], "type-is/mime-types": ["mime-types@3.0.1", "", { "dependencies": { "mime-db": "^1.54.0" } }, "sha512-xRc4oEhT6eaBpU1XF7AjpOFD+xQmXNB5OVKwp4tqCuBpHLS/ZbBDrc07mYTDqVMg6PfxUjjNp85O6Cd2Z/5HWA=="],