This commit is contained in:
Boki 2025-06-21 23:13:07 -04:00
parent 0c77449584
commit ca1f658be6
9 changed files with 170 additions and 766 deletions

View file

@ -0,0 +1,21 @@
/**
* QM Exchanges Operations - Simple exchange data fetching
*/
import type { IServiceContainer } from '@stock-bot/handlers';
export async function fetchExchanges(services: IServiceContainer): Promise<any[]> {
// Get exchanges from MongoDB
const exchanges = await services.mongodb.collection('qm_exchanges')
.find({}).toArray();
return exchanges;
}
export async function getExchangeByCode(services: IServiceContainer, code: string): Promise<any> {
// Get specific exchange by code
const exchange = await services.mongodb.collection('qm_exchanges')
.findOne({ code });
return exchange;
}

View file

@ -0,0 +1,81 @@
/**
* QM Session Actions - Session management and creation
*/
import type { IServiceContainer } from '@stock-bot/handlers';
import { QM_SESSION_IDS, SESSION_CONFIG } from '../shared/config';
import { QMSessionManager } from '../shared/session-manager';
/**
* Check existing sessions and queue creation jobs for needed sessions
*/
export async function checkSessions(services: IServiceContainer): Promise<{
cleaned: number;
queued: number;
message: string;
}> {
const sessionManager = QMSessionManager.getInstance();
const cleanedCount = sessionManager.cleanupFailedSessions();
// Check which session IDs need more sessions and queue creation jobs
let queuedCount = 0;
for (const sessionId of Object.values(QM_SESSION_IDS)) {
if (sessionManager.needsMoreSessions(sessionId)) {
const currentCount = sessionManager.getSessions(sessionId).length;
const neededSessions = SESSION_CONFIG.MAX_SESSIONS - currentCount;
for (let i = 0; i < neededSessions; i++) {
await services.queue.getQueue('qm').add('create-session', { sessionId });
services.logger.log(`Queued job to create session for ${sessionId}`);
queuedCount++;
}
}
}
return {
cleaned: cleanedCount,
queued: queuedCount,
message: `Session check completed: cleaned ${cleanedCount}, queued ${queuedCount}`
};
}
/**
* Create a single session for a specific session ID
*/
export async function createSingleSession(
services: IServiceContainer,
input: any
): Promise<{ sessionId: string; status: string; sessionType: string }> {
const { sessionId: sessionType = 'default' } = input || {};
const sessionManager = QMSessionManager.getInstance();
// Check if we're at capacity for this session type
if (sessionManager.isAtCapacity(sessionType)) {
return {
sessionId: '',
status: 'skipped',
sessionType,
};
}
// TODO: Get actual proxy and headers from proxy service
const session = {
proxy: 'http://proxy:8080', // Placeholder
headers: {
'User-Agent': 'Mozilla/5.0 (compatible; QMBot/1.0)',
'Accept': 'application/json'
},
successfulCalls: 0,
failedCalls: 0,
lastUsed: new Date()
};
// Add session to manager
sessionManager.addSession(sessionType, session);
return {
sessionId: sessionType,
status: 'created',
sessionType
};
}

View file

@ -0,0 +1,34 @@
/**
* QM Spider Operations - Simple symbol discovery
*/
import type { IServiceContainer } from '@stock-bot/handlers';
import type { SymbolSpiderJob } from '../shared/types';
export async function spiderSymbolSearch(
services: IServiceContainer,
config: SymbolSpiderJob
): Promise<{ foundSymbols: number; depth: number }> {
// Simple spider implementation
// TODO: Implement actual API calls to discover symbols
// For now, just return mock results
const foundSymbols = Math.floor(Math.random() * 10) + 1;
return {
foundSymbols,
depth: config.depth
};
}
export async function queueSymbolDiscovery(
services: IServiceContainer,
searchTerms: string[]
): Promise<void> {
// Queue symbol discovery jobs
for (const term of searchTerms) {
// TODO: Queue actual discovery jobs
await services.cache.set(`discovery:${term}`, { queued: true }, 3600);
}
}

