work on postgres / will probably remove and work on ib exchanges and symbols
This commit is contained in:
parent
cce5126cb7
commit
a20a11c1aa
16 changed files with 1441 additions and 95 deletions
|
|
@ -1,4 +1,4 @@
|
|||
import { QueryResult as PgQueryResult, Pool, PoolClient, QueryResultRow } from 'pg';
|
||||
import { Pool, QueryResultRow } from 'pg';
|
||||
import { postgresConfig } from '@stock-bot/config';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import { PostgreSQLHealthMonitor } from './health';
|
||||
|
|
@ -204,6 +204,99 @@ export class PostgreSQLClient {
|
|||
return await this.query<T>(query, params);
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch upsert operation for high-performance inserts/updates
|
||||
*/
|
||||
async batchUpsert(
|
||||
tableName: string,
|
||||
data: Record<string, unknown>[],
|
||||
conflictColumn: string,
|
||||
options: {
|
||||
chunkSize?: number;
|
||||
excludeColumns?: string[];
|
||||
} = {}
|
||||
): Promise<{ insertedCount: number; updatedCount: number }> {
|
||||
if (!this.pool) {
|
||||
throw new Error('PostgreSQL client not connected');
|
||||
}
|
||||
|
||||
if (data.length === 0) {
|
||||
return { insertedCount: 0, updatedCount: 0 };
|
||||
}
|
||||
|
||||
const { chunkSize = 1000, excludeColumns = [] } = options;
|
||||
const columns = Object.keys(data[0]).filter(col => !excludeColumns.includes(col));
|
||||
const updateColumns = columns.filter(col => col !== conflictColumn);
|
||||
|
||||
let totalInserted = 0;
|
||||
let totalUpdated = 0;
|
||||
|
||||
// Process in chunks to avoid memory issues and parameter limits
|
||||
for (let i = 0; i < data.length; i += chunkSize) {
|
||||
const chunk = data.slice(i, i + chunkSize);
|
||||
|
||||
// Build placeholders for this chunk
|
||||
const placeholders = chunk.map((_, rowIndex) => {
|
||||
const rowPlaceholders = columns.map((_, colIndex) => {
|
||||
return `$${rowIndex * columns.length + colIndex + 1}`;
|
||||
});
|
||||
return `(${rowPlaceholders.join(', ')})`;
|
||||
});
|
||||
|
||||
// Flatten the chunk data
|
||||
const values = chunk.flatMap(row => columns.map(col => row[col as keyof typeof row]));
|
||||
|
||||
// Build the upsert query
|
||||
const updateClauses = updateColumns.map(col => `${col} = EXCLUDED.${col}`);
|
||||
const query = `
|
||||
INSERT INTO ${tableName} (${columns.join(', ')})
|
||||
VALUES ${placeholders.join(', ')}
|
||||
ON CONFLICT (${conflictColumn})
|
||||
DO UPDATE SET
|
||||
${updateClauses.join(', ')},
|
||||
updated_at = NOW()
|
||||
RETURNING (xmax = 0) AS is_insert
|
||||
`;
|
||||
|
||||
try {
|
||||
const startTime = Date.now();
|
||||
const result = await this.pool.query(query, values);
|
||||
const executionTime = Date.now() - startTime;
|
||||
|
||||
// Count inserts vs updates
|
||||
const inserted = result.rows.filter((row: { is_insert: boolean }) => row.is_insert).length;
|
||||
const updated = result.rows.length - inserted;
|
||||
|
||||
totalInserted += inserted;
|
||||
totalUpdated += updated;
|
||||
|
||||
this.logger.debug(`Batch upsert chunk processed in ${executionTime}ms`, {
|
||||
chunkSize: chunk.length,
|
||||
inserted,
|
||||
updated,
|
||||
table: tableName,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error(`Batch upsert failed on chunk ${Math.floor(i / chunkSize) + 1}:`, {
|
||||
error,
|
||||
table: tableName,
|
||||
chunkStart: i,
|
||||
chunkSize: chunk.length,
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.info('Batch upsert completed', {
|
||||
table: tableName,
|
||||
totalRecords: data.length,
|
||||
inserted: totalInserted,
|
||||
updated: totalUpdated,
|
||||
});
|
||||
|
||||
return { insertedCount: totalInserted, updatedCount: totalUpdated };
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a table exists
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue