201 lines
No EOL
5.6 KiB
TypeScript
201 lines
No EOL
5.6 KiB
TypeScript
import type { EventHandler, EventSubscription, EventBusMessage } from './types';
|
|
|
|
/**
 * Simple in-memory event bus for testing.
 *
 * Handlers are registered per channel. A channel that contains `*` is treated
 * as a wildcard pattern: each `*` matches any run of characters (including
 * none) and every other character is matched literally, so `user.*` matches
 * `user.created` but not `userXcreated`.
 *
 * Errors thrown (or promises rejected) by handlers are deliberately swallowed
 * so that one failing handler never prevents delivery to the others.
 */
export class SimpleEventBus {
  /** Channel name (or wildcard pattern) -> subscriptions registered on it. */
  private readonly subscriptions = new Map<string, Set<{ id: string; handler: EventHandler }>>();
  /** Subscription id -> full record, used for unsubscribe-by-id. */
  private readonly subscriptionById = new Map<string, { id: string; channel: string; handler: EventHandler }>();
  /** Counter shared by subscription ids (`sub-N`) and message ids (`msg-N`). */
  private nextId = 1;

  /**
   * Registers `handler` on `channel`.
   *
   * @param channel Exact event name, or a wildcard pattern containing `*`.
   * @param handler Callback invoked with every matching message.
   * @returns A handle that can be passed to {@link unsubscribe}.
   */
  subscribe(channel: string, handler: EventHandler): EventSubscription {
    this.register(channel, handler);
    return { channel, handler };
  }

  /**
   * Removes a subscription, either by its internal id or by the handle
   * returned from {@link subscribe} / {@link once}.
   *
   * When a handle is given, every subscription on that channel that uses the
   * same handler function is removed.
   *
   * @returns `true` if at least one subscription was found and removed.
   */
  unsubscribe(idOrSubscription: string | EventSubscription): boolean {
    if (typeof idOrSubscription === 'string') {
      const record = this.subscriptionById.get(idOrSubscription);
      if (!record) {
        return false;
      }
      this.removeFromChannel(record.channel, sub => sub.id === record.id);
      // Drop the id record even if the channel set was already gone.
      this.subscriptionById.delete(record.id);
      return true;
    }

    const { channel, handler } = idOrSubscription;
    return this.removeFromChannel(channel, sub => sub.handler === handler) > 0;
  }

  /**
   * Publishes `data` under `event` and resolves once every matching handler
   * has settled. Handlers are started synchronously, in registration order,
   * and run concurrently. Handler errors (sync or async) are swallowed.
   */
  async publish(event: string, data: any): Promise<void> {
    const message = this.createMessage(event, data);

    await Promise.all(
      this.collectHandlers(event).map(async handler => {
        try {
          // The async wrapper turns a synchronous throw into a caught error
          // instead of aborting delivery to the remaining handlers.
          await handler(message);
        } catch {
          // Silently catch errors
        }
      })
    );
  }

  /**
   * Publishes `data` under `event`, invoking every matching handler
   * synchronously. Promises returned by async handlers are not awaited, but
   * their rejections are swallowed just like synchronous errors.
   */
  publishSync(event: string, data: any): void {
    const message = this.createMessage(event, data);

    for (const handler of this.collectHandlers(event)) {
      try {
        const result: unknown = handler(message);
        if (result instanceof Promise) {
          // Keep the "errors are swallowed" contract for async handlers too;
          // otherwise a rejection would surface as an unhandled rejection.
          void result.catch(() => {});
        }
      } catch {
        // Silently catch errors
      }
    }
  }

  /**
   * Registers a handler that is delivered at most one message.
   *
   * The subscription is removed *before* the handler runs, so the handler is
   * not re-entered by back-to-back or nested publishes and is removed even if
   * it throws.
   *
   * @returns A handle (wrapping the handler) usable with {@link unsubscribe}.
   */
  once(event: string, handler: EventHandler): EventSubscription {
    const wrappedHandler: EventHandler = message => {
      this.unsubscribe(subId);
      return handler(message);
    };
    const subId = this.register(event, wrappedHandler);

    return { channel: event, handler: wrappedHandler };
  }

