fixed up ceo

This commit is contained in:
Boki 2025-06-24 22:26:32 -04:00
parent a5688e4723
commit b30c79542b
3 changed files with 83 additions and 25 deletions

View file

@ -78,7 +78,7 @@
"db": 1 "db": 1
}, },
"workers": 5, "workers": 5,
"concurrency": 3, "concurrency": 2,
"enableScheduledJobs": true, "enableScheduledJobs": true,
"defaultJobOptions": { "defaultJobOptions": {
"attempts": 3, "attempts": 3,

View file

@ -6,7 +6,7 @@ export async function getPosts(
payload: any, payload: any,
_context: any _context: any
): Promise<unknown> { ): Promise<unknown> {
const { ceoId, symbol, timestamp, untilTimestamp, finished } = payload; const { ceoId, symbol, timestamp, finished } = payload;
const proxy = this.proxy?.getProxy(); const proxy = this.proxy?.getProxy();
if (!proxy) { if (!proxy) {
this.logger.warn('No proxy available for processing individual CEO symbol'); this.logger.warn('No proxy available for processing individual CEO symbol');
@ -40,14 +40,24 @@ export async function getPosts(
const spielCount = data.spiels.length; const spielCount = data.spiels.length;
if (spielCount === 0) { if (spielCount === 0) {
this.logger.warn(`No spiels found for ceoId ${ceoId}`); this.logger.warn(`No spiels found for ceoId ${ceoId}`);
await this.mongodb.updateMany(
'ceoSymbols', if (!timestamp) {
{ ceoId }, // First call returned no posts - mark as finished with current time
{ $set: { lastSpielTime: timestamp || Date.now(), finished: true } } await this.mongodb.updateMany(
); 'ceoSymbols',
{ ceoId },
{ $set: {
finished: true,
newestPostTimestamp: Date.now()
}}
);
}
return null; // No data to process return null; // No data to process
} }
const latestSpielTime = data.spiels[0]?.timestamp;
// Extract timestamps clearly
const oldestPostInBatch = data.spiels[0]?.timestamp;
const newestPostInBatch = data.spiels[data.spiels.length - 1]?.timestamp;
const posts = data.spiels.map((spiel: any) => ({ const posts = data.spiels.map((spiel: any) => ({
ceoId, ceoId,
spiel: spiel.spiel, spiel: spiel.spiel,
@ -80,28 +90,77 @@ export async function getPosts(
poll: spiel.poll, poll: spiel.poll,
votedInPoll: spiel.voted_in_poll, votedInPoll: spiel.voted_in_poll,
})); }));
await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']); // Handle based on collection state
await this.mongodb.updateMany( if (!finished) {
'ceoSymbols', // INITIAL COLLECTION MODE
{ ceoId }, // Insert all posts - no duplicate checking needed on initial collection
{ $set: { lastSpielTime: latestSpielTime } } await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']);
);
this.logger.info(`Fetched ${spielCount} spiels for ceoId ${ceoId}`); // Update newest timestamp on first call only
if (!timestamp) {
// If untilTimestamp is not provider keep going to the end await this.mongodb.updateMany(
// Otherwise keep going until the lastSpiel is before the untilTimestamp 'ceoSymbols',
if( !finished || (finished && untilTimestamp && latestSpielTime > untilTimestamp)) { { ceoId },
{ $set: { newestPostTimestamp: newestPostInBatch } }
);
}
this.logger.info(`Initial collection: fetched ${spielCount} spiels for ${ceoId}`);
// Continue paginating backwards through history
await this.scheduleOperation( await this.scheduleOperation(
'get-posts', 'get-posts',
{ {
ceoId: ceoId, ceoId: ceoId,
timestamp: latestSpielTime, symbol: symbol,
finished: finished, timestamp: oldestPostInBatch,
untilTimestamp: untilTimestamp, finished: false,
}, },
{ priority: 0 } { priority: 0 }
); );
} else {
// UPDATE COLLECTION MODE (finished=true)
// Get the last known newest post
const symbolData = await this.mongodb.findOne('ceoSymbols', { ceoId });
const lastKnownNewestTimestamp = symbolData?.newestPostTimestamp || symbolData?.lastSpielTime || 0;
// Filter to only posts newer than what we've seen
const newPosts = posts.filter(p => p.timestamp > lastKnownNewestTimestamp);
if (newPosts.length === 0) {
this.logger.info(`No new posts for ${ceoId}, all ${posts.length} posts already collected`);
return { ceoId, spielCount: 0, timestamp };
}
// Insert new posts
await this.mongodb.batchUpsert('ceoPosts', newPosts, ['spielId']);
// Update to the newest timestamp we've now seen
await this.mongodb.updateMany(
'ceoSymbols',
{ ceoId },
{ $set: { newestPostTimestamp: newestPostInBatch } }
);
this.logger.info(`Update collection: collected ${newPosts.length} new posts for ${ceoId}`);
// If all posts in batch were new, there might be more in the gap
// Continue paginating until we hit posts we've already seen
if (newPosts.length === posts.length && oldestPostInBatch > lastKnownNewestTimestamp) {
this.logger.info(`All posts were new, checking for more in the gap`);
await this.scheduleOperation(
'get-posts',
{
ceoId: ceoId,
symbol: symbol,
timestamp: oldestPostInBatch,
finished: true,
},
{ priority: 0 }
);
}
} }
// If timestamp is not provided, run the short positions update // If timestamp is not provided, run the short positions update
@ -115,7 +174,7 @@ export async function getPosts(
} }
this.logger.info( this.logger.info(
`Successfully processed channel ${ceoId} and added channel ${ceoId} at timestamp ${latestSpielTime}` `Successfully processed channel ${ceoId}`
); );
return { ceoId, spielCount, timestamp }; return { ceoId, spielCount, timestamp };

View file

@ -37,7 +37,6 @@ export async function updateUniqueSymbols(
ceoId: symbol.ceoId, ceoId: symbol.ceoId,
symbol: symbol.symbol, symbol: symbol.symbol,
finished: symbol.finished || false, finished: symbol.finished || false,
untilTimestamp: symbol.lastSpielTime || null,
}, },
{ priority: 10 } { priority: 10 }
); );