huge refactor to remove depenencie hell and add typesafe container

This commit is contained in:
Boki 2025-06-24 09:37:51 -04:00
parent 28b9822d55
commit 843a7b9b9b
148 changed files with 3603 additions and 2378 deletions

View file

@ -171,7 +171,11 @@ async function processBatched<T>(
/**
* Process a batch job - loads items and creates individual jobs
*/
export async function processBatchJob(jobData: BatchJobData, queueName: string, queueManager: QueueManager): Promise<unknown> {
export async function processBatchJob(
jobData: BatchJobData,
queueName: string,
queueManager: QueueManager
): Promise<unknown> {
const queue = queueManager.getQueue(queueName);
const logger = queue.createChildLogger('batch-job', {
queueName,
@ -304,7 +308,11 @@ async function loadPayload<T>(
} | null;
}
async function cleanupPayload(key: string, queueName: string, queueManager: QueueManager): Promise<void> {
async function cleanupPayload(
key: string,
queueName: string,
queueManager: QueueManager
): Promise<void> {
const cache = queueManager.getCache(queueName);
await cache.del(key);
}

View file

@ -4,15 +4,15 @@ export { QueueManager } from './queue-manager';
export { SmartQueueManager } from './smart-queue-manager';
export { ServiceCache, createServiceCache } from './service-cache';
// Service utilities
export {
export {
normalizeServiceName,
generateCachePrefix,
getFullQueueName,
parseQueueName
parseQueueName,
} from './service-utils';
// Re-export handler registry and utilities from handlers package
export { handlerRegistry, createJobHandler } from '@stock-bot/handlers';
// Re-export utilities from handlers package
export { createJobHandler } from '@stock-bot/handlers';
// Batch processing
export { processBatchJob, processItems } from './batch-processor';
@ -64,10 +64,8 @@ export type {
// Scheduled job types
ScheduledJob,
ScheduleConfig,
// Smart Queue types
SmartQueueConfig,
QueueRoute,
} from './types';

View file

@ -76,8 +76,9 @@ export class QueueManager {
// Prepare queue configuration
const workers = mergedOptions.workers ?? this.config.defaultQueueOptions?.workers ?? 1;
const concurrency = mergedOptions.concurrency ?? this.config.defaultQueueOptions?.concurrency ?? 1;
const concurrency =
mergedOptions.concurrency ?? this.config.defaultQueueOptions?.concurrency ?? 1;
const queueConfig: QueueWorkerConfig = {
workers,
concurrency,
@ -180,7 +181,6 @@ export class QueueManager {
return this.queues;
}
/**
* Get statistics for all queues
*/
@ -449,4 +449,4 @@ export class QueueManager {
getConfig(): Readonly<QueueManagerConfig> {
return { ...this.config };
}
}
}

View file

@ -1,6 +1,7 @@
import { Queue as BullQueue, QueueEvents, Worker, type Job } from 'bullmq';
import { handlerRegistry } from '@stock-bot/handlers';
import type { JobData, JobOptions, ExtendedJobOptions, QueueStats, RedisConfig } from './types';
// Handler registry will be injected
import type { HandlerRegistry } from '@stock-bot/handler-registry';
import type { ExtendedJobOptions, JobData, JobOptions, QueueStats, RedisConfig } from './types';
import { getRedisConnection } from './utils';
// Logger interface for type safety
@ -17,6 +18,7 @@ export interface QueueWorkerConfig {
workers?: number;
concurrency?: number;
startWorker?: boolean;
handlerRegistry?: HandlerRegistry;
}
/**
@ -30,6 +32,7 @@ export class Queue {
private queueName: string;
private redisConfig: RedisConfig;
private readonly logger: Logger;
private readonly handlerRegistry?: HandlerRegistry;
constructor(
queueName: string,
@ -41,6 +44,7 @@ export class Queue {
this.queueName = queueName;
this.redisConfig = redisConfig;
this.logger = logger || console;
this.handlerRegistry = config.handlerRegistry;
const connection = getRedisConnection(redisConfig);
@ -338,7 +342,10 @@ export class Queue {
try {
// Look up handler in registry
const jobHandler = handlerRegistry.getOperation(handler, operation);
if (!this.handlerRegistry) {
throw new Error('Handler registry not configured for worker processing');
}
const jobHandler = this.handlerRegistry.getOperation(handler, operation);
if (!jobHandler) {
throw new Error(`No handler found for ${handler}:${operation}`);
@ -390,5 +397,4 @@ export class Queue {
getWorkerCount(): number {
return this.workers.length;
}
}

View file

@ -271,7 +271,12 @@ export class QueueRateLimiter {
limit,
};
} catch (error) {
this.logger.error('Failed to get rate limit status', { queueName, handler, operation, error });
this.logger.error('Failed to get rate limit status', {
queueName,
handler,
operation,
error,
});
return {
queueName,
handler,

View file

@ -1,6 +1,6 @@
import { createCache, type CacheProvider, type CacheStats } from '@stock-bot/cache';
import type { RedisConfig } from './types';
import { generateCachePrefix } from './service-utils';
import type { RedisConfig } from './types';
/**
* Service-aware cache that uses the service's Redis DB
@ -132,7 +132,11 @@ export class ServiceCache implements CacheProvider {
return this.cache.set(key, value, ttl);
}
async updateField<T = any>(key: string, updater: (current: T | null) => T, ttl?: number): Promise<T | null> {
async updateField<T = any>(
key: string,
updater: (current: T | null) => T,
ttl?: number
): Promise<T | null> {
if (this.cache.updateField) {
return this.cache.updateField(key, updater, ttl);
}
@ -162,7 +166,6 @@ export class ServiceCache implements CacheProvider {
}
}
/**
* Factory function to create service cache
*/
@ -172,4 +175,4 @@ export function createServiceCache(
options: { global?: boolean; logger?: any } = {}
): ServiceCache {
return new ServiceCache(serviceName, redisConfig, options.global, options.logger);
}
}

View file

@ -1,53 +1,51 @@
/**
* Service utilities for name normalization and auto-discovery
*/
/**
* Normalize service name to kebab-case format
* Examples:
* - webApi -> web-api
* - dataIngestion -> data-ingestion
* - data-pipeline -> data-pipeline (unchanged)
*/
export function normalizeServiceName(serviceName: string): string {
// Handle camelCase to kebab-case conversion
const kebabCase = serviceName
.replace(/([a-z])([A-Z])/g, '$1-$2')
.toLowerCase();
return kebabCase;
}
/**
* Generate cache prefix for a service
*/
export function generateCachePrefix(serviceName: string): string {
const normalized = normalizeServiceName(serviceName);
return `cache:${normalized}`;
}
/**
* Generate full queue name with service namespace
*/
export function getFullQueueName(serviceName: string, handlerName: string): string {
const normalized = normalizeServiceName(serviceName);
// Use {service_handler} format for Dragonfly optimization and BullMQ compatibility
return `{${normalized}_${handlerName}}`;
}
/**
* Parse a full queue name into service and handler
*/
export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null {
// Match pattern {service_handler}
const match = fullQueueName.match(/^\{([^_]+)_([^}]+)\}$/);
if (!match || !match[1] || !match[2]) {
return null;
}
return {
service: match[1],
handler: match[2],
};
}
/**
* Service utilities for name normalization and auto-discovery
*/
/**
* Normalize service name to kebab-case format
* Examples:
* - webApi -> web-api
* - dataIngestion -> data-ingestion
* - data-pipeline -> data-pipeline (unchanged)
*/
export function normalizeServiceName(serviceName: string): string {
// Handle camelCase to kebab-case conversion
const kebabCase = serviceName.replace(/([a-z])([A-Z])/g, '$1-$2').toLowerCase();
return kebabCase;
}
/**
* Generate cache prefix for a service
*/
export function generateCachePrefix(serviceName: string): string {
const normalized = normalizeServiceName(serviceName);
return `cache:${normalized}`;
}
/**
* Generate full queue name with service namespace
*/
export function getFullQueueName(serviceName: string, handlerName: string): string {
const normalized = normalizeServiceName(serviceName);
// Use {service_handler} format for Dragonfly optimization and BullMQ compatibility
return `{${normalized}_${handlerName}}`;
}
/**
* Parse a full queue name into service and handler
*/
export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null {
// Match pattern {service_handler}
const match = fullQueueName.match(/^\{([^_]+)_([^}]+)\}$/);
if (!match || !match[1] || !match[2]) {
return null;
}
return {
service: match[1],
handler: match[2],
};
}

View file

@ -1,16 +1,10 @@
import { Queue as BullQueue, type Job } from 'bullmq';
import { handlerRegistry } from '@stock-bot/handlers';
import type { HandlerRegistry } from '@stock-bot/handler-registry';
import { getLogger, type Logger } from '@stock-bot/logger';
import { QueueManager } from './queue-manager';
import { Queue } from './queue';
import type {
SmartQueueConfig,
QueueRoute,
JobData,
JobOptions,
RedisConfig
} from './types';
import { QueueManager } from './queue-manager';
import { getFullQueueName, parseQueueName } from './service-utils';
import type { JobData, JobOptions, QueueRoute, RedisConfig, SmartQueueConfig } from './types';
import { getRedisConnection } from './utils';
/**
@ -23,30 +17,33 @@ export class SmartQueueManager extends QueueManager {
private connections = new Map<number, any>(); // Redis connections by DB
private producerQueues = new Map<string, BullQueue>(); // For cross-service sending
private _logger: Logger;
private handlerRegistry?: HandlerRegistry;
constructor(config: SmartQueueConfig, logger?: Logger) {
constructor(config: SmartQueueConfig, handlerRegistry?: HandlerRegistry, logger?: Logger) {
// Always use DB 0 for queues (unified queue database)
const modifiedConfig = {
...config,
redis: {
...config.redis,
db: 0, // All queues in DB 0
db: 0, // All queues in DB 0
},
};
super(modifiedConfig, logger);
this.serviceName = config.serviceName;
this.handlerRegistry = handlerRegistry;
this._logger = logger || getLogger('SmartQueueManager');
// Auto-discover routes if enabled
if (config.autoDiscoverHandlers !== false) {
// Auto-discover routes if enabled and registry provided
if (config.autoDiscoverHandlers !== false && handlerRegistry) {
this.discoverQueueRoutes();
}
this._logger.info('SmartQueueManager initialized', {
service: this.serviceName,
discoveredRoutes: this.queueRoutes.size,
hasRegistry: !!handlerRegistry,
});
}
@ -54,26 +51,31 @@ export class SmartQueueManager extends QueueManager {
* Discover all available queue routes from handler registry
*/
private discoverQueueRoutes(): void {
if (!this.handlerRegistry) {
this._logger.warn('No handler registry provided, skipping route discovery');
return;
}
try {
const handlers = handlerRegistry.getAllHandlers();
for (const [handlerName, handlerConfig] of handlers) {
const handlers = this.handlerRegistry.getAllMetadata();
for (const [handlerName, metadata] of handlers) {
// Get the service that registered this handler
const ownerService = handlerRegistry.getHandlerService(handlerName);
const ownerService = metadata.service;
if (ownerService) {
const fullName = getFullQueueName(ownerService, handlerName);
this.queueRoutes.set(handlerName, {
fullName,
service: ownerService,
handler: handlerName,
db: 0, // All queues in DB 0
operations: Object.keys(handlerConfig.operations || {}),
db: 0, // All queues in DB 0
operations: metadata.operations.map((op: any) => op.name),
});
this._logger.trace('Discovered queue route', {
handler: handlerName,
service: ownerService,
operations: Object.keys(handlerConfig.operations || {}).length,
operations: metadata.operations.length,
});
} else {
this._logger.warn('Handler has no service ownership', { handlerName });
@ -81,24 +83,25 @@ export class SmartQueueManager extends QueueManager {
}
// Also discover handlers registered by the current service
const myHandlers = handlerRegistry.getServiceHandlers(this.serviceName);
for (const handlerName of myHandlers) {
const myHandlers = this.handlerRegistry.getServiceHandlers(this.serviceName);
for (const metadata of myHandlers) {
const handlerName = metadata.name;
if (!this.queueRoutes.has(handlerName)) {
const fullName = getFullQueueName(this.serviceName, handlerName);
this.queueRoutes.set(handlerName, {
fullName,
service: this.serviceName,
handler: handlerName,
db: 0, // All queues in DB 0
db: 0, // All queues in DB 0
});
}
}
this._logger.info('Queue routes discovered', {
totalRoutes: this.queueRoutes.size,
routes: Array.from(this.queueRoutes.values()).map(r => ({
handler: r.handler,
service: r.service
routes: Array.from(this.queueRoutes.values()).map(r => ({
handler: r.handler,
service: r.service,
})),
});
} catch (error) {
@ -129,10 +132,10 @@ export class SmartQueueManager extends QueueManager {
override getQueue(queueName: string, options = {}): Queue {
// Check if this is already a full queue name (service:handler format)
const parsed = parseQueueName(queueName);
let fullQueueName: string;
let isOwnQueue: boolean;
if (parsed) {
// Already in service:handler format
fullQueueName = queueName;
@ -142,20 +145,19 @@ export class SmartQueueManager extends QueueManager {
fullQueueName = getFullQueueName(this.serviceName, queueName);
isOwnQueue = true;
}
// For cross-service queues, create without workers (producer-only)
if (!isOwnQueue) {
return super.getQueue(fullQueueName, {
...options,
workers: 0, // No workers for other services' queues
workers: 0, // No workers for other services' queues
});
}
// For own service queues, use configured workers
return super.getQueue(fullQueueName, options);
}
/**
* Send a job to any queue (local or remote)
* This is the main method for cross-service communication
@ -236,7 +238,7 @@ export class SmartQueueManager extends QueueManager {
fullName: queueName,
service: parsed.service,
handler: parsed.handler,
db: 0, // All queues in DB 0
db: 0, // All queues in DB 0
};
}
@ -247,13 +249,13 @@ export class SmartQueueManager extends QueueManager {
}
// Try to find in handler registry
const ownerService = handlerRegistry.getHandlerService(queueName);
const ownerService = this.handlerRegistry?.getHandlerService(queueName);
if (ownerService) {
return {
fullName: getFullQueueName(ownerService, queueName),
service: ownerService,
handler: queueName,
db: 0, // All queues in DB 0
db: 0, // All queues in DB 0
};
}
@ -281,7 +283,7 @@ export class SmartQueueManager extends QueueManager {
*/
getAllQueues(): Record<string, BullQueue> {
const allQueues: Record<string, BullQueue> = {};
// Get all worker queues using public API
const workerQueueNames = this.getQueueNames();
for (const name of workerQueueNames) {
@ -296,7 +298,7 @@ export class SmartQueueManager extends QueueManager {
}
}
}
// Add producer queues
for (const [name, queue] of this.producerQueues) {
// Use the simple handler name without service prefix for display
@ -306,7 +308,7 @@ export class SmartQueueManager extends QueueManager {
allQueues[simpleName] = queue;
}
}
// If no queues found, create from discovered routes
if (Object.keys(allQueues).length === 0) {
for (const [handlerName, route] of this.queueRoutes) {
@ -317,7 +319,7 @@ export class SmartQueueManager extends QueueManager {
});
}
}
return allQueues;
}
@ -350,11 +352,11 @@ export class SmartQueueManager extends QueueManager {
let workersStarted = 0;
const queues = this.getQueues();
for (const [queueName, queue] of queues) {
// Parse queue name to check if it belongs to this service
const parsed = parseQueueName(queueName);
// Skip if not our service's queue
if (parsed && parsed.service !== this.serviceName) {
this._logger.trace('Skipping workers for cross-service queue', {
@ -364,7 +366,7 @@ export class SmartQueueManager extends QueueManager {
});
continue;
}
const workerCount = this.getConfig().defaultQueueOptions?.workers || 1;
const concurrency = this.getConfig().defaultQueueOptions?.concurrency || 1;
@ -399,7 +401,8 @@ export class SmartQueueManager extends QueueManager {
// Close additional connections
for (const [db, connection] of this.connections) {
if (db !== 0) { // Don't close our main connection (DB 0 for queues)
if (db !== 0) {
// Don't close our main connection (DB 0 for queues)
connection.disconnect();
this._logger.debug('Closed Redis connection', { db });
}
@ -408,4 +411,4 @@ export class SmartQueueManager extends QueueManager {
// Call parent shutdown
await super.shutdown();
}
}
}

View file

@ -4,14 +4,14 @@ import type { JobOptions, QueueStats } from '@stock-bot/types';
// Re-export handler and queue types from shared types package
export type {
HandlerConfig,
HandlerConfigWithSchedule,
JobHandler,
ScheduledJob,
HandlerConfigWithSchedule,
JobHandler,
ScheduledJob,
TypedJobHandler,
JobData,
JobOptions,
QueueWorkerConfig,
QueueStats
QueueStats,
} from '@stock-bot/types';
export interface ProcessOptions {
@ -92,7 +92,6 @@ export interface QueueConfig extends QueueManagerConfig {
enableMetrics?: boolean;
}
// Extended batch job data for queue implementation
export interface BatchJobData {
payloadKey: string;