From 167a5862577dc419b7ae8f325a4392fbbe7111a0 Mon Sep 17 00:00:00 2001 From: Boki Date: Thu, 2 Apr 2026 00:17:39 -0400 Subject: [PATCH] finished tradingeconomics i think --- .../handlers/te/actions/data-fetch.action.ts | 140 +++++++++ .../te/actions/data-scheduler.action.ts | 41 +++ .../te/actions/fetch-countries.action.ts | 279 ++++-------------- .../te/actions/fetch-indicators.action.ts | 57 ++++ .../src/handlers/te/actions/index.ts | 5 + .../te/actions/market-data-fetch.action.ts | 158 ++++++++++ .../actions/market-data-scheduler.action.ts | 43 +++ .../src/handlers/te/shared/config.ts | 9 + .../src/handlers/te/shared/types.ts | 57 ++++ .../src/handlers/te/te.handler.ts | 102 ++++++- 10 files changed, 658 insertions(+), 233 deletions(-) create mode 100644 apps/stock/data-ingestion/src/handlers/te/actions/data-fetch.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/te/actions/data-scheduler.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/te/actions/fetch-indicators.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/te/actions/market-data-fetch.action.ts create mode 100644 apps/stock/data-ingestion/src/handlers/te/actions/market-data-scheduler.action.ts diff --git a/apps/stock/data-ingestion/src/handlers/te/actions/data-fetch.action.ts b/apps/stock/data-ingestion/src/handlers/te/actions/data-fetch.action.ts new file mode 100644 index 0000000..85e3e38 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/te/actions/data-fetch.action.ts @@ -0,0 +1,140 @@ +import { TE_CONFIG } from '../shared/config'; +import { decodeTEData } from '../shared/decode'; +import type { TeCdnResponse, TeDataFetchPayload } from '../shared/types'; +import type { TeHandler } from '../te.handler'; + +export async function dataFetch( + this: TeHandler, + payload: TeDataFetchPayload, +): Promise<{ success: boolean; recordCount: number }> { + const { logger, mongodb, http, proxy } = this; + const { teSymbol, url, isInitialFetch } = payload; + + const teDoc = await mongodb?.findOne('teUrls', { url }); + if (!teDoc?.teChartsDatasource || !teDoc?.teChartToken || !teDoc?.teChart || !teDoc?.teLastUpdate) { + logger.error(`Missing CDN fields for ${teSymbol} (${url})`); + return { success: false, recordCount: 0 }; + } + + const pathSegment = TE_CONFIG.CDN_PATH_MAP[teDoc.teChart]; + if (!pathSegment) { + logger.warn(`Unknown teChart type: ${teDoc.teChart} for ${teSymbol}`); + return { success: false, recordCount: 0 }; + } + + // Build CDN URL + const cdnUrl = new URL(`${teDoc.teChartsDatasource}/${pathSegment}/${encodeURIComponent(teSymbol.toLowerCase())}`); + if (isInitialFetch) { + cdnUrl.searchParams.set('span', 'max'); + } + cdnUrl.searchParams.set('v', teDoc.teLastUpdate); + cdnUrl.searchParams.set('key', teDoc.teChartToken); + + logger.info(`Fetching TE data: ${teSymbol} (${isInitialFetch ? 'initial' : 'update'})`); + + const response = await http.get(cdnUrl.toString(), { + proxy: proxy?.getProxy(), + headers: { 'Accept': '*/*', 'User-Agent': TE_CONFIG.USER_AGENT }, + }); + + if (!response.ok) { + logger.error(`CDN ${response.status} for ${teSymbol}`); + throw new Error(`CDN HTTP ${response.status} for ${teSymbol}`); + } + + const rawText = (await response.text()).replaceAll('"', ''); + if (!rawText.length) { + logger.warn(`Empty response for ${teSymbol}`); + return { success: false, recordCount: 0 }; + } + + // Decode: response is an array wrapping the actual object + const decoded = decodeTEData(rawText, TE_CONFIG.KEY); + const serie = decoded?.[0]?.series?.[0]?.serie; + + if (!serie?.data?.length) { + logger.warn(`No data points decoded for ${teSymbol}`); + await mongodb?.updateOne('teUrls', { url }, { + $set: { + lastDataFetch: new Date(), + lastDataFetchUpdate: teDoc.teLastUpdate, + lastDataFetchCount: 0, + }, + }); + return { success: true, recordCount: 0 }; + } + + // Transform data points: [value, epoch, null, "YYYY-MM-DD"] → { d, v } + const dataPoints = serie.data.map(([value, _epoch, _null, date]) => ({ d: date, v: value })); + const forecastPoints = serie.forecast?.map(([value, _epoch, _null, date]) => ({ d: date, v: value })) ?? []; + + logger.info(`Storing ${dataPoints.length} points + ${forecastPoints.length} forecast for ${teSymbol}`); + + if (isInitialFetch) { + // Initial: upsert the full document + await mongodb?.updateOne('teData', { teSymbol }, { + $set: { + teSymbol, + name: serie.name, + unit: serie.unit, + source: serie.source, + frequency: serie.frequency, + country: serie.country, + category: serie.category, + data: dataPoints, + forecast: forecastPoints, + lastUpdated: new Date(), + }, + $setOnInsert: { createdAt: new Date() }, + }, { upsert: true }); + } else { + // Update: merge new points into existing data array by date + const existing = await mongodb?.findOne('teData', { teSymbol }); + if (existing?.data) { + const dateMap = new Map(existing.data.map((p: any) => [p.d, p.v])); + for (const p of dataPoints) { + dateMap.set(p.d, p.v); + } + const merged = Array.from(dateMap.entries()) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([d, v]) => ({ d, v })); + + await mongodb?.updateOne('teData', { teSymbol }, { + $set: { + data: merged, + forecast: forecastPoints, + lastUpdated: new Date(), + }, + }); + } else { + // No existing doc (shouldn't happen but handle gracefully) + await mongodb?.updateOne('teData', { teSymbol }, { + $set: { + teSymbol, + name: serie.name, + unit: serie.unit, + source: serie.source, + frequency: serie.frequency, + country: serie.country, + category: serie.category, + data: dataPoints, + forecast: forecastPoints, + lastUpdated: new Date(), + }, + $setOnInsert: { createdAt: new Date() }, + }, { upsert: true }); + } + } + + const totalPoints = dataPoints.length + forecastPoints.length; + + await mongodb?.updateOne('teUrls', { url }, { + $set: { + lastDataFetch: new Date(), + lastDataFetchUpdate: teDoc.teLastUpdate, + lastDataFetchCount: totalPoints, + }, + }); + + return { success: true, recordCount: totalPoints }; +} diff --git a/apps/stock/data-ingestion/src/handlers/te/actions/data-scheduler.action.ts b/apps/stock/data-ingestion/src/handlers/te/actions/data-scheduler.action.ts new file mode 100644 index 0000000..24754c4 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/te/actions/data-scheduler.action.ts @@ -0,0 +1,41 @@ +import type { TeHandler } from '../te.handler'; + +export async function dataScheduler(this: TeHandler): Promise<{ scheduled: number }> { + const { logger, mongodb } = this; + + const candidates = await mongodb?.find('teUrls', { + teType: 'te', + teSymbol: { $exists: true, $ne: null }, + teChartsDatasource: { $exists: true }, + teChartToken: { $exists: true }, + teLastUpdate: { $exists: true }, + teChart: 'EC', + $or: [ + { lastDataFetch: { $exists: false } }, + { $expr: { $ne: ['$teLastUpdate', '$lastDataFetchUpdate'] } }, + ], + }, { + sort: { lastDataFetch: 1 }, + projection: { teSymbol: 1, url: 1, lastDataFetch: 1 }, + }); + + if (!candidates?.length) { + logger.debug('No symbols need data fetching'); + return { scheduled: 0 }; + } + + logger.info(`Scheduling ${candidates.length} symbols for TE data fetch`); + + for (const doc of candidates) { + await this.scheduleOperation('te-data-fetch', { + teSymbol: doc.teSymbol, + url: doc.url, + isInitialFetch: !doc.lastDataFetch, + }, { + jobId: `data-fetch-${doc.teSymbol}`, + priority: 8, + }); + } + + return { scheduled: candidates.length }; +} diff --git a/apps/stock/data-ingestion/src/handlers/te/actions/fetch-countries.action.ts b/apps/stock/data-ingestion/src/handlers/te/actions/fetch-countries.action.ts index dd08b7a..95b6a2e 100644 --- a/apps/stock/data-ingestion/src/handlers/te/actions/fetch-countries.action.ts +++ b/apps/stock/data-ingestion/src/handlers/te/actions/fetch-countries.action.ts @@ -1,227 +1,52 @@ -import { getRandomUserAgent } from '@stock-bot/utils'; -import * as cheerio from 'cheerio'; -import { TE_CONFIG } from '../shared/config'; -import type { TeCountry } from '../shared/types'; -import type { TeHandler } from '../te.handler'; - -export async function fetchCountries(this: TeHandler): Promise { - const { logger, mongodb } = this; - - try { - // 1. Fetch the HTML page - const reqInfo = { - proxy: this.proxy.getProxy(), - headers: { - 'User-Agent': getRandomUserAgent(), - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', - 'Accept-Language': 'en-US,en;q=0.5', - 'Accept-Encoding': 'gzip, deflate, br', - }, - } - const response = await fetch(TE_CONFIG.COUNTRIES_URL, reqInfo); - - logger.debug('Response status:', { - status: response.status, - statusText: response.statusText, - url: response.url - }); - - if (!response.ok) { - throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`); - } - - const html = await response.text(); - - logger.info('Fetched HTML length:', { length: html.length }); - - - // 2. Parse HTML to extract country data - const $ = cheerio.load(html); - const countries: TeCountry[] = []; - - // Look for country links - they typically have a pattern like /country-name - // Trading Economics groups countries by region in the page - $('.list-group-item, a[href^="/"]').each((_, element) => { - const $el = $(element); - - // Try to extract country information - let name: string | undefined; - let url: string | undefined; - let region: string | undefined; - - // Check if it's a direct link - if ($el.is('a')) { - const href = $el.attr('href'); - const text = $el.text().trim(); - console.log(href) - // Filter for country URLs (they don't contain special paths like /indicators, /calendar, etc.) - if (href && href.startsWith('/') && !href.includes('/') && text) { - name = text; - url = href; - } - } else { - // Check for links within table rows - const $link = $el.find('a[href^="/"]').first(); - if ($link.length) { - const href = $link.attr('href'); - const text = $link.text().trim(); - - if (href && text && !href.includes('/indicators') && !href.includes('/calendar')) { - name = text; - url = href; - - // Try to get region from parent elements - const $regionHeader = $el.closest('.region-section, .country-group').find('h2, h3, .region-title').first(); - if ($regionHeader.length) { - region = $regionHeader.text().trim(); - } - } - } - } - - // Add to countries array if we found valid data - if (name && url) { - // Extract country code from URL if possible (e.g., /united-states -> US) - const code = extractCountryCode(url, name); - - countries.push({ - name, - code, - url: `https://tradingeconomics.com${url}`, - region, - updated_at: new Date(), - }); - } - }); - - // Remove duplicates based on name - const uniqueCountries = Array.from( - new Map(countries.map(c => [c.name, c])).values() - ); - - if (uniqueCountries.length === 0) { - throw new Error('No countries found in HTML'); - } - - logger.info('Extracted countries from HTML', { - count: uniqueCountries.length, - byRegion: groupCountriesByRegion(uniqueCountries), - }); - - // 3. Save to MongoDB - try { - console.log( uniqueCountries) - if (uniqueCountries.length > 0) { - const result = await mongodb?.batchUpsert('teCountries', uniqueCountries, ['code']); - logger.info('Countries saved to MongoDB', { - matched: result.matchedCount, - modified: result.modifiedCount, - upserted: result.upsertedCount, - }); - } - } catch (dbError) { - logger.error('Failed to save countries to MongoDB', { error: dbError }); - throw dbError; - } - - return uniqueCountries; - } catch (error) { - logger.error('Failed to fetch Trading Economics countries', { - error: error instanceof Error ? error.message : String(error), - stack: error instanceof Error ? error.stack : undefined, - }); - return null; - } -} - -function extractCountryCode(url: string, name: string): string | undefined { - // Common country code mappings - const countryCodeMap: Record = { - 'united-states': 'US', - 'united-kingdom': 'GB', - 'euro-area': 'EU', - 'china': 'CN', - 'japan': 'JP', - 'germany': 'DE', - 'france': 'FR', - 'italy': 'IT', - 'spain': 'ES', - 'canada': 'CA', - 'australia': 'AU', - 'south-korea': 'KR', - 'india': 'IN', - 'brazil': 'BR', - 'russia': 'RU', - 'mexico': 'MX', - 'indonesia': 'ID', - 'netherlands': 'NL', - 'saudi-arabia': 'SA', - 'turkey': 'TR', - 'switzerland': 'CH', - 'poland': 'PL', - 'sweden': 'SE', - 'belgium': 'BE', - 'argentina': 'AR', - 'ireland': 'IE', - 'austria': 'AT', - 'norway': 'NO', - 'israel': 'IL', - 'singapore': 'SG', - 'denmark': 'DK', - 'egypt': 'EG', - 'philippines': 'PH', - 'finland': 'FI', - 'chile': 'CL', - 'pakistan': 'PK', - 'romania': 'RO', - 'new-zealand': 'NZ', - 'greece': 'GR', - 'iraq': 'IQ', - 'portugal': 'PT', - 'czech-republic': 'CZ', - 'vietnam': 'VN', - 'peru': 'PE', - 'colombia': 'CO', - 'malaysia': 'MY', - 'ukraine': 'UA', - 'hungary': 'HU', - 'kuwait': 'KW', - 'morocco': 'MA', - 'slovakia': 'SK', - 'kenya': 'KE', - 'puerto-rico': 'PR', - 'ecuador': 'EC', - 'ethiopia': 'ET', - 'dominican-republic': 'DO', - 'luxembourg': 'LU', - 'oman': 'OM', - 'guatemala': 'GT', - 'bulgaria': 'BG', - 'ghana': 'GH', - 'tanzania': 'TZ', - 'turkmenistan': 'TM', - 'croatia': 'HR', - 'costa-rica': 'CR', - 'lebanon': 'LB', - 'slovenia': 'SI', - 'lithuania': 'LT', - 'serbia': 'RS', - 'panama': 'PA', - }; - - // Clean URL to get country slug - const slug = url.replace(/^\//, '').toLowerCase(); - - return countryCodeMap[slug]; -} - -function groupCountriesByRegion(countries: TeCountry[]): Record { - const groups: Record = {}; - - for (const country of countries) { - const region = country.region || 'Unknown'; - groups[region] = (groups[region] || 0) + 1; - } - - return groups; -} \ No newline at end of file +import { getRandomUserAgent } from '@stock-bot/utils'; +import * as cheerio from 'cheerio'; +import { TE_CONFIG } from '../shared/config'; +import type { TeHandler } from '../te.handler'; + +export async function fetchCountries(this: TeHandler): Promise<{ count: number }> { + const { logger, mongodb, proxy } = this; + + const response = await this.http.get(TE_CONFIG.COUNTRIES_URL, { + proxy: proxy?.getProxy(), + headers: { + 'User-Agent': getRandomUserAgent(), + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'en-US,en;q=0.5', + }, + }); + + if (!response.ok) { + throw new Error(`Failed to fetch countries page: ${response.status}`); + } + + const html = await response.text(); + const $ = cheerio.load(html); + + // Country links end with /indicators e.g. /united-states/indicators + const countries = new Map(); + $('a[href$="/indicators"]').each((_, el) => { + const href = $(el).attr('href') || ''; + const name = $(el).text().trim(); + const slug = href.replace('/indicators', '').replace('/', ''); + if (slug && name) { + countries.set(slug, name); + } + }); + + if (!countries.size) { + throw new Error('No countries found on page'); + } + + logger.info(`Found ${countries.size} countries`); + + const docs = Array.from(countries.entries()).map(([slug, name]) => ({ + slug, + name, + url: `/${slug}`, + })); + + await mongodb?.batchUpsert('teCountries', docs, ['slug']); + + logger.info(`Upserted ${docs.length} countries into teCountries`); + return { count: docs.length }; +} diff --git a/apps/stock/data-ingestion/src/handlers/te/actions/fetch-indicators.action.ts b/apps/stock/data-ingestion/src/handlers/te/actions/fetch-indicators.action.ts new file mode 100644 index 0000000..d775384 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/te/actions/fetch-indicators.action.ts @@ -0,0 +1,57 @@ +import { getRandomUserAgent } from '@stock-bot/utils'; +import * as cheerio from 'cheerio'; +import { TE_CONFIG } from '../shared/config'; +import type { TeHandler } from '../te.handler'; + +const SKIP_GROUPS = new Set(['World Bank', 'News', 'Calendar']); + +export async function fetchIndicators(this: TeHandler): Promise<{ count: number }> { + const { logger, mongodb, proxy } = this; + + const response = await this.http.get(`${TE_CONFIG.MAIN_URL}/indicators`, { + proxy: proxy?.getProxy(), + headers: { + 'User-Agent': getRandomUserAgent(), + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'en-US,en;q=0.5', + }, + }); + + if (!response.ok) { + throw new Error(`Failed to fetch indicators page: ${response.status}`); + } + + const html = await response.text(); + const $ = cheerio.load(html); + + const indicators: Array<{ group: string; slug: string; name: string; url: string }> = []; + + $('li.list-group-item.active').each((_, groupEl) => { + const group = $(groupEl).text().trim(); + if (SKIP_GROUPS.has(group)) return; + + $(groupEl).nextAll('li.list-group-item').each((_, li) => { + if ($(li).hasClass('active')) return false; // next group + const $a = $(li).find('a[href*="/country-list/"]'); + if ($a.length) { + const href = $a.attr('href') || ''; + const name = $a.text().trim(); + const slug = href.replace('/country-list/', ''); + if (slug && name) { + indicators.push({ group, slug, name, url: href }); + } + } + }); + }); + + if (!indicators.length) { + throw new Error('No indicators found on page'); + } + + logger.info(`Found ${indicators.length} indicators across groups`); + + await mongodb?.batchUpsert('teIndicators', indicators, ['slug']); + + logger.info(`Upserted ${indicators.length} indicators into teIndicators`); + return { count: indicators.length }; +} diff --git a/apps/stock/data-ingestion/src/handlers/te/actions/index.ts b/apps/stock/data-ingestion/src/handlers/te/actions/index.ts index 833e166..73a4051 100644 --- a/apps/stock/data-ingestion/src/handlers/te/actions/index.ts +++ b/apps/stock/data-ingestion/src/handlers/te/actions/index.ts @@ -2,4 +2,9 @@ export * from './fetch-countries.action'; export * from './spider.action'; export * from './crawl-scheduler.action'; +export * from './data-scheduler.action'; +export * from './data-fetch.action'; +export * from './fetch-indicators.action'; +export * from './market-data-scheduler.action'; +export * from './market-data-fetch.action'; diff --git a/apps/stock/data-ingestion/src/handlers/te/actions/market-data-fetch.action.ts b/apps/stock/data-ingestion/src/handlers/te/actions/market-data-fetch.action.ts new file mode 100644 index 0000000..8da90a6 --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/te/actions/market-data-fetch.action.ts @@ -0,0 +1,158 @@ +import { TE_CONFIG } from '../shared/config'; +import { decodeTEData } from '../shared/decode'; +import type { TeMarketDataFetchPayload, TeMkCdnSerie } from '../shared/types'; +import type { TeHandler } from '../te.handler'; + +const CATEGORY_MAP: Record = { + ind: 'index', + cur: 'currency', + gov: 'bond', + com: 'commodity', +}; + +/** OHLC point: [epoch, close, pctChange, absChange, open, high, low, close] */ +interface MkDataPoint { + t: number; + o: number; + h: number; + l: number; + c: number; + pc: number | null; + ac: number | null; +} + +async function fetchMkSerie( + handler: TeHandler, + cdnBase: string, + params: Record, +): Promise { + const url = new URL(cdnBase); + for (const [k, v] of Object.entries(params)) { + url.searchParams.set(k, v); + } + + const response = await handler.http.get(url.toString(), { + proxy: handler.proxy?.getProxy(), + headers: { 'Accept': '*/*', 'User-Agent': TE_CONFIG.USER_AGENT }, + }); + + if (!response.ok) { + throw new Error(`CDN HTTP ${response.status} for ${url.toString()}`); + } + + const rawText = (await response.text()).replaceAll('"', ''); + if (!rawText.length) return null; + + const decoded = decodeTEData<{ series: TeMkCdnSerie[] }>(rawText, TE_CONFIG.KEY); + return decoded?.series?.[0] ?? null; +} + +function toPoints(data: any[]): MkDataPoint[] { + return data.map(([t, _c, pc, ac, o, h, l, c]: number[]) => ({ t, o, h, l, c, pc, ac })); +} + +function mergePoints(existing: MkDataPoint[], incoming: MkDataPoint[]): MkDataPoint[] { + const map = new Map(existing.map((p) => [p.t, p])); + for (const p of incoming) { + map.set(p.t, p); + } + return Array.from(map.values()).sort((a, b) => a.t - b.t); +} + +export async function marketDataFetch( + this: TeHandler, + payload: TeMarketDataFetchPayload, +): Promise<{ success: boolean; recordCount: number }> { + const { logger, mongodb } = this; + const { teSymbol, symbolType, url, isInitialFetch } = payload; + + const teDoc = await mongodb?.findOne('teUrls', { url }); + if (!teDoc?.teChartsDatasource || !teDoc?.teChartToken || !teDoc?.teLastUpdate) { + logger.error(`Missing CDN fields for ${teSymbol} (${url})`); + return { success: false, recordCount: 0 }; + } + + const cdnBase = `${teDoc.teChartsDatasource}/markets/${encodeURIComponent(teSymbol.toLowerCase())}:${symbolType}`; + const baseParams = { + ohlc: '1', + v: teDoc.teLastUpdate, + key: teDoc.teChartToken, + }; + const category = CATEGORY_MAP[symbolType] || symbolType; + + if (isInitialFetch) { + logger.info(`Fetching market data: ${teSymbol}:${symbolType} (initial — daily + monthly)`); + + const [dailySerie, monthlySerie] = await Promise.all([ + fetchMkSerie(this, cdnBase, { ...baseParams, interval: '1d', span: '11y' }), + fetchMkSerie(this, cdnBase, { ...baseParams, span: 'max' }), + ]); + + const daily = dailySerie?.data?.length ? toPoints(dailySerie.data) : []; + const monthly = monthlySerie?.data?.length ? toPoints(monthlySerie.data) : []; + const serie = dailySerie || monthlySerie; + + if (!daily.length && !monthly.length) { + logger.warn(`No data points decoded for ${teSymbol}:${symbolType}`); + await mongodb?.updateOne('teUrls', { url }, { + $set: { lastDataFetch: new Date(), lastDataFetchUpdate: teDoc.teLastUpdate, lastDataFetchCount: 0 }, + }); + return { success: true, recordCount: 0 }; + } + + logger.info(`Storing ${daily.length} daily + ${monthly.length} monthly OHLC points for ${teSymbol} (${category})`); + + await mongodb?.updateOne('teMarketData', { teSymbol }, { + $set: { + teSymbol, + name: serie?.name, + fullName: serie?.full_name, + unit: serie?.unit, + type: serie?.type, + category, + symbolType, + ticker: serie?.ticker, + sector: serie?.sector, + industry: serie?.industry, + frequency: serie?.frequency, + daily, + monthly, + lastUpdated: new Date(), + }, + $setOnInsert: { createdAt: new Date() }, + }, { upsert: true }); + + const totalPoints = daily.length + monthly.length; + + await mongodb?.updateOne('teUrls', { url }, { + $set: { lastDataFetch: new Date(), lastDataFetchUpdate: teDoc.teLastUpdate, lastDataFetchCount: totalPoints }, + }); + + return { success: true, recordCount: totalPoints }; + } + + // Update: fetch recent daily OHLC, merge into existing + logger.info(`Fetching market data: ${teSymbol}:${symbolType} (update)`); + + const serie = await fetchMkSerie(this, cdnBase, { ...baseParams, interval: '1d' }); + + if (!serie?.data?.length) { + logger.warn(`No update data for ${teSymbol}:${symbolType}`); + return { success: true, recordCount: 0 }; + } + + const incoming = toPoints(serie.data); + const existing = await mongodb?.findOne('teMarketData', { teSymbol }); + + const merged = existing?.daily ? mergePoints(existing.daily, incoming) : incoming; + + await mongodb?.updateOne('teMarketData', { teSymbol }, { + $set: { daily: merged, lastUpdated: new Date() }, + }); + + await mongodb?.updateOne('teUrls', { url }, { + $set: { lastDataFetch: new Date(), lastDataFetchUpdate: teDoc.teLastUpdate, lastDataFetchCount: merged.length }, + }); + + return { success: true, recordCount: merged.length }; +} diff --git a/apps/stock/data-ingestion/src/handlers/te/actions/market-data-scheduler.action.ts b/apps/stock/data-ingestion/src/handlers/te/actions/market-data-scheduler.action.ts new file mode 100644 index 0000000..f27695f --- /dev/null +++ b/apps/stock/data-ingestion/src/handlers/te/actions/market-data-scheduler.action.ts @@ -0,0 +1,43 @@ +import type { TeHandler } from '../te.handler'; + +export async function marketDataScheduler(this: TeHandler): Promise<{ scheduled: number }> { + const { logger, mongodb } = this; + + const candidates = await mongodb?.find('teUrls', { + teType: 'te', + teSymbol: { $exists: true, $ne: null }, + symbolType: { $exists: true, $ne: null }, + teChartsDatasource: { $exists: true }, + teChartToken: { $exists: true }, + teLastUpdate: { $exists: true }, + teChart: { $in: ['MK', 'TV'] }, + $or: [ + { lastDataFetch: { $exists: false } }, + { $expr: { $ne: ['$teLastUpdate', '$lastDataFetchUpdate'] } }, + ], + }, { + sort: { lastDataFetch: 1 }, + projection: { teSymbol: 1, symbolType: 1, url: 1, lastDataFetch: 1 }, + }); + + if (!candidates?.length) { + logger.debug('No market symbols need data fetching'); + return { scheduled: 0 }; + } + + logger.info(`Scheduling ${candidates.length} market symbols for data fetch`); + + for (const doc of candidates) { + await this.scheduleOperation('te-market-data-fetch', { + teSymbol: doc.teSymbol, + symbolType: doc.symbolType, + url: doc.url, + isInitialFetch: !doc.lastDataFetch, + }, { + jobId: `market-data-fetch-${doc.teSymbol}`, + priority: 8, + }); + } + + return { scheduled: candidates.length }; +} diff --git a/apps/stock/data-ingestion/src/handlers/te/shared/config.ts b/apps/stock/data-ingestion/src/handlers/te/shared/config.ts index fe7c630..9efd7d1 100644 --- a/apps/stock/data-ingestion/src/handlers/te/shared/config.ts +++ b/apps/stock/data-ingestion/src/handlers/te/shared/config.ts @@ -6,4 +6,13 @@ export const TE_CONFIG = { REQUEST_TIMEOUT: 30000, USER_AGENT: 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', KEY: 'tradingeconomics-charts-core-api-key', + + CDN_PATH_MAP: { + EC: 'economics', + MK: 'markets', + TV: 'markets', + } as Record, + + DATA_FETCH_BATCH_SIZE: 50, + DATA_FETCH_DELAY_MS: 500, }; \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/te/shared/types.ts b/apps/stock/data-ingestion/src/handlers/te/shared/types.ts index 52f9b7f..ff90f37 100644 --- a/apps/stock/data-ingestion/src/handlers/te/shared/types.ts +++ b/apps/stock/data-ingestion/src/handlers/te/shared/types.ts @@ -19,4 +19,61 @@ export interface TeCountry { url?: string; updated_at: Date; created_at?: Date; +} + +export interface TeDataFetchPayload { + teSymbol: string; + url: string; + isInitialFetch: boolean; +} + +export interface TeMarketDataFetchPayload { + teSymbol: string; + symbolType: string; + url: string; + isInitialFetch: boolean; +} + +/** MK/TV CDN series structure (flat, no nested serie wrapper) */ +export interface TeMkCdnSerie { + symbol: string; + name: string; + shortname: string; + full_name: string; + unit: string; + type: string; + ticker: string; + description: string; + sector: string; + industry: string; + frequency: string; + /** Each point: [epoch, value, pctChange, absChange] */ + data: Array<[number, number, number | null, number | null]>; +} + +/** Decoded CDN response wrapper */ +export interface TeCdnResponse { + series: Array<{ + serie: TeCdnSerie; + }>; + span: string; + agr: string; + frequency: string; +} + +/** Serie metadata + data from CDN */ +export interface TeCdnSerie { + s: string; + name: string; + shortname: string; + unit: string; + type: string; + source: string; + frequency: string; + country: string; + category: string; + /** Each point: [value, unix_epoch, null, "YYYY-MM-DD"] */ + data: Array<[number, number, null, string]>; + forecast?: Array<[number, number, null, string]>; + projection?: Array<[number, number, null, string]>; } \ No newline at end of file diff --git a/apps/stock/data-ingestion/src/handlers/te/te.handler.ts b/apps/stock/data-ingestion/src/handlers/te/te.handler.ts index 0a3a9a1..fdc4c8b 100644 --- a/apps/stock/data-ingestion/src/handlers/te/te.handler.ts +++ b/apps/stock/data-ingestion/src/handlers/te/te.handler.ts @@ -1,11 +1,11 @@ import { BaseHandler, - Disabled, Handler, + Operation, ScheduledOperation } from '@stock-bot/handlers'; import type { DataIngestionServices } from '../../types'; -import { crawlScheduler, fetchCountries, spiderUrl } from './actions'; +import { crawlScheduler, dataFetch, dataScheduler, fetchCountries, fetchIndicators, marketDataFetch, marketDataScheduler, spiderUrl } from './actions'; @Handler('te') export class TeHandler extends BaseHandler { @@ -73,10 +73,18 @@ export class TeHandler extends BaseHandler { // Index for finding URLs with chart data { indexSpec: { teChartUrl: 1 }, - options: { + options: { name: 'chart_url_idx', sparse: true, - background: true + background: true + } + }, + // Index for data-scheduler: filter by teType + sort by lastDataFetch + { + indexSpec: { teType: 1, lastDataFetch: 1 }, + options: { + name: 'data_fetch_schedule_idx', + background: true, } } ]; @@ -91,6 +99,62 @@ export class TeHandler extends BaseHandler { } } + // Indexes for teData collection (one doc per symbol with data array) + const teDataIndexes = [ + { + indexSpec: { teSymbol: 1 }, + options: { + name: 'symbol_unique_idx', + unique: true, + background: true, + }, + }, + { + indexSpec: { country: 1, category: 1 }, + options: { + name: 'country_category_idx', + background: true, + }, + }, + ]; + + for (const index of teDataIndexes) { + try { + await this.mongodb.createIndex('teData', index.indexSpec, index.options); + this.logger.info(`Created/verified teData index: ${index.options.name}`); + } catch (error) { + this.logger.debug(`teData index ${index.options.name} may already exist:`, error); + } + } + + // Indexes for teMarketData collection + const teMarketDataIndexes = [ + { + indexSpec: { teSymbol: 1 }, + options: { + name: 'symbol_unique_idx', + unique: true, + background: true, + }, + }, + { + indexSpec: { category: 1 }, + options: { + name: 'category_idx', + background: true, + }, + }, + ]; + + for (const index of teMarketDataIndexes) { + try { + await this.mongodb.createIndex('teMarketData', index.indexSpec, index.options); + this.logger.info(`Created/verified teMarketData index: ${index.options.name}`); + } catch (error) { + this.logger.debug(`teMarketData index ${index.options.name} may already exist:`, error); + } + } + // Check collection stats const count = await this.mongodb.countDocuments('teUrls', {}); this.logger.info(`TeUrls collection has ${count} documents`); @@ -104,11 +168,17 @@ export class TeHandler extends BaseHandler { @ScheduledOperation('te-countries', '0 0 * * 0', { priority: 5, description: 'Fetch and update Trading Economics countries data', - immediately: false, + immediately: true, }) - @Disabled() fetchCountries = fetchCountries; + @ScheduledOperation('te-indicators', '0 0 * * 0', { + priority: 5, + description: 'Fetch and update Trading Economics indicators list', + immediately: true, + }) + fetchIndicators = fetchIndicators; + @ScheduledOperation('te-spider', '* * * * *', { priority: 5, description: 'Spider Trading Economics URLs for data extraction (every minute)', @@ -122,4 +192,24 @@ export class TeHandler extends BaseHandler { immediately: true, }) crawlScheduler = crawlScheduler; + + @ScheduledOperation('te-data-scheduler', '*/10 * * * *', { + priority: 7, + description: 'Schedule TE CDN data fetch jobs for symbols with new data (every 10 min)', + immediately: true, + }) + dataScheduler = dataScheduler; + + @Operation('te-data-fetch') + dataFetch = dataFetch; + + @ScheduledOperation('te-market-data-scheduler', '*/10 * * * *', { + priority: 7, + description: 'Schedule market data fetch jobs for MK/TV symbols with new data (every 10 min)', + immediately: true, + }) + marketDataScheduler = marketDataScheduler; + + @Operation('te-market-data-fetch') + marketDataFetch = marketDataFetch; } \ No newline at end of file