fixed up data-service

This commit is contained in:
Boki 2025-06-18 20:11:05 -04:00
parent 68d977f9e0
commit 6b69bcbcaa
8 changed files with 135 additions and 535 deletions

View file

@ -0,0 +1,15 @@
{
"service": {
"name": "data-service",
"port": 2001,
"host": "0.0.0.0",
"healthCheckPath": "/health",
"metricsPath": "/metrics",
"shutdownTimeout": 30000,
"cors": {
"enabled": true,
"origin": "*",
"credentials": false
}
}
}

View file

@ -12,23 +12,15 @@
"clean": "rm -rf dist"
},
"dependencies": {
"@stock-bot/cache": "*",
"@stock-bot/event-bus": "*",
"@stock-bot/http": "*",
"@stock-bot/config-new": "*",
"@stock-bot/logger": "*",
"@stock-bot/mongodb-client": "*",
"@stock-bot/postgres-client": "*",
"@stock-bot/questdb-client": "*",
"@stock-bot/queue": "*",
"@stock-bot/shutdown": "*",
"@stock-bot/types": "*",
"chromium-bidi": "^5.3.1",
"electron": "^36.4.0",
"hono": "^4.0.0",
"p-limit": "^6.2.0",
"ws": "^8.0.0"
"hono": "^4.0.0"
},
"devDependencies": {
"@types/ws": "^8.0.0",
"typescript": "^5.0.0"
}
}

View file

@ -1,17 +1,36 @@
/**
* Data Service - Combined live and historical data ingestion with queue-based architecture
* Data Service - Market data ingestion service
*/
// Framework imports
import { Hono } from 'hono';
import { cors } from 'hono/cors';
import { Browser } from '@stock-bot/browser';
import { getLogger, shutdownLoggers } from '@stock-bot/logger';
import { connectMongoDB, disconnectMongoDB } from '@stock-bot/mongodb-client';
import { processItems, QueueManager, type QueueConfig } from '@stock-bot/queue';
// Library imports
import { initializeServiceConfig } from '@stock-bot/config-new';
import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger';
import { createAndConnectMongoDBClient, MongoDBClient } from '@stock-bot/mongodb-client';
import { createAndConnectPostgreSQLClient, PostgreSQLClient } from '@stock-bot/postgres-client';
import { Shutdown } from '@stock-bot/shutdown';
import { initializeIBResources } from './providers/ib.tasks';
import { initializeProxyResources } from './providers/proxy.tasks';
// Local imports
import { exchangeRoutes, healthRoutes, queueRoutes } from './routes';
// Initialize configuration with automatic monorepo config inheritance
const config = await initializeServiceConfig();
const serviceConfig = config.service;
const databaseConfig = config.database;
// Initialize logger with config
const loggingConfig = config.logging;
if (loggingConfig) {
setLoggerConfig({
logLevel: loggingConfig.level,
logConsole: true,
logFile: false,
environment: config.environment,
});
}
const app = new Hono();
@ -27,95 +46,56 @@ app.use(
);
const logger = getLogger('data-service');
const PORT = parseInt(process.env.DATA_SERVICE_PORT || '3002');
const PORT = serviceConfig.port;
let server: ReturnType<typeof Bun.serve> | null = null;
let postgresClient: PostgreSQLClient | null = null;
let mongoClient: MongoDBClient | null = null;
// Initialize shutdown manager with 15 second timeout
// Initialize shutdown manager
const shutdown = Shutdown.getInstance({ timeout: 15000 });
/**
* Create and configure the enhanced queue manager for data service
*/
function createDataServiceQueueManager(): QueueManager {
const config: QueueConfig = {
queueName: 'data-service-queue',
workers: parseInt(process.env.WORKER_COUNT || '5'),
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20'),
redis: {
host: process.env.DRAGONFLY_HOST || 'localhost',
port: parseInt(process.env.DRAGONFLY_PORT || '6379'),
},
providers: [
// Import and initialize providers lazily
async () => {
const { initializeWebShareProvider } = await import('./providers/webshare.provider');
return initializeWebShareProvider();
},
async () => {
const { initializeIBProvider } = await import('./providers/ib.provider');
return initializeIBProvider();
},
async () => {
const { initializeProxyProvider } = await import('./providers/proxy.provider');
return initializeProxyProvider();
},
async () => {
const { initializeQMProvider } = await import('./providers/qm.provider');
return initializeQMProvider();
},
async () => {
const { initializeExchangeSyncProvider } = await import(
'./providers/exchange-sync.provider'
);
return initializeExchangeSyncProvider();
},
],
enableScheduledJobs: true,
};
return new QueueManager(config);
}
// Create singleton instance
export const queueManager = createDataServiceQueueManager();
// Export processItems for direct use
export { processItems };
// Register all routes
app.route('', exchangeRoutes);
app.route('', healthRoutes);
app.route('', queueRoutes);
// Mount routes
app.route('/health', healthRoutes);
app.route('/api/exchanges', exchangeRoutes);
app.route('/api/queue', queueRoutes);
// Initialize services
async function initializeServices() {
logger.info('Initializing data service...');
try {
// Initialize MongoDB client first
logger.info('Starting MongoDB client initialization...');
await connectMongoDB();
logger.info('MongoDB client initialized');
// Initialize MongoDB client
logger.info('Connecting to MongoDB...');
const mongoConfig = databaseConfig.mongodb;
mongoClient = await createAndConnectMongoDBClient({
uri: mongoConfig.uri,
database: mongoConfig.database,
host: mongoConfig.host || 'localhost',
port: mongoConfig.port || 27017,
timeouts: {
connectTimeout: 30000,
socketTimeout: 30000,
serverSelectionTimeout: 5000,
},
});
logger.info('MongoDB connected');
// Initialize browser resources
logger.info('Starting browser resources initialization...');
await Browser.initialize();
logger.info('Browser resources initialized');
// Initialize proxy resources
logger.info('Starting proxy resources initialization...');
await initializeProxyResources();
logger.info('Proxy resources initialized');
// Initialize IB resources
logger.info('Starting IB resources initialization...');
await initializeIBResources();
logger.info('IB resources initialized');
// Initialize queue manager (includes batch cache initialization)
logger.info('Starting queue manager initialization...');
await queueManager.initialize();
logger.info('Queue manager initialized');
// Initialize PostgreSQL client
logger.info('Connecting to PostgreSQL...');
const pgConfig = databaseConfig.postgres;
postgresClient = await createAndConnectPostgreSQLClient({
host: pgConfig.host,
port: pgConfig.port,
database: pgConfig.database,
username: pgConfig.user,
password: pgConfig.password,
poolSettings: {
min: 2,
max: pgConfig.poolSize || 10,
idleTimeoutMillis: pgConfig.idleTimeout || 30000,
},
});
logger.info('PostgreSQL connected');
logger.info('All services initialized successfully');
} catch (error) {
@ -127,12 +107,13 @@ async function initializeServices() {
// Start server
async function startServer() {
await initializeServices();
// Start the HTTP server using Bun's native serve
server = Bun.serve({
port: PORT,
fetch: app.fetch,
development: process.env.NODE_ENV === 'development',
development: config.environment === 'development',
});
logger.info(`Data Service started on port ${PORT}`);
}
@ -142,7 +123,7 @@ shutdown.onShutdown(async () => {
logger.info('Stopping HTTP server...');
try {
server.stop();
logger.info('HTTP server stopped successfully');
logger.info('HTTP server stopped');
} catch (error) {
logger.error('Error stopping HTTP server', { error });
}
@ -150,63 +131,33 @@ shutdown.onShutdown(async () => {
});
shutdown.onShutdown(async () => {
logger.info('Shutting down queue manager...');
logger.info('Disconnecting from databases...');
try {
await queueManager.shutdown();
logger.info('Queue manager shut down successfully');
if (mongoClient) {
await mongoClient.disconnect();
}
if (postgresClient) {
await postgresClient.disconnect();
}
logger.info('Database connections closed');
} catch (error) {
logger.error('Error shutting down queue manager', { error });
// Don't re-throw to allow other shutdown handlers to complete
// The shutdown library tracks failures internally
logger.error('Error closing database connections', { error });
}
});
// Add Browser shutdown handler
shutdown.onShutdown(async () => {
logger.info('Shutting down browser resources...');
try {
await Browser.close();
logger.info('Browser resources shut down successfully');
} catch (error) {
// Browser might already be closed by running tasks, this is expected
const errorMessage = error instanceof Error ? error.message : String(error);
if (errorMessage.includes('Target page, context or browser has been closed')) {
logger.info('Browser was already closed by running tasks');
} else {
logger.error('Error shutting down browser resources', { error });
}
// Don't throw here as browser shutdown shouldn't block app shutdown
}
});
// Add MongoDB shutdown handler
shutdown.onShutdown(async () => {
logger.info('Shutting down MongoDB client...');
try {
await disconnectMongoDB();
logger.info('MongoDB client shut down successfully');
} catch (error) {
logger.error('Error shutting down MongoDB client', { error });
// Don't throw here to allow other shutdown handlers to complete
}
});
// Add logger shutdown handler (should be last)
shutdown.onShutdown(async () => {
try {
await shutdownLoggers();
// Use process.stdout since loggers are being shut down
process.stdout.write('All loggers flushed and shut down successfully\n');
process.stdout.write('Data service loggers shut down\n');
} catch (error) {
process.stderr.write(`Error shutting down loggers: ${error}\n`);
// Don't throw here as this is the final cleanup
}
});
// Start the application
// Start the service
startServer().catch(error => {
logger.error('Failed to start server', { error });
logger.error('Failed to start data service', { error });
process.exit(1);
});
logger.info('Data service startup initiated with graceful shutdown handlers');
logger.info('Data service startup initiated');

View file

@ -1,322 +1,22 @@
/**
* Exchange Routes - Simple API endpoints for exchange management
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import type { MasterExchange } from '@stock-bot/mongodb-client';
import { connectMongoDB, getDatabase } from '@stock-bot/mongodb-client';
import { queueManager } from '../index';
const logger = getLogger('exchange-routes');
const exchange = new Hono();
export const exchangeRoutes = new Hono();
// Get all master exchanges
exchangeRoutes.get('/api/exchanges', async c => {
// Get all exchanges
exchange.get('/', async c => {
try {
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const exchanges = await collection.find({}).toArray();
// TODO: Implement exchange listing from database
return c.json({
data: exchanges,
count: exchanges.length,
status: 'success',
data: [],
message: 'Exchange endpoints will be implemented with database integration'
});
} catch (error) {
logger.error('Error getting all exchanges', { error });
return c.json({ error: 'Internal server error' }, 500);
logger.error('Failed to get exchanges', { error });
return c.json({ status: 'error', message: 'Failed to get exchanges' }, 500);
}
});
// Get master exchange details
exchangeRoutes.get('/api/exchanges/:masterExchangeId', async c => {
try {
const masterExchangeId = c.req.param('masterExchangeId');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const exchange = await collection.findOne({ masterExchangeId });
if (!exchange) {
return c.json({ error: 'Exchange not found' }, 404);
}
return c.json({
status: 'success',
data: exchange,
});
} catch (error) {
logger.error('Error getting exchange details', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Get source mapping
exchangeRoutes.get('/api/exchanges/mapping/:sourceName/:sourceId', async c => {
try {
const sourceName = c.req.param('sourceName');
const sourceId = c.req.param('sourceId');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const result = await collection.findOne(
{ [`sourceMappings.${sourceName}.id`]: sourceId },
{ projection: { masterExchangeId: 1 } }
);
if (!result) {
return c.json({ error: 'Mapping not found' }, 404);
}
return c.json({
status: 'success',
data: { masterExchangeId: result.masterExchangeId, sourceName, sourceId },
});
} catch (error) {
logger.error('Error getting exchange mapping', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Get all exchanges from a specific source
exchangeRoutes.get('/api/exchanges/source/:sourceName', async c => {
try {
const sourceName = c.req.param('sourceName');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const results = await collection
.find(
{ [`sourceMappings.${sourceName}`]: { $exists: true } },
{
projection: {
masterExchangeId: 1,
[`sourceMappings.${sourceName}`]: 1,
},
}
)
.toArray();
const exchanges = results.map(result => ({
masterExchangeId: result.masterExchangeId,
sourceMapping: result.sourceMappings[sourceName],
}));
return c.json({
data: exchanges,
count: exchanges.length,
status: 'success',
sourceName,
});
} catch (error) {
logger.error('Error getting source exchanges', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Add source mapping to an exchange
exchangeRoutes.post('/api/exchanges/:masterExchangeId/mappings', async c => {
try {
const masterExchangeId = c.req.param('masterExchangeId');
const { sourceName, sourceData } = await c.req.json();
if (!sourceName || !sourceData || !sourceData.id) {
return c.json({ error: 'sourceName and sourceData with id are required' }, 400);
}
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const result = await collection.updateOne(
{ masterExchangeId },
{
$set: {
[`sourceMappings.${sourceName}`]: sourceData,
updatedAt: new Date(),
},
}
);
if (result.matchedCount === 0) {
return c.json({ error: 'Exchange not found' }, 404);
}
return c.json({
status: 'success',
message: 'Source mapping added successfully',
data: { masterExchangeId, sourceName, sourceData },
});
} catch (error) {
logger.error('Error adding source mapping', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Trigger exchange sync
exchangeRoutes.post('/api/exchanges/sync', async c => {
try {
const job = await queueManager.add('exchange-sync', {
type: 'exchange-sync',
provider: 'exchange-sync',
operation: 'sync-ib-exchanges',
payload: {},
priority: 2,
});
return c.json({
status: 'success',
message: 'IB exchange sync job queued',
jobId: job.id,
operation: 'sync-ib-exchanges',
});
} catch (error) {
logger.error('Error triggering exchange sync', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Update exchange (shortName and active status)
exchangeRoutes.patch('/api/exchanges/:id', async c => {
try {
const id = c.req.param('id');
const updates = await c.req.json();
// Validate the updates - only allow shortName and active
const sanitizedUpdates: Record<string, unknown> = {};
if ('shortName' in updates && typeof updates.shortName === 'string') {
sanitizedUpdates.shortName = updates.shortName;
}
if ('active' in updates && typeof updates.active === 'boolean') {
sanitizedUpdates.active = updates.active;
}
if (Object.keys(sanitizedUpdates).length === 0) {
return c.json({ error: 'No valid fields to update' }, 400);
}
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
// Update using MongoDB _id (ObjectId)
const result = await collection.updateOne(
{ _id: new (await import('mongodb')).ObjectId(id) },
{
$set: {
...sanitizedUpdates,
updated_at: new Date(),
},
}
);
if (result.matchedCount === 0) {
return c.json({ error: 'Exchange not found' }, 404);
}
return c.json({
status: 'success',
message: 'Exchange updated successfully',
data: { id, updates: sanitizedUpdates },
});
} catch (error) {
logger.error('Error updating exchange', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Add source to exchange
exchangeRoutes.post('/api/exchanges/:id/sources', async c => {
try {
const id = c.req.param('id');
const { source, source_code, mapping } = await c.req.json();
if (!source || !source_code || !mapping || !mapping.id) {
return c.json({ error: 'source, source_code, and mapping with id are required' }, 400);
}
// Create the storage key using source and source_code
const storageKey = `${source}_${source_code}`;
// Add lastUpdated to the mapping
const sourceData = {
...mapping,
lastUpdated: new Date(),
};
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
// Update using MongoDB _id (ObjectId)
const result = await collection.updateOne(
{ _id: new (await import('mongodb')).ObjectId(id) },
{
$set: {
[`sourceMappings.${storageKey}`]: sourceData,
updated_at: new Date(),
},
}
);
if (result.matchedCount === 0) {
return c.json({ error: 'Exchange not found' }, 404);
}
return c.json({
status: 'success',
message: 'Source mapping added successfully',
data: { id, storageKey, mapping: sourceData },
});
} catch (error) {
logger.error('Error adding source mapping', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
// Remove source from exchange
exchangeRoutes.delete('/api/exchanges/:id/sources/:sourceName', async c => {
try {
const id = c.req.param('id');
const sourceName = c.req.param('sourceName');
await connectMongoDB();
const db = getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
// Remove the source mapping using MongoDB _id (ObjectId)
const result = await collection.updateOne(
{ _id: new (await import('mongodb')).ObjectId(id) },
{
$unset: {
[`sourceMappings.${sourceName}`]: '',
},
$set: {
updated_at: new Date(),
},
}
);
if (result.matchedCount === 0) {
return c.json({ error: 'Exchange not found' }, 404);
}
return c.json({
status: 'success',
message: 'Source mapping removed successfully',
data: { id, sourceName },
});
} catch (error) {
logger.error('Error removing source mapping', { error });
return c.json({ error: 'Internal server error' }, 500);
}
});
export { exchange as exchangeRoutes };

View file

@ -1,20 +1,14 @@
/**
* Health check routes
*/
import { Hono } from 'hono';
import { queueManager } from '../index';
export const healthRoutes = new Hono();
const health = new Hono();
// Health check endpoint
healthRoutes.get('/health', c => {
health.get('/', c => {
return c.json({
service: 'data-service',
status: 'healthy',
service: 'data-service',
timestamp: new Date().toISOString(),
queue: {
status: 'running',
name: queueManager.getQueueName(),
},
});
});
export { health as healthRoutes };

View file

@ -1,72 +1,27 @@
/**
* Queue management routes
*/
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { queueManager } from '../index';
const logger = getLogger('queue-routes');
const queue = new Hono();
export const queueRoutes = new Hono();
// Queue management endpoints
queueRoutes.get('/api/queue/status', async c => {
// Queue status endpoint
queue.get('/status', async c => {
try {
const stats = await queueManager.getStats();
return c.json({ status: 'success', data: stats });
// TODO: Implement queue management
return c.json({
status: 'success',
data: {
active: 0,
waiting: 0,
completed: 0,
failed: 0
},
message: 'Queue management will be implemented'
});
} catch (error) {
logger.error('Failed to get queue status', { error });
return c.json({ status: 'error', message: 'Failed to get queue status' }, 500);
}
});
queueRoutes.post('/api/queue/job', async c => {
try {
const { name, data, options } = await c.req.json();
const job = await queueManager.add(name, data, options);
return c.json({ status: 'success', jobId: job.id });
} catch (error) {
logger.error('Failed to add job', { error });
return c.json({ status: 'error', message: 'Failed to add job' }, 500);
}
});
// Provider registry endpoints
queueRoutes.get('/api/providers', async c => {
try {
const { providerRegistry } = await import('@stock-bot/queue');
const configs = providerRegistry.getProviderConfigs();
return c.json({ status: 'success', providers: configs });
} catch (error) {
logger.error('Failed to get providers', { error });
return c.json({ status: 'error', message: 'Failed to get providers' }, 500);
}
});
// Add new endpoint to see scheduled jobs
queueRoutes.get('/api/scheduled-jobs', async c => {
try {
const { providerRegistry } = await import('@stock-bot/queue');
const jobs = providerRegistry.getAllScheduledJobs();
return c.json({
status: 'success',
count: jobs.length,
jobs,
});
} catch (error) {
logger.error('Failed to get scheduled jobs info', { error });
return c.json({ status: 'error', message: 'Failed to get scheduled jobs' }, 500);
}
});
queueRoutes.post('/api/queue/clean', async c => {
try {
const { grace = 60000 } = await c.req.json(); // Default 1 minute
await queueManager.clean(grace);
const stats = await queueManager.getStats();
return c.json({ status: 'success', message: 'Queue cleaned', queueStats: stats });
} catch (error) {
logger.error('Failed to clean queue', { error });
return c.json({ status: 'error', message: 'Failed to clean queue' }, 500);
}
});
export { queue as queueRoutes };

View file

@ -16,16 +16,11 @@
],
"references": [
{ "path": "../../libs/types" },
{ "path": "../../libs/config" },
{ "path": "../../libs/config-new" },
{ "path": "../../libs/logger" },
{ "path": "../../libs/http" },
{ "path": "../../libs/cache" },
{ "path": "../../libs/questdb-client" },
{ "path": "../../libs/mongodb-client" },
{ "path": "../../libs/event-bus" },
{ "path": "../../libs/shutdown" },
{ "path": "../../libs/utils" },
{ "path": "../../libs/browser" },
{ "path": "../../libs/queue" }
{ "path": "../../libs/postgres-client" },
{ "path": "../../libs/questdb-client" },
{ "path": "../../libs/shutdown" }
]
}

View file

@ -3,14 +3,12 @@
"tasks": {
"build": {
"dependsOn": [
"@stock-bot/cache#build",
"@stock-bot/event-bus#build",
"@stock-bot/http#build",
"@stock-bot/config-new#build",
"@stock-bot/logger#build",
"@stock-bot/mongodb-client#build",
"@stock-bot/postgres-client#build",
"@stock-bot/questdb-client#build",
"@stock-bot/shutdown#build",
"@stock-bot/browser#build"
"@stock-bot/shutdown#build"
],
"outputs": ["dist/**"],
"inputs": [