fixing up db's and data-service

This commit is contained in:
Boki 2025-06-19 23:45:38 -04:00
parent aca98fdce4
commit 71f9b0a886
9 changed files with 211 additions and 19 deletions

View file

@ -8,8 +8,8 @@ import { cors } from 'hono/cors';
// Library imports
import { initializeServiceConfig } from '@stock-bot/config';
import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger';
import { createAndConnectMongoDBClient, MongoDBClient } from '@stock-bot/mongodb-client';
import { createAndConnectPostgreSQLClient, PostgreSQLClient } from '@stock-bot/postgres-client';
import { connectMongoDB } from '@stock-bot/mongodb-client';
import { connectPostgreSQL } from '@stock-bot/postgres-client';
import { QueueManager, handlerRegistry, type QueueManagerConfig } from '@stock-bot/queue';
import { Shutdown } from '@stock-bot/shutdown';
// Local imports
@ -48,8 +48,7 @@ app.use(
const logger = getLogger('data-service');
const PORT = serviceConfig.port;
let server: ReturnType<typeof Bun.serve> | null = null;
let postgresClient: PostgreSQLClient | null = null;
let mongoClient: MongoDBClient | null = null;
// Singleton clients are managed in libraries
let queueManager: QueueManager | null = null;
// Initialize shutdown manager
@ -65,10 +64,10 @@ async function initializeServices() {
logger.info('Initializing data service...');
try {
// Initialize MongoDB client
// Initialize MongoDB client singleton
logger.info('Connecting to MongoDB...');
const mongoConfig = databaseConfig.mongodb;
mongoClient = await createAndConnectMongoDBClient({
await connectMongoDB({
uri: mongoConfig.uri,
database: mongoConfig.database,
host: mongoConfig.host || 'localhost',
@ -81,10 +80,10 @@ async function initializeServices() {
});
logger.info('MongoDB connected');
// Initialize PostgreSQL client
// Initialize PostgreSQL client singleton
logger.info('Connecting to PostgreSQL...');
const pgConfig = databaseConfig.postgres;
postgresClient = await createAndConnectPostgreSQLClient({
await connectPostgreSQL({
host: pgConfig.host,
port: pgConfig.port,
database: pgConfig.database,
@ -130,6 +129,40 @@ async function initializeServices() {
initializeWebShareProvider();
logger.info('Data providers initialized');
// Create scheduled jobs from registered handlers
logger.info('Creating scheduled jobs from registered handlers...');
const { handlerRegistry } = await import('@stock-bot/queue');
const allHandlers = handlerRegistry.getAllHandlers();
let totalScheduledJobs = 0;
for (const [handlerName, config] of allHandlers) {
if (config.scheduledJobs && config.scheduledJobs.length > 0) {
const queue = queueManager.getQueue(handlerName);
for (const scheduledJob of config.scheduledJobs) {
// Include handler and operation info in job data
const jobData = {
handler: handlerName,
operation: scheduledJob.operation,
...(scheduledJob.payload || {}),
};
await queue.addScheduledJob(
scheduledJob.operation,
jobData,
scheduledJob.cronPattern
);
totalScheduledJobs++;
logger.info('Scheduled job created', {
handler: handlerName,
operation: scheduledJob.operation,
cronPattern: scheduledJob.cronPattern
});
}
}
}
logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs });
logger.info('All services initialized successfully');
} catch (error) {
logger.error('Failed to initialize services', { error });
@ -178,12 +211,11 @@ shutdown.onShutdown(async () => {
shutdown.onShutdown(async () => {
logger.info('Disconnecting from databases...');
try {
if (mongoClient) {
await mongoClient.disconnect();
}
if (postgresClient) {
await postgresClient.disconnect();
}
const { disconnectMongoDB } = await import('@stock-bot/mongodb-client');
const { disconnectPostgreSQL } = await import('@stock-bot/postgres-client');
await disconnectMongoDB();
await disconnectPostgreSQL();
logger.info('Database connections closed');
} catch (error) {
logger.error('Error closing database connections', { error });

View file

@ -321,7 +321,7 @@ async function searchQMSymbolsAPI(query: string): Promise<string[]> {
}
const symbols = await response.json();
const client = getMongoDBClient();
const mongoClient = getMongoDBClient();
const updatedSymbols = symbols.map((symbol: any) => {
return {
...symbol,
@ -329,7 +329,7 @@ async function searchQMSymbolsAPI(query: string): Promise<string[]> {
symbol: symbol.symbol.split(':')[0], // Extract symbol from "symbol:exchange"
};
});
await client.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']);
await mongoClient.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']);
const exchanges: Exchange[] = [];
for (const symbol of symbols) {
if (!exchanges.some(ex => ex.exchange === symbol.exchange)) {
@ -342,7 +342,7 @@ async function searchQMSymbolsAPI(query: string): Promise<string[]> {
});
}
}
await client.batchUpsert('qmExchanges', exchanges, ['exchange']);
await mongoClient.batchUpsert('qmExchanges', exchanges, ['exchange']);
session.successfulCalls++;
session.lastUsed = new Date();

View file

@ -67,7 +67,7 @@
"redis": {
"host": "localhost",
"port": 6379,
"db": 1
"db": 0
},
"defaultJobOptions": {
"attempts": 3,

View file

@ -27,3 +27,10 @@ export {
createMongoDBClient,
createAndConnectMongoDBClient,
} from './factory';
// Singleton instance
export {
getMongoDBClient,
connectMongoDB,
getDatabase,
} from './singleton';

View file

@ -0,0 +1,62 @@
import { MongoDBClient } from './client';
import type { MongoDBClientConfig } from './types';
import type { Db } from 'mongodb';
/**
* Singleton MongoDB client instance
* Provides global access to a single MongoDB connection
*/
let instance: MongoDBClient | null = null;
/**
* Initialize the singleton MongoDB client
*/
export async function connectMongoDB(config?: MongoDBClientConfig): Promise<MongoDBClient> {
if (!instance) {
if (!config) {
throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.');
}
instance = new MongoDBClient(config);
await instance.connect();
}
return instance;
}
/**
* Get the singleton MongoDB client instance
* @throws Error if not initialized
*/
export function getMongoDBClient(): MongoDBClient {
if (!instance) {
throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.');
}
return instance;
}
/**
* Get the MongoDB database instance
* @throws Error if not initialized
*/
export function getDatabase(): Db {
if (!instance) {
throw new Error('MongoDB client not initialized. Call connectMongoDB(config) first.');
}
return instance.getDatabase();
}
/**
* Check if the MongoDB client is initialized
*/
export function isInitialized(): boolean {
return instance !== null && instance.connected;
}
/**
* Disconnect and reset the singleton instance
*/
export async function disconnectMongoDB(): Promise<void> {
if (instance) {
await instance.disconnect();
instance = null;
}
}

View file

@ -30,8 +30,14 @@ export type {
AuditLog,
} from './types';
// Utils
// Factory functions
export {
createPostgreSQLClient,
createAndConnectPostgreSQLClient,
} from './factory';
// Singleton instance
export {
getPostgreSQLClient,
connectPostgreSQL,
} from './singleton';

View file

@ -0,0 +1,50 @@
import { PostgreSQLClient } from './client';
import type { PostgreSQLClientConfig } from './types';
/**
* Singleton PostgreSQL client instance
* Provides global access to a single PostgreSQL connection pool
*/
let instance: PostgreSQLClient | null = null;
/**
* Initialize the singleton PostgreSQL client
*/
export async function connectPostgreSQL(config?: PostgreSQLClientConfig): Promise<PostgreSQLClient> {
if (!instance) {
if (!config) {
throw new Error('PostgreSQL client not initialized. Call connectPostgreSQL(config) first.');
}
instance = new PostgreSQLClient(config);
await instance.connect();
}
return instance;
}
/**
* Get the singleton PostgreSQL client instance
* @throws Error if not initialized
*/
export function getPostgreSQLClient(): PostgreSQLClient {
if (!instance) {
throw new Error('PostgreSQL client not initialized. Call connectPostgreSQL(config) first.');
}
return instance;
}
/**
* Check if the PostgreSQL client is initialized
*/
export function isInitialized(): boolean {
return instance !== null && instance.connected;
}
/**
* Disconnect and reset the singleton instance
*/
export async function disconnectPostgreSQL(): Promise<void> {
if (instance) {
await instance.disconnect();
instance = null;
}
}

View file

@ -94,6 +94,34 @@ export class Queue {
return await this.bullQueue.addBulk(jobs);
}
/**
* Add a scheduled job with cron-like pattern
*/
async addScheduledJob(
name: string,
data: JobData,
cronPattern: string,
options: JobOptions = {}
): Promise<Job> {
const scheduledOptions: JobOptions = {
...options,
repeat: {
pattern: cronPattern,
// Use job name as repeat key to prevent duplicates
key: `${this.queueName}:${name}`,
},
};
logger.info('Adding scheduled job', {
queueName: this.queueName,
jobName: name,
cronPattern,
repeatKey: scheduledOptions.repeat?.key
});
return await this.bullQueue.add(name, data, scheduledOptions);
}
/**
* Get queue statistics
*/

View file

@ -46,6 +46,13 @@ export interface JobOptions {
type: 'exponential' | 'fixed';
delay: number;
};
repeat?: {
pattern?: string;
key?: string;
limit?: number;
every?: number;
immediately?: boolean;
};
}
export interface QueueOptions {