switched to new config and removed old

This commit is contained in:
Boki 2025-06-18 21:03:45 -04:00
parent 6b69bcbcaa
commit 269364fbc8
70 changed files with 889 additions and 2978 deletions

View file

@ -1,15 +1,16 @@
import { CacheProvider, createCache } from '@stock-bot/cache';
import { getLogger } from '@stock-bot/logger';
import type { QueueManager } from './queue-manager';
import type { Queue } from './queue-instance';
import type { BatchJobData, BatchResult, JobData, ProcessOptions } from './types';
const logger = getLogger('batch-processor');
const cacheProviders = new Map<string, CacheProvider>();
function getCache(queueName: string): CacheProvider {
function getCache(queueName: string, redisConfig: any): CacheProvider {
if (!cacheProviders.has(queueName)) {
const cacheProvider = createCache({
redisConfig,
keyPrefix: `batch:${queueName}:`,
ttl: 86400, // 24 hours default
enableMetrics: true,
@ -23,11 +24,12 @@ function getCache(queueName: string): CacheProvider {
* Initialize the batch cache before any batch operations
* This should be called during application startup
*/
export async function initializeBatchCache(queueManager: QueueManager): Promise<void> {
const queueName = queueManager.getQueueName();
export async function initializeBatchCache(queue: Queue): Promise<void> {
const queueName = queue.getName();
const redisConfig = queue.getRedisConfig();
logger.info('Initializing batch cache...', { queueName });
const cache = getCache(queueName);
const cache = getCache(queueName, redisConfig);
await cache.waitForReady(10000);
logger.info('Batch cache initialized successfully', { queueName });
}
@ -38,7 +40,7 @@ export async function initializeBatchCache(queueManager: QueueManager): Promise<
*/
export async function processItems<T>(
items: T[],
queue: QueueManager,
queue: Queue,
options: ProcessOptions
): Promise<BatchResult> {
const startTime = Date.now();
@ -83,7 +85,7 @@ export async function processItems<T>(
*/
async function processDirect<T>(
items: T[],
queue: QueueManager,
queue: Queue,
options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
const totalDelayMs = options.totalDelayHours * 60 * 60 * 1000; // Convert hours to milliseconds
@ -126,7 +128,7 @@ async function processDirect<T>(
*/
async function processBatched<T>(
items: T[],
queue: QueueManager,
queue: Queue,
options: ProcessOptions
): Promise<Omit<BatchResult, 'duration'>> {
const batchSize = options.batchSize || 100;
@ -186,7 +188,7 @@ async function processBatched<T>(
*/
export async function processBatchJob(
jobData: BatchJobData,
queue: QueueManager
queue: Queue
): Promise<unknown> {
const { payloadKey, batchIndex, totalBatches, itemCount } = jobData;
@ -250,14 +252,14 @@ function createBatches<T>(items: T[], batchSize: number): T[][] {
async function storeItems<T>(
items: T[],
queue: QueueManager,
queue: Queue,
options: ProcessOptions
): Promise<string> {
if (!queue) {
throw new Error('Batch cache not initialized. Call initializeBatchCache() first.');
}
const cache = getCache(queue.getQueueName());
const cache = getCache(queue.getName(), queue.getRedisConfig());
const payloadKey = `payload:${Date.now()}:${Math.random().toString(36).substr(2, 9)}`;
const payload = {
@ -280,7 +282,7 @@ async function storeItems<T>(
async function loadPayload<T>(
key: string,
queue: QueueManager
queue: Queue
): Promise<{
items: T[];
options: {
@ -295,7 +297,7 @@ async function loadPayload<T>(
throw new Error('Batch cache not initialized. Call initializeBatchCache() first.');
}
const cache = getCache(queue.getQueueName());
const cache = getCache(queue.getName(), queue.getRedisConfig());
return (await cache.get(key)) as {
items: T[];
options: {
@ -308,17 +310,17 @@ async function loadPayload<T>(
} | null;
}
async function cleanupPayload(key: string, queue: QueueManager): Promise<void> {
async function cleanupPayload(key: string, queue: Queue): Promise<void> {
if (!queue) {
throw new Error('Batch cache not initialized. Call initializeBatchCache() first.');
}
const cache = getCache(queue.getQueueName());
const cache = getCache(queue.getName(), queue.getRedisConfig());
await cache.del(key);
}
async function addJobsInChunks(
queue: QueueManager,
queue: Queue,
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>,
chunkSize = 100
): Promise<unknown[]> {

View file

@ -1,15 +1,28 @@
export * from './batch-processor';
export * from './provider-registry';
export * from './queue-manager';
export * from './queue-instance';
export * from './queue-factory';
export * from './types';
// Re-export commonly used functions
export { initializeBatchCache, processBatchJob, processItems } from './batch-processor';
export { QueueManager } from './queue-manager';
export { Queue } from './queue-instance';
export { providerRegistry } from './provider-registry';
// Re-export queue factory functions
export {
initializeQueueSystem,
getQueue,
processItemsWithQueue,
getActiveQueueNames,
getQueueManager,
shutdownAllQueues
} from './queue-factory';
// Re-export types for convenience
export type {
BatchResult,

View file

@ -0,0 +1,112 @@
import { getLogger } from '@stock-bot/logger';
import { QueueManager } from './queue-manager';
import { Queue } from './queue-instance';
import type { ProcessOptions, BatchResult } from './types';
const logger = getLogger('queue-factory');
// Global queue manager (manages workers and providers)
let queueManager: QueueManager | null = null;
// Registry of individual queues
const queues = new Map<string, Queue>();
let globalRedisConfig: any = null;
/**
* Initialize the queue system with global configuration
*/
export async function initializeQueueSystem(config: {
redis: any;
defaultJobOptions?: any;
workers?: number;
concurrency?: number;
}): Promise<void> {
logger.info('Initializing global queue system...');
globalRedisConfig = config.redis;
// Initialize the global queue manager for worker management
queueManager = new QueueManager({
queueName: 'system-queue-manager',
redis: globalRedisConfig,
workers: config.workers || 5,
concurrency: config.concurrency || 20,
defaultJobOptions: config.defaultJobOptions,
providers: [], // Will be set by individual services
});
await queueManager.initialize();
logger.info('Queue system initialized');
}
/**
* Get or create a queue for the given queue name
*/
export function getQueue(queueName: string): Queue {
if (!globalRedisConfig) {
throw new Error('Queue system not initialized. Call initializeQueueSystem() first.');
}
if (!queues.has(queueName)) {
logger.info(`Creating new queue: ${queueName}`);
const queue = new Queue(queueName, globalRedisConfig);
queues.set(queueName, queue);
}
return queues.get(queueName)!;
}
/**
* Process items using the specified queue
*/
export async function processItemsWithQueue<T>(
queueName: string,
items: T[],
options: ProcessOptions
): Promise<BatchResult> {
const queue = getQueue(queueName);
return queue.processItems(items, options);
}
/**
* Get all active queue names
*/
export function getActiveQueueNames(): string[] {
return Array.from(queues.keys());
}
/**
* Get the global queue manager (for advanced operations)
*/
export function getQueueManager(): QueueManager {
if (!queueManager) {
throw new Error('Queue system not initialized. Call initializeQueueSystem() first.');
}
return queueManager;
}
/**
* Shutdown all queues and the queue manager
*/
export async function shutdownAllQueues(): Promise<void> {
logger.info('Shutting down all queues...');
// Shutdown individual queues
const queueShutdownPromises = Array.from(queues.values()).map(queue =>
queue.shutdown().catch(error => {
logger.error('Error shutting down queue', { error });
})
);
await Promise.all(queueShutdownPromises);
queues.clear();
// Shutdown the global queue manager
if (queueManager) {
await queueManager.shutdown();
queueManager = null;
}
logger.info('All queues shut down');
}

View file

@ -0,0 +1,186 @@
import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { processItems } from './batch-processor';
import type { JobData, ProcessOptions, BatchResult } from './types';
const logger = getLogger('queue-instance');
export class Queue {
private bullQueue: BullQueue;
private workers: Worker[] = [];
private queueEvents: QueueEvents;
private queueName: string;
private redisConfig: any;
private initialized = false;
constructor(queueName: string, redisConfig: any) {
this.queueName = queueName;
this.redisConfig = redisConfig;
const connection = {
host: redisConfig.host,
port: redisConfig.port,
password: redisConfig.password,
db: redisConfig.db,
};
// Initialize BullMQ queue
this.bullQueue = new BullQueue(`{${queueName}}`, {
connection,
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
},
});
// Initialize queue events
this.queueEvents = new QueueEvents(`{${queueName}}`, { connection });
}
/**
* Get the queue name
*/
getName(): string {
return this.queueName;
}
/**
* Get the redis configuration
*/
getRedisConfig(): any {
return this.redisConfig;
}
/**
* Initialize batch cache for this queue
*/
async initialize(): Promise<void> {
if (this.initialized) {
return;
}
const { initializeBatchCache } = await import('./batch-processor');
await initializeBatchCache(this);
this.initialized = true;
logger.info(`Queue initialized: ${this.queueName}`);
}
/**
* Process items using this queue
*/
async processItems<T>(items: T[], options: ProcessOptions): Promise<BatchResult> {
// Ensure queue is initialized
if (!this.initialized) {
await this.initialize();
}
return processItems(items, this, options);
}
/**
* Add a single job to the queue
*/
async add(name: string, data: JobData, options: Record<string, unknown> = {}): Promise<Job> {
return await this.bullQueue.add(name, data, options);
}
/**
* Add multiple jobs to the queue in bulk
*/
async addBulk(
jobs: Array<{ name: string; data: JobData; opts?: Record<string, unknown> }>
): Promise<Job[]> {
return await this.bullQueue.addBulk(jobs);
}
/**
* Get queue statistics
*/
async getStats(): Promise<{
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
}> {
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.bullQueue.getWaiting(),
this.bullQueue.getActive(),
this.bullQueue.getCompleted(),
this.bullQueue.getFailed(),
this.bullQueue.getDelayed(),
]);
return {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length,
delayed: delayed.length,
};
}
/**
* Pause the queue
*/
async pause(): Promise<void> {
await this.bullQueue.pause();
logger.info(`Queue paused: ${this.queueName}`);
}
/**
* Resume the queue
*/
async resume(): Promise<void> {
await this.bullQueue.resume();
logger.info(`Queue resumed: ${this.queueName}`);
}
/**
* Clean completed and failed jobs
*/
async clean(grace: number = 0, limit: number = 100): Promise<void> {
await Promise.all([
this.bullQueue.clean(grace, limit, 'completed'),
this.bullQueue.clean(grace, limit, 'failed'),
]);
logger.info(`Queue cleaned: ${this.queueName}`, { grace, limit });
}
/**
* Shutdown this queue
*/
async shutdown(): Promise<void> {
logger.info(`Shutting down queue: ${this.queueName}`);
try {
// Close workers
await Promise.all(this.workers.map(worker => worker.close()));
this.workers = [];
// Close queue events
await this.queueEvents.close();
// Close queue
await this.bullQueue.close();
logger.info(`Queue shutdown complete: ${this.queueName}`);
} catch (error) {
logger.error(`Error during queue shutdown: ${this.queueName}`, { error });
throw error;
}
}
/**
* Get the BullMQ queue instance (for advanced operations)
*/
getBullQueue(): BullQueue {
return this.bullQueue;
}
}

View file

@ -285,6 +285,13 @@ export class QueueManager {
return this.config.queueName;
}
/**
* Get the redis configuration
*/
getRedisConfig(): any {
return this.config.redis;
}
/**
* Shutdown the queue manager
*/