simplified a lot of stuff
This commit is contained in:
parent
b845a8eade
commit
885b484a37
20 changed files with 360 additions and 1335 deletions
264
libs/core/cache/src/connection-manager.ts
vendored
264
libs/core/cache/src/connection-manager.ts
vendored
|
|
@ -1,5 +1,6 @@
|
|||
import Redis from 'ioredis';
|
||||
import type { RedisConfig } from './types';
|
||||
import { REDIS_DEFAULTS } from './constants';
|
||||
|
||||
interface ConnectionConfig {
|
||||
name: string;
|
||||
|
|
@ -10,16 +11,12 @@ interface ConnectionConfig {
|
|||
}
|
||||
|
||||
/**
|
||||
* Redis Connection Manager for managing shared and unique connections
|
||||
* Simplified Redis Connection Manager
|
||||
*/
|
||||
export class RedisConnectionManager {
|
||||
private connections = new Map<string, Redis>();
|
||||
private static sharedConnections = new Map<string, Redis>();
|
||||
private static connections = new Map<string, Redis>();
|
||||
private static instance: RedisConnectionManager;
|
||||
private logger: any = console;
|
||||
private static readyConnections = new Set<string>();
|
||||
|
||||
// Singleton pattern for the manager itself
|
||||
static getInstance(): RedisConnectionManager {
|
||||
if (!this.instance) {
|
||||
this.instance = new RedisConnectionManager();
|
||||
|
|
@ -29,251 +26,50 @@ export class RedisConnectionManager {
|
|||
|
||||
/**
|
||||
* Get or create a Redis connection
|
||||
* @param config Connection configuration
|
||||
* @returns Redis connection instance
|
||||
*/
|
||||
getConnection(config: ConnectionConfig): Redis {
|
||||
const { name, singleton = false, db, redisConfig, logger } = config;
|
||||
if (logger) {
|
||||
this.logger = logger;
|
||||
const { name, singleton = true, redisConfig } = config;
|
||||
|
||||
if (singleton) {
|
||||
const existing = RedisConnectionManager.connections.get(name);
|
||||
if (existing) return existing;
|
||||
}
|
||||
|
||||
const connection = this.createConnection(redisConfig);
|
||||
|
||||
if (singleton) {
|
||||
// Use shared connection across all instances
|
||||
if (!RedisConnectionManager.sharedConnections.has(name)) {
|
||||
const connection = this.createConnection(name, redisConfig, db, logger);
|
||||
RedisConnectionManager.sharedConnections.set(name, connection);
|
||||
this.logger.info(`Created shared Redis connection: ${name}`);
|
||||
}
|
||||
const connection = RedisConnectionManager.sharedConnections.get(name);
|
||||
if (!connection) {
|
||||
throw new Error(`Expected connection ${name} to exist in shared connections`);
|
||||
}
|
||||
return connection;
|
||||
} else {
|
||||
// Create unique connection per instance
|
||||
const uniqueName = `${name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||
const connection = this.createConnection(uniqueName, redisConfig, db, logger);
|
||||
this.connections.set(uniqueName, connection);
|
||||
this.logger.debug(`Created unique Redis connection: ${uniqueName}`);
|
||||
return connection;
|
||||
RedisConnectionManager.connections.set(name, connection);
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new Redis connection with configuration
|
||||
* Create a new Redis connection
|
||||
*/
|
||||
private createConnection(name: string, config: RedisConfig, db?: number, logger?: any): Redis {
|
||||
const redisOptions = {
|
||||
private createConnection(config: RedisConfig): Redis {
|
||||
return new Redis({
|
||||
host: config.host,
|
||||
port: config.port,
|
||||
password: config.password || undefined,
|
||||
username: config.username || undefined,
|
||||
db: db ?? config.db ?? 0,
|
||||
maxRetriesPerRequest: config.maxRetriesPerRequest ?? 3,
|
||||
retryDelayOnFailover: config.retryDelayOnFailover ?? 100,
|
||||
connectTimeout: config.connectTimeout ?? 10000,
|
||||
commandTimeout: config.commandTimeout ?? 5000,
|
||||
keepAlive: config.keepAlive ?? 0,
|
||||
connectionName: name,
|
||||
lazyConnect: false, // Connect immediately instead of waiting for first command
|
||||
...(config.tls && {
|
||||
tls: {
|
||||
cert: config.tls.cert || undefined,
|
||||
key: config.tls.key || undefined,
|
||||
ca: config.tls.ca || undefined,
|
||||
rejectUnauthorized: config.tls.rejectUnauthorized ?? true,
|
||||
},
|
||||
}),
|
||||
};
|
||||
|
||||
const redis = new Redis(redisOptions);
|
||||
|
||||
// Use the provided logger or fall back to instance logger
|
||||
const log = logger || this.logger;
|
||||
|
||||
// Setup event handlers
|
||||
redis.on('connect', () => {
|
||||
log.info(`Redis connection established: ${name}`);
|
||||
password: config.password,
|
||||
username: config.username,
|
||||
db: config.db ?? REDIS_DEFAULTS.DB,
|
||||
maxRetriesPerRequest: config.maxRetriesPerRequest ?? REDIS_DEFAULTS.MAX_RETRIES,
|
||||
connectTimeout: config.connectTimeout ?? REDIS_DEFAULTS.CONNECT_TIMEOUT,
|
||||
commandTimeout: config.commandTimeout ?? REDIS_DEFAULTS.COMMAND_TIMEOUT,
|
||||
lazyConnect: false,
|
||||
...(config.tls && { tls: config.tls }),
|
||||
});
|
||||
|
||||
redis.on('ready', () => {
|
||||
log.info(`Redis connection ready: ${name}`);
|
||||
});
|
||||
|
||||
redis.on('error', err => {
|
||||
log.error(`Redis connection error for ${name}:`, err);
|
||||
});
|
||||
|
||||
redis.on('close', () => {
|
||||
log.warn(`Redis connection closed: ${name}`);
|
||||
});
|
||||
|
||||
redis.on('reconnecting', () => {
|
||||
log.warn(`Redis reconnecting: ${name}`);
|
||||
});
|
||||
|
||||
return redis;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a specific connection
|
||||
* Close all connections
|
||||
*/
|
||||
async closeConnection(connection: Redis): Promise<void> {
|
||||
try {
|
||||
await connection.quit();
|
||||
} catch (error) {
|
||||
this.logger.warn('Error closing Redis connection:', error as Error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all connections managed by this instance
|
||||
*/
|
||||
async closeAllConnections(): Promise<void> {
|
||||
// Close instance-specific connections
|
||||
const instancePromises = Array.from(this.connections.values()).map(conn =>
|
||||
this.closeConnection(conn)
|
||||
static async closeAll(): Promise<void> {
|
||||
const promises = Array.from(this.connections.values()).map(conn =>
|
||||
conn.quit().catch(() => {})
|
||||
);
|
||||
await Promise.all(instancePromises);
|
||||
await Promise.all(promises);
|
||||
this.connections.clear();
|
||||
|
||||
// Close shared connections (only if this is the last instance)
|
||||
if (RedisConnectionManager.instance === this) {
|
||||
const sharedPromises = Array.from(RedisConnectionManager.sharedConnections.values()).map(
|
||||
conn => this.closeConnection(conn)
|
||||
);
|
||||
await Promise.all(sharedPromises);
|
||||
RedisConnectionManager.sharedConnections.clear();
|
||||
}
|
||||
|
||||
this.logger.info('All Redis connections closed');
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection statistics
|
||||
*/
|
||||
getConnectionCount(): { shared: number; unique: number } {
|
||||
return {
|
||||
shared: RedisConnectionManager.sharedConnections.size,
|
||||
unique: this.connections.size,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all connection names for monitoring
|
||||
*/
|
||||
getConnectionNames(): { shared: string[]; unique: string[] } {
|
||||
return {
|
||||
shared: Array.from(RedisConnectionManager.sharedConnections.keys()),
|
||||
unique: Array.from(this.connections.keys()),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Health check for all connections
|
||||
*/
|
||||
async healthCheck(): Promise<{ healthy: boolean; details: Record<string, boolean> }> {
|
||||
const details: Record<string, boolean> = {};
|
||||
let allHealthy = true;
|
||||
|
||||
// Check shared connections
|
||||
for (const [name, connection] of RedisConnectionManager.sharedConnections) {
|
||||
try {
|
||||
await connection.ping();
|
||||
details[`shared:${name}`] = true;
|
||||
} catch {
|
||||
details[`shared:${name}`] = false;
|
||||
allHealthy = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Check instance connections
|
||||
for (const [name, connection] of this.connections) {
|
||||
try {
|
||||
await connection.ping();
|
||||
details[`unique:${name}`] = true;
|
||||
} catch {
|
||||
details[`unique:${name}`] = false;
|
||||
allHealthy = false;
|
||||
}
|
||||
}
|
||||
|
||||
return { healthy: allHealthy, details };
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all created connections to be ready
|
||||
* @param timeout Maximum time to wait in milliseconds
|
||||
* @returns Promise that resolves when all connections are ready
|
||||
*/
|
||||
static async waitForAllConnections(timeout: number = 30000): Promise<void> {
|
||||
const instance = this.getInstance();
|
||||
const allConnections = new Map([...instance.connections, ...this.sharedConnections]);
|
||||
|
||||
if (allConnections.size === 0) {
|
||||
instance.logger.debug('No Redis connections to wait for');
|
||||
return;
|
||||
}
|
||||
|
||||
instance.logger.info(`Waiting for ${allConnections.size} Redis connections to be ready...`);
|
||||
|
||||
const connectionPromises = Array.from(allConnections.entries()).map(([name, redis]) =>
|
||||
instance.waitForConnection(redis, name, timeout)
|
||||
);
|
||||
|
||||
try {
|
||||
await Promise.all(connectionPromises);
|
||||
instance.logger.info('All Redis connections are ready');
|
||||
} catch (error) {
|
||||
instance.logger.error('Failed to establish all Redis connections:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a specific connection to be ready
|
||||
*/
|
||||
private async waitForConnection(redis: Redis, name: string, timeout: number): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeoutId = setTimeout(() => {
|
||||
reject(new Error(`Redis connection ${name} failed to be ready within ${timeout}ms`));
|
||||
}, timeout);
|
||||
|
||||
const onReady = () => {
|
||||
clearTimeout(timeoutId);
|
||||
RedisConnectionManager.readyConnections.add(name);
|
||||
this.logger.debug(`Redis connection ready: ${name}`);
|
||||
resolve();
|
||||
};
|
||||
|
||||
const onError = (err: Error) => {
|
||||
clearTimeout(timeoutId);
|
||||
this.logger.error(`Redis connection failed for ${name}:`, err);
|
||||
reject(err);
|
||||
};
|
||||
|
||||
if (redis.status === 'ready') {
|
||||
onReady();
|
||||
} else {
|
||||
redis.once('ready', onReady);
|
||||
redis.once('error', onError);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if all connections are ready
|
||||
*/
|
||||
static areAllConnectionsReady(): boolean {
|
||||
const instance = this.getInstance();
|
||||
const allConnections = new Map([...instance.connections, ...this.sharedConnections]);
|
||||
|
||||
return (
|
||||
allConnections.size > 0 &&
|
||||
Array.from(allConnections.keys()).every(name => this.readyConnections.has(name))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export default RedisConnectionManager;
|
||||
}
|
||||
16
libs/core/cache/src/constants.ts
vendored
Normal file
16
libs/core/cache/src/constants.ts
vendored
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
// Cache constants
|
||||
export const CACHE_DEFAULTS = {
|
||||
TTL: 3600, // 1 hour in seconds
|
||||
KEY_PREFIX: 'cache:',
|
||||
SCAN_COUNT: 100,
|
||||
} as const;
|
||||
|
||||
// Redis connection constants
|
||||
export const REDIS_DEFAULTS = {
|
||||
DB: 0,
|
||||
MAX_RETRIES: 3,
|
||||
RETRY_DELAY: 100,
|
||||
CONNECT_TIMEOUT: 10000,
|
||||
COMMAND_TIMEOUT: 5000,
|
||||
KEEP_ALIVE: 0,
|
||||
} as const;
|
||||
20
libs/core/cache/src/index.ts
vendored
20
libs/core/cache/src/index.ts
vendored
|
|
@ -5,7 +5,7 @@ import type { CacheOptions, CacheProvider } from './types';
|
|||
const cacheInstances = new Map<string, CacheProvider>();
|
||||
|
||||
/**
|
||||
* Create a Redis cache instance with trading-optimized defaults
|
||||
* Create a Redis cache instance
|
||||
*/
|
||||
export function createCache(options: CacheOptions): CacheProvider {
|
||||
const defaultOptions: CacheOptions = {
|
||||
|
|
@ -37,19 +37,7 @@ export function createCache(options: CacheOptions): CacheProvider {
|
|||
return new RedisCache(defaultOptions);
|
||||
}
|
||||
|
||||
// Export types and classes
|
||||
export type {
|
||||
CacheConfig,
|
||||
CacheKey,
|
||||
CacheOptions,
|
||||
CacheProvider,
|
||||
CacheStats,
|
||||
RedisConfig,
|
||||
SerializationOptions,
|
||||
} from './types';
|
||||
|
||||
export { RedisConnectionManager } from './connection-manager';
|
||||
export { CacheKeyGenerator } from './key-generator';
|
||||
export { RedisCache } from './redis-cache';
|
||||
// Export only what's actually used
|
||||
export type { CacheProvider, CacheStats } from './types';
|
||||
export { NamespacedCache } from './namespaced-cache';
|
||||
export { createNamespacedCache, isCacheAvailable } from './cache-factory';
|
||||
export { createNamespacedCache } from './cache-factory';
|
||||
508
libs/core/cache/src/redis-cache.ts
vendored
508
libs/core/cache/src/redis-cache.ts
vendored
|
|
@ -1,20 +1,16 @@
|
|||
import Redis from 'ioredis';
|
||||
import { RedisConnectionManager } from './connection-manager';
|
||||
import type { CacheOptions, CacheProvider, CacheStats } from './types';
|
||||
import { CACHE_DEFAULTS } from './constants';
|
||||
|
||||
/**
|
||||
* Simplified Redis-based cache provider using connection manager
|
||||
* Simplified Redis-based cache provider
|
||||
*/
|
||||
export class RedisCache implements CacheProvider {
|
||||
private redis: Redis;
|
||||
private logger: any;
|
||||
private defaultTTL: number;
|
||||
private keyPrefix: string;
|
||||
private enableMetrics: boolean;
|
||||
private isConnected = false;
|
||||
private startTime = Date.now();
|
||||
private connectionManager: RedisConnectionManager;
|
||||
|
||||
private stats: CacheStats = {
|
||||
hits: 0,
|
||||
misses: 0,
|
||||
|
|
@ -23,65 +19,22 @@ export class RedisCache implements CacheProvider {
|
|||
total: 0,
|
||||
uptime: 0,
|
||||
};
|
||||
private startTime = Date.now();
|
||||
|
||||
constructor(options: CacheOptions) {
|
||||
this.defaultTTL = options.ttl ?? 3600; // 1 hour default
|
||||
this.keyPrefix = options.keyPrefix ?? 'cache:';
|
||||
this.enableMetrics = options.enableMetrics ?? true;
|
||||
this.logger = options.logger || console; // Use provided logger or console as fallback
|
||||
this.defaultTTL = options.ttl ?? CACHE_DEFAULTS.TTL;
|
||||
this.keyPrefix = options.keyPrefix ?? CACHE_DEFAULTS.KEY_PREFIX;
|
||||
this.logger = options.logger || console;
|
||||
|
||||
// Get connection manager instance
|
||||
this.connectionManager = RedisConnectionManager.getInstance();
|
||||
|
||||
// Generate connection name based on cache type
|
||||
const baseName =
|
||||
options.name ||
|
||||
this.keyPrefix
|
||||
.replace(':', '')
|
||||
.replace(/[^a-zA-Z0-9]/g, '')
|
||||
.toUpperCase() ||
|
||||
'CACHE';
|
||||
|
||||
// Get Redis connection (shared by default for cache)
|
||||
this.redis = this.connectionManager.getConnection({
|
||||
name: `${baseName}-SERVICE`,
|
||||
singleton: options.shared ?? true, // Default to shared connection for cache
|
||||
const manager = RedisConnectionManager.getInstance();
|
||||
const name = options.name || 'CACHE';
|
||||
|
||||
this.redis = manager.getConnection({
|
||||
name: `${name}-SERVICE`,
|
||||
singleton: options.shared ?? true,
|
||||
redisConfig: options.redisConfig,
|
||||
logger: this.logger,
|
||||
});
|
||||
|
||||
// Only setup event handlers for non-shared connections to avoid memory leaks
|
||||
if (!(options.shared ?? true)) {
|
||||
this.setupEventHandlers();
|
||||
} else {
|
||||
// For shared connections, just monitor the connection status without adding handlers
|
||||
this.isConnected = this.redis.status === 'ready';
|
||||
}
|
||||
}
|
||||
|
||||
private setupEventHandlers(): void {
|
||||
this.redis.on('connect', () => {
|
||||
this.logger.info('Redis cache connected');
|
||||
});
|
||||
|
||||
this.redis.on('ready', () => {
|
||||
this.isConnected = true;
|
||||
this.logger.info('Redis cache ready');
|
||||
});
|
||||
|
||||
this.redis.on('error', (error: Error) => {
|
||||
this.isConnected = false;
|
||||
this.logger.error('Redis cache connection error', { error: error.message });
|
||||
});
|
||||
|
||||
this.redis.on('close', () => {
|
||||
this.isConnected = false;
|
||||
this.logger.warn('Redis cache connection closed');
|
||||
});
|
||||
|
||||
this.redis.on('reconnecting', () => {
|
||||
this.logger.warn('Redis cache reconnecting...');
|
||||
});
|
||||
}
|
||||
|
||||
private getKey(key: string): string {
|
||||
|
|
@ -89,10 +42,6 @@ export class RedisCache implements CacheProvider {
|
|||
}
|
||||
|
||||
private updateStats(hit: boolean, error = false): void {
|
||||
if (!this.enableMetrics) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (error) {
|
||||
this.stats.errors++;
|
||||
} else if (hit) {
|
||||
|
|
@ -100,256 +49,145 @@ export class RedisCache implements CacheProvider {
|
|||
} else {
|
||||
this.stats.misses++;
|
||||
}
|
||||
|
||||
this.stats.total = this.stats.hits + this.stats.misses;
|
||||
this.stats.hitRate = this.stats.total > 0 ? this.stats.hits / this.stats.total : 0;
|
||||
this.stats.uptime = Date.now() - this.startTime;
|
||||
}
|
||||
|
||||
private async safeExecute<T>(
|
||||
operation: () => Promise<T>,
|
||||
fallback: T,
|
||||
operationName: string
|
||||
): Promise<T> {
|
||||
try {
|
||||
if (!this.isReady()) {
|
||||
this.logger.warn(`Redis not ready for ${operationName}, using fallback`);
|
||||
this.updateStats(false, true);
|
||||
return fallback;
|
||||
}
|
||||
return await operation();
|
||||
} catch (error) {
|
||||
this.logger.error(`Redis ${operationName} failed`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
this.updateStats(false, true);
|
||||
return fallback;
|
||||
}
|
||||
}
|
||||
|
||||
async get<T>(key: string): Promise<T | null> {
|
||||
return this.safeExecute(
|
||||
async () => {
|
||||
const fullKey = this.getKey(key);
|
||||
const value = await this.redis.get(fullKey);
|
||||
|
||||
if (value === null) {
|
||||
this.updateStats(false);
|
||||
this.logger.debug('Cache miss', { key });
|
||||
return null;
|
||||
}
|
||||
|
||||
this.updateStats(true);
|
||||
this.logger.debug('Cache hit', { key });
|
||||
|
||||
try {
|
||||
return JSON.parse(value) as T;
|
||||
} catch {
|
||||
// If parsing fails, return as string
|
||||
return value as unknown as T;
|
||||
}
|
||||
},
|
||||
null,
|
||||
'get'
|
||||
);
|
||||
try {
|
||||
const value = await this.redis.get(this.getKey(key));
|
||||
if (value === null) {
|
||||
this.updateStats(false);
|
||||
return null;
|
||||
}
|
||||
this.updateStats(true);
|
||||
return JSON.parse(value);
|
||||
} catch (error) {
|
||||
this.updateStats(false, true);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async set<T>(
|
||||
key: string,
|
||||
value: T,
|
||||
options?:
|
||||
| number
|
||||
| {
|
||||
ttl?: number;
|
||||
preserveTTL?: boolean;
|
||||
onlyIfExists?: boolean;
|
||||
onlyIfNotExists?: boolean;
|
||||
getOldValue?: boolean;
|
||||
}
|
||||
): Promise<T | null> {
|
||||
// Validate options before safeExecute
|
||||
const config = typeof options === 'number' ? { ttl: options } : options || {};
|
||||
if (config.onlyIfExists && config.onlyIfNotExists) {
|
||||
throw new Error('Cannot specify both onlyIfExists and onlyIfNotExists');
|
||||
options?: number | {
|
||||
ttl?: number;
|
||||
preserveTTL?: boolean;
|
||||
onlyIfExists?: boolean;
|
||||
onlyIfNotExists?: boolean;
|
||||
getOldValue?: boolean;
|
||||
}
|
||||
|
||||
return this.safeExecute(
|
||||
async () => {
|
||||
const fullKey = this.getKey(key);
|
||||
const serialized = typeof value === 'string' ? value : JSON.stringify(value);
|
||||
|
||||
// Config is already parsed and validated above
|
||||
|
||||
let oldValue: T | null = null;
|
||||
|
||||
// Get old value if requested
|
||||
if (config.getOldValue) {
|
||||
const existingValue = await this.redis.get(fullKey);
|
||||
if (existingValue !== null) {
|
||||
try {
|
||||
oldValue = JSON.parse(existingValue) as T;
|
||||
} catch {
|
||||
oldValue = existingValue as unknown as T;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle preserveTTL logic
|
||||
if (config.preserveTTL) {
|
||||
const currentTTL = await this.redis.ttl(fullKey);
|
||||
|
||||
if (currentTTL === -2) {
|
||||
// Key doesn't exist
|
||||
if (config.onlyIfExists) {
|
||||
this.logger.debug('Set skipped - key does not exist and onlyIfExists is true', {
|
||||
key,
|
||||
});
|
||||
return oldValue;
|
||||
}
|
||||
// Set with default or specified TTL
|
||||
const ttl = config.ttl ?? this.defaultTTL;
|
||||
await this.redis.setex(fullKey, ttl, serialized);
|
||||
this.logger.debug('Cache set with new TTL (key did not exist)', { key, ttl });
|
||||
} else if (currentTTL === -1) {
|
||||
// Key exists but has no expiry - preserve the no-expiry state
|
||||
await this.redis.set(fullKey, serialized);
|
||||
this.logger.debug('Cache set preserving no-expiry', { key });
|
||||
} else {
|
||||
// Key exists with TTL - preserve it
|
||||
await this.redis.setex(fullKey, currentTTL, serialized);
|
||||
this.logger.debug('Cache set preserving existing TTL', { key, ttl: currentTTL });
|
||||
}
|
||||
): Promise<T | null> {
|
||||
try {
|
||||
const fullKey = this.getKey(key);
|
||||
const serialized = JSON.stringify(value);
|
||||
const opts = typeof options === 'number' ? { ttl: options } : options || {};
|
||||
|
||||
let oldValue: T | null = null;
|
||||
if (opts.getOldValue) {
|
||||
const existing = await this.redis.get(fullKey);
|
||||
if (existing) oldValue = JSON.parse(existing);
|
||||
}
|
||||
|
||||
const ttl = opts.ttl ?? this.defaultTTL;
|
||||
|
||||
if (opts.onlyIfExists) {
|
||||
const result = await this.redis.set(fullKey, serialized, 'EX', ttl, 'XX');
|
||||
if (!result) return oldValue;
|
||||
} else if (opts.onlyIfNotExists) {
|
||||
const result = await this.redis.set(fullKey, serialized, 'EX', ttl, 'NX');
|
||||
if (!result) return oldValue;
|
||||
} else if (opts.preserveTTL) {
|
||||
const currentTTL = await this.redis.ttl(fullKey);
|
||||
if (currentTTL > 0) {
|
||||
await this.redis.setex(fullKey, currentTTL, serialized);
|
||||
} else {
|
||||
// Standard set logic with conditional operations
|
||||
|
||||
if (config.onlyIfExists) {
|
||||
// Only set if key exists (XX flag)
|
||||
const ttl = config.ttl ?? this.defaultTTL;
|
||||
const result = await this.redis.set(fullKey, serialized, 'EX', ttl, 'XX');
|
||||
if (result === null) {
|
||||
this.logger.debug('Set skipped - key does not exist', { key });
|
||||
return oldValue;
|
||||
}
|
||||
} else if (config.onlyIfNotExists) {
|
||||
// Only set if key doesn't exist (NX flag)
|
||||
const ttl = config.ttl ?? this.defaultTTL;
|
||||
const result = await this.redis.set(fullKey, serialized, 'EX', ttl, 'NX');
|
||||
if (result === null) {
|
||||
this.logger.debug('Set skipped - key already exists', { key });
|
||||
return oldValue;
|
||||
}
|
||||
} else {
|
||||
// Standard set
|
||||
const ttl = config.ttl ?? this.defaultTTL;
|
||||
await this.redis.setex(fullKey, ttl, serialized);
|
||||
}
|
||||
|
||||
this.logger.debug('Cache set', { key, ttl: config.ttl ?? this.defaultTTL });
|
||||
await this.redis.setex(fullKey, ttl, serialized);
|
||||
}
|
||||
|
||||
return oldValue;
|
||||
},
|
||||
null,
|
||||
'set'
|
||||
);
|
||||
} else {
|
||||
await this.redis.setex(fullKey, ttl, serialized);
|
||||
}
|
||||
|
||||
return oldValue;
|
||||
} catch (error) {
|
||||
this.updateStats(false, true);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async del(key: string): Promise<void> {
|
||||
await this.safeExecute(
|
||||
async () => {
|
||||
const fullKey = this.getKey(key);
|
||||
await this.redis.del(fullKey);
|
||||
this.logger.debug('Cache delete', { key });
|
||||
},
|
||||
undefined,
|
||||
'del'
|
||||
);
|
||||
try {
|
||||
await this.redis.del(this.getKey(key));
|
||||
} catch (error) {
|
||||
this.updateStats(false, true);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async exists(key: string): Promise<boolean> {
|
||||
return this.safeExecute(
|
||||
async () => {
|
||||
const fullKey = this.getKey(key);
|
||||
const result = await this.redis.exists(fullKey);
|
||||
return result === 1;
|
||||
},
|
||||
false,
|
||||
'exists'
|
||||
);
|
||||
try {
|
||||
return (await this.redis.exists(this.getKey(key))) === 1;
|
||||
} catch (error) {
|
||||
this.updateStats(false, true);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
await this.safeExecute(
|
||||
async () => {
|
||||
const pattern = `${this.keyPrefix}*`;
|
||||
const keys = await this.redis.keys(pattern);
|
||||
if (keys.length > 0) {
|
||||
await this.redis.del(...keys);
|
||||
this.logger.warn('Cache cleared', { keysDeleted: keys.length });
|
||||
try {
|
||||
const stream = this.redis.scanStream({
|
||||
match: `${this.keyPrefix}*`,
|
||||
count: CACHE_DEFAULTS.SCAN_COUNT
|
||||
});
|
||||
|
||||
const pipeline = this.redis.pipeline();
|
||||
stream.on('data', (keys: string[]) => {
|
||||
if (keys.length) {
|
||||
keys.forEach(key => pipeline.del(key));
|
||||
}
|
||||
},
|
||||
undefined,
|
||||
'clear'
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a value using a raw Redis key (bypassing the keyPrefix)
|
||||
* Useful for accessing cache data from other services with different prefixes
|
||||
*/
|
||||
async getRaw<T = unknown>(key: string): Promise<T | null> {
|
||||
return this.safeExecute(
|
||||
async () => {
|
||||
// Use the key directly without adding our prefix
|
||||
const value = await this.redis.get(key);
|
||||
if (!value) {
|
||||
this.updateStats(false);
|
||||
return null;
|
||||
}
|
||||
this.updateStats(true);
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(value);
|
||||
this.logger.debug('Cache raw get hit', { key });
|
||||
return parsed;
|
||||
} catch (error) {
|
||||
// If JSON parsing fails, log the error with more context
|
||||
this.logger.warn('Cache getRaw JSON parse failed', {
|
||||
key,
|
||||
valueLength: value.length,
|
||||
valuePreview: value.substring(0, 100),
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
// Return the raw value as-is if it can't be parsed
|
||||
return value as unknown as T;
|
||||
}
|
||||
},
|
||||
null,
|
||||
'getRaw'
|
||||
);
|
||||
});
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
stream.on('end', resolve);
|
||||
stream.on('error', reject);
|
||||
});
|
||||
|
||||
await pipeline.exec();
|
||||
} catch (error) {
|
||||
this.updateStats(false, true);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async keys(pattern: string): Promise<string[]> {
|
||||
return this.safeExecute(
|
||||
async () => {
|
||||
const fullPattern = `${this.keyPrefix}${pattern}`;
|
||||
const keys = await this.redis.keys(fullPattern);
|
||||
// Remove the prefix from returned keys to match the interface expectation
|
||||
return keys.map(key => key.replace(this.keyPrefix, ''));
|
||||
},
|
||||
[],
|
||||
'keys'
|
||||
);
|
||||
try {
|
||||
const keys: string[] = [];
|
||||
const stream = this.redis.scanStream({
|
||||
match: `${this.keyPrefix}${pattern}`,
|
||||
count: CACHE_DEFAULTS.SCAN_COUNT
|
||||
});
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
stream.on('data', (resultKeys: string[]) => {
|
||||
keys.push(...resultKeys.map(k => k.replace(this.keyPrefix, '')));
|
||||
});
|
||||
stream.on('end', resolve);
|
||||
stream.on('error', reject);
|
||||
});
|
||||
|
||||
return keys;
|
||||
} catch (error) {
|
||||
this.updateStats(false, true);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async health(): Promise<boolean> {
|
||||
try {
|
||||
const pong = await this.redis.ping();
|
||||
return pong === 'PONG';
|
||||
} catch (error) {
|
||||
this.logger.error('Redis health check failed', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
return (await this.redis.ping()) === 'PONG';
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
@ -362,115 +200,21 @@ export class RedisCache implements CacheProvider {
|
|||
}
|
||||
|
||||
async waitForReady(timeout = 5000): Promise<void> {
|
||||
if (this.redis.status === 'ready') return;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
if (this.redis.status === 'ready') {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
const timeoutId = setTimeout(() => {
|
||||
const timer = setTimeout(() => {
|
||||
reject(new Error(`Redis connection timeout after ${timeout}ms`));
|
||||
}, timeout);
|
||||
|
||||
this.redis.once('ready', () => {
|
||||
clearTimeout(timeoutId);
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
|
||||
this.redis.once('error', error => {
|
||||
clearTimeout(timeoutId);
|
||||
reject(error);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
isReady(): boolean {
|
||||
// Always check the actual Redis connection status
|
||||
const ready = this.redis.status === 'ready';
|
||||
|
||||
// Update local flag if we're not using shared connection
|
||||
if (this.isConnected !== ready) {
|
||||
this.isConnected = ready;
|
||||
}
|
||||
|
||||
return ready;
|
||||
return this.redis.status === 'ready';
|
||||
}
|
||||
|
||||
// Enhanced convenience methods
|
||||
async update<T>(key: string, value: T): Promise<T | null> {
|
||||
return this.set(key, value, { preserveTTL: true, getOldValue: true });
|
||||
}
|
||||
|
||||
async setIfExists<T>(key: string, value: T, ttl?: number): Promise<boolean> {
|
||||
const result = await this.set(key, value, { ttl, onlyIfExists: true });
|
||||
return result !== null || (await this.exists(key));
|
||||
}
|
||||
|
||||
async setIfNotExists<T>(key: string, value: T, ttl?: number): Promise<boolean> {
|
||||
const oldValue = await this.set(key, value, { ttl, onlyIfNotExists: true, getOldValue: true });
|
||||
return oldValue === null; // Returns true if key didn't exist before
|
||||
}
|
||||
|
||||
async replace<T>(key: string, value: T, ttl?: number): Promise<T | null> {
|
||||
return this.set(key, value, { ttl, onlyIfExists: true, getOldValue: true });
|
||||
}
|
||||
|
||||
// Atomic update with transformation
|
||||
async updateField<T>(
|
||||
key: string,
|
||||
updater: (current: T | null) => T,
|
||||
ttl?: number
|
||||
): Promise<T | null> {
|
||||
return this.safeExecute(
|
||||
async () => {
|
||||
const fullKey = this.getKey(key);
|
||||
|
||||
// Use Lua script for atomic read-modify-write
|
||||
const luaScript = `
|
||||
local key = KEYS[1]
|
||||
|
||||
-- Get current value and TTL
|
||||
local current_value = redis.call('GET', key)
|
||||
local current_ttl = redis.call('TTL', key)
|
||||
|
||||
-- Return current value for processing
|
||||
return {current_value, current_ttl}
|
||||
`;
|
||||
|
||||
const [currentValue, currentTTL] = (await this.redis.eval(luaScript, 1, fullKey)) as [
|
||||
string | null,
|
||||
number,
|
||||
];
|
||||
|
||||
// Parse current value
|
||||
let parsed: T | null = null;
|
||||
if (currentValue !== null) {
|
||||
try {
|
||||
parsed = JSON.parse(currentValue) as T;
|
||||
} catch {
|
||||
parsed = currentValue as unknown as T;
|
||||
}
|
||||
}
|
||||
|
||||
// Apply updater function
|
||||
const newValue = updater(parsed);
|
||||
|
||||
// Set the new value with appropriate TTL logic
|
||||
if (ttl !== undefined) {
|
||||
// Use specified TTL
|
||||
await this.set(key, newValue, ttl);
|
||||
} else if (currentTTL === -2) {
|
||||
// Key didn't exist, use default TTL
|
||||
await this.set(key, newValue);
|
||||
} else {
|
||||
// Preserve existing TTL
|
||||
await this.set(key, newValue, { preserveTTL: true });
|
||||
}
|
||||
|
||||
return parsed;
|
||||
},
|
||||
null,
|
||||
'updateField'
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue