trying to fix build

This commit is contained in:
Boki 2025-06-18 21:17:32 -04:00
parent 269364fbc8
commit 2db7c0dc36
3 changed files with 119 additions and 30 deletions

View file

@ -1,7 +1,8 @@
import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq'; import { Queue as BullQueue, Worker, QueueEvents, type Job } from 'bullmq';
import { getLogger } from '@stock-bot/logger'; import { getLogger } from '@stock-bot/logger';
import { processItems } from './batch-processor'; import { processItems, processBatchJob } from './batch-processor';
import type { JobData, ProcessOptions, BatchResult } from './types'; import { providerRegistry } from './provider-registry';
import type { JobData, ProcessOptions, BatchResult, BatchJobData } from './types';
const logger = getLogger('queue-instance'); const logger = getLogger('queue-instance');
@ -40,6 +41,9 @@ export class Queue {
// Initialize queue events // Initialize queue events
this.queueEvents = new QueueEvents(`{${queueName}}`, { connection }); this.queueEvents = new QueueEvents(`{${queueName}}`, { connection });
// Start a worker for this queue
this.startWorker();
} }
/** /**
@ -177,6 +181,94 @@ export class Queue {
} }
} }
/**
* Start a worker for this queue
*/
private startWorker(): void {
const connection = {
host: this.redisConfig.host,
port: this.redisConfig.port,
password: this.redisConfig.password,
db: this.redisConfig.db,
};
const worker = new Worker(`{${this.queueName}}`, this.processJob.bind(this), {
connection,
concurrency: 20,
});
worker.on('completed', job => {
logger.debug('Job completed', {
id: job.id,
name: job.name,
queue: this.queueName,
});
});
worker.on('failed', (job, err) => {
logger.error('Job failed', {
id: job?.id,
name: job?.name,
queue: this.queueName,
error: err.message,
});
});
this.workers.push(worker);
logger.info(`Started worker for queue: ${this.queueName}`);
}
/**
* Process a job
*/
private async processJob(job: Job) {
const { provider, operation, payload }: JobData = job.data;
logger.info('Processing job', {
id: job.id,
provider,
operation,
queue: this.queueName,
payloadKeys: Object.keys(payload || {}),
});
try {
let result;
if (operation === 'process-batch-items') {
// Special handling for batch processing
result = await processBatchJob(payload as BatchJobData, this);
} else {
// Regular handler lookup
const handler = providerRegistry.getHandler(provider, operation);
if (!handler) {
throw new Error(`No handler found for ${provider}:${operation}`);
}
result = await handler(payload);
}
logger.info('Job completed successfully', {
id: job.id,
provider,
operation,
queue: this.queueName,
});
return result;
} catch (error) {
logger.error('Job processing failed', {
id: job.id,
provider,
operation,
queue: this.queueName,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
/** /**
* Get the BullMQ queue instance (for advanced operations) * Get the BullMQ queue instance (for advanced operations)
*/ */

View file

@ -96,9 +96,7 @@ export class QueueManager {
// Step 4: Setup event listeners // Step 4: Setup event listeners
this.setupEventListeners(); this.setupEventListeners();
// Step 5: Initialize batch cache // Step 5: Batch cache will be initialized by individual Queue instances
const { initializeBatchCache } = await import('./batch-processor');
await initializeBatchCache(this);
// Step 6: Set up scheduled jobs // Step 6: Set up scheduled jobs
if (this.enableScheduledJobs) { if (this.enableScheduledJobs) {
@ -373,10 +371,6 @@ export class QueueManager {
try { try {
let result; let result;
if (operation === 'process-batch-items') {
// Special handling for batch processing - requires queue manager instance
result = await processBatchJob(payload, this);
} else {
// Regular handler lookup // Regular handler lookup
const handler = providerRegistry.getHandler(provider, operation); const handler = providerRegistry.getHandler(provider, operation);
@ -385,7 +379,6 @@ export class QueueManager {
} }
result = await handler(payload); result = await handler(payload);
}
logger.info('Job completed successfully', { logger.info('Job completed successfully', {
id: job.id, id: job.id,

View file

@ -31,26 +31,28 @@ trap cleanup EXIT
# Build order is important due to dependencies # Build order is important due to dependencies
libs=( libs=(
# Core Libraries
"types" # Base types - no dependencies "types" # Base types - no dependencies
"config" # Configuration - depends on types "config" # Configuration - depends on types
"config" # Configuration - depends on types
"logger" # Logging utilities - depends on types "logger" # Logging utilities - depends on types
"utils" # Utilities - depends on types and config "utils" # Utilities - depends on types and config
# Database clients
"postgres-client" # PostgreSQL client - depends on types, config, logger
# "mongodb-client" # MongoDB client - depends on types, config, logger (temporarily disabled - needs zod->yup conversion)
"questdb-client" # QuestDB client - depends on types, config, logger
# Service libraries # # Database clients
"cache" # Cache - depends on types and logger # "postgres-client" # PostgreSQL client - depends on types, config, logger
"http" # HTTP client - depends on types, config, logger # "mongodb-client" # MongoDB client - depends on types, config, logger
"event-bus" # Event bus - depends on types, logger # "questdb-client" # QuestDB client - depends on types, config, logger
"shutdown" # Shutdown - depends on types, logger
# Engine libraries # # Service libraries
"data-frame" # Data frame - depends on types, utils # "cache" # Cache - depends on types and logger
"vector-engine" # Vector engine - depends on types, utils, data-frame # "http" # HTTP client - depends on types, config, logger
"strategy-engine" # Strategy engine - depends on types, utils, event-bus # "event-bus" # Event bus - depends on types, logger
# # "queue" # Queue - depends on types, logger, cache
# "shutdown" # Shutdown - depends on types, logger
# # Engine libraries
# "data-frame" # Data frame - depends on types, utils
# "vector-engine" # Vector engine - depends on types, utils, data-frame
# "strategy-engine" # Strategy engine - depends on types, utils, event-bus
) )
# Build each library in order # Build each library in order
@ -59,6 +61,8 @@ for lib in "${libs[@]}"; do
echo -e "${GREEN}Building $lib...${NC}" echo -e "${GREEN}Building $lib...${NC}"
cd "$lib_path" cd "$lib_path"
# print the current working directory
rm -rf dist tsconfig.tsbuildinfo
bun run build bun run build
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then