This commit introduces a complete transparency infrastructure, including:

Core Transparency Modules:
- AuditLog: Immutable, cryptographically linked audit trail for all actions
- EventStream: Real-time SSE streaming and webhook support
- TransparencyDashboard: Aggregated metrics and system health monitoring
- DigitalSignatures: Cryptographic verification for handoffs and certificates

API Endpoints:
- /api/transparency/dashboard - Full platform metrics
- /api/transparency/audit - Query and log audit entries
- /api/transparency/events - SSE stream and event history
- /api/transparency/webhooks - Webhook management
- /api/transparency/signatures - Digital signature operations
- /api/transparency/certificate/[plantId] - Plant authenticity certificates
- /api/transparency/export - Multi-format data export
- /api/transparency/report - Compliance reporting
- /api/transparency/health - System health checks

Features:
- Immutable audit logging with chain integrity verification
- Real-time event streaming via Server-Sent Events
- Webhook support with HMAC signature verification
- Digital signatures for transport handoffs and ownership transfers
- Certificate of Authenticity generation for plants
- Multi-format data export (JSON, CSV, summary)
- Public transparency portal at /transparency
- System health monitoring for all components

Documentation:
- Comprehensive TRANSPARENCY.md guide with API examples
505 lines
13 KiB
TypeScript
505 lines
13 KiB
TypeScript
/**
 * Real-Time Event Stream System for LocalGreenChain
 *
 * Provides Server-Sent Events (SSE) and webhook support for
 * real-time transparency and notifications.
 */
|
|
|
|
import * as crypto from 'crypto';
|
|
import * as fs from 'fs';
|
|
import * as path from 'path';
|
|
|
|
/**
 * Every event name that can be published on the transparency stream.
 *
 * Names follow a `<domain>.<action>` pattern; the same list is returned at
 * runtime by `EventStream.getAvailableEventTypes()`.
 */
export type TransparencyEventType =
  // Plant lifecycle
  | 'plant.registered'
  | 'plant.cloned'
  | 'plant.transferred'
  | 'plant.updated'
  // Transport
  | 'transport.started'
  | 'transport.completed'
  | 'transport.verified'
  // Demand / supply
  | 'demand.created'
  | 'demand.matched'
  | 'supply.committed'
  // Farms and batches
  | 'farm.registered'
  | 'farm.updated'
  | 'batch.started'
  | 'batch.harvested'
  // Agents
  | 'agent.alert'
  | 'agent.task_completed'
  | 'agent.error'
  // Blockchain
  | 'blockchain.block_added'
  | 'blockchain.verified'
  | 'blockchain.error'
  // System
  | 'system.health'
  | 'system.alert'
  | 'system.metric'
  // Audit
  | 'audit.logged'
  | 'audit.anomaly';
|
|
|
|
/** Priority label carried by every event; `EventStream.emit` defaults it to 'NORMAL'. */
export type EventPriority =
  | 'LOW'
  | 'NORMAL'
  | 'HIGH'
  | 'CRITICAL';
|
|
|
|
/**
 * A single event published on the transparency stream.
 */
export interface TransparencyEvent {
  /** Unique identifier of the event. */
  id: string;
  /** Which kind of event this is. */
  type: TransparencyEventType;
  /** Priority label of the event. */
  priority: EventPriority;
  /** Creation time as a string (an ISO-8601 string when produced by `EventStream.emit`). */
  timestamp: string;
  /** Name of the component that produced the event. */
  source: string;
  /** Free-form payload; subscription filters read `plantId` and `region` from it. */
  data: Record<string, any>;
  /** Optional tracing / attribution information supplied by the emitter. */
  metadata?: {
    correlationId?: string;
    causationId?: string;
    userId?: string;
    sessionId?: string;
  };
}
|
|
|
|
/**
 * An in-process subscription to the event stream.
 */
export interface EventSubscription {
  /** Identifier returned to the subscriber and used to unsubscribe. */
  id: string;
  /** Event types the subscriber wants; an event of any other type never matches. */
  types: TransparencyEventType[];
  /** When present and non-empty, only events with one of these priorities match. */
  priorities?: EventPriority[];
  /** Optional exact-match filters; every filter that is set must match. */
  filters?: {
    /** Compared against `event.source`. */
    source?: string;
    /** Compared against `event.metadata.userId`. */
    userId?: string;
    /** Compared against `event.data.plantId`. */
    plantId?: string;
    /** Compared against `event.data.region`. */
    region?: string;
  };
  /** Handler invoked for each matching event. */
  callback?: (event: TransparencyEvent) => void;
  /** Webhook URL associated with the subscription (not read by EventStream itself). */
  webhookUrl?: string;
  /** Time the subscription was created. */
  createdAt: string;
  /** Time the most recent matching event was delivered. */
  lastEventAt?: string;
  /** Number of matching events delivered so far. */
  eventCount: number;
}
|
|
|
|
/**
 * Persisted configuration and delivery state of one registered webhook.
 */
export interface WebhookConfig {
  /** Identifier of the webhook; also the key it is stored under. */
  id: string;
  /** Endpoint that receives the POSTed event payload. */
  url: string;
  /** Key used to compute the HMAC-SHA256 signature header of each delivery. */
  secret: string;
  /** Event types that are forwarded to this webhook. */
  types: TransparencyEventType[];
  /** Inactive webhooks are skipped when events are dispatched. */
  active: boolean;
  /** Retries consumed since the last successful delivery. */
  retryCount: number;
  /** Upper bound for `retryCount` before retries stop being scheduled. */
  maxRetries: number;
  /** Time of the most recent successful delivery. */
  lastSuccess?: string;
  /** Time of the most recent failed delivery. */
  lastFailure?: string;
  /** Failures since the last successful delivery. */
  failureCount: number;
}
|
|
|
|
/**
 * Aggregate figures describing the events currently held by the stream.
 */
export interface EventStats {
  /** Number of events currently retained. */
  totalEvents: number;
  /** Retained events whose timestamp lies within the last 24 hours. */
  eventsLast24h: number;
  /** Retained event count keyed by event type. */
  eventsByType: Record<string, number>;
  /** Retained event count keyed by priority. */
  eventsByPriority: Record<EventPriority, number>;
  /** Number of registered in-process subscriptions. */
  activeSubscriptions: number;
  /** Number of webhooks whose `active` flag is set. */
  activeWebhooks: number;
  /** `eventsLast24h` divided by the number of minutes in a day. */
  averageEventsPerMinute: number;
}
|
|
|
|
/** Handler signature used for in-process subscribers and SSE connections. */
type EventCallback = (
  event: TransparencyEvent
) => void;
|
|
|
|
/**
 * In-memory hub for transparency events.
 *
 * Keeps a bounded history of emitted events and fans each new event out to
 * in-process subscribers, registered SSE connections and HTTP webhooks.
 * Only webhook configuration is persisted (to
 * `<cwd>/data/event-stream-config.json`); events and subscriptions live in
 * memory only.
 */
class EventStream {
  private events: TransparencyEvent[] = [];
  private subscriptions: Map<string, EventSubscription> = new Map();
  private webhooks: Map<string, WebhookConfig> = new Map();
  private sseConnections: Map<string, EventCallback> = new Map();
  /** Maximum number of events kept in memory; older events are dropped. */
  private maxEvents: number = 10000;
  private dataDir: string;
  private configFile: string;

  constructor() {
    this.dataDir = path.join(process.cwd(), 'data');
    this.configFile = path.join(this.dataDir, 'event-stream-config.json');
    this.loadConfig();
  }

  /**
   * Load persisted webhook configurations, if the config file exists.
   * Any read or parse error is logged and otherwise ignored.
   */
  private loadConfig(): void {
    try {
      if (!fs.existsSync(this.configFile)) {
        return;
      }

      const data = JSON.parse(fs.readFileSync(this.configFile, 'utf-8'));
      // Guard against a hand-edited file where `webhooks` is not a list.
      if (Array.isArray(data?.webhooks)) {
        for (const wh of data.webhooks as WebhookConfig[]) {
          this.webhooks.set(wh.id, wh);
        }
      }
      console.log(`[EventStream] Loaded ${this.webhooks.size} webhook configurations`);
    } catch (error) {
      console.error('[EventStream] Error loading config:', error);
    }
  }

  /**
   * Write all webhook configurations to the config file, creating the data
   * directory when needed. Errors are logged and otherwise ignored.
   */
  private saveConfig(): void {
    try {
      if (!fs.existsSync(this.dataDir)) {
        fs.mkdirSync(this.dataDir, { recursive: true });
      }

      const config = {
        webhooks: Array.from(this.webhooks.values()),
        savedAt: new Date().toISOString()
      };
      fs.writeFileSync(this.configFile, JSON.stringify(config, null, 2));
    } catch (error) {
      console.error('[EventStream] Error saving config:', error);
    }
  }

  /** Build an event id from the current time and 6 random bytes. */
  private generateId(): string {
    return `evt_${Date.now()}_${crypto.randomBytes(6).toString('hex')}`;
  }

  /**
   * Emit a new event.
   *
   * The event is appended to the in-memory history (trimmed to `maxEvents`),
   * then delivered to matching subscriptions, matching webhooks and all SSE
   * connections. Subscriber and SSE callbacks run synchronously; webhook
   * deliveries are started but not awaited.
   *
   * @param type - Event type.
   * @param source - Name of the emitting component.
   * @param data - Event payload.
   * @param options - Optional priority (defaults to 'NORMAL') and metadata.
   * @returns The event that was stored and dispatched.
   */
  emit(
    type: TransparencyEventType,
    source: string,
    data: Record<string, any>,
    options: {
      priority?: EventPriority;
      metadata?: TransparencyEvent['metadata'];
    } = {}
  ): TransparencyEvent {
    const event: TransparencyEvent = {
      id: this.generateId(),
      type,
      priority: options.priority ?? 'NORMAL',
      timestamp: new Date().toISOString(),
      source,
      data,
      metadata: options.metadata
    };

    // Store event, keeping only the newest `maxEvents` entries.
    this.events.push(event);
    if (this.events.length > this.maxEvents) {
      this.events = this.events.slice(-this.maxEvents);
    }

    this.notifySubscriptions(event);

    // Fire-and-forget: sendWebhook handles its own errors.
    void this.triggerWebhooks(event);

    this.notifySSE(event);

    return event;
  }

  /**
   * Subscribe to events.
   *
   * @param types - Event types to receive.
   * @param callback - Invoked for every matching event.
   * @param options - Optional priority list and exact-match filters.
   * @returns The subscription id, usable with `unsubscribe`.
   */
  subscribe(
    types: TransparencyEventType[],
    callback: EventCallback,
    options: {
      priorities?: EventPriority[];
      filters?: EventSubscription['filters'];
    } = {}
  ): string {
    const id = `sub_${crypto.randomBytes(8).toString('hex')}`;

    const subscription: EventSubscription = {
      id,
      types,
      priorities: options.priorities,
      filters: options.filters,
      callback,
      createdAt: new Date().toISOString(),
      eventCount: 0
    };

    this.subscriptions.set(id, subscription);
    return id;
  }

  /**
   * Unsubscribe from events.
   *
   * @returns `true` when a subscription with that id existed.
   */
  unsubscribe(subscriptionId: string): boolean {
    return this.subscriptions.delete(subscriptionId);
  }

  /**
   * Register an SSE connection. The callback receives every emitted event;
   * registering an existing id replaces its callback.
   */
  registerSSE(connectionId: string, callback: EventCallback): void {
    this.sseConnections.set(connectionId, callback);
  }

  /**
   * Unregister an SSE connection.
   */
  unregisterSSE(connectionId: string): void {
    this.sseConnections.delete(connectionId);
  }

  /**
   * Register a webhook and persist the updated configuration.
   *
   * @param url - Endpoint that will receive POST requests.
   * @param types - Event types to forward.
   * @param secret - Signing secret; a random 32-byte hex secret is generated
   *   when omitted or empty.
   * @returns The stored webhook configuration (including the secret).
   */
  registerWebhook(
    url: string,
    types: TransparencyEventType[],
    secret?: string
  ): WebhookConfig {
    const id = `wh_${crypto.randomBytes(8).toString('hex')}`;
    // `||` is deliberate: an empty string is not an acceptable secret.
    const webhookSecret = secret || crypto.randomBytes(32).toString('hex');

    const webhook: WebhookConfig = {
      id,
      url,
      secret: webhookSecret,
      types,
      active: true,
      retryCount: 0,
      maxRetries: 3,
      failureCount: 0
    };

    this.webhooks.set(id, webhook);
    this.saveConfig();

    return webhook;
  }

  /**
   * Remove a webhook, persisting the change when one was removed.
   *
   * @returns `true` when a webhook with that id existed.
   */
  removeWebhook(webhookId: string): boolean {
    const result = this.webhooks.delete(webhookId);
    if (result) {
      this.saveConfig();
    }
    return result;
  }

  /**
   * Update webhook status.
   *
   * Merges `updates` into the stored configuration and persists it. The
   * webhook id is never changed, so the stored object always agrees with the
   * key it is stored under.
   *
   * @returns The updated configuration, or `null` for an unknown id.
   */
  updateWebhook(webhookId: string, updates: Partial<WebhookConfig>): WebhookConfig | null {
    const webhook = this.webhooks.get(webhookId);
    if (!webhook) return null;

    const updated: WebhookConfig = { ...webhook, ...updates, id: webhook.id };
    this.webhooks.set(webhookId, updated);
    this.saveConfig();

    return updated;
  }

  /**
   * Get all webhooks.
   *
   * NOTE(review): the returned objects include each webhook's signing
   * secret; callers exposing them over an API should strip it.
   */
  getWebhooks(): WebhookConfig[] {
    return Array.from(this.webhooks.values());
  }

  /**
   * Invoke the callback of every subscription that matches the event and
   * update its delivery bookkeeping. A throwing callback is logged and does
   * not affect other subscribers.
   */
  private notifySubscriptions(event: TransparencyEvent): void {
    for (const [id, sub] of this.subscriptions) {
      if (!this.matchesSubscription(event, sub)) {
        continue;
      }

      try {
        sub.callback?.(event);
        sub.lastEventAt = new Date().toISOString();
        sub.eventCount++;
      } catch (error) {
        console.error(`[EventStream] Error in subscription ${id}:`, error);
      }
    }
  }

  /**
   * Decide whether an event satisfies a subscription's type list, optional
   * priority list and optional exact-match filters.
   */
  private matchesSubscription(event: TransparencyEvent, sub: EventSubscription): boolean {
    if (!sub.types.includes(event.type)) {
      return false;
    }

    // An absent or empty priority list means "any priority".
    if (sub.priorities && sub.priorities.length > 0 && !sub.priorities.includes(event.priority)) {
      return false;
    }

    const filters = sub.filters;
    if (filters) {
      if (filters.source && event.source !== filters.source) {
        return false;
      }
      if (filters.userId && event.metadata?.userId !== filters.userId) {
        return false;
      }
      if (filters.plantId && event.data.plantId !== filters.plantId) {
        return false;
      }
      if (filters.region && event.data.region !== filters.region) {
        return false;
      }
    }

    return true;
  }

  /**
   * Deliver the event to every SSE connection. A connection whose callback
   * throws is logged and unregistered.
   */
  private notifySSE(event: TransparencyEvent): void {
    for (const [id, callback] of this.sseConnections) {
      try {
        callback(event);
      } catch (error) {
        console.error(`[EventStream] Error in SSE connection ${id}:`, error);
        this.sseConnections.delete(id);
      }
    }
  }

  /**
   * Start a delivery to every active webhook registered for the event's
   * type. Deliveries run concurrently and are not awaited.
   */
  private async triggerWebhooks(event: TransparencyEvent): Promise<void> {
    for (const webhook of this.webhooks.values()) {
      if (!webhook.active) continue;
      if (!webhook.types.includes(event.type)) continue;

      void this.sendWebhook(webhook, event);
    }
  }

  /**
   * POST one event to one webhook, signed with HMAC-SHA256 of the JSON body.
   *
   * On success the failure and retry counters are reset. On failure the
   * delivery is retried with exponential backoff (2^retryCount seconds)
   * until `maxRetries` is reached; after that, a webhook with 10 or more
   * accumulated failures is deactivated. Delivery state is persisted after
   * every attempt.
   */
  private async sendWebhook(webhook: WebhookConfig, event: TransparencyEvent): Promise<void> {
    try {
      const payload = JSON.stringify(event);
      const signature = crypto
        .createHmac('sha256', webhook.secret)
        .update(payload)
        .digest('hex');

      const response = await fetch(webhook.url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-LocalGreenChain-Signature': signature,
          'X-LocalGreenChain-Event': event.type,
          'X-LocalGreenChain-Timestamp': event.timestamp
        },
        body: payload
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      webhook.lastSuccess = new Date().toISOString();
      webhook.failureCount = 0;
      webhook.retryCount = 0;
    } catch (error) {
      webhook.lastFailure = new Date().toISOString();
      webhook.failureCount++;

      if (webhook.retryCount < webhook.maxRetries) {
        webhook.retryCount++;
        // Exponential backoff retry
        const delayMs = Math.pow(2, webhook.retryCount) * 1000;
        setTimeout(() => {
          // Re-resolve the webhook so a retry is not sent to one that was
          // removed or deactivated while the timer was pending.
          const current = this.webhooks.get(webhook.id);
          if (current?.active) {
            void this.sendWebhook(current, event);
          }
        }, delayMs);
      } else if (webhook.failureCount >= 10) {
        // Disable webhook after max retries
        webhook.active = false;
        console.warn(`[EventStream] Webhook ${webhook.id} disabled after ${webhook.failureCount} failures`);
      }
    }

    this.saveConfig();
  }

  /**
   * Get recent events, newest first.
   *
   * @param limit - Maximum number of events to return (default 50). A limit
   *   that is zero, negative or NaN yields an empty array.
   * @param types - When given and non-empty, only events of these types.
   */
  getRecent(limit: number = 50, types?: TransparencyEventType[]): TransparencyEvent[] {
    // `slice(-0)` would return the whole array, so handle non-positive
    // limits explicitly.
    if (!(limit > 0)) {
      return [];
    }

    const candidates =
      types && types.length > 0
        ? this.events.filter(e => types.includes(e.type))
        : [...this.events];

    return candidates.slice(-limit).reverse();
  }

  /**
   * Get events by time range, newest first.
   *
   * Timestamps are compared as strings, so `startTime` and `endTime` should
   * use the same ISO-8601 format as the stored events (the output of
   * `Date.prototype.toISOString`). Both bounds are inclusive.
   *
   * @param options.types - When non-empty, only events of these types.
   * @param options.priorities - When non-empty, only these priorities.
   * @param options.limit - Keep only the newest `limit` matches; omitted,
   *   zero or negative means no limit.
   */
  getByTimeRange(
    startTime: string,
    endTime: string,
    options: {
      types?: TransparencyEventType[];
      priorities?: EventPriority[];
      limit?: number;
    } = {}
  ): TransparencyEvent[] {
    const { types, priorities, limit } = options;

    let events = this.events.filter(e =>
      e.timestamp >= startTime && e.timestamp <= endTime
    );

    if (types && types.length > 0) {
      events = events.filter(e => types.includes(e.type));
    }

    if (priorities && priorities.length > 0) {
      events = events.filter(e => priorities.includes(e.priority));
    }

    if (limit !== undefined && limit > 0) {
      events = events.slice(-limit);
    }

    // `events` is always a fresh array here, so reversing in place is safe.
    return events.reverse();
  }

  /**
   * Get event statistics for the events currently held in memory.
   *
   * `eventsByPriority` always contains an entry for every priority, with 0
   * for priorities that have no events.
   */
  getStats(): EventStats {
    const nowMs = Date.now();
    const dayMs = 24 * 60 * 60 * 1000;

    const eventsByType: Record<string, number> = {};
    const eventsByPriority: Record<EventPriority, number> = {
      LOW: 0,
      NORMAL: 0,
      HIGH: 0,
      CRITICAL: 0
    };
    let eventsLast24h = 0;

    for (const event of this.events) {
      eventsByType[event.type] = (eventsByType[event.type] ?? 0) + 1;
      eventsByPriority[event.priority] = (eventsByPriority[event.priority] ?? 0) + 1;

      if (nowMs - new Date(event.timestamp).getTime() <= dayMs) {
        eventsLast24h++;
      }
    }

    let activeWebhooks = 0;
    for (const webhook of this.webhooks.values()) {
      if (webhook.active) {
        activeWebhooks++;
      }
    }

    return {
      totalEvents: this.events.length,
      eventsLast24h,
      eventsByType,
      eventsByPriority,
      activeSubscriptions: this.subscriptions.size,
      activeWebhooks,
      averageEventsPerMinute: eventsLast24h / (24 * 60)
    };
  }

  /**
   * Format event for SSE: `event`, `id` and `data` fields followed by the
   * blank line that terminates a Server-Sent Events message.
   */
  formatSSE(event: TransparencyEvent): string {
    return `event: ${event.type}\nid: ${event.id}\ndata: ${JSON.stringify(event)}\n\n`;
  }

  /**
   * Get event types available for subscription.
   */
  getAvailableEventTypes(): TransparencyEventType[] {
    return [
      'plant.registered', 'plant.cloned', 'plant.transferred', 'plant.updated',
      'transport.started', 'transport.completed', 'transport.verified',
      'demand.created', 'demand.matched', 'supply.committed',
      'farm.registered', 'farm.updated', 'batch.started', 'batch.harvested',
      'agent.alert', 'agent.task_completed', 'agent.error',
      'blockchain.block_added', 'blockchain.verified', 'blockchain.error',
      'system.health', 'system.alert', 'system.metric',
      'audit.logged', 'audit.anomaly'
    ];
  }
}
|
|
|
|
// Singleton instance: created lazily by getEventStream() and then reused.
let eventStreamInstance: EventStream | null = null;

/**
 * Return the shared EventStream, constructing it on the first call.
 *
 * @returns The same EventStream instance on every call within this module.
 */
export function getEventStream(): EventStream {
  if (eventStreamInstance === null) {
    eventStreamInstance = new EventStream();
  }
  return eventStreamInstance;
}

export default EventStream;
|