added crawler for scheduled jobs

Boki 2026-03-27 09:14:48 -04:00
parent eeed957fe1
commit 104bebb783
3 changed files with 47 additions and 1 deletion

View file

@@ -0,0 +1,38 @@
import type { TeHandler } from '../te.handler';
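// Any URL not crawled within the last STALE_DAYS is considered stale and gets re-queued.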
const STALE_DAYS = 30;
export async function crawlScheduler(this: TeHandler): Promise<{ scheduled: number }> {
const { logger, mongodb } = this;
const cutoff = new Date(Date.now() - STALE_DAYS * 24 * 60 * 60 * 1000);
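// Select URLs that have never been crawled or were last crawled before the cutoff,
// excluding any URL marked with a skipReason; oldest crawls are scheduled first.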
const staleUrls = await mongodb?.find('teUrls', {
$or: [
{ lastCrawled: { $exists: false } },
{ lastCrawled: { $lt: cutoff } }
],
skipReason: { $exists: false }
}, {
sort: { lastCrawled: 1 },
projection: { url: 1 }
});
if (!staleUrls?.length) {
logger.debug('No stale URLs to schedule');
return { scheduled: 0 };
}
logger.info(`Scheduling ${staleUrls.length} stale/uncrawled URLs for spidering`);
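// Queue one spider job per URL, staggering each job by 200 ms so they don't all start at once.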
for (let i = 0; i < staleUrls.length; i++) {
const { url } = staleUrls[i];
await this.scheduleOperation('te-spider', { url }, {
jobId: `spider-${url.replace(/\//g, '-')}`,
priority: 8,
delay: i * 200
});
}
return { scheduled: staleUrls.length };
}

View file

@@ -1,4 +1,5 @@
// Export all action functions here
export * from './fetch-countries.action';
export * from './spider.action';
export * from './crawl-scheduler.action';

View file

@@ -5,7 +5,7 @@ import {
ScheduledOperation
} from '@stock-bot/handlers';
import type { DataIngestionServices } from '../../types';
import { fetchCountries, spiderUrl } from './actions';
import { crawlScheduler, fetchCountries, spiderUrl } from './actions';
@Handler('te')
export class TeHandler extends BaseHandler<DataIngestionServices> {
@@ -115,4 +115,11 @@ export class TeHandler extends BaseHandler<DataIngestionServices> {
immediately: true,
})
spiderUrlSchedule = spiderUrl;
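// Periodic sweep that re-queues stale URLs; the actual crawling happens in the te-spider jobs it schedules.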
@ScheduledOperation('te-crawl-scheduler', '*/5 * * * *', {
priority: 8,
description: 'Schedule spider jobs for stale/uncrawled URLs (every 5 min)',
immediately: true,
})
crawlScheduler = crawlScheduler;
}