View file

@ -0,0 +1,21 @@
/**
* QM Symbols Operations - Simple symbol fetching
*/
import type { IServiceContainer } from '@stock-bot/handlers';
export async function searchSymbols(services: IServiceContainer): Promise<any[]> {
// Get symbols from MongoDB
const symbols = await services.mongodb.collection('qm_symbols')
.find({}).limit(50).toArray();
return symbols;
}
export async function fetchSymbolData(services: IServiceContainer, symbol: string): Promise<any> {
// Fetch data for a specific symbol
const symbolData = await services.mongodb.collection('qm_symbols')
.findOne({ symbol });
return symbolData;
}

View file

@ -1,44 +0,0 @@
/**
* QM Exchanges Operations - Exchange fetching functionality
*/
import { OperationContext } from '@stock-bot/di';
import type { ServiceContainer } from '@stock-bot/di';
import { initializeQMResources } from './session.operations';
export async function fetchExchanges(container: ServiceContainer): Promise<unknown[] | null> {
const ctx = OperationContext.create('qm', 'exchanges', { container });
try {
// Ensure resources are initialized
const { QMSessionManager } = await import('../shared/session-manager');
const sessionManager = QMSessionManager.getInstance();
if (!sessionManager.getInitialized()) {
await initializeQMResources(container);
}
ctx.logger.info('QM exchanges fetch - not implemented yet');
// Cache the "not implemented" status
await ctx.cache.set('fetch-status', {
implemented: false,
message: 'QM exchanges fetching not yet implemented',
timestamp: new Date().toISOString()
}, { ttl: 3600 });
// TODO: Implement QM exchanges fetching logic
// This could involve:
// 1. Querying existing exchanges from MongoDB
// 2. Making API calls to discover new exchanges
// 3. Processing and storing exchange metadata
return null;
} catch (error) {
ctx.logger.error('Failed to fetch QM exchanges', { error });
return null;
} finally {
await ctx.dispose();
}
}

View file

@ -1,199 +0,0 @@
/**
* QM Session Operations - Session creation and management
*/
import type { ServiceContainer } from '@stock-bot/di';
import { OperationContext } from '@stock-bot/di';
import { isShutdownSignalReceived } from '@stock-bot/shutdown';
import { getRandomProxy } from '@stock-bot/utils';
import { QM_CONFIG, QM_SESSION_IDS, SESSION_CONFIG, getQmHeaders } from '../shared/config';
import { QMSessionManager } from '../shared/session-manager';
import type { QMSession } from '../shared/types';
export async function createSessions(container: ServiceContainer): Promise<void> {
const ctx = OperationContext.create('qm-handler', 'create-sessions', {container});
try {
ctx.logger.info('Creating QM sessions...');
// Get session manager instance
const sessionManager = QMSessionManager.getInstance();
// Check if already initialized
if (!sessionManager.getInitialized()) {
await initializeQMResources(container);
}
// Clean up failed sessions first
const removedCount = sessionManager.cleanupFailedSessions();
if (removedCount > 0) {
ctx.logger.info(`Cleaned up ${removedCount} failed sessions`);
}
// Cache session creation stats
const initialStats = sessionManager.getStats();
const cache = ctx.resolve<any>('cache');
await cache.set('pre-creation-stats', initialStats, { ttl: 300 });
// Create sessions for each session ID that needs them
for (const [sessionKey, sessionId] of Object.entries(QM_SESSION_IDS)) {
if (sessionManager.isAtCapacity(sessionId)) {
ctx.logger.debug(`Session ID ${sessionKey} is at capacity, skipping`);
continue;
}
while (sessionManager.needsMoreSessions(sessionId)) {
if (isShutdownSignalReceived()) {
ctx.logger.info('Shutting down, skipping session creation');
return;
}
await createSingleSession(sessionId, sessionKey, ctx);
}
}
// Cache final stats and session count
const finalStats = sessionManager.getStats();
const totalSessions = sessionManager.getSessionCount();
await cache.set('post-creation-stats', finalStats, { ttl: 3600 });
await cache.set('session-count', totalSessions, { ttl: 900 });
await cache.set('last-session-creation', new Date().toISOString());
ctx.logger.info('QM session creation completed', {
totalSessions,
sessionStats: finalStats
});
} catch (error) {
ctx.logger.error('Failed to create QM sessions', { error });
throw error;
}
}
async function createSingleSession(
sessionId: string,
sessionKey: string,
ctx: OperationContext
): Promise<void> {
ctx.logger.debug(`Creating new session for ${sessionKey}`, { sessionId });
const proxyInfo = await getRandomProxy();
if (!proxyInfo) {
ctx.logger.error('No proxy available for QM session creation');
return;
}
// Convert ProxyInfo to string format
const auth = proxyInfo.username && proxyInfo.password ?
`${proxyInfo.username}:${proxyInfo.password}@` : '';
const proxy = `${proxyInfo.protocol}://${auth}${proxyInfo.host}:${proxyInfo.port}`;
const newSession: QMSession = {
proxy: proxy,
headers: getQmHeaders(),
successfulCalls: 0,
failedCalls: 0,
lastUsed: new Date(),
};
try {
const sessionResponse = await fetch(
`${QM_CONFIG.BASE_URL}${QM_CONFIG.AUTH_PATH}/${sessionId}`,
{
method: 'GET',
headers: newSession.headers,
signal: AbortSignal.timeout(SESSION_CONFIG.SESSION_TIMEOUT),
}
);
ctx.logger.debug('Session response received', {
status: sessionResponse.status,
sessionKey,
});
if (!sessionResponse.ok) {
ctx.logger.error('Failed to create QM session', {
sessionKey,
sessionId,
status: sessionResponse.status,
statusText: sessionResponse.statusText,
});
return;
}
const sessionData = await sessionResponse.json();
// Add token to headers
newSession.headers['Datatool-Token'] = sessionData.token;
// Add session to manager
const sessionManager = QMSessionManager.getInstance();
sessionManager.addSession(sessionId, newSession);
// Cache successful session creation
const cacheService = ctx.resolve<any>('cache');
await cacheService.set(
`successful-session:${sessionKey}:${Date.now()}`,
{ sessionId, proxy, tokenExists: !!sessionData.token },
{ ttl: 300 }
);
ctx.logger.info('QM session created successfully', {
sessionKey,
sessionId,
proxy: newSession.proxy,
sessionCount: sessionManager.getSessions(sessionId).length,
hasToken: !!sessionData.token
});
} catch (error) {
if (error.name === 'TimeoutError') {
ctx.logger.warn('QM session creation timed out', { sessionKey, sessionId });
} else {
ctx.logger.error('Error creating QM session', { sessionKey, sessionId, error });
}
// Cache failed session attempt for debugging
const cacheService = ctx.resolve<any>('cache');
await cacheService.set(
`failed-session:${sessionKey}:${Date.now()}`,
{ sessionId, proxy, error: error.message },
{ ttl: 300 }
);
}
}
export async function initializeQMResources(container?: ServiceContainer): Promise<void> {
if (!container) {
throw new Error('Service container is required for QM resource initialization');
}
const ctx = new OperationContext('qm-handler', 'initialize-resources', container);
try {
const cache = ctx.resolve<any>('cache');
// Check if already initialized
const alreadyInitialized = await cache.get('initialized');
if (alreadyInitialized) {
ctx.logger.debug('QM resources already initialized');
return;
}
ctx.logger.debug('Initializing QM resources...');
// Mark as initialized in cache and session manager
await cache.set('initialized', true, { ttl: 3600 });
await cache.set('initialization-time', new Date().toISOString());
const sessionManager = QMSessionManager.getInstance();
sessionManager.setInitialized(true);
ctx.logger.info('QM resources initialized successfully');
} catch (error) {
ctx.logger.error('Failed to initialize QM resources', { error });
throw error;
}
}

View file

