diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/get-channels.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-channels.action.ts new file mode 100644 index 0000000..aff0734 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-channels.action.ts @@ -0,0 +1,84 @@ +import { getRandomUserAgent } from '@stock-bot/utils'; +import type { CeoHandler } from '../ceo.handler'; + +export async function getChannels( + this: CeoHandler, + payload: number | undefined +): Promise { + const proxy = this.proxy?.getProxy(); + if (!proxy) { + this.logger.warn('No proxy available for CEO channels update'); + return; + } + let page; + if (payload === undefined) { + page = 1; + } else { + page = payload; + } + + try { + this.logger.info(`Fetching CEO channels for page ${page} with proxy ${proxy}`); + const response = await this.http.get( + 'https://api.ceo.ca/api/home?exchange=all&sort_by=symbol&sector=All&tab=companies&page=' + page, + { + proxy: proxy, + headers: { + 'User-Agent': getRandomUserAgent(), + }, + } + ); + if (!response.ok) { + this.logger.debug(`Response status: ${response.status}`); + throw new Error(`Failed to fetch CEO channels: ${response.statusText}`); + } + + const results = await response.json(); + const channels = results.channel_categories[0].channels; + const totalChannels = results.channel_categories[0].total_channels; + const totalPages = Math.ceil(totalChannels / channels.length); + const exchanges: { exchange: string; countryCode: string }[] = []; + const symbols = channels.map((channel: any) => { + // check if exchange is in the exchanges array object + if (!exchanges.find((e: any) => e.exchange === channel.exchange)) { + exchanges.push({ + exchange: channel.exchange, + countryCode: 'CA', + }); + } + const details = channel.company_details || {}; + return { + symbol: channel.symbol, + exchange: channel.exchange, + name: channel.title, + type: channel.type, + ceoId: channel.channel, + marketCap: 
details.market_cap, + volumeRatio: details.volume_ratio, + avgVolume: details.avg_volume, + stockType: details.stock_type, + issueType: details.issue_type, + sharesOutstanding: details.shares_outstanding, + float: details.float, + lastSeen: new Date(), + active: true, + }; + }); + + await this.mongodb.batchUpsert('ceoSymbols', symbols, ['symbol', 'exchange']); + await this.mongodb.batchUpsert('ceoExchanges', exchanges, ['exchange']); + + if (page === 1) { + for (let i = 2; i <= totalPages; i++) { + this.logger.info(`Scheduling page ${i} of ${totalPages} for CEO channels`); + await this.scheduleOperation('get-channels', i); + } + } + + this.logger.info(`Fetched CEO channels for page ${page}/${totalPages}`); + return { page, totalPages }; + }catch (error) { + this.logger.error(`Error fetching CEO channels for page ${page} with proxy ${proxy}:`, error); + throw new Error(`Failed to fetch CEO channels: ${error.message}`); + } +} diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts similarity index 77% rename from apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts rename to apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts index 5dc3f4f..b8c82c9 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-symbol.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts @@ -1,12 +1,12 @@ import { getRandomUserAgent } from '@stock-bot/utils'; import type { CeoHandler } from '../ceo.handler'; -export async function processIndividualSymbol( +export async function getPosts( this: CeoHandler, payload: any, _context: any ): Promise { - const { ceoId, symbol, timestamp } = payload; + const { ceoId, symbol, timestamp, untilTimestamp } = payload; const proxy = this.proxy?.getProxy(); if (!proxy) { this.logger.warn('No proxy available for processing 
individual CEO symbol'); @@ -31,6 +31,7 @@ export async function processIndividualSymbol( ); if (!response.ok) { + this.logger.debug(`Response status: ${response.status}`); throw new Error(`Failed to fetch details for ceoId ${ceoId}: ${response.statusText}`); } @@ -88,14 +89,28 @@ export async function processIndividualSymbol( ); this.logger.info(`Fetched ${spielCount} spiels for ceoId ${ceoId}`); - await this.scheduleOperation( - 'process-individual-symbol', - { - ceoId: ceoId, - timestamp: latestSpielTime, - }, - { priority: 0 } - ); + // If untilTimestamp is not provided, keep going to the end + // Otherwise keep going until the lastSpiel is before the untilTimestamp + if (untilTimestamp === undefined || (untilTimestamp && latestSpielTime <= untilTimestamp)) { + await this.scheduleOperation( + 'get-posts', + { + ceoId: ceoId, + timestamp: latestSpielTime, + }, + { priority: 0 } + ); + } + + // If timestamp is not provided, run the short positions update + if (!timestamp) { + await this.scheduleOperation( + 'get-shorts', + { + symbol: symbol, + }, + ); + } this.logger.info( `Successfully processed channel ${ceoId} and added channel ${ceoId} at timestamp ${latestSpielTime}` @@ -108,6 +123,6 @@ export async function processIndividualSymbol( ceoId, timestamp, }); - throw error; + throw new Error(`Failed to process individual symbol ${symbol}`, { cause: error }); } } diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-short.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-shorts.action.ts similarity index 96% rename from apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-short.action.ts rename to apps/stock/data-ingestion/src/handlers/ceo/actions/get-shorts.action.ts index 8e868e1..b4a603e 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/process-individual-short.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-shorts.action.ts @@ -1,7 +1,7 @@ import { 
getRandomUserAgent } from '@stock-bot/utils'; import type { CeoHandler } from '../ceo.handler'; -export async function processIndividualSymbol( +export async function getShorts( this: CeoHandler, payload: any, _context: any diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/index.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/index.ts index 247112b..8f8e926 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/index.ts @@ -1,3 +1,5 @@ -export { updateCeoChannels } from './update-ceo-channels.action'; +export { getChannels } from './get-channels.action'; +export { getPosts } from './get-posts.action'; +export { getShorts } from './get-shorts.action'; export { updateUniqueSymbols } from './update-unique-symbols.action'; -export { processIndividualSymbol } from './process-individual-symbol.action'; + diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-ceo-channels.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-ceo-channels.action.ts deleted file mode 100644 index d0be435..0000000 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-ceo-channels.action.ts +++ /dev/null @@ -1,72 +0,0 @@ -import { getRandomUserAgent } from '@stock-bot/utils'; -import type { CeoHandler } from '../ceo.handler'; - -export async function updateCeoChannels( - this: CeoHandler, - payload: number | undefined -): Promise { - const proxy = this.proxy?.getProxy(); - if (!proxy) { - this.logger.warn('No proxy available for CEO channels update'); - return; - } - let page; - if (payload === undefined) { - page = 1; - } else { - page = payload; - } - - this.logger.info(`Fetching CEO channels for page ${page} with proxy ${proxy}`); - const response = await this.http.get( - 'https://api.ceo.ca/api/home?exchange=all&sort_by=symbol&sector=All&tab=companies&page=' + page, - { - proxy: proxy, - headers: { - 'User-Agent': getRandomUserAgent(), - }, - } - ); - 
const results = await response.json(); - const channels = results.channel_categories[0].channels; - const totalChannels = results.channel_categories[0].total_channels; - const totalPages = Math.ceil(totalChannels / channels.length); - const exchanges: { exchange: string; countryCode: string }[] = []; - const symbols = channels.map((channel: any) => { - // check if exchange is in the exchanges array object - if (!exchanges.find((e: any) => e.exchange === channel.exchange)) { - exchanges.push({ - exchange: channel.exchange, - countryCode: 'CA', - }); - } - const details = channel.company_details || {}; - return { - symbol: channel.symbol, - exchange: channel.exchange, - name: channel.title, - type: channel.type, - ceoId: channel.channel, - marketCap: details.market_cap, - volumeRatio: details.volume_ratio, - avgVolume: details.avg_volume, - stockType: details.stock_type, - issueType: details.issue_type, - sharesOutstanding: details.shares_outstanding, - float: details.float, - }; - }); - - await this.mongodb.batchUpsert('ceoSymbols', symbols, ['symbol', 'exchange']); - await this.mongodb.batchUpsert('ceoExchanges', exchanges, ['exchange']); - - if (page === 1) { - for (let i = 2; i <= totalPages; i++) { - this.logger.info(`Scheduling page ${i} of ${totalPages} for CEO channels`); - await this.scheduleOperation('update-ceo-channels', i); - } - } - - this.logger.info(`Fetched CEO channels for page ${page}/${totalPages}`); - return { page, totalPages }; -} diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts index 0d9e6e3..4a20f93 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts @@ -32,13 +32,14 @@ export async function updateUniqueSymbols( for (const symbol of uniqueSymbols) { // Schedule a job to process this 
individual symbol await this.scheduleOperation( - 'process-individual-symbol', + 'get-posts', { ceoId: symbol.ceoId, symbol: symbol.symbol, }, { priority: 10 } ); + scheduledJobs++; // Add small delay to avoid overwhelming the queue diff --git a/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts b/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts index a3a813d..fc0fe30 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/ceo.handler.ts @@ -5,7 +5,7 @@ import { ScheduledOperation, type IServiceContainer, } from '@stock-bot/handlers'; -import { processIndividualSymbol, updateCeoChannels, updateUniqueSymbols } from './actions'; +import { getChannels, getPosts, getShorts, updateUniqueSymbols } from './actions'; @Handler('ceo') // @Disabled() @@ -14,12 +14,12 @@ export class CeoHandler extends BaseHandler { super(services); // Handler name read from @Handler decorator } - @ScheduledOperation('update-ceo-channels', '0 */15 * * *', { + @ScheduledOperation('get-channels', '0 */15 * * *', { priority: 7, immediately: false, description: 'Get all CEO symbols and exchanges', }) - updateCeoChannels = updateCeoChannels; + getChannels = getChannels; @Operation('update-unique-symbols') @ScheduledOperation('update-unique-symbols', '0 0 1 * *', { @@ -29,6 +29,9 @@ export class CeoHandler extends BaseHandler { }) updateUniqueSymbols = updateUniqueSymbols; - @Operation('process-individual-symbol') - processIndividualSymbol = processIndividualSymbol; + @Operation('get-posts') + getPosts = getPosts; + + @Operation('get-shorts') + getShorts = getShorts; } diff --git a/apps/stock/data-ingestion/src/routes/market-data.routes.ts b/apps/stock/data-ingestion/src/routes/market-data.routes.ts index 12dc6b4..123eae5 100644 --- a/apps/stock/data-ingestion/src/routes/market-data.routes.ts +++ b/apps/stock/data-ingestion/src/routes/market-data.routes.ts @@ -1,10 +1,10 @@ /** * Market data routes */ -import { Hono } 
from 'hono'; import type { IServiceContainer } from '@stock-bot/handlers'; import { getLogger } from '@stock-bot/logger'; import { processItems } from '@stock-bot/queue'; +import { Hono } from 'hono'; const logger = getLogger('market-data-routes'); @@ -122,8 +122,8 @@ export function createMarketDataRoutes(container: IServiceContainer) { batchSize, priority: 2, retries: 2, - removeOnComplete: 5, - removeOnFail: 10, + removeOnComplete: 100, + removeOnFail: 100, }, queueManager ); diff --git a/libs/core/config/config/default.json b/libs/core/config/config/default.json index 10ce440..166b02c 100644 --- a/libs/core/config/config/default.json +++ b/libs/core/config/config/default.json @@ -76,7 +76,7 @@ "delay": 1000 }, "removeOnComplete": 100, - "removeOnFail": 50 + "removeOnFail": 100 } }, "http": { diff --git a/libs/core/config/config/test.json b/libs/core/config/config/test.json index 85f7ac4..2080deb 100644 --- a/libs/core/config/config/test.json +++ b/libs/core/config/config/test.json @@ -32,7 +32,7 @@ "defaultJobOptions": { "attempts": 1, "removeOnComplete": 100, - "removeOnFail": 50 + "removeOnFail": 100 } }, "http": { diff --git a/libs/core/config/src/schemas/service.schema.ts b/libs/core/config/src/schemas/service.schema.ts index 73eba6d..f92361c 100644 --- a/libs/core/config/src/schemas/service.schema.ts +++ b/libs/core/config/src/schemas/service.schema.ts @@ -55,7 +55,7 @@ export const queueConfigSchema = z.object({ }) .default({}), removeOnComplete: z.number().default(100), - removeOnFail: z.number().default(50), + removeOnFail: z.number().default(100), timeout: z.number().optional(), }) .default({}), diff --git a/libs/core/di/src/config/schemas/service.schema.ts b/libs/core/di/src/config/schemas/service.schema.ts index 4fa2063..73ba194 100644 --- a/libs/core/di/src/config/schemas/service.schema.ts +++ b/libs/core/di/src/config/schemas/service.schema.ts @@ -32,7 +32,7 @@ export const queueConfigSchema = z.object({ }) .default({}), removeOnComplete: 
z.number().default(100), - removeOnFail: z.number().default(50), + removeOnFail: z.number().default(100), timeout: z.number().optional(), }) .optional() diff --git a/libs/core/di/src/container/builder.ts b/libs/core/di/src/container/builder.ts index 5718802..cbcdc88 100644 --- a/libs/core/di/src/container/builder.ts +++ b/libs/core/di/src/container/builder.ts @@ -1,7 +1,7 @@ -import { asClass, asFunction, createContainer, InjectionMode, type AwilixContainer } from 'awilix'; import type { BaseAppConfig as StockBotAppConfig, UnifiedAppConfig } from '@stock-bot/config'; import { toUnifiedConfig } from '@stock-bot/config'; import { HandlerRegistry } from '@stock-bot/handler-registry'; +import { asClass, asFunction, createContainer, InjectionMode, type AwilixContainer } from 'awilix'; import { appConfigSchema, type AppConfig } from '../config/schemas'; import { registerApplicationServices, @@ -133,7 +133,7 @@ export class ServiceContainerBuilder { attempts: 3, backoff: { type: 'exponential' as const, delay: 1000 }, removeOnComplete: 100, - removeOnFail: 50, + removeOnFail: 100, }, } : undefined, diff --git a/libs/core/queue/src/batch-processor.ts b/libs/core/queue/src/batch-processor.ts index f504142..c44f958 100644 --- a/libs/core/queue/src/batch-processor.ts +++ b/libs/core/queue/src/batch-processor.ts @@ -88,8 +88,8 @@ async function processDirect( delay: index * delayPerItem, priority: options.priority || undefined, attempts: options.retries || 3, - removeOnComplete: options.removeOnComplete || 10, - removeOnFail: options.removeOnFail || 5, + removeOnComplete: options.removeOnComplete || 100, + removeOnFail: options.removeOnFail || 100, }, })); @@ -151,8 +151,8 @@ async function processBatched( delay: batchIndex * delayPerBatch, priority: options.priority || undefined, attempts: options.retries || 3, - removeOnComplete: options.removeOnComplete || 10, - removeOnFail: options.removeOnFail || 5, + removeOnComplete: options.removeOnComplete || 100, + removeOnFail: 
options.removeOnFail || 100, }, }; }) diff --git a/libs/core/queue/src/dlq-handler.ts b/libs/core/queue/src/dlq-handler.ts index 28fbc08..ec1fd4b 100644 --- a/libs/core/queue/src/dlq-handler.ts +++ b/libs/core/queue/src/dlq-handler.ts @@ -85,7 +85,7 @@ export class DeadLetterQueueHandler { await this.dlq.add('failed-job', dlqData, { removeOnComplete: 100, - removeOnFail: 50, + removeOnFail: 100, }); this.logger.error('Job moved to DLQ', { diff --git a/libs/core/queue/src/queue.ts b/libs/core/queue/src/queue.ts index 985fa71..811d098 100644 --- a/libs/core/queue/src/queue.ts +++ b/libs/core/queue/src/queue.ts @@ -63,8 +63,8 @@ export class Queue { this.bullQueue = new BullQueue(queueName, { connection, defaultJobOptions: { - removeOnComplete: 10, - removeOnFail: 5, + removeOnComplete: 100, + removeOnFail: 100, attempts: 3, backoff: { type: 'exponential', diff --git a/libs/core/queue/test/batch-processor.test.ts b/libs/core/queue/test/batch-processor.test.ts index d98ad48..4801ff2 100644 --- a/libs/core/queue/test/batch-processor.test.ts +++ b/libs/core/queue/test/batch-processor.test.ts @@ -311,7 +311,7 @@ describe('Batch Processor', () => { priority: 5, retries: 10, removeOnComplete: 100, - removeOnFail: 50, + removeOnFail: 100, }); // Check all states including job ID "1" specifically (as it often doesn't show up in state queries) @@ -337,7 +337,7 @@ describe('Batch Processor', () => { expect(job.opts.priority).toBe(5); expect(job.opts.attempts).toBe(10); expect(job.opts.removeOnComplete).toBe(100); - expect(job.opts.removeOnFail).toBe(50); + expect(job.opts.removeOnFail).toBe(100); }); }); diff --git a/libs/data/mongodb/src/client.ts b/libs/data/mongodb/src/client.ts index 8189945..2aba63d 100644 --- a/libs/data/mongodb/src/client.ts +++ b/libs/data/mongodb/src/client.ts @@ -363,6 +363,254 @@ export class MongoDBClient { return { ...docWithTimestamps, _id: result.insertedId } as T; } + /** + * Insert multiple documents + */ + async insertMany( + collectionName: 
string, + documents: T[], + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + const now = new Date(); + + const docsWithTimestamps = documents.map(doc => ({ + ...doc, + created_at: (doc as any).created_at || now, + updated_at: now, + })); + + const result = await collection.insertMany(docsWithTimestamps as any, options); + return { + insertedCount: result.insertedCount, + insertedIds: result.insertedIds, + }; + } + + /** + * Find multiple documents + */ + async find( + collectionName: string, + filter: any = {}, + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + const cursor = collection.find(filter, options); + return await cursor.toArray() as T[]; + } + + /** + * Find a single document + */ + async findOne( + collectionName: string, + filter: any, + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + const result = await collection.findOne(filter, options); + return result as T | null; + } + + /** + * Update a single document + */ + async updateOne( + collectionName: string, + filter: any, + update: any, + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + + // Add updated_at timestamp + if (update.$set) { + update.$set.updated_at = new Date(); + } else if (!update.$setOnInsert && !update.$unset && !update.$inc) { + update = { $set: { ...update, updated_at: new Date() } }; + } + + const result = await collection.updateOne(filter, update, options); + return { + matchedCount: result.matchedCount, + modifiedCount: result.modifiedCount, + upsertedCount: result.upsertedCount, + upsertedId: result.upsertedId, + }; + } + + /** + * Update multiple documents + */ + async updateMany( + collectionName: string, + filter: any, + update: any, + options?: any, + dbName?: string + ): Promise { + const collection = 
this.getCollection(collectionName, dbName); + + // Add updated_at timestamp + if (update.$set) { + update.$set.updated_at = new Date(); + } else if (!update.$setOnInsert && !update.$unset && !update.$inc) { + update = { $set: { ...update, updated_at: new Date() } }; + } + + const result = await collection.updateMany(filter, update, options); + return { + matchedCount: result.matchedCount, + modifiedCount: result.modifiedCount, + upsertedCount: result.upsertedCount, + upsertedId: result.upsertedId, + }; + } + + /** + * Delete a single document + */ + async deleteOne( + collectionName: string, + filter: any, + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + const result = await collection.deleteOne(filter, options); + return { + deletedCount: result.deletedCount, + }; + } + + /** + * Delete multiple documents + */ + async deleteMany( + collectionName: string, + filter: any, + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + const result = await collection.deleteMany(filter, options); + return { + deletedCount: result.deletedCount, + }; + } + + /** + * Count documents matching a filter + */ + async countDocuments( + collectionName: string, + filter: any = {}, + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + return await collection.countDocuments(filter, options); + } + + /** + * Perform aggregation operations + */ + async aggregate( + collectionName: string, + pipeline: any[], + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + const cursor = collection.aggregate(pipeline, options); + return await cursor.toArray() as T[]; + } + + /** + * Create an index + */ + async createIndex( + collectionName: string, + indexSpec: any, + options?: any, + dbName?: string + ): Promise { + const collection = 
this.getCollection(collectionName, dbName); + return await collection.createIndex(indexSpec, options); + } + + /** + * Drop an index + */ + async dropIndex( + collectionName: string, + indexName: string, + options?: any, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + await collection.dropIndex(indexName, options); + } + + /** + * List all indexes on a collection + */ + async listIndexes( + collectionName: string, + dbName?: string + ): Promise { + const collection = this.getCollection(collectionName, dbName); + const cursor = collection.listIndexes(); + return await cursor.toArray(); + } + + /** + * Get a database instance (interface compatibility) + */ + getDb(dbName?: string): Db { + return this.getDatabase(dbName); + } + + /** + * Create a new collection + */ + async createCollection( + collectionName: string, + options?: any, + dbName?: string + ): Promise { + const db = this.getDatabase(dbName); + await db.createCollection(collectionName, options); + } + + /** + * Drop a collection + */ + async dropCollection( + collectionName: string, + dbName?: string + ): Promise { + const db = this.getDatabase(dbName); + await db.dropCollection(collectionName); + } + + /** + * List all collections in a database + */ + async listCollections( + filter: any = {}, + dbName?: string + ): Promise { + const db = this.getDatabase(dbName); + const collections = await db.listCollections(filter).toArray(); + return collections; + } + /** * Check if client is connected */