renaming services to more suitable names

This commit is contained in:
Boki 2025-06-21 14:02:54 -04:00
parent 3ae9de8376
commit be6afef832
69 changed files with 41 additions and 2956 deletions

View file

@ -0,0 +1,183 @@
# Data Sync Service
The Data Sync Service handles synchronization of raw MongoDB data to PostgreSQL master records, providing a unified data layer for the stock-bot application.
## Features
### Original Sync Manager
- Basic QM (QuoteMedia) symbol and exchange synchronization
- Simple static exchange mapping
- Manual sync triggers via REST API
### Enhanced Sync Manager ✨ NEW
- **Multi-provider support**: Syncs from EOD, Interactive Brokers, and QuoteMedia
- **Comprehensive exchange handling**: Leverages all 4 MongoDB exchange collections
- **Intelligent exchange mapping**: Dynamic mapping with fallback logic
- **Transaction safety**: Full ACID compliance with rollback on errors
- **Performance optimization**: Exchange caching for faster lookups
- **Enhanced error handling**: Detailed error tracking and reporting
## API Endpoints
### Health Check
- `GET /health` - Service health status
### Original Sync Operations
- `POST /sync/symbols` - Sync QM symbols to PostgreSQL
- `POST /sync/exchanges` - Sync QM exchanges to PostgreSQL
- `GET /sync/status` - Get basic sync status
### Enhanced Sync Operations ✨ NEW
- `POST /sync/exchanges/all?clear=true` - Comprehensive exchange sync from all providers (clear=true removes dummy data first)
- `POST /sync/symbols/:provider?clear=true` - Sync symbols from specific provider (qm, eod, ib)
- `POST /sync/clear` - Clear all PostgreSQL data (exchanges, symbols, mappings)
- `GET /sync/status/enhanced` - Get detailed sync status
- `GET /sync/stats/exchanges` - Get exchange statistics
## Data Sources
### MongoDB Collections
1. **exchanges** (34 records) - Unified exchange reference
2. **eodExchanges** (78 records) - EOD provider with currency/MIC data
3. **ibExchanges** (214 records) - Interactive Brokers with asset types
4. **qmExchanges** (25 records) - QuoteMedia exchanges
### PostgreSQL Tables
1. **master_exchanges** - Unified exchange master data
2. **master_symbols** - Symbol master records
3. **provider_symbol_mappings** - Multi-provider symbol mappings
4. **sync_status** - Synchronization tracking
## Key Improvements
### 1. Multi-Provider Exchange Sync
Instead of only syncing QM exchanges, the enhanced sync manager:
- Syncs from EOD exchanges (comprehensive global data with currencies)
- Adds IB exchanges for additional coverage (214 exchanges vs 25 in QM)
### 2. Intelligent Exchange Mapping
Replaces hard-coded mapping with dynamic resolution:
```typescript
// Before: Static mapping
const exchangeMap = { 'NASDAQ': 'NASDAQ', 'NYSE': 'NYSE' };
// After: Dynamic mapping with variations
const codeMap = {
'NASDAQ': 'NASDAQ', 'NAS': 'NASDAQ',
'NYSE': 'NYSE', 'NYQ': 'NYSE',
'LSE': 'LSE', 'LON': 'LSE', 'LN': 'LSE',
'US': 'NYSE' // EOD uses 'US' for US markets
};
```
### 3. Transaction Safety
All sync operations use database transactions:
- `BEGIN` transaction at start
- `COMMIT` on success
- `ROLLBACK` on any error
- Ensures data consistency
### 4. Performance Optimization
- Exchange cache preloaded at startup
- Reduced database queries during symbol processing
- Batch operations where possible
### 5. Enhanced Error Handling
- Detailed error logging with context
- Separate error counting in sync results
- Graceful handling of missing/invalid data
## Usage Examples
### Clear All Data and Start Fresh Exchange Sync
```bash
curl -X POST "http://localhost:3005/sync/exchanges/all?clear=true"
```
### Sync Symbols from Specific Provider
```bash
# Sync QuoteMedia symbols (clear existing symbols first)
curl -X POST "http://localhost:3005/sync/symbols/qm?clear=true"
# Sync EOD symbols
curl -X POST http://localhost:3005/sync/symbols/eod
# Sync Interactive Brokers symbols
curl -X POST http://localhost:3005/sync/symbols/ib
```
### Clear All PostgreSQL Data
```bash
curl -X POST http://localhost:3005/sync/clear
```
### Get Enhanced Status
```bash
curl http://localhost:3005/sync/status/enhanced
curl http://localhost:3005/sync/stats/exchanges
```
## Configuration
### Environment Variables
- `DATA_SYNC_SERVICE_PORT` - Service port (default: 3005)
- `NODE_ENV` - Environment mode
### Database Connections
- **MongoDB**: `mongodb://trading_admin:trading_mongo_dev@localhost:27017/stock?authSource=admin`
- **PostgreSQL**: `postgresql://trading_user:trading_pass_dev@localhost:5432/trading_bot`
## Development
### Build and Run
```bash
# Development mode
bun run dev
# Build
bun run build
# Production
bun run start
```
### Testing
```bash
# Run tests
bun test
# Start infrastructure
bun run infra:up
# Test sync operations
curl -X POST http://localhost:3005/sync/exchanges/all
curl -X POST http://localhost:3005/sync/symbols/qm
```
## Architecture
```
MongoDB Collections PostgreSQL Tables
┌─ exchanges (34) ┐ ┌─ master_exchanges
├─ eodExchanges (78) ├──▶├─ master_symbols
├─ ibExchanges (214) │ ├─ provider_symbol_mappings
└─ qmExchanges (25) ┘ └─ sync_status
Enhanced Sync Manager
- Exchange caching
- Dynamic mapping
- Transaction safety
- Multi-provider support
```
## Migration Path
The enhanced sync manager is designed to work alongside the original sync manager:
1. **Immediate**: Use enhanced exchange sync for better coverage
2. **Phase 1**: Test enhanced symbol sync with each provider
3. **Phase 2**: Replace original sync manager when confident
4. **Phase 3**: Remove original sync manager and endpoints
Both managers can be used simultaneously during the transition period.

View file

@ -0,0 +1,15 @@
{
"service": {
"name": "data-pipeline",
"port": 3005,
"host": "0.0.0.0",
"healthCheckPath": "/health",
"metricsPath": "/metrics",
"shutdownTimeout": 30000,
"cors": {
"enabled": true,
"origin": "*",
"credentials": false
}
}
}

View file

@ -0,0 +1,28 @@
{
"name": "@stock-bot/data-pipeline",
"version": "1.0.0",
"description": "Data processing pipeline for syncing and transforming raw data to normalized records",
"main": "dist/index.js",
"type": "module",
"scripts": {
"dev": "bun --watch src/index.ts",
"build": "bun build src/index.ts --outdir dist --target node --external chromium-bidi --external electron --external playwright --external playwright-core",
"start": "bun dist/index.js",
"test": "bun test",
"clean": "rm -rf dist"
},
"dependencies": {
"@stock-bot/cache": "*",
"@stock-bot/config": "*",
"@stock-bot/logger": "*",
"@stock-bot/mongodb-client": "*",
"@stock-bot/postgres-client": "*",
"@stock-bot/questdb-client": "*",
"@stock-bot/queue": "*",
"@stock-bot/shutdown": "*",
"hono": "^4.0.0"
},
"devDependencies": {
"typescript": "^5.0.0"
}
}

View file

@ -0,0 +1,58 @@
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue';
import { exchangeOperations } from './operations';
const logger = getLogger('exchanges-handler');
const HANDLER_NAME = 'exchanges';
const exchangesHandlerConfig: HandlerConfig = {
concurrency: 1,
maxAttempts: 3,
scheduledJobs: [
{
operation: 'sync-all-exchanges',
cronPattern: '0 0 * * 0', // Weekly on Sunday at midnight
payload: { clearFirst: true },
priority: 10,
immediately: false,
} as ScheduledJobConfig,
{
operation: 'sync-qm-exchanges',
cronPattern: '0 1 * * *', // Daily at 1 AM
payload: {},
priority: 5,
immediately: false,
} as ScheduledJobConfig,
{
operation: 'sync-ib-exchanges',
cronPattern: '0 3 * * *', // Daily at 3 AM
payload: {},
priority: 3,
immediately: false,
} as ScheduledJobConfig,
{
operation: 'sync-qm-provider-mappings',
cronPattern: '0 3 * * *', // Daily at 3 AM
payload: {},
priority: 7,
immediately: false,
} as ScheduledJobConfig,
],
operations: {
'sync-all-exchanges': exchangeOperations.syncAllExchanges,
'sync-qm-exchanges': exchangeOperations.syncQMExchanges,
'sync-ib-exchanges': exchangeOperations.syncIBExchanges,
'sync-qm-provider-mappings': exchangeOperations.syncQMProviderMappings,
'clear-postgresql-data': exchangeOperations.clearPostgreSQLData,
'get-exchange-stats': exchangeOperations.getExchangeStats,
'get-provider-mapping-stats': exchangeOperations.getProviderMappingStats,
'enhanced-sync-status': exchangeOperations['enhanced-sync-status'],
},
};
export function initializeExchangesHandler(): void {
logger.info('Registering exchanges handler...');
handlerRegistry.registerHandler(HANDLER_NAME, exchangesHandlerConfig);
logger.info('Exchanges handler registered successfully');
}

View file

@ -0,0 +1,60 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-clear-postgresql-data');
export async function clearPostgreSQLData(payload: JobPayload): Promise<{
exchangesCleared: number;
symbolsCleared: number;
mappingsCleared: number;
}> {
logger.info('Clearing existing PostgreSQL data...');
try {
const postgresClient = getPostgreSQLClient();
// Start transaction for atomic operations
await postgresClient.query('BEGIN');
// Get counts before clearing
const exchangeCountResult = await postgresClient.query(
'SELECT COUNT(*) as count FROM exchanges'
);
const symbolCountResult = await postgresClient.query(
'SELECT COUNT(*) as count FROM symbols'
);
const mappingCountResult = await postgresClient.query(
'SELECT COUNT(*) as count FROM provider_mappings'
);
const exchangesCleared = parseInt(exchangeCountResult.rows[0].count);
const symbolsCleared = parseInt(symbolCountResult.rows[0].count);
const mappingsCleared = parseInt(mappingCountResult.rows[0].count);
// Clear data in correct order (respect foreign keys)
await postgresClient.query('DELETE FROM provider_mappings');
await postgresClient.query('DELETE FROM symbols');
await postgresClient.query('DELETE FROM exchanges');
// Reset sync status
await postgresClient.query(
'UPDATE sync_status SET last_sync_at = NULL, last_sync_count = 0, sync_errors = NULL'
);
await postgresClient.query('COMMIT');
logger.info('PostgreSQL data cleared successfully', {
exchangesCleared,
symbolsCleared,
mappingsCleared,
});
return { exchangesCleared, symbolsCleared, mappingsCleared };
} catch (error) {
const postgresClient = getPostgreSQLClient();
await postgresClient.query('ROLLBACK');
logger.error('Failed to clear PostgreSQL data', { error });
throw error;
}
}

View file

@ -0,0 +1,26 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload, SyncStatus } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-status');
export async function getSyncStatus(payload: JobPayload): Promise<SyncStatus[]> {
logger.info('Getting comprehensive sync status...');
try {
const postgresClient = getPostgreSQLClient();
const query = `
SELECT provider, data_type as "dataType", last_sync_at as "lastSyncAt",
last_sync_count as "lastSyncCount", sync_errors as "syncErrors"
FROM sync_status
ORDER BY provider, data_type
`;
const result = await postgresClient.query(query);
logger.info(`Retrieved sync status for ${result.rows.length} entries`);
return result.rows;
} catch (error) {
logger.error('Failed to get sync status', { error });
throw error;
}
}

View file

@ -0,0 +1,28 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-exchange-stats');
export async function getExchangeStats(payload: JobPayload): Promise<any> {
logger.info('Getting exchange statistics...');
try {
const postgresClient = getPostgreSQLClient();
const query = `
SELECT
COUNT(*) as total_exchanges,
COUNT(CASE WHEN active = true THEN 1 END) as active_exchanges,
COUNT(DISTINCT country) as countries,
COUNT(DISTINCT currency) as currencies
FROM exchanges
`;
const result = await postgresClient.query(query);
logger.info('Retrieved exchange statistics');
return result.rows[0];
} catch (error) {
logger.error('Failed to get exchange statistics', { error });
throw error;
}
}

View file

@ -0,0 +1,19 @@
import { syncAllExchanges } from './sync-all-exchanges.operations';
import { syncQMExchanges } from './qm-exchanges.operations';
import { syncIBExchanges } from './sync-ib-exchanges.operations';
import { syncQMProviderMappings } from './sync-qm-provider-mappings.operations';
import { clearPostgreSQLData } from './clear-postgresql-data.operations';
import { getExchangeStats } from './exchange-stats.operations';
import { getProviderMappingStats } from './provider-mapping-stats.operations';
import { getSyncStatus } from './enhanced-sync-status.operations';
export const exchangeOperations = {
syncAllExchanges,
syncQMExchanges,
syncIBExchanges,
syncQMProviderMappings,
clearPostgreSQLData,
getExchangeStats,
getProviderMappingStats,
'enhanced-sync-status': getSyncStatus,
};

View file

@ -0,0 +1,32 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-provider-mapping-stats');
export async function getProviderMappingStats(payload: JobPayload): Promise<any> {
logger.info('Getting provider mapping statistics...');
try {
const postgresClient = getPostgreSQLClient();
const query = `
SELECT
provider,
COUNT(*) as total_mappings,
COUNT(CASE WHEN active = true THEN 1 END) as active_mappings,
COUNT(CASE WHEN verified = true THEN 1 END) as verified_mappings,
COUNT(CASE WHEN auto_mapped = true THEN 1 END) as auto_mapped,
AVG(confidence) as avg_confidence
FROM provider_exchange_mappings
GROUP BY provider
ORDER BY provider
`;
const result = await postgresClient.query(query);
logger.info('Retrieved provider mapping statistics');
return result.rows;
} catch (error) {
logger.error('Failed to get provider mapping statistics', { error });
throw error;
}
}

View file

@ -0,0 +1,103 @@
import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient } from '@stock-bot/mongodb-client';
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('sync-qm-exchanges');
export async function syncQMExchanges(payload: JobPayload): Promise<{ processed: number; created: number; updated: number }> {
logger.info('Starting QM exchanges sync...');
try {
const mongoClient = getMongoDBClient();
const postgresClient = getPostgreSQLClient();
// 1. Get all QM exchanges from MongoDB
const qmExchanges = await mongoClient.find('qmExchanges', {});
logger.info(`Found ${qmExchanges.length} QM exchanges to process`);
let created = 0;
let updated = 0;
for (const exchange of qmExchanges) {
try {
// 2. Check if exchange exists
const existingExchange = await findExchange(exchange.exchangeCode, postgresClient);
if (existingExchange) {
// Update existing
await updateExchange(existingExchange.id, exchange, postgresClient);
updated++;
} else {
// Create new
await createExchange(exchange, postgresClient);
created++;
}
} catch (error) {
logger.error('Failed to process exchange', { error, exchange: exchange.exchangeCode });
}
}
// 3. Update sync status
await updateSyncStatus('qm', 'exchanges', qmExchanges.length, postgresClient);
const result = { processed: qmExchanges.length, created, updated };
logger.info('QM exchanges sync completed', result);
return result;
} catch (error) {
logger.error('QM exchanges sync failed', { error });
throw error;
}
}
// Helper functions
async function findExchange(exchangeCode: string, postgresClient: any): Promise<any> {
const query = 'SELECT * FROM exchanges WHERE code = $1';
const result = await postgresClient.query(query, [exchangeCode]);
return result.rows[0] || null;
}
async function createExchange(qmExchange: any, postgresClient: any): Promise<void> {
const query = `
INSERT INTO exchanges (code, name, country, currency, visible)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (code) DO NOTHING
`;
await postgresClient.query(query, [
qmExchange.exchangeCode || qmExchange.exchange,
qmExchange.exchangeShortName || qmExchange.name,
qmExchange.countryCode || 'US',
'USD', // Default currency, can be improved
true, // New exchanges are visible by default
]);
}
async function updateExchange(exchangeId: string, qmExchange: any, postgresClient: any): Promise<void> {
const query = `
UPDATE exchanges
SET name = COALESCE($2, name),
country = COALESCE($3, country),
updated_at = NOW()
WHERE id = $1
`;
await postgresClient.query(query, [
exchangeId,
qmExchange.exchangeShortName || qmExchange.name,
qmExchange.countryCode,
]);
}
async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise<void> {
const query = `
UPDATE sync_status
SET last_sync_at = NOW(),
last_sync_count = $3,
sync_errors = NULL,
updated_at = NOW()
WHERE provider = $1 AND data_type = $2
`;
await postgresClient.query(query, [provider, dataType, count]);
}

