fixed up qm tasks and shutdown sequence
This commit is contained in:
parent
5929612e36
commit
1bb2380a28
4 changed files with 37 additions and 7 deletions
|
|
@ -2,6 +2,7 @@ import { getRandomUserAgent } from '@stock-bot/http';
|
||||||
import { getLogger } from '@stock-bot/logger';
|
import { getLogger } from '@stock-bot/logger';
|
||||||
import { getMongoDBClient } from '@stock-bot/mongodb-client';
|
import { getMongoDBClient } from '@stock-bot/mongodb-client';
|
||||||
import { QueueManager } from '@stock-bot/queue';
|
import { QueueManager } from '@stock-bot/queue';
|
||||||
|
import { isShutdownSignalReceived } from '@stock-bot/shutdown';
|
||||||
import { getRandomProxy } from '@stock-bot/utils';
|
import { getRandomProxy } from '@stock-bot/utils';
|
||||||
|
|
||||||
// Shared instances (module-scoped, not global)
|
// Shared instances (module-scoped, not global)
|
||||||
|
|
@ -88,7 +89,11 @@ export async function createSessions(): Promise<void> {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (sessionCache[sessionId].length < 50) {
|
while (sessionCache[sessionId].length < 10) {
|
||||||
|
if(isShutdownSignalReceived()) {
|
||||||
|
logger.info('Shutting down, skipping session creation');
|
||||||
|
break; // Exit if shutting down
|
||||||
|
}
|
||||||
logger.info(`Creating new session for ${sessionId}`);
|
logger.info(`Creating new session for ${sessionId}`);
|
||||||
const proxyInfo = await getRandomProxy();
|
const proxyInfo = await getRandomProxy();
|
||||||
if (!proxyInfo) {
|
if (!proxyInfo) {
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
import { Collection, Db, MongoClient, OptionalUnlessRequiredId } from 'mongodb';
|
|
||||||
import { getLogger } from '@stock-bot/logger';
|
import { getLogger } from '@stock-bot/logger';
|
||||||
|
import { Collection, Db, MongoClient, OptionalUnlessRequiredId } from 'mongodb';
|
||||||
import type { DocumentBase, MongoDBClientConfig } from './types';
|
import type { DocumentBase, MongoDBClientConfig } from './types';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -149,7 +149,7 @@ export class MongoDBClient {
|
||||||
let totalUpdated = 0;
|
let totalUpdated = 0;
|
||||||
const errors: unknown[] = [];
|
const errors: unknown[] = [];
|
||||||
|
|
||||||
this.logger.info(`Starting batch upsert operation [${operationId}]`, {
|
this.logger.info(`Starting batch upsert operation [${collectionName}-${documents.length}][${operationId}]`, {
|
||||||
database: dbName,
|
database: dbName,
|
||||||
collection: collectionName,
|
collection: collectionName,
|
||||||
totalDocuments: documents.length,
|
totalDocuments: documents.length,
|
||||||
|
|
|
||||||
|
|
@ -70,6 +70,15 @@ export function isShuttingDown(): boolean {
|
||||||
return globalInstance?.isShutdownInProgress() || false;
|
return globalInstance?.isShutdownInProgress() || false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if shutdown signal was received (for quick checks in running jobs)
|
||||||
|
*/
|
||||||
|
export function isShutdownSignalReceived(): boolean {
|
||||||
|
const globalFlag = globalThis.__SHUTDOWN_SIGNAL_RECEIVED__ || false;
|
||||||
|
const instanceFlag = globalInstance?.isShutdownSignalReceived() || false;
|
||||||
|
return globalFlag || instanceFlag;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the number of registered shutdown callbacks
|
* Get the number of registered shutdown callbacks
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -8,11 +8,17 @@
|
||||||
* - Platform-specific signal support (Windows/Unix)
|
* - Platform-specific signal support (Windows/Unix)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import type { ShutdownCallback, ShutdownOptions, ShutdownResult, PrioritizedShutdownCallback } from './types';
|
import type { PrioritizedShutdownCallback, ShutdownCallback, ShutdownOptions, ShutdownResult } from './types';
|
||||||
|
|
||||||
|
// Global flag that works across all processes/workers
|
||||||
|
declare global {
|
||||||
|
var __SHUTDOWN_SIGNAL_RECEIVED__: boolean | undefined;
|
||||||
|
}
|
||||||
|
|
||||||
export class Shutdown {
|
export class Shutdown {
|
||||||
private static instance: Shutdown | null = null;
|
private static instance: Shutdown | null = null;
|
||||||
private isShuttingDown = false;
|
private isShuttingDown = false;
|
||||||
|
private signalReceived = false; // Track if shutdown signal was received
|
||||||
private shutdownTimeout = 30000; // 30 seconds default
|
private shutdownTimeout = 30000; // 30 seconds default
|
||||||
private callbacks: PrioritizedShutdownCallback[] = [];
|
private callbacks: PrioritizedShutdownCallback[] = [];
|
||||||
private signalHandlersRegistered = false;
|
private signalHandlersRegistered = false;
|
||||||
|
|
@ -90,6 +96,13 @@ export class Shutdown {
|
||||||
return this.isShuttingDown;
|
return this.isShuttingDown;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if shutdown signal was received (for quick checks in running jobs)
|
||||||
|
*/
|
||||||
|
isShutdownSignalReceived(): boolean {
|
||||||
|
return this.signalReceived || this.isShuttingDown;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get number of registered callbacks
|
* Get number of registered callbacks
|
||||||
*/
|
*/
|
||||||
|
|
@ -179,9 +192,6 @@ export class Shutdown {
|
||||||
try {
|
try {
|
||||||
await callback();
|
await callback();
|
||||||
executed++;
|
executed++;
|
||||||
if (name) {
|
|
||||||
console.log(`✓ Shutdown completed: ${name} (priority: ${priority})`);
|
|
||||||
}
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
failed++;
|
failed++;
|
||||||
if (name) {
|
if (name) {
|
||||||
|
|
@ -209,6 +219,10 @@ export class Shutdown {
|
||||||
process.on(signal, () => {
|
process.on(signal, () => {
|
||||||
// Only process if not already shutting down
|
// Only process if not already shutting down
|
||||||
if (!this.isShuttingDown) {
|
if (!this.isShuttingDown) {
|
||||||
|
// Set signal flag immediately for quick checks
|
||||||
|
this.signalReceived = true;
|
||||||
|
// Also set global flag for workers/other processes
|
||||||
|
globalThis.__SHUTDOWN_SIGNAL_RECEIVED__ = true;
|
||||||
this.shutdownAndExit(signal).catch(() => {
|
this.shutdownAndExit(signal).catch(() => {
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
});
|
});
|
||||||
|
|
@ -218,6 +232,7 @@ export class Shutdown {
|
||||||
|
|
||||||
// Handle uncaught exceptions
|
// Handle uncaught exceptions
|
||||||
process.on('uncaughtException', () => {
|
process.on('uncaughtException', () => {
|
||||||
|
this.signalReceived = true;
|
||||||
this.shutdownAndExit('uncaughtException', 1).catch(() => {
|
this.shutdownAndExit('uncaughtException', 1).catch(() => {
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
});
|
});
|
||||||
|
|
@ -225,6 +240,7 @@ export class Shutdown {
|
||||||
|
|
||||||
// Handle unhandled promise rejections
|
// Handle unhandled promise rejections
|
||||||
process.on('unhandledRejection', () => {
|
process.on('unhandledRejection', () => {
|
||||||
|
this.signalReceived = true;
|
||||||
this.shutdownAndExit('unhandledRejection', 1).catch(() => {
|
this.shutdownAndExit('unhandledRejection', 1).catch(() => {
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
});
|
});
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue