updated ib handler

This commit is contained in:
Boki 2025-06-23 08:05:59 -04:00
parent 9492f1b15e
commit fbff428e90
13 changed files with 94 additions and 69 deletions

View file

@ -77,7 +77,7 @@
"port": 6379,
"db": 1
},
"workers": 5,
"workers": 2,
"concurrency": 2,
"enableScheduledJobs": true,
"delayWorkerStart": false,

View file

@ -107,7 +107,7 @@ export async function processIndividualSymbol(
return { ceoId, spielCount, timestamp };
} catch (error) {
this.logger.error('Failed to process individual symbol', {
this.logger.error(`Failed to process individual symbol ${symbol}`, {
error,
ceoId,
timestamp,

View file

@ -1,26 +1,29 @@
import type { IbHandler } from '../ib.handler';
import type { IServiceContainer } from '@stock-bot/handlers';
import { fetchSession } from './fetch-session.action';
import { fetchExchanges } from './fetch-exchanges.action';
import { fetchSymbols } from './fetch-symbols.action';
export async function fetchExchangesAndSymbols(this: IbHandler): Promise<unknown> {
this.logger.info('Starting IB exchanges and symbols fetch job');
export async function fetchExchangesAndSymbols(services: IServiceContainer): Promise<unknown> {
services.logger.info('Starting IB exchanges and symbols fetch job');
try {
// Fetch session headers first
const sessionHeaders = await this.fetchSession();
const sessionHeaders = await fetchSession(services);
if (!sessionHeaders) {
this.logger.error('Failed to get session headers for IB job');
services.logger.error('Failed to get session headers for IB job');
return { success: false, error: 'No session headers' };
}
this.logger.info('Session headers obtained, fetching exchanges...');
services.logger.info('Session headers obtained, fetching exchanges...');
// Fetch exchanges
const exchanges = await this.fetchExchanges();
this.logger.info('Fetched exchanges from IB', { count: exchanges?.length || 0 });
const exchanges = await fetchExchanges(services);
services.logger.info('Fetched exchanges from IB', { count: exchanges?.length || 0 });
// Fetch symbols
this.logger.info('Fetching symbols...');
const symbols = await this.fetchSymbols();
this.logger.info('Fetched symbols from IB', { count: symbols?.length || 0 });
services.logger.info('Fetching symbols...');
const symbols = await fetchSymbols(services);
services.logger.info('Fetched symbols from IB', { count: symbols?.length || 0 });
return {
success: true,
@ -28,7 +31,7 @@ export async function fetchExchangesAndSymbols(this: IbHandler): Promise<unknown
symbolsCount: symbols?.length || 0,
};
} catch (error) {
this.logger.error('Failed to fetch IB exchanges and symbols', { error });
services.logger.error('Failed to fetch IB exchanges and symbols', { error });
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
@ -36,3 +39,4 @@ export async function fetchExchangesAndSymbols(this: IbHandler): Promise<unknown
}
}

View file

@ -1,15 +1,16 @@
import type { IbHandler } from '../ib.handler';
import type { IServiceContainer } from '@stock-bot/handlers';
import { IB_CONFIG } from '../shared/config';
import { fetchSession } from './fetch-session.action';
export async function fetchExchanges(this: IbHandler): Promise<unknown[] | null> {
export async function fetchExchanges(services: IServiceContainer): Promise<unknown[] | null> {
try {
// First get session headers
const sessionHeaders = await this.fetchSession();
const sessionHeaders = await fetchSession(services);
if (!sessionHeaders) {
throw new Error('Failed to get session headers');
}
this.logger.info('🔍 Fetching exchanges with session headers...');
services.logger.info('🔍 Fetching exchanges with session headers...');
// The URL for the exchange data API
const exchangeUrl = IB_CONFIG.BASE_URL + IB_CONFIG.EXCHANGE_API;
@ -27,7 +28,7 @@ export async function fetchExchanges(this: IbHandler): Promise<unknown[] | null>
'X-Requested-With': 'XMLHttpRequest',
};
this.logger.info('📤 Making request to exchange API...', {
services.logger.info('📤 Making request to exchange API...', {
url: exchangeUrl,
headerCount: Object.keys(requestHeaders).length,
});
@ -40,7 +41,7 @@ export async function fetchExchanges(this: IbHandler): Promise<unknown[] | null>
});
if (!response.ok) {
this.logger.error('❌ Exchange API request failed', {
services.logger.error('❌ Exchange API request failed', {
status: response.status,
statusText: response.statusText,
});
@ -49,18 +50,19 @@ export async function fetchExchanges(this: IbHandler): Promise<unknown[] | null>
const data = await response.json();
const exchanges = data?.exchanges || [];
this.logger.info('✅ Exchange data fetched successfully');
services.logger.info('✅ Exchange data fetched successfully');
this.logger.info('Saving IB exchanges to MongoDB...');
await this.mongodb.batchUpsert('ibExchanges', exchanges, ['id', 'country_code']);
this.logger.info('✅ Exchange IB data saved to MongoDB:', {
services.logger.info('Saving IB exchanges to MongoDB...');
await services.mongodb.batchUpsert('ibExchanges', exchanges, ['id', 'country_code']);
services.logger.info('✅ Exchange IB data saved to MongoDB:', {
count: exchanges.length,
});
return exchanges;
} catch (error) {
this.logger.error('❌ Failed to fetch exchanges', { error });
services.logger.error('❌ Failed to fetch exchanges', { error });
return null;
}
}

View file

@ -1,21 +1,21 @@
import { Browser } from '@stock-bot/browser';
import type { IbHandler } from '../ib.handler';
import type { IServiceContainer } from '@stock-bot/handlers';
import { IB_CONFIG } from '../shared/config';
export async function fetchSession(this: IbHandler): Promise<Record<string, string> | undefined> {
export async function fetchSession(services: IServiceContainer): Promise<Record<string, string> | undefined> {
try {
await Browser.initialize({
headless: true,
timeout: IB_CONFIG.BROWSER_TIMEOUT,
blockResources: false,
});
this.logger.info('✅ Browser initialized');
services.logger.info('✅ Browser initialized');
const { page } = await Browser.createPageWithProxy(
IB_CONFIG.BASE_URL + IB_CONFIG.PRODUCTS_PAGE,
IB_CONFIG.DEFAULT_PROXY
);
this.logger.info('✅ Page created with proxy');
services.logger.info('✅ Page created with proxy');
const headersPromise = new Promise<Record<string, string> | undefined>(resolve => {
let resolved = false;
@ -27,7 +27,7 @@ export async function fetchSession(this: IbHandler): Promise<Record<string, stri
resolve(event.headers);
} catch (e) {
resolve(undefined);
this.logger.debug('Raw Summary Response error', { error: (e as Error).message });
services.logger.debug('Raw Summary Response error', { error: (e as Error).message });
}
}
}
@ -37,47 +37,48 @@ export async function fetchSession(this: IbHandler): Promise<Record<string, stri
setTimeout(() => {
if (!resolved) {
resolved = true;
this.logger.warn('Timeout waiting for headers');
services.logger.warn('Timeout waiting for headers');
resolve(undefined);
}
}, IB_CONFIG.HEADERS_TIMEOUT);
});
this.logger.info('⏳ Waiting for page load...');
services.logger.info('⏳ Waiting for page load...');
await page.waitForLoadState('domcontentloaded', { timeout: IB_CONFIG.PAGE_LOAD_TIMEOUT });
this.logger.info('✅ Page loaded');
services.logger.info('✅ Page loaded');
//Products tabs
this.logger.info('🔍 Looking for Products tab...');
services.logger.info('🔍 Looking for Products tab...');
const productsTab = page.locator('#productSearchTab[role="tab"][href="#products"]');
await productsTab.waitFor({ timeout: IB_CONFIG.ELEMENT_TIMEOUT });
this.logger.info('✅ Found Products tab');
this.logger.info('🖱️ Clicking Products tab...');
services.logger.info('✅ Found Products tab');
services.logger.info('🖱️ Clicking Products tab...');
await productsTab.click();
this.logger.info('✅ Products tab clicked');
services.logger.info('✅ Products tab clicked');
// New Products Checkbox
this.logger.info('🔍 Looking for "New Products Only" radio button...');
services.logger.info('🔍 Looking for "New Products Only" radio button...');
const radioButton = page.locator('span.checkbox-text:has-text("New Products Only")');
await radioButton.waitFor({ timeout: IB_CONFIG.ELEMENT_TIMEOUT });
this.logger.info(`🎯 Found "New Products Only" radio button`);
services.logger.info(`🎯 Found "New Products Only" radio button`);
await radioButton.first().click();
this.logger.info('✅ "New Products Only" radio button clicked');
services.logger.info('✅ "New Products Only" radio button clicked');
// Wait for and return headers immediately when captured
this.logger.info('⏳ Waiting for headers to be captured...');
services.logger.info('⏳ Waiting for headers to be captured...');
const headers = await headersPromise;
page.close();
if (headers) {
this.logger.info('✅ Headers captured successfully');
services.logger.info('✅ Headers captured successfully');
} else {
this.logger.warn('⚠️ No headers were captured');
services.logger.warn('⚠️ No headers were captured');
}
return headers;
} catch (error) {
this.logger.error('Failed to fetch IB symbol summary', { error });
services.logger.error('Failed to fetch IB symbol summary', { error });
return;
}
}

View file

@ -1,15 +1,16 @@
import type { IbHandler } from '../ib.handler';
import type { IServiceContainer } from '@stock-bot/handlers';
import { IB_CONFIG } from '../shared/config';
import { fetchSession } from './fetch-session.action';
export async function fetchSymbols(this: IbHandler): Promise<unknown[] | null> {
export async function fetchSymbols(services: IServiceContainer): Promise<unknown[] | null> {
try {
// First get session headers
const sessionHeaders = await this.fetchSession();
const sessionHeaders = await fetchSession(services);
if (!sessionHeaders) {
throw new Error('Failed to get session headers');
}
this.logger.info('🔍 Fetching symbols with session headers...');
services.logger.info('🔍 Fetching symbols with session headers...');
// Prepare headers - include all session headers plus any additional ones
const requestHeaders = {
@ -45,7 +46,7 @@ export async function fetchSymbols(this: IbHandler): Promise<unknown[] | null> {
});
if (!summaryResponse.ok) {
this.logger.error('❌ Summary API request failed', {
services.logger.error('❌ Summary API request failed', {
status: summaryResponse.status,
statusText: summaryResponse.statusText,
});
@ -53,14 +54,14 @@ export async function fetchSymbols(this: IbHandler): Promise<unknown[] | null> {
}
const summaryData = await summaryResponse.json();
this.logger.info('✅ IB Summary data fetched successfully', {
services.logger.info('✅ IB Summary data fetched successfully', {
totalCount: summaryData[0].totalCount,
});
const symbols = [];
requestBody.pageSize = IB_CONFIG.PAGE_SIZE;
const pageCount = Math.ceil(summaryData[0].totalCount / IB_CONFIG.PAGE_SIZE) || 0;
this.logger.info('Fetching Symbols for IB', { pageCount });
services.logger.info('Fetching Symbols for IB', { pageCount });
const symbolPromises = [];
for (let page = 1; page <= pageCount; page++) {
@ -79,7 +80,7 @@ export async function fetchSymbols(this: IbHandler): Promise<unknown[] | null> {
const responses = await Promise.all(symbolPromises);
for (const response of responses) {
if (!response.ok) {
this.logger.error('❌ Symbols API request failed', {
services.logger.error('❌ Symbols API request failed', {
status: response.status,
statusText: response.statusText,
});
@ -90,28 +91,29 @@ export async function fetchSymbols(this: IbHandler): Promise<unknown[] | null> {
if (symJson && symJson.length > 0) {
symbols.push(...symJson);
} else {
this.logger.warn('⚠️ No symbols found in response');
services.logger.warn('⚠️ No symbols found in response');
continue;
}
}
if (symbols.length === 0) {
this.logger.warn('⚠️ No symbols fetched from IB');
services.logger.warn('⚠️ No symbols fetched from IB');
return null;
}
this.logger.info('✅ IB symbols fetched successfully, saving to DB...', {
services.logger.info('✅ IB symbols fetched successfully, saving to DB...', {
totalSymbols: symbols.length,
});
await this.mongodb.batchUpsert('ib_symbols', symbols, ['symbol', 'exchangeId']);
this.logger.info('Saved IB symbols to DB', {
await services.mongodb.batchUpsert('ib_symbols', symbols, ['symbol', 'exchangeId']);
services.logger.info('Saved IB symbols to DB', {
totalSymbols: symbols.length,
});
return symbols;
} catch (error) {
this.logger.error('❌ Failed to fetch symbols', { error });
services.logger.error('❌ Failed to fetch symbols', { error });
return null;
}
}

View file

@ -14,13 +14,19 @@ export class IbHandler extends BaseHandler {
}
@Operation('fetch-session')
fetchSession = fetchSession;
async fetchSession(): Promise<Record<string, string> | undefined> {
return fetchSession(this);
}
@Operation('fetch-exchanges')
fetchExchanges = fetchExchanges;
async fetchExchanges(): Promise<unknown[] | null> {
return fetchExchanges(this);
}
@Operation('fetch-symbols')
fetchSymbols = fetchSymbols;
async fetchSymbols(): Promise<unknown[] | null> {
return fetchSymbols(this);
}
@Operation('ib-exchanges-and-symbols')
@ScheduledOperation('ib-exchanges-and-symbols', '0 0 * * 0', {
@ -28,6 +34,9 @@ export class IbHandler extends BaseHandler {
description: 'Fetch and update IB exchanges and symbols data',
immediately: false,
})
fetchExchangesAndSymbols = fetchExchangesAndSymbols;
async fetchExchangesAndSymbols(): Promise<unknown> {
return fetchExchangesAndSymbols(this);
}
}

View file

@ -4,7 +4,7 @@ import {
Operation,
QueueSchedule,
type ExecutionContext,
type IServiceContainer,
type IServiceContainer
} from '@stock-bot/handlers';
@Handler('webshare')
@ -14,7 +14,7 @@ export class WebShareHandler extends BaseHandler {
}
@Operation('fetch-proxies')
@QueueSchedule('0 */6 * * *', {
@QueueSchedule('0 */6 * * *', { // once a month
priority: 3,
immediately: true,
description: 'Fetch fresh proxies from WebShare API',