View file

@ -0,0 +1,267 @@
import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient } from "@stock-bot/mongodb-client";
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload, SyncResult } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-all-exchanges');
export async function syncAllExchanges(payload: JobPayload): Promise<SyncResult> {
const clearFirst = payload.clearFirst || true;
logger.info('Starting comprehensive exchange sync...', { clearFirst });
const result: SyncResult = {
processed: 0,
created: 0,
updated: 0,
skipped: 0,
errors: 0,
};
try {
const postgresClient = getPostgreSQLClient();
// Clear existing data if requested
if (clearFirst) {
await clearPostgreSQLData(postgresClient);
}
// Start transaction for atomic operations
await postgresClient.query('BEGIN');
// 1. Sync from EOD exchanges (comprehensive global data)
const eodResult = await syncEODExchanges();
mergeResults(result, eodResult);
// 2. Sync from IB exchanges (detailed asset information)
const ibResult = await syncIBExchanges();
mergeResults(result, ibResult);
// 3. Update sync status
await updateSyncStatus('all', 'exchanges', result.processed, postgresClient);
await postgresClient.query('COMMIT');
logger.info('Comprehensive exchange sync completed', result);
return result;
} catch (error) {
const postgresClient = getPostgreSQLClient();
await postgresClient.query('ROLLBACK');
logger.error('Comprehensive exchange sync failed', { error });
throw error;
}
}
async function clearPostgreSQLData(postgresClient: any): Promise<void> {
logger.info('Clearing existing PostgreSQL data...');
// Clear data in correct order (respect foreign keys)
await postgresClient.query('DELETE FROM provider_mappings');
await postgresClient.query('DELETE FROM symbols');
await postgresClient.query('DELETE FROM exchanges');
// Reset sync status
await postgresClient.query(
'UPDATE sync_status SET last_sync_at = NULL, last_sync_count = 0, sync_errors = NULL'
);
logger.info('PostgreSQL data cleared successfully');
}
async function syncEODExchanges(): Promise<SyncResult> {
const mongoClient = getMongoDBClient();
const exchanges = await mongoClient.find('eodExchanges', { active: true });
const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 };
for (const exchange of exchanges) {
try {
// Create provider exchange mapping for EOD
await createProviderExchangeMapping(
'eod', // provider
exchange.Code,
exchange.Name,
exchange.CountryISO2,
exchange.Currency,
0.95 // very high confidence for EOD data
);
result.processed++;
result.created++; // Count as created mapping
} catch (error) {
logger.error('Failed to process EOD exchange', { error, exchange });
result.errors++;
}
}
return result;
}
async function syncIBExchanges(): Promise<SyncResult> {
const mongoClient = getMongoDBClient();
const exchanges = await mongoClient.find('ibExchanges', {});
const result: SyncResult = { processed: 0, created: 0, updated: 0, skipped: 0, errors: 0 };
for (const exchange of exchanges) {
try {
// Create provider exchange mapping for IB
await createProviderExchangeMapping(
'ib', // provider
exchange.exchange_id,
exchange.name,
exchange.country_code,
'USD', // IB doesn't specify currency, default to USD
0.85 // good confidence for IB data
);
result.processed++;
result.created++; // Count as created mapping
} catch (error) {
logger.error('Failed to process IB exchange', { error, exchange });
result.errors++;
}
}
return result;
}
async function createProviderExchangeMapping(
provider: string,
providerExchangeCode: string,
providerExchangeName: string,
countryCode: string | null,
currency: string | null,
confidence: number
): Promise<void> {
if (!providerExchangeCode) {
return;
}
const postgresClient = getPostgreSQLClient();
// Check if mapping already exists
const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode);
if (existingMapping) {
// Don't override existing mappings to preserve manual work
return;
}
// Find or create master exchange
const masterExchange = await findOrCreateMasterExchange(
providerExchangeCode,
providerExchangeName,
countryCode,
currency
);
// Create the provider exchange mapping
const query = `
INSERT INTO provider_exchange_mappings
(provider, provider_exchange_code, provider_exchange_name, master_exchange_id,
country_code, currency, confidence, active, auto_mapped)
VALUES ($1, $2, $3, $4, $5, $6, $7, false, true)
ON CONFLICT (provider, provider_exchange_code) DO NOTHING
`;
await postgresClient.query(query, [
provider,
providerExchangeCode,
providerExchangeName,
masterExchange.id,
countryCode,
currency,
confidence,
]);
}
async function findOrCreateMasterExchange(
providerCode: string,
providerName: string,
countryCode: string | null,
currency: string | null
): Promise<any> {
const postgresClient = getPostgreSQLClient();
// First, try to find exact match
let masterExchange = await findExchangeByCode(providerCode);
if (masterExchange) {
return masterExchange;
}
// Try to find by similar codes (basic mapping)
const basicMapping = getBasicExchangeMapping(providerCode);
if (basicMapping) {
masterExchange = await findExchangeByCode(basicMapping);
if (masterExchange) {
return masterExchange;
}
}
// Create new master exchange (inactive by default)
const query = `
INSERT INTO exchanges (code, name, country, currency, active)
VALUES ($1, $2, $3, $4, false)
ON CONFLICT (code) DO UPDATE SET
name = COALESCE(EXCLUDED.name, exchanges.name),
country = COALESCE(EXCLUDED.country, exchanges.country),
currency = COALESCE(EXCLUDED.currency, exchanges.currency)
RETURNING id, code, name, country, currency
`;
const result = await postgresClient.query(query, [
providerCode,
providerName || providerCode,
countryCode || 'US',
currency || 'USD',
]);
return result.rows[0];
}
function getBasicExchangeMapping(providerCode: string): string | null {
const mappings: Record<string, string> = {
NYE: 'NYSE',
NAS: 'NASDAQ',
TO: 'TSX',
LN: 'LSE',
LON: 'LSE',
};
return mappings[providerCode.toUpperCase()] || null;
}
async function findProviderExchangeMapping(provider: string, providerExchangeCode: string): Promise<any> {
const postgresClient = getPostgreSQLClient();
const query = 'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2';
const result = await postgresClient.query(query, [provider, providerExchangeCode]);
return result.rows[0] || null;
}
async function findExchangeByCode(code: string): Promise<any> {
const postgresClient = getPostgreSQLClient();
const query = 'SELECT * FROM exchanges WHERE code = $1';
const result = await postgresClient.query(query, [code]);
return result.rows[0] || null;
}
async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise<void> {
const query = `
INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors)
VALUES ($1, $2, NOW(), $3, NULL)
ON CONFLICT (provider, data_type)
DO UPDATE SET
last_sync_at = NOW(),
last_sync_count = EXCLUDED.last_sync_count,
sync_errors = NULL,
updated_at = NOW()
`;
await postgresClient.query(query, [provider, dataType, count]);
}
function mergeResults(target: SyncResult, source: SyncResult): void {
target.processed += source.processed;
target.created += source.created;
target.updated += source.updated;
target.skipped += source.skipped;
target.errors += source.errors;
}

