diff --git a/apps/stock/config/config/default.json b/apps/stock/config/config/default.json index 92f56b8..b664a20 100644 --- a/apps/stock/config/config/default.json +++ b/apps/stock/config/config/default.json @@ -78,7 +78,7 @@ "db": 1 }, "workers": 5, - "concurrency": 3, + "concurrency": 2, "enableScheduledJobs": true, "defaultJobOptions": { "attempts": 3, diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts index 7597055..ec8fcb3 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/get-posts.action.ts @@ -6,7 +6,7 @@ export async function getPosts( payload: any, _context: any ): Promise { - const { ceoId, symbol, timestamp, untilTimestamp, finished } = payload; + const { ceoId, symbol, timestamp, finished } = payload; const proxy = this.proxy?.getProxy(); if (!proxy) { this.logger.warn('No proxy available for processing individual CEO symbol'); @@ -40,14 +40,24 @@ export async function getPosts( const spielCount = data.spiels.length; if (spielCount === 0) { this.logger.warn(`No spiels found for ceoId ${ceoId}`); - await this.mongodb.updateMany( - 'ceoSymbols', - { ceoId }, - { $set: { lastSpielTime: timestamp || Date.now(), finished: true } } - ); + + if (!timestamp) { + // First call returned no posts - mark as finished with current time + await this.mongodb.updateMany( + 'ceoSymbols', + { ceoId }, + { $set: { + finished: true, + newestPostTimestamp: Date.now() + }} + ); + } return null; // No data to process } - const latestSpielTime = data.spiels[0]?.timestamp; + + // Extract timestamps clearly + const oldestPostInBatch = data.spiels[0]?.timestamp; + const newestPostInBatch = data.spiels[data.spiels.length - 1]?.timestamp; const posts = data.spiels.map((spiel: any) => ({ ceoId, spiel: spiel.spiel, @@ -80,28 +90,77 @@ export async function getPosts( poll: spiel.poll, votedInPoll: spiel.voted_in_poll, })); - - await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']); - await this.mongodb.updateMany( - 'ceoSymbols', - { ceoId }, - { $set: { lastSpielTime: latestSpielTime } } - ); - this.logger.info(`Fetched ${spielCount} spiels for ceoId ${ceoId}`); - - // If untilTimestamp is not provider keep going to the end - // Otherwise keep going until the lastSpiel is before the untilTimestamp - if( !finished || (finished && untilTimestamp && latestSpielTime > untilTimestamp)) { + + // Handle based on collection state + if (!finished) { + // INITIAL COLLECTION MODE + // Insert all posts - no duplicate checking needed on initial collection + await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']); + + // Update newest timestamp on first call only + if (!timestamp) { + await this.mongodb.updateMany( + 'ceoSymbols', + { ceoId }, + { $set: { newestPostTimestamp: newestPostInBatch } } + ); + } + + this.logger.info(`Initial collection: fetched ${spielCount} spiels for ${ceoId}`); + + // Continue paginating backwards through history await this.scheduleOperation( 'get-posts', { ceoId: ceoId, - timestamp: latestSpielTime, - finished: finished, - untilTimestamp: untilTimestamp, + symbol: symbol, + timestamp: oldestPostInBatch, + finished: false, }, { priority: 0 } ); + + } else { + // UPDATE COLLECTION MODE (finished=true) + // Get the last known newest post + const symbolData = await this.mongodb.findOne('ceoSymbols', { ceoId }); + const lastKnownNewestTimestamp = symbolData?.newestPostTimestamp || symbolData?.lastSpielTime || 0; + + // Filter to only posts newer than what we've seen + const newPosts = posts.filter(p => p.timestamp > lastKnownNewestTimestamp); + + if (newPosts.length === 0) { + this.logger.info(`No new posts for ${ceoId}, all ${posts.length} posts already collected`); + return { ceoId, spielCount: 0, timestamp }; + } + + // Insert new posts + await this.mongodb.batchUpsert('ceoPosts', newPosts, ['spielId']); + + // Update to the newest timestamp we've now seen + await this.mongodb.updateMany( + 'ceoSymbols', + { ceoId }, + { $set: { newestPostTimestamp: newestPostInBatch } } + ); + + this.logger.info(`Update collection: collected ${newPosts.length} new posts for ${ceoId}`); + + // If all posts in batch were new, there might be more in the gap + // Continue paginating until we hit posts we've already seen + if (newPosts.length === posts.length && oldestPostInBatch > lastKnownNewestTimestamp) { + this.logger.info(`All posts were new, checking for more in the gap`); + await this.scheduleOperation( + 'get-posts', + { + ceoId: ceoId, + symbol: symbol, + timestamp: oldestPostInBatch, + finished: true, + }, + { priority: 0 } + ); + } } // If timestamp is not provided, run the short positions update @@ -115,7 +174,7 @@ export async function getPosts( } this.logger.info( - `Successfully processed channel ${ceoId} and added channel ${ceoId} at timestamp ${latestSpielTime}` + `Successfully processed channel ${ceoId}` ); return { ceoId, spielCount, timestamp }; diff --git a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts index eb93a2a..d13d4ce 100644 --- a/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts +++ b/apps/stock/data-ingestion/src/handlers/ceo/actions/update-unique-symbols.action.ts @@ -37,7 +37,6 @@ export async function updateUniqueSymbols( ceoId: symbol.ceoId, symbol: symbol.symbol, finished: symbol.finished || false, - untilTimestamp: symbol.lastSpielTime || null, }, { priority: 10 } );