/**
 * Real-Time Event Stream System for LocalGreenChain
 *
 * Provides Server-Sent Events (SSE) and webhook support for
 * real-time transparency and notifications.
 */

import * as crypto from 'crypto';
import * as fs from 'fs';
import * as path from 'path';

/**
 * Single source of truth for the event names. Both the
 * `TransparencyEventType` union and `getAvailableEventTypes()` are derived
 * from this list so the two cannot drift apart.
 */
const EVENT_TYPES = [
  'plant.registered',
  'plant.cloned',
  'plant.transferred',
  'plant.updated',
  'transport.started',
  'transport.completed',
  'transport.verified',
  'demand.created',
  'demand.matched',
  'supply.committed',
  'farm.registered',
  'farm.updated',
  'batch.started',
  'batch.harvested',
  'agent.alert',
  'agent.task_completed',
  'agent.error',
  'blockchain.block_added',
  'blockchain.verified',
  'blockchain.error',
  'system.health',
  'system.alert',
  'system.metric',
  'audit.logged',
  'audit.anomaly',
] as const;

/** Consecutive failed delivery attempts after which a webhook is deactivated. */
const MAX_CONSECUTIVE_WEBHOOK_FAILURES = 10;

// Event Types
export type TransparencyEventType = (typeof EVENT_TYPES)[number];

export type EventPriority = 'LOW' | 'NORMAL' | 'HIGH' | 'CRITICAL';

export interface TransparencyEvent {
  id: string;
  type: TransparencyEventType;
  priority: EventPriority;
  /** ISO-8601 timestamp produced with `Date.prototype.toISOString()`. */
  timestamp: string;
  source: string;
  data: Record<string, unknown>;
  metadata?: {
    correlationId?: string;
    causationId?: string;
    userId?: string;
    sessionId?: string;
  };
}

export interface EventSubscription {
  id: string;
  types: TransparencyEventType[];
  priorities?: EventPriority[];
  filters?: {
    source?: string;
    userId?: string;
    plantId?: string;
    region?: string;
  };
  callback?: (event: TransparencyEvent) => void;
  webhookUrl?: string;
  createdAt: string;
  lastEventAt?: string;
  eventCount: number;
}

export interface WebhookConfig {
  id: string;
  url: string;
  secret: string;
  types: TransparencyEventType[];
  active: boolean;
  /** Retry number (0 = none) reached by the most recent failed delivery. */
  retryCount: number;
  maxRetries: number;
  lastSuccess?: string;
  lastFailure?: string;
  /** Consecutive failed attempts; reset to 0 by any successful delivery. */
  failureCount: number;
}

export interface EventStats {
  totalEvents: number;
  eventsLast24h: number;
  eventsByType: Record<string, number>;
  eventsByPriority: Record<EventPriority, number>;
  activeSubscriptions: number;
  activeWebhooks: number;
  averageEventsPerMinute: number;
}

type EventCallback = (event: TransparencyEvent) => void;

/**
 * In-memory event hub. Keeps a bounded history of emitted events and fans
 * each new event out to in-process subscriptions, registered webhooks and
 * SSE connection callbacks. Webhook configurations are persisted to
 * `<cwd>/data/event-stream-config.json`; events themselves are not persisted.
 */
class EventStream {
  private events: TransparencyEvent[] = [];
  private readonly subscriptions: Map<string, EventSubscription> = new Map();
  private readonly webhooks: Map<string, WebhookConfig> = new Map();
  private readonly sseConnections: Map<string, EventCallback> = new Map();
  private readonly maxEvents: number = 10000;
  private readonly dataDir: string;
  private readonly configFile: string;

  constructor() {
    this.dataDir = path.join(process.cwd(), 'data');
    this.configFile = path.join(this.dataDir, 'event-stream-config.json');
    this.loadConfig();
  }

  /**
   * Load persisted webhook configurations, if the config file exists.
   * Failures are logged and otherwise ignored so construction never throws.
   */
  private loadConfig(): void {
    try {
      if (!fs.existsSync(this.configFile)) return;

      const parsed: unknown = JSON.parse(fs.readFileSync(this.configFile, 'utf-8'));
      const stored =
        typeof parsed === 'object' && parsed !== null
          ? (parsed as { webhooks?: unknown }).webhooks
          : undefined;

      if (Array.isArray(stored)) {
        for (const entry of stored as Array<Partial<WebhookConfig> | null>) {
          // Skip entries that cannot be keyed; the rest are trusted as written by saveConfig().
          if (entry && typeof entry.id === 'string') {
            this.webhooks.set(entry.id, entry as WebhookConfig);
          }
        }
      }
      console.log(`[EventStream] Loaded ${this.webhooks.size} webhook configurations`);
    } catch (error) {
      console.error('[EventStream] Error loading config:', error);
    }
  }

  /** Write all webhook configurations to the config file (best effort; errors are logged). */
  private saveConfig(): void {
    try {
      if (!fs.existsSync(this.dataDir)) {
        fs.mkdirSync(this.dataDir, { recursive: true });
      }
      const config = {
        webhooks: Array.from(this.webhooks.values()),
        savedAt: new Date().toISOString(),
      };
      fs.writeFileSync(this.configFile, JSON.stringify(config, null, 2));
    } catch (error) {
      console.error('[EventStream] Error saving config:', error);
    }
  }

  private generateId(): string {
    return `evt_${Date.now()}_${crypto.randomBytes(6).toString('hex')}`;
  }

  /**
   * Emit a new event.
   *
   * The event is appended to the bounded history (oldest entries are dropped
   * beyond `maxEvents`), then delivered synchronously to matching
   * subscriptions, handed to webhooks (asynchronously, not awaited) and
   * finally passed to every SSE callback.
   *
   * @param type Event type.
   * @param source Identifier of the emitting component.
   * @param data Event payload.
   * @param options Optional priority (defaults to `'NORMAL'`) and metadata.
   * @returns The stored event, including its generated id and timestamp.
   */
  emit(
    type: TransparencyEventType,
    source: string,
    data: Record<string, unknown>,
    options: {
      priority?: EventPriority;
      metadata?: TransparencyEvent['metadata'];
    } = {}
  ): TransparencyEvent {
    const event: TransparencyEvent = {
      id: this.generateId(),
      type,
      priority: options.priority ?? 'NORMAL',
      timestamp: new Date().toISOString(),
      source,
      data,
      metadata: options.metadata,
    };

    // Store event
    this.events.push(event);

    // Trim old events if needed
    if (this.events.length > this.maxEvents) {
      this.events = this.events.slice(-this.maxEvents);
    }

    // Notify subscriptions
    this.notifySubscriptions(event);

    // Trigger webhooks
    this.triggerWebhooks(event);

    // Notify SSE connections
    this.notifySSE(event);

    return event;
  }

  /**
   * Subscribe to events.
   *
   * @param types Event types the callback should receive.
   * @param callback Invoked synchronously from `emit` for each matching event.
   * @param options Optional priority whitelist and field filters.
   * @returns The subscription id, to be passed to `unsubscribe`.
   */
  subscribe(
    types: TransparencyEventType[],
    callback: EventCallback,
    options: {
      priorities?: EventPriority[];
      filters?: EventSubscription['filters'];
    } = {}
  ): string {
    const id = `sub_${crypto.randomBytes(8).toString('hex')}`;

    const subscription: EventSubscription = {
      id,
      types,
      priorities: options.priorities,
      filters: options.filters,
      callback,
      createdAt: new Date().toISOString(),
      eventCount: 0,
    };

    this.subscriptions.set(id, subscription);
    return id;
  }

  /**
   * Unsubscribe from events.
   *
   * @returns `true` if a subscription with that id existed.
   */
  unsubscribe(subscriptionId: string): boolean {
    return this.subscriptions.delete(subscriptionId);
  }

  /**
   * Register an SSE connection. The callback receives every emitted event
   * (no type filtering is applied here).
   */
  registerSSE(connectionId: string, callback: EventCallback): void {
    this.sseConnections.set(connectionId, callback);
  }

  /**
   * Unregister an SSE connection
   */
  unregisterSSE(connectionId: string): void {
    this.sseConnections.delete(connectionId);
  }

  /**
   * Register a webhook and persist the configuration.
   *
   * @param url Endpoint that receives a POST per matching event.
   * @param types Event types to deliver.
   * @param secret HMAC secret; a random 32-byte hex secret is generated when omitted or empty.
   * @returns The stored configuration (including the secret).
   */
  registerWebhook(
    url: string,
    types: TransparencyEventType[],
    secret?: string
  ): WebhookConfig {
    const id = `wh_${crypto.randomBytes(8).toString('hex')}`;
    const webhookSecret = secret || crypto.randomBytes(32).toString('hex');

    const webhook: WebhookConfig = {
      id,
      url,
      secret: webhookSecret,
      types,
      active: true,
      retryCount: 0,
      maxRetries: 3,
      failureCount: 0,
    };

    this.webhooks.set(id, webhook);
    this.saveConfig();

    return webhook;
  }

  /**
   * Remove a webhook.
   *
   * @returns `true` if the webhook existed (the config file is rewritten in that case).
   */
  removeWebhook(webhookId: string): boolean {
    const removed = this.webhooks.delete(webhookId);
    if (removed) {
      this.saveConfig();
    }
    return removed;
  }

  /**
   * Update webhook status.
   *
   * @param webhookId Id of the webhook to modify.
   * @param updates Fields to overwrite. `id` is ignored so the stored id
   *   always matches the key the webhook is registered under.
   * @returns The updated configuration, or `null` if the id is unknown.
   */
  updateWebhook(webhookId: string, updates: Partial<WebhookConfig>): WebhookConfig | null {
    const webhook = this.webhooks.get(webhookId);
    if (!webhook) return null;

    const updated: WebhookConfig = { ...webhook, ...updates, id: webhook.id };
    this.webhooks.set(webhookId, updated);
    this.saveConfig();

    return updated;
  }

  /**
   * Get all webhooks
   */
  getWebhooks(): WebhookConfig[] {
    return Array.from(this.webhooks.values());
  }

  /**
   * Invoke the callback of every matching subscription. A throwing callback
   * is logged and does not prevent delivery to the remaining subscriptions.
   */
  private notifySubscriptions(event: TransparencyEvent): void {
    for (const [id, sub] of this.subscriptions) {
      if (!this.matchesSubscription(event, sub)) continue;
      try {
        sub.callback?.(event);
        sub.lastEventAt = new Date().toISOString();
        sub.eventCount++;
      } catch (error) {
        console.error(`[EventStream] Error in subscription ${id}:`, error);
      }
    }
  }

  /** Whether `event` passes the type, priority and field filters of `sub`. */
  private matchesSubscription(event: TransparencyEvent, sub: EventSubscription): boolean {
    // Check type
    if (!sub.types.includes(event.type)) {
      return false;
    }

    // Check priority (an empty or missing list means "any priority")
    if (sub.priorities && sub.priorities.length > 0 && !sub.priorities.includes(event.priority)) {
      return false;
    }

    // Check filters (only filters with a non-empty value are applied)
    const filters = sub.filters;
    if (filters) {
      if (filters.source && event.source !== filters.source) return false;
      if (filters.userId && event.metadata?.userId !== filters.userId) return false;
      if (filters.plantId && event.data.plantId !== filters.plantId) return false;
      if (filters.region && event.data.region !== filters.region) return false;
    }

    return true;
  }

  /** Pass the event to every SSE callback; a callback that throws is unregistered. */
  private notifySSE(event: TransparencyEvent): void {
    for (const [id, callback] of this.sseConnections) {
      try {
        callback(event);
      } catch (error) {
        console.error(`[EventStream] Error in SSE connection ${id}:`, error);
        this.sseConnections.delete(id);
      }
    }
  }

  /** Start a delivery to every active webhook registered for the event's type. */
  private triggerWebhooks(event: TransparencyEvent): void {
    for (const webhook of this.webhooks.values()) {
      if (!webhook.active) continue;
      if (!webhook.types.includes(event.type)) continue;

      // Fire and forget: sendWebhook handles its own failures.
      void this.sendWebhook(webhook, event);
    }
  }

  /**
   * POST the event to a webhook, signed with HMAC-SHA256 over the JSON body.
   *
   * On failure the delivery is retried with exponential backoff (2s, 4s, 8s…)
   * up to `maxRetries` times. The retry budget is tracked per delivery via
   * `attempt`, so simultaneous failing deliveries do not consume each other's
   * retries. Once a delivery has exhausted its retries and the webhook has
   * accumulated `MAX_CONSECUTIVE_WEBHOOK_FAILURES` consecutive failed
   * attempts, the webhook is deactivated.
   *
   * @param webhook Configuration to deliver to.
   * @param event Event to deliver.
   * @param attempt Number of retries already made for this delivery (0 for the first try).
   */
  private async sendWebhook(
    webhook: WebhookConfig,
    event: TransparencyEvent,
    attempt: number = 0
  ): Promise<void> {
    let delivered = false;

    try {
      const payload = JSON.stringify(event);
      const signature = crypto
        .createHmac('sha256', webhook.secret)
        .update(payload)
        .digest('hex');

      const response = await fetch(webhook.url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-LocalGreenChain-Signature': signature,
          'X-LocalGreenChain-Event': event.type,
          'X-LocalGreenChain-Timestamp': event.timestamp,
        },
        body: payload,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      delivered = true;
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      console.warn(`[EventStream] Webhook ${webhook.id} delivery failed: ${reason}`);
    }

    // The configuration may have been replaced (updateWebhook) or removed
    // (removeWebhook) while the request was in flight; record the outcome on
    // the object that is actually stored, and drop it if the webhook is gone.
    const current = this.webhooks.get(webhook.id);
    if (!current) return;

    const now = new Date().toISOString();
    if (delivered) {
      current.lastSuccess = now;
      current.failureCount = 0;
      current.retryCount = 0;
    } else {
      current.lastFailure = now;
      current.failureCount++;

      if (attempt < current.maxRetries) {
        const nextAttempt = attempt + 1;
        const webhookId = current.id;
        current.retryCount = nextAttempt;

        // Exponential backoff retry
        setTimeout(() => {
          // Re-resolve at fire time so removed or deactivated webhooks are not retried.
          const latest = this.webhooks.get(webhookId);
          if (latest && latest.active) {
            void this.sendWebhook(latest, event, nextAttempt);
          }
        }, Math.pow(2, nextAttempt) * 1000);
      } else if (current.failureCount >= MAX_CONSECUTIVE_WEBHOOK_FAILURES) {
        // Disable webhook after max retries
        current.active = false;
        console.warn(`[EventStream] Webhook ${current.id} disabled after ${current.failureCount} failures`);
      }
    }

    this.saveConfig();
  }

  /**
   * Get recent events, newest first.
   *
   * @param limit Maximum number of events to return. A limit of zero or less
   *   yields an empty array.
   * @param types When non-empty, only events of these types are considered.
   */
  getRecent(limit: number = 50, types?: TransparencyEventType[]): TransparencyEvent[] {
    // slice(-0) is slice(0) and would return the whole history, so handle
    // non-positive limits explicitly.
    if (limit <= 0) return [];

    const pool =
      types && types.length > 0
        ? this.events.filter(e => types.includes(e.type))
        : this.events;

    // slice() copies, so reverse() never touches the stored history.
    return pool.slice(-limit).reverse();
  }

  /**
   * Get events by time range, newest first.
   *
   * Bounds are inclusive and compared as instants (via `Date.parse`), so
   * bounds without milliseconds or with a UTC offset behave correctly.
   * If either bound cannot be parsed, no events are returned.
   *
   * @param startTime Inclusive lower bound (date string).
   * @param endTime Inclusive upper bound (date string).
   * @param options Optional type/priority whitelists and a `limit` that keeps
   *   only the most recent N matches. A missing, zero or negative limit means
   *   "no limit".
   */
  getByTimeRange(
    startTime: string,
    endTime: string,
    options: {
      types?: TransparencyEventType[];
      priorities?: EventPriority[];
      limit?: number;
    } = {}
  ): TransparencyEvent[] {
    const startMs = Date.parse(startTime);
    const endMs = Date.parse(endTime);
    if (Number.isNaN(startMs) || Number.isNaN(endMs)) return [];

    const { types, priorities, limit } = options;

    let events = this.events.filter(e => {
      const at = Date.parse(e.timestamp);
      return at >= startMs && at <= endMs;
    });

    if (types && types.length > 0) {
      events = events.filter(e => types.includes(e.type));
    }

    if (priorities && priorities.length > 0) {
      events = events.filter(e => priorities.includes(e.priority));
    }

    if (limit !== undefined && limit > 0) {
      events = events.slice(-limit);
    }

    return events.reverse();
  }

  /**
   * Get event statistics over the in-memory history.
   *
   * `eventsByPriority` always contains all four priorities (0 when unseen);
   * `eventsByType` only contains types that occurred.
   * `averageEventsPerMinute` is the last-24h count divided by 1440.
   */
  getStats(): EventStats {
    const nowMs = Date.now();
    const dayMs = 24 * 60 * 60 * 1000;

    const eventsByType: Record<string, number> = {};
    const eventsByPriority: Record<EventPriority, number> = {
      LOW: 0,
      NORMAL: 0,
      HIGH: 0,
      CRITICAL: 0,
    };
    let eventsLast24h = 0;

    for (const event of this.events) {
      eventsByType[event.type] = (eventsByType[event.type] ?? 0) + 1;
      eventsByPriority[event.priority] = (eventsByPriority[event.priority] ?? 0) + 1;

      if (nowMs - Date.parse(event.timestamp) <= dayMs) {
        eventsLast24h++;
      }
    }

    let activeWebhooks = 0;
    for (const webhook of this.webhooks.values()) {
      if (webhook.active) activeWebhooks++;
    }

    return {
      totalEvents: this.events.length,
      eventsLast24h,
      eventsByType,
      eventsByPriority,
      activeSubscriptions: this.subscriptions.size,
      activeWebhooks,
      averageEventsPerMinute: eventsLast24h / (24 * 60),
    };
  }

  /**
   * Format event for SSE: `event:` carries the type, `id:` the event id and
   * `data:` the JSON-serialized event, terminated by a blank line.
   */
  formatSSE(event: TransparencyEvent): string {
    return `event: ${event.type}\nid: ${event.id}\ndata: ${JSON.stringify(event)}\n\n`;
  }

  /**
   * Get event types available for subscription.
   *
   * @returns A fresh array on every call; callers may mutate it freely.
   */
  getAvailableEventTypes(): TransparencyEventType[] {
    return [...EVENT_TYPES];
  }
}

// Singleton instance
let eventStreamInstance: EventStream | null = null;

/** Return the process-wide EventStream, creating it on first use. */
export function getEventStream(): EventStream {
  if (!eventStreamInstance) {
    eventStreamInstance = new EventStream();
  }
  return eventStreamInstance;
}

export default EventStream;