@ -1,273 +0,0 @@
/**
* QM Spider Operations - Symbol spider search functionality
*/
import { OperationContext } from '@stock-bot/di';
import { QueueManager } from '@stock-bot/queue';
import { QMSessionManager } from '../shared/session-manager';
import { QM_SESSION_IDS } from '../shared/config';
import type { ServiceContainer } from '@stock-bot/di';
import type { SymbolSpiderJob, SpiderResult } from '../shared/types';
import { initializeQMResources } from './session.operations';
import { searchQMSymbolsAPI } from './symbols.operations';
export async function spiderSymbolSearch(
payload: SymbolSpiderJob,
container: ServiceContainer
): Promise<SpiderResult> {
const ctx = OperationContext.create('qm', 'spider', { container });
try {
const { prefix, depth, source = 'qm', maxDepth = 4 } = payload;
ctx.logger.info('Starting spider search', {
prefix: prefix || 'ROOT',
depth,
source,
maxDepth
});
// Check cache for recent results
const cacheKey = `search-result:${prefix || 'ROOT'}:${depth}`;
const cachedResult = await ctx.cache.get<SpiderResult>(cacheKey);
if (cachedResult) {
ctx.logger.debug('Using cached spider search result', { prefix, depth });
return cachedResult;
}
// Ensure resources are initialized
const sessionManager = QMSessionManager.getInstance();
if (!sessionManager.getInitialized()) {
await initializeQMResources(container);
}
let result: SpiderResult;
// Root job: Create A-Z jobs
if (prefix === null || prefix === undefined || prefix === '') {
result = await createAlphabetJobs(source, maxDepth, ctx);
} else {
// Leaf job: Search for symbols with this prefix
result = await searchAndSpawnJobs(prefix, depth, source, maxDepth, ctx, container);
}
// Cache the result
await ctx.cache.set(cacheKey, result, { ttl: 3600 });
// Store spider operation metrics in cache instead of PostgreSQL for now
try {
const statsKey = `spider-stats:${prefix || 'ROOT'}:${depth}:${Date.now()}`;
await ctx.cache.set(statsKey, {
handler: 'qm',
operation: 'spider',
prefix: prefix || 'ROOT',
depth,
symbolsFound: result.symbolsFound,
jobsCreated: result.jobsCreated,
searchTime: new Date().toISOString()
}, { ttl: 86400 }); // Keep for 24 hours
} catch (error) {
ctx.logger.debug('Failed to store spider stats in cache', { error });
}
ctx.logger.info('Spider search completed', {
prefix: prefix || 'ROOT',
depth,
success: result.success,
symbolsFound: result.symbolsFound,
jobsCreated: result.jobsCreated
});
return result;
} catch (error) {
ctx.logger.error('Spider symbol search failed', { error, payload });
const failedResult = { success: false, symbolsFound: 0, jobsCreated: 0 };
// Cache failed result for a shorter time
const cacheKey = `search-result:${payload.prefix || 'ROOT'}:${payload.depth}`;
await ctx.cache.set(cacheKey, failedResult, { ttl: 300 });
return failedResult;
} finally {
await ctx.dispose();
}
}
async function createAlphabetJobs(
source: string,
maxDepth: number,
ctx: OperationContext
): Promise<SpiderResult> {
try {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue('qm');
let jobsCreated = 0;
ctx.logger.info('Creating alphabet jobs (A-Z)');
// Create jobs for A-Z
for (let i = 0; i < 26; i++) {
const letter = String.fromCharCode(65 + i); // A=65, B=66, etc.
const job: SymbolSpiderJob = {
prefix: letter,
depth: 1,
source,
maxDepth,
};
await queue.add(
'spider-symbol-search',
{
handler: 'qm',
operation: 'spider-symbol-search',
payload: job,
},
{
priority: 5,
delay: i * 100, // Stagger jobs by 100ms
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
}
);
jobsCreated++;
}
// Cache alphabet job creation
await ctx.cache.set('alphabet-jobs-created', {
count: jobsCreated,
timestamp: new Date().toISOString(),
source,
maxDepth
}, { ttl: 3600 });
ctx.logger.info(`Created ${jobsCreated} alphabet jobs (A-Z)`);
return { success: true, symbolsFound: 0, jobsCreated };
} catch (error) {
ctx.logger.error('Failed to create alphabet jobs', { error });
return { success: false, symbolsFound: 0, jobsCreated: 0 };
}
}
async function searchAndSpawnJobs(
prefix: string,
depth: number,
source: string,
maxDepth: number,
ctx: OperationContext,
container: ServiceContainer
): Promise<SpiderResult> {
try {
// Ensure sessions exist for symbol search
const sessionManager = QMSessionManager.getInstance();
const lookupSession = sessionManager.getSession(QM_SESSION_IDS.LOOKUP);
if (!lookupSession) {
ctx.logger.info('No lookup sessions available, creating sessions first...');
const { createSessions } = await import('./session.operations');
await createSessions(container);
// Wait a bit for session creation
await new Promise(resolve => setTimeout(resolve, 1000));
}
// Search for symbols with this prefix
const symbols = await searchQMSymbolsAPI(prefix, container);
const symbolCount = symbols.length;
ctx.logger.info(`Prefix "${prefix}" returned ${symbolCount} symbols`);
let jobsCreated = 0;
// Store symbols in MongoDB
if (ctx.mongodb && symbols.length > 0) {
try {
const updatedSymbols = symbols.map((symbol: Record<string, unknown>) => ({
...symbol,
qmSearchCode: symbol.symbol,
symbol: (symbol.symbol as string)?.split(':')[0],
searchPrefix: prefix,
searchDepth: depth,
discoveredAt: new Date()
}));
await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']);
ctx.logger.debug('Stored symbols in MongoDB', { count: symbols.length });
} catch (error) {
ctx.logger.warn('Failed to store symbols in MongoDB', { error });
}
}
// If we have 50+ symbols and haven't reached max depth, spawn sub-jobs
if (symbolCount >= 50 && depth < maxDepth) {
const queueManager = QueueManager.getInstance();
const queue = queueManager.getQueue('qm');
ctx.logger.info(`Spawning sub-jobs for prefix "${prefix}" (${symbolCount} >= 50 symbols)`);
// Create jobs for prefixA, prefixB, prefixC... prefixZ
for (let i = 0; i < 26; i++) {
const letter = String.fromCharCode(65 + i);
const newPrefix = prefix + letter;
const job: SymbolSpiderJob = {
prefix: newPrefix,
depth: depth + 1,
source,
maxDepth,
};
await queue.add(
'spider-symbol-search',
{
handler: 'qm',
operation: 'spider-symbol-search',
payload: job,
},
{
priority: Math.max(1, 6 - depth), // Higher priority for deeper jobs
delay: i * 50, // Stagger sub-jobs by 50ms
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
}
);
jobsCreated++;
}
// Cache sub-job creation info
await ctx.cache.set(`sub-jobs:${prefix}`, {
parentPrefix: prefix,
depth,
symbolCount,
jobsCreated,
timestamp: new Date().toISOString()
}, { ttl: 3600 });
ctx.logger.info(`Created ${jobsCreated} sub-jobs for prefix "${prefix}"`);
} else {
// Terminal case: save symbols (already done above)
ctx.logger.info(`Terminal case for prefix "${prefix}": ${symbolCount} symbols saved`);
// Cache terminal case info
await ctx.cache.set(`terminal:${prefix}`, {
prefix,
depth,
symbolCount,
isTerminal: true,
reason: symbolCount < 50 ? 'insufficient_symbols' : 'max_depth_reached',
timestamp: new Date().toISOString()
}, { ttl: 3600 });
}
return { success: true, symbolsFound: symbolCount, jobsCreated };
} catch (error) {
ctx.logger.error(`Failed to search and spawn jobs for prefix "${prefix}"`, { error, depth });
return { success: false, symbolsFound: 0, jobsCreated: 0 };
}
}

