small fixes

This commit is contained in:
Boki 2025-06-20 11:30:27 -04:00
parent 87037e013f
commit 98aa414231
6 changed files with 33 additions and 33 deletions

View file

@ -251,5 +251,4 @@ startServer().catch(error => {
logger.info('Data service startup initiated'); logger.info('Data service startup initiated');
// Export queue manager for providers // Queue manager is available via QueueManager.getInstance() singleton pattern
export { queueManager };

View file

@ -108,7 +108,7 @@ export async function fetchSession(): Promise<Record<string, string> | undefined
} }
} }
export async function fetchExchanges(sessionHeaders: Record<string, string>): Promise<unknown> { export async function fetchExchanges(sessionHeaders: Record<string, string>): Promise<unknown[] | null> {
try { try {
logger.info('🔍 Fetching exchanges with session headers...'); logger.info('🔍 Fetching exchanges with session headers...');
@ -170,7 +170,7 @@ export async function fetchExchanges(sessionHeaders: Record<string, string>): Pr
} }
// Fetch symbols from IB using the session headers // Fetch symbols from IB using the session headers
export async function fetchSymbols(sessionHeaders: Record<string, string>): Promise<unknown> { export async function fetchSymbols(sessionHeaders: Record<string, string>): Promise<unknown[] | null> {
try { try {
logger.info('🔍 Fetching symbols with session headers...'); logger.info('🔍 Fetching symbols with session headers...');
// Configure the proxy // Configure the proxy
@ -275,6 +275,8 @@ export async function fetchSymbols(sessionHeaders: Record<string, string>): Prom
logger.info('Saved IB symbols to DB', { logger.info('Saved IB symbols to DB', {
totalSymbols: symbols.length, totalSymbols: symbols.length,
}); });
return symbols;
// logger.info('📤 Making request to exchange API...', { // logger.info('📤 Making request to exchange API...', {
// url: exchangeUrl, // url: exchangeUrl,
// headerCount: Object.keys(requestHeaders).length, // headerCount: Object.keys(requestHeaders).length,

View file

@ -1,6 +1,8 @@
import { createCache, type CacheProvider } from '@stock-bot/cache'; import { createCache, type CacheProvider } from '@stock-bot/cache';
import { getDatabaseConfig } from '@stock-bot/config';
import { HttpClient, ProxyInfo } from '@stock-bot/http'; import { HttpClient, ProxyInfo } from '@stock-bot/http';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { QueueManager } from '@stock-bot/queue';
// Type definitions // Type definitions
export interface ProxySource { export interface ProxySource {
@ -177,7 +179,9 @@ export async function initializeProxyResources(waitForCache = false): Promise<vo
} }
logger = getLogger('proxy-tasks'); logger = getLogger('proxy-tasks');
const databaseConfig = getDatabaseConfig();
cache = createCache({ cache = createCache({
redisConfig: databaseConfig.dragonfly,
keyPrefix: 'proxy:', keyPrefix: 'proxy:',
ttl: PROXY_CONFIG.CACHE_TTL, ttl: PROXY_CONFIG.CACHE_TTL,
enableMetrics: true, enableMetrics: true,
@ -309,10 +313,7 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean): Promise
// Individual task functions // Individual task functions
export async function queueProxyFetch(): Promise<string> { export async function queueProxyFetch(): Promise<string> {
const { queueManager } = await import('../index'); const queueManager = QueueManager.getInstance();
if (!queueManager) {
throw new Error('Queue manager not initialized');
}
const queue = queueManager.getQueue('proxy'); const queue = queueManager.getQueue('proxy');
const job = await queue.add('proxy-fetch', { const job = await queue.add('proxy-fetch', {
handler: 'proxy', handler: 'proxy',
@ -327,10 +328,7 @@ export async function queueProxyFetch(): Promise<string> {
} }
export async function queueProxyCheck(proxies: ProxyInfo[]): Promise<string> { export async function queueProxyCheck(proxies: ProxyInfo[]): Promise<string> {
const { queueManager } = await import('../index'); const queueManager = QueueManager.getInstance();
if (!queueManager) {
throw new Error('Queue manager not initialized');
}
const queue = queueManager.getQueue('proxy'); const queue = queueManager.getQueue('proxy');
const job = await queue.add('proxy-check', { const job = await queue.add('proxy-check', {
handler: 'proxy', handler: 'proxy',

View file

@ -1,6 +1,7 @@
import { getRandomUserAgent } from '@stock-bot/http'; import { getRandomUserAgent } from '@stock-bot/http';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient } from '@stock-bot/mongodb-client'; import { getMongoDBClient } from '@stock-bot/mongodb-client';
import { QueueManager } from '@stock-bot/queue';
import { getProxy } from './webshare.provider'; import { getProxy } from './webshare.provider';
// Shared instances (module-scoped, not global) // Shared instances (module-scoped, not global)
@ -171,10 +172,7 @@ async function createAlphabetJobs(
maxDepth: number maxDepth: number
): Promise<{ success: boolean; symbolsFound: number; jobsCreated: number }> { ): Promise<{ success: boolean; symbolsFound: number; jobsCreated: number }> {
try { try {
const { queueManager } = await import('../index'); const queueManager = QueueManager.getInstance();
if (!queueManager) {
throw new Error('Queue manager not initialized');
}
const queue = queueManager.getQueue('qm'); const queue = queueManager.getQueue('qm');
let jobsCreated = 0; let jobsCreated = 0;
@ -242,10 +240,7 @@ async function searchAndSpawnJobs(
// If we have 50+ symbols and haven't reached max depth, spawn sub-jobs // If we have 50+ symbols and haven't reached max depth, spawn sub-jobs
if (symbolCount >= 50 && depth < maxDepth) { if (symbolCount >= 50 && depth < maxDepth) {
const { queueManager } = await import('../index'); const queueManager = QueueManager.getInstance();
if (!queueManager) {
throw new Error('Queue manager not initialized');
}
const queue = queueManager.getQueue('qm'); const queue = queueManager.getQueue('qm');
logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`); logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`);

View file

@ -2,7 +2,11 @@
* WebShare Provider for proxy management * WebShare Provider for proxy management
*/ */
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { handlerRegistry, createJobHandler, type HandlerConfigWithSchedule } from '@stock-bot/queue'; import {
createJobHandler,
handlerRegistry,
type HandlerConfigWithSchedule,
} from '@stock-bot/queue';
const logger = getLogger('webshare-provider'); const logger = getLogger('webshare-provider');
@ -18,7 +22,7 @@ export function getProxy(): string | null {
const proxy = proxies[currentProxyIndex]; const proxy = proxies[currentProxyIndex];
currentProxyIndex = (currentProxyIndex + 1) % proxies.length; currentProxyIndex = (currentProxyIndex + 1) % proxies.length;
return proxy; return proxy ?? null;
} }
// Initialize and register the WebShare provider // Initialize and register the WebShare provider

View file

@ -3,7 +3,7 @@
*/ */
import { Hono } from 'hono'; import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { processItems, queueManager } from '../index'; import { processItems, QueueManager } from '@stock-bot/queue';
const logger = getLogger('market-data-routes'); const logger = getLogger('market-data-routes');
@ -16,9 +16,10 @@ marketDataRoutes.get('/api/live/:symbol', async c => {
try { try {
// Queue job for live data using Yahoo provider // Queue job for live data using Yahoo provider
const job = await queueManager.add('market-data-live', { const queueManager = QueueManager.getInstance();
type: 'market-data-live', const queue = queueManager.getQueue('yahoo-finance');
provider: 'yahoo-finance', const job = await queue.add('live-data', {
handler: 'yahoo-finance',
operation: 'live-data', operation: 'live-data',
payload: { symbol }, payload: { symbol },
}); });
@ -46,9 +47,10 @@ marketDataRoutes.get('/api/historical/:symbol', async c => {
const toDate = to ? new Date(to) : new Date(); // Now const toDate = to ? new Date(to) : new Date(); // Now
// Queue job for historical data using Yahoo provider // Queue job for historical data using Yahoo provider
const job = await queueManager.add('market-data-historical', { const queueManager = QueueManager.getInstance();
type: 'market-data-historical', const queue = queueManager.getQueue('yahoo-finance');
provider: 'yahoo-finance', const job = await queue.add('historical-data', {
handler: 'yahoo-finance',
operation: 'historical-data', operation: 'historical-data',
payload: { payload: {
symbol, symbol,
@ -94,13 +96,13 @@ marketDataRoutes.post('/api/process-symbols', async c => {
useBatching, useBatching,
}); });
const result = await processItems(symbols, queueManager, { const result = await processItems(symbols, provider, {
handler: provider,
operation,
totalDelayHours, totalDelayHours,
useBatching, useBatching,
batchSize, batchSize,
priority: 2, priority: 2,
provider,
operation,
retries: 2, retries: 2,
removeOnComplete: 5, removeOnComplete: 5,
removeOnFail: 10, removeOnFail: 10,