simplified providers a bit

This commit is contained in:
Boki 2025-06-10 23:08:46 -04:00
parent 35b0eb3783
commit 4aa2942e43
9 changed files with 48 additions and 209 deletions

View file

@ -1,116 +0,0 @@
/**
* Example usage of the new functional batch processing approach
*/
import { processItems, processBatchJob } from '../utils/batch-helpers';
import { queueManager } from '../services/queue.service';
// Example 1: Process a list of symbols for live data
export async function exampleSymbolProcessing() {
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN'];
const result = await processItems(
symbols,
(symbol, index) => ({
symbol,
index,
source: 'batch-processing'
}),
queueManager,
{
totalDelayMs: 60000, // 1 minute total
useBatching: false, // Process directly
priority: 1,
service: 'market-data',
provider: 'yahoo',
operation: 'live-data'
}
);
console.log('Symbol processing result:', result);
// Output: { jobsCreated: 5, mode: 'direct', totalItems: 5, duration: 1234 }
}
// Example 2: Process proxies in batches
export async function exampleProxyProcessing() {
const proxies = [
{ host: '1.1.1.1', port: 8080 },
{ host: '2.2.2.2', port: 3128 },
// ... more proxies
];
const result = await processItems(
proxies,
(proxy, index) => ({
proxy,
index,
source: 'batch-processing'
}),
queueManager,
{
totalDelayMs: 3600000, // 1 hour total
useBatching: true, // Use batch mode
batchSize: 100, // 100 proxies per batch
priority: 2,
service: 'proxy',
provider: 'proxy-service',
operation: 'check-proxy'
}
);
console.log('Proxy processing result:', result);
// Output: { jobsCreated: 10, mode: 'batch', totalItems: 1000, batchesCreated: 10, duration: 2345 }
}
// Example 3: Custom processing with generic function
export async function exampleCustomProcessing() {
const customData = [
{ id: 1, name: 'Item 1' },
{ id: 2, name: 'Item 2' },
{ id: 3, name: 'Item 3' }
];
const result = await processItems(
customData,
(item, index) => ({
// Transform each item for processing
itemId: item.id,
itemName: item.name,
processIndex: index,
timestamp: new Date().toISOString()
}),
queueManager,
{
totalDelayMs: 30000, // 30 seconds total
useBatching: false, // Direct processing
priority: 1,
retries: 3
}
);
console.log('Custom processing result:', result);
}
// Example 4: Batch job processor (used by workers)
export async function exampleBatchJobProcessor(jobData: any) {
// This would be called by a BullMQ worker when processing batch jobs
const result = await processBatchJob(jobData, queueManager);
console.log('Batch job processed:', result);
// Output: { batchIndex: 0, itemsProcessed: 100, jobsCreated: 100 }
return result;
}
// Example: Simple functional approach using generic processItems
/*
await processItems(symbols, (symbol, index) => ({ symbol, index }), queueManager, {
totalDelayMs: 3600000,
useBatching: true,
batchSize: 200,
priority: 2,
service: 'data-service',
provider: 'yahoo',
operation: 'live-data'
});
*/

View file

