stock-bot/apps/data-service/src/handlers/qm/operations/spider.operations.ts

268 lines
No EOL
8.1 KiB
TypeScript

/**
* QM Spider Operations - Symbol spider search functionality
*/
import { OperationContext } from '@stock-bot/utils';
import { QueueManager } from '@stock-bot/queue';
import { QMSessionManager } from '../shared/session-manager';
import { QM_SESSION_IDS } from '../shared/config';
import type { SymbolSpiderJob, SpiderResult } from '../shared/types';
import { initializeQMResources } from './session.operations';
import { searchQMSymbolsAPI } from './symbols.operations';
export async function spiderSymbolSearch(
payload: SymbolSpiderJob
): Promise<SpiderResult> {
const ctx = OperationContext.create('qm', 'spider');
try {
const { prefix, depth, source = 'qm', maxDepth = 4 } = payload;
ctx.logger.info('Starting spider search', {
prefix: prefix || 'ROOT',
depth,
source,
maxDepth
});
// Check cache for recent results
const cacheKey = `search-result:${prefix || 'ROOT'}:${depth}`;
const cachedResult = await ctx.cache.get<SpiderResult>(cacheKey);
if (cachedResult) {
ctx.logger.debug('Using cached spider search result', { prefix, depth });
return cachedResult;
}
// Ensure resources are initialized
const sessionManager = QMSessionManager.getInstance();
if (!sessionManager.getInitialized()) {
await initializeQMResources();
}
let result: SpiderResult;
// Root job: Create A-Z jobs
if (prefix === null || prefix === undefined || prefix === '') {
result = await createAlphabetJobs(source, maxDepth, ctx);
} else {
// Leaf job: Search for symbols with this prefix
result = await searchAndSpawnJobs(prefix, depth, source, maxDepth, ctx);
}
// Cache the result
await ctx.cache.set(cacheKey, result, { ttl: 3600 });
// Store spider operation metrics in cache instead of PostgreSQL for now
try {
const statsKey = `spider-stats:${prefix || 'ROOT'}:${depth}:${Date.now()}`;
await ctx.cache.set(statsKey, {
handler: 'qm',
operation: 'spider',
prefix: prefix || 'ROOT',
depth,
symbolsFound: result.symbolsFound,
jobsCreated: result.jobsCreated,
searchTime: new Date().toISOString()
}, { ttl: 86400 }); // Keep for 24 hours
} catch (error) {
ctx.logger.debug('Failed to store spider stats in cache', { error });
}
ctx.logger.info('Spider search completed', {
prefix: prefix || 'ROOT',
depth,
success: result.success,
symbolsFound: result.symbolsFound,
jobsCreated: result.jobsCreated
});
return result;
} catch (error) {
ctx.logger.error('Spider symbol search failed', { error, payload });
const failedResult = { success: false, symbolsFound: 0, jobsCreated: 0 };
// Cache failed result for a shorter time
const cacheKey = `search-result:${payload.prefix || 'ROOT'}:${payload.depth}`;
await ctx.cache.set(cacheKey, failedResult, { ttl: 300 });
return failedResult;
}
}
async function createAlphabetJobs(
source: string,
maxDepth: number,
ctx: OperationContext
): Promise<SpiderResult> {
try {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue('qm');
let jobsCreated = 0;
ctx.logger.info('Creating alphabet jobs (A-Z)');
// Create jobs for A-Z
for (let i = 0; i < 26; i++) {
const letter = String.fromCharCode(65 + i); // A=65, B=66, etc.
const job: SymbolSpiderJob = {
prefix: letter,
depth: 1,
source,
maxDepth,
};
await queue.add(
'spider-symbol-search',
{
handler: 'qm',
operation: 'spider-symbol-search',
payload: job,
},
{
priority: 5,
delay: i * 100, // Stagger jobs by 100ms
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
}
);
jobsCreated++;
}
// Cache alphabet job creation
await ctx.cache.set('alphabet-jobs-created', {
count: jobsCreated,
timestamp: new Date().toISOString(),
source,
maxDepth
}, { ttl: 3600 });
ctx.logger.info(`Created ${jobsCreated} alphabet jobs (A-Z)`);
return { success: true, symbolsFound: 0, jobsCreated };
} catch (error) {
ctx.logger.error('Failed to create alphabet jobs', { error });
return { success: false, symbolsFound: 0, jobsCreated: 0 };
}
}
async function searchAndSpawnJobs(
prefix: string,
depth: number,
source: string,
maxDepth: number,
ctx: OperationContext
): Promise<SpiderResult> {
try {
// Ensure sessions exist for symbol search
const sessionManager = QMSessionManager.getInstance();
const lookupSession = sessionManager.getSession(QM_SESSION_IDS.LOOKUP);
if (!lookupSession) {
ctx.logger.info('No lookup sessions available, creating sessions first...');
const { createSessions } = await import('./session.operations');
await createSessions();
// Wait a bit for session creation
await new Promise(resolve => setTimeout(resolve, 1000));
}
// Search for symbols with this prefix
const symbols = await searchQMSymbolsAPI(prefix);
const symbolCount = symbols.length;
ctx.logger.info(`Prefix "${prefix}" returned ${symbolCount} symbols`);
let jobsCreated = 0;
// Store symbols in MongoDB
if (ctx.mongodb && symbols.length > 0) {
try {
const updatedSymbols = symbols.map((symbol: Record<string, unknown>) => ({
...symbol,
qmSearchCode: symbol.symbol,
symbol: (symbol.symbol as string)?.split(':')[0],
searchPrefix: prefix,
searchDepth: depth,
discoveredAt: new Date()
}));
await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']);
ctx.logger.debug('Stored symbols in MongoDB', { count: symbols.length });
} catch (error) {
ctx.logger.warn('Failed to store symbols in MongoDB', { error });
}
}
// If we have 50+ symbols and haven't reached max depth, spawn sub-jobs
if (symbolCount >= 50 && depth < maxDepth) {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue('qm');
ctx.logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`);
// Create jobs for prefixA, prefixB, prefixC... prefixZ
for (let i = 0; i < 26; i++) {
const letter = String.fromCharCode(65 + i);
const newPrefix = prefix + letter;
const job: SymbolSpiderJob = {
prefix: newPrefix,
depth: depth + 1,
source,
maxDepth,
};
await queue.add(
'spider-symbol-search',
{
handler: 'qm',
operation: 'spider-symbol-search',
payload: job,
},
{
priority: Math.max(1, 6 - depth), // Higher priority for deeper jobs
delay: i * 50, // Stagger sub-jobs by 50ms
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
}
);
jobsCreated++;
}
// Cache sub-job creation info
await ctx.cache.set(`sub-jobs:${prefix}`, {
parentPrefix: prefix,
depth,
symbolCount,
jobsCreated,
timestamp: new Date().toISOString()
}, { ttl: 3600 });
ctx.logger.info(`Created ${jobsCreated} sub-jobs for prefix "${prefix}"`);
} else {
// Terminal case: save symbols (already done above)
ctx.logger.info(`Terminal case for prefix "${prefix}": ${symbolCount} symbols saved`);
// Cache terminal case info
await ctx.cache.set(`terminal:${prefix}`, {
prefix,
depth,
symbolCount,
isTerminal: true,
reason: symbolCount < 50 ? 'insufficient_symbols' : 'max_depth_reached',
timestamp: new Date().toISOString()
}, { ttl: 3600 });
}
return { success: true, symbolsFound: symbolCount, jobsCreated };
} catch (error) {
ctx.logger.error(`Failed to search and spawn jobs for prefix "${prefix}"`, { error, depth });
return { success: false, symbolsFound: 0, jobsCreated: 0 };
}
}