started refactor of data-sync-service

This commit is contained in:
Boki 2025-06-21 13:48:22 -04:00
parent 67833a2fd7
commit 3ae9de8376
27 changed files with 1754 additions and 1465 deletions

View file

@ -1,38 +1,35 @@
/**
* Data Sync Service - Sync raw MongoDB data to PostgreSQL master records
*/
// Framework imports
import { initializeServiceConfig } from '@stock-bot/config';
import { Hono } from 'hono';
import { cors } from 'hono/cors';
// Library imports
import { initializeServiceConfig } from '@stock-bot/config';
import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger';
import { createAndConnectMongoDBClient, MongoDBClient } from '@stock-bot/mongodb-client';
import { createAndConnectPostgreSQLClient, PostgreSQLClient } from '@stock-bot/postgres-client';
import { connectMongoDB } from '@stock-bot/mongodb-client';
import { connectPostgreSQL } from '@stock-bot/postgres-client';
import { QueueManager, type QueueManagerConfig } from '@stock-bot/queue';
import { Shutdown } from '@stock-bot/shutdown';
import { enhancedSyncManager } from './services/enhanced-sync-manager';
import { syncManager } from './services/sync-manager';
// Local imports
import { setMongoDBClient, setPostgreSQLClient } from './clients';
import { enhancedSyncRoutes, healthRoutes, statsRoutes, syncRoutes } from './routes';
import { healthRoutes, enhancedSyncRoutes, statsRoutes, syncRoutes } from './routes';
// Initialize configuration with automatic monorepo config inheritance
const config = await initializeServiceConfig();
const config = initializeServiceConfig();
console.log('Data Sync Service Configuration:', JSON.stringify(config, null, 2));
const serviceConfig = config.service;
const databaseConfig = config.database;
const queueConfig = config.queue;
// Initialize logger with config
const loggingConfig = config.logging;
if (loggingConfig) {
if (config.log) {
setLoggerConfig({
logLevel: loggingConfig.level,
logLevel: config.log.level,
logConsole: true,
logFile: false,
environment: config.environment,
hideObject: config.log.hideObject,
});
}
// Create logger AFTER config is set
const logger = getLogger('data-sync-service');
const app = new Hono();
// Add CORS middleware
@ -40,17 +37,15 @@ app.use(
'*',
cors({
origin: '*',
allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'],
allowHeaders: ['Content-Type', 'Authorization'],
credentials: false,
})
);
const logger = getLogger('data-sync-service');
const PORT = serviceConfig.port;
let server: ReturnType<typeof Bun.serve> | null = null;
let postgresClient: PostgreSQLClient | null = null;
let mongoClient: MongoDBClient | null = null;
// Singleton clients are managed in libraries
let queueManager: QueueManager | null = null;
// Initialize shutdown manager
const shutdown = Shutdown.getInstance({ timeout: 15000 });
@ -66,10 +61,10 @@ async function initializeServices() {
logger.info('Initializing data sync service...');
try {
// Initialize MongoDB client
// Initialize MongoDB client singleton
logger.debug('Connecting to MongoDB...');
const mongoConfig = databaseConfig.mongodb;
mongoClient = await createAndConnectMongoDBClient({
await connectMongoDB({
uri: mongoConfig.uri,
database: mongoConfig.database,
host: mongoConfig.host || 'localhost',
@ -80,13 +75,12 @@ async function initializeServices() {
serverSelectionTimeout: 5000,
},
});
setMongoDBClient(mongoClient);
logger.info('MongoDB connected');
// Initialize PostgreSQL client
// Initialize PostgreSQL client singleton
logger.debug('Connecting to PostgreSQL...');
const pgConfig = databaseConfig.postgres;
postgresClient = await createAndConnectPostgreSQLClient({
await connectPostgreSQL({
host: pgConfig.host,
port: pgConfig.port,
database: pgConfig.database,
@ -98,14 +92,98 @@ async function initializeServices() {
idleTimeoutMillis: pgConfig.idleTimeout || 30000,
},
});
setPostgreSQLClient(postgresClient);
logger.info('PostgreSQL connected');
// Initialize sync managers
logger.debug('Initializing sync managers...');
await syncManager.initialize();
await enhancedSyncManager.initialize();
logger.info('Sync managers initialized');
// Initialize queue system (with delayed worker start)
logger.debug('Initializing queue system...');
const queueManagerConfig: QueueManagerConfig = {
redis: queueConfig?.redis || {
host: 'localhost',
port: 6379,
db: 1,
},
defaultQueueOptions: {
defaultJobOptions: queueConfig?.defaultJobOptions || {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: 10,
removeOnFail: 5,
},
workers: 2,
concurrency: 1,
enableMetrics: true,
enableDLQ: true,
},
enableScheduledJobs: true,
delayWorkerStart: true, // Prevent workers from starting until all singletons are ready
};
queueManager = QueueManager.getOrInitialize(queueManagerConfig);
logger.info('Queue system initialized');
// Initialize handlers (register handlers and scheduled jobs)
logger.debug('Initializing sync handlers...');
const { initializeExchangesHandler } = await import('./handlers/exchanges/exchanges.handler');
const { initializeSymbolsHandler } = await import('./handlers/symbols/symbols.handler');
initializeExchangesHandler();
initializeSymbolsHandler();
logger.info('Sync handlers initialized');
// Create scheduled jobs from registered handlers
logger.debug('Creating scheduled jobs from registered handlers...');
const { handlerRegistry } = await import('@stock-bot/queue');
const allHandlers = handlerRegistry.getAllHandlers();
let totalScheduledJobs = 0;
for (const [handlerName, config] of allHandlers) {
if (config.scheduledJobs && config.scheduledJobs.length > 0) {
const queue = queueManager.getQueue(handlerName);
for (const scheduledJob of config.scheduledJobs) {
// Include handler and operation info in job data
const jobData = {
handler: handlerName,
operation: scheduledJob.operation,
payload: scheduledJob.payload || {},
};
// Build job options from scheduled job config
const jobOptions = {
priority: scheduledJob.priority,
delay: scheduledJob.delay,
repeat: {
immediately: scheduledJob.immediately,
},
};
await queue.addScheduledJob(
scheduledJob.operation,
jobData,
scheduledJob.cronPattern,
jobOptions
);
totalScheduledJobs++;
logger.debug('Scheduled job created', {
handler: handlerName,
operation: scheduledJob.operation,
cronPattern: scheduledJob.cronPattern,
immediately: scheduledJob.immediately,
priority: scheduledJob.priority,
});
}
}
}
logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs });
// Now that all singletons are initialized and jobs are scheduled, start the workers
logger.debug('Starting queue workers...');
queueManager.startAllWorkers();
logger.info('Queue workers started');
logger.info('All services initialized successfully');
} catch (error) {
@ -127,8 +205,22 @@ async function startServer() {
logger.info(`Data Sync Service started on port ${PORT}`);
}
// Register shutdown handlers
shutdown.onShutdown(async () => {
// Register shutdown handlers with priorities
// Priority 1: Queue system (highest priority)
shutdown.onShutdownHigh(async () => {
logger.info('Shutting down queue system...');
try {
if (queueManager) {
await queueManager.shutdown();
}
logger.info('Queue system shut down');
} catch (error) {
logger.error('Error shutting down queue system', { error });
}
}, 'Queue System');
// Priority 1: HTTP Server (high priority)
shutdown.onShutdownHigh(async () => {
if (server) {
logger.info('Stopping HTTP server...');
try {
@ -138,42 +230,33 @@ shutdown.onShutdown(async () => {
logger.error('Error stopping HTTP server', { error });
}
}
});
}, 'HTTP Server');
shutdown.onShutdown(async () => {
logger.info('Shutting down sync managers...');
try {
await syncManager.shutdown();
await enhancedSyncManager.shutdown();
logger.info('Sync managers shut down');
} catch (error) {
logger.error('Error shutting down sync managers', { error });
}
});
shutdown.onShutdown(async () => {
// Priority 2: Database connections (medium priority)
shutdown.onShutdownMedium(async () => {
logger.info('Disconnecting from databases...');
try {
if (mongoClient) {
await mongoClient.disconnect();
}
if (postgresClient) {
await postgresClient.disconnect();
}
const { disconnectMongoDB } = await import('@stock-bot/mongodb-client');
const { disconnectPostgreSQL } = await import('@stock-bot/postgres-client');
await disconnectMongoDB();
await disconnectPostgreSQL();
logger.info('Database connections closed');
} catch (error) {
logger.error('Error closing database connections', { error });
}
});
}, 'Databases');
shutdown.onShutdown(async () => {
// Priority 3: Logger shutdown (lowest priority - runs last)
shutdown.onShutdownLow(async () => {
try {
logger.info('Shutting down loggers...');
await shutdownLoggers();
// process.stdout.write('Data sync service loggers shut down\n');
} catch (error) {
process.stderr.write(`Error shutting down loggers: ${error}\n`);
// Don't log after shutdown
} catch {
// Silently ignore logger shutdown errors
}
});
}, 'Loggers');
// Start the service
startServer().catch(error => {
@ -181,4 +264,4 @@ startServer().catch(error => {
process.exit(1);
});
logger.info('Data sync service startup initiated');
logger.info('Data sync service startup initiated');