diff --git a/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts b/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts index 5f25264..a2603f2 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts @@ -1,16 +1,16 @@ import { BaseHandler, + Disabled, Handler, Operation, ScheduledOperation, - type IServiceContainer, } from '@stock-bot/handlers'; import { getChannels, getPosts, getShorts, updateUniqueSymbols } from './actions'; @Handler('ceo') -// @Disabled() +@Disabled() export class CeoHandler extends BaseHandler { - constructor(services: IServiceContainer) { + constructor(services: any) { super(services); // Handler name read from @Handler decorator } @@ -21,7 +21,6 @@ export class CeoHandler extends BaseHandler { }) getChannels = getChannels; - @Operation('update-unique-symbols-posts') @ScheduledOperation('update-unique-symbols-posts', '30 * * * *', { immediately: false, description: 'Process unique CEO symbols and schedule individual jobs', @@ -33,7 +32,6 @@ export class CeoHandler extends BaseHandler { }) updateUniqueSymbolsPosts = updateUniqueSymbols; - @Operation('update-unique-symbols-shorts') @ScheduledOperation('update-unique-symbols-shorts', '0 0 * * *', { immediately: false, description: 'Process unique CEO symbols and schedule individual jobs', diff --git a/apps/stock/data-ingestion/src/handlers/ib/ib.handler.ts b/apps/stock/data-ingestion/src/handlers/ib/ib.handler.ts index 053b9b8..f70a91f 100644 --- a/apps/stock/data-ingestion/src/handlers/ib/ib.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/ib/ib.handler.ts @@ -3,13 +3,12 @@ import { Handler, Operation, ScheduledOperation, - type IServiceContainer, } from '@stock-bot/handlers'; import { fetchExchanges, fetchExchangesAndSymbols, fetchSession, fetchSymbols } from './actions'; @Handler('ib') export class IbHandler extends BaseHandler { - constructor(services: IServiceContainer) { + constructor(services: any) { super(services); } @@ -28,7 +27,6 @@ export class IbHandler extends BaseHandler { return fetchSymbols(this); } - @Operation('ib-exchanges-and-symbols') @ScheduledOperation('ib-exchanges-and-symbols', '0 0 * * 0', { priority: 5, description: 'Fetch and update IB exchanges and symbols data', diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts new file mode 100644 index 0000000..bc8834a --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/index.ts @@ -0,0 +1,5 @@ +/** + * QM Action Exports + */ + +export { checkSessions, createSession } from './session.action'; diff --git a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts index 58eef50..b195a8f 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/actions/session.action.ts @@ -2,35 +2,74 @@ * QM Session Actions - Session management and creation */ -import { BaseHandler } from '@stock-bot/core/handlers'; -import { QM_SESSION_IDS, SESSION_CONFIG } from '../shared/config'; +import type { BaseHandler, ExecutionContext } from '@stock-bot/handlers'; +import { BunRequestInit, getRandomUserAgent } from '@stock-bot/utils'; +import { QM_CONFIG, QM_SESSION_IDS, SESSION_CONFIG } from '../shared/config'; import { QMSessionManager } from '../shared/session-manager'; /** * Check existing sessions and queue creation jobs for needed sessions + * This is the main session management function that handles cleanup, maintenance, and initialization */ -export async function checkSessions(handler: BaseHandler): Promise<{ +export async function checkSessions( + this: BaseHandler, + _input: unknown, + _context: ExecutionContext +): Promise<{ cleaned: number; queued: number; message: string; }> { + this.logger.info('Checking QM sessions'); + const sessionManager = QMSessionManager.getInstance(); + + // Set cache provider if not already set + if (this.cache) { + sessionManager.setCacheProvider(this.cache); + } + + // Load sessions from cache if not initialized + if (!sessionManager.getInitialized()) { + await sessionManager.loadFromCache(); + sessionManager.setInitialized(true); + } + const cleanedCount = sessionManager.cleanupFailedSessions(); + + // Sync after cleanup + await sessionManager.syncToCache(); + // Check which session IDs need more sessions and queue creation jobs let queuedCount = 0; for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { - handler.logger.debug(`Checking session ID: ${sessionId}`); + this.logger.debug(`Checking session ID: ${sessionId}`); if (sessionManager.needsMoreSessions(sessionId)) { const currentCount = sessionManager.getSessions(sessionId).length; - const neededSessions = SESSION_CONFIG.MAX_SESSIONS - currentCount; - for (let i = 0; i < neededSessions; i++) { - await handler.scheduleOperation('create-session', { sessionId, sessionType }); - handler.logger.info(`Queued job to create session for ${sessionType}`); + const neededSessions = SESSION_CONFIG.MIN_SESSIONS - currentCount; + + // Queue up to 10 at a time to avoid overwhelming the system + const toQueue = Math.min(neededSessions, 10); + + for (let i = 0; i < toQueue; i++) { + await this.scheduleOperation('create-session', { sessionId, sessionType }, { + delay: i * 2000, // Stagger creation by 2 seconds + }); queuedCount++; } + + this.logger.info(`Queued ${toQueue} jobs to create sessions for ${sessionType}`, { + currentCount, + targetCount: SESSION_CONFIG.MIN_SESSIONS, + }); } } + this.logger.info('QM session check completed', { + cleaned: cleanedCount, + queued: queuedCount, + }); + return { cleaned: cleanedCount, queued: queuedCount, @@ -46,32 +85,143 @@ interface CreateSessionInput { sessionType?: string; } -export async function createSingleSession( - handler: BaseHandler, +export async function createSession( + this: BaseHandler, input: CreateSessionInput ): Promise<{ sessionId: string; status: string; sessionType: string }> { - const { sessionId: _sessionId, sessionType } = input || {}; - const _sessionManager = QMSessionManager.getInstance(); + const { sessionId, sessionType = 'LOOKUP' } = input || {}; + const sessionManager = QMSessionManager.getInstance(); - // Get proxy from proxy service - const _proxyString = handler.proxy.getProxy(); + // Get the actual session ID from config + const actualSessionId = sessionId || QM_SESSION_IDS[sessionType as keyof typeof QM_SESSION_IDS]; + + if (!actualSessionId) { + throw new Error(`Invalid session type: ${sessionType}`); + } - // const session = { - // proxy: proxyString || 'http://proxy:8080', - // headers: sessionManager.getQmHeaders(), - // successfulCalls: 0, - // failedCalls: 0, - // lastUsed: new Date() - // }; + // Set cache provider if not already set + if (this.cache) { + sessionManager.setCacheProvider(this.cache); + } - handler.logger.info(`Creating session for ${sessionType}`); + try { + // Get proxy from proxy service + const proxyUrl: string | null = this.proxy ? this.proxy.getProxy() : null; + if (!proxyUrl) { + this.logger.warn(`No proxy available for session type ${sessionType}`); + throw new Error(`No proxy available for session type ${sessionType}`); + } - // Add session to manager - // sessionManager.addSession(sessionType, session); + const userAgent = getRandomUserAgent(); + this.logger.debug(`Using User-Agent: ${userAgent}, proxy: ${proxyUrl || 'none'}`); - return { - sessionId: sessionType, - status: 'created', - sessionType, - }; + // Authenticate with QM API inline + const authUrl = `${QM_CONFIG.BASE_URL}${QM_CONFIG.SESSION_PATH}`; + + // Build request options + const requestOptions: BunRequestInit = { + method: 'GET', + proxy: proxyUrl || undefined, + headers: { + 'User-Agent': userAgent, + Accept: '*/*', + 'Accept-Language': 'en', + 'Sec-Fetch-Mode': 'cors', + Origin: 'https://www.quotemedia.com', + Referer: 'https://www.quotemedia.com/', + }, + redirect: 'manual', // Don't follow redirects automatically + }; + + this.logger.debug('Authenticating with QM API', { authUrl }); + + const response = await fetch(authUrl, requestOptions); + + // Extract cookies from response headers + // const cookies: string[] = []; + // const setCookieHeaders = response.headers.getSetCookie(); + + // if (setCookieHeaders && setCookieHeaders.length > 0) { + // cookies.push(...setCookieHeaders); + // } + + // // Check if authentication was successful + // if (response.status === 200 || response.status === 302) { + // this.logger.info('QM authentication successful', { + // status: response.status, + // cookieCount: cookies.length, + // }); + + // // Build headers with cookies + // const headers = sessionManager.getQmHeaders(); + // if (cookies.length > 0) { + // headers['Cookie'] = buildCookieString(cookies); + // } + + // // Create session object + // const session: QMSession = { + // proxy: proxyUrl || '', + // headers, + // successfulCalls: 0, + // failedCalls: 0, + // lastUsed: new Date(), + // }; + + // // Add session to manager + // sessionManager.addSession(actualSessionId, session); + + // // Sync to cache + // await sessionManager.syncToCache(); + + // this.logger.info(`Successfully created session for ${sessionType}`, { + // sessionId: actualSessionId, + // hasProxy: !!proxyUrl, + // hasCookies: cookies.length > 0, + // }); + + // return { + // sessionId: actualSessionId, + // status: 'created', + // sessionType, + // }; + // } else { + // this.logger.warn('QM authentication failed', { + // status: response.status, + // statusText: response.statusText, + // }); + + // return { + // sessionId: actualSessionId, + // status: 'failed', + // sessionType, + // }; + // } + + return { + sessionId: 'test',//actualSessionId, + status: 'created', + sessionType, + }; + } catch (error) { + this.logger.error(`Failed to create session for ${sessionType}`, { error }); + return { + sessionId: actualSessionId, + status: 'error', + sessionType, + }; + } +} + +/** + * Build cookie string from array of set-cookie headers + */ +function buildCookieString(cookies: string[]): string { + return cookies + .map(cookie => { + // Extract just the name=value part, ignore attributes + const match = cookie.match(/^([^=]+=[^;]+)/); + return match ? match[1] : ''; + }) + .filter(Boolean) + .join('; '); } diff --git a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts index 6433566..0998d93 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/qm.handler.ts @@ -1,29 +1,26 @@ -import { BaseHandler, Handler, type IServiceContainer } from '@stock-bot/handlers'; +import { + BaseHandler, + Handler, + Operation, + ScheduledOperation, +} from '@stock-bot/handlers'; +import { checkSessions, createSession } from './actions'; @Handler('qm') export class QMHandler extends BaseHandler { - constructor(services: IServiceContainer) { + constructor(services: any) { super(services); // Handler name read from @Handler decorator } - // @Operation('check-sessions') - // @QueueSchedule('0 */15 * * *', { - // priority: 7, - // immediately: true, - // description: 'Check and maintain QM sessions' - // }) - // async checkSessions(input: unknown, context: ExecutionContext): Promise { - // // Call the session maintenance action - // const { checkSessions } = await import('./actions/session.action'); - // return await checkSessions(this); - // } + @ScheduledOperation('check-sessions', '*/2 * * * *', { + priority: 8, + immediately: true, + description: 'Check and maintain QM sessions every 2 minutes', + }) + checkSessions = checkSessions; - // @Operation('create-session') - // async createSession(input: unknown, context: ExecutionContext): Promise { - // // Call the individual session creation action - // const { createSingleSession } = await import('./actions/session.action'); - // return await createSingleSession(this, input); - // } + @Operation('create-session') + createSession = createSession; // @Operation('search-symbols') // async searchSymbols(_input: unknown, _context: ExecutionContext): Promise { diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts index 9964359..c15e6a6 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/config.ts @@ -24,15 +24,15 @@ export const QM_SESSION_IDS = { // QM API Configuration export const QM_CONFIG = { BASE_URL: 'https://app.quotemedia.com', - AUTH_PATH: '/auth/g/authenticate/dataTool/v0/500', + SESSION_PATH: '/auth/g/authenticate/dataTool/v0/500', LOOKUP_URL: 'https://app.quotemedia.com/datatool/lookup.json', } as const; // Session management settings export const SESSION_CONFIG = { - MIN_SESSIONS: 5, - MAX_SESSIONS: 10, - MAX_FAILED_CALLS: 10, - SESSION_TIMEOUT: 10000, // 10 seconds - API_TIMEOUT: 15000, // 15 seconds + MIN_SESSIONS: 15, + MAX_SESSIONS: 50, + MAX_FAILED_CALLS: 3, + SESSION_TIMEOUT: 5000, // 10 seconds + API_TIMEOUT: 30000, // 15 seconds } as const; diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts index ce8d464..520f435 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/session-manager.ts @@ -2,14 +2,16 @@ * QM Session Manager - Centralized session state management */ +import type { CacheProvider } from '@stock-bot/cache'; import { getRandomUserAgent } from '@stock-bot/utils'; import { QM_SESSION_IDS, SESSION_CONFIG } from './config'; -import type { QMSession } from './types'; +import type { CachedSession, QMSession } from './types'; export class QMSessionManager { private static instance: QMSessionManager | null = null; private sessionCache: Record = {}; private isInitialized = false; + private cacheProvider: CacheProvider | null = null; private constructor() { // Initialize session cache with known session IDs @@ -25,6 +27,13 @@ export class QMSessionManager { return QMSessionManager.instance; } + /** + * Set the cache provider for persistence + */ + setCacheProvider(cache: CacheProvider): void { + this.cacheProvider = cache; + } + /** * Get a random session for the given session ID */ @@ -153,4 +162,108 @@ export class QMSessionManager { getInitialized(): boolean { return this.isInitialized; } + + /** + * Load sessions from cache + */ + async loadFromCache(): Promise { + if (!this.cacheProvider) { + return; + } + + try { + // Load sessions for each session type + for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { + const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; + const sessionIds = await this.cacheProvider.get(listKey); + + if (sessionIds && Array.isArray(sessionIds)) { + const sessions: QMSession[] = []; + + for (const id of sessionIds) { + const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`; + const cachedSession = await this.cacheProvider.get(sessionKey); + + if (cachedSession) { + sessions.push({ + proxy: cachedSession.proxy, + headers: cachedSession.headers, + successfulCalls: cachedSession.successfulCalls, + failedCalls: cachedSession.failedCalls, + lastUsed: new Date(cachedSession.lastUsed), + }); + } + } + + this.sessionCache[sessionId] = sessions; + } + } + } catch (error) { + console.error('Failed to load sessions from cache:', error); + } + } + + /** + * Sync sessions to cache + */ + async syncToCache(): Promise { + if (!this.cacheProvider) { + return; + } + + try { + for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) { + const sessions = this.sessionCache[sessionId] || []; + const sessionIds: string[] = []; + + // Store each session + for (let i = 0; i < sessions.length; i++) { + const session = sessions[i]; + const id = `${sessionType.toLowerCase()}_${i}_${Date.now()}`; + const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`; + + const cachedSession: CachedSession = { + ...session, + id, + sessionType, + }; + + await this.cacheProvider.set(sessionKey, cachedSession, 86400); // 24 hour TTL + sessionIds.push(id); + } + + // Store the list of session IDs + const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`; + await this.cacheProvider.set(listKey, sessionIds, 86400); + } + + // Store stats + const statsKey = 'qm:sessions:stats'; + await this.cacheProvider.set(statsKey, this.getStats(), 3600); + } catch (error) { + console.error('Failed to sync sessions to cache:', error); + } + } + + /** + * Increment failed calls for a session + */ + async incrementFailedCalls(sessionId: string, session: QMSession): Promise { + session.failedCalls++; + session.lastUsed = new Date(); + + // Sync to cache after update + await this.syncToCache(); + } + + /** + * Increment successful calls for a session + */ + async incrementSuccessfulCalls(sessionId: string, session: QMSession): Promise { + session.successfulCalls++; + session.lastUsed = new Date(); + + // Sync to cache after update + await this.syncToCache(); + } } diff --git a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts index 1855a0c..b0b2514 100644 --- a/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts +++ b/apps/stock/data-ingestion/src/handlers/qm/shared/types.ts @@ -30,3 +30,22 @@ export interface SpiderResult { symbolsFound: number; jobsCreated: number; } + +export interface QMSessionStats { + sessionType: string; + total: number; + valid: number; + failed: number; + lastUpdate: Date; +} + +export interface QMAuthResponse { + success: boolean; + cookies?: string[]; + error?: string; +} + +export interface CachedSession extends QMSession { + id: string; + sessionType: string; +} diff --git a/apps/stock/data-ingestion/src/handlers/webshare/webshare.handler.ts b/apps/stock/data-ingestion/src/handlers/webshare/webshare.handler.ts index 28bec9f..03215fc 100644 --- a/apps/stock/data-ingestion/src/handlers/webshare/webshare.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/webshare/webshare.handler.ts @@ -4,12 +4,11 @@ import { Operation, QueueSchedule, type ExecutionContext, - type IServiceContainer, } from '@stock-bot/handlers'; @Handler('webshare') export class WebShareHandler extends BaseHandler { - constructor(services: IServiceContainer) { + constructor(services: any) { super(services); } diff --git a/libs/core/handlers/src/index.ts b/libs/core/handlers/src/index.ts index fc0f646..939cd03 100644 --- a/libs/core/handlers/src/index.ts +++ b/libs/core/handlers/src/index.ts @@ -8,3 +8,19 @@ export { Disabled, } from './decorators/decorators'; export { createJobHandler } from './utils/create-job-handler'; + +// Re-export commonly used types from @stock-bot/types for convenience +export type { + ExecutionContext, + IHandler, + JobHandler, + HandlerConfig, + HandlerConfigWithSchedule, + HandlerMetadata, + OperationMetadata, + ScheduledJob, + TypedJobHandler, +} from '@stock-bot/types'; + +// Re-export JobScheduleOptions from BaseHandler +export type { JobScheduleOptions } from './base/BaseHandler'; diff --git a/libs/core/types/src/services.ts b/libs/core/types/src/services.ts index 6f668a3..058fd01 100644 --- a/libs/core/types/src/services.ts +++ b/libs/core/types/src/services.ts @@ -262,29 +262,27 @@ export interface Page { // Proxy Manager types export interface ProxyManager { - getProxy(key?: string): Promise; + getProxy(): string | null; + getProxyInfo(): ProxyInfo | null; getProxies(count: number, key?: string): Promise; releaseProxy(proxy: ProxyInfo | string): Promise; markProxyFailed(proxy: ProxyInfo | string, reason?: string): Promise; - getStats(): Promise; + getStats(): ProxyStats; resetProxy(proxy: ProxyInfo | string): Promise; blacklistProxy(proxy: ProxyInfo | string, duration?: number): Promise; isBlacklisted(proxy: ProxyInfo | string): Promise; refreshProxies(): Promise; } +// ProxyInfo should be imported from @stock-bot/proxy package +// to avoid duplication. Using minimal definition here for type compatibility export interface ProxyInfo { - id: string; host: string; port: number; + protocol: 'http' | 'https'; username?: string; password?: string; - protocol?: string; - country?: string; - lastUsed?: Date; - failureCount?: number; - successCount?: number; - averageResponseTime?: number; + [key: string]: any; // Allow additional properties from proxy package } export interface ProxyStats { diff --git a/libs/services/proxy/src/proxy-manager.ts b/libs/services/proxy/src/proxy-manager.ts index 71355dc..0904724 100644 --- a/libs/services/proxy/src/proxy-manager.ts +++ b/libs/services/proxy/src/proxy-manager.ts @@ -73,53 +73,20 @@ export class ProxyManager { return proxyUrl; } /** - * Get a random working proxy from the available pool (synchronous) + * Get proxy info for the current proxy in rotation (synchronous) */ - getRandomProxy(): ProxyInfo | null { - // Ensure initialized - if (!this.isInitialized) { - throw new Error('ProxyManager not initialized'); - } - - // Return null if no proxies available + getProxyInfo(): ProxyInfo | null { if (this.proxies.length === 0) { this.logger.warn('No proxies available in memory'); return null; } - // Filter for working proxies (not explicitly marked as non-working) - const workingProxies = this.proxies.filter(proxy => proxy.isWorking !== false); - - if (workingProxies.length === 0) { - this.logger.warn('No working proxies available'); - return null; - } - - // Return random proxy with preference for recently successful ones - const sortedProxies = workingProxies.sort((a, b) => { - // Prefer proxies with better success rates - const aRate = a.successRate || 0; - const bRate = b.successRate || 0; - return bRate - aRate; - }); - - // Take from top 50% of best performing proxies - const topProxies = sortedProxies.slice(0, Math.max(1, Math.floor(sortedProxies.length * 0.5))); - const selectedProxy = topProxies[Math.floor(Math.random() * topProxies.length)]; - - if (!selectedProxy) { - this.logger.warn('No proxy selected from available pool'); - return null; - } - - this.logger.debug('Selected proxy', { - host: selectedProxy.host, - port: selectedProxy.port, - successRate: selectedProxy.successRate, - totalAvailable: workingProxies.length, - }); - - return selectedProxy; + // Use same rotation logic as getProxy() to ensure consistency + // Note: We don't increment the index here since getProxy() already does that + const currentIndex = this.proxyIndex > 0 ? this.proxyIndex - 1 : this.proxies.length - 1; + const proxyInfo = this.proxies[currentIndex]; + + return proxyInfo || null; } /**