@ -15,8 +15,7 @@ const getEvery24HourCron = (): string => {
export const proxyProvider: ProviderConfig = {
name: 'proxy-provider',
service: 'data-service',
operations: { 'fetch-and-check': async (payload: { sources?: string[] }) => {
operations: {'fetch-and-check': async (payload: { sources?: string[] }) => {
const { proxyService } = await import('./proxy.tasks');
const { queueManager } = await import('../services/queue.service');
const { processItems } = await import('../utils/batch-helpers');
@ -35,13 +34,11 @@ export const proxyProvider: ProviderConfig = {
index,
source: 'batch-processing'
}),
queueManager,
{
queueManager, {
totalDelayMs: parseInt(process.env.PROXY_VALIDATION_HOURS || '4') * 60 * 60 * 1000,
batchSize: parseInt(process.env.PROXY_BATCH_SIZE || '200'),
useBatching: process.env.PROXY_DIRECT_MODE !== 'true',
priority: 2,
service: 'data-service',
provider: 'proxy-provider',
operation: 'check-proxy'
}

View file

@ -152,7 +152,6 @@ export async function queueProxyFetch(): Promise<string> {
const { queueManager } = await import('../services/queue.service');
const job = await queueManager.addJob({
type: 'proxy-fetch',
service: 'proxy',
provider: 'proxy-service',
operation: 'fetch-and-check',
payload: {},
@ -170,7 +169,6 @@ export async function queueProxyCheck(proxies: ProxyInfo[]): Promise<string> {
const { queueManager } = await import('../services/queue.service');
const job = await queueManager.addJob({
type: 'proxy-check',
service: 'proxy',
provider: 'proxy-service',
operation: 'check-specific',
payload: { proxies },

View file

@ -5,7 +5,6 @@ const logger = getLogger('quotemedia-provider');
export const quotemediaProvider: ProviderConfig = {
name: 'quotemedia',
service: 'market-data',
operations: { 'live-data': async (payload: { symbol: string; fields?: string[] }) => {
logger.info('Fetching live data from QuoteMedia', { symbol: payload.symbol });

View file

@ -5,7 +5,6 @@ const logger = getLogger('yahoo-provider');
export const yahooProvider: ProviderConfig = {
name: 'yahoo-finance',
service: 'market-data',
operations: {
'live-data': async (payload: { symbol: string; modules?: string[] }) => {

View file

@ -14,8 +14,7 @@ proxyRoutes.post('/api/proxy/fetch', async (c) => {
try {
const job = await queueManager.addJob({
type: 'proxy-fetch',
service: 'proxy',
provider: 'proxy-service',
provider: 'proxy-provider',
operation: 'fetch-and-check',
payload: {},
priority: 5
@ -37,8 +36,7 @@ proxyRoutes.post('/api/proxy/check', async (c) => {
const { proxies } = await c.req.json();
const job = await queueManager.addJob({
type: 'proxy-check',
service: 'proxy',
provider: 'proxy-service',
provider: 'proxy-provider',
operation: 'check-specific',
payload: { proxies },
priority: 8
@ -60,8 +58,7 @@ proxyRoutes.get('/api/proxy/stats', async (c) => {
try {
const job = await queueManager.addJob({
type: 'proxy-stats',
service: 'proxy',
provider: 'proxy-service',
provider: 'proxy-provider',
operation: 'get-stats',
payload: {},
priority: 3

View file

@ -4,6 +4,15 @@ export interface JobHandler {
(payload: any): Promise<any>;
}
export interface JobData {
type?: string;
provider: string;
operation: string;
payload: any;
priority?: number;
immediately?: boolean;
}
export interface ScheduledJob {
type: string;
operation: string;
@ -16,7 +25,6 @@ export interface ScheduledJob {
export interface ProviderConfig {
name: string;
service: string;
operations: Record<string, JobHandler>;
scheduledJobs?: ScheduledJob[];
}
@ -27,51 +35,47 @@ export class ProviderRegistry {
/**
* Register a provider with its operations
*/ registerProvider(config: ProviderConfig): void {
const key = `${config.service}:${config.name}`;
this.providers.set(key, config);
this.logger.info(`Registered provider: ${key}`, {
*/
registerProvider(config: ProviderConfig): void {
// const key = `${config.service}:${config.name}`;
this.providers.set(config.name, config);
this.logger.info(`Registered provider: ${config.name}`, {
operations: Object.keys(config.operations),
scheduledJobs: config.scheduledJobs?.length || 0
});
}
/**
* Get a job handler for a specific provider and operation
*/
getHandler(service: string, provider: string, operation: string): JobHandler | null {
const key = `${service}:${provider}`;
const providerConfig = this.providers.get(key);
getHandler(provider: string, operation: string): JobHandler | null {
const providerConfig = this.providers.get(provider);
if (!providerConfig) {
this.logger.warn(`Provider not found: ${key}`);
this.logger.warn(`Provider not found: ${provider}`);
return null;
}
const handler = providerConfig.operations[operation];
if (!handler) {
this.logger.warn(`Operation not found: ${operation} in provider ${key}`);
this.logger.warn(`Operation not found: ${operation} in provider ${provider}`);
return null;
}
return handler;
}
/**
* Get all registered providers
* Get all scheduled jobs from all providers
*/
getAllScheduledJobs(): Array<{
service: string;
provider: string;
job: ScheduledJob;
}> {
const allJobs: Array<{ service: string; provider: string; job: ScheduledJob }> = [];
const allJobs: Array<{ provider: string; job: ScheduledJob }> = [];
for (const [key, config] of this.providers) {
if (config.scheduledJobs) {
for (const job of config.scheduledJobs) {
allJobs.push({
service: config.service,
provider: config.name,
job
});
@ -88,21 +92,12 @@ export class ProviderRegistry {
config
}));
}
/**
* Check if a provider exists
*/
hasProvider(service: string, provider: string): boolean {
return this.providers.has(`${service}:${provider}`);
hasProvider(provider: string): boolean {
return this.providers.has(provider);
}
/**
* Get providers by service type
*/
getProvidersByService(service: string): ProviderConfig[] {
return Array.from(this.providers.values()).filter(provider => provider.service === service);
}
/**
* Clear all providers (useful for testing)
*/

View file

@ -1,16 +1,6 @@
import { Queue, Worker, QueueEvents } from 'bullmq';
import { getLogger } from '@stock-bot/logger';
import { providerRegistry } from './provider-registry.service';
export interface JobData {
type: string;
service: string;
provider: string;
operation: string;
payload: any;
priority?: number;
immediately?: boolean;
}
import { providerRegistry, JobData } from './provider-registry.service';
export class QueueService {
private logger = getLogger('queue-service');
@ -135,13 +125,11 @@ export class QueueService {
this.logger.error('Failed to register providers', { error });
throw error;
}
}
private async processJob(job: any) {
const { service, provider, operation, payload }: JobData = job.data;
} private async processJob(job: any) {
const { provider, operation, payload }: JobData = job.data;
this.logger.info('Processing job', {
id: job.id,
service,
provider,
operation,
payloadKeys: Object.keys(payload || {})
@ -155,10 +143,10 @@ export class QueueService {
}
// Get handler from registry
const handler = providerRegistry.getHandler(service, provider, operation);
const handler = providerRegistry.getHandler(provider, operation);
if (!handler) {
throw new Error(`No handler found for ${service}:${provider}:${operation}`);
throw new Error(`No handler found for ${provider}:${operation}`);
}
// Execute the handler
@ -166,7 +154,6 @@ export class QueueService {
this.logger.info('Job completed successfully', {
id: job.id,
service,
provider,
operation
});
@ -177,7 +164,6 @@ export class QueueService {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error('Job failed', {
id: job.id,
service,
provider,
operation,
error: errorMessage
@ -220,12 +206,10 @@ export class QueueService {
let successCount = 0;
let failureCount = 0;
let updatedCount = 0;
let newCount = 0;
// Process each scheduled job
for (const { service, provider, job } of allScheduledJobs) {
let newCount = 0; // Process each scheduled job
for (const { provider, job } of allScheduledJobs) {
try {
const jobKey = `${service}-${provider}-${job.operation}`;
const jobKey = `${provider}-${job.operation}`;
// Check if this job already exists
const existingJob = existingJobs.find(existing =>
@ -257,7 +241,6 @@ export class QueueService {
await this.addRecurringJob({
type: job.type,
service: service,
provider: provider,
operation: job.operation,
payload: job.payload,
@ -267,7 +250,6 @@ export class QueueService {
this.logger.info('Scheduled job registered', {
type: job.type,
service,
provider,
operation: job.operation,
cronPattern: job.cronPattern,
@ -280,7 +262,6 @@ export class QueueService {
} catch (error) {
this.logger.error('Failed to register scheduled job', {
type: job.type,
service,
provider,
error: error instanceof Error ? error.message : String(error)
});
@ -300,12 +281,12 @@ export class QueueService {
this.logger.error('Failed to setup scheduled tasks', error);
}
}
async addJob(jobData: JobData, options?: any) {
if (!this.isInitialized) {
throw new Error('Queue service not initialized. Call initialize() first.');
}
return this.queue.add(jobData.type, jobData, {
const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
return this.queue.add(jobType, jobData, {
priority: jobData.priority || 0,
removeOnComplete: 10,
removeOnFail: 5,
@ -318,9 +299,8 @@ export class QueueService {
throw new Error('Queue service not initialized. Call initialize() first.');
}
try {
// Create a unique job key for this specific job
const jobKey = `${jobData.service}-${jobData.provider}-${jobData.operation}`;
try { // Create a unique job key for this specific job
const jobKey = `${jobData.provider}-${jobData.operation}`;
// Get all existing repeatable jobs
const existingJobs = await this.queue.getRepeatableJobs();
@ -336,19 +316,18 @@ export class QueueService {
jobKey,
existingPattern: existingJob.pattern,
newPattern: cronPattern
});
// Remove the existing job
await this.queue.removeRepeatableByKey(existingJob.key);
}); // Remove the existing job
if (existingJob.key) {
await this.queue.removeRepeatableByKey(existingJob.key);
}
// Small delay to ensure cleanup is complete
await new Promise(resolve => setTimeout(resolve, 100));
} else {
this.logger.info('Creating new recurring job', { jobKey, cronPattern });
}
// Add the new/updated recurring job
const job = await this.queue.add(jobData.type, jobData, {
} // Add the new/updated recurring job
const jobType = jobData.type || `${jobData.provider}-${jobData.operation}`;
const job = await this.queue.add(jobType, jobData, {
repeat: {
pattern: cronPattern,
tz: 'UTC',
@ -435,21 +414,17 @@ export class QueueService {
}
return this.workers.length;
}
getRegisteredProviders() {
return providerRegistry.getProviders().map(({ key, config }) => ({
key,
name: config.name,
service: config.service,
operations: Object.keys(config.operations),
scheduledJobs: config.scheduledJobs?.length || 0
}));
}
getScheduledJobsInfo() {
return providerRegistry.getAllScheduledJobs().map(({ service, provider, job }) => ({
id: `${service}-${provider}-${job.type}`,
service,
return providerRegistry.getAllScheduledJobs().map(({ provider, job }) => ({
id: `${provider}-${job.type}`,
provider,
type: job.type,
operation: job.operation,

View file

@ -15,7 +15,6 @@ export interface ProcessOptions {
removeOnComplete?: number;
removeOnFail?: number;
// Job routing information
service?: string;
provider?: string;
operation?: string;
}
@ -121,7 +120,6 @@ async function processDirect<T>(
name: 'process-item',
data: {
type: 'process-item',
service: options.service || 'data-service',
provider: options.provider || 'generic',
operation: options.operation || 'process-item',
payload: processor(item, index),
@ -174,7 +172,6 @@ async function processBatched<T>(
name: 'process-batch',
data: {
type: 'process-batch',
service: options.service || 'generic',
provider: options.provider || 'generic',
operation: 'process-batch-items',
payload: {
@ -234,7 +231,6 @@ export async function processBatchJob(jobData: any, queue: QueueService): Promis
name: 'process-item',
data: {
type: 'process-item',
service: options.service || 'generic',
provider: options.provider || 'generic',
operation: options.operation || 'generic',
payload: processor(item, index),
@ -297,7 +293,6 @@ async function storePayload<T>(
priority: options.priority || 1,
retries: options.retries || 3,
// Store routing information for later use
service: options.service || 'generic',
provider: options.provider || 'generic',
operation: options.operation || 'generic'
},