refactor of data-service

This commit is contained in:
Boki 2025-06-12 08:03:09 -04:00
parent 6fb98c69f2
commit 09c97df1a8
49 changed files with 2394 additions and 112 deletions

View file

@ -2,10 +2,12 @@
* Data Service - Combined live and historical data ingestion with queue-based architecture
*/
import { Hono } from 'hono';
import { Browser } from '@stock-bot/browser';
import { loadEnvVariables } from '@stock-bot/config';
import { getLogger } from '@stock-bot/logger';
import { Shutdown } from '@stock-bot/shutdown';
import { initializeProxyCache } from './providers/proxy.tasks';
import { initializeIBResources } from './providers/ib.tasks';
import { initializeProxyResources } from './providers/proxy.tasks';
import { queueManager } from './services/queue.service';
import { initializeBatchCache } from './utils/batch-helpers';
import { healthRoutes, marketDataRoutes, proxyRoutes, queueRoutes, testRoutes } from './routes';
@ -33,6 +35,11 @@ async function initializeServices() {
logger.info('Initializing data service...');
try {
// Initialize browser resources
logger.info('Starting browser resources initialization...');
await Browser.initialize();
logger.info('Browser resources initialized');
// Initialize batch cache FIRST - before queue service
logger.info('Starting batch cache initialization...');
await initializeBatchCache();
@ -40,7 +47,12 @@ async function initializeServices() {
// Initialize proxy cache - before queue service
logger.info('Starting proxy cache initialization...');
await initializeProxyCache();
await initializeProxyResources(true); // Wait for cache during startup
logger.info('Proxy cache initialized');
// Initialize proxy cache - before queue service
logger.info('Starting proxy cache initialization...');
await initializeIBResources(true); // Wait for cache during startup
logger.info('Proxy cache initialized');
// Initialize queue service (Redis connections should be ready now)

View file

@ -0,0 +1,32 @@
import { getLogger } from '@stock-bot/logger';
import { ProviderConfig } from '../services/provider-registry.service';
const logger = getLogger('ib-provider');
export const ibProvider: ProviderConfig = {
name: 'ib',
operations: {
'ib-symbol-summary': async () => {
const { ibTasks } = await import('./ib.tasks');
logger.info('Fetching symbol summary from IB');
const total = await ibTasks.fetchSymbolSummary();
logger.info('Fetched symbol summary from IB', {
count: total,
});
return total;
},
},
scheduledJobs: [
{
type: 'ib-symbol-summary',
operation: 'ib-symbol-summary',
payload: {},
// should remove and just run at the same time so app restarts dont keeping adding same jobs
cronPattern: '*/2 * * * *',
priority: 5,
immediately: true, // Don't run immediately during startup to avoid conflicts
description: 'Fetch and validate proxy list from sources',
},
],
};

View file

@ -0,0 +1,152 @@
import { Browser } from '@stock-bot/browser';
import { getLogger } from '@stock-bot/logger';
// Shared instances (module-scoped, not global)
let isInitialized = false; // Track if resources are initialized
let logger: ReturnType<typeof getLogger>;
// let cache: CacheProvider;
export async function initializeIBResources(waitForCache = false): Promise<void> {
// Skip if already initialized
if (isInitialized) {
return;
}
logger = getLogger('proxy-tasks');
// cache = createCache({
// keyPrefix: 'proxy:',
// ttl: PROXY_CONFIG.CACHE_TTL,
// enableMetrics: true,
// });
// httpClient = new HttpClient({ timeout: 15000 }, logger);
// if (waitForCache) {
// // logger.info('Initializing proxy cache...');
// // await cache.waitForReady(10000);
// // logger.info('Proxy cache initialized successfully');
// logger.info('Proxy tasks initialized');
// } else {
// logger.info('Proxy tasks initialized (fallback mode)');
// }
isInitialized = true;
}
export async function fetchSymbolSummary(): Promise<number> {
try {
await Browser.initialize({ headless: true, timeout: 10000, blockResources: false });
logger.info('✅ Browser initialized');
const { page, contextId } = await Browser.createPageWithProxy(
'https://www.interactivebrokers.com/en/trading/products-exchanges.php#/',
'http://doimvbnb-US-rotate:w5fpiwrb9895@p.webshare.io:80'
);
logger.info('✅ Page created with proxy');
let summaryData: any = null; // Initialize summaryData to store API response
let eventCount = 0;
page.onNetworkEvent(event => {
if (event.url.includes('/webrest/search/product-types/summary')) {
console.log(`🎯 Found summary API call: ${event.type} ${event.url}`);
if (event.type === 'response' && event.responseData) {
console.log(`📊 Summary API Response Data: ${event.responseData}`);
try {
summaryData = JSON.parse(event.responseData) as any;
const totalCount = summaryData[0].totalCount;
console.log('📊 Summary API Response:', JSON.stringify(summaryData, null, 2));
console.log(`🔢 Total symbols found: ${totalCount || 'Unknown'}`);
} catch (e) {
console.log('📊 Raw Summary Response:', event.responseData);
}
}
}
eventCount++;
logger.info(`📡 Event ${eventCount}: ${event.type} ${event.url}`);
});
logger.info('⏳ Waiting for page load...');
await page.waitForLoadState('domcontentloaded', { timeout: 20000 });
logger.info('✅ Page loaded');
// RIGHT HERE - Interact with the page to find Stocks checkbox and Apply button
logger.info('🔍 Looking for Products tab...');
// Wait for the page to fully load
await page.waitForTimeout(20000);
// First, click on the Products tab
const productsTab = page.locator('#productSearchTab[role="tab"][href="#products"]');
await productsTab.waitFor({ timeout: 20000 });
logger.info('✅ Found Products tab');
logger.info('🖱️ Clicking Products tab...');
await productsTab.click();
logger.info('✅ Products tab clicked');
// Wait for the tab content to load
await page.waitForTimeout(5000);
// Click on the Asset Classes accordion to expand it
logger.info('🔍 Looking for Asset Classes accordion...');
const assetClassesAccordion = page.locator(
'#products .accordion-item #acc-products .accordion_btn:has-text("Asset Classes")'
);
await assetClassesAccordion.waitFor({ timeout: 10000 });
logger.info('✅ Found Asset Classes accordion');
logger.info('🖱️ Clicking Asset Classes accordion...');
await assetClassesAccordion.click();
logger.info('✅ Asset Classes accordion clicked');
// Wait for the accordion content to expand
await page.waitForTimeout(2000);
logger.info('🔍 Looking for Stocks checkbox...');
// Find the span with class "fs-7 checkbox-text" and inner text containing "Stocks"
const stocksSpan = page.locator('span.fs-7.checkbox-text:has-text("Stocks")');
await stocksSpan.waitFor({ timeout: 10000 });
logger.info('✅ Found Stocks span');
// Find the checkbox by looking in the same parent container
const parentContainer = stocksSpan.locator('..');
const checkbox = parentContainer.locator('input[type="checkbox"]');
if ((await checkbox.count()) > 0) {
logger.info('📋 Clicking Stocks checkbox...');
await checkbox.first().check();
logger.info('✅ Stocks checkbox checked');
} else {
logger.info('⚠️ Could not find checkbox near Stocks text');
}
// Wait a moment for any UI updates
await page.waitForTimeout(1000);
// Find and click the nearest Apply button
logger.info('🔍 Looking for Apply button...');
const applyButton = page.locator(
'button:has-text("Apply"), input[type="submit"][value*="Apply"], input[type="button"][value*="Apply"]'
);
if ((await applyButton.count()) > 0) {
logger.info('🎯 Clicking Apply button...');
await applyButton.first().click();
logger.info('✅ Apply button clicked');
// Wait for any network requests triggered by the Apply button
await page.waitForTimeout(2000);
} else {
logger.info('⚠️ Could not find Apply button');
}
return 0;
} catch (error) {
logger.error('Failed to fetch IB symbol summary', { error });
return 0;
}
}
// Optional: Export a convenience object that groups related tasks
export const ibTasks = {
fetchSymbolSummary,
};

View file

@ -1,4 +1,3 @@
import pLimit from 'p-limit';
import { createCache, type CacheProvider } from '@stock-bot/cache';
import { HttpClient, ProxyInfo } from '@stock-bot/http';
import { getLogger } from '@stock-bot/logger';
@ -22,7 +21,6 @@ const PROXY_CONFIG = {
CHECK_TIMEOUT: 7000,
CHECK_IP: '99.246.102.205',
CHECK_URL: 'https://proxy-detection.stare.gg/?api_key=bd406bf53ddc6abe1d9de5907830a955',
CONCURRENCY_LIMIT: 100,
PROXY_SOURCES: [
{
id: 'prxchk',
@ -154,10 +152,10 @@ const PROXY_CONFIG = {
};
// Shared instances (module-scoped, not global)
let isInitialized = false; // Track if resources are initialized
let logger: ReturnType<typeof getLogger>;
let cache: CacheProvider;
let httpClient: HttpClient;
let concurrencyLimit: ReturnType<typeof pLimit>;
let proxyStats: ProxySource[] = PROXY_CONFIG.PROXY_SOURCES.map(source => ({
id: source.id,
total: 0,
@ -167,6 +165,37 @@ let proxyStats: ProxySource[] = PROXY_CONFIG.PROXY_SOURCES.map(source => ({
url: source.url,
}));
/**
* Initialize proxy resources (cache and shared dependencies)
* This should be called before any proxy operations
* @param waitForCache - Whether to wait for cache readiness (default: false for fallback mode)
*/
export async function initializeProxyResources(waitForCache = false): Promise<void> {
// Skip if already initialized
if (isInitialized) {
return;
}
logger = getLogger('proxy-tasks');
cache = createCache({
keyPrefix: 'proxy:',
ttl: PROXY_CONFIG.CACHE_TTL,
enableMetrics: true,
});
httpClient = new HttpClient({ timeout: 10000 }, logger);
if (waitForCache) {
logger.info('Initializing proxy cache...');
await cache.waitForReady(10000);
logger.info('Proxy cache initialized successfully');
logger.info('Proxy tasks initialized');
} else {
logger.info('Proxy tasks initialized (fallback mode)');
}
isInitialized = true;
}
// make a function that takes in source id and a boolean success and updates the proxyStats array
async function updateProxyStats(sourceId: string, success: boolean) {
const source = proxyStats.find(s => s.id === sourceId);
@ -278,50 +307,8 @@ async function updateProxyInCache(proxy: ProxyInfo, isWorking: boolean): Promise
}
}
/**
* Initialize proxy cache for use during application startup
* This should be called before any proxy operations
*/
export async function initializeProxyCache(): Promise<void> {
logger = getLogger('proxy-tasks');
cache = createCache({
keyPrefix: 'proxy:',
ttl: PROXY_CONFIG.CACHE_TTL,
enableMetrics: true,
});
logger.info('Initializing proxy cache...');
await cache.waitForReady(10000);
logger.info('Proxy cache initialized successfully');
// Initialize other shared resources that don't require cache
httpClient = new HttpClient({ timeout: 10000 }, logger);
concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT);
logger.info('Proxy tasks initialized');
}
async function initializeSharedResources() {
if (!logger) {
// If not initialized at startup, initialize with fallback mode
logger = getLogger('proxy-tasks');
cache = createCache({
keyPrefix: 'proxy:',
ttl: PROXY_CONFIG.CACHE_TTL,
enableMetrics: true,
});
httpClient = new HttpClient({ timeout: 10000 }, logger);
concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT);
logger.info('Proxy tasks initialized (fallback mode)');
}
}
// Individual task functions
export async function queueProxyFetch(): Promise<string> {
await initializeSharedResources();
const { queueManager } = await import('../services/queue.service');
const job = await queueManager.addJob({
type: 'proxy-fetch',
@ -337,8 +324,6 @@ export async function queueProxyFetch(): Promise<string> {
}
export async function queueProxyCheck(proxies: ProxyInfo[]): Promise<string> {
await initializeSharedResources();
const { queueManager } = await import('../services/queue.service');
const job = await queueManager.addJob({
type: 'proxy-check',
@ -354,35 +339,15 @@ export async function queueProxyCheck(proxies: ProxyInfo[]): Promise<string> {
}
export async function fetchProxiesFromSources(): Promise<ProxyInfo[]> {
await initializeSharedResources();
await resetProxyStats();
// Ensure concurrencyLimit is available before using it
if (!concurrencyLimit) {
logger.error('concurrencyLimit not initialized, using sequential processing');
const result = [];
for (const source of PROXY_CONFIG.PROXY_SOURCES) {
const proxies = await fetchProxiesFromSource(source);
result.push(...proxies);
}
let allProxies: ProxyInfo[] = result;
allProxies = removeDuplicateProxies(allProxies);
return allProxies;
}
const sources = PROXY_CONFIG.PROXY_SOURCES.map(source =>
concurrencyLimit(() => fetchProxiesFromSource(source))
);
const result = await Promise.all(sources);
let allProxies: ProxyInfo[] = result.flat();
const fetchPromises = PROXY_CONFIG.PROXY_SOURCES.map(source => fetchProxiesFromSource(source));
const results = await Promise.all(fetchPromises);
let allProxies: ProxyInfo[] = results.flat();
allProxies = removeDuplicateProxies(allProxies);
// await checkProxies(allProxies);
return allProxies;
}
export async function fetchProxiesFromSource(source: ProxySource): Promise<ProxyInfo[]> {
await initializeSharedResources();
const allProxies: ProxyInfo[] = [];
try {
@ -436,8 +401,6 @@ export async function fetchProxiesFromSource(source: ProxySource): Promise<Proxy
* Check if a proxy is working
*/
export async function checkProxy(proxy: ProxyInfo): Promise<ProxyInfo> {
await initializeSharedResources();
let success = false;
logger.debug(`Checking Proxy:`, {
protocol: proxy.protocol,
@ -504,6 +467,76 @@ export async function checkProxy(proxy: ProxyInfo): Promise<ProxyInfo> {
}
}
/**
* Get a random active proxy from the cache
* @param protocol - Optional protocol filter ('http' | 'https' | 'socks4' | 'socks5')
* @param minSuccessRate - Minimum success rate percentage (default: 50)
* @returns A random working proxy or null if none found
*/
export async function getRandomActiveProxy(
protocol?: 'http' | 'https' | 'socks4' | 'socks5',
minSuccessRate: number = 50
): Promise<ProxyInfo | null> {
try {
// Get all active proxy keys from cache
const pattern = protocol
? `${PROXY_CONFIG.CACHE_KEY}:${protocol}://*`
: `${PROXY_CONFIG.CACHE_KEY}:*`;
const keys = await cache.keys(pattern);
if (keys.length === 0) {
logger.debug('No active proxies found in cache', { pattern });
return null;
}
// Shuffle the keys for randomness
const shuffledKeys = keys.sort(() => Math.random() - 0.5);
// Find a working proxy that meets the criteria
for (const key of shuffledKeys) {
try {
const proxyData: ProxyInfo | null = await cache.get(key);
if (
proxyData &&
proxyData.isWorking &&
(!proxyData.successRate || proxyData.successRate >= minSuccessRate)
) {
logger.debug('Random active proxy selected', {
proxy: `${proxyData.host}:${proxyData.port}`,
protocol: proxyData.protocol,
successRate: proxyData.successRate?.toFixed(1) + '%',
avgResponseTime: proxyData.averageResponseTime
? `${proxyData.averageResponseTime.toFixed(0)}ms`
: 'N/A',
});
return proxyData;
}
} catch (error) {
logger.debug('Error reading proxy from cache', { key, error: (error as Error).message });
continue;
}
}
logger.debug('No working proxies found meeting criteria', {
protocol,
minSuccessRate,
keysChecked: shuffledKeys.length,
});
return null;
} catch (error) {
logger.error('Error getting random active proxy', {
error: error instanceof Error ? error.message : String(error),
protocol,
minSuccessRate,
});
return null;
}
}
// Utility functions
function cleanProxyUrl(url: string): string {
return url

View file

@ -1,15 +1,15 @@
import { getLogger } from '@stock-bot/logger';
import { ProviderConfig } from '../services/provider-registry.service';
const logger = getLogger('quotemedia-provider');
const logger = getLogger('qm-provider');
export const quotemediaProvider: ProviderConfig = {
name: 'quotemedia',
export const qmProvider: ProviderConfig = {
name: 'qm',
operations: {
'live-data': async (payload: { symbol: string; fields?: string[] }) => {
logger.info('Fetching live data from QuoteMedia', { symbol: payload.symbol });
logger.info('Fetching live data from qm', { symbol: payload.symbol });
// Simulate QuoteMedia API call
// Simulate qm API call
const mockData = {
symbol: payload.symbol,
price: Math.random() * 1000 + 100,
@ -17,7 +17,7 @@ export const quotemediaProvider: ProviderConfig = {
change: (Math.random() - 0.5) * 20,
changePercent: (Math.random() - 0.5) * 5,
timestamp: new Date().toISOString(),
source: 'quotemedia',
source: 'qm',
fields: payload.fields || ['price', 'volume', 'change'],
};
@ -34,7 +34,7 @@ export const quotemediaProvider: ProviderConfig = {
interval?: string;
fields?: string[];
}) => {
logger.info('Fetching historical data from QuoteMedia', {
logger.info('Fetching historical data from qm', {
symbol: payload.symbol,
from: payload.from,
to: payload.to,
@ -56,7 +56,7 @@ export const quotemediaProvider: ProviderConfig = {
low: Math.random() * 1000 + 100,
close: Math.random() * 1000 + 100,
volume: Math.floor(Math.random() * 1000000),
source: 'quotemedia',
source: 'qm',
});
}
@ -67,12 +67,12 @@ export const quotemediaProvider: ProviderConfig = {
symbol: payload.symbol,
interval: payload.interval || '1d',
data,
source: 'quotemedia',
source: 'qm',
totalRecords: data.length,
};
},
'batch-quotes': async (payload: { symbols: string[]; fields?: string[] }) => {
logger.info('Fetching batch quotes from QuoteMedia', {
logger.info('Fetching batch quotes from qm', {
symbols: payload.symbols,
count: payload.symbols.length,
});
@ -83,7 +83,7 @@ export const quotemediaProvider: ProviderConfig = {
volume: Math.floor(Math.random() * 1000000),
change: (Math.random() - 0.5) * 20,
timestamp: new Date().toISOString(),
source: 'quotemedia',
source: 'qm',
}));
// Simulate network delay
@ -91,13 +91,13 @@ export const quotemediaProvider: ProviderConfig = {
return {
quotes,
source: 'quotemedia',
source: 'qm',
timestamp: new Date().toISOString(),
totalSymbols: payload.symbols.length,
};
},
'company-profile': async (payload: { symbol: string }) => {
logger.info('Fetching company profile from QuoteMedia', { symbol: payload.symbol });
logger.info('Fetching company profile from qm', { symbol: payload.symbol });
// Simulate company profile data
const profile = {
@ -109,7 +109,7 @@ export const quotemediaProvider: ProviderConfig = {
marketCap: Math.floor(Math.random() * 1000000000000),
employees: Math.floor(Math.random() * 100000),
website: `https://www.${payload.symbol.toLowerCase()}.com`,
source: 'quotemedia',
source: 'qm',
};
await new Promise(resolve => setTimeout(resolve, 150 + Math.random() * 100));
@ -117,7 +117,7 @@ export const quotemediaProvider: ProviderConfig = {
return profile;
},
'options-chain': async (payload: { symbol: string; expiration?: string }) => {
logger.info('Fetching options chain from QuoteMedia', {
logger.info('Fetching options chain from qm', {
symbol: payload.symbol,
expiration: payload.expiration,
});
@ -148,14 +148,14 @@ export const quotemediaProvider: ProviderConfig = {
new Date(Date.now() + 30 * 24 * 60 * 60 * 1000).toISOString().split('T')[0],
calls,
puts,
source: 'quotemedia',
source: 'qm',
};
},
},
scheduledJobs: [
// {
// type: 'quotemedia-premium-refresh',
// type: 'qm-premium-refresh',
// operation: 'batch-quotes',
// payload: { symbols: ['AAPL', 'GOOGL', 'MSFT'] },
// cronPattern: '*/2 * * * *', // Every 2 minutes
@ -163,7 +163,7 @@ export const quotemediaProvider: ProviderConfig = {
// description: 'Refresh premium quotes with detailed market data'
// },
// {
// type: 'quotemedia-options-update',
// type: 'qm-options-update',
// operation: 'options-chain',
// payload: { symbol: 'SPY' },
// cronPattern: '*/10 * * * *', // Every 10 minutes
@ -171,7 +171,7 @@ export const quotemediaProvider: ProviderConfig = {
// description: 'Update options chain data for SPY ETF'
// },
// {
// type: 'quotemedia-profiles',
// type: 'qm-profiles',
// operation: 'company-profile',
// payload: { symbol: 'AAPL' },
// cronPattern: '0 9 * * 1-5', // Weekdays at 9 AM

View file

@ -154,8 +154,8 @@ export class QueueService {
// Define providers to register
const providers = [
{ module: '../providers/proxy.provider', export: 'proxyProvider' },
{ module: '../providers/quotemedia.provider', export: 'quotemediaProvider' },
{ module: '../providers/yahoo.provider', export: 'yahooProvider' },
{ module: '../providers/ib.provider', export: 'ibProvider' },
// { module: '../providers/yahoo.provider', export: 'yahooProvider' },
];
// Import and register all providers