added a smart queue manager and moved proxy logic to proxy manager to make handler just schedule a call to it
This commit is contained in:
parent
da1c52a841
commit
e7c0fe2798
19 changed files with 903 additions and 231 deletions
|
|
@ -1,7 +1,16 @@
|
|||
// Core exports
|
||||
export { Queue, type QueueWorkerConfig } from './queue';
|
||||
export { QueueManager } from './queue-manager';
|
||||
export { SmartQueueManager } from './smart-queue-manager';
|
||||
export { createJobHandler } from './types';
|
||||
export { ServiceCache, createServiceCache } from './service-cache';
|
||||
export {
|
||||
SERVICE_REGISTRY,
|
||||
getServiceConfig,
|
||||
findServiceForHandler,
|
||||
getFullQueueName,
|
||||
parseQueueName
|
||||
} from './service-registry';
|
||||
|
||||
// Re-export handler registry from types package
|
||||
export { handlerRegistry } from '@stock-bot/types';
|
||||
|
|
@ -55,4 +64,12 @@ export type {
|
|||
// Scheduled job types
|
||||
ScheduledJob,
|
||||
ScheduleConfig,
|
||||
|
||||
// Smart Queue types
|
||||
SmartQueueConfig,
|
||||
QueueRoute,
|
||||
|
||||
} from './types';
|
||||
|
||||
// Re-export service registry types
|
||||
export type { ServiceConfig } from './service-registry';
|
||||
|
|
|
|||
167
libs/services/queue/src/service-cache.ts
Normal file
167
libs/services/queue/src/service-cache.ts
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
import { createCache, type CacheProvider, type CacheStats } from '@stock-bot/cache';
|
||||
import type { RedisConfig } from './types';
|
||||
import { getServiceConfig, type ServiceConfig } from './service-registry';
|
||||
|
||||
/**
|
||||
* Service-aware cache that uses the service's Redis DB
|
||||
* Automatically prefixes keys with the service's cache namespace
|
||||
*/
|
||||
export class ServiceCache implements CacheProvider {
|
||||
private cache: CacheProvider;
|
||||
private prefix: string;
|
||||
|
||||
constructor(
|
||||
serviceName: string,
|
||||
redisConfig: RedisConfig,
|
||||
isGlobalCache: boolean = false
|
||||
) {
|
||||
// Get service configuration
|
||||
const serviceConfig = getServiceConfig(serviceName);
|
||||
if (!serviceConfig && !isGlobalCache) {
|
||||
throw new Error(`Unknown service: ${serviceName}`);
|
||||
}
|
||||
|
||||
// Determine Redis DB and prefix
|
||||
let db: number;
|
||||
let prefix: string;
|
||||
|
||||
if (isGlobalCache) {
|
||||
// Global cache uses db:0
|
||||
db = 0;
|
||||
prefix = 'stock-bot:shared';
|
||||
} else {
|
||||
// Service cache uses service's DB
|
||||
db = serviceConfig!.db;
|
||||
prefix = serviceConfig!.cachePrefix;
|
||||
}
|
||||
|
||||
// Create underlying cache with correct DB
|
||||
const cacheConfig = {
|
||||
redisConfig: {
|
||||
...redisConfig,
|
||||
db,
|
||||
},
|
||||
keyPrefix: prefix + ':',
|
||||
};
|
||||
|
||||
this.cache = createCache(cacheConfig);
|
||||
this.prefix = prefix;
|
||||
}
|
||||
|
||||
// Implement CacheProvider interface
|
||||
async get<T = any>(key: string): Promise<T | null> {
|
||||
return this.cache.get<T>(key);
|
||||
}
|
||||
|
||||
async set<T = any>(
|
||||
key: string,
|
||||
value: T,
|
||||
options?:
|
||||
| number
|
||||
| {
|
||||
ttl?: number;
|
||||
preserveTTL?: boolean;
|
||||
onlyIfExists?: boolean;
|
||||
onlyIfNotExists?: boolean;
|
||||
getOldValue?: boolean;
|
||||
}
|
||||
): Promise<T | null> {
|
||||
return this.cache.set(key, value, options);
|
||||
}
|
||||
|
||||
async del(key: string): Promise<void> {
|
||||
return this.cache.del(key);
|
||||
}
|
||||
|
||||
async exists(key: string): Promise<boolean> {
|
||||
return this.cache.exists(key);
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
return this.cache.clear();
|
||||
}
|
||||
|
||||
async keys(pattern: string): Promise<string[]> {
|
||||
return this.cache.keys(pattern);
|
||||
}
|
||||
|
||||
getStats(): CacheStats {
|
||||
return this.cache.getStats();
|
||||
}
|
||||
|
||||
async health(): Promise<boolean> {
|
||||
return this.cache.health();
|
||||
}
|
||||
|
||||
async waitForReady(timeout?: number): Promise<void> {
|
||||
return this.cache.waitForReady(timeout);
|
||||
}
|
||||
|
||||
isReady(): boolean {
|
||||
return this.cache.isReady();
|
||||
}
|
||||
|
||||
// Enhanced cache methods (delegate to underlying cache if available)
|
||||
async update<T = any>(key: string, value: T): Promise<T | null> {
|
||||
if (this.cache.update) {
|
||||
return this.cache.update(key, value);
|
||||
}
|
||||
// Fallback implementation
|
||||
return this.cache.set(key, value, { preserveTTL: true });
|
||||
}
|
||||
|
||||
async setIfExists<T = any>(key: string, value: T, ttl?: number): Promise<boolean> {
|
||||
if (this.cache.setIfExists) {
|
||||
return this.cache.setIfExists(key, value, ttl);
|
||||
}
|
||||
// Fallback implementation
|
||||
const result = await this.cache.set(key, value, { onlyIfExists: true, ttl });
|
||||
return result !== null;
|
||||
}
|
||||
|
||||
async setIfNotExists<T = any>(key: string, value: T, ttl?: number): Promise<boolean> {
|
||||
if (this.cache.setIfNotExists) {
|
||||
return this.cache.setIfNotExists(key, value, ttl);
|
||||
}
|
||||
// Fallback implementation
|
||||
const result = await this.cache.set(key, value, { onlyIfNotExists: true, ttl });
|
||||
return result !== null;
|
||||
}
|
||||
|
||||
async replace<T = any>(key: string, value: T, ttl?: number): Promise<T | null> {
|
||||
if (this.cache.replace) {
|
||||
return this.cache.replace(key, value, ttl);
|
||||
}
|
||||
// Fallback implementation
|
||||
return this.cache.set(key, value, ttl);
|
||||
}
|
||||
|
||||
async updateField<T = any>(key: string, updater: (current: T | null) => T, ttl?: number): Promise<T | null> {
|
||||
if (this.cache.updateField) {
|
||||
return this.cache.updateField(key, updater, ttl);
|
||||
}
|
||||
// Fallback implementation
|
||||
const current = await this.cache.get<T>(key);
|
||||
const updated = updater(current);
|
||||
return this.cache.set(key, updated, ttl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the actual Redis key with prefix
|
||||
*/
|
||||
getKey(key: string): string {
|
||||
return `${this.prefix}:${key}`;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Factory function to create service cache
|
||||
*/
|
||||
export function createServiceCache(
|
||||
serviceName: string,
|
||||
redisConfig: RedisConfig,
|
||||
options: { global?: boolean } = {}
|
||||
): ServiceCache {
|
||||
return new ServiceCache(serviceName, redisConfig, options.global);
|
||||
}
|
||||
89
libs/services/queue/src/service-registry.ts
Normal file
89
libs/services/queue/src/service-registry.ts
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Service Registry Configuration
|
||||
* Maps services to their Redis databases and configurations
|
||||
*/
|
||||
|
||||
export interface ServiceConfig {
|
||||
/** Redis database number for this service (used for both queues and cache) */
|
||||
db: number;
|
||||
/** Prefix for queue keys (e.g., 'bull:di') */
|
||||
queuePrefix: string;
|
||||
/** Prefix for cache keys (e.g., 'cache:di') */
|
||||
cachePrefix: string;
|
||||
/** Whether this service only produces jobs (doesn't process them) */
|
||||
producerOnly?: boolean;
|
||||
/** List of handlers this service owns (auto-discovered if not provided) */
|
||||
handlers?: string[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Central registry of all services and their configurations
|
||||
* Each service gets one Redis DB for both queues and cache
|
||||
*
|
||||
* Database assignments:
|
||||
* - db:0 = Global shared cache
|
||||
* - db:1 = data-ingestion (queues + cache)
|
||||
* - db:2 = data-pipeline (queues + cache)
|
||||
* - db:3 = web-api (cache only, producer-only for queues)
|
||||
*/
|
||||
export const SERVICE_REGISTRY: Record<string, ServiceConfig> = {
|
||||
'data-ingestion': {
|
||||
db: 1,
|
||||
queuePrefix: 'bull:di',
|
||||
cachePrefix: 'cache:di',
|
||||
handlers: ['ceo', 'qm', 'webshare', 'ib', 'proxy'],
|
||||
},
|
||||
'data-pipeline': {
|
||||
db: 2,
|
||||
queuePrefix: 'bull:dp',
|
||||
cachePrefix: 'cache:dp',
|
||||
handlers: ['exchanges', 'symbols'],
|
||||
},
|
||||
'web-api': {
|
||||
db: 3,
|
||||
queuePrefix: 'bull:api', // Not used since producer-only
|
||||
cachePrefix: 'cache:api',
|
||||
producerOnly: true,
|
||||
},
|
||||
// Add more services as needed
|
||||
};
|
||||
|
||||
/**
|
||||
* Get service configuration
|
||||
*/
|
||||
export function getServiceConfig(serviceName: string): ServiceConfig | undefined {
|
||||
return SERVICE_REGISTRY[serviceName];
|
||||
}
|
||||
|
||||
/**
|
||||
* Find which service owns a handler
|
||||
*/
|
||||
export function findServiceForHandler(handlerName: string): string | undefined {
|
||||
for (const [serviceName, config] of Object.entries(SERVICE_REGISTRY)) {
|
||||
if (config.handlers?.includes(handlerName)) {
|
||||
return serviceName;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get full queue name with service namespace
|
||||
*/
|
||||
export function getFullQueueName(serviceName: string, handlerName: string): string {
|
||||
return `${serviceName}:${handlerName}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a full queue name into service and handler
|
||||
*/
|
||||
export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null {
|
||||
const parts = fullQueueName.split(':');
|
||||
if (parts.length !== 2 || !parts[0] || !parts[1]) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
service: parts[0],
|
||||
handler: parts[1],
|
||||
};
|
||||
}
|
||||
301
libs/services/queue/src/smart-queue-manager.ts
Normal file
301
libs/services/queue/src/smart-queue-manager.ts
Normal file
|
|
@ -0,0 +1,301 @@
|
|||
import { Queue as BullQueue, type Job } from 'bullmq';
|
||||
import IoRedis from 'ioredis';
|
||||
import { handlerRegistry } from '@stock-bot/types';
|
||||
import { getLogger, type Logger } from '@stock-bot/logger';
|
||||
import { QueueManager } from './queue-manager';
|
||||
import { Queue } from './queue';
|
||||
import type {
|
||||
SmartQueueConfig,
|
||||
QueueRoute,
|
||||
JobData,
|
||||
JobOptions,
|
||||
RedisConfig
|
||||
} from './types';
|
||||
import {
|
||||
SERVICE_REGISTRY,
|
||||
getServiceConfig,
|
||||
findServiceForHandler,
|
||||
getFullQueueName,
|
||||
parseQueueName,
|
||||
type ServiceConfig
|
||||
} from './service-registry';
|
||||
import { getRedisConnection } from './utils';
|
||||
|
||||
/**
|
||||
* Smart Queue Manager with automatic service discovery and routing
|
||||
* Handles cross-service communication seamlessly
|
||||
*/
|
||||
export class SmartQueueManager extends QueueManager {
|
||||
private serviceName: string;
|
||||
private serviceConfig: ServiceConfig;
|
||||
private queueRoutes = new Map<string, QueueRoute>();
|
||||
private connections = new Map<number, any>(); // Redis connections by DB
|
||||
private producerQueues = new Map<string, BullQueue>(); // For cross-service sending
|
||||
private _logger: Logger;
|
||||
|
||||
constructor(config: SmartQueueConfig, logger?: Logger) {
|
||||
// Get service config
|
||||
const serviceConfig = getServiceConfig(config.serviceName);
|
||||
if (!serviceConfig) {
|
||||
throw new Error(`Unknown service: ${config.serviceName}`);
|
||||
}
|
||||
|
||||
// Update Redis config to use service's DB
|
||||
const modifiedConfig = {
|
||||
...config,
|
||||
redis: {
|
||||
...config.redis,
|
||||
db: serviceConfig.db,
|
||||
},
|
||||
};
|
||||
|
||||
super(modifiedConfig, logger);
|
||||
|
||||
this.serviceName = config.serviceName;
|
||||
this.serviceConfig = serviceConfig;
|
||||
this._logger = logger || getLogger('SmartQueueManager');
|
||||
|
||||
// Auto-discover routes if enabled
|
||||
if (config.autoDiscoverHandlers !== false) {
|
||||
this.discoverQueueRoutes();
|
||||
}
|
||||
|
||||
this._logger.info('SmartQueueManager initialized', {
|
||||
service: this.serviceName,
|
||||
db: serviceConfig.db,
|
||||
handlers: serviceConfig.handlers,
|
||||
producerOnly: serviceConfig.producerOnly,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover all available queue routes from handler registry
|
||||
*/
|
||||
private discoverQueueRoutes(): void {
|
||||
// Discover from handler registry if available
|
||||
try {
|
||||
const handlers = handlerRegistry.getAllHandlers();
|
||||
for (const [handlerName, handlerConfig] of handlers) {
|
||||
// Find which service owns this handler
|
||||
const ownerService = findServiceForHandler(handlerName);
|
||||
if (ownerService) {
|
||||
const ownerConfig = getServiceConfig(ownerService)!;
|
||||
const fullName = getFullQueueName(ownerService, handlerName);
|
||||
|
||||
this.queueRoutes.set(handlerName, {
|
||||
fullName,
|
||||
service: ownerService,
|
||||
handler: handlerName,
|
||||
db: ownerConfig.db,
|
||||
operations: Object.keys(handlerConfig.operations || {}),
|
||||
});
|
||||
|
||||
this._logger.trace('Discovered queue route', {
|
||||
handler: handlerName,
|
||||
service: ownerService,
|
||||
db: ownerConfig.db,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this._logger.warn('Handler registry not available, using static configuration', { error });
|
||||
}
|
||||
|
||||
// Also add routes from static configuration
|
||||
Object.entries(SERVICE_REGISTRY).forEach(([serviceName, config]) => {
|
||||
if (config.handlers) {
|
||||
config.handlers.forEach(handlerName => {
|
||||
if (!this.queueRoutes.has(handlerName)) {
|
||||
const fullName = getFullQueueName(serviceName, handlerName);
|
||||
this.queueRoutes.set(handlerName, {
|
||||
fullName,
|
||||
service: serviceName,
|
||||
handler: handlerName,
|
||||
db: config.db,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a Redis connection for a specific DB
|
||||
*/
|
||||
private getConnection(db: number): any {
|
||||
if (!this.connections.has(db)) {
|
||||
const redisConfig: RedisConfig = {
|
||||
...this.getRedisConfig(),
|
||||
db,
|
||||
};
|
||||
const connection = getRedisConnection(redisConfig);
|
||||
this.connections.set(db, connection);
|
||||
this._logger.debug('Created Redis connection', { db });
|
||||
}
|
||||
return this.connections.get(db);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a queue for the current service (for processing)
|
||||
* Overrides parent to use namespaced queue names
|
||||
*/
|
||||
override getQueue(queueName: string, options = {}): Queue {
|
||||
// For local queues, use the service namespace
|
||||
const fullQueueName = getFullQueueName(this.serviceName, queueName);
|
||||
return super.getQueue(fullQueueName, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a job to any queue (local or remote)
|
||||
* This is the main method for cross-service communication
|
||||
*/
|
||||
async send(
|
||||
targetQueue: string,
|
||||
operation: string,
|
||||
payload: unknown,
|
||||
options: JobOptions = {}
|
||||
): Promise<Job> {
|
||||
// Resolve the target queue
|
||||
const route = this.resolveQueueRoute(targetQueue);
|
||||
if (!route) {
|
||||
throw new Error(`Unknown queue: ${targetQueue}`);
|
||||
}
|
||||
|
||||
// Validate operation if we have metadata
|
||||
if (route.operations && !route.operations.includes(operation)) {
|
||||
this._logger.warn('Operation not found in handler metadata', {
|
||||
queue: targetQueue,
|
||||
operation,
|
||||
available: route.operations,
|
||||
});
|
||||
}
|
||||
|
||||
// Get or create producer queue for the target
|
||||
const producerQueue = this.getProducerQueue(route);
|
||||
|
||||
// Create job data
|
||||
const jobData: JobData = {
|
||||
handler: route.handler,
|
||||
operation,
|
||||
payload,
|
||||
};
|
||||
|
||||
// Send the job
|
||||
const job = await producerQueue.add(operation, jobData, options);
|
||||
|
||||
this._logger.debug('Job sent to queue', {
|
||||
from: this.serviceName,
|
||||
to: route.service,
|
||||
queue: route.handler,
|
||||
operation,
|
||||
jobId: job.id,
|
||||
});
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias for send() with more explicit name
|
||||
*/
|
||||
async sendTo(
|
||||
targetService: string,
|
||||
handler: string,
|
||||
operation: string,
|
||||
payload: unknown,
|
||||
options: JobOptions = {}
|
||||
): Promise<Job> {
|
||||
const fullQueueName = `${targetService}:${handler}`;
|
||||
return this.send(fullQueueName, operation, payload, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve a queue name to a route
|
||||
*/
|
||||
private resolveQueueRoute(queueName: string): QueueRoute | null {
|
||||
// Check if it's a full name (service:handler)
|
||||
const parsed = parseQueueName(queueName);
|
||||
if (parsed) {
|
||||
const config = getServiceConfig(parsed.service);
|
||||
if (config) {
|
||||
return {
|
||||
fullName: queueName,
|
||||
service: parsed.service,
|
||||
handler: parsed.handler,
|
||||
db: config.db,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Check if it's just a handler name
|
||||
const route = this.queueRoutes.get(queueName);
|
||||
if (route) {
|
||||
return route;
|
||||
}
|
||||
|
||||
// Try to find in static config
|
||||
const ownerService = findServiceForHandler(queueName);
|
||||
if (ownerService) {
|
||||
const config = getServiceConfig(ownerService)!;
|
||||
return {
|
||||
fullName: getFullQueueName(ownerService, queueName),
|
||||
service: ownerService,
|
||||
handler: queueName,
|
||||
db: config.db,
|
||||
};
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a producer queue for cross-service communication
|
||||
*/
|
||||
private getProducerQueue(route: QueueRoute): BullQueue {
|
||||
if (!this.producerQueues.has(route.fullName)) {
|
||||
const connection = this.getConnection(route.db);
|
||||
const queue = new BullQueue(route.fullName, {
|
||||
connection,
|
||||
defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {},
|
||||
});
|
||||
this.producerQueues.set(route.fullName, queue);
|
||||
}
|
||||
return this.producerQueues.get(route.fullName)!;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get statistics for all queues across all services
|
||||
*/
|
||||
async getAllStats(): Promise<Record<string, any>> {
|
||||
const stats: Record<string, any> = {};
|
||||
|
||||
// Get stats for local queues
|
||||
stats[this.serviceName] = await this.getGlobalStats();
|
||||
|
||||
// Get stats for other services if we have access
|
||||
// This would require additional implementation
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Graceful shutdown
|
||||
*/
|
||||
override async shutdown(): Promise<void> {
|
||||
// Close producer queues
|
||||
for (const [name, queue] of this.producerQueues) {
|
||||
await queue.close();
|
||||
this._logger.debug('Closed producer queue', { queue: name });
|
||||
}
|
||||
|
||||
// Close additional connections
|
||||
for (const [db, connection] of this.connections) {
|
||||
if (db !== this.serviceConfig.db) { // Don't close our main connection
|
||||
connection.disconnect();
|
||||
this._logger.debug('Closed Redis connection', { db });
|
||||
}
|
||||
}
|
||||
|
||||
// Call parent shutdown
|
||||
await super.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
@ -163,3 +163,26 @@ export interface ScheduleConfig {
|
|||
data?: unknown;
|
||||
options?: JobOptions;
|
||||
}
|
||||
|
||||
// Smart Queue Types
|
||||
export interface SmartQueueConfig extends QueueManagerConfig {
|
||||
/** Name of the current service */
|
||||
serviceName: string;
|
||||
/** Whether to auto-discover handlers from registry */
|
||||
autoDiscoverHandlers?: boolean;
|
||||
/** Custom service registry (defaults to built-in) */
|
||||
serviceRegistry?: Record<string, any>;
|
||||
}
|
||||
|
||||
export interface QueueRoute {
|
||||
/** Full queue name (e.g., 'data-ingestion:ceo') */
|
||||
fullName: string;
|
||||
/** Service that owns this queue */
|
||||
service: string;
|
||||
/** Handler name */
|
||||
handler: string;
|
||||
/** Redis DB number */
|
||||
db: number;
|
||||
/** Available operations */
|
||||
operations?: string[];
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue