refactoring handlers
This commit is contained in:
parent
59ab0940ae
commit
ab0b7a5385
12 changed files with 1098 additions and 25 deletions
|
|
@ -0,0 +1,266 @@
|
|||
/**
|
||||
* QM Spider Operations - Symbol spider search functionality
|
||||
*/
|
||||
|
||||
import { OperationContext } from '@stock-bot/utils';
|
||||
import { QueueManager } from '@stock-bot/queue';
|
||||
import type { Logger } from '@stock-bot/logger';
|
||||
|
||||
import { QMSessionManager } from '../shared/session-manager';
|
||||
import { QM_SESSION_IDS } from '../shared/config';
|
||||
import type { SymbolSpiderJob, SpiderResult } from '../shared/types';
|
||||
import { initializeQMResources } from './session.operations';
|
||||
import { searchQMSymbolsAPI } from './symbols.operations';
|
||||
|
||||
export async function spiderSymbolSearch(
|
||||
payload: SymbolSpiderJob,
|
||||
parentLogger?: Logger
|
||||
): Promise<SpiderResult> {
|
||||
const ctx = OperationContext.create('qm', 'spider', parentLogger);
|
||||
|
||||
try {
|
||||
const { prefix, depth, source = 'qm', maxDepth = 4 } = payload;
|
||||
|
||||
ctx.logger.info('Starting spider search', {
|
||||
prefix: prefix || 'ROOT',
|
||||
depth,
|
||||
source,
|
||||
maxDepth
|
||||
});
|
||||
|
||||
// Check cache for recent results
|
||||
const cacheKey = `search-result:${prefix || 'ROOT'}:${depth}`;
|
||||
const cachedResult = await ctx.cache.get<SpiderResult>(cacheKey);
|
||||
if (cachedResult) {
|
||||
ctx.logger.debug('Using cached spider search result', { prefix, depth });
|
||||
return cachedResult;
|
||||
}
|
||||
|
||||
// Ensure resources are initialized
|
||||
const sessionManager = QMSessionManager.getInstance();
|
||||
if (!sessionManager.getInitialized()) {
|
||||
await initializeQMResources(parentLogger);
|
||||
}
|
||||
|
||||
let result: SpiderResult;
|
||||
|
||||
// Root job: Create A-Z jobs
|
||||
if (prefix === null || prefix === undefined || prefix === '') {
|
||||
result = await createAlphabetJobs(source, maxDepth, ctx);
|
||||
} else {
|
||||
// Leaf job: Search for symbols with this prefix
|
||||
result = await searchAndSpawnJobs(prefix, depth, source, maxDepth, ctx);
|
||||
}
|
||||
|
||||
// Cache the result
|
||||
await ctx.cache.set(cacheKey, result, { ttl: 3600 });
|
||||
|
||||
// Store spider operation metrics in PostgreSQL
|
||||
if (ctx.postgres) {
|
||||
try {
|
||||
await ctx.postgres.query(
|
||||
'INSERT INTO spider_stats (handler, operation, prefix, depth, symbols_found, jobs_created, search_time) VALUES ($1, $2, $3, $4, $5, $6, $7)',
|
||||
['qm', 'spider', prefix || 'ROOT', depth, result.symbolsFound, result.jobsCreated, new Date()]
|
||||
);
|
||||
} catch (error) {
|
||||
ctx.logger.warn('Failed to store spider stats in PostgreSQL', { error });
|
||||
}
|
||||
}
|
||||
|
||||
ctx.logger.info('Spider search completed', {
|
||||
prefix: prefix || 'ROOT',
|
||||
depth,
|
||||
success: result.success,
|
||||
symbolsFound: result.symbolsFound,
|
||||
jobsCreated: result.jobsCreated
|
||||
});
|
||||
|
||||
return result;
|
||||
|
||||
} catch (error) {
|
||||
ctx.logger.error('Spider symbol search failed', { error, payload });
|
||||
const failedResult = { success: false, symbolsFound: 0, jobsCreated: 0 };
|
||||
|
||||
// Cache failed result for a shorter time
|
||||
const cacheKey = `search-result:${payload.prefix || 'ROOT'}:${payload.depth}`;
|
||||
await ctx.cache.set(cacheKey, failedResult, { ttl: 300 });
|
||||
|
||||
return failedResult;
|
||||
}
|
||||
}
|
||||
|
||||
async function createAlphabetJobs(
|
||||
source: string,
|
||||
maxDepth: number,
|
||||
ctx: OperationContext
|
||||
): Promise<SpiderResult> {
|
||||
try {
|
||||
const queueManager = QueueManager.getInstance();
|
||||
const queue = queueManager.getQueue('qm');
|
||||
let jobsCreated = 0;
|
||||
|
||||
ctx.logger.info('Creating alphabet jobs (A-Z)');
|
||||
|
||||
// Create jobs for A-Z
|
||||
for (let i = 0; i < 26; i++) {
|
||||
const letter = String.fromCharCode(65 + i); // A=65, B=66, etc.
|
||||
|
||||
const job: SymbolSpiderJob = {
|
||||
prefix: letter,
|
||||
depth: 1,
|
||||
source,
|
||||
maxDepth,
|
||||
};
|
||||
|
||||
await queue.add(
|
||||
'spider-symbol-search',
|
||||
{
|
||||
handler: 'qm',
|
||||
operation: 'spider-symbol-search',
|
||||
payload: job,
|
||||
},
|
||||
{
|
||||
priority: 5,
|
||||
delay: i * 100, // Stagger jobs by 100ms
|
||||
attempts: 3,
|
||||
backoff: { type: 'exponential', delay: 2000 },
|
||||
}
|
||||
);
|
||||
|
||||
jobsCreated++;
|
||||
}
|
||||
|
||||
// Cache alphabet job creation
|
||||
await ctx.cache.set('alphabet-jobs-created', {
|
||||
count: jobsCreated,
|
||||
timestamp: new Date().toISOString(),
|
||||
source,
|
||||
maxDepth
|
||||
}, { ttl: 3600 });
|
||||
|
||||
ctx.logger.info(`Created ${jobsCreated} alphabet jobs (A-Z)`);
|
||||
return { success: true, symbolsFound: 0, jobsCreated };
|
||||
|
||||
} catch (error) {
|
||||
ctx.logger.error('Failed to create alphabet jobs', { error });
|
||||
return { success: false, symbolsFound: 0, jobsCreated: 0 };
|
||||
}
|
||||
}
|
||||
|
||||
async function searchAndSpawnJobs(
|
||||
prefix: string,
|
||||
depth: number,
|
||||
source: string,
|
||||
maxDepth: number,
|
||||
ctx: OperationContext
|
||||
): Promise<SpiderResult> {
|
||||
try {
|
||||
// Ensure sessions exist for symbol search
|
||||
const sessionManager = QMSessionManager.getInstance();
|
||||
const lookupSession = sessionManager.getSession(QM_SESSION_IDS.LOOKUP);
|
||||
|
||||
if (!lookupSession) {
|
||||
ctx.logger.info('No lookup sessions available, creating sessions first...');
|
||||
const { createSessions } = await import('./session.operations');
|
||||
await createSessions(ctx.logger);
|
||||
|
||||
// Wait a bit for session creation
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
}
|
||||
|
||||
// Search for symbols with this prefix
|
||||
const symbols = await searchQMSymbolsAPI(prefix, ctx.logger);
|
||||
const symbolCount = symbols.length;
|
||||
|
||||
ctx.logger.info(`Prefix "${prefix}" returned ${symbolCount} symbols`);
|
||||
|
||||
let jobsCreated = 0;
|
||||
|
||||
// Store symbols in MongoDB
|
||||
if (ctx.mongodb && symbols.length > 0) {
|
||||
try {
|
||||
const updatedSymbols = symbols.map((symbol: Record<string, unknown>) => ({
|
||||
...symbol,
|
||||
qmSearchCode: symbol.symbol,
|
||||
symbol: (symbol.symbol as string)?.split(':')[0],
|
||||
searchPrefix: prefix,
|
||||
searchDepth: depth,
|
||||
discoveredAt: new Date()
|
||||
}));
|
||||
|
||||
await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']);
|
||||
ctx.logger.debug('Stored symbols in MongoDB', { count: symbols.length });
|
||||
} catch (error) {
|
||||
ctx.logger.warn('Failed to store symbols in MongoDB', { error });
|
||||
}
|
||||
}
|
||||
|
||||
// If we have 50+ symbols and haven't reached max depth, spawn sub-jobs
|
||||
if (symbolCount >= 50 && depth < maxDepth) {
|
||||
const queueManager = QueueManager.getInstance();
|
||||
const queue = queueManager.getQueue('qm');
|
||||
|
||||
ctx.logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`);
|
||||
|
||||
// Create jobs for prefixA, prefixB, prefixC... prefixZ
|
||||
for (let i = 0; i < 26; i++) {
|
||||
const letter = String.fromCharCode(65 + i);
|
||||
const newPrefix = prefix + letter;
|
||||
|
||||
const job: SymbolSpiderJob = {
|
||||
prefix: newPrefix,
|
||||
depth: depth + 1,
|
||||
source,
|
||||
maxDepth,
|
||||
};
|
||||
|
||||
await queue.add(
|
||||
'spider-symbol-search',
|
||||
{
|
||||
handler: 'qm',
|
||||
operation: 'spider-symbol-search',
|
||||
payload: job,
|
||||
},
|
||||
{
|
||||
priority: Math.max(1, 6 - depth), // Higher priority for deeper jobs
|
||||
delay: i * 50, // Stagger sub-jobs by 50ms
|
||||
attempts: 3,
|
||||
backoff: { type: 'exponential', delay: 2000 },
|
||||
}
|
||||
);
|
||||
|
||||
jobsCreated++;
|
||||
}
|
||||
|
||||
// Cache sub-job creation info
|
||||
await ctx.cache.set(`sub-jobs:${prefix}`, {
|
||||
parentPrefix: prefix,
|
||||
depth,
|
||||
symbolCount,
|
||||
jobsCreated,
|
||||
timestamp: new Date().toISOString()
|
||||
}, { ttl: 3600 });
|
||||
|
||||
ctx.logger.info(`Created ${jobsCreated} sub-jobs for prefix "${prefix}"`);
|
||||
} else {
|
||||
// Terminal case: save symbols (already done above)
|
||||
ctx.logger.info(`Terminal case for prefix "${prefix}": ${symbolCount} symbols saved`);
|
||||
|
||||
// Cache terminal case info
|
||||
await ctx.cache.set(`terminal:${prefix}`, {
|
||||
prefix,
|
||||
depth,
|
||||
symbolCount,
|
||||
isTerminal: true,
|
||||
reason: symbolCount < 50 ? 'insufficient_symbols' : 'max_depth_reached',
|
||||
timestamp: new Date().toISOString()
|
||||
}, { ttl: 3600 });
|
||||
}
|
||||
|
||||
return { success: true, symbolsFound: symbolCount, jobsCreated };
|
||||
|
||||
} catch (error) {
|
||||
ctx.logger.error(`Failed to search and spawn jobs for prefix "${prefix}"`, { error, depth });
|
||||
return { success: false, symbolsFound: 0, jobsCreated: 0 };
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue