integrated data-ingestion

This commit is contained in:
Boki 2025-06-21 19:42:20 -04:00
parent 9673ae70ef
commit 3227388d25
15 changed files with 226 additions and 133 deletions

View file

@ -12,6 +12,57 @@
"credentials": false
}
},
"log": {
"level": "info",
"format": "json",
"hideObject": false,
"loki": {
"enabled": false,
"host": "localhost",
"port": 3100,
"labels": {}
}
},
"database": {
"postgres": {
"host": "localhost",
"port": 5432,
"database": "trading_bot",
"user": "trading_user",
"password": "trading_pass_dev",
"ssl": false,
"poolSize": 20,
"connectionTimeout": 30000,
"idleTimeout": 10000
},
"questdb": {
"host": "localhost",
"ilpPort": 9009,
"httpPort": 9000,
"pgPort": 8812,
"database": "questdb",
"user": "admin",
"password": "quest",
"bufferSize": 65536,
"flushInterval": 1000
},
"mongodb": {
"host": "localhost",
"port": 27017,
"database": "stock",
"user": "trading_admin",
"password": "trading_mongo_dev",
"authSource": "admin",
"poolSize": 20
},
"dragonfly": {
"host": "localhost",
"port": 6379,
"db": 0,
"maxRetries": 3,
"retryDelay": 100
}
},
"queue": {
"redis": {
"host": "localhost",
@ -24,12 +75,23 @@
"type": "exponential",
"delay": 1000
},
"removeOnComplete": true,
"removeOnFail": false
"removeOnComplete": 100,
"removeOnFail": 50
}
},
"webshare": {
"apiKey": "",
"apiUrl": "https://proxy.webshare.io/api/v2/"
},
"http": {
"timeout": 30000,
"retries": 3,
"retryDelay": 1000,
"userAgent": "StockBot/1.0",
"rateLimit": {
"enabled": false,
"requestsPerSecond": 10,
"burstSize": 20
}
}
}

View file

@ -1,5 +1,5 @@
import { OperationContext } from '@stock-bot/utils';
import type { ServiceContainer } from '@stock-bot/connection-factory';
import { OperationContext } from '@stock-bot/di';
import type { ServiceContainer } from '@stock-bot/di';
/**
* Example handler showing how to use the new connection pooling pattern
@ -12,26 +12,32 @@ export class ExampleHandler {
*/
async performOperation(data: any): Promise<void> {
// Create operation context with container
const context = OperationContext.create('example', 'perform-operation', {
container: this.container
});
const context = new OperationContext(
'example-handler',
'perform-operation',
this.container,
{ data }
);
try {
// Log operation start
context.logger.info('Starting operation', { data });
// Use MongoDB through context (no more singleton!)
const result = await context.mongodb.collection('test').insertOne(data);
// Use MongoDB through service resolution
const mongodb = context.resolve<any>('mongodb');
const result = await mongodb.collection('test').insertOne(data);
context.logger.debug('MongoDB insert complete', { insertedId: result.insertedId });
// Use PostgreSQL through context
await context.postgres.query(
// Use PostgreSQL through service resolution
const postgres = context.resolve<any>('postgres');
await postgres.query(
'INSERT INTO operations (id, status) VALUES ($1, $2)',
[result.insertedId, 'completed']
);
// Use cache through context
await context.cache.set(`operation:${result.insertedId}`, {
// Use cache through service resolution
const cache = context.resolve<any>('cache');
await cache.set(`operation:${result.insertedId}`, {
status: 'completed',
timestamp: new Date()
});
@ -40,9 +46,6 @@ export class ExampleHandler {
} catch (error) {
context.logger.error('Operation failed', { error });
throw error;
} finally {
// Clean up resources
await context.dispose();
}
}
@ -53,23 +56,35 @@ export class ExampleHandler {
// Create a scoped container for this batch operation
const scopedContainer = this.container.createScope();
const context = OperationContext.create('example', 'batch-operation', {
container: scopedContainer
});
const context = new OperationContext(
'example-handler',
'batch-operation',
scopedContainer,
{ itemCount: items.length }
);
try {
context.logger.info('Starting batch operation', { itemCount: items.length });
// Process items in parallel with isolated connections
// Get services once for the batch
const mongodb = context.resolve<any>('mongodb');
const cache = context.resolve<any>('cache');
// Process items in parallel
const promises = items.map(async (item, index) => {
// Each sub-operation gets its own context
const subContext = context.createChild(`item-${index}`);
const itemContext = new OperationContext(
'example-handler',
`batch-item-${index}`,
scopedContainer,
{ item }
);
try {
await subContext.mongodb.collection('batch').insertOne(item);
await subContext.cache.set(`batch:${item.id}`, item);
} finally {
await subContext.dispose();
await mongodb.collection('batch').insertOne(item);
await cache.set(`batch:${item.id}`, item);
} catch (error) {
itemContext.logger.error('Batch item failed', { error, itemIndex: index });
throw error;
}
});
@ -78,7 +93,6 @@ export class ExampleHandler {
} finally {
// Clean up scoped resources
await context.dispose();
await scopedContainer.dispose();
}
}

View file

@ -3,9 +3,9 @@
*/
import { HttpClient, ProxyInfo } from '@stock-bot/http';
import { OperationContext } from '@stock-bot/di';
import { getLogger } from '@stock-bot/logger';
import { PROXY_CONFIG } from '../shared/config';
import { ProxyStatsManager } from '../shared/proxy-manager';
// Shared HTTP client
let httpClient: HttpClient;
@ -21,7 +21,12 @@ function getHttpClient(ctx: OperationContext): HttpClient {
* Check if a proxy is working
*/
export async function checkProxy(proxy: ProxyInfo): Promise<ProxyInfo> {
const ctx = OperationContext.create('proxy', 'check');
const ctx = {
logger: getLogger('proxy-check'),
resolve: <T>(_name: string) => {
throw new Error(`Service container not available for proxy operations`);
}
} as any;
let success = false;
ctx.logger.debug(`Checking Proxy:`, {
@ -94,10 +99,12 @@ export async function checkProxy(proxy: ProxyInfo): Promise<ProxyInfo> {
* Update proxy data in cache with working/total stats and average response time
*/
async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean, ctx: OperationContext): Promise<void> {
const cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`;
const _cacheKey = `${PROXY_CONFIG.CACHE_KEY}:${proxy.protocol}://${proxy.host}:${proxy.port}`;
try {
const existing: ProxyInfo | null = await ctx.cache.get(cacheKey);
// For now, skip cache operations without service container
// TODO: Pass service container to operations
const existing: ProxyInfo | null = null;
// For failed proxies, only update if they already exist
if (!isWorking && !existing) {
@ -140,8 +147,9 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean, ctx: Ope
updated.successRate = updated.total > 0 ? (updated.working / updated.total) * 100 : 0;
// Save to cache: reset TTL for working proxies, keep existing TTL for failed ones
const cacheOptions = isWorking ? { ttl: PROXY_CONFIG.CACHE_TTL } : undefined;
await ctx.cache.set(cacheKey, updated, cacheOptions);
const _cacheOptions = isWorking ? { ttl: PROXY_CONFIG.CACHE_TTL } : undefined;
// Skip cache operations without service container
// TODO: Pass service container to operations
ctx.logger.debug(`Updated ${isWorking ? 'working' : 'failed'} proxy in cache`, {
proxy: `${proxy.host}:${proxy.port}`,
@ -161,15 +169,8 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean, ctx: Ope
}
function updateProxyStats(sourceId: string, success: boolean, ctx: OperationContext) {
const statsManager = ProxyStatsManager.getInstance();
const source = statsManager.updateSourceStats(sourceId, success);
// Stats are now handled by the global ProxyManager
ctx.logger.debug('Proxy check result', { sourceId, success });
if (!source) {
ctx.logger.warn(`Unknown proxy source: ${sourceId}`);
return;
}
// Cache the updated stats
ctx.cache.set(`${PROXY_CONFIG.CACHE_STATS_KEY}:${source.id}`, source, { ttl: PROXY_CONFIG.CACHE_TTL })
.catch(error => ctx.logger.debug('Failed to cache proxy stats', { error }));
// TODO: Integrate with global ProxyManager stats if needed
}

View file

@ -3,9 +3,9 @@
*/
import { HttpClient, ProxyInfo } from '@stock-bot/http';
import { OperationContext } from '@stock-bot/di';
import { getLogger } from '@stock-bot/logger';
import { PROXY_CONFIG } from '../shared/config';
import { ProxyStatsManager } from '../shared/proxy-manager';
import type { ProxySource } from '../shared/types';
// Shared HTTP client
@ -19,10 +19,11 @@ function getHttpClient(ctx: OperationContext): HttpClient {
}
export async function fetchProxiesFromSources(): Promise<ProxyInfo[]> {
const ctx = OperationContext.create('proxy', 'fetch-sources');
const ctx = {
logger: getLogger('proxy-fetch')
} as any;
const statsManager = ProxyStatsManager.getInstance();
statsManager.resetStats();
ctx.logger.info('Starting proxy fetch from sources');
const fetchPromises = PROXY_CONFIG.PROXY_SOURCES.map(source => fetchProxiesFromSource(source, ctx));
const results = await Promise.all(fetchPromises);

View file

@ -4,12 +4,12 @@
import { ProxyInfo } from '@stock-bot/http';
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry, createJobHandler, type HandlerConfigWithSchedule } from '@stock-bot/queue';
import type { ServiceContainer } from '@stock-bot/connection-factory';
import type { ServiceContainer } from '@stock-bot/di';
const handlerLogger = getLogger('proxy-handler');
// Initialize and register the Proxy provider
export function initializeProxyProvider(container: ServiceContainer) {
export function initializeProxyProvider(_container: ServiceContainer) {
handlerLogger.debug('Registering proxy provider with scheduled jobs...');
const proxyProviderConfig: HandlerConfigWithSchedule = {

View file

@ -4,15 +4,15 @@
import { OperationContext } from '@stock-bot/di';
import { isShutdownSignalReceived } from '@stock-bot/shutdown';
import { getRandomProxy } from '@stock-bot/di';
import type { ServiceContainer } from '@stock-bot/connection-factory';
import { getRandomProxy } from '@stock-bot/utils';
import type { ServiceContainer } from '@stock-bot/di';
import { QMSessionManager } from '../shared/session-manager';
import { QM_SESSION_IDS, QM_CONFIG, SESSION_CONFIG, getQmHeaders } from '../shared/config';
import type { QMSession } from '../shared/types';
export async function createSessions(container: ServiceContainer): Promise<void> {
const ctx = OperationContext.create('qm', 'session', { container });
const ctx = new OperationContext('qm-handler', 'create-sessions', container);
try {
ctx.logger.info('Creating QM sessions...');
@ -33,7 +33,8 @@ export async function createSessions(container: ServiceContainer): Promise<void>
// Cache session creation stats
const initialStats = sessionManager.getStats();
await ctx.cache.set('pre-creation-stats', initialStats, { ttl: 300 });
const cache = ctx.resolve<any>('cache');
await cache.set('pre-creation-stats', initialStats, { ttl: 300 });
// Create sessions for each session ID that needs them
for (const [sessionKey, sessionId] of Object.entries(QM_SESSION_IDS)) {
@ -56,9 +57,9 @@ export async function createSessions(container: ServiceContainer): Promise<void>
const finalStats = sessionManager.getStats();
const totalSessions = sessionManager.getSessionCount();
await ctx.cache.set('post-creation-stats', finalStats, { ttl: 3600 });
await ctx.cache.set('session-count', totalSessions, { ttl: 900 });
await ctx.cache.set('last-session-creation', new Date().toISOString());
await cache.set('post-creation-stats', finalStats, { ttl: 3600 });
await cache.set('session-count', totalSessions, { ttl: 900 });
await cache.set('last-session-creation', new Date().toISOString());
ctx.logger.info('QM session creation completed', {
totalSessions,
@ -68,8 +69,6 @@ export async function createSessions(container: ServiceContainer): Promise<void>
} catch (error) {
ctx.logger.error('Failed to create QM sessions', { error });
throw error;
} finally {
await ctx.dispose();
}
}
@ -134,7 +133,8 @@ async function createSingleSession(
sessionManager.addSession(sessionId, newSession);
// Cache successful session creation
await ctx.cache.set(
const cacheService = ctx.resolve<any>('cache');
await cacheService.set(
`successful-session:${sessionKey}:${Date.now()}`,
{ sessionId, proxy, tokenExists: !!sessionData.token },
{ ttl: 300 }
@ -156,7 +156,8 @@ async function createSingleSession(
}
// Cache failed session attempt for debugging
await ctx.cache.set(
const cacheService = ctx.resolve<any>('cache');
await cacheService.set(
`failed-session:${sessionKey}:${Date.now()}`,
{ sessionId, proxy, error: error.message },
{ ttl: 300 }
@ -165,25 +166,34 @@ async function createSingleSession(
}
export async function initializeQMResources(container?: ServiceContainer): Promise<void> {
const ctx = OperationContext.create('qm', 'init', container ? { container } : undefined);
// Check if already initialized
const alreadyInitialized = await ctx.cache.get('initialized');
if (alreadyInitialized) {
ctx.logger.debug('QM resources already initialized');
return;
if (!container) {
throw new Error('Service container is required for QM resource initialization');
}
const ctx = new OperationContext('qm-handler', 'initialize-resources', container);
try {
const cache = ctx.resolve<any>('cache');
// Check if already initialized
const alreadyInitialized = await cache.get('initialized');
if (alreadyInitialized) {
ctx.logger.debug('QM resources already initialized');
return;
}
ctx.logger.debug('Initializing QM resources...');
// Mark as initialized in cache and session manager
await ctx.cache.set('initialized', true, { ttl: 3600 });
await ctx.cache.set('initialization-time', new Date().toISOString());
const sessionManager = QMSessionManager.getInstance();
sessionManager.setInitialized(true);
ctx.logger.info('QM resources initialized successfully');
await ctx.dispose();
ctx.logger.debug('Initializing QM resources...');
// Mark as initialized in cache and session manager
await cache.set('initialized', true, { ttl: 3600 });
await cache.set('initialization-time', new Date().toISOString());
const sessionManager = QMSessionManager.getInstance();
sessionManager.setInitialized(true);
ctx.logger.info('QM resources initialized successfully');
} catch (error) {
ctx.logger.error('Failed to initialize QM resources', { error });
throw error;
}
}

View file

@ -1,5 +1,4 @@
import {
BaseHandler,
ScheduledHandler,
Handler,
Operation,

View file

@ -13,7 +13,7 @@ import type { ServiceContainer } from '@stock-bot/di';
const logger = getLogger('webshare-provider');
// Initialize and register the WebShare provider
export function initializeWebShareProvider(container: ServiceContainer) {
export function initializeWebShareProvider(_container: ServiceContainer) {
logger.debug('Registering WebShare provider with scheduled jobs...');
const webShareProviderConfig: HandlerConfigWithSchedule = {

View file

@ -7,7 +7,8 @@ import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger';
import type { QueueManager } from '@stock-bot/queue';
import { Shutdown } from '@stock-bot/shutdown';
import { ProxyManager } from '@stock-bot/utils';
import type { ServiceContainer } from '@stock-bot/di';
import { ServiceContainer, ConnectionFactory } from '@stock-bot/di';
import { handlerRegistry } from '@stock-bot/handlers';
// Local imports
import { setupServiceContainer } from './setup/database-setup';
import { createRoutes } from './routes/create-routes';
@ -15,8 +16,7 @@ import { createRoutes } from './routes/create-routes';
const config = initializeServiceConfig();
console.log('Data Service Configuration:', JSON.stringify(config, null, 2));
const serviceConfig = config.service;
const databaseConfig = config.database;
const queueConfig = config.queue;
// Configuration will be passed to service container setup
if (config.log) {
setLoggerConfig({
@ -34,6 +34,7 @@ const logger = getLogger('data-ingestion');
const PORT = serviceConfig.port;
let server: ReturnType<typeof Bun.serve> | null = null;
let serviceContainer: ServiceContainer | null = null;
let connectionFactory: ConnectionFactory | null = null;
let queueManager: QueueManager | null = null;
let app: Hono | null = null;
@ -47,7 +48,9 @@ async function initializeServices() {
try {
// Initialize service container with connection pools
logger.debug('Setting up service container with connection pools...');
serviceContainer = await setupServiceContainer();
const { container, factory } = await setupServiceContainer();
serviceContainer = container;
connectionFactory = factory;
logger.info('Service container initialized with connection pools');
// Create app with routes that have access to the container
@ -78,7 +81,7 @@ async function initializeServices() {
await ProxyManager.initialize();
logger.info('Proxy manager initialized');
// Initialize handlers (register handlers and scheduled jobs)
// Initialize handlers using the handler registry
logger.debug('Initializing data handlers...');
const { initializeWebShareProvider } = await import('./handlers/webshare/webshare.handler');
const { initializeIBProvider } = await import('./handlers/ib/ib.handler');
@ -92,10 +95,22 @@ async function initializeServices() {
initializeQMProvider(serviceContainer);
logger.info('Data handlers initialized with service container');
// Register handlers with queue system
logger.debug('Registering handlers with queue system...');
try {
await queueManager.registerHandlers(handlerRegistry.getAllHandlers());
logger.info('Handlers registered with queue system');
} catch (error) {
logger.error('Failed to register handlers with queue system', {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined
});
throw error;
}
// Create scheduled jobs from registered handlers
logger.debug('Creating scheduled jobs from registered handlers...');
const { handlerRegistry } = await import('@stock-bot/queue');
const allHandlers = handlerRegistry.getAllHandlers();
let totalScheduledJobs = 0;
@ -146,7 +161,10 @@ async function initializeServices() {
logger.info('All services initialized successfully');
} catch (error) {
logger.error('Failed to initialize services', { error });
logger.error('Failed to initialize services', {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined
});
throw error;
}
}
@ -199,9 +217,13 @@ shutdown.onShutdownHigh(async () => {
shutdown.onShutdownMedium(async () => {
logger.info('Disposing service container and connections...');
try {
if (connectionFactory) {
await connectionFactory.disposeAll();
logger.info('Connection factory disposed, all pools closed');
}
if (serviceContainer) {
await serviceContainer.dispose();
logger.info('Service container disposed, all connections closed');
logger.info('Service container disposed');
}
} catch (error) {
logger.error('Error disposing service container', { error });

View file

@ -1,5 +1,5 @@
import { Hono } from 'hono';
import type { ServiceContainer } from '@stock-bot/connection-factory';
import type { ServiceContainer } from '@stock-bot/di';
import { exchangeRoutes } from './exchange.routes';
import { healthRoutes } from './health.routes';
import { queueRoutes } from './queue.routes';

View file

@ -3,9 +3,10 @@ import { getLogger } from '@stock-bot/logger';
import {
ConnectionFactory,
ServiceContainer,
PoolSizeCalculator
PoolSizeCalculator,
createServiceContainer
} from '@stock-bot/di';
import type { ConnectionFactoryConfig, DynamicPoolConfig } from '@stock-bot/mongodb';
import type { DynamicPoolConfig } from '@stock-bot/mongodb';
const logger = getLogger('database-setup');
@ -40,23 +41,25 @@ export function createConnectionFactory(): ConnectionFactory {
/**
* Sets up the service container with all dependencies
*/
export async function setupServiceContainer(): Promise<ServiceContainer> {
export async function setupServiceContainer(): Promise<{ container: ServiceContainer, factory: ConnectionFactory }> {
logger.info('Setting up service container for data-ingestion');
const connectionFactory = createConnectionFactory();
const dbConfig = getDatabaseConfig();
const queueConfig = getQueueConfig();
// Create base container
const container = new ServiceContainer('data-ingestion');
// Create enhanced service container with connection factory
const container = createServiceContainer('data-ingestion', connectionFactory, {
database: dbConfig
});
// Register MongoDB with dynamic pool sizing
// Override the default database connections with specific configurations
// MongoDB with dynamic pool sizing for batch operations
container.register({
name: 'mongodb',
factory: async () => {
const poolSize = PoolSizeCalculator.calculate('data-ingestion', 'batch-import');
const pool = await connectionFactory.createMongoDB({
name: 'default',
name: 'data-ingestion',
config: {
uri: dbConfig.mongodb.uri,
database: dbConfig.mongodb.database,
@ -77,18 +80,15 @@ export async function setupServiceContainer(): Promise<ServiceContainer> {
return pool.client;
},
singleton: true,
dispose: async (client) => {
await client.disconnect();
}
});
// Register PostgreSQL
// PostgreSQL with optimized settings for data ingestion
container.register({
name: 'postgres',
factory: async () => {
const poolSize = PoolSizeCalculator.calculate('data-ingestion');
const pool = await connectionFactory.createPostgreSQL({
name: 'default',
name: 'data-ingestion',
config: {
host: dbConfig.postgresql.host,
port: dbConfig.postgresql.port,
@ -107,17 +107,14 @@ export async function setupServiceContainer(): Promise<ServiceContainer> {
return pool.client;
},
singleton: true,
dispose: async (client) => {
await client.disconnect();
}
});
// Register Cache
// Cache with data-ingestion specific configuration
container.register({
name: 'cache',
factory: async () => {
const pool = await connectionFactory.createCache({
name: 'default',
name: 'data-ingestion',
config: {
host: dbConfig.dragonfly.host,
port: dbConfig.dragonfly.port,
@ -129,12 +126,12 @@ export async function setupServiceContainer(): Promise<ServiceContainer> {
singleton: true,
});
// Register QueueManager
// Queue with data-ingestion specific configuration
container.register({
name: 'queue',
factory: async () => {
const pool = await connectionFactory.createQueue({
name: 'default',
name: 'data-ingestion',
config: {
host: dbConfig.dragonfly.host,
port: dbConfig.dragonfly.port,
@ -144,19 +141,6 @@ export async function setupServiceContainer(): Promise<ServiceContainer> {
return pool.client;
},
singleton: true,
dispose: async (queueManager) => {
await queueManager.shutdown();
}
});
// Register the connection factory itself for pool management
container.register({
name: 'connectionFactory',
factory: () => connectionFactory,
singleton: true,
dispose: async (factory) => {
await factory.disposeAll();
}
});
logger.info('Service container setup complete');
@ -166,7 +150,7 @@ export async function setupServiceContainer(): Promise<ServiceContainer> {
await enableDynamicPoolSizing(container);
}
return container;
return { container, factory: connectionFactory };
}
/**

View file

@ -85,8 +85,8 @@
"type": "exponential",
"delay": 1000
},
"removeOnComplete": true,
"removeOnFail": false
"removeOnComplete": 100,
"removeOnFail": 50
}
},
"features": {

View file

@ -75,8 +75,8 @@
"type": "exponential",
"delay": 1000
},
"removeOnComplete": true,
"removeOnFail": false
"removeOnComplete": 100,
"removeOnFail": 50
}
},
"http": {

View file

@ -31,8 +31,8 @@
},
"defaultJobOptions": {
"attempts": 1,
"removeOnComplete": false,
"removeOnFail": false
"removeOnComplete": 100,
"removeOnFail": 50
}
},
"http": {

View file

@ -1,5 +1,5 @@
import { Queue, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { Queue, type Job } from 'bullmq';
import type { DLQConfig, RedisConfig } from './types';
import { getRedisConnection } from './utils';
@ -76,8 +76,8 @@ export class DeadLetterQueueHandler {
};
await this.dlq.add('failed-job', dlqData, {
removeOnComplete: false,
removeOnFail: false,
removeOnComplete: 100,
removeOnFail: 50,
});
logger.error('Job moved to DLQ', {