View file

@ -1,200 +0,0 @@
/**
* QM Symbols Operations - Symbol fetching and API interactions
*/
import { OperationContext } from '@stock-bot/di';
import { getRandomProxy } from '@stock-bot/utils';
import type { ServiceContainer } from '@stock-bot/di';
import { QMSessionManager } from '../shared/session-manager';
import { QM_SESSION_IDS, QM_CONFIG, SESSION_CONFIG } from '../shared/config';
import type { SymbolSpiderJob, Exchange } from '../shared/types';
import { initializeQMResources } from './session.operations';
import { spiderSymbolSearch } from './spider.operations';
export async function fetchSymbols(container: ServiceContainer): Promise<unknown[] | null> {
const ctx = OperationContext.create('qm', 'symbols', { container });
try {
const sessionManager = QMSessionManager.getInstance();
if (!sessionManager.getInitialized()) {
await initializeQMResources(container);
}
ctx.logger.info('Starting QM spider-based symbol search...');
// Check if we have a recent symbol fetch
const lastFetch = await ctx.cache.get('last-symbol-fetch');
if (lastFetch) {
ctx.logger.info('Recent symbol fetch found, using spider search');
}
// Start the spider process with root job
const rootJob: SymbolSpiderJob = {
prefix: null, // Root job creates A-Z jobs
depth: 0,
source: 'qm',
maxDepth: 4,
};
const result = await spiderSymbolSearch(rootJob);
if (result.success) {
// Cache successful fetch info
await ctx.cache.set('last-symbol-fetch', {
timestamp: new Date().toISOString(),
jobsCreated: result.jobsCreated,
success: true
}, { ttl: 3600 });
ctx.logger.info(
`QM spider search initiated successfully. Created ${result.jobsCreated} initial jobs`
);
return [`Spider search initiated with ${result.jobsCreated} jobs`];
} else {
ctx.logger.error('Failed to initiate QM spider search');
return null;
}
} catch (error) {
ctx.logger.error('Failed to start QM spider symbol search', { error });
return null;
} finally {
await ctx.dispose();
}
}
export async function searchQMSymbolsAPI(query: string, container: ServiceContainer): Promise<any[]> {
const ctx = OperationContext.create('qm', 'api-search', { container });
const proxyInfo = await getRandomProxy();
if (!proxyInfo) {
throw new Error('No proxy available for QM API call');
}
const sessionManager = QMSessionManager.getInstance();
const session = sessionManager.getSession(QM_SESSION_IDS.LOOKUP);
if (!session) {
throw new Error(`No active session found for QM API with ID: ${QM_SESSION_IDS.LOOKUP}`);
}
try {
ctx.logger.debug('Searching QM symbols API', { query, proxy: session.proxy });
// Check cache for recent API results
const cacheKey = `api-search:${query}`;
const cachedResult = await ctx.cache.get(cacheKey);
if (cachedResult) {
ctx.logger.debug('Using cached API search result', { query });
return cachedResult;
}
// QM lookup endpoint for symbol search
const searchParams = new URLSearchParams({
marketType: 'equity',
pathName: '/demo/portal/company-summary.php',
q: query,
qmodTool: 'SmartSymbolLookup',
searchType: 'symbol',
showFree: 'false',
showHisa: 'false',
webmasterId: '500'
});
const apiUrl = `${QM_CONFIG.LOOKUP_URL}?${searchParams.toString()}`;
const response = await fetch(apiUrl, {
method: 'GET',
headers: session.headers,
signal: AbortSignal.timeout(SESSION_CONFIG.API_TIMEOUT),
});
if (!response.ok) {
throw new Error(`QM API request failed: ${response.status} ${response.statusText}`);
}
const symbols = await response.json();
// Update session stats
session.successfulCalls++;
session.lastUsed = new Date();
// Process symbols and extract exchanges
if (ctx.mongodb && symbols.length > 0) {
try {
const updatedSymbols = symbols.map((symbol: Record<string, unknown>) => ({
...symbol,
qmSearchCode: symbol.symbol,
symbol: (symbol.symbol as string)?.split(':')[0],
searchQuery: query,
fetchedAt: new Date()
}));
await ctx.mongodb.batchUpsert('qmSymbols', updatedSymbols, ['qmSearchCode']);
// Extract and store unique exchanges
const exchanges: Exchange[] = [];
for (const symbol of symbols) {
if (!exchanges.some(ex => ex.exchange === symbol.exchange)) {
exchanges.push({
exchange: symbol.exchange,
exchangeCode: symbol.exchangeCode,
exchangeShortName: symbol.exchangeShortName,
countryCode: symbol.countryCode,
source: 'qm',
});
}
}
if (exchanges.length > 0) {
await ctx.mongodb.batchUpsert('qmExchanges', exchanges, ['exchange']);
ctx.logger.debug('Stored exchanges in MongoDB', { count: exchanges.length });
}
} catch (error) {
ctx.logger.warn('Failed to store symbols/exchanges in MongoDB', { error });
}
}
// Cache the result
await ctx.cache.set(cacheKey, symbols, { ttl: 1800 }); // 30 minutes
// Store API call stats
await ctx.cache.set(`api-stats:${query}:${Date.now()}`, {
query,
symbolCount: symbols.length,
proxy: session.proxy,
success: true,
timestamp: new Date().toISOString()
}, { ttl: 3600 });
ctx.logger.info(
`QM API returned ${symbols.length} symbols for query: ${query}`,
{ proxy: session.proxy, symbolCount: symbols.length }
);
return symbols;
} catch (error) {
// Update session failure stats
session.failedCalls++;
session.lastUsed = new Date();
// Cache failed API call info
await ctx.cache.set(`api-failure:${query}:${Date.now()}`, {
query,
error: error.message,
proxy: session.proxy,
timestamp: new Date().toISOString()
}, { ttl: 600 });
ctx.logger.error(`Error searching QM symbols for query "${query}"`, {
error: error.message,
proxy: session.proxy
});
throw error;
} finally {
await ctx.dispose();
}
}

View file

@ -14,60 +14,23 @@ export class QMHandler extends BaseHandler {
super(services); // Handler name read from @Handler decorator
}
@Operation('create-sessions')
@Operation('check-sessions')
@QueueSchedule('0 */15 * * *', {
priority: 7,
immediately: true,
description: 'Create and maintain QM sessions'
description: 'Check and maintain QM sessions'
})
async createSessions(input: unknown, context: ExecutionContext): Promise<unknown> {
this.logger.info('Creating QM sessions...');
try {
// Check existing sessions in cache
const sessionKey = 'qm:sessions:active';
const existingSessions = await this.cache.get(sessionKey) || [];
this.logger.info('Current QM sessions', {
existing: existingSessions.length,
action: 'creating_new_sessions'
});
// Create new session
const newSession = {
id: `qm-session-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
createdAt: new Date().toISOString(),
status: 'active',
provider: 'quotemedia',
// Add other session properties as needed
};
// Add to existing sessions
const updatedSessions = [...existingSessions, newSession];
// Store sessions in cache with 24 hour TTL (sessions are temporary)
await this.cache.set(sessionKey, updatedSessions, 86400); // 24 hours
// Store session stats for monitoring
await this.cache.set('qm:sessions:count', updatedSessions.length, 3600);
await this.cache.set('qm:sessions:last-created', new Date().toISOString(), 1800);
this.logger.info('QM session created', {
sessionId: newSession.id,
totalSessions: updatedSessions.length
});
return {
success: true,
sessionId: newSession.id,
totalSessions: updatedSessions.length,
message: 'QM session created successfully'
};
} catch (error) {
this.logger.error('Failed to create QM sessions', { error });
throw error;
}
async checkSessions(input: unknown, context: ExecutionContext): Promise<unknown> {
// Call the session maintenance action
const { checkSessions } = await import('./actions/session.action');
return await checkSessions(this.services);
}
@Operation('create-session')
async createSession(input: unknown, context: ExecutionContext): Promise<unknown> {
// Call the individual session creation action
const { createSingleSession } = await import('./actions/session.action');
return await createSingleSession(this.services, input);
}
@Operation('search-symbols')