From 71f9b0a88624b9443f0e98585801b7fe8b597783 Mon Sep 17 00:00:00 2001 From: Boki Date: Thu, 19 Jun 2025 23:45:38 -0400 Subject: [PATCH] fixing up db's and data-service --- apps/data-service/src/index.ts | 60 +++++++++++++++----- apps/data-service/src/providers/qm.tasks.ts | 6 +- libs/config/config/default.json | 2 +- libs/mongodb-client/src/index.ts | 7 +++ libs/mongodb-client/src/singleton.ts | 62 +++++++++++++++++++++ libs/postgres-client/src/index.ts | 8 ++- libs/postgres-client/src/singleton.ts | 50 +++++++++++++++++ libs/queue/src/queue.ts | 28 ++++++++++ libs/queue/src/types.ts | 7 +++ 9 files changed, 211 insertions(+), 19 deletions(-) create mode 100644 libs/mongodb-client/src/singleton.ts create mode 100644 libs/postgres-client/src/singleton.ts diff --git a/apps/data-service/src/index.ts b/apps/data-service/src/index.ts index 4f993a0..19a89fd 100644 --- a/apps/data-service/src/index.ts +++ b/apps/data-service/src/index.ts @@ -8,8 +8,8 @@ import { cors } from 'hono/cors'; // Library imports import { initializeServiceConfig } from '@stock-bot/config'; import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger'; -import { createAndConnectMongoDBClient, MongoDBClient } from '@stock-bot/mongodb-client'; -import { createAndConnectPostgreSQLClient, PostgreSQLClient } from '@stock-bot/postgres-client'; +import { connectMongoDB } from '@stock-bot/mongodb-client'; +import { connectPostgreSQL } from '@stock-bot/postgres-client'; import { QueueManager, handlerRegistry, type QueueManagerConfig } from '@stock-bot/queue'; import { Shutdown } from '@stock-bot/shutdown'; // Local imports @@ -48,8 +48,7 @@ app.use( const logger = getLogger('data-service'); const PORT = serviceConfig.port; let server: ReturnType | null = null; -let postgresClient: PostgreSQLClient | null = null; -let mongoClient: MongoDBClient | null = null; +// Singleton clients are managed in libraries let queueManager: QueueManager | null = null; // Initialize shutdown manager @@ -65,10 +64,10 @@ async function initializeServices() { logger.info('Initializing data service...'); try { - // Initialize MongoDB client + // Initialize MongoDB client singleton logger.info('Connecting to MongoDB...'); const mongoConfig = databaseConfig.mongodb; - mongoClient = await createAndConnectMongoDBClient({ + await connectMongoDB({ uri: mongoConfig.uri, database: mongoConfig.database, host: mongoConfig.host || 'localhost', @@ -81,10 +80,10 @@ async function initializeServices() { }); logger.info('MongoDB connected'); - // Initialize PostgreSQL client + // Initialize PostgreSQL client singleton logger.info('Connecting to PostgreSQL...'); const pgConfig = databaseConfig.postgres; - postgresClient = await createAndConnectPostgreSQLClient({ + await connectPostgreSQL({ host: pgConfig.host, port: pgConfig.port, database: pgConfig.database, @@ -130,6 +129,40 @@ async function initializeServices() { initializeWebShareProvider(); logger.info('Data providers initialized'); + // Create scheduled jobs from registered handlers + logger.info('Creating scheduled jobs from registered handlers...'); + const { handlerRegistry } = await import('@stock-bot/queue'); + const allHandlers = handlerRegistry.getAllHandlers(); + + let totalScheduledJobs = 0; + for (const [handlerName, config] of allHandlers) { + if (config.scheduledJobs && config.scheduledJobs.length > 0) { + const queue = queueManager.getQueue(handlerName); + + for (const scheduledJob of config.scheduledJobs) { + // Include handler and operation info in job data + const jobData = { + handler: handlerName, + operation: scheduledJob.operation, + ...(scheduledJob.payload || {}), + }; + + await queue.addScheduledJob( + scheduledJob.operation, + jobData, + scheduledJob.cronPattern + ); + totalScheduledJobs++; + logger.info('Scheduled job created', { + handler: handlerName, + operation: scheduledJob.operation, + cronPattern: scheduledJob.cronPattern + }); + } + } + } + logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs }); + logger.info('All services initialized successfully'); } catch (error) { logger.error('Failed to initialize services', { error }); @@ -178,12 +211,11 @@ shutdown.onShutdown(async () => { shutdown.onShutdown(async () => { logger.info('Disconnecting from databases...'); try { - if (mongoClient) { - await mongoClient.disconnect(); - } - if (postgresClient) { - await postgresClient.disconnect(); - } + const { disconnectMongoDB } = await import('@stock-bot/mongodb-client'); + const { disconnectPostgreSQL } = await import('@stock-bot/postgres-client'); + + await disconnectMongoDB(); + await disconnectPostgreSQL(); logger.info('Database connections closed'); } catch (error) { logger.error('Error closing database connections', { error }); diff --git a/apps/data-service/src/providers/qm.tasks.ts b/apps/data-service/src/providers/qm.tasks.ts index 27185ea..911c096 100644 --- a/apps/data-service/src/providers/qm.tasks.ts +++ b/apps/data-service/src/providers/qm.tasks.ts @@ -321,7 +321,7 @@ async function searchQMSymbolsAPI(query: string): Promise { } const symbols = await response.json(); - const client = getMongoDBClient(); + const mongoClient = getMongoDBClient(); const updatedSymbols = symbols.map((symbol: any) => { return { ...symbol, @@ -329,7 +329,7 @@ async function searchQMSymbolsAPI(query: string): Promise { symbol: symbol.symbol.split(':')[0], // Extract symbol from "symbol:exchange" }; }); - await client.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']); + await mongoClient.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']); const exchanges: Exchange[] = []; for (const symbol of symbols) { if (!exchanges.some(ex => ex.exchange === symbol.exchange)) { @@ -342,7 +342,7 @@ async function searchQMSymbolsAPI(query: string): Promise { }); } } - await client.batchUpsert('qmExchanges', exchanges, ['exchange']); + await mongoClient.batchUpsert('qmExchanges', exchanges, ['exchange']); session.successfulCalls++; session.lastUsed = new Date(); diff --git a/libs/config/config/default.json b/libs/config/config/default.json index d5e1049..3a3c184 100644 --- a/libs/config/config/default.json +++ b/libs/config/config/default.json @@ -67,7 +67,7 @@ "redis": { "host": "localhost", "port": 6379, - "db": 1 + "db": 0 }, "defaultJobOptions": { "attempts": 3, diff --git a/libs/mongodb-client/src/index.ts b/libs/mongodb-client/src/index.ts index 2ba0765..c89c9be 100644 --- a/libs/mongodb-client/src/index.ts +++ b/libs/mongodb-client/src/index.ts @@ -27,3 +27,10 @@ export { createMongoDBClient, createAndConnectMongoDBClient, } from './factory'; + +// Singleton instance +export { + getMongoDBClient, + connectMongoDB, + getDatabase, +} from './singleton'; diff --git a/libs/mongodb-client/src/singleton.ts b/libs/mongodb-client/src/singleton.ts new file mode 100644 index 0000000..5806b80 --- /dev/null +++ b/libs/mongodb-client/src/singleton.ts @@ -0,0 +1,62 @@ +import { MongoDBClient } from './client'; +import type { MongoDBClientConfig } from './types'; +import type { Db } from 'mongodb'; + +/** + * Singleton MongoDB client instance + * Provides global access to a single MongoDB connection + */ +let instance: MongoDBClient | null = null; + +/** + * Initialize the singleton MongoDB client + */ +export async function connectMongoDB(config?: MongoDBClientConfig): Promise { + if (!instance) { + if (!config) { + throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.'); + } + instance = new MongoDBClient(config); + await instance.connect(); + } + return instance; +} + +/** + * Get the singleton MongoDB client instance + * @throws Error if not initialized + */ +export function getMongoDBClient(): MongoDBClient { + if (!instance) { + throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.'); + } + return instance; +} + +/** + * Get the MongoDB database instance + * @throws Error if not initialized + */ +export function getDatabase(): Db { + if (!instance) { + throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.'); + } + return instance.getDatabase(); +} + +/** + * Check if the MongoDB client is initialized + */ +export function isInitialized(): boolean { + return instance !== null && instance.connected; +} + +/** + * Disconnect and reset the singleton instance + */ +export async function disconnectMongoDB(): Promise { + if (instance) { + await instance.disconnect(); + instance = null; + } +} \ No newline at end of file diff --git a/libs/postgres-client/src/index.ts b/libs/postgres-client/src/index.ts index f90091b..26e4955 100644 --- a/libs/postgres-client/src/index.ts +++ b/libs/postgres-client/src/index.ts @@ -30,8 +30,14 @@ export type { AuditLog, } from './types'; -// Utils +// Factory functions export { createPostgreSQLClient, createAndConnectPostgreSQLClient, } from './factory'; + +// Singleton instance +export { + getPostgreSQLClient, + connectPostgreSQL, +} from './singleton'; diff --git a/libs/postgres-client/src/singleton.ts b/libs/postgres-client/src/singleton.ts new file mode 100644 index 0000000..3a1ded8 --- /dev/null +++ b/libs/postgres-client/src/singleton.ts @@ -0,0 +1,50 @@ +import { PostgreSQLClient } from './client'; +import type { PostgreSQLClientConfig } from './types'; + +/** + * Singleton PostgreSQL client instance + * Provides global access to a single PostgreSQL connection pool + */ +let instance: PostgreSQLClient | null = null; + +/** + * Initialize the singleton PostgreSQL client + */ +export async function connectPostgreSQL(config?: PostgreSQLClientConfig): Promise { + if (!instance) { + if (!config) { + throw new Error('PostgreSQL client not initialized. Call connectPostgreSQL(config) first.'); + } + instance = new PostgreSQLClient(config); + await instance.connect(); + } + return instance; +} + +/** + * Get the singleton PostgreSQL client instance + * @throws Error if not initialized + */ +export function getPostgreSQLClient(): PostgreSQLClient { + if (!instance) { + throw new Error('PostgreSQL client not initialized. Call connectPostgreSQL(config) first.'); + } + return instance; +} + +/** + * Check if the PostgreSQL client is initialized + */ +export function isInitialized(): boolean { + return instance !== null && instance.connected; +} + +/** + * Disconnect and reset the singleton instance + */ +export async function disconnectPostgreSQL(): Promise { + if (instance) { + await instance.disconnect(); + instance = null; + } +} \ No newline at end of file diff --git a/libs/queue/src/queue.ts b/libs/queue/src/queue.ts index 0caccba..fa59123 100644 --- a/libs/queue/src/queue.ts +++ b/libs/queue/src/queue.ts @@ -94,6 +94,34 @@ export class Queue { return await this.bullQueue.addBulk(jobs); } + /** + * Add a scheduled job with cron-like pattern + */ + async addScheduledJob( + name: string, + data: JobData, + cronPattern: string, + options: JobOptions = {} + ): Promise { + const scheduledOptions: JobOptions = { + ...options, + repeat: { + pattern: cronPattern, + // Use job name as repeat key to prevent duplicates + key: `${this.queueName}:${name}`, + }, + }; + + logger.info('Adding scheduled job', { + queueName: this.queueName, + jobName: name, + cronPattern, + repeatKey: scheduledOptions.repeat?.key + }); + + return await this.bullQueue.add(name, data, scheduledOptions); + } + /** * Get queue statistics */ diff --git a/libs/queue/src/types.ts b/libs/queue/src/types.ts index f727431..a8ed887 100644 --- a/libs/queue/src/types.ts +++ b/libs/queue/src/types.ts @@ -46,6 +46,13 @@ export interface JobOptions { type: 'exponential' | 'fixed'; delay: number; }; + repeat?: { + pattern?: string; + key?: string; + limit?: number; + every?: number; + immediately?: boolean; + }; } export interface QueueOptions {