View file

@ -0,0 +1,206 @@
import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient } from '@stock-bot/mongodb-client';
import type { JobPayload } from '../../../types/job-payloads';
import type { MasterExchange } from '@stock-bot/mongodb-client';
const logger = getLogger('sync-ib-exchanges');
interface IBExchange {
id?: string;
_id?: any;
name?: string;
code?: string;
country_code?: string;
currency?: string;
}
export async function syncIBExchanges(payload: JobPayload): Promise<{ syncedCount: number; totalExchanges: number }> {
logger.info('Syncing IB exchanges from database...');
try {
const mongoClient = getMongoDBClient();
const db = mongoClient.getDatabase();
// Filter by country code US and CA
const ibExchanges = await db
.collection<IBExchange>('ibExchanges')
.find({
country_code: { $in: ['US', 'CA'] },
})
.toArray();
logger.info('Found IB exchanges in database', { count: ibExchanges.length });
let syncedCount = 0;
for (const exchange of ibExchanges) {
try {
await createOrUpdateMasterExchange(exchange);
syncedCount++;
logger.debug('Synced IB exchange', {
ibId: exchange.id,
country: exchange.country_code,
});
} catch (error) {
logger.error('Failed to sync IB exchange', { exchange: exchange.id, error });
}
}
logger.info('IB exchange sync completed', {
syncedCount,
totalExchanges: ibExchanges.length,
});
return { syncedCount, totalExchanges: ibExchanges.length };
} catch (error) {
logger.error('Failed to fetch IB exchanges from database', { error });
return { syncedCount: 0, totalExchanges: 0 };
}
}
/**
* Create or update master exchange record 1:1 from IB exchange
*/
async function createOrUpdateMasterExchange(ibExchange: IBExchange): Promise<void> {
const mongoClient = getMongoDBClient();
const db = mongoClient.getDatabase();
const collection = db.collection<MasterExchange>('masterExchanges');
const masterExchangeId = generateMasterExchangeId(ibExchange);
const now = new Date();
// Check if master exchange already exists
const existing = await collection.findOne({ masterExchangeId });
if (existing) {
// Update existing record
await collection.updateOne(
{ masterExchangeId },
{
$set: {
officialName: ibExchange.name || `Exchange ${ibExchange.id}`,
country: ibExchange.country_code || 'UNKNOWN',
currency: ibExchange.currency || 'USD',
timezone: inferTimezone(ibExchange),
updated_at: now,
},
}
);
logger.debug('Updated existing master exchange', { masterExchangeId });
} else {
// Create new master exchange
const masterExchange: MasterExchange = {
masterExchangeId,
shortName: masterExchangeId, // Set shortName to masterExchangeId on creation
officialName: ibExchange.name || `Exchange ${ibExchange.id}`,
country: ibExchange.country_code || 'UNKNOWN',
currency: ibExchange.currency || 'USD',
timezone: inferTimezone(ibExchange),
active: false, // Set active to false only on creation
sourceMappings: {
ib: {
id: ibExchange.id || ibExchange._id?.toString() || 'unknown',
name: ibExchange.name || `Exchange ${ibExchange.id}`,
code: ibExchange.code || ibExchange.id || '',
aliases: generateAliases(ibExchange),
lastUpdated: now,
},
},
confidence: 1.0, // High confidence for direct IB mapping
verified: true, // Mark as verified since it's direct from IB
// DocumentBase fields
source: 'ib-exchange-sync',
created_at: now,
updated_at: now,
};
await collection.insertOne(masterExchange);
logger.debug('Created new master exchange', { masterExchangeId });
}
}
/**
* Generate master exchange ID from IB exchange
*/
function generateMasterExchangeId(ibExchange: IBExchange): string {
// Use code if available, otherwise use ID, otherwise generate from name
if (ibExchange.code) {
return ibExchange.code.toUpperCase().replace(/[^A-Z0-9]/g, '');
}
if (ibExchange.id) {
return ibExchange.id.toUpperCase().replace(/[^A-Z0-9]/g, '');
}
if (ibExchange.name) {
return ibExchange.name
.toUpperCase()
.split(' ')
.slice(0, 2)
.join('_')
.replace(/[^A-Z0-9_]/g, '');
}
return 'UNKNOWN_EXCHANGE';
}
/**
* Generate aliases for the exchange
*/
function generateAliases(ibExchange: IBExchange): string[] {
const aliases: string[] = [];
if (ibExchange.name && ibExchange.name.includes(' ')) {
// Add abbreviated version
aliases.push(
ibExchange.name
.split(' ')
.map(w => w[0])
.join('')
.toUpperCase()
);
}
if (ibExchange.code) {
aliases.push(ibExchange.code.toUpperCase());
}
return aliases;
}
/**
* Infer timezone from exchange name/location
*/
function inferTimezone(ibExchange: IBExchange): string {
if (!ibExchange.name) {
return 'UTC';
}
const name = ibExchange.name.toUpperCase();
if (name.includes('NEW YORK') || name.includes('NYSE') || name.includes('NASDAQ')) {
return 'America/New_York';
}
if (name.includes('LONDON')) {
return 'Europe/London';
}
if (name.includes('TOKYO')) {
return 'Asia/Tokyo';
}
if (name.includes('SHANGHAI')) {
return 'Asia/Shanghai';
}
if (name.includes('TORONTO')) {
return 'America/Toronto';
}
if (name.includes('FRANKFURT')) {
return 'Europe/Berlin';
}
return 'UTC'; // Default
}

View file

@ -0,0 +1,204 @@
import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient } from "@stock-bot/mongodb-client";
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload, SyncResult } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-qm-provider-mappings');
export async function syncQMProviderMappings(payload: JobPayload): Promise<SyncResult> {
logger.info('Starting QM provider exchange mappings sync...');
const result: SyncResult = {
processed: 0,
created: 0,
updated: 0,
skipped: 0,
errors: 0,
};
try {
const mongoClient = getMongoDBClient();
const postgresClient = getPostgreSQLClient();
// Start transaction
await postgresClient.query('BEGIN');
// Get unique exchange combinations from QM symbols
const db = mongoClient.getDatabase();
const pipeline = [
{
$group: {
_id: {
exchangeCode: '$exchangeCode',
exchange: '$exchange',
countryCode: '$countryCode',
},
count: { $sum: 1 },
sampleExchange: { $first: '$exchange' },
},
},
{
$project: {
exchangeCode: '$_id.exchangeCode',
exchange: '$_id.exchange',
countryCode: '$_id.countryCode',
count: 1,
sampleExchange: 1,
},
},
];
const qmExchanges = await db.collection('qmSymbols').aggregate(pipeline).toArray();
logger.info(`Found ${qmExchanges.length} unique QM exchange combinations`);
for (const exchange of qmExchanges) {
try {
// Create provider exchange mapping for QM
await createProviderExchangeMapping(
'qm', // provider
exchange.exchangeCode,
exchange.sampleExchange || exchange.exchangeCode,
exchange.countryCode,
exchange.countryCode === 'CA' ? 'CAD' : 'USD', // Simple currency mapping
0.8 // good confidence for QM data
);
result.processed++;
result.created++;
} catch (error) {
logger.error('Failed to process QM exchange mapping', { error, exchange });
result.errors++;
}
}
await postgresClient.query('COMMIT');
logger.info('QM provider exchange mappings sync completed', result);
return result;
} catch (error) {
const postgresClient = getPostgreSQLClient();
await postgresClient.query('ROLLBACK');
logger.error('QM provider exchange mappings sync failed', { error });
throw error;
}
}
async function createProviderExchangeMapping(
provider: string,
providerExchangeCode: string,
providerExchangeName: string,
countryCode: string | null,
currency: string | null,
confidence: number
): Promise<void> {
if (!providerExchangeCode) {
return;
}
const postgresClient = getPostgreSQLClient();
// Check if mapping already exists
const existingMapping = await findProviderExchangeMapping(provider, providerExchangeCode);
if (existingMapping) {
// Don't override existing mappings to preserve manual work
return;
}
// Find or create master exchange
const masterExchange = await findOrCreateMasterExchange(
providerExchangeCode,
providerExchangeName,
countryCode,
currency
);
// Create the provider exchange mapping
const query = `
INSERT INTO provider_exchange_mappings
(provider, provider_exchange_code, provider_exchange_name, master_exchange_id,
country_code, currency, confidence, active, auto_mapped)
VALUES ($1, $2, $3, $4, $5, $6, $7, false, true)
ON CONFLICT (provider, provider_exchange_code) DO NOTHING
`;
await postgresClient.query(query, [
provider,
providerExchangeCode,
providerExchangeName,
masterExchange.id,
countryCode,
currency,
confidence,
]);
}
async function findProviderExchangeMapping(provider: string, providerExchangeCode: string): Promise<any> {
const postgresClient = getPostgreSQLClient();
const query = 'SELECT * FROM provider_exchange_mappings WHERE provider = $1 AND provider_exchange_code = $2';
const result = await postgresClient.query(query, [provider, providerExchangeCode]);
return result.rows[0] || null;
}
async function findOrCreateMasterExchange(
providerCode: string,
providerName: string,
countryCode: string | null,
currency: string | null
): Promise<any> {
const postgresClient = getPostgreSQLClient();
// First, try to find exact match
let masterExchange = await findExchangeByCode(providerCode);
if (masterExchange) {
return masterExchange;
}
// Try to find by similar codes (basic mapping)
const basicMapping = getBasicExchangeMapping(providerCode);
if (basicMapping) {
masterExchange = await findExchangeByCode(basicMapping);
if (masterExchange) {
return masterExchange;
}
}
// Create new master exchange (inactive by default)
const query = `
INSERT INTO exchanges (code, name, country, currency, active)
VALUES ($1, $2, $3, $4, false)
ON CONFLICT (code) DO UPDATE SET
name = COALESCE(EXCLUDED.name, exchanges.name),
country = COALESCE(EXCLUDED.country, exchanges.country),
currency = COALESCE(EXCLUDED.currency, exchanges.currency)
RETURNING id, code, name, country, currency
`;
const result = await postgresClient.query(query, [
providerCode,
providerName || providerCode,
countryCode || 'US',
currency || 'USD',
]);
return result.rows[0];
}
function getBasicExchangeMapping(providerCode: string): string | null {
const mappings: Record<string, string> = {
NYE: 'NYSE',
NAS: 'NASDAQ',
TO: 'TSX',
LN: 'LSE',
LON: 'LSE',
};
return mappings[providerCode.toUpperCase()] || null;
}
async function findExchangeByCode(code: string): Promise<any> {
const postgresClient = getPostgreSQLClient();
const query = 'SELECT * FROM exchanges WHERE code = $1';
const result = await postgresClient.query(query, [code]);
return result.rows[0] || null;
}

View file

@ -0,0 +1,9 @@
import { syncQMSymbols } from './qm-symbols.operations';
import { syncSymbolsFromProvider } from './sync-symbols-from-provider.operations';
import { getSyncStatus } from './sync-status.operations';
export const symbolOperations = {
syncQMSymbols,
syncSymbolsFromProvider,
getSyncStatus,
};

View file

@ -0,0 +1,168 @@
import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient } from '@stock-bot/mongodb-client';
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('sync-qm-symbols');
export async function syncQMSymbols(payload: JobPayload): Promise<{ processed: number; created: number; updated: number }> {
logger.info('Starting QM symbols sync...');
try {
const mongoClient = getMongoDBClient();
const postgresClient = getPostgreSQLClient();
// 1. Get all QM symbols from MongoDB
const qmSymbols = await mongoClient.find('qmSymbols', {});
logger.info(`Found ${qmSymbols.length} QM symbols to process`);
let created = 0;
let updated = 0;
for (const symbol of qmSymbols) {
try {
// 2. Resolve exchange
const exchangeId = await resolveExchange(symbol.exchangeCode || symbol.exchange, postgresClient);
if (!exchangeId) {
logger.warn('Unknown exchange, skipping symbol', {
symbol: symbol.symbol,
exchange: symbol.exchangeCode || symbol.exchange,
});
continue;
}
// 3. Check if symbol exists
const existingSymbol = await findSymbol(symbol.symbol, exchangeId, postgresClient);
if (existingSymbol) {
// Update existing
await updateSymbol(existingSymbol.id, symbol, postgresClient);
await upsertProviderMapping(existingSymbol.id, 'qm', symbol, postgresClient);
updated++;
} else {
// Create new
const newSymbolId = await createSymbol(symbol, exchangeId, postgresClient);
await upsertProviderMapping(newSymbolId, 'qm', symbol, postgresClient);
created++;
}
} catch (error) {
logger.error('Failed to process symbol', { error, symbol: symbol.symbol });
}
}
// 4. Update sync status
await updateSyncStatus('qm', 'symbols', qmSymbols.length, postgresClient);
const result = { processed: qmSymbols.length, created, updated };
logger.info('QM symbols sync completed', result);
return result;
} catch (error) {
logger.error('QM symbols sync failed', { error });
throw error;
}
}
// Helper functions
async function resolveExchange(exchangeCode: string, postgresClient: any): Promise<string | null> {
if (!exchangeCode) return null;
// Simple mapping - expand this as needed
const exchangeMap: Record<string, string> = {
NASDAQ: 'NASDAQ',
NYSE: 'NYSE',
TSX: 'TSX',
TSE: 'TSX', // TSE maps to TSX
LSE: 'LSE',
CME: 'CME',
};
const normalizedCode = exchangeMap[exchangeCode.toUpperCase()];
if (!normalizedCode) {
return null;
}
const query = 'SELECT id FROM exchanges WHERE code = $1';
const result = await postgresClient.query(query, [normalizedCode]);
return result.rows[0]?.id || null;
}
async function findSymbol(symbol: string, exchangeId: string, postgresClient: any): Promise<any> {
const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2';
const result = await postgresClient.query(query, [symbol, exchangeId]);
return result.rows[0] || null;
}
async function createSymbol(qmSymbol: any, exchangeId: string, postgresClient: any): Promise<string> {
const query = `
INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
VALUES ($1, $2, $3, $4, $5)
RETURNING id
`;
const result = await postgresClient.query(query, [
qmSymbol.symbol,
exchangeId,
qmSymbol.companyName || qmSymbol.name,
qmSymbol.countryCode || 'US',
qmSymbol.currency || 'USD',
]);
return result.rows[0].id;
}
async function updateSymbol(symbolId: string, qmSymbol: any, postgresClient: any): Promise<void> {
const query = `
UPDATE symbols
SET company_name = COALESCE($2, company_name),
country = COALESCE($3, country),
currency = COALESCE($4, currency),
updated_at = NOW()
WHERE id = $1
`;
await postgresClient.query(query, [
symbolId,
qmSymbol.companyName || qmSymbol.name,
qmSymbol.countryCode,
qmSymbol.currency,
]);
}
async function upsertProviderMapping(
symbolId: string,
provider: string,
qmSymbol: any,
postgresClient: any
): Promise<void> {
const query = `
INSERT INTO provider_mappings
(symbol_id, provider, provider_symbol, provider_exchange, last_seen)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (provider, provider_symbol)
DO UPDATE SET
symbol_id = EXCLUDED.symbol_id,
provider_exchange = EXCLUDED.provider_exchange,
last_seen = NOW()
`;
await postgresClient.query(query, [
symbolId,
provider,
qmSymbol.qmSearchCode || qmSymbol.symbol,
qmSymbol.exchangeCode || qmSymbol.exchange,
]);
}
async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise<void> {
const query = `
UPDATE sync_status
SET last_sync_at = NOW(),
last_sync_count = $3,
sync_errors = NULL,
updated_at = NOW()
WHERE provider = $1 AND data_type = $2
`;
await postgresClient.query(query, [provider, dataType, count]);
}

View file

@ -0,0 +1,21 @@
import { getLogger } from '@stock-bot/logger';
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload } from '../../../types/job-payloads';
const logger = getLogger('sync-status');
export async function getSyncStatus(payload: JobPayload): Promise<Record<string, unknown>[]> {
logger.info('Getting sync status...');
try {
const postgresClient = getPostgreSQLClient();
const query = 'SELECT * FROM sync_status ORDER BY provider, data_type';
const result = await postgresClient.query(query);
logger.info(`Retrieved sync status for ${result.rows.length} entries`);
return result.rows;
} catch (error) {
logger.error('Failed to get sync status', { error });
throw error;
}
}

View file

@ -0,0 +1,216 @@
import { getLogger } from '@stock-bot/logger';
import { getMongoDBClient } from "@stock-bot/mongodb-client";
import { getPostgreSQLClient } from '@stock-bot/postgres-client';
import type { JobPayload, SyncResult } from '../../../types/job-payloads';
const logger = getLogger('enhanced-sync-symbols-from-provider');
export async function syncSymbolsFromProvider(payload: JobPayload): Promise<SyncResult> {
const provider = payload.provider;
const clearFirst = payload.clearFirst || false;
if (!provider) {
throw new Error('Provider is required in payload');
}
logger.info(`Starting ${provider} symbols sync...`, { clearFirst });
const result: SyncResult = {
processed: 0,
created: 0,
updated: 0,
skipped: 0,
errors: 0,
};
try {
const mongoClient = getMongoDBClient();
const postgresClient = getPostgreSQLClient();
// Clear existing data if requested (only symbols and mappings, keep exchanges)
if (clearFirst) {
await postgresClient.query('BEGIN');
await postgresClient.query('DELETE FROM provider_mappings');
await postgresClient.query('DELETE FROM symbols');
await postgresClient.query('COMMIT');
logger.info('Cleared existing symbols and mappings before sync');
}
// Start transaction
await postgresClient.query('BEGIN');
let symbols: Record<string, unknown>[] = [];
// Get symbols based on provider
const db = mongoClient.getDatabase();
switch (provider.toLowerCase()) {
case 'qm':
symbols = await db.collection('qmSymbols').find({}).toArray();
break;
case 'eod':
symbols = await db.collection('eodSymbols').find({}).toArray();
break;
case 'ib':
symbols = await db.collection('ibSymbols').find({}).toArray();
break;
default:
throw new Error(`Unsupported provider: ${provider}`);
}
logger.info(`Found ${symbols.length} ${provider} symbols to process`);
result.processed = symbols.length;
for (const symbol of symbols) {
try {
await processSingleSymbol(symbol, provider, result);
} catch (error) {
logger.error('Failed to process symbol', {
error,
symbol: symbol.symbol || symbol.code,
provider,
});
result.errors++;
}
}
// Update sync status
await updateSyncStatus(provider, 'symbols', result.processed, postgresClient);
await postgresClient.query('COMMIT');
logger.info(`${provider} symbols sync completed`, result);
return result;
} catch (error) {
const postgresClient = getPostgreSQLClient();
await postgresClient.query('ROLLBACK');
logger.error(`${provider} symbols sync failed`, { error });
throw error;
}
}
async function processSingleSymbol(symbol: any, provider: string, result: SyncResult): Promise<void> {
const symbolCode = symbol.symbol || symbol.code;
const exchangeCode = symbol.exchangeCode || symbol.exchange || symbol.exchange_id;
if (!symbolCode || !exchangeCode) {
result.skipped++;
return;
}
// Find active provider exchange mapping
const providerMapping = await findActiveProviderExchangeMapping(provider, exchangeCode);
if (!providerMapping) {
result.skipped++;
return;
}
// Check if symbol exists
const existingSymbol = await findSymbolByCodeAndExchange(
symbolCode,
providerMapping.master_exchange_id
);
if (existingSymbol) {
await updateSymbol(existingSymbol.id, symbol);
await upsertProviderMapping(existingSymbol.id, provider, symbol);
result.updated++;
} else {
const newSymbolId = await createSymbol(symbol, providerMapping.master_exchange_id);
await upsertProviderMapping(newSymbolId, provider, symbol);
result.created++;
}
}
async function findActiveProviderExchangeMapping(provider: string, providerExchangeCode: string): Promise<any> {
const postgresClient = getPostgreSQLClient();
const query = `
SELECT pem.*, e.code as master_exchange_code
FROM provider_exchange_mappings pem
JOIN exchanges e ON pem.master_exchange_id = e.id
WHERE pem.provider = $1 AND pem.provider_exchange_code = $2 AND pem.active = true
`;
const result = await postgresClient.query(query, [provider, providerExchangeCode]);
return result.rows[0] || null;
}
async function findSymbolByCodeAndExchange(symbol: string, exchangeId: string): Promise<any> {
const postgresClient = getPostgreSQLClient();
const query = 'SELECT * FROM symbols WHERE symbol = $1 AND exchange_id = $2';
const result = await postgresClient.query(query, [symbol, exchangeId]);
return result.rows[0] || null;
}
async function createSymbol(symbol: any, exchangeId: string): Promise<string> {
const postgresClient = getPostgreSQLClient();
const query = `
INSERT INTO symbols (symbol, exchange_id, company_name, country, currency)
VALUES ($1, $2, $3, $4, $5)
RETURNING id
`;
const result = await postgresClient.query(query, [
symbol.symbol || symbol.code,
exchangeId,
symbol.companyName || symbol.name || symbol.company_name,
symbol.countryCode || symbol.country_code || 'US',
symbol.currency || 'USD',
]);
return result.rows[0].id;
}
async function updateSymbol(symbolId: string, symbol: any): Promise<void> {
const postgresClient = getPostgreSQLClient();
const query = `
UPDATE symbols
SET company_name = COALESCE($2, company_name),
country = COALESCE($3, country),
currency = COALESCE($4, currency),
updated_at = NOW()
WHERE id = $1
`;
await postgresClient.query(query, [
symbolId,
symbol.companyName || symbol.name || symbol.company_name,
symbol.countryCode || symbol.country_code,
symbol.currency,
]);
}
async function upsertProviderMapping(symbolId: string, provider: string, symbol: any): Promise<void> {
const postgresClient = getPostgreSQLClient();
const query = `
INSERT INTO provider_mappings
(symbol_id, provider, provider_symbol, provider_exchange, last_seen)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (provider, provider_symbol)
DO UPDATE SET
symbol_id = EXCLUDED.symbol_id,
provider_exchange = EXCLUDED.provider_exchange,
last_seen = NOW()
`;
await postgresClient.query(query, [
symbolId,
provider,
symbol.qmSearchCode || symbol.symbol || symbol.code,
symbol.exchangeCode || symbol.exchange || symbol.exchange_id,
]);
}
async function updateSyncStatus(provider: string, dataType: string, count: number, postgresClient: any): Promise<void> {
const query = `
INSERT INTO sync_status (provider, data_type, last_sync_at, last_sync_count, sync_errors)
VALUES ($1, $2, NOW(), $3, NULL)
ON CONFLICT (provider, data_type)
DO UPDATE SET
last_sync_at = NOW(),
last_sync_count = EXCLUDED.last_sync_count,
sync_errors = NULL,
updated_at = NOW()
`;
await postgresClient.query(query, [provider, dataType, count]);
}

View file

@ -0,0 +1,41 @@
import { getLogger } from '@stock-bot/logger';
import { handlerRegistry, type HandlerConfig, type ScheduledJobConfig } from '@stock-bot/queue';
import { symbolOperations } from './operations';
const logger = getLogger('symbols-handler');
const HANDLER_NAME = 'symbols';
const symbolsHandlerConfig: HandlerConfig = {
concurrency: 1,
maxAttempts: 3,
scheduledJobs: [
{
operation: 'sync-qm-symbols',
cronPattern: '0 2 * * *', // Daily at 2 AM
payload: {},
priority: 5,
immediately: false,
} as ScheduledJobConfig,
{
operation: 'sync-symbols-qm',
cronPattern: '0 4 * * *', // Daily at 4 AM
payload: { provider: 'qm', clearFirst: false },
priority: 5,
immediately: false,
} as ScheduledJobConfig,
],
operations: {
'sync-qm-symbols': symbolOperations.syncQMSymbols,
'sync-symbols-qm': symbolOperations.syncSymbolsFromProvider,
'sync-symbols-eod': symbolOperations.syncSymbolsFromProvider,
'sync-symbols-ib': symbolOperations.syncSymbolsFromProvider,
'sync-status': symbolOperations.getSyncStatus,
},
};
export function initializeSymbolsHandler(): void {
logger.info('Registering symbols handler...');
handlerRegistry.registerHandler(HANDLER_NAME, symbolsHandlerConfig);
logger.info('Symbols handler registered successfully');
}

View file

@ -0,0 +1,267 @@
// Framework imports
import { initializeServiceConfig } from '@stock-bot/config';
import { Hono } from 'hono';
import { cors } from 'hono/cors';
// Library imports
import { getLogger, setLoggerConfig, shutdownLoggers } from '@stock-bot/logger';
import { connectMongoDB } from '@stock-bot/mongodb-client';
import { connectPostgreSQL } from '@stock-bot/postgres-client';
import { QueueManager, type QueueManagerConfig } from '@stock-bot/queue';
import { Shutdown } from '@stock-bot/shutdown';
// Local imports
import { enhancedSyncRoutes, healthRoutes, statsRoutes, syncRoutes } from './routes';
const config = initializeServiceConfig();
console.log('Data Sync Service Configuration:', JSON.stringify(config, null, 2));
const serviceConfig = config.service;
const databaseConfig = config.database;
const queueConfig = config.queue;
if (config.log) {
setLoggerConfig({
logLevel: config.log.level,
logConsole: true,
logFile: false,
environment: config.environment,
hideObject: config.log.hideObject,
});
}
// Create logger AFTER config is set
const logger = getLogger('data-pipeline');
const app = new Hono();
// Add CORS middleware
app.use(
'*',
cors({
origin: '*',
allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'],
allowHeaders: ['Content-Type', 'Authorization'],
credentials: false,
})
);
const PORT = serviceConfig.port;
let server: ReturnType<typeof Bun.serve> | null = null;
// Singleton clients are managed in libraries
let queueManager: QueueManager | null = null;
// Initialize shutdown manager
const shutdown = Shutdown.getInstance({ timeout: 15000 });
// Mount routes
app.route('/health', healthRoutes);
app.route('/sync', syncRoutes);
app.route('/sync', enhancedSyncRoutes);
app.route('/sync/stats', statsRoutes);
// Initialize services
async function initializeServices() {
logger.info('Initializing data sync service...');
try {
// Initialize MongoDB client singleton
logger.debug('Connecting to MongoDB...');
const mongoConfig = databaseConfig.mongodb;
await connectMongoDB({
uri: mongoConfig.uri,
database: mongoConfig.database,
host: mongoConfig.host || 'localhost',
port: mongoConfig.port || 27017,
timeouts: {
connectTimeout: 30000,
socketTimeout: 30000,
serverSelectionTimeout: 5000,
},
});
logger.info('MongoDB connected');
// Initialize PostgreSQL client singleton
logger.debug('Connecting to PostgreSQL...');
const pgConfig = databaseConfig.postgres;
await connectPostgreSQL({
host: pgConfig.host,
port: pgConfig.port,
database: pgConfig.database,
username: pgConfig.user,
password: pgConfig.password,
poolSettings: {
min: 2,
max: pgConfig.poolSize || 10,
idleTimeoutMillis: pgConfig.idleTimeout || 30000,
},
});
logger.info('PostgreSQL connected');
// Initialize queue system (with delayed worker start)
logger.debug('Initializing queue system...');
const queueManagerConfig: QueueManagerConfig = {
redis: queueConfig?.redis || {
host: 'localhost',
port: 6379,
db: 1,
},
defaultQueueOptions: {
defaultJobOptions: queueConfig?.defaultJobOptions || {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: 10,
removeOnFail: 5,
},
workers: 2,
concurrency: 1,
enableMetrics: true,
enableDLQ: true,
},
enableScheduledJobs: true,
delayWorkerStart: true, // Prevent workers from starting until all singletons are ready
};
queueManager = QueueManager.getOrInitialize(queueManagerConfig);
logger.info('Queue system initialized');
// Initialize handlers (register handlers and scheduled jobs)
logger.debug('Initializing sync handlers...');
const { initializeExchangesHandler } = await import('./handlers/exchanges/exchanges.handler');
const { initializeSymbolsHandler } = await import('./handlers/symbols/symbols.handler');
initializeExchangesHandler();
initializeSymbolsHandler();
logger.info('Sync handlers initialized');
// Create scheduled jobs from registered handlers
logger.debug('Creating scheduled jobs from registered handlers...');
const { handlerRegistry } = await import('@stock-bot/queue');
const allHandlers = handlerRegistry.getAllHandlers();
let totalScheduledJobs = 0;
for (const [handlerName, config] of allHandlers) {
if (config.scheduledJobs && config.scheduledJobs.length > 0) {
const queue = queueManager.getQueue(handlerName);
for (const scheduledJob of config.scheduledJobs) {
// Include handler and operation info in job data
const jobData = {
handler: handlerName,
operation: scheduledJob.operation,
payload: scheduledJob.payload || {},
};
// Build job options from scheduled job config
const jobOptions = {
priority: scheduledJob.priority,
delay: scheduledJob.delay,
repeat: {
immediately: scheduledJob.immediately,
},
};
await queue.addScheduledJob(
scheduledJob.operation,
jobData,
scheduledJob.cronPattern,
jobOptions
);
totalScheduledJobs++;
logger.debug('Scheduled job created', {
handler: handlerName,
operation: scheduledJob.operation,
cronPattern: scheduledJob.cronPattern,
immediately: scheduledJob.immediately,
priority: scheduledJob.priority,
});
}
}
}
logger.info('Scheduled jobs created', { totalJobs: totalScheduledJobs });
// Now that all singletons are initialized and jobs are scheduled, start the workers
logger.debug('Starting queue workers...');
queueManager.startAllWorkers();
logger.info('Queue workers started');
logger.info('All services initialized successfully');
} catch (error) {
logger.error('Failed to initialize services', { error });
throw error;
}
}
// Start server
async function startServer() {
await initializeServices();
server = Bun.serve({
port: PORT,
fetch: app.fetch,
development: config.environment === 'development',
});
logger.info(`Data Sync Service started on port ${PORT}`);
}
// Register shutdown handlers with priorities
// Priority 1: Queue system (highest priority)
shutdown.onShutdownHigh(async () => {
logger.info('Shutting down queue system...');
try {
if (queueManager) {
await queueManager.shutdown();
}
logger.info('Queue system shut down');
} catch (error) {
logger.error('Error shutting down queue system', { error });
}
}, 'Queue System');
// Priority 1: HTTP Server (high priority)
shutdown.onShutdownHigh(async () => {
if (server) {
logger.info('Stopping HTTP server...');
try {
server.stop();
logger.info('HTTP server stopped');
} catch (error) {
logger.error('Error stopping HTTP server', { error });
}
}
}, 'HTTP Server');
// Priority 2: Database connections (medium priority)
shutdown.onShutdownMedium(async () => {
logger.info('Disconnecting from databases...');
try {
const { disconnectMongoDB } = await import('@stock-bot/mongodb-client');
const { disconnectPostgreSQL } = await import('@stock-bot/postgres-client');
await disconnectMongoDB();
await disconnectPostgreSQL();
logger.info('Database connections closed');
} catch (error) {
logger.error('Error closing database connections', { error });
}
}, 'Databases');
// Priority 3: Logger shutdown (lowest priority - runs last)
shutdown.onShutdownLow(async () => {
try {
logger.info('Shutting down loggers...');
await shutdownLoggers();
// Don't log after shutdown
} catch {
// Silently ignore logger shutdown errors
}
}, 'Loggers');
// Start the service
startServer().catch(error => {
logger.fatal('Failed to start data sync service', { error });
process.exit(1);
});
logger.info('Data sync service startup initiated');

View file

@ -0,0 +1,96 @@
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { QueueManager } from '@stock-bot/queue';
const logger = getLogger('enhanced-sync-routes');
const enhancedSync = new Hono();
// Enhanced sync endpoints
enhancedSync.post('/exchanges/all', async c => {
try {
const clearFirst = c.req.query('clear') === 'true';
const queueManager = QueueManager.getInstance();
const exchangesQueue = queueManager.getQueue('exchanges');
const job = await exchangesQueue.addJob('sync-all-exchanges', {
handler: 'exchanges',
operation: 'sync-all-exchanges',
payload: { clearFirst },
});
return c.json({ success: true, jobId: job.id, message: 'Enhanced exchange sync job queued' });
} catch (error) {
logger.error('Failed to queue enhanced exchange sync job', { error });
return c.json(
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
500
);
}
});
enhancedSync.post('/provider-mappings/qm', async c => {
try {
const queueManager = QueueManager.getInstance();
const exchangesQueue = queueManager.getQueue('exchanges');
const job = await exchangesQueue.addJob('sync-qm-provider-mappings', {
handler: 'exchanges',
operation: 'sync-qm-provider-mappings',
payload: {},
});
return c.json({ success: true, jobId: job.id, message: 'QM provider mappings sync job queued' });
} catch (error) {
logger.error('Failed to queue QM provider mappings sync job', { error });
return c.json(
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
500
);
}
});
enhancedSync.post('/symbols/:provider', async c => {
try {
const provider = c.req.param('provider');
const clearFirst = c.req.query('clear') === 'true';
const queueManager = QueueManager.getInstance();
const symbolsQueue = queueManager.getQueue('symbols');
const job = await symbolsQueue.addJob(`sync-symbols-${provider}`, {
handler: 'symbols',
operation: `sync-symbols-${provider}`,
payload: { provider, clearFirst },
});
return c.json({ success: true, jobId: job.id, message: `${provider} symbols sync job queued` });
} catch (error) {
logger.error('Failed to queue enhanced symbol sync job', { error });
return c.json(
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
500
);
}
});
// Enhanced status endpoints
enhancedSync.get('/status/enhanced', async c => {
try {
const queueManager = QueueManager.getInstance();
const exchangesQueue = queueManager.getQueue('exchanges');
const job = await exchangesQueue.addJob('enhanced-sync-status', {
handler: 'exchanges',
operation: 'enhanced-sync-status',
payload: {},
});
// Wait for job to complete and return result
const result = await job.waitUntilFinished();
return c.json(result);
} catch (error) {
logger.error('Failed to get enhanced sync status', { error });
return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500);
}
});
export { enhancedSync as enhancedSyncRoutes };

View file

@ -0,0 +1,14 @@
import { Hono } from 'hono';
const health = new Hono();
// Basic health check endpoint
health.get('/', c => {
return c.json({
status: 'healthy',
service: 'data-pipeline',
timestamp: new Date().toISOString(),
});
});
export { health as healthRoutes };

View file

@ -0,0 +1,5 @@
// Export all route modules
export { healthRoutes } from './health.routes';
export { syncRoutes } from './sync.routes';
export { enhancedSyncRoutes } from './enhanced-sync.routes';
export { statsRoutes } from './stats.routes';

View file

@ -0,0 +1,49 @@
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { QueueManager } from '@stock-bot/queue';
const logger = getLogger('stats-routes');
const stats = new Hono();
// Statistics endpoints
stats.get('/exchanges', async c => {
try {
const queueManager = QueueManager.getInstance();
const exchangesQueue = queueManager.getQueue('exchanges');
const job = await exchangesQueue.addJob('get-exchange-stats', {
handler: 'exchanges',
operation: 'get-exchange-stats',
payload: {},
});
// Wait for job to complete and return result
const result = await job.waitUntilFinished();
return c.json(result);
} catch (error) {
logger.error('Failed to get exchange stats', { error });
return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500);
}
});
stats.get('/provider-mappings', async c => {
try {
const queueManager = QueueManager.getInstance();
const exchangesQueue = queueManager.getQueue('exchanges');
const job = await exchangesQueue.addJob('get-provider-mapping-stats', {
handler: 'exchanges',
operation: 'get-provider-mapping-stats',
payload: {},
});
// Wait for job to complete and return result
const result = await job.waitUntilFinished();
return c.json(result);
} catch (error) {
logger.error('Failed to get provider mapping stats', { error });
return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500);
}
});
export { stats as statsRoutes };

View file

@ -0,0 +1,96 @@
import { Hono } from 'hono';
import { getLogger } from '@stock-bot/logger';
import { QueueManager } from '@stock-bot/queue';
const logger = getLogger('sync-routes');
const sync = new Hono();
// Manual sync trigger endpoints
sync.post('/symbols', async c => {
try {
const queueManager = QueueManager.getInstance();
const symbolsQueue = queueManager.getQueue('symbols');
const job = await symbolsQueue.addJob('sync-qm-symbols', {
handler: 'symbols',
operation: 'sync-qm-symbols',
payload: {},
});
return c.json({ success: true, jobId: job.id, message: 'QM symbols sync job queued' });
} catch (error) {
logger.error('Failed to queue symbol sync job', { error });
return c.json(
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
500
);
}
});
sync.post('/exchanges', async c => {
try {
const queueManager = QueueManager.getInstance();
const exchangesQueue = queueManager.getQueue('exchanges');
const job = await exchangesQueue.addJob('sync-qm-exchanges', {
handler: 'exchanges',
operation: 'sync-qm-exchanges',
payload: {},
});
return c.json({ success: true, jobId: job.id, message: 'QM exchanges sync job queued' });
} catch (error) {
logger.error('Failed to queue exchange sync job', { error });
return c.json(
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
500
);
}
});
// Get sync status
sync.get('/status', async c => {
try {
const queueManager = QueueManager.getInstance();
const symbolsQueue = queueManager.getQueue('symbols');
const job = await symbolsQueue.addJob('sync-status', {
handler: 'symbols',
operation: 'sync-status',
payload: {},
});
// Wait for job to complete and return result
const result = await job.waitUntilFinished();
return c.json(result);
} catch (error) {
logger.error('Failed to get sync status', { error });
return c.json({ error: error instanceof Error ? error.message : 'Unknown error' }, 500);
}
});
// Clear data endpoint
sync.post('/clear', async c => {
try {
const queueManager = QueueManager.getInstance();
const exchangesQueue = queueManager.getQueue('exchanges');
const job = await exchangesQueue.addJob('clear-postgresql-data', {
handler: 'exchanges',
operation: 'clear-postgresql-data',
payload: {},
});
// Wait for job to complete and return result
const result = await job.waitUntilFinished();
return c.json({ success: true, result });
} catch (error) {
logger.error('Failed to clear PostgreSQL data', { error });
return c.json(
{ success: false, error: error instanceof Error ? error.message : 'Unknown error' },
500
);
}
});
export { sync as syncRoutes };

View file

@ -0,0 +1,27 @@
export interface JobPayload {
[key: string]: any;
}
export interface SyncResult {
processed: number;
created: number;
updated: number;
skipped: number;
errors: number;
}
export interface SyncStatus {
provider: string;
dataType: string;
lastSyncAt?: Date;
lastSyncCount: number;
syncErrors?: string;
}
export interface ExchangeMapping {
id: string;
code: string;
name: string;
country: string;
currency: string;
}

View file

@ -0,0 +1,14 @@
{
"extends": "../../tsconfig.app.json",
"references": [
{ "path": "../../libs/types" },
{ "path": "../../libs/config" },
{ "path": "../../libs/logger" },
{ "path": "../../libs/cache" },
{ "path": "../../libs/queue" },
{ "path": "../../libs/mongodb-client" },
{ "path": "../../libs/postgres-client" },
{ "path": "../../libs/questdb-client" },
{ "path": "../../libs/shutdown" }
]
}