  /**
   * Removes handlers registered on exactly `event`.
   *
   * @param event Channel name as it was passed to {@link subscribe}.
   * @param handler When given, only subscriptions using this handler are
   *   removed; when omitted, every subscription on the channel is removed.
   */
  off(event: string, handler?: EventHandler): void {
    this.removeFromChannel(event, sub => !handler || sub.handler === handler);
  }

  /**
   * Reports whether any handler is registered on exactly `event`.
   * This is an exact channel lookup; wildcard subscriptions that would match
   * `event` are not considered.
   */
  hasSubscribers(event: string): boolean {
    const channelSubs = this.subscriptions.get(event);
    return channelSubs !== undefined && channelSubs.size > 0;
  }

  /** Removes every subscription. The id counter is not reset. */
  clear(): void {
    this.subscriptions.clear();
    this.subscriptionById.clear();
  }

  /** Stores a new subscription in both indexes and returns its id. */
  private register(channel: string, handler: EventHandler): string {
    const id = `sub-${this.nextId++}`;

    let channelSubs = this.subscriptions.get(channel);
    if (!channelSubs) {
      channelSubs = new Set();
      this.subscriptions.set(channel, channelSubs);
    }
    channelSubs.add({ id, handler });
    this.subscriptionById.set(id, { id, channel, handler });

    return id;
  }

  /**
   * Removes every subscription on `channel` accepted by `shouldRemove` from
   * both indexes, dropping the channel entry once it is empty.
   *
   * @returns The number of subscriptions removed.
   */
  private removeFromChannel(
    channel: string,
    shouldRemove: (sub: { id: string; handler: EventHandler }) => boolean
  ): number {
    const channelSubs = this.subscriptions.get(channel);
    if (!channelSubs) {
      return 0;
    }

    let removed = 0;
    // Deleting the current entry while iterating a Set is well-defined.
    for (const sub of channelSubs) {
      if (shouldRemove(sub)) {
        channelSubs.delete(sub);
        this.subscriptionById.delete(sub.id);
        removed++;
      }
    }
    if (channelSubs.size === 0) {
      this.subscriptions.delete(channel);
    }
    return removed;
  }

  /** Builds the message envelope delivered to handlers. */
  private createMessage(event: string, data: any): EventBusMessage {
    return {
      id: `msg-${this.nextId++}`,
      type: event,
      source: 'simple-event-bus',
      timestamp: Date.now(),
      data,
    };
  }

  /**
   * Snapshots the handlers that should receive `event`: exact-channel
   * subscribers first, then subscribers of matching wildcard channels.
   */
  private collectHandlers(event: string): EventHandler[] {
    const handlers: EventHandler[] = [];

    // Direct matches
    const directSubs = this.subscriptions.get(event);
    if (directSubs) {
      for (const sub of directSubs) {
        handlers.push(sub.handler);
      }
    }

    // Pattern matches
    for (const [pattern, subs] of this.subscriptions) {
      // `pattern === event` was already handled as a direct match; skipping
      // it avoids double delivery when the event name itself contains `*`.
      if (pattern === event || !pattern.includes('*')) {
        continue;
      }
      if (this.matchPattern(pattern, event)) {
        for (const sub of subs) {
          handlers.push(sub.handler);
        }
      }
    }

    return handlers;
  }

  /**
   * Simple pattern matching: `user.*` matches `user.created`,
   * `user.updated`, etc. Only `*` is special; all other characters,
   * including regex metacharacters such as `.`, are matched literally.
   */
  private matchPattern(pattern: string, event: string): boolean {
    const source = pattern
      .split('*')
      .map(literal => literal.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
      .join('.*');
    return new RegExp(`^${source}$`).test(event);
  }
}