diff --git a/apps/data-service/src/providers/qm.tasks.ts b/apps/data-service/src/providers/qm.tasks.ts index f4117df..0ae5880 100644 --- a/apps/data-service/src/providers/qm.tasks.ts +++ b/apps/data-service/src/providers/qm.tasks.ts @@ -2,6 +2,7 @@ import { getRandomUserAgent } from '@stock-bot/http'; import { getLogger } from '@stock-bot/logger'; import { getMongoDBClient } from '@stock-bot/mongodb-client'; import { QueueManager } from '@stock-bot/queue'; +import { isShutdownSignalReceived } from '@stock-bot/shutdown'; import { getRandomProxy } from '@stock-bot/utils'; // Shared instances (module-scoped, not global) @@ -88,7 +89,11 @@ export async function createSessions(): Promise { ); } - while (sessionCache[sessionId].length < 50) { + while (sessionCache[sessionId].length < 10) { + if(isShutdownSignalReceived()) { + logger.info('Shutting down, skipping session creation'); + break; // Exit if shutting down + } logger.info(`Creating new session for ${sessionId}`); const proxyInfo = await getRandomProxy(); if (!proxyInfo) { diff --git a/libs/mongodb-client/src/client.ts b/libs/mongodb-client/src/client.ts index 5c3843d..1bed8c7 100644 --- a/libs/mongodb-client/src/client.ts +++ b/libs/mongodb-client/src/client.ts @@ -1,5 +1,5 @@ -import { Collection, Db, MongoClient, OptionalUnlessRequiredId } from 'mongodb'; import { getLogger } from '@stock-bot/logger'; +import { Collection, Db, MongoClient, OptionalUnlessRequiredId } from 'mongodb'; import type { DocumentBase, MongoDBClientConfig } from './types'; /** @@ -149,7 +149,7 @@ export class MongoDBClient { let totalUpdated = 0; const errors: unknown[] = []; - this.logger.info(`Starting batch upsert operation [${operationId}]`, { + this.logger.info(`Starting batch upsert operation [${collectionName}-${documents.length}][${operationId}]`, { database: dbName, collection: collectionName, totalDocuments: documents.length, diff --git a/libs/shutdown/src/index.ts b/libs/shutdown/src/index.ts index 61e5e13..b498a06 100644 --- a/libs/shutdown/src/index.ts +++ b/libs/shutdown/src/index.ts @@ -70,6 +70,15 @@ export function isShuttingDown(): boolean { return globalInstance?.isShutdownInProgress() || false; } +/** + * Check if shutdown signal was received (for quick checks in running jobs) + */ +export function isShutdownSignalReceived(): boolean { + const globalFlag = globalThis.__SHUTDOWN_SIGNAL_RECEIVED__ || false; + const instanceFlag = globalInstance?.isShutdownSignalReceived() || false; + return globalFlag || instanceFlag; +} + /** * Get the number of registered shutdown callbacks */ diff --git a/libs/shutdown/src/shutdown.ts b/libs/shutdown/src/shutdown.ts index d4407de..78a0be8 100644 --- a/libs/shutdown/src/shutdown.ts +++ b/libs/shutdown/src/shutdown.ts @@ -8,11 +8,17 @@ * - Platform-specific signal support (Windows/Unix) */ -import type { ShutdownCallback, ShutdownOptions, ShutdownResult, PrioritizedShutdownCallback } from './types'; +import type { PrioritizedShutdownCallback, ShutdownCallback, ShutdownOptions, ShutdownResult } from './types'; + +// Global flag that works across all processes/workers +declare global { + var __SHUTDOWN_SIGNAL_RECEIVED__: boolean | undefined; +} export class Shutdown { private static instance: Shutdown | null = null; private isShuttingDown = false; + private signalReceived = false; // Track if shutdown signal was received private shutdownTimeout = 30000; // 30 seconds default private callbacks: PrioritizedShutdownCallback[] = []; private signalHandlersRegistered = false; @@ -90,6 +96,13 @@ export class Shutdown { return this.isShuttingDown; } + /** + * Check if shutdown signal was received (for quick checks in running jobs) + */ + isShutdownSignalReceived(): boolean { + return this.signalReceived || this.isShuttingDown; + } + /** * Get number of registered callbacks */ @@ -179,9 +192,6 @@ export class Shutdown { try { await callback(); executed++; - if (name) { - console.log(`✓ Shutdown completed: ${name} (priority: ${priority})`); - } } catch (error) { failed++; if (name) { @@ -209,6 +219,10 @@ export class Shutdown { process.on(signal, () => { // Only process if not already shutting down if (!this.isShuttingDown) { + // Set signal flag immediately for quick checks + this.signalReceived = true; + // Also set global flag for workers/other processes + globalThis.__SHUTDOWN_SIGNAL_RECEIVED__ = true; this.shutdownAndExit(signal).catch(() => { process.exit(1); }); @@ -218,6 +232,7 @@ export class Shutdown { // Handle uncaught exceptions process.on('uncaughtException', () => { + this.signalReceived = true; this.shutdownAndExit('uncaughtException', 1).catch(() => { process.exit(1); }); @@ -225,6 +240,7 @@ export class Shutdown { // Handle unhandled promise rejections process.on('unhandledRejection', () => { + this.signalReceived = true; this.shutdownAndExit('unhandledRejection', 1).catch(() => { process.exit(1); });