added logging

This commit is contained in:
Boki 2025-06-23 14:36:47 -04:00
parent d76f0ff5ff
commit 5c87f068d6
10 changed files with 38 additions and 16 deletions

View file

@ -86,22 +86,30 @@ export function findServiceForHandler(handlerName: string): string | undefined {
}
/**
* Get full queue name with service namespace
* Get full queue name - just the handler name since each service has its own Redis DB
*/
export function getFullQueueName(serviceName: string, handlerName: string): string {
return `${serviceName}_${handlerName}`;
// Just return the handler name since DB isolation provides namespace separation
return handlerName;
}
/**
* Parse a full queue name into service and handler
* Since queue names are just handler names now, we need to find the service from the handler
*/
export function parseQueueName(fullQueueName: string): { service: string; handler: string } | null {
const parts = fullQueueName.split('_');
if (parts.length !== 2 || !parts[0] || !parts[1]) {
// Queue name is just the handler name now
const handlerName = fullQueueName;
// Find which service owns this handler
const serviceName = findServiceForHandler(handlerName);
if (!serviceName) {
return null;
}
return {
service: parts[0],
handler: parts[1],
service: serviceName,
handler: handlerName,
};
}

View file

@ -212,7 +212,7 @@ export class SmartQueueManager extends QueueManager {
* Resolve a queue name to a route
*/
private resolveQueueRoute(queueName: string): QueueRoute | null {
// Check if it's a full name (service_handler)
// Check if it's a handler name (which is now the full queue name)
const parsed = parseQueueName(queueName);
if (parsed) {
const config = getServiceConfig(parsed.service);
@ -253,7 +253,8 @@ export class SmartQueueManager extends QueueManager {
private getProducerQueue(route: QueueRoute): BullQueue {
if (!this.producerQueues.has(route.fullName)) {
const connection = this.getConnection(route.db);
const queue = new BullQueue(route.fullName, {
// Match the queue name format used by workers: {queueName}
const queue = new BullQueue(`{${route.fullName}}`, {
connection,
defaultJobOptions: this.getConfig().defaultQueueOptions?.defaultJobOptions || {},
});

View file

@ -175,7 +175,7 @@ export interface SmartQueueConfig extends QueueManagerConfig {
}
export interface QueueRoute {
/** Full queue name (e.g., 'data-ingestion_ceo') */
/** Full queue name (now just the handler name, e.g., 'ceo') */
fullName: string;
/** Service that owns this queue */
service: string;