finished tradingeconomics i think
This commit is contained in:
parent
748cce0eeb
commit
167a586257
10 changed files with 658 additions and 233 deletions
|
|
@ -0,0 +1,140 @@
|
|||
import { TE_CONFIG } from '../shared/config';
|
||||
import { decodeTEData } from '../shared/decode';
|
||||
import type { TeCdnResponse, TeDataFetchPayload } from '../shared/types';
|
||||
import type { TeHandler } from '../te.handler';
|
||||
|
||||
export async function dataFetch(
|
||||
this: TeHandler,
|
||||
payload: TeDataFetchPayload,
|
||||
): Promise<{ success: boolean; recordCount: number }> {
|
||||
const { logger, mongodb, http, proxy } = this;
|
||||
const { teSymbol, url, isInitialFetch } = payload;
|
||||
|
||||
const teDoc = await mongodb?.findOne('teUrls', { url });
|
||||
if (!teDoc?.teChartsDatasource || !teDoc?.teChartToken || !teDoc?.teChart || !teDoc?.teLastUpdate) {
|
||||
logger.error(`Missing CDN fields for ${teSymbol} (${url})`);
|
||||
return { success: false, recordCount: 0 };
|
||||
}
|
||||
|
||||
const pathSegment = TE_CONFIG.CDN_PATH_MAP[teDoc.teChart];
|
||||
if (!pathSegment) {
|
||||
logger.warn(`Unknown teChart type: ${teDoc.teChart} for ${teSymbol}`);
|
||||
return { success: false, recordCount: 0 };
|
||||
}
|
||||
|
||||
// Build CDN URL
|
||||
const cdnUrl = new URL(`${teDoc.teChartsDatasource}/${pathSegment}/${encodeURIComponent(teSymbol.toLowerCase())}`);
|
||||
if (isInitialFetch) {
|
||||
cdnUrl.searchParams.set('span', 'max');
|
||||
}
|
||||
cdnUrl.searchParams.set('v', teDoc.teLastUpdate);
|
||||
cdnUrl.searchParams.set('key', teDoc.teChartToken);
|
||||
|
||||
logger.info(`Fetching TE data: ${teSymbol} (${isInitialFetch ? 'initial' : 'update'})`);
|
||||
|
||||
const response = await http.get(cdnUrl.toString(), {
|
||||
proxy: proxy?.getProxy(),
|
||||
headers: { 'Accept': '*/*', 'User-Agent': TE_CONFIG.USER_AGENT },
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
logger.error(`CDN ${response.status} for ${teSymbol}`);
|
||||
throw new Error(`CDN HTTP ${response.status} for ${teSymbol}`);
|
||||
}
|
||||
|
||||
const rawText = (await response.text()).replaceAll('"', '');
|
||||
if (!rawText.length) {
|
||||
logger.warn(`Empty response for ${teSymbol}`);
|
||||
return { success: false, recordCount: 0 };
|
||||
}
|
||||
|
||||
// Decode: response is an array wrapping the actual object
|
||||
const decoded = decodeTEData<TeCdnResponse[]>(rawText, TE_CONFIG.KEY);
|
||||
const serie = decoded?.[0]?.series?.[0]?.serie;
|
||||
|
||||
if (!serie?.data?.length) {
|
||||
logger.warn(`No data points decoded for ${teSymbol}`);
|
||||
await mongodb?.updateOne('teUrls', { url }, {
|
||||
$set: {
|
||||
lastDataFetch: new Date(),
|
||||
lastDataFetchUpdate: teDoc.teLastUpdate,
|
||||
lastDataFetchCount: 0,
|
||||
},
|
||||
});
|
||||
return { success: true, recordCount: 0 };
|
||||
}
|
||||
|
||||
// Transform data points: [value, epoch, null, "YYYY-MM-DD"] → { d, v }
|
||||
const dataPoints = serie.data.map(([value, _epoch, _null, date]) => ({ d: date, v: value }));
|
||||
const forecastPoints = serie.forecast?.map(([value, _epoch, _null, date]) => ({ d: date, v: value })) ?? [];
|
||||
|
||||
logger.info(`Storing ${dataPoints.length} points + ${forecastPoints.length} forecast for ${teSymbol}`);
|
||||
|
||||
if (isInitialFetch) {
|
||||
// Initial: upsert the full document
|
||||
await mongodb?.updateOne('teData', { teSymbol }, {
|
||||
$set: {
|
||||
teSymbol,
|
||||
name: serie.name,
|
||||
unit: serie.unit,
|
||||
source: serie.source,
|
||||
frequency: serie.frequency,
|
||||
country: serie.country,
|
||||
category: serie.category,
|
||||
data: dataPoints,
|
||||
forecast: forecastPoints,
|
||||
lastUpdated: new Date(),
|
||||
},
|
||||
$setOnInsert: { createdAt: new Date() },
|
||||
}, { upsert: true });
|
||||
} else {
|
||||
// Update: merge new points into existing data array by date
|
||||
const existing = await mongodb?.findOne('teData', { teSymbol });
|
||||
if (existing?.data) {
|
||||
const dateMap = new Map(existing.data.map((p: any) => [p.d, p.v]));
|
||||
for (const p of dataPoints) {
|
||||
dateMap.set(p.d, p.v);
|
||||
}
|
||||
const merged = Array.from(dateMap.entries())
|
||||
.sort(([a], [b]) => a.localeCompare(b))
|
||||
.map(([d, v]) => ({ d, v }));
|
||||
|
||||
await mongodb?.updateOne('teData', { teSymbol }, {
|
||||
$set: {
|
||||
data: merged,
|
||||
forecast: forecastPoints,
|
||||
lastUpdated: new Date(),
|
||||
},
|
||||
});
|
||||
} else {
|
||||
// No existing doc (shouldn't happen but handle gracefully)
|
||||
await mongodb?.updateOne('teData', { teSymbol }, {
|
||||
$set: {
|
||||
teSymbol,
|
||||
name: serie.name,
|
||||
unit: serie.unit,
|
||||
source: serie.source,
|
||||
frequency: serie.frequency,
|
||||
country: serie.country,
|
||||
category: serie.category,
|
||||
data: dataPoints,
|
||||
forecast: forecastPoints,
|
||||
lastUpdated: new Date(),
|
||||
},
|
||||
$setOnInsert: { createdAt: new Date() },
|
||||
}, { upsert: true });
|
||||
}
|
||||
}
|
||||
|
||||
const totalPoints = dataPoints.length + forecastPoints.length;
|
||||
|
||||
await mongodb?.updateOne('teUrls', { url }, {
|
||||
$set: {
|
||||
lastDataFetch: new Date(),
|
||||
lastDataFetchUpdate: teDoc.teLastUpdate,
|
||||
lastDataFetchCount: totalPoints,
|
||||
},
|
||||
});
|
||||
|
||||
return { success: true, recordCount: totalPoints };
|
||||
}
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
import type { TeHandler } from '../te.handler';
|
||||
|
||||
export async function dataScheduler(this: TeHandler): Promise<{ scheduled: number }> {
|
||||
const { logger, mongodb } = this;
|
||||
|
||||
const candidates = await mongodb?.find('teUrls', {
|
||||
teType: 'te',
|
||||
teSymbol: { $exists: true, $ne: null },
|
||||
teChartsDatasource: { $exists: true },
|
||||
teChartToken: { $exists: true },
|
||||
teLastUpdate: { $exists: true },
|
||||
teChart: 'EC',
|
||||
$or: [
|
||||
{ lastDataFetch: { $exists: false } },
|
||||
{ $expr: { $ne: ['$teLastUpdate', '$lastDataFetchUpdate'] } },
|
||||
],
|
||||
}, {
|
||||
sort: { lastDataFetch: 1 },
|
||||
projection: { teSymbol: 1, url: 1, lastDataFetch: 1 },
|
||||
});
|
||||
|
||||
if (!candidates?.length) {
|
||||
logger.debug('No symbols need data fetching');
|
||||
return { scheduled: 0 };
|
||||
}
|
||||
|
||||
logger.info(`Scheduling ${candidates.length} symbols for TE data fetch`);
|
||||
|
||||
for (const doc of candidates) {
|
||||
await this.scheduleOperation('te-data-fetch', {
|
||||
teSymbol: doc.teSymbol,
|
||||
url: doc.url,
|
||||
isInitialFetch: !doc.lastDataFetch,
|
||||
}, {
|
||||
jobId: `data-fetch-${doc.teSymbol}`,
|
||||
priority: 8,
|
||||
});
|
||||
}
|
||||
|
||||
return { scheduled: candidates.length };
|
||||
}
|
||||
|
|
@ -1,227 +1,52 @@
|
|||
import { getRandomUserAgent } from '@stock-bot/utils';
|
||||
import * as cheerio from 'cheerio';
|
||||
import { TE_CONFIG } from '../shared/config';
|
||||
import type { TeCountry } from '../shared/types';
|
||||
import type { TeHandler } from '../te.handler';
|
||||
|
||||
export async function fetchCountries(this: TeHandler): Promise<TeCountry[] | null> {
|
||||
const { logger, mongodb } = this;
|
||||
|
||||
try {
|
||||
// 1. Fetch the HTML page
|
||||
const reqInfo = {
|
||||
proxy: this.proxy.getProxy(),
|
||||
headers: {
|
||||
'User-Agent': getRandomUserAgent(),
|
||||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
||||
'Accept-Language': 'en-US,en;q=0.5',
|
||||
'Accept-Encoding': 'gzip, deflate, br',
|
||||
},
|
||||
}
|
||||
const response = await fetch(TE_CONFIG.COUNTRIES_URL, reqInfo);
|
||||
|
||||
logger.debug('Response status:', {
|
||||
status: response.status,
|
||||
statusText: response.statusText,
|
||||
url: response.url
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`);
|
||||
}
|
||||
|
||||
const html = await response.text();
|
||||
|
||||
logger.info('Fetched HTML length:', { length: html.length });
|
||||
|
||||
|
||||
// 2. Parse HTML to extract country data
|
||||
const $ = cheerio.load(html);
|
||||
const countries: TeCountry[] = [];
|
||||
|
||||
// Look for country links - they typically have a pattern like /country-name
|
||||
// Trading Economics groups countries by region in the page
|
||||
$('.list-group-item, a[href^="/"]').each((_, element) => {
|
||||
const $el = $(element);
|
||||
|
||||
// Try to extract country information
|
||||
let name: string | undefined;
|
||||
let url: string | undefined;
|
||||
let region: string | undefined;
|
||||
|
||||
// Check if it's a direct link
|
||||
if ($el.is('a')) {
|
||||
const href = $el.attr('href');
|
||||
const text = $el.text().trim();
|
||||
console.log(href)
|
||||
// Filter for country URLs (they don't contain special paths like /indicators, /calendar, etc.)
|
||||
if (href && href.startsWith('/') && !href.includes('/') && text) {
|
||||
name = text;
|
||||
url = href;
|
||||
}
|
||||
} else {
|
||||
// Check for links within table rows
|
||||
const $link = $el.find('a[href^="/"]').first();
|
||||
if ($link.length) {
|
||||
const href = $link.attr('href');
|
||||
const text = $link.text().trim();
|
||||
|
||||
if (href && text && !href.includes('/indicators') && !href.includes('/calendar')) {
|
||||
name = text;
|
||||
url = href;
|
||||
|
||||
// Try to get region from parent elements
|
||||
const $regionHeader = $el.closest('.region-section, .country-group').find('h2, h3, .region-title').first();
|
||||
if ($regionHeader.length) {
|
||||
region = $regionHeader.text().trim();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add to countries array if we found valid data
|
||||
if (name && url) {
|
||||
// Extract country code from URL if possible (e.g., /united-states -> US)
|
||||
const code = extractCountryCode(url, name);
|
||||
|
||||
countries.push({
|
||||
name,
|
||||
code,
|
||||
url: `https://tradingeconomics.com${url}`,
|
||||
region,
|
||||
updated_at: new Date(),
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Remove duplicates based on name
|
||||
const uniqueCountries = Array.from(
|
||||
new Map(countries.map(c => [c.name, c])).values()
|
||||
);
|
||||
|
||||
if (uniqueCountries.length === 0) {
|
||||
throw new Error('No countries found in HTML');
|
||||
}
|
||||
|
||||
logger.info('Extracted countries from HTML', {
|
||||
count: uniqueCountries.length,
|
||||
byRegion: groupCountriesByRegion(uniqueCountries),
|
||||
});
|
||||
|
||||
// 3. Save to MongoDB
|
||||
try {
|
||||
console.log( uniqueCountries)
|
||||
if (uniqueCountries.length > 0) {
|
||||
const result = await mongodb?.batchUpsert('teCountries', uniqueCountries, ['code']);
|
||||
logger.info('Countries saved to MongoDB', {
|
||||
matched: result.matchedCount,
|
||||
modified: result.modifiedCount,
|
||||
upserted: result.upsertedCount,
|
||||
});
|
||||
}
|
||||
} catch (dbError) {
|
||||
logger.error('Failed to save countries to MongoDB', { error: dbError });
|
||||
throw dbError;
|
||||
}
|
||||
|
||||
return uniqueCountries;
|
||||
} catch (error) {
|
||||
logger.error('Failed to fetch Trading Economics countries', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
});
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function extractCountryCode(url: string, name: string): string | undefined {
|
||||
// Common country code mappings
|
||||
const countryCodeMap: Record<string, string> = {
|
||||
'united-states': 'US',
|
||||
'united-kingdom': 'GB',
|
||||
'euro-area': 'EU',
|
||||
'china': 'CN',
|
||||
'japan': 'JP',
|
||||
'germany': 'DE',
|
||||
'france': 'FR',
|
||||
'italy': 'IT',
|
||||
'spain': 'ES',
|
||||
'canada': 'CA',
|
||||
'australia': 'AU',
|
||||
'south-korea': 'KR',
|
||||
'india': 'IN',
|
||||
'brazil': 'BR',
|
||||
'russia': 'RU',
|
||||
'mexico': 'MX',
|
||||
'indonesia': 'ID',
|
||||
'netherlands': 'NL',
|
||||
'saudi-arabia': 'SA',
|
||||
'turkey': 'TR',
|
||||
'switzerland': 'CH',
|
||||
'poland': 'PL',
|
||||
'sweden': 'SE',
|
||||
'belgium': 'BE',
|
||||
'argentina': 'AR',
|
||||
'ireland': 'IE',
|
||||
'austria': 'AT',
|
||||
'norway': 'NO',
|
||||
'israel': 'IL',
|
||||
'singapore': 'SG',
|
||||
'denmark': 'DK',
|
||||
'egypt': 'EG',
|
||||
'philippines': 'PH',
|
||||
'finland': 'FI',
|
||||
'chile': 'CL',
|
||||
'pakistan': 'PK',
|
||||
'romania': 'RO',
|
||||
'new-zealand': 'NZ',
|
||||
'greece': 'GR',
|
||||
'iraq': 'IQ',
|
||||
'portugal': 'PT',
|
||||
'czech-republic': 'CZ',
|
||||
'vietnam': 'VN',
|
||||
'peru': 'PE',
|
||||
'colombia': 'CO',
|
||||
'malaysia': 'MY',
|
||||
'ukraine': 'UA',
|
||||
'hungary': 'HU',
|
||||
'kuwait': 'KW',
|
||||
'morocco': 'MA',
|
||||
'slovakia': 'SK',
|
||||
'kenya': 'KE',
|
||||
'puerto-rico': 'PR',
|
||||
'ecuador': 'EC',
|
||||
'ethiopia': 'ET',
|
||||
'dominican-republic': 'DO',
|
||||
'luxembourg': 'LU',
|
||||
'oman': 'OM',
|
||||
'guatemala': 'GT',
|
||||
'bulgaria': 'BG',
|
||||
'ghana': 'GH',
|
||||
'tanzania': 'TZ',
|
||||
'turkmenistan': 'TM',
|
||||
'croatia': 'HR',
|
||||
'costa-rica': 'CR',
|
||||
'lebanon': 'LB',
|
||||
'slovenia': 'SI',
|
||||
'lithuania': 'LT',
|
||||
'serbia': 'RS',
|
||||
'panama': 'PA',
|
||||
};
|
||||
|
||||
// Clean URL to get country slug
|
||||
const slug = url.replace(/^\//, '').toLowerCase();
|
||||
|
||||
return countryCodeMap[slug];
|
||||
}
|
||||
|
||||
function groupCountriesByRegion(countries: TeCountry[]): Record<string, number> {
|
||||
const groups: Record<string, number> = {};
|
||||
|
||||
for (const country of countries) {
|
||||
const region = country.region || 'Unknown';
|
||||
groups[region] = (groups[region] || 0) + 1;
|
||||
}
|
||||
|
||||
return groups;
|
||||
}
|
||||
import { getRandomUserAgent } from '@stock-bot/utils';
|
||||
import * as cheerio from 'cheerio';
|
||||
import { TE_CONFIG } from '../shared/config';
|
||||
import type { TeHandler } from '../te.handler';
|
||||
|
||||
export async function fetchCountries(this: TeHandler): Promise<{ count: number }> {
|
||||
const { logger, mongodb, proxy } = this;
|
||||
|
||||
const response = await this.http.get(TE_CONFIG.COUNTRIES_URL, {
|
||||
proxy: proxy?.getProxy(),
|
||||
headers: {
|
||||
'User-Agent': getRandomUserAgent(),
|
||||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
|
||||
'Accept-Language': 'en-US,en;q=0.5',
|
||||
},
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to fetch countries page: ${response.status}`);
|
||||
}
|
||||
|
||||
const html = await response.text();
|
||||
const $ = cheerio.load(html);
|
||||
|
||||
// Country links end with /indicators e.g. /united-states/indicators
|
||||
const countries = new Map<string, string>();
|
||||
$('a[href$="/indicators"]').each((_, el) => {
|
||||
const href = $(el).attr('href') || '';
|
||||
const name = $(el).text().trim();
|
||||
const slug = href.replace('/indicators', '').replace('/', '');
|
||||
if (slug && name) {
|
||||
countries.set(slug, name);
|
||||
}
|
||||
});
|
||||
|
||||
if (!countries.size) {
|
||||
throw new Error('No countries found on page');
|
||||
}
|
||||
|
||||
logger.info(`Found ${countries.size} countries`);
|
||||
|
||||
const docs = Array.from(countries.entries()).map(([slug, name]) => ({
|
||||
slug,
|
||||
name,
|
||||
url: `/${slug}`,
|
||||
}));
|
||||
|
||||
await mongodb?.batchUpsert('teCountries', docs, ['slug']);
|
||||
|
||||
logger.info(`Upserted ${docs.length} countries into teCountries`);
|
||||
return { count: docs.length };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,57 @@
|
|||
import { getRandomUserAgent } from '@stock-bot/utils';
|
||||
import * as cheerio from 'cheerio';
|
||||
import { TE_CONFIG } from '../shared/config';
|
||||
import type { TeHandler } from '../te.handler';
|
||||
|
||||
const SKIP_GROUPS = new Set(['World Bank', 'News', 'Calendar']);
|
||||
|
||||
export async function fetchIndicators(this: TeHandler): Promise<{ count: number }> {
|
||||
const { logger, mongodb, proxy } = this;
|
||||
|
||||
const response = await this.http.get(`${TE_CONFIG.MAIN_URL}/indicators`, {
|
||||
proxy: proxy?.getProxy(),
|
||||
headers: {
|
||||
'User-Agent': getRandomUserAgent(),
|
||||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
|
||||
'Accept-Language': 'en-US,en;q=0.5',
|
||||
},
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to fetch indicators page: ${response.status}`);
|
||||
}
|
||||
|
||||
const html = await response.text();
|
||||
const $ = cheerio.load(html);
|
||||
|
||||
const indicators: Array<{ group: string; slug: string; name: string; url: string }> = [];
|
||||
|
||||
$('li.list-group-item.active').each((_, groupEl) => {
|
||||
const group = $(groupEl).text().trim();
|
||||
if (SKIP_GROUPS.has(group)) return;
|
||||
|
||||
$(groupEl).nextAll('li.list-group-item').each((_, li) => {
|
||||
if ($(li).hasClass('active')) return false; // next group
|
||||
const $a = $(li).find('a[href*="/country-list/"]');
|
||||
if ($a.length) {
|
||||
const href = $a.attr('href') || '';
|
||||
const name = $a.text().trim();
|
||||
const slug = href.replace('/country-list/', '');
|
||||
if (slug && name) {
|
||||
indicators.push({ group, slug, name, url: href });
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
if (!indicators.length) {
|
||||
throw new Error('No indicators found on page');
|
||||
}
|
||||
|
||||
logger.info(`Found ${indicators.length} indicators across groups`);
|
||||
|
||||
await mongodb?.batchUpsert('teIndicators', indicators, ['slug']);
|
||||
|
||||
logger.info(`Upserted ${indicators.length} indicators into teIndicators`);
|
||||
return { count: indicators.length };
|
||||
}
|
||||
|
|
@ -2,4 +2,9 @@
|
|||
export * from './fetch-countries.action';
|
||||
export * from './spider.action';
|
||||
export * from './crawl-scheduler.action';
|
||||
export * from './data-scheduler.action';
|
||||
export * from './data-fetch.action';
|
||||
export * from './fetch-indicators.action';
|
||||
export * from './market-data-scheduler.action';
|
||||
export * from './market-data-fetch.action';
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,158 @@
|
|||
import { TE_CONFIG } from '../shared/config';
|
||||
import { decodeTEData } from '../shared/decode';
|
||||
import type { TeMarketDataFetchPayload, TeMkCdnSerie } from '../shared/types';
|
||||
import type { TeHandler } from '../te.handler';
|
||||
|
||||
const CATEGORY_MAP: Record<string, string> = {
|
||||
ind: 'index',
|
||||
cur: 'currency',
|
||||
gov: 'bond',
|
||||
com: 'commodity',
|
||||
};
|
||||
|
||||
/** OHLC point: [epoch, close, pctChange, absChange, open, high, low, close] */
|
||||
interface MkDataPoint {
|
||||
t: number;
|
||||
o: number;
|
||||
h: number;
|
||||
l: number;
|
||||
c: number;
|
||||
pc: number | null;
|
||||
ac: number | null;
|
||||
}
|
||||
|
||||
async function fetchMkSerie(
|
||||
handler: TeHandler,
|
||||
cdnBase: string,
|
||||
params: Record<string, string>,
|
||||
): Promise<TeMkCdnSerie | null> {
|
||||
const url = new URL(cdnBase);
|
||||
for (const [k, v] of Object.entries(params)) {
|
||||
url.searchParams.set(k, v);
|
||||
}
|
||||
|
||||
const response = await handler.http.get(url.toString(), {
|
||||
proxy: handler.proxy?.getProxy(),
|
||||
headers: { 'Accept': '*/*', 'User-Agent': TE_CONFIG.USER_AGENT },
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`CDN HTTP ${response.status} for ${url.toString()}`);
|
||||
}
|
||||
|
||||
const rawText = (await response.text()).replaceAll('"', '');
|
||||
if (!rawText.length) return null;
|
||||
|
||||
const decoded = decodeTEData<{ series: TeMkCdnSerie[] }>(rawText, TE_CONFIG.KEY);
|
||||
return decoded?.series?.[0] ?? null;
|
||||
}
|
||||
|
||||
function toPoints(data: any[]): MkDataPoint[] {
|
||||
return data.map(([t, _c, pc, ac, o, h, l, c]: number[]) => ({ t, o, h, l, c, pc, ac }));
|
||||
}
|
||||
|
||||
function mergePoints(existing: MkDataPoint[], incoming: MkDataPoint[]): MkDataPoint[] {
|
||||
const map = new Map(existing.map((p) => [p.t, p]));
|
||||
for (const p of incoming) {
|
||||
map.set(p.t, p);
|
||||
}
|
||||
return Array.from(map.values()).sort((a, b) => a.t - b.t);
|
||||
}
|
||||
|
||||
export async function marketDataFetch(
|
||||
this: TeHandler,
|
||||
payload: TeMarketDataFetchPayload,
|
||||
): Promise<{ success: boolean; recordCount: number }> {
|
||||
const { logger, mongodb } = this;
|
||||
const { teSymbol, symbolType, url, isInitialFetch } = payload;
|
||||
|
||||
const teDoc = await mongodb?.findOne('teUrls', { url });
|
||||
if (!teDoc?.teChartsDatasource || !teDoc?.teChartToken || !teDoc?.teLastUpdate) {
|
||||
logger.error(`Missing CDN fields for ${teSymbol} (${url})`);
|
||||
return { success: false, recordCount: 0 };
|
||||
}
|
||||
|
||||
const cdnBase = `${teDoc.teChartsDatasource}/markets/${encodeURIComponent(teSymbol.toLowerCase())}:${symbolType}`;
|
||||
const baseParams = {
|
||||
ohlc: '1',
|
||||
v: teDoc.teLastUpdate,
|
||||
key: teDoc.teChartToken,
|
||||
};
|
||||
const category = CATEGORY_MAP[symbolType] || symbolType;
|
||||
|
||||
if (isInitialFetch) {
|
||||
logger.info(`Fetching market data: ${teSymbol}:${symbolType} (initial — daily + monthly)`);
|
||||
|
||||
const [dailySerie, monthlySerie] = await Promise.all([
|
||||
fetchMkSerie(this, cdnBase, { ...baseParams, interval: '1d', span: '11y' }),
|
||||
fetchMkSerie(this, cdnBase, { ...baseParams, span: 'max' }),
|
||||
]);
|
||||
|
||||
const daily = dailySerie?.data?.length ? toPoints(dailySerie.data) : [];
|
||||
const monthly = monthlySerie?.data?.length ? toPoints(monthlySerie.data) : [];
|
||||
const serie = dailySerie || monthlySerie;
|
||||
|
||||
if (!daily.length && !monthly.length) {
|
||||
logger.warn(`No data points decoded for ${teSymbol}:${symbolType}`);
|
||||
await mongodb?.updateOne('teUrls', { url }, {
|
||||
$set: { lastDataFetch: new Date(), lastDataFetchUpdate: teDoc.teLastUpdate, lastDataFetchCount: 0 },
|
||||
});
|
||||
return { success: true, recordCount: 0 };
|
||||
}
|
||||
|
||||
logger.info(`Storing ${daily.length} daily + ${monthly.length} monthly OHLC points for ${teSymbol} (${category})`);
|
||||
|
||||
await mongodb?.updateOne('teMarketData', { teSymbol }, {
|
||||
$set: {
|
||||
teSymbol,
|
||||
name: serie?.name,
|
||||
fullName: serie?.full_name,
|
||||
unit: serie?.unit,
|
||||
type: serie?.type,
|
||||
category,
|
||||
symbolType,
|
||||
ticker: serie?.ticker,
|
||||
sector: serie?.sector,
|
||||
industry: serie?.industry,
|
||||
frequency: serie?.frequency,
|
||||
daily,
|
||||
monthly,
|
||||
lastUpdated: new Date(),
|
||||
},
|
||||
$setOnInsert: { createdAt: new Date() },
|
||||
}, { upsert: true });
|
||||
|
||||
const totalPoints = daily.length + monthly.length;
|
||||
|
||||
await mongodb?.updateOne('teUrls', { url }, {
|
||||
$set: { lastDataFetch: new Date(), lastDataFetchUpdate: teDoc.teLastUpdate, lastDataFetchCount: totalPoints },
|
||||
});
|
||||
|
||||
return { success: true, recordCount: totalPoints };
|
||||
}
|
||||
|
||||
// Update: fetch recent daily OHLC, merge into existing
|
||||
logger.info(`Fetching market data: ${teSymbol}:${symbolType} (update)`);
|
||||
|
||||
const serie = await fetchMkSerie(this, cdnBase, { ...baseParams, interval: '1d' });
|
||||
|
||||
if (!serie?.data?.length) {
|
||||
logger.warn(`No update data for ${teSymbol}:${symbolType}`);
|
||||
return { success: true, recordCount: 0 };
|
||||
}
|
||||
|
||||
const incoming = toPoints(serie.data);
|
||||
const existing = await mongodb?.findOne('teMarketData', { teSymbol });
|
||||
|
||||
const merged = existing?.daily ? mergePoints(existing.daily, incoming) : incoming;
|
||||
|
||||
await mongodb?.updateOne('teMarketData', { teSymbol }, {
|
||||
$set: { daily: merged, lastUpdated: new Date() },
|
||||
});
|
||||
|
||||
await mongodb?.updateOne('teUrls', { url }, {
|
||||
$set: { lastDataFetch: new Date(), lastDataFetchUpdate: teDoc.teLastUpdate, lastDataFetchCount: merged.length },
|
||||
});
|
||||
|
||||
return { success: true, recordCount: merged.length };
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
import type { TeHandler } from '../te.handler';
|
||||
|
||||
export async function marketDataScheduler(this: TeHandler): Promise<{ scheduled: number }> {
|
||||
const { logger, mongodb } = this;
|
||||
|
||||
const candidates = await mongodb?.find('teUrls', {
|
||||
teType: 'te',
|
||||
teSymbol: { $exists: true, $ne: null },
|
||||
symbolType: { $exists: true, $ne: null },
|
||||
teChartsDatasource: { $exists: true },
|
||||
teChartToken: { $exists: true },
|
||||
teLastUpdate: { $exists: true },
|
||||
teChart: { $in: ['MK', 'TV'] },
|
||||
$or: [
|
||||
{ lastDataFetch: { $exists: false } },
|
||||
{ $expr: { $ne: ['$teLastUpdate', '$lastDataFetchUpdate'] } },
|
||||
],
|
||||
}, {
|
||||
sort: { lastDataFetch: 1 },
|
||||
projection: { teSymbol: 1, symbolType: 1, url: 1, lastDataFetch: 1 },
|
||||
});
|
||||
|
||||
if (!candidates?.length) {
|
||||
logger.debug('No market symbols need data fetching');
|
||||
return { scheduled: 0 };
|
||||
}
|
||||
|
||||
logger.info(`Scheduling ${candidates.length} market symbols for data fetch`);
|
||||
|
||||
for (const doc of candidates) {
|
||||
await this.scheduleOperation('te-market-data-fetch', {
|
||||
teSymbol: doc.teSymbol,
|
||||
symbolType: doc.symbolType,
|
||||
url: doc.url,
|
||||
isInitialFetch: !doc.lastDataFetch,
|
||||
}, {
|
||||
jobId: `market-data-fetch-${doc.teSymbol}`,
|
||||
priority: 8,
|
||||
});
|
||||
}
|
||||
|
||||
return { scheduled: candidates.length };
|
||||
}
|
||||
|
|
@ -6,4 +6,13 @@ export const TE_CONFIG = {
|
|||
REQUEST_TIMEOUT: 30000,
|
||||
USER_AGENT: 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||||
KEY: 'tradingeconomics-charts-core-api-key',
|
||||
|
||||
CDN_PATH_MAP: {
|
||||
EC: 'economics',
|
||||
MK: 'markets',
|
||||
TV: 'markets',
|
||||
} as Record<string, string>,
|
||||
|
||||
DATA_FETCH_BATCH_SIZE: 50,
|
||||
DATA_FETCH_DELAY_MS: 500,
|
||||
};
|
||||
|
|
@ -19,4 +19,61 @@ export interface TeCountry {
|
|||
url?: string;
|
||||
updated_at: Date;
|
||||
created_at?: Date;
|
||||
}
|
||||
|
||||
export interface TeDataFetchPayload {
|
||||
teSymbol: string;
|
||||
url: string;
|
||||
isInitialFetch: boolean;
|
||||
}
|
||||
|
||||
export interface TeMarketDataFetchPayload {
|
||||
teSymbol: string;
|
||||
symbolType: string;
|
||||
url: string;
|
||||
isInitialFetch: boolean;
|
||||
}
|
||||
|
||||
/** MK/TV CDN series structure (flat, no nested serie wrapper) */
|
||||
export interface TeMkCdnSerie {
|
||||
symbol: string;
|
||||
name: string;
|
||||
shortname: string;
|
||||
full_name: string;
|
||||
unit: string;
|
||||
type: string;
|
||||
ticker: string;
|
||||
description: string;
|
||||
sector: string;
|
||||
industry: string;
|
||||
frequency: string;
|
||||
/** Each point: [epoch, value, pctChange, absChange] */
|
||||
data: Array<[number, number, number | null, number | null]>;
|
||||
}
|
||||
|
||||
/** Decoded CDN response wrapper */
|
||||
export interface TeCdnResponse {
|
||||
series: Array<{
|
||||
serie: TeCdnSerie;
|
||||
}>;
|
||||
span: string;
|
||||
agr: string;
|
||||
frequency: string;
|
||||
}
|
||||
|
||||
/** Serie metadata + data from CDN */
|
||||
export interface TeCdnSerie {
|
||||
s: string;
|
||||
name: string;
|
||||
shortname: string;
|
||||
unit: string;
|
||||
type: string;
|
||||
source: string;
|
||||
frequency: string;
|
||||
country: string;
|
||||
category: string;
|
||||
/** Each point: [value, unix_epoch, null, "YYYY-MM-DD"] */
|
||||
data: Array<[number, number, null, string]>;
|
||||
forecast?: Array<[number, number, null, string]>;
|
||||
projection?: Array<[number, number, null, string]>;
|
||||
}
|
||||
|
|
@ -1,11 +1,11 @@
|
|||
import {
|
||||
BaseHandler,
|
||||
Disabled,
|
||||
Handler,
|
||||
Operation,
|
||||
ScheduledOperation
|
||||
} from '@stock-bot/handlers';
|
||||
import type { DataIngestionServices } from '../../types';
|
||||
import { crawlScheduler, fetchCountries, spiderUrl } from './actions';
|
||||
import { crawlScheduler, dataFetch, dataScheduler, fetchCountries, fetchIndicators, marketDataFetch, marketDataScheduler, spiderUrl } from './actions';
|
||||
|
||||
@Handler('te')
|
||||
export class TeHandler extends BaseHandler<DataIngestionServices> {
|
||||
|
|
@ -73,10 +73,18 @@ export class TeHandler extends BaseHandler<DataIngestionServices> {
|
|||
// Index for finding URLs with chart data
|
||||
{
|
||||
indexSpec: { teChartUrl: 1 },
|
||||
options: {
|
||||
options: {
|
||||
name: 'chart_url_idx',
|
||||
sparse: true,
|
||||
background: true
|
||||
background: true
|
||||
}
|
||||
},
|
||||
// Index for data-scheduler: filter by teType + sort by lastDataFetch
|
||||
{
|
||||
indexSpec: { teType: 1, lastDataFetch: 1 },
|
||||
options: {
|
||||
name: 'data_fetch_schedule_idx',
|
||||
background: true,
|
||||
}
|
||||
}
|
||||
];
|
||||
|
|
@ -91,6 +99,62 @@ export class TeHandler extends BaseHandler<DataIngestionServices> {
|
|||
}
|
||||
}
|
||||
|
||||
// Indexes for teData collection (one doc per symbol with data array)
|
||||
const teDataIndexes = [
|
||||
{
|
||||
indexSpec: { teSymbol: 1 },
|
||||
options: {
|
||||
name: 'symbol_unique_idx',
|
||||
unique: true,
|
||||
background: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
indexSpec: { country: 1, category: 1 },
|
||||
options: {
|
||||
name: 'country_category_idx',
|
||||
background: true,
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
for (const index of teDataIndexes) {
|
||||
try {
|
||||
await this.mongodb.createIndex('teData', index.indexSpec, index.options);
|
||||
this.logger.info(`Created/verified teData index: ${index.options.name}`);
|
||||
} catch (error) {
|
||||
this.logger.debug(`teData index ${index.options.name} may already exist:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
// Indexes for teMarketData collection
|
||||
const teMarketDataIndexes = [
|
||||
{
|
||||
indexSpec: { teSymbol: 1 },
|
||||
options: {
|
||||
name: 'symbol_unique_idx',
|
||||
unique: true,
|
||||
background: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
indexSpec: { category: 1 },
|
||||
options: {
|
||||
name: 'category_idx',
|
||||
background: true,
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
for (const index of teMarketDataIndexes) {
|
||||
try {
|
||||
await this.mongodb.createIndex('teMarketData', index.indexSpec, index.options);
|
||||
this.logger.info(`Created/verified teMarketData index: ${index.options.name}`);
|
||||
} catch (error) {
|
||||
this.logger.debug(`teMarketData index ${index.options.name} may already exist:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
// Check collection stats
|
||||
const count = await this.mongodb.countDocuments('teUrls', {});
|
||||
this.logger.info(`TeUrls collection has ${count} documents`);
|
||||
|
|
@ -104,11 +168,17 @@ export class TeHandler extends BaseHandler<DataIngestionServices> {
|
|||
@ScheduledOperation('te-countries', '0 0 * * 0', {
|
||||
priority: 5,
|
||||
description: 'Fetch and update Trading Economics countries data',
|
||||
immediately: false,
|
||||
immediately: true,
|
||||
})
|
||||
@Disabled()
|
||||
fetchCountries = fetchCountries;
|
||||
|
||||
@ScheduledOperation('te-indicators', '0 0 * * 0', {
|
||||
priority: 5,
|
||||
description: 'Fetch and update Trading Economics indicators list',
|
||||
immediately: true,
|
||||
})
|
||||
fetchIndicators = fetchIndicators;
|
||||
|
||||
@ScheduledOperation('te-spider', '* * * * *', {
|
||||
priority: 5,
|
||||
description: 'Spider Trading Economics URLs for data extraction (every minute)',
|
||||
|
|
@ -122,4 +192,24 @@ export class TeHandler extends BaseHandler<DataIngestionServices> {
|
|||
immediately: true,
|
||||
})
|
||||
crawlScheduler = crawlScheduler;
|
||||
|
||||
@ScheduledOperation('te-data-scheduler', '*/10 * * * *', {
|
||||
priority: 7,
|
||||
description: 'Schedule TE CDN data fetch jobs for symbols with new data (every 10 min)',
|
||||
immediately: true,
|
||||
})
|
||||
dataScheduler = dataScheduler;
|
||||
|
||||
@Operation('te-data-fetch')
|
||||
dataFetch = dataFetch;
|
||||
|
||||
@ScheduledOperation('te-market-data-scheduler', '*/10 * * * *', {
|
||||
priority: 7,
|
||||
description: 'Schedule market data fetch jobs for MK/TV symbols with new data (every 10 min)',
|
||||
immediately: true,
|
||||
})
|
||||
marketDataScheduler = marketDataScheduler;
|
||||
|
||||
@Operation('te-market-data-fetch')
|
||||
marketDataFetch = marketDataFetch;
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue