localgreenchain/lib/transparency/EventStream.ts
Claude 0fcc2763fe
Add comprehensive transparency system for LocalGreenChain
This commit introduces a complete transparency infrastructure including:

Core Transparency Modules:
- AuditLog: Immutable, cryptographically-linked audit trail for all actions
- EventStream: Real-time SSE streaming and webhook support
- TransparencyDashboard: Aggregated metrics and system health monitoring
- DigitalSignatures: Cryptographic verification for handoffs and certificates

API Endpoints:
- /api/transparency/dashboard - Full platform metrics
- /api/transparency/audit - Query and log audit entries
- /api/transparency/events - SSE stream and event history
- /api/transparency/webhooks - Webhook management
- /api/transparency/signatures - Digital signature operations
- /api/transparency/certificate/[plantId] - Plant authenticity certificates
- /api/transparency/export - Multi-format data export
- /api/transparency/report - Compliance reporting
- /api/transparency/health - System health checks

Features:
- Immutable audit logging with chain integrity verification
- Real-time event streaming via Server-Sent Events
- Webhook support with HMAC signature verification
- Digital signatures for transport handoffs and ownership transfers
- Certificate of Authenticity generation for plants
- Multi-format data export (JSON, CSV, summary)
- Public transparency portal at /transparency
- System health monitoring for all components

Documentation:
- Comprehensive TRANSPARENCY.md guide with API examples
2025-11-23 03:29:56 +00:00

505 lines
13 KiB
TypeScript

/**
* Real-Time Event Stream System for LocalGreenChain
*
* Provides Server-Sent Events (SSE) and webhook support for
* real-time transparency and notifications.
*/
import * as crypto from 'crypto';
import * as fs from 'fs';
import * as path from 'path';
// Event Types
export type TransparencyEventType =
| 'plant.registered' | 'plant.cloned' | 'plant.transferred' | 'plant.updated'
| 'transport.started' | 'transport.completed' | 'transport.verified'
| 'demand.created' | 'demand.matched' | 'supply.committed'
| 'farm.registered' | 'farm.updated' | 'batch.started' | 'batch.harvested'
| 'agent.alert' | 'agent.task_completed' | 'agent.error'
| 'blockchain.block_added' | 'blockchain.verified' | 'blockchain.error'
| 'system.health' | 'system.alert' | 'system.metric'
| 'audit.logged' | 'audit.anomaly';
export type EventPriority = 'LOW' | 'NORMAL' | 'HIGH' | 'CRITICAL';
export interface TransparencyEvent {
id: string;
type: TransparencyEventType;
priority: EventPriority;
timestamp: string;
source: string;
data: Record<string, any>;
metadata?: {
correlationId?: string;
causationId?: string;
userId?: string;
sessionId?: string;
};
}
export interface EventSubscription {
id: string;
types: TransparencyEventType[];
priorities?: EventPriority[];
filters?: {
source?: string;
userId?: string;
plantId?: string;
region?: string;
};
callback?: (event: TransparencyEvent) => void;
webhookUrl?: string;
createdAt: string;
lastEventAt?: string;
eventCount: number;
}
export interface WebhookConfig {
id: string;
url: string;
secret: string;
types: TransparencyEventType[];
active: boolean;
retryCount: number;
maxRetries: number;
lastSuccess?: string;
lastFailure?: string;
failureCount: number;
}
export interface EventStats {
totalEvents: number;
eventsLast24h: number;
eventsByType: Record<string, number>;
eventsByPriority: Record<EventPriority, number>;
activeSubscriptions: number;
activeWebhooks: number;
averageEventsPerMinute: number;
}
type EventCallback = (event: TransparencyEvent) => void;
class EventStream {
private events: TransparencyEvent[] = [];
private subscriptions: Map<string, EventSubscription> = new Map();
private webhooks: Map<string, WebhookConfig> = new Map();
private sseConnections: Map<string, EventCallback> = new Map();
private maxEvents: number = 10000;
private dataDir: string;
private configFile: string;
constructor() {
this.dataDir = path.join(process.cwd(), 'data');
this.configFile = path.join(this.dataDir, 'event-stream-config.json');
this.loadConfig();
}
private loadConfig(): void {
try {
if (fs.existsSync(this.configFile)) {
const data = JSON.parse(fs.readFileSync(this.configFile, 'utf-8'));
if (data.webhooks) {
data.webhooks.forEach((wh: WebhookConfig) => {
this.webhooks.set(wh.id, wh);
});
}
console.log(`[EventStream] Loaded ${this.webhooks.size} webhook configurations`);
}
} catch (error) {
console.error('[EventStream] Error loading config:', error);
}
}
private saveConfig(): void {
try {
if (!fs.existsSync(this.dataDir)) {
fs.mkdirSync(this.dataDir, { recursive: true });
}
const config = {
webhooks: Array.from(this.webhooks.values()),
savedAt: new Date().toISOString()
};
fs.writeFileSync(this.configFile, JSON.stringify(config, null, 2));
} catch (error) {
console.error('[EventStream] Error saving config:', error);
}
}
private generateId(): string {
return `evt_${Date.now()}_${crypto.randomBytes(6).toString('hex')}`;
}
/**
* Emit a new event
*/
emit(
type: TransparencyEventType,
source: string,
data: Record<string, any>,
options: {
priority?: EventPriority;
metadata?: TransparencyEvent['metadata'];
} = {}
): TransparencyEvent {
const event: TransparencyEvent = {
id: this.generateId(),
type,
priority: options.priority || 'NORMAL',
timestamp: new Date().toISOString(),
source,
data,
metadata: options.metadata
};
// Store event
this.events.push(event);
// Trim old events if needed
if (this.events.length > this.maxEvents) {
this.events = this.events.slice(-this.maxEvents);
}
// Notify subscriptions
this.notifySubscriptions(event);
// Trigger webhooks
this.triggerWebhooks(event);
// Notify SSE connections
this.notifySSE(event);
return event;
}
/**
* Subscribe to events
*/
subscribe(
types: TransparencyEventType[],
callback: EventCallback,
options: {
priorities?: EventPriority[];
filters?: EventSubscription['filters'];
} = {}
): string {
const id = `sub_${crypto.randomBytes(8).toString('hex')}`;
const subscription: EventSubscription = {
id,
types,
priorities: options.priorities,
filters: options.filters,
callback,
createdAt: new Date().toISOString(),
eventCount: 0
};
this.subscriptions.set(id, subscription);
return id;
}
/**
* Unsubscribe from events
*/
unsubscribe(subscriptionId: string): boolean {
return this.subscriptions.delete(subscriptionId);
}
/**
* Register an SSE connection
*/
registerSSE(connectionId: string, callback: EventCallback): void {
this.sseConnections.set(connectionId, callback);
}
/**
* Unregister an SSE connection
*/
unregisterSSE(connectionId: string): void {
this.sseConnections.delete(connectionId);
}
/**
* Register a webhook
*/
registerWebhook(
url: string,
types: TransparencyEventType[],
secret?: string
): WebhookConfig {
const id = `wh_${crypto.randomBytes(8).toString('hex')}`;
const webhookSecret = secret || crypto.randomBytes(32).toString('hex');
const webhook: WebhookConfig = {
id,
url,
secret: webhookSecret,
types,
active: true,
retryCount: 0,
maxRetries: 3,
failureCount: 0
};
this.webhooks.set(id, webhook);
this.saveConfig();
return webhook;
}
/**
* Remove a webhook
*/
removeWebhook(webhookId: string): boolean {
const result = this.webhooks.delete(webhookId);
if (result) {
this.saveConfig();
}
return result;
}
/**
* Update webhook status
*/
updateWebhook(webhookId: string, updates: Partial<WebhookConfig>): WebhookConfig | null {
const webhook = this.webhooks.get(webhookId);
if (!webhook) return null;
const updated = { ...webhook, ...updates };
this.webhooks.set(webhookId, updated);
this.saveConfig();
return updated;
}
/**
* Get all webhooks
*/
getWebhooks(): WebhookConfig[] {
return Array.from(this.webhooks.values());
}
private notifySubscriptions(event: TransparencyEvent): void {
for (const [id, sub] of this.subscriptions) {
if (this.matchesSubscription(event, sub)) {
try {
sub.callback?.(event);
sub.lastEventAt = new Date().toISOString();
sub.eventCount++;
} catch (error) {
console.error(`[EventStream] Error in subscription ${id}:`, error);
}
}
}
}
private matchesSubscription(event: TransparencyEvent, sub: EventSubscription): boolean {
// Check type
if (!sub.types.includes(event.type)) {
return false;
}
// Check priority
if (sub.priorities && sub.priorities.length > 0) {
if (!sub.priorities.includes(event.priority)) {
return false;
}
}
// Check filters
if (sub.filters) {
if (sub.filters.source && event.source !== sub.filters.source) {
return false;
}
if (sub.filters.userId && event.metadata?.userId !== sub.filters.userId) {
return false;
}
if (sub.filters.plantId && event.data.plantId !== sub.filters.plantId) {
return false;
}
if (sub.filters.region && event.data.region !== sub.filters.region) {
return false;
}
}
return true;
}
private notifySSE(event: TransparencyEvent): void {
for (const [id, callback] of this.sseConnections) {
try {
callback(event);
} catch (error) {
console.error(`[EventStream] Error in SSE connection ${id}:`, error);
this.sseConnections.delete(id);
}
}
}
private async triggerWebhooks(event: TransparencyEvent): Promise<void> {
for (const [id, webhook] of this.webhooks) {
if (!webhook.active) continue;
if (!webhook.types.includes(event.type)) continue;
this.sendWebhook(webhook, event);
}
}
private async sendWebhook(webhook: WebhookConfig, event: TransparencyEvent): Promise<void> {
try {
const payload = JSON.stringify(event);
const signature = crypto
.createHmac('sha256', webhook.secret)
.update(payload)
.digest('hex');
const response = await fetch(webhook.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-LocalGreenChain-Signature': signature,
'X-LocalGreenChain-Event': event.type,
'X-LocalGreenChain-Timestamp': event.timestamp
},
body: payload
});
if (response.ok) {
webhook.lastSuccess = new Date().toISOString();
webhook.failureCount = 0;
webhook.retryCount = 0;
} else {
throw new Error(`HTTP ${response.status}`);
}
} catch (error) {
webhook.lastFailure = new Date().toISOString();
webhook.failureCount++;
if (webhook.retryCount < webhook.maxRetries) {
webhook.retryCount++;
// Exponential backoff retry
setTimeout(() => this.sendWebhook(webhook, event), Math.pow(2, webhook.retryCount) * 1000);
} else {
// Disable webhook after max retries
if (webhook.failureCount >= 10) {
webhook.active = false;
console.warn(`[EventStream] Webhook ${webhook.id} disabled after ${webhook.failureCount} failures`);
}
}
}
this.saveConfig();
}
/**
* Get recent events
*/
getRecent(limit: number = 50, types?: TransparencyEventType[]): TransparencyEvent[] {
let events = [...this.events];
if (types && types.length > 0) {
events = events.filter(e => types.includes(e.type));
}
return events.slice(-limit).reverse();
}
/**
* Get events by time range
*/
getByTimeRange(
startTime: string,
endTime: string,
options: {
types?: TransparencyEventType[];
priorities?: EventPriority[];
limit?: number;
} = {}
): TransparencyEvent[] {
let events = this.events.filter(e =>
e.timestamp >= startTime && e.timestamp <= endTime
);
if (options.types && options.types.length > 0) {
events = events.filter(e => options.types!.includes(e.type));
}
if (options.priorities && options.priorities.length > 0) {
events = events.filter(e => options.priorities!.includes(e.priority));
}
if (options.limit) {
events = events.slice(-options.limit);
}
return events.reverse();
}
/**
* Get event statistics
*/
getStats(): EventStats {
const now = new Date();
const day = 24 * 60 * 60 * 1000;
const eventsByType: Record<string, number> = {};
const eventsByPriority: Record<string, number> = {};
let eventsLast24h = 0;
for (const event of this.events) {
eventsByType[event.type] = (eventsByType[event.type] || 0) + 1;
eventsByPriority[event.priority] = (eventsByPriority[event.priority] || 0) + 1;
if (now.getTime() - new Date(event.timestamp).getTime() <= day) {
eventsLast24h++;
}
}
const activeWebhooks = Array.from(this.webhooks.values()).filter(w => w.active).length;
return {
totalEvents: this.events.length,
eventsLast24h,
eventsByType,
eventsByPriority: eventsByPriority as Record<EventPriority, number>,
activeSubscriptions: this.subscriptions.size,
activeWebhooks,
averageEventsPerMinute: eventsLast24h / (24 * 60)
};
}
/**
* Format event for SSE
*/
formatSSE(event: TransparencyEvent): string {
return `event: ${event.type}\nid: ${event.id}\ndata: ${JSON.stringify(event)}\n\n`;
}
/**
* Get event types available for subscription
*/
getAvailableEventTypes(): TransparencyEventType[] {
return [
'plant.registered', 'plant.cloned', 'plant.transferred', 'plant.updated',
'transport.started', 'transport.completed', 'transport.verified',
'demand.created', 'demand.matched', 'supply.committed',
'farm.registered', 'farm.updated', 'batch.started', 'batch.harvested',
'agent.alert', 'agent.task_completed', 'agent.error',
'blockchain.block_added', 'blockchain.verified', 'blockchain.error',
'system.health', 'system.alert', 'system.metric',
'audit.logged', 'audit.anomaly'
];
}
}
// Singleton instance
let eventStreamInstance: EventStream | null = null;
export function getEventStream(): EventStream {
if (!eventStreamInstance) {
eventStreamInstance = new EventStream();
}
return eventStreamInstance;
}
export default EventStream;