/** * Socket.io Server for LocalGreenChain * * Provides real-time WebSocket communication with automatic * fallback to SSE for environments that don't support WebSockets. */ import { Server as SocketIOServer, Socket } from 'socket.io'; import type { Server as HTTPServer } from 'http'; import type { ClientToServerEvents, ServerToClientEvents, InterServerEvents, SocketData, RoomType, TransparencyEventType, TransparencyEvent, } from './types'; import { getEventStream } from '../transparency/EventStream'; import { getEventRooms, isValidRoom, canJoinRoom, getDefaultRooms } from './rooms'; import * as crypto from 'crypto'; type TypedSocket = Socket; /** * Socket.io server configuration */ interface SocketServerConfig { cors?: { origin: string | string[]; credentials?: boolean; }; pingTimeout?: number; pingInterval?: number; } /** * Socket.io server wrapper for LocalGreenChain */ class RealtimeSocketServer { private io: SocketIOServer | null = null; private eventStreamSubscriptionId: string | null = null; private connectedClients: Map = new Map(); private heartbeatInterval: NodeJS.Timeout | null = null; /** * Initialize the Socket.io server */ initialize(httpServer: HTTPServer, config: SocketServerConfig = {}): void { if (this.io) { console.log('[RealtimeSocketServer] Already initialized'); return; } this.io = new SocketIOServer(httpServer, { path: '/api/socket', cors: config.cors || { origin: process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3001', credentials: true, }, pingTimeout: config.pingTimeout || 60000, pingInterval: config.pingInterval || 25000, transports: ['websocket', 'polling'], }); this.setupMiddleware(); this.setupEventHandlers(); this.subscribeToEventStream(); this.startHeartbeat(); console.log('[RealtimeSocketServer] Initialized'); } /** * Set up authentication and rate limiting middleware */ private setupMiddleware(): void { if (!this.io) return; // Authentication middleware this.io.use((socket, next) => { const auth = socket.handshake.auth; // Generate session ID if not provided const sessionId = auth.sessionId || `sess_${crypto.randomBytes(8).toString('hex')}`; // Attach data to socket socket.data = { userId: auth.userId, sessionId, connectedAt: Date.now(), rooms: new Set(), subscribedTypes: new Set(), }; next(); }); } /** * Set up socket event handlers */ private setupEventHandlers(): void { if (!this.io) return; this.io.on('connection', (socket: TypedSocket) => { console.log(`[RealtimeSocketServer] Client connected: ${socket.id}`); // Store connected client this.connectedClients.set(socket.id, socket); // Join default rooms const defaultRooms = getDefaultRooms(socket.data.userId); defaultRooms.forEach((room) => { socket.join(room); socket.data.rooms.add(room); }); // Send connection established event socket.emit('connection:established', { socketId: socket.id, serverTime: Date.now(), }); // Handle room join socket.on('room:join', (room, callback) => { if (!isValidRoom(room)) { callback?.(false); return; } if (!canJoinRoom(Array.from(socket.data.rooms), room)) { callback?.(false); return; } socket.join(room); socket.data.rooms.add(room); socket.emit('room:joined', room); callback?.(true); }); // Handle room leave socket.on('room:leave', (room, callback) => { socket.leave(room); socket.data.rooms.delete(room); socket.emit('room:left', room); callback?.(true); }); // Handle type subscriptions socket.on('subscribe:types', (types, callback) => { types.forEach((type) => socket.data.subscribedTypes.add(type)); callback?.(true); }); // Handle type unsubscriptions socket.on('unsubscribe:types', (types, callback) => { types.forEach((type) => socket.data.subscribedTypes.delete(type)); callback?.(true); }); // Handle ping for latency measurement socket.on('ping', (callback) => { callback(Date.now()); }); // Handle recent events request socket.on('events:recent', (limit, callback) => { const eventStream = getEventStream(); const events = eventStream.getRecent(Math.min(limit, 100)); callback(events); }); // Handle disconnect socket.on('disconnect', (reason) => { console.log(`[RealtimeSocketServer] Client disconnected: ${socket.id} (${reason})`); this.connectedClients.delete(socket.id); }); }); } /** * Subscribe to the EventStream for broadcasting events */ private subscribeToEventStream(): void { const eventStream = getEventStream(); // Subscribe to all event types this.eventStreamSubscriptionId = eventStream.subscribe( eventStream.getAvailableEventTypes(), (event) => { this.broadcastEvent(event); } ); console.log('[RealtimeSocketServer] Subscribed to EventStream'); } /** * Broadcast an event to appropriate rooms */ private broadcastEvent(event: TransparencyEvent): void { if (!this.io) return; // Get rooms that should receive this event const rooms = getEventRooms(event.type, event.data); // Emit to each room rooms.forEach((room) => { this.io!.to(room).emit('event', event); }); } /** * Start heartbeat to keep connections alive */ private startHeartbeat(): void { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); } this.heartbeatInterval = setInterval(() => { if (this.io) { this.io.emit('system:heartbeat', Date.now()); } }, 30000); // Every 30 seconds } /** * Emit an event to specific rooms */ emitToRooms(rooms: RoomType[], eventName: keyof ServerToClientEvents, data: unknown): void { if (!this.io) return; rooms.forEach((room) => { (this.io!.to(room) as any).emit(eventName, data); }); } /** * Emit an event to a specific user */ emitToUser(userId: string, eventName: keyof ServerToClientEvents, data: unknown): void { this.emitToRooms([`user:${userId}` as RoomType], eventName, data); } /** * Emit a system message to all connected clients */ emitSystemMessage(type: 'info' | 'warning' | 'error', text: string): void { if (!this.io) return; this.io.emit('system:message', { type, text }); } /** * Get connected client count */ getConnectedCount(): number { return this.connectedClients.size; } /** * Get all connected socket IDs */ getConnectedSockets(): string[] { return Array.from(this.connectedClients.keys()); } /** * Get server stats */ getStats(): { connectedClients: number; rooms: string[]; uptime: number; } { return { connectedClients: this.connectedClients.size, rooms: this.io ? Array.from(this.io.sockets.adapter.rooms.keys()) : [], uptime: process.uptime(), }; } /** * Shutdown the server gracefully */ async shutdown(): Promise { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); } if (this.eventStreamSubscriptionId) { const eventStream = getEventStream(); eventStream.unsubscribe(this.eventStreamSubscriptionId); } if (this.io) { // Notify all clients this.io.emit('system:message', { type: 'warning', text: 'Server is shutting down', }); // Disconnect all clients this.io.disconnectSockets(true); // Close server await new Promise((resolve) => { this.io!.close(() => { console.log('[RealtimeSocketServer] Shutdown complete'); resolve(); }); }); this.io = null; } } /** * Get the Socket.io server instance */ getIO(): SocketIOServer | null { return this.io; } } // Singleton instance let socketServerInstance: RealtimeSocketServer | null = null; /** * Get the singleton socket server instance */ export function getSocketServer(): RealtimeSocketServer { if (!socketServerInstance) { socketServerInstance = new RealtimeSocketServer(); } return socketServerInstance; } export { RealtimeSocketServer }; export default RealtimeSocketServer;