fixed up session maanger to use just redis no in store memory, finished prices
This commit is contained in:
parent
100efb575f
commit
966f6ac612
16 changed files with 773 additions and 351 deletions
|
|
@ -77,8 +77,8 @@
|
||||||
"port": 6379,
|
"port": 6379,
|
||||||
"db": 1
|
"db": 1
|
||||||
},
|
},
|
||||||
"workers": 1,
|
"workers": 5,
|
||||||
"concurrency": 1,
|
"concurrency": 5,
|
||||||
"enableScheduledJobs": true,
|
"enableScheduledJobs": true,
|
||||||
"defaultJobOptions": {
|
"defaultJobOptions": {
|
||||||
"attempts": 3,
|
"attempts": 3,
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ export async function updateCorporateActions(
|
||||||
this.logger.info('Fetching corporate actions', { symbol, symbolId });
|
this.logger.info('Fetching corporate actions', { symbol, symbolId });
|
||||||
|
|
||||||
const sessionManager = QMSessionManager.getInstance();
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
// Get a session - you'll need to add the appropriate session ID
|
// Get a session - you'll need to add the appropriate session ID
|
||||||
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ export async function updateFilings(
|
||||||
this.logger.info('Fetching filings', { symbol, symbolId });
|
this.logger.info('Fetching filings', { symbol, symbolId });
|
||||||
|
|
||||||
const sessionManager = QMSessionManager.getInstance();
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
// Get a session - you'll need to add the appropriate session ID for filings
|
// Get a session - you'll need to add the appropriate session ID for filings
|
||||||
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ export async function updateFinancials(
|
||||||
this.logger.info('Fetching financials', { symbol, symbolId });
|
this.logger.info('Fetching financials', { symbol, symbolId });
|
||||||
|
|
||||||
const sessionManager = QMSessionManager.getInstance();
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
// Get a session - you'll need to add the appropriate session ID for financials
|
// Get a session - you'll need to add the appropriate session ID for financials
|
||||||
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ export async function updateIntradayBars(
|
||||||
this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate });
|
this.logger.info('Fetching intraday bars', { symbol, symbolId, crawlDate });
|
||||||
|
|
||||||
const sessionManager = QMSessionManager.getInstance();
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
// Get a session - you'll need to add the appropriate session ID for intraday
|
// Get a session - you'll need to add the appropriate session ID for intraday
|
||||||
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
const sessionId = QM_SESSION_IDS.LOOKUP; // TODO: Update with correct session ID
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ export async function updatePrices(
|
||||||
this.logger.info(`Fetching daily prices ${qmSearchCode}`, { qmSearchCode });
|
this.logger.info(`Fetching daily prices ${qmSearchCode}`, { qmSearchCode });
|
||||||
|
|
||||||
const sessionManager = QMSessionManager.getInstance();
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
// Get a session - you'll need to add the appropriate session ID for prices
|
// Get a session - you'll need to add the appropriate session ID for prices
|
||||||
const sessionId = QM_SESSION_IDS.PRICES;
|
const sessionId = QM_SESSION_IDS.PRICES;
|
||||||
|
|
@ -86,11 +86,17 @@ export async function updatePrices(
|
||||||
|
|
||||||
// Update session success stats
|
// Update session success stats
|
||||||
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
|
await sessionManager.incrementSuccessfulCalls(sessionId, session.uuid);
|
||||||
|
// Update symbol to track last price update
|
||||||
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
const priceData = responseData.results?.history[0].eoddata || [];
|
const priceData = responseData.results?.history[0].eoddata || [];
|
||||||
|
|
||||||
if(!priceData || priceData.length === 0) {
|
if(!priceData || priceData.length === 0) {
|
||||||
this.logger.warn(`No price data found for symbol ${qmSearchCode}`);
|
this.logger.warn(`No price data found for symbol ${qmSearchCode}`);
|
||||||
|
await tracker.updateSymbolOperation(qmSearchCode, 'price_update', {
|
||||||
|
status: 'success',
|
||||||
|
recordCount: priceData.length
|
||||||
|
});
|
||||||
return {
|
return {
|
||||||
success: false,
|
success: false,
|
||||||
qmSearchCode,
|
qmSearchCode,
|
||||||
|
|
@ -121,8 +127,7 @@ export async function updatePrices(
|
||||||
new Date(0)
|
new Date(0)
|
||||||
);
|
);
|
||||||
|
|
||||||
// Update symbol to track last price update
|
|
||||||
const tracker = await getOperationTracker(this);
|
|
||||||
await tracker.updateSymbolOperation(qmSearchCode, 'price_update', {
|
await tracker.updateSymbolOperation(qmSearchCode, 'price_update', {
|
||||||
status: 'success',
|
status: 'success',
|
||||||
lastRecordDate: latestDate,
|
lastRecordDate: latestDate,
|
||||||
|
|
@ -193,7 +198,7 @@ export async function schedulePriceUpdates(
|
||||||
symbolsQueued: number;
|
symbolsQueued: number;
|
||||||
errors: number;
|
errors: number;
|
||||||
}> {
|
}> {
|
||||||
const { limit = 100, forceUpdate = false } = input;
|
const { limit = 50000, forceUpdate = false } = input;
|
||||||
const tracker = await getOperationTracker(this);
|
const tracker = await getOperationTracker(this);
|
||||||
|
|
||||||
this.logger.info('Scheduling price updates', { limit, forceUpdate });
|
this.logger.info('Scheduling price updates', { limit, forceUpdate });
|
||||||
|
|
@ -236,7 +241,7 @@ export async function schedulePriceUpdates(
|
||||||
exchange: doc.exchange
|
exchange: doc.exchange
|
||||||
}, {
|
}, {
|
||||||
priority: 7, // High priority for price data
|
priority: 7, // High priority for price data
|
||||||
delay: queued * 500 // 0.5 seconds between jobs
|
delay: queued * 100 // 0.1 seconds between jobs
|
||||||
});
|
});
|
||||||
|
|
||||||
queued++;
|
queued++;
|
||||||
|
|
|
||||||
|
|
@ -26,10 +26,7 @@ export async function checkSessions(
|
||||||
const sessionManager = QMSessionManager.getInstance();
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
|
|
||||||
// Initialize with cache provider and logger
|
// Initialize with cache provider and logger
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
// Always load fresh data from cache (don't rely on initialization flag)
|
|
||||||
await sessionManager.loadFromCache();
|
|
||||||
|
|
||||||
// Cleanup failed sessions (this now handles its own cache sync)
|
// Cleanup failed sessions (this now handles its own cache sync)
|
||||||
const cleanedCount = await sessionManager.cleanupFailedSessions();
|
const cleanedCount = await sessionManager.cleanupFailedSessions();
|
||||||
|
|
@ -39,15 +36,15 @@ export async function checkSessions(
|
||||||
for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
|
for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
|
||||||
this.logger.debug(`Checking session ID: ${sessionId}`);
|
this.logger.debug(`Checking session ID: ${sessionId}`);
|
||||||
if (await sessionManager.needsMoreSessions(sessionId)) {
|
if (await sessionManager.needsMoreSessions(sessionId)) {
|
||||||
const currentCount = sessionManager.getSessions(sessionId).length;
|
const currentCount = await sessionManager.getSessionCount(sessionId);
|
||||||
const neededSessions = SESSION_CONFIG.MAX_SESSIONS - currentCount;
|
const neededSessions = SESSION_CONFIG.MAX_SESSIONS - currentCount;
|
||||||
|
|
||||||
// Queue up to 10 at a time to avoid overwhelming the system
|
// Queue up to 10 at a time to avoid overwhelming the system
|
||||||
const toQueue = Math.min(neededSessions, 20);
|
const toQueue = Math.min(neededSessions, 40);
|
||||||
|
|
||||||
for (let i = 0; i < toQueue; i++) {
|
for (let i = 0; i < toQueue; i++) {
|
||||||
await this.scheduleOperation('create-session', { sessionId, sessionType }, {
|
await this.scheduleOperation('create-session', { sessionId, sessionType }, {
|
||||||
delay: i * 2000, // Stagger creation by 2 seconds
|
// delay: i * 2000, // Stagger creation by 2 seconds
|
||||||
});
|
});
|
||||||
queuedCount++;
|
queuedCount++;
|
||||||
}
|
}
|
||||||
|
|
@ -94,7 +91,7 @@ export async function createSession(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize with cache provider and logger
|
// Initialize with cache provider and logger
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Get proxy from proxy service
|
// Get proxy from proxy service
|
||||||
|
|
|
||||||
|
|
@ -201,7 +201,7 @@ export async function deduplicateSymbols(
|
||||||
countryDocs.sort((a, b) => {
|
countryDocs.sort((a, b) => {
|
||||||
const priorityA = priorityMap.get(a.exchange) || 1;
|
const priorityA = priorityMap.get(a.exchange) || 1;
|
||||||
const priorityB = priorityMap.get(b.exchange) || 1;
|
const priorityB = priorityMap.get(b.exchange) || 1;
|
||||||
if (priorityA !== priorityB) return priorityB - priorityA;
|
if (priorityA !== priorityB) { return priorityB - priorityA }
|
||||||
return (b.marketCap || 0) - (a.marketCap || 0);
|
return (b.marketCap || 0) - (a.marketCap || 0);
|
||||||
});
|
});
|
||||||
activeDoc = countryDocs[0];
|
activeDoc = countryDocs[0];
|
||||||
|
|
@ -217,7 +217,7 @@ export async function deduplicateSymbols(
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
totalProcessed++;
|
totalProcessed++;
|
||||||
if (!isActive) totalDeactivated++;
|
if (!isActive) { totalDeactivated++ }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ export async function updateSymbolInfo(
|
||||||
this.logger.info(`Fetching symbol info ${qmSearchCode}`, { qmSearchCode });
|
this.logger.info(`Fetching symbol info ${qmSearchCode}`, { qmSearchCode });
|
||||||
|
|
||||||
const sessionManager = QMSessionManager.getInstance();
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
// Get a session
|
// Get a session
|
||||||
const sessionId = QM_SESSION_IDS.SYMBOL;
|
const sessionId = QM_SESSION_IDS.SYMBOL;
|
||||||
|
|
|
||||||
|
|
@ -145,7 +145,7 @@ export async function searchSymbols(
|
||||||
this.logger.debug('Searching QM symbols', { query });
|
this.logger.debug('Searching QM symbols', { query });
|
||||||
|
|
||||||
const sessionManager = QMSessionManager.getInstance();
|
const sessionManager = QMSessionManager.getInstance();
|
||||||
sessionManager.initialize(this.cache, this.logger);
|
await sessionManager.initialize(this.cache, this.logger);
|
||||||
|
|
||||||
// Get a session
|
// Get a session
|
||||||
const sessionId = QM_SESSION_IDS.LOOKUP;
|
const sessionId = QM_SESSION_IDS.LOOKUP;
|
||||||
|
|
|
||||||
|
|
@ -42,10 +42,10 @@ export class QMHandler extends BaseHandler<DataIngestionServices> {
|
||||||
/**
|
/**
|
||||||
* SESSIONS
|
* SESSIONS
|
||||||
*/
|
*/
|
||||||
@ScheduledOperation('check-sessions', '*/2 * * * *', {
|
@ScheduledOperation('check-sessions', '*/1 * * * *', {
|
||||||
priority: 8,
|
priority: 8,
|
||||||
immediately: false,
|
immediately: true,
|
||||||
description: 'Check and maintain QM sessions every 2 minutes',
|
description: 'Check and maintain QM sessions every 1 minutes',
|
||||||
})
|
})
|
||||||
checkSessions = checkSessions;
|
checkSessions = checkSessions;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -42,9 +42,9 @@ export const QM_CONFIG = {
|
||||||
|
|
||||||
// Session management settings
|
// Session management settings
|
||||||
export const SESSION_CONFIG = {
|
export const SESSION_CONFIG = {
|
||||||
MIN_SESSIONS: 2,
|
MIN_SESSIONS: 20,
|
||||||
MAX_SESSIONS: 2,
|
MAX_SESSIONS: 100,
|
||||||
MAX_FAILED_CALLS: 3,
|
MAX_FAILED_CALLS: 5,
|
||||||
SESSION_TIMEOUT: 5000, // 10 seconds
|
SESSION_TIMEOUT: 5000, // 10 seconds
|
||||||
API_TIMEOUT: 30000, // 15 seconds
|
API_TIMEOUT: 30000, // 15 seconds
|
||||||
} as const;
|
} as const;
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,6 @@
|
||||||
export * from './config';
|
export * from './config';
|
||||||
export * from './session-manager';
|
export * from './session-manager';
|
||||||
|
export * from './session-manager-redis';
|
||||||
|
export * from './session-manager-wrapper';
|
||||||
export * from './types';
|
export * from './types';
|
||||||
export * from './operation-tracker';
|
export * from './operation-tracker';
|
||||||
|
|
@ -0,0 +1,382 @@
|
||||||
|
/**
|
||||||
|
* QM Session Manager - Redis-based implementation for cluster support
|
||||||
|
* Uses Redis as the single source of truth for session management
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { CacheProvider } from '@stock-bot/cache';
|
||||||
|
import type { Logger } from '@stock-bot/types';
|
||||||
|
import { QM_SESSION_IDS, SESSION_CONFIG } from './config';
|
||||||
|
import type { QMSession } from './types';
|
||||||
|
|
||||||
|
export class QMSessionManagerRedis {
|
||||||
|
private static instance: QMSessionManagerRedis | null = null;
|
||||||
|
private cacheProvider: CacheProvider | null = null;
|
||||||
|
private logger: Logger | null = null;
|
||||||
|
private readonly SESSION_TTL = 86400; // 24 hours
|
||||||
|
|
||||||
|
private constructor() {}
|
||||||
|
|
||||||
|
static getInstance(): QMSessionManagerRedis {
|
||||||
|
if (!QMSessionManagerRedis.instance) {
|
||||||
|
QMSessionManagerRedis.instance = new QMSessionManagerRedis();
|
||||||
|
}
|
||||||
|
return QMSessionManagerRedis.instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize with cache provider and logger
|
||||||
|
*/
|
||||||
|
initialize(cache?: CacheProvider, logger?: Logger): void {
|
||||||
|
if (cache) {
|
||||||
|
this.cacheProvider = cache;
|
||||||
|
}
|
||||||
|
if (logger) {
|
||||||
|
this.logger = logger;
|
||||||
|
this.logger.trace('Redis-based SessionManager initialized');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a random valid session for the given session ID
|
||||||
|
*/
|
||||||
|
async getSession(sessionId: string): Promise<QMSession | null> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
this.logger?.error('No cache provider available');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Get all session UUIDs for this sessionId
|
||||||
|
const sessionUuids = await this.getSessionUuids(sessionId);
|
||||||
|
|
||||||
|
if (sessionUuids.length === 0) {
|
||||||
|
this.logger?.debug(`No sessions found for ${sessionId}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shuffle and try to find a valid session
|
||||||
|
const shuffled = [...sessionUuids].sort(() => Math.random() - 0.5);
|
||||||
|
|
||||||
|
for (const uuid of shuffled) {
|
||||||
|
const session = await this.getSessionByUuid(sessionId, uuid);
|
||||||
|
if (session && (session.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS) {
|
||||||
|
this.logger?.trace(`Selected session`, {
|
||||||
|
uuid: session.uuid,
|
||||||
|
failedCalls: session.failedCalls,
|
||||||
|
successfulCalls: session.successfulCalls,
|
||||||
|
});
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger?.warn(`No valid sessions found for ${sessionId}`);
|
||||||
|
return null;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to get session', { error, sessionId });
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a specific session by UUID
|
||||||
|
*/
|
||||||
|
async getSessionByUuid(sessionId: string, uuid: string): Promise<QMSession | null> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessionKey = this.getSessionKey(sessionId, uuid);
|
||||||
|
try {
|
||||||
|
const session = await this.cacheProvider.get<QMSession>(sessionKey);
|
||||||
|
if (session) {
|
||||||
|
// Ensure dates are Date objects
|
||||||
|
session.lastUsed = new Date(session.lastUsed);
|
||||||
|
session.createdAt = new Date(session.createdAt);
|
||||||
|
}
|
||||||
|
return session;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to get session by UUID', { error, sessionId, uuid });
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a session
|
||||||
|
*/
|
||||||
|
async addSession(sessionId: string, session: QMSession): Promise<void> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
throw new Error('No cache provider available');
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessionKey = this.getSessionKey(sessionId, session.uuid);
|
||||||
|
const listKey = this.getSessionListKey(sessionId);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Store the session data
|
||||||
|
await this.cacheProvider.set(sessionKey, session, this.SESSION_TTL);
|
||||||
|
|
||||||
|
// Add UUID to the session list (using SET to simulate SADD)
|
||||||
|
const uuids = await this.getSessionUuids(sessionId);
|
||||||
|
if (!uuids.includes(session.uuid)) {
|
||||||
|
uuids.push(session.uuid);
|
||||||
|
await this.cacheProvider.set(listKey, uuids, this.SESSION_TTL);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger?.debug(`Added session ${session.uuid} to ${sessionId}`);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to add session', { error, sessionId, uuid: session.uuid });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a session
|
||||||
|
*/
|
||||||
|
async removeSession(sessionId: string, uuid: string): Promise<void> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessionKey = this.getSessionKey(sessionId, uuid);
|
||||||
|
const listKey = this.getSessionListKey(sessionId);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Remove session data
|
||||||
|
await this.cacheProvider.del(sessionKey);
|
||||||
|
|
||||||
|
// Remove UUID from list
|
||||||
|
const uuids = await this.getSessionUuids(sessionId);
|
||||||
|
const filtered = uuids.filter(u => u !== uuid);
|
||||||
|
if (filtered.length !== uuids.length) {
|
||||||
|
await this.cacheProvider.set(listKey, filtered, this.SESSION_TTL);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger?.debug(`Removed session ${uuid} from ${sessionId}`);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to remove session', { error, sessionId, uuid });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all sessions for a session ID
|
||||||
|
*/
|
||||||
|
async getSessions(sessionId: string): Promise<QMSession[]> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
const uuids = await this.getSessionUuids(sessionId);
|
||||||
|
const sessions: QMSession[] = [];
|
||||||
|
|
||||||
|
for (const uuid of uuids) {
|
||||||
|
const session = await this.getSessionByUuid(sessionId, uuid);
|
||||||
|
if (session) {
|
||||||
|
sessions.push(session);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sessions;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get session count
|
||||||
|
*/
|
||||||
|
async getSessionCount(sessionId?: string): Promise<number> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sessionId) {
|
||||||
|
const uuids = await this.getSessionUuids(sessionId);
|
||||||
|
return uuids.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count all sessions
|
||||||
|
let total = 0;
|
||||||
|
for (const sid of Object.values(QM_SESSION_IDS)) {
|
||||||
|
const uuids = await this.getSessionUuids(sid);
|
||||||
|
total += uuids.length;
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clean up failed sessions
|
||||||
|
*/
|
||||||
|
async cleanupFailedSessions(): Promise<number> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
let removedCount = 0;
|
||||||
|
|
||||||
|
for (const sessionId of Object.values(QM_SESSION_IDS)) {
|
||||||
|
const sessions = await this.getSessions(sessionId);
|
||||||
|
|
||||||
|
for (const session of sessions) {
|
||||||
|
if (session.failedCalls > SESSION_CONFIG.MAX_FAILED_CALLS) {
|
||||||
|
await this.removeSession(sessionId, session.uuid);
|
||||||
|
removedCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger?.info(`Cleaned up ${removedCount} failed sessions`);
|
||||||
|
return removedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if more sessions are needed
|
||||||
|
*/
|
||||||
|
async needsMoreSessions(sessionId: string): Promise<boolean> {
|
||||||
|
const sessions = await this.getSessions(sessionId);
|
||||||
|
const validSessions = sessions.filter(
|
||||||
|
s => (s.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS
|
||||||
|
);
|
||||||
|
return validSessions.length < SESSION_CONFIG.MAX_SESSIONS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get statistics
|
||||||
|
*/
|
||||||
|
async getStats(): Promise<Record<string, { total: number; valid: number; failed: number }>> {
|
||||||
|
const stats: Record<string, { total: number; valid: number; failed: number }> = {};
|
||||||
|
|
||||||
|
for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
|
||||||
|
const sessions = await this.getSessions(sessionId);
|
||||||
|
const validSessions = sessions.filter(
|
||||||
|
s => (s.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS
|
||||||
|
);
|
||||||
|
const failedSessions = sessions.filter(
|
||||||
|
s => s.failedCalls > SESSION_CONFIG.MAX_FAILED_CALLS
|
||||||
|
);
|
||||||
|
|
||||||
|
stats[sessionId] = {
|
||||||
|
total: sessions.length,
|
||||||
|
valid: validSessions.length,
|
||||||
|
failed: failedSessions.length,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increment failed calls for a session
|
||||||
|
*/
|
||||||
|
async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise<void> {
|
||||||
|
const session = await this.getSessionByUuid(sessionId, sessionUuid);
|
||||||
|
|
||||||
|
if (session) {
|
||||||
|
session.failedCalls = (session.failedCalls || 0) + 1;
|
||||||
|
session.lastUsed = new Date();
|
||||||
|
|
||||||
|
const sessionKey = this.getSessionKey(sessionId, sessionUuid);
|
||||||
|
await this.cacheProvider?.set(sessionKey, session, this.SESSION_TTL);
|
||||||
|
|
||||||
|
this.logger?.debug(`Incremented failed calls for session`, {
|
||||||
|
sessionUuid,
|
||||||
|
failedCalls: session.failedCalls,
|
||||||
|
sessionId
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.logger?.warn(`Session not found`, { sessionUuid, sessionId });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increment successful calls for a session
|
||||||
|
*/
|
||||||
|
async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise<void> {
|
||||||
|
const session = await this.getSessionByUuid(sessionId, sessionUuid);
|
||||||
|
|
||||||
|
if (session) {
|
||||||
|
session.successfulCalls = (session.successfulCalls || 0) + 1;
|
||||||
|
session.lastUsed = new Date();
|
||||||
|
|
||||||
|
const sessionKey = this.getSessionKey(sessionId, sessionUuid);
|
||||||
|
await this.cacheProvider?.set(sessionKey, session, this.SESSION_TTL);
|
||||||
|
|
||||||
|
this.logger?.debug(`Incremented successful calls for session`, {
|
||||||
|
sessionUuid,
|
||||||
|
successfulCalls: session.successfulCalls,
|
||||||
|
sessionId
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.logger?.warn(`Session not found`, { sessionUuid, sessionId });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper: Get session UUIDs from cache
|
||||||
|
*/
|
||||||
|
private async getSessionUuids(sessionId: string): Promise<string[]> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
const listKey = this.getSessionListKey(sessionId);
|
||||||
|
try {
|
||||||
|
const uuids = await this.cacheProvider.get<string[]>(listKey);
|
||||||
|
return uuids || [];
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to get session UUIDs', { error, sessionId });
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper: Generate cache keys
|
||||||
|
*/
|
||||||
|
private getSessionKey(sessionId: string, uuid: string): string {
|
||||||
|
return `qm:sessions:${sessionId}:session:${uuid}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
private getSessionListKey(sessionId: string): string {
|
||||||
|
return `qm:sessions:${sessionId}:list`;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Migrate from old session manager (for backward compatibility)
|
||||||
|
*/
|
||||||
|
async migrateFromOldFormat(): Promise<void> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger?.info('Starting migration from old session format');
|
||||||
|
|
||||||
|
for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
|
||||||
|
const oldListKey = `qm:sessions:${sessionType.toLowerCase()}:list`;
|
||||||
|
const oldSessionIds = await this.cacheProvider.get<string[]>(oldListKey);
|
||||||
|
|
||||||
|
if (oldSessionIds && Array.isArray(oldSessionIds)) {
|
||||||
|
this.logger?.info(`Migrating ${oldSessionIds.length} sessions for ${sessionType}`);
|
||||||
|
|
||||||
|
for (const id of oldSessionIds) {
|
||||||
|
const oldSessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`;
|
||||||
|
const oldSession = await this.cacheProvider.get<any>(oldSessionKey);
|
||||||
|
|
||||||
|
if (oldSession && oldSession.uuid) {
|
||||||
|
const session: QMSession = {
|
||||||
|
uuid: oldSession.uuid,
|
||||||
|
proxy: oldSession.proxy,
|
||||||
|
headers: oldSession.headers,
|
||||||
|
successfulCalls: oldSession.successfulCalls || 0,
|
||||||
|
failedCalls: oldSession.failedCalls || 0,
|
||||||
|
lastUsed: new Date(oldSession.lastUsed),
|
||||||
|
createdAt: new Date(oldSession.createdAt || new Date()),
|
||||||
|
};
|
||||||
|
|
||||||
|
await this.addSession(sessionId, session);
|
||||||
|
await this.cacheProvider.del(oldSessionKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.cacheProvider.del(oldListKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger?.info('Migration completed');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,128 @@
|
||||||
|
/**
|
||||||
|
* Session Manager Wrapper - Provides a unified interface that can switch between implementations
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { CacheProvider } from '@stock-bot/cache';
|
||||||
|
import type { Logger } from '@stock-bot/types';
|
||||||
|
import { QMSessionManager } from './session-manager';
|
||||||
|
import { QMSessionManagerRedis } from './session-manager-redis';
|
||||||
|
import type { QMSession } from './types';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper that delegates to either in-memory or Redis-based implementation
|
||||||
|
* Set REDIS_SESSION_MANAGER=true to use Redis implementation
|
||||||
|
*/
|
||||||
|
export class SessionManagerWrapper {
|
||||||
|
private implementation: QMSessionManager | QMSessionManagerRedis;
|
||||||
|
private static instance: SessionManagerWrapper | null = null;
|
||||||
|
|
||||||
|
private constructor() {
|
||||||
|
// Use Redis implementation if env var is set or if in production
|
||||||
|
const useRedis = process.env.REDIS_SESSION_MANAGER === 'true' ||
|
||||||
|
process.env.NODE_ENV === 'production';
|
||||||
|
|
||||||
|
if (useRedis) {
|
||||||
|
this.implementation = QMSessionManagerRedis.getInstance();
|
||||||
|
} else {
|
||||||
|
this.implementation = QMSessionManager.getInstance();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static getInstance(): SessionManagerWrapper {
|
||||||
|
if (!SessionManagerWrapper.instance) {
|
||||||
|
SessionManagerWrapper.instance = new SessionManagerWrapper();
|
||||||
|
}
|
||||||
|
return SessionManagerWrapper.instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset instance (for testing)
|
||||||
|
*/
|
||||||
|
static resetInstance(): void {
|
||||||
|
SessionManagerWrapper.instance = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize with cache provider and logger
|
||||||
|
*/
|
||||||
|
initialize(cache?: CacheProvider, logger?: Logger): void {
|
||||||
|
this.implementation.initialize(cache, logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the implementation type
|
||||||
|
*/
|
||||||
|
getImplementationType(): string {
|
||||||
|
return this.implementation instanceof QMSessionManagerRedis ? 'redis' : 'memory';
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delegate all methods to the implementation
|
||||||
|
async getSession(sessionId: string): Promise<QMSession | null> {
|
||||||
|
return this.implementation.getSession(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSessionByUuid(sessionId: string, uuid: string): Promise<QMSession | null> {
|
||||||
|
if (this.implementation instanceof QMSessionManagerRedis) {
|
||||||
|
return this.implementation.getSessionByUuid(sessionId, uuid);
|
||||||
|
} else {
|
||||||
|
return this.implementation.getSessionByUuid(sessionId, uuid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async addSession(sessionId: string, session: QMSession): Promise<void> {
|
||||||
|
return this.implementation.addSession(sessionId, session);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSessions(sessionId: string): Promise<QMSession[]> {
|
||||||
|
if (this.implementation instanceof QMSessionManagerRedis) {
|
||||||
|
return this.implementation.getSessions(sessionId);
|
||||||
|
} else {
|
||||||
|
return this.implementation.getSessions(sessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSessionCount(sessionId?: string): Promise<number> {
|
||||||
|
if (this.implementation instanceof QMSessionManagerRedis) {
|
||||||
|
return this.implementation.getSessionCount(sessionId);
|
||||||
|
} else {
|
||||||
|
return this.implementation.getSessionCount();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async cleanupFailedSessions(): Promise<number> {
|
||||||
|
return this.implementation.cleanupFailedSessions();
|
||||||
|
}
|
||||||
|
|
||||||
|
async needsMoreSessions(sessionId: string): Promise<boolean> {
|
||||||
|
return this.implementation.needsMoreSessions(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getStats(): Promise<Record<string, { total: number; valid: number; failed: number }>> {
|
||||||
|
return this.implementation.getStats();
|
||||||
|
}
|
||||||
|
|
||||||
|
async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise<void> {
|
||||||
|
return this.implementation.incrementFailedCalls(sessionId, sessionUuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise<void> {
|
||||||
|
return this.implementation.incrementSuccessfulCalls(sessionId, sessionUuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Additional methods for memory implementation
|
||||||
|
isAtCapacity(sessionId: string): boolean {
|
||||||
|
if (this.implementation instanceof QMSessionManager) {
|
||||||
|
return this.implementation.isAtCapacity(sessionId);
|
||||||
|
}
|
||||||
|
// For Redis, we'd need to implement this async
|
||||||
|
throw new Error('isAtCapacity not available for Redis implementation');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migration helper
|
||||||
|
async migrateToRedis(): Promise<void> {
|
||||||
|
if (this.implementation instanceof QMSessionManagerRedis) {
|
||||||
|
return this.implementation.migrateFromOldFormat();
|
||||||
|
}
|
||||||
|
throw new Error('Migration only available for Redis implementation');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -9,37 +9,29 @@ import type { CachedSession, QMSession } from './types';
|
||||||
|
|
||||||
export class QMSessionManager {
|
export class QMSessionManager {
|
||||||
private static instance: QMSessionManager | null = null;
|
private static instance: QMSessionManager | null = null;
|
||||||
private sessionCache: Record<string, QMSession[]> = {};
|
private cacheProvider?: CacheProvider;
|
||||||
private isInitialized = false;
|
private logger?: Logger;
|
||||||
private cacheProvider: CacheProvider | null = null;
|
|
||||||
private logger: Logger | null = null;
|
|
||||||
|
|
||||||
private constructor() {
|
private constructor() {}
|
||||||
// Initialize session cache with known session IDs
|
|
||||||
Object.values(QM_SESSION_IDS).forEach(sessionId => {
|
|
||||||
this.sessionCache[sessionId] = [];
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
static getInstance(): QMSessionManager {
|
static getInstance(): QMSessionManager {
|
||||||
if (!QMSessionManager.instance) {
|
if (!this.instance) {
|
||||||
QMSessionManager.instance = new QMSessionManager();
|
this.instance = new QMSessionManager();
|
||||||
}
|
}
|
||||||
return QMSessionManager.instance;
|
return this.instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the singleton instance (for testing only)
|
* Reset the singleton instance (mainly for testing)
|
||||||
*/
|
*/
|
||||||
static resetInstance(): void {
|
static resetInstance(): void {
|
||||||
if (QMSessionManager.instance?.logger) {
|
if (this.instance) {
|
||||||
QMSessionManager.instance.logger.warn('Resetting QMSessionManager instance - this should only be used for testing');
|
this.instance = null;
|
||||||
}
|
}
|
||||||
QMSessionManager.instance = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the cache provider for persistence
|
* Set the cache provider
|
||||||
*/
|
*/
|
||||||
setCacheProvider(cache: CacheProvider): void {
|
setCacheProvider(cache: CacheProvider): void {
|
||||||
this.cacheProvider = cache;
|
this.cacheProvider = cache;
|
||||||
|
|
@ -50,13 +42,12 @@ export class QMSessionManager {
|
||||||
*/
|
*/
|
||||||
setLogger(logger: Logger): void {
|
setLogger(logger: Logger): void {
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.logger.trace('Logger set for QMSessionManager');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize with cache provider and logger
|
* Initialize the session manager with cache and logger
|
||||||
*/
|
*/
|
||||||
initialize(cache?: CacheProvider, logger?: Logger): void {
|
async initialize(cache?: CacheProvider, logger?: Logger): Promise<void> {
|
||||||
if (cache) {
|
if (cache) {
|
||||||
this.setCacheProvider(cache);
|
this.setCacheProvider(cache);
|
||||||
}
|
}
|
||||||
|
|
@ -66,229 +57,119 @@ export class QMSessionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a random session for the given session ID
|
* Get an active session for the given session ID
|
||||||
|
* Implements round-robin selection with health checks
|
||||||
*/
|
*/
|
||||||
async getSession(sessionId: string): Promise<QMSession | null> {
|
async getSession(sessionId: string): Promise<QMSession | null> {
|
||||||
let retries = 3;
|
if (!this.cacheProvider) {
|
||||||
let session: QMSession | null = null;
|
this.logger?.error('No cache provider available');
|
||||||
|
|
||||||
while (retries > 0 && !session) {
|
|
||||||
// Always load fresh data from cache
|
|
||||||
await this.loadFromCache();
|
|
||||||
|
|
||||||
const sessions = this.sessionCache[sessionId];
|
|
||||||
|
|
||||||
if (!sessions || sessions.length === 0) {
|
|
||||||
retries--;
|
|
||||||
if (retries > 0) {
|
|
||||||
this.logger?.debug(`No sessions found for ${sessionId}, retrying... (${retries} attempts left)`);
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 500));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger?.error(`No sessions found for sessionId: ${sessionId}`, {
|
|
||||||
availableSessionIds: Object.keys(this.sessionCache),
|
|
||||||
sessionCounts: Object.entries(this.sessionCache).map(([id, s]) => ({ id, count: s.length })),
|
|
||||||
});
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter out sessions with excessive failures
|
try {
|
||||||
|
// Get all sessions for this session type
|
||||||
|
const sessions = await this.getSessions(sessionId);
|
||||||
|
|
||||||
|
// Filter out failed sessions
|
||||||
const validSessions = sessions.filter(
|
const validSessions = sessions.filter(
|
||||||
session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS
|
session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS
|
||||||
);
|
);
|
||||||
|
|
||||||
if (validSessions.length === 0) {
|
this.logger?.trace(`Found ${validSessions.length} valid sessions for ${sessionId}`, {
|
||||||
retries--;
|
sessionId,
|
||||||
if (retries > 0) {
|
totalSessions: sessions.length,
|
||||||
this.logger?.debug(`No valid sessions after filtering, retrying... (${retries} attempts left)`);
|
validSessions: validSessions.length,
|
||||||
await new Promise(resolve => setTimeout(resolve, 500));
|
});
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger?.error(`No valid sessions after filtering for sessionId: ${sessionId}`, {
|
if (validSessions.length === 0) {
|
||||||
|
this.logger?.warn(`No valid sessions available for ${sessionId}`, {
|
||||||
|
sessionId,
|
||||||
totalSessions: sessions.length,
|
totalSessions: sessions.length,
|
||||||
maxFailedCalls: SESSION_CONFIG.MAX_FAILED_CALLS,
|
maxFailedCalls: SESSION_CONFIG.MAX_FAILED_CALLS,
|
||||||
});
|
});
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
session = validSessions[Math.floor(Math.random() * validSessions.length)] || null;
|
// Sort by least recently used
|
||||||
this.logger?.trace(`Selected session`, {
|
validSessions.sort((a, b) =>
|
||||||
uuid: session?.uuid || 'null',
|
new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime()
|
||||||
failedCalls: session?.failedCalls || 'null',
|
);
|
||||||
successfulCalls: session?.successfulCalls || 'null',
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return session;
|
// Return the least recently used session
|
||||||
|
return validSessions[0] || null;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to get session', { error, sessionId });
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a specific session by UUID
|
* Get a session by its UUID
|
||||||
*/
|
*/
|
||||||
getSessionByUuid(sessionId: string, uuid: string): QMSession | null {
|
async getSessionByUuid(sessionId: string, uuid: string): Promise<QMSession | null> {
|
||||||
const sessions = this.sessionCache[sessionId] || [];
|
const sessions = await this.getSessions(sessionId);
|
||||||
return sessions.find(s => s.uuid === uuid) || null;
|
return sessions.find(s => s.uuid === uuid) || null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a session to the cache
|
* Add a new session
|
||||||
*/
|
*/
|
||||||
async addSession(sessionId: string, session: QMSession): Promise<void> {
|
async addSession(sessionId: string, session: QMSession): Promise<void> {
|
||||||
// Load latest from cache first to avoid overwriting other service's changes
|
if (!this.cacheProvider) {
|
||||||
await this.loadFromCache();
|
this.logger?.error('No cache provider available');
|
||||||
|
return;
|
||||||
if (!this.sessionCache[sessionId]) {
|
|
||||||
this.sessionCache[sessionId] = [];
|
|
||||||
}
|
}
|
||||||
this.sessionCache[sessionId].push(session);
|
|
||||||
|
|
||||||
this.logger?.debug(`Added session ${session.uuid} to ${sessionId}`, {
|
try {
|
||||||
sessionId,
|
// Get session type from session ID
|
||||||
uuid: session.uuid
|
const sessionType = Object.entries(QM_SESSION_IDS).find(([_, id]) => id === sessionId)?.[0] || 'UNKNOWN';
|
||||||
});
|
|
||||||
|
|
||||||
// Sync to cache immediately
|
// Store the session with a unique key
|
||||||
await this.syncToCache();
|
const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${session.uuid}`;
|
||||||
|
const cachedSession: CachedSession = {
|
||||||
|
...session,
|
||||||
|
lastUsed: session.lastUsed instanceof Date ? session.lastUsed.toISOString() : session.lastUsed,
|
||||||
|
createdAt: session.createdAt instanceof Date ? session.createdAt.toISOString() : session.createdAt,
|
||||||
|
id: session.uuid,
|
||||||
|
sessionType,
|
||||||
|
} as any;
|
||||||
|
|
||||||
|
await this.cacheProvider.set(sessionKey, cachedSession, 86400); // 24 hour TTL
|
||||||
|
|
||||||
|
// Update the session list
|
||||||
|
const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`;
|
||||||
|
const sessionIds = await this.cacheProvider.get<string[]>(listKey) || [];
|
||||||
|
if (!sessionIds.includes(session.uuid)) {
|
||||||
|
sessionIds.push(session.uuid);
|
||||||
|
await this.cacheProvider.set(listKey, sessionIds, 86400);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger?.info(`Added new session`, { sessionId, uuid: session.uuid });
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to add session', { error, sessionId });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all sessions for a session ID
|
* Get all sessions for a session ID
|
||||||
*/
|
*/
|
||||||
getSessions(sessionId: string): QMSession[] {
|
async getSessions(sessionId: string): Promise<QMSession[]> {
|
||||||
return this.sessionCache[sessionId] || [];
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get session count for all session IDs
|
|
||||||
*/
|
|
||||||
getSessionCount(): number {
|
|
||||||
return Object.values(this.sessionCache).reduce((total, sessions) => total + sessions.length, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clean up failed sessions
|
|
||||||
*/
|
|
||||||
async cleanupFailedSessions(): Promise<number> {
|
|
||||||
// Always load latest from cache first
|
|
||||||
await this.loadFromCache();
|
|
||||||
|
|
||||||
let removedCount = 0;
|
|
||||||
|
|
||||||
Object.keys(this.sessionCache).forEach(sessionId => {
|
|
||||||
const initialCount = this.sessionCache[sessionId]?.length;
|
|
||||||
if (!initialCount || this.sessionCache[sessionId] === undefined) {
|
|
||||||
this.logger?.trace(`No sessions to clean up for ${sessionId}`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.sessionCache[sessionId] = this.sessionCache[sessionId]?.filter(
|
|
||||||
session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS
|
|
||||||
);
|
|
||||||
removedCount += initialCount - this.sessionCache[sessionId].length;
|
|
||||||
});
|
|
||||||
|
|
||||||
// Sync back to cache after cleanup
|
|
||||||
await this.syncToCache();
|
|
||||||
|
|
||||||
return removedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if more sessions are needed for a session ID
|
|
||||||
*/
|
|
||||||
async needsMoreSessions(sessionId: string): Promise<boolean> {
|
|
||||||
// Always load fresh data from cache
|
|
||||||
await this.loadFromCache();
|
|
||||||
|
|
||||||
const sessions = this.sessionCache[sessionId] || [];
|
|
||||||
const validSessions = sessions.filter(
|
|
||||||
session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS
|
|
||||||
);
|
|
||||||
return validSessions.length < SESSION_CONFIG.MAX_SESSIONS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if session ID is at capacity
|
|
||||||
*/
|
|
||||||
isAtCapacity(sessionId: string): boolean {
|
|
||||||
const sessions = this.sessionCache[sessionId] || [];
|
|
||||||
return sessions.length >= SESSION_CONFIG.MAX_SESSIONS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get session cache statistics
|
|
||||||
*/
|
|
||||||
getStats() {
|
|
||||||
const stats: Record<string, { total: number; valid: number; failed: number }> = {};
|
|
||||||
|
|
||||||
Object.entries(this.sessionCache).forEach(([sessionId, sessions]) => {
|
|
||||||
const validSessions = sessions.filter(
|
|
||||||
session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS
|
|
||||||
);
|
|
||||||
const failedSessions = sessions.filter(
|
|
||||||
session => session.failedCalls > SESSION_CONFIG.MAX_FAILED_CALLS
|
|
||||||
);
|
|
||||||
|
|
||||||
stats[sessionId] = {
|
|
||||||
total: sessions.length,
|
|
||||||
valid: validSessions.length,
|
|
||||||
failed: failedSessions.length,
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
return stats;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mark manager as initialized (deprecated - we always load from cache now)
|
|
||||||
*/
|
|
||||||
setInitialized(initialized: boolean = true): void {
|
|
||||||
this.isInitialized = initialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if manager is initialized (deprecated - we always load from cache now)
|
|
||||||
*/
|
|
||||||
getInitialized(): boolean {
|
|
||||||
return this.isInitialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Load sessions from cache
|
|
||||||
*/
|
|
||||||
async loadFromCache(): Promise<void> {
|
|
||||||
if (!this.cacheProvider) {
|
if (!this.cacheProvider) {
|
||||||
this.logger?.warn('No cache provider available for loading sessions');
|
return [];
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.logger?.trace('Loading sessions from cache...');
|
const sessionType = Object.entries(QM_SESSION_IDS).find(([_, id]) => id === sessionId)?.[0] || 'UNKNOWN';
|
||||||
|
|
||||||
// Load sessions for each session type
|
|
||||||
for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
|
|
||||||
const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`;
|
const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`;
|
||||||
const sessionIds = await this.cacheProvider.get<string[]>(listKey);
|
const sessionIds = await this.cacheProvider.get<string[]>(listKey) || [];
|
||||||
|
|
||||||
this.logger?.trace(`Loading ${sessionType} sessions`, {
|
|
||||||
sessionType,
|
|
||||||
sessionId,
|
|
||||||
listKey,
|
|
||||||
sessionIds
|
|
||||||
});
|
|
||||||
|
|
||||||
if (sessionIds && Array.isArray(sessionIds)) {
|
|
||||||
const sessions: QMSession[] = [];
|
const sessions: QMSession[] = [];
|
||||||
|
for (const uuid of sessionIds) {
|
||||||
for (const id of sessionIds) {
|
const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${uuid}`;
|
||||||
const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`;
|
|
||||||
const cachedSession = await this.cacheProvider.get<CachedSession>(sessionKey);
|
const cachedSession = await this.cacheProvider.get<CachedSession>(sessionKey);
|
||||||
|
|
||||||
if (cachedSession) {
|
if (cachedSession) {
|
||||||
const session = {
|
sessions.push({
|
||||||
uuid: cachedSession.uuid,
|
uuid: cachedSession.uuid,
|
||||||
proxy: cachedSession.proxy,
|
proxy: cachedSession.proxy,
|
||||||
headers: cachedSession.headers,
|
headers: cachedSession.headers,
|
||||||
|
|
@ -296,148 +177,175 @@ export class QMSessionManager {
|
||||||
failedCalls: cachedSession.failedCalls || 0,
|
failedCalls: cachedSession.failedCalls || 0,
|
||||||
lastUsed: new Date(cachedSession.lastUsed),
|
lastUsed: new Date(cachedSession.lastUsed),
|
||||||
createdAt: cachedSession.createdAt ? new Date(cachedSession.createdAt) : new Date(),
|
createdAt: cachedSession.createdAt ? new Date(cachedSession.createdAt) : new Date(),
|
||||||
};
|
|
||||||
|
|
||||||
this.logger?.trace(`Loaded session ${id}`, {
|
|
||||||
uuid: session.uuid,
|
|
||||||
successfulCalls: session.successfulCalls,
|
|
||||||
failedCalls: session.failedCalls
|
|
||||||
});
|
});
|
||||||
|
|
||||||
sessions.push(session);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.sessionCache[sessionId] = sessions;
|
return sessions;
|
||||||
this.logger?.debug(`Loaded ${sessions.length} sessions for ${sessionType}`);
|
|
||||||
} else {
|
|
||||||
this.logger?.trace(`No sessions found for ${sessionType}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger?.error('Failed to load sessions from cache', { error });
|
this.logger?.error('Failed to get sessions', { error, sessionId });
|
||||||
|
return [];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sync sessions to cache
|
* Get session count for a session ID
|
||||||
*/
|
*/
|
||||||
async syncToCache(): Promise<void> {
|
async getSessionCount(sessionId: string): Promise<number> {
|
||||||
if (!this.cacheProvider) {
|
const sessions = await this.getSessions(sessionId);
|
||||||
return;
|
return sessions.filter(s => (s.failedCalls || 0) <= SESSION_CONFIG.MAX_FAILED_CALLS).length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup failed sessions
|
||||||
|
*/
|
||||||
|
async cleanupFailedSessions(): Promise<number> {
|
||||||
|
if (!this.cacheProvider) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
let removedCount = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
|
for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
|
||||||
const sessions = this.sessionCache[sessionId] || [];
|
const sessions = await this.getSessions(sessionId);
|
||||||
const sessionIds: string[] = [];
|
const listKey = `qm:sessions:${sessionType.toLowerCase()}:list`;
|
||||||
|
const updatedList: string[] = [];
|
||||||
|
|
||||||
// Clear old sessions first
|
for (const session of sessions) {
|
||||||
const oldListKey = `qm:sessions:${sessionType.toLowerCase()}:list`;
|
if ((session.failedCalls || 0) > SESSION_CONFIG.MAX_FAILED_CALLS) {
|
||||||
const oldSessionIds = await this.cacheProvider.get<string[]>(oldListKey);
|
// Delete the failed session
|
||||||
if (oldSessionIds && Array.isArray(oldSessionIds)) {
|
const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${session.uuid}`;
|
||||||
// Delete old session entries
|
await this.cacheProvider.del(sessionKey);
|
||||||
for (const oldId of oldSessionIds) {
|
removedCount++;
|
||||||
const oldSessionKey = `qm:sessions:${sessionType.toLowerCase()}:${oldId}`;
|
this.logger?.info(`Removed failed session`, {
|
||||||
await this.cacheProvider.del(oldSessionKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store each session with stable IDs
|
|
||||||
for (let i = 0; i < sessions.length; i++) {
|
|
||||||
const session = sessions[i];
|
|
||||||
const id = `${sessionType.toLowerCase()}_${i}`;
|
|
||||||
const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${id}`;
|
|
||||||
|
|
||||||
if(!session){
|
|
||||||
this.logger?.warn(`Skipping empty session at index ${i} for ${sessionType}`);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const cachedSession: CachedSession = {
|
|
||||||
...session,
|
|
||||||
lastUsed: session.lastUsed instanceof Date ? session.lastUsed.toISOString() : session.lastUsed,
|
|
||||||
createdAt: session.createdAt instanceof Date ? session.createdAt.toISOString() : session.createdAt,
|
|
||||||
id,
|
|
||||||
sessionType,
|
sessionType,
|
||||||
} as any;
|
uuid: session.uuid,
|
||||||
|
failedCalls: session.failedCalls
|
||||||
this.logger?.trace(`Saving session ${id}`, {
|
|
||||||
uuid: cachedSession.uuid,
|
|
||||||
successfulCalls: cachedSession.successfulCalls,
|
|
||||||
failedCalls: cachedSession.failedCalls
|
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
await this.cacheProvider.set(sessionKey, cachedSession, 86400); // 24 hour TTL
|
updatedList.push(session.uuid);
|
||||||
sessionIds.push(id);
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the list of session IDs
|
// Update the session list
|
||||||
await this.cacheProvider.set(oldListKey, sessionIds, 86400);
|
if (updatedList.length > 0) {
|
||||||
|
await this.cacheProvider.set(listKey, updatedList, 86400);
|
||||||
|
} else {
|
||||||
|
await this.cacheProvider.del(listKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store stats
|
|
||||||
const statsKey = 'qm:sessions:stats';
|
|
||||||
await this.cacheProvider.set(statsKey, this.getStats(), 3600);
|
|
||||||
|
|
||||||
this.logger?.trace('Session sync to cache completed');
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger?.error('Failed to sync sessions to cache', { error });
|
this.logger?.error('Failed to cleanup sessions', { error });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return removedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if we need more sessions for a session ID
|
||||||
|
*/
|
||||||
|
async needsMoreSessions(sessionId: string): Promise<boolean> {
|
||||||
|
const validCount = await this.getSessionCount(sessionId);
|
||||||
|
return validCount < SESSION_CONFIG.MAX_SESSIONS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if we're at capacity for a session ID
|
||||||
|
*/
|
||||||
|
async isAtCapacity(sessionId: string): Promise<boolean> {
|
||||||
|
const sessions = await this.getSessions(sessionId);
|
||||||
|
return sessions.length >= SESSION_CONFIG.MAX_SESSIONS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get statistics about sessions
|
||||||
|
*/
|
||||||
|
async getStats(): Promise<Record<string, any>> {
|
||||||
|
const stats: Record<string, any> = {};
|
||||||
|
|
||||||
|
for (const [sessionType, sessionId] of Object.entries(QM_SESSION_IDS)) {
|
||||||
|
const sessions = await this.getSessions(sessionId);
|
||||||
|
const validSessions = sessions.filter(
|
||||||
|
session => session.failedCalls <= SESSION_CONFIG.MAX_FAILED_CALLS
|
||||||
|
);
|
||||||
|
const failedSessions = sessions.filter(
|
||||||
|
session => session.failedCalls > SESSION_CONFIG.MAX_FAILED_CALLS
|
||||||
|
);
|
||||||
|
|
||||||
|
stats[sessionType] = {
|
||||||
|
total: sessions.length,
|
||||||
|
valid: validSessions.length,
|
||||||
|
failed: failedSessions.length,
|
||||||
|
successfulCalls: sessions.reduce((sum, s) => sum + s.successfulCalls, 0),
|
||||||
|
failedCalls: sessions.reduce((sum, s) => sum + s.failedCalls, 0),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increment failed calls for a session
|
* Increment failed calls for a session
|
||||||
*/
|
*/
|
||||||
async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise<void> {
|
async incrementFailedCalls(sessionId: string, sessionUuid: string): Promise<void> {
|
||||||
// Load latest from cache first
|
if (!this.cacheProvider) {
|
||||||
await this.loadFromCache();
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Find session by UUID
|
try {
|
||||||
const sessions = this.sessionCache[sessionId] || [];
|
const sessionType = Object.entries(QM_SESSION_IDS).find(([_, id]) => id === sessionId)?.[0] || 'UNKNOWN';
|
||||||
const session = sessions.find(s => s.uuid === sessionUuid);
|
const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${sessionUuid}`;
|
||||||
|
const cachedSession = await this.cacheProvider.get<CachedSession>(sessionKey);
|
||||||
|
|
||||||
if (session) {
|
if (cachedSession) {
|
||||||
session.failedCalls++;
|
cachedSession.failedCalls = (cachedSession.failedCalls || 0) + 1;
|
||||||
session.lastUsed = new Date();
|
cachedSession.lastUsed = new Date().toISOString();
|
||||||
|
|
||||||
|
await this.cacheProvider.set(sessionKey, cachedSession, 86400);
|
||||||
|
|
||||||
this.logger?.debug(`Incremented failed calls for session`, {
|
this.logger?.debug(`Incremented failed calls for session`, {
|
||||||
sessionUuid,
|
sessionUuid,
|
||||||
failedCalls: session.failedCalls,
|
failedCalls: cachedSession.failedCalls,
|
||||||
sessionId
|
sessionId
|
||||||
});
|
});
|
||||||
|
|
||||||
// Sync to cache after update
|
|
||||||
await this.syncToCache();
|
|
||||||
} else {
|
} else {
|
||||||
this.logger?.warn(`Session not found`, { sessionUuid, sessionId });
|
this.logger?.warn(`Session not found`, { sessionUuid, sessionId });
|
||||||
}
|
}
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to increment failed calls', { error, sessionUuid, sessionId });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increment successful calls for a session
|
* Increment successful calls for a session
|
||||||
*/
|
*/
|
||||||
async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise<void> {
|
async incrementSuccessfulCalls(sessionId: string, sessionUuid: string): Promise<void> {
|
||||||
// Load latest from cache first
|
if (!this.cacheProvider) {
|
||||||
await this.loadFromCache();
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Find session by UUID
|
try {
|
||||||
const sessions = this.sessionCache[sessionId] || [];
|
const sessionType = Object.entries(QM_SESSION_IDS).find(([_, id]) => id === sessionId)?.[0] || 'UNKNOWN';
|
||||||
const session = sessions.find(s => s.uuid === sessionUuid);
|
const sessionKey = `qm:sessions:${sessionType.toLowerCase()}:${sessionUuid}`;
|
||||||
|
const cachedSession = await this.cacheProvider.get<CachedSession>(sessionKey);
|
||||||
|
|
||||||
if (session) {
|
if (cachedSession) {
|
||||||
session.successfulCalls++;
|
cachedSession.successfulCalls = (cachedSession.successfulCalls || 0) + 1;
|
||||||
session.lastUsed = new Date();
|
cachedSession.lastUsed = new Date().toISOString();
|
||||||
|
|
||||||
|
await this.cacheProvider.set(sessionKey, cachedSession, 86400);
|
||||||
|
|
||||||
this.logger?.debug(`Incremented successful calls for session`, {
|
this.logger?.debug(`Incremented successful calls for session`, {
|
||||||
sessionUuid,
|
sessionUuid,
|
||||||
successfulCalls: session.successfulCalls,
|
successfulCalls: cachedSession.successfulCalls,
|
||||||
sessionId
|
sessionId
|
||||||
});
|
});
|
||||||
|
|
||||||
// Sync to cache after update
|
|
||||||
await this.syncToCache();
|
|
||||||
} else {
|
} else {
|
||||||
this.logger?.warn(`Session not found`, { sessionUuid, sessionId });
|
this.logger?.warn(`Session not found`, { sessionUuid, sessionId });
|
||||||
}
|
}
|
||||||
|
} catch (error) {
|
||||||
|
this.logger?.error('Failed to increment successful calls', { error, sessionUuid, sessionId });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue