diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index ee0a17b..92f56b8 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -77,8 +77,8 @@ "port": 6379, "db": 1 }, - "workers": 1, - "concurrency": 1, + "workers": 5, + "concurrency": 3, "enableScheduledJobs": true, "defaultJobOptions": { "attempts": 3, diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts index b8c82c9..7597055 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts @@ -6,7 +6,7 @@ export async function getPosts( payload: any, _context: any ): Promise { - const { ceoId, symbol, timestamp, untilTimestamp } = payload; + const { ceoId, symbol, timestamp, untilTimestamp, finished } = payload; const proxy = this.proxy?.getProxy(); if (!proxy) { this.logger.warn('No proxy available for processing individual CEO symbol'); @@ -41,9 +41,9 @@ export async function getPosts( if (spielCount === 0) { this.logger.warn(`No spiels found for ceoId ${ceoId}`); await this.mongodb.updateMany( - 'ceoChannels', + 'ceoSymbols', { ceoId }, - { $set: { lastSpielTime: timestamp, finished: true } } + { $set: { lastSpielTime: timestamp || Date.now(), finished: true } } ); return null; // No data to process } @@ -83,7 +83,7 @@ export async function getPosts( await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']); await this.mongodb.updateMany( - 'ceoChannels', + 'ceoSymbols', { ceoId }, { $set: { lastSpielTime: latestSpielTime } } ); @@ -91,12 +91,14 @@ export async function getPosts( // If untilTimestamp is not provider keep going to the end // Otherwise keep going until the lastSpiel is before the untilTimestamp - if( untilTimestamp === undefined || (untilTimestamp && latestSpielTime <= untilTimestamp)) { + if( !finished || (finished && untilTimestamp && latestSpielTime > untilTimestamp)) { await this.scheduleOperation( 'get-posts', { ceoId: ceoId, timestamp: latestSpielTime, + finished: finished, + untilTimestamp: untilTimestamp, }, { priority: 0 } ); diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts index 4a20f93..eb93a2a 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts @@ -36,6 +36,8 @@ export async function updateUniqueSymbols( { ceoId: symbol.ceoId, symbol: symbol.symbol, + finished: symbol.finished || false, + untilTimestamp: symbol.lastSpielTime || null, }, { priority: 10 } );