stock-bot/apps/data-ingestion/src/handlers/qm/operations/symbols.operations.ts

200 lines
No EOL
6.2 KiB
TypeScript

/**
* QM Symbols Operations - Symbol fetching and API interactions
*/
import { OperationContext } from '@stock-bot/utils';
import { getRandomProxy } from '@stock-bot/utils';
import type { ServiceContainer } from '@stock-bot/connection-factory';
import { QMSessionManager } from '../shared/session-manager';
import { QM_SESSION_IDS, QM_CONFIG, SESSION_CONFIG } from '../shared/config';
import type { SymbolSpiderJob, Exchange } from '../shared/types';
import { initializeQMResources } from './session.operations';
import { spiderSymbolSearch } from './spider.operations';
export async function fetchSymbols(container: ServiceContainer): Promise<unknown[] | null> {
const ctx = OperationContext.create('qm', 'symbols', { container });
try {
const sessionManager = QMSessionManager.getInstance();
if (!sessionManager.getInitialized()) {
await initializeQMResources(container);
}
ctx.logger.info('Starting QM spider-based symbol search...');
// Check if we have a recent symbol fetch
const lastFetch = await ctx.cache.get('last-symbol-fetch');
if (lastFetch) {
ctx.logger.info('Recent symbol fetch found, using spider search');
}
// Start the spider process with root job
const rootJob: SymbolSpiderJob = {
prefix: null, // Root job creates A-Z jobs
depth: 0,
source: 'qm',
maxDepth: 4,
};
const result = await spiderSymbolSearch(rootJob);
if (result.success) {
// Cache successful fetch info
await ctx.cache.set('last-symbol-fetch', {
timestamp: new Date().toISOString(),
jobsCreated: result.jobsCreated,
success: true
}, { ttl: 3600 });
ctx.logger.info(
`QM spider search initiated successfully. Created ${result.jobsCreated} initial jobs`
);
return [`Spider search initiated with ${result.jobsCreated} jobs`];
} else {
ctx.logger.error('Failed to initiate QM spider search');
return null;
}
} catch (error) {
ctx.logger.error('Failed to start QM spider symbol search', { error });
return null;
} finally {
await ctx.dispose();
}
}
export async function searchQMSymbolsAPI(query: string, container: ServiceContainer): Promise<any[]> {
const ctx = OperationContext.create('qm', 'api-search', { container });
const proxyInfo = await getRandomProxy();
if (!proxyInfo) {
throw new Error('No proxy available for QM API call');
}
const sessionManager = QMSessionManager.getInstance();
const session = sessionManager.getSession(QM_SESSION_IDS.LOOKUP);
if (!session) {
throw new Error(`No active session found for QM API with ID: ${QM_SESSION_IDS.LOOKUP}`);
}
try {
ctx.logger.debug('Searching QM symbols API', { query, proxy: session.proxy });
// Check cache for recent API results
const cacheKey = `api-search:${query}`;
const cachedResult = await ctx.cache.get(cacheKey);
if (cachedResult) {
ctx.logger.debug('Using cached API search result', { query });
return cachedResult;
}
// QM lookup endpoint for symbol search
const searchParams = new URLSearchParams({
marketType: 'equity',
pathName: '/demo/portal/company-summary.php',
q: query,
qmodTool: 'SmartSymbolLookup',
searchType: 'symbol',
showFree: 'false',
showHisa: 'false',
webmasterId: '500'
});
const apiUrl = `${QM_CONFIG.LOOKUP_URL}?${searchParams.toString()}`;
const response = await fetch(apiUrl, {
method: 'GET',
headers: session.headers,
signal: AbortSignal.timeout(SESSION_CONFIG.API_TIMEOUT),
});
if (!response.ok) {
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
}
const symbols = await response.json();
// Update session stats
session.successfulCalls++;
session.lastUsed = new Date();
// Process symbols and extract exchanges
if (ctx.mongodb && symbols.length > 0) {
try {
const updatedSymbols = symbols.map((symbol: Record<string, unknown>) => ({
...symbol,
qmSearchCode: symbol.symbol,
symbol: (symbol.symbol as string)?.split(':')[0],
searchQuery: query,
fetchedAt: new Date()
}));
await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']);
// Extract and store unique exchanges
const exchanges: Exchange[] = [];
for (const symbol of symbols) {
if (!exchanges.some(ex => ex.exchange === symbol.exchange)) {
exchanges.push({
exchange: symbol.exchange,
exchangeCode: symbol.exchangeCode,
exchangeShortName: symbol.exchangeShortName,
countryCode: symbol.countryCode,
source: 'qm',
});
}
}
if (exchanges.length > 0) {
await ctx.mongodb.batchUpsert('qmExchanges', exchanges, ['exchange']);
ctx.logger.debug('Stored exchanges in MongoDB', { count: exchanges.length });
}
} catch (error) {
ctx.logger.warn('Failed to store symbols/exchanges in MongoDB', { error });
}
}
// Cache the result
await ctx.cache.set(cacheKey, symbols, { ttl: 1800 }); // 30 minutes
// Store API call stats
await ctx.cache.set(`api-stats:${query}:${Date.now()}`, {
query,
symbolCount: symbols.length,
proxy: session.proxy,
success: true,
timestamp: new Date().toISOString()
}, { ttl: 3600 });
ctx.logger.info(
`QM API returned ${symbols.length} symbols for query: ${query}`,
{ proxy: session.proxy, symbolCount: symbols.length }
);
return symbols;
} catch (error) {
// Update session failure stats
session.failedCalls++;
session.lastUsed = new Date();
// Cache failed API call info
await ctx.cache.set(`api-failure:${query}:${Date.now()}`, {
query,
error: error.message,
proxy: session.proxy,
timestamp: new Date().toISOString()
}, { ttl: 600 });
ctx.logger.error(`Error searching QM symbols for query "${query}"`, {
error: error.message,
proxy: session.proxy
});
throw error;
} finally {
await ctx.dispose();
}
}