trying to fix startup

This commit is contained in:
Boki 2025-06-10 14:48:46 -04:00
parent 00b21a57d7
commit 32d0eaac2d
7 changed files with 107 additions and 25 deletions

View file

@ -208,9 +208,11 @@ async function initializeServices() {
logger.info('Initializing data service...');
try {
// Initialize queue service
// Initialize queue service (Redis connections should be ready now)
logger.info('Starting queue service initialization...');
await queueManager.initialize();
logger.info('Queue service initialized');
logger.info('All services initialized successfully');
} catch (error) {
logger.error('Failed to initialize services', { error });
@ -221,22 +223,6 @@ async function initializeServices() {
// Start server
async function startServer() {
await initializeServices();
serve({
fetch: app.fetch,
port: PORT,
});
logger.info(`Data Service started on port ${PORT}`);
logger.info('Available endpoints:');
logger.info(' GET /health - Health check');
logger.info(' GET /api/queue/status - Queue status');
logger.info(' POST /api/queue/job - Add job to queue');
logger.info(' GET /api/live/:symbol - Live market data');
logger.info(' GET /api/historical/:symbol - Historical market data');
logger.info(' POST /api/proxy/fetch - Queue proxy fetch');
logger.info(' POST /api/proxy/check - Queue proxy check');
logger.info(' GET /api/providers - List registered providers');
}
// Graceful shutdown

View file

@ -123,7 +123,7 @@ export const proxyProvider: ProviderConfig = {
}
}
},
scheduledJobs: [
scheduledJobs: [
{
type: 'proxy-maintenance',
operation: 'fetch-and-check',
@ -131,7 +131,7 @@ export const proxyProvider: ProviderConfig = {
// should remove and just run at the same time so app restarts dont keeping adding same jobs
cronPattern: getEvery24HourCron(),
priority: 5,
immediately: true,
immediately: true, // Don't run immediately during startup to avoid conflicts
description: 'Fetch and validate proxy list from sources'
}
]

View file

@ -116,17 +116,20 @@ async function initializeSharedResources() {
enableMetrics: true
});
// Always initialize httpClient and concurrencyLimit first
httpClient = new HttpClient({ timeout: 10000 }, logger);
concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT);
// Try to connect to cache, but don't block initialization if it fails
try {
// Use longer timeout for cache connection
await cache.waitForReady(30000); // 30 seconds
logger.info('Cache connection established');
} catch (error) {
logger.error('Cache connection failed, continuing with degraded functionality:', error);
logger.warn('Cache connection failed, continuing with degraded functionality:', {error});
// Don't throw - allow the service to continue with cache fallbacks
}
httpClient = new HttpClient({ timeout: 10000 }, logger);
concurrencyLimit = pLimit(PROXY_CONFIG.CONCURRENCY_LIMIT);
logger.info('Proxy tasks initialized');
}
}
@ -172,6 +175,19 @@ export async function fetchProxiesFromSources(): Promise<ProxyInfo[]> {
await initializeSharedResources();
await resetProxyStats();
// Ensure concurrencyLimit is available before using it
if (!concurrencyLimit) {
logger.error('concurrencyLimit not initialized, using sequential processing');
const result = [];
for (const source of PROXY_CONFIG.PROXY_SOURCES) {
const proxies = await fetchProxiesFromSource(source);
result.push(...proxies);
}
let allProxies: ProxyInfo[] = result;
allProxies = removeDuplicateProxies(allProxies);
return allProxies;
}
const sources = PROXY_CONFIG.PROXY_SOURCES.map(source =>
concurrencyLimit(() => fetchProxiesFromSource(source))
);

View file

@ -41,7 +41,7 @@ export class QueueService {
maxRetriesPerRequest: null,
retryDelayOnFailover: 100,
enableReadyCheck: false,
lazyConnect: true,
lazyConnect: false,
// Disable Redis Cluster mode if you're using standalone Redis/Dragonfly
enableOfflineQueue: true
};

View file

@ -16,6 +16,7 @@ export class RedisConnectionManager {
private static sharedConnections = new Map<string, Redis>();
private static instance: RedisConnectionManager;
private logger = getLogger('redis-connection-manager');
private static readyConnections = new Set<string>();
// Singleton pattern for the manager itself
static getInstance(): RedisConnectionManager {
@ -189,6 +190,85 @@ export class RedisConnectionManager {
return { healthy: allHealthy, details };
}
/**
* Wait for all created connections to be ready
* @param timeout Maximum time to wait in milliseconds
* @returns Promise that resolves when all connections are ready
*/
static async waitForAllConnections(timeout: number = 30000): Promise<void> {
const instance = this.getInstance();
const allConnections = new Map([
...instance.connections,
...this.sharedConnections
]);
if (allConnections.size === 0) {
instance.logger.info('No Redis connections to wait for');
return;
}
instance.logger.info(`Waiting for ${allConnections.size} Redis connections to be ready...`);
const connectionPromises = Array.from(allConnections.entries()).map(([name, redis]) =>
instance.waitForConnection(redis, name, timeout)
);
try {
await Promise.all(connectionPromises);
instance.logger.info('All Redis connections are ready');
} catch (error) {
instance.logger.error('Failed to establish all Redis connections:', error);
throw error;
}
}
/**
* Wait for a specific connection to be ready
*/
private async waitForConnection(redis: Redis, name: string, timeout: number): Promise<void> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error(`Redis connection ${name} failed to be ready within ${timeout}ms`));
}, timeout);
const onReady = () => {
clearTimeout(timeoutId);
RedisConnectionManager.readyConnections.add(name);
this.logger.info(`Redis connection ready: ${name}`);
resolve();
};
const onError = (err: Error) => {
clearTimeout(timeoutId);
this.logger.error(`Redis connection failed for ${name}:`, err);
reject(err);
};
if (redis.status === 'ready') {
onReady();
} else {
redis.once('ready', onReady);
redis.once('error', onError);
}
});
}
/**
* Check if all connections are ready
*/
static areAllConnectionsReady(): boolean {
const instance = this.getInstance();
const allConnections = new Map([
...instance.connections,
...this.sharedConnections
]);
return allConnections.size > 0 &&
Array.from(allConnections.keys()).every(name =>
this.readyConnections.has(name)
);
}
}
export default RedisConnectionManager;

View file

@ -59,7 +59,7 @@ export class EventBus extends EventEmitter {
password: dragonflyConfig.DRAGONFLY_PASSWORD,
db: dragonflyConfig.DRAGONFLY_DATABASE || 0,
maxRetriesPerRequest: dragonflyConfig.DRAGONFLY_MAX_RETRIES,
lazyConnect: true,
lazyConnect: false,
});
if (!this.useStreams) {

View file

@ -61,7 +61,7 @@ class DragonFlyConnectionMonitor {
password: process.env.DRAGONFLY_PASSWORD || undefined,
db: parseInt(process.env.DRAGONFLY_DATABASE || '0'),
maxRetriesPerRequest: 3,
lazyConnect: true,
lazyConnect: false,
});
}