Building Real-Time Apps with Node.js Server-Sent Events


Server-Sent Events (SSE) in Node.js provide a straightforward way to push real-time data from server to client without the complexity of WebSockets. Unlike traditional polling or complex messaging protocols, SSE creates a persistent HTTP connection that streams data to browsers using the native EventSource API. This tutorial walks through building production-ready real-time applications using SSE, covering implementation patterns, performance optimization, troubleshooting common issues, and comparing SSE with alternative solutions.

How Server-Sent Events Work

SSE operates over standard HTTP connections using the text/event-stream content type. The server keeps the connection open and periodically sends formatted data chunks to connected clients. Each message follows a specific format with optional fields for event type, data payload, and unique identifiers.

The protocol uses simple text-based formatting:

data: Hello World\n\n

event: userUpdate
data: {"userId": 123, "status": "online"}
id: 1001
retry: 3000

data: Multi-line messages
data: work by sending multiple
data: data fields together
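
As a rough sketch of the framing rules above, a small serializer can build each message from its fields. The helper name formatSSE and its argument shape are illustrative assumptions, not part of Node.js or any library. It splits the payload on line breaks so that every line gets its own data: prefix, and it ends the message with a blank line.

```javascript
// Hypothetical helper: serialize one SSE message from its fields.
function formatSSE({ event, data, id, retry } = {}) {
    const lines = [];
    if (event !== undefined) lines.push(`event: ${event}`);
    if (id !== undefined) lines.push(`id: ${id}`);
    if (retry !== undefined) lines.push(`retry: ${retry}`);

    // Strings are sent as-is; anything else is sent as JSON.
    const payload = typeof data === 'string' ? data : JSON.stringify(data ?? null);

    // Every line of the payload needs its own "data:" prefix.
    for (const line of payload.split(/\r\n|\r|\n/)) {
        lines.push(`data: ${line}`);
    }

    // A blank line terminates the message.
    return lines.join('\n') + '\n\n';
}

// Print the raw frame with escapes visible.
console.log(JSON.stringify(formatSSE({ event: 'userUpdate', id: 1001, data: { userId: 123 } })));
```

Building messages in one place also avoids a subtle failure: a payload containing a raw newline would otherwise be cut off at the first line break.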

Browsers manage the connection automatically. When it drops, EventSource waits for a reconnection delay (a few seconds by default, adjustable by the server through the retry field) and then reconnects. The EventSource API exposes handlers for incoming messages, connection opening, and errors.
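
When an EventSource reconnects, the browser sends the last id it received in a Last-Event-ID request header, which lets the server replay what the client missed. The following is a minimal sketch of that idea, assuming numeric, increasing IDs and an in-memory buffer. The class name EventHistory and its methods are hypothetical.

```javascript
// Hypothetical in-memory buffer of recent events, used to replay what a
// client missed while it was disconnected.
class EventHistory {
    constructor(maxSize = 100) {
        this.maxSize = maxSize;
        this.events = [];
        this.nextId = 1;
    }

    // Store an event and return it with its assigned numeric ID.
    record(event, data) {
        const entry = { id: this.nextId++, event, data };
        this.events.push(entry);
        if (this.events.length > this.maxSize) this.events.shift();
        return entry;
    }

    // Events newer than the ID the browser reported.
    // A missing or non-numeric ID replays nothing.
    since(lastEventId) {
        const last = Number.parseInt(lastEventId, 10);
        if (Number.isNaN(last)) return [];
        return this.events.filter(entry => entry.id > last);
    }
}

// Possible use inside an Express-style route handler:
//   const missed = history.since(req.headers['last-event-id']);
//   missed.forEach(e =>
//       res.write(`id: ${e.id}\nevent: ${e.event}\ndata: ${JSON.stringify(e.data)}\n\n`));
```

Replay only works if every message the server sends carries an id field. A buffer held in process memory is also lost on restart, so a shared store may be needed when running several server instances.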

Step-by-Step Implementation Guide

Here’s a complete implementation starting with a basic Express.js server setup:

const express = require('express');
const cors = require('cors');
const app = express();

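// Enable CORS for SSE endpoints.
// Browsers reject credentialed requests when the allowed origin is '*',
// so list explicit origins here if you need cookies or credentials.
app.use(cors({
    origin: '*'
}));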

// Store active connections
const connections = new Set();

// SSE endpoint
app.get('/events', (req, res) => {
    // Set SSE headers (the cors middleware above already adds the CORS headers)
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
    });

    // Send initial connection confirmation
    res.write('data: Connected to event stream\n\n');

    // Store connection for broadcasting
    connections.add(res);

    // Keep connection alive with periodic heartbeats (clients ignore comment lines)
    const heartbeat = setInterval(() => {
        res.write(': heartbeat\n\n');
    }, 30000);

    // Handle client disconnect: stop the heartbeat and forget the connection
    req.on('close', () => {
        clearInterval(heartbeat);
        connections.delete(res);
        console.log('Client disconnected');
    });
});

// Broadcast function for sending data to all connected clients
function broadcast(event, data) {
    const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
    
    connections.forEach(res => {
        try {
            res.write(message);
        } catch (error) {
            // Remove broken connections
            connections.delete(res);
        }
    });
}

// Example API endpoint that triggers broadcasts
app.post('/api/notify', express.json(), (req, res) => {
    const { message, type } = req.body;
    
    broadcast(type || 'notification', {
        message,
        timestamp: new Date().toISOString()
    });
    
    res.json({ success: true, clients: connections.size });
});

app.listen(3000, () => {
    console.log('SSE server running on port 3000');
});

Client-side implementation using the EventSource API:

// Basic EventSource setup
const eventSource = new EventSource('http://localhost:3000/events');

// Handle different event types
eventSource.addEventListener('notification', (event) => {
    const data = JSON.parse(event.data);
    console.log('Notification:', data.message);
    updateUI(data);
});

eventSource.addEventListener('userUpdate', (event) => {
    const userData = JSON.parse(event.data);
    updateUserStatus(userData);
});

// Connection management
eventSource.onopen = () => {
    console.log('Connected to event stream');
    setConnectionStatus('connected');
};

eventSource.onerror = (error) => {
    console.error('EventSource error:', error);
    setConnectionStatus('disconnected');
};

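// Custom reconnection logic with authentication.
// The native EventSource API cannot send custom headers, so the token travels in the query string.
function createAuthenticatedEventSource(token) {
    const eventSource = new EventSource(`/events?token=${encodeURIComponent(token)}`);
    
    eventSource.onerror = () => {
        // CLOSED means the browser has given up (for example after a 401 response),
        // so it will not reconnect on its own.
        if (eventSource.readyState === EventSource.CLOSED) {
            // Attempt to refresh token and reconnect.
            // The replacement instance is not returned to the original caller,
            // so attach event listeners inside this function.
            setTimeout(() => {
                refreshAuthToken()
                    .then(newToken => createAuthenticatedEventSource(newToken))
                    .catch(err => console.error('Token refresh failed:', err));
            }, 5000);
        }
    };
    
    return eventSource;
}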
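
The reconnection code above retries after a fixed five seconds. A common refinement is exponential backoff with jitter, so that many clients do not reconnect at the same moment after an outage. The sketch below shows one way to compute the delay. The function name reconnectDelay and its option names are assumptions made for illustration.

```javascript
// Hypothetical helper: delay in milliseconds before reconnect attempt
// number `attempt` (0-based).
function reconnectDelay(attempt, options = {}) {
    const { baseMs = 1000, maxMs = 30000, jitter = 0.2, random = Math.random } = options;

    // Double the delay on every attempt, up to a ceiling.
    const exponential = Math.min(maxMs, baseMs * 2 ** attempt);

    // Spread clients out by up to +/- (jitter * delay).
    const spread = exponential * jitter * (random() * 2 - 1);

    return Math.round(exponential + spread);
}

// Possible use in the browser:
//   let attempt = 0;
//   eventSource.onopen = () => { attempt = 0; };
//   setTimeout(reconnect, reconnectDelay(attempt++));
```

Passing the random source as an option keeps the function deterministic in tests.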

Real-World Examples and Use Cases

SSE excels in scenarios requiring one-way server-to-client communication. Here are production-ready implementations for common use cases:

Live Dashboard with System Metrics

const os = require('os');

// System monitoring class
class SystemMonitor {
    constructor() {
        this.connections = new Set();
        this.startMonitoring();
    }

    addConnection(res) {
        this.connections.add(res);
    }

    removeConnection(res) {
        this.connections.delete(res);
    }

    startMonitoring() {
        setInterval(() => {
            const metrics = {
                cpu: this.getCPUUsage(),
                memory: this.getMemoryUsage(),
                uptime: os.uptime(),
                timestamp: Date.now()
            };

            this.broadcast('metrics', metrics);
        }, 2000);
    }

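    // Note: os.cpus() reports cumulative times since boot, so this returns the
    // average utilization since boot, not the load over the last interval.
    getCPUUsage() {
        const cpus = os.cpus();
        let totalIdle = 0;
        let totalTick = 0;

        cpus.forEach(cpu => {
            for (const type in cpu.times) {
                totalTick += cpu.times[type];
            }
            totalIdle += cpu.times.idle;
        });

        return 100 - Math.floor(100 * totalIdle / totalTick);
    }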

    getMemoryUsage() {
        const total = os.totalmem();
        const free = os.freemem();
        return {
            total,
            used: total - free,
            percentage: Math.round(((total - free) / total) * 100)
        };
    }

    broadcast(event, data) {
        const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
        
        this.connections.forEach(connection => {
            try {
                connection.write(message);
            } catch (error) {
                this.connections.delete(connection);
            }
        });
    }
}

const monitor = new SystemMonitor();

app.get('/metrics', (req, res) => {
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
    });

    monitor.addConnection(res);

    req.on('close', () => {
        monitor.removeConnection(res);
    });
});
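
The getCPUUsage method above reads the counters from os.cpus(), which accumulate from boot. For a live dashboard it is usually more useful to report the load between two samples. Here is a minimal sketch of that calculation. The function names snapshotCpu and cpuPercentBetween are hypothetical.

```javascript
// Hypothetical helper: sum idle and total CPU time across all cores.
// `cpus` has the shape returned by os.cpus().
function snapshotCpu(cpus) {
    let idle = 0;
    let total = 0;
    for (const cpu of cpus) {
        for (const value of Object.values(cpu.times)) total += value;
        idle += cpu.times.idle;
    }
    return { idle, total };
}

// Percentage of non-idle time between two snapshots.
function cpuPercentBetween(previous, current) {
    const totalDelta = current.total - previous.total;
    if (totalDelta <= 0) return 0;
    const idleDelta = current.idle - previous.idle;
    return Math.round(100 * (1 - idleDelta / totalDelta));
}

// Possible use inside the monitoring loop:
//   const os = require('os');
//   let previous = snapshotCpu(os.cpus());
//   setInterval(() => {
//       const current = snapshotCpu(os.cpus());
//       const cpu = cpuPercentBetween(previous, current);
//       previous = current;
//   }, 2000);
```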

Chat Application with Message Broadcasting

class ChatRoom {
    constructor(roomId) {
        this.roomId = roomId;
        this.connections = new Map(); // userId -> response object
        this.messageHistory = [];
        this.maxHistorySize = 100;
    }

    addUser(userId, res) {
        this.connections.set(userId, res);
        
        // Send recent message history
        this.messageHistory.slice(-10).forEach(msg => {
            this.sendToUser(userId, 'message', msg);
        });

        // Notify others about user joining
        this.broadcast('userJoined', {
            userId,
            timestamp: new Date().toISOString(),
            activeUsers: Array.from(this.connections.keys())
        }, userId);
    }

    removeUser(userId) {
        this.connections.delete(userId);
        
        this.broadcast('userLeft', {
            userId,
            timestamp: new Date().toISOString(),
            activeUsers: Array.from(this.connections.keys())
        });
    }

    sendMessage(fromUserId, message) {
        const messageData = {
            id: Date.now(),
            fromUserId,
            message,
            timestamp: new Date().toISOString()
        };

        // Store in history
        this.messageHistory.push(messageData);
        if (this.messageHistory.length > this.maxHistorySize) {
            this.messageHistory.shift();
        }

        // Broadcast to all users
        this.broadcast('message', messageData);
    }

    sendToUser(userId, event, data) {
        const connection = this.connections.get(userId);
        if (connection) {
            try {
                connection.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
            } catch (error) {
                this.connections.delete(userId);
            }
        }
    }

    broadcast(event, data, excludeUserId = null) {
        this.connections.forEach((connection, userId) => {
            if (userId !== excludeUserId) {
                this.sendToUser(userId, event, data);
            }
        });
    }
}

const chatRooms = new Map();

app.get('/chat/:roomId', (req, res) => {
    const { roomId } = req.params;
    const userId = req.query.userId;

    if (!userId) {
        return res.status(400).json({ error: 'userId required' });
    }

    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
    });

    // Get or create chat room
    if (!chatRooms.has(roomId)) {
        chatRooms.set(roomId, new ChatRoom(roomId));
    }

    const room = chatRooms.get(roomId);
    room.addUser(userId, res);

    req.on('close', () => {
        room.removeUser(userId);
        
        // Clean up empty rooms
        if (room.connections.size === 0) {
            chatRooms.delete(roomId);
        }
    });
});

app.post('/chat/:roomId/message', express.json(), (req, res) => {
    const { roomId } = req.params;
    const { userId, message } = req.body;

    const room = chatRooms.get(roomId);
    if (room) {
        room.sendMessage(userId, message);
        res.json({ success: true });
    } else {
        res.status(404).json({ error: 'Room not found' });
    }
});
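
The ChatRoom class above keys connections by userId, so a second browser tab replaces the first, and the first tab's close handler then removes the newer connection. One possible fix is to track a set of streams per user. This is a sketch under that assumption, and the class name UserConnections is hypothetical.

```javascript
// Hypothetical registry: each user may hold several open streams
// (multiple tabs or devices).
class UserConnections {
    constructor() {
        this.byUser = new Map(); // userId -> Set of response objects
    }

    add(userId, res) {
        if (!this.byUser.has(userId)) this.byUser.set(userId, new Set());
        this.byUser.get(userId).add(res);
    }

    // Remove one specific stream. Returns true when the user has no
    // streams left, which is the moment to announce "userLeft".
    remove(userId, res) {
        const streams = this.byUser.get(userId);
        if (!streams) return true;
        streams.delete(res);
        if (streams.size === 0) {
            this.byUser.delete(userId);
            return true;
        }
        return false;
    }

    // Write one event to every stream the user has open.
    // Returns the number of streams written to.
    send(userId, event, data) {
        const streams = this.byUser.get(userId);
        if (!streams) return 0;
        const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
        streams.forEach(res => res.write(message));
        return streams.size;
    }
}
```

In the route handler, the close callback would call remove(userId, res) with the same response object it registered, so closing one tab leaves the others connected.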

Comparison with Alternative Solutions

| Feature | Server-Sent Events | WebSockets | Long Polling | Socket.IO |
|---|---|---|---|---|
| Connection Type | HTTP (unidirectional) | TCP via HTTP upgrade (bidirectional) | HTTP (request/response) | Multiple transports |
| Browser Support | 95%+ (all modern browsers; not Internet Explorer or legacy Edge) | 97%+ (IE 10+) | 100% | 99%+ (with fallbacks) |
| Implementation Complexity | Low | Medium | Low | Low (abstracted) |
| Server Resource Usage | Low-Medium | Medium | High (frequent requests) | Medium-High |
| Reconnection Handling | Automatic | Manual implementation | Not applicable | Automatic |
| Proxy/Firewall Friendly | High (standard HTTP) | Medium (some issues) | High | High (fallback support) |
| Client-to-Server Messaging | Separate HTTP requests | Built-in | Standard HTTP | Built-in |

Performance comparison based on 1000 concurrent connections:

| Metric | SSE | WebSockets | Long Polling |
|---|---|---|---|
| Memory Usage (MB) | 45-60 | 55-75 | 80-120 |
| CPU Usage (%) | 8-12 | 10-15 | 25-40 |
| Network Overhead | Low | Very Low | High |
| Latency (ms) | 50-100 | 10-30 | 200-500 |

Best Practices and Performance Optimization

Connection management is crucial for scalable SSE implementations. Implement proper cleanup and resource monitoring:

class SSEConnectionManager {
    constructor(options = {}) {
        this.connections = new Map();
        this.maxConnections = options.maxConnections || 10000;
        this.heartbeatInterval = options.heartbeatInterval || 30000;
        this.connectionTimeout = options.connectionTimeout || 300000; // 5 minutes
        
        this.startCleanupJob();
    }

    addConnection(id, res, metadata = {}) {
        if (this.connections.size >= this.maxConnections) {
            throw new Error('Maximum connections reached');
        }

        const connection = {
            id,
            res,
            metadata,
            lastSeen: Date.now(),
            created: Date.now()
        };

        this.connections.set(id, connection);
        this.setupHeartbeat(connection);
        
        return connection;
    }

    setupHeartbeat(connection) {
        const heartbeat = setInterval(() => {
            try {
                connection.res.write(': heartbeat\n\n');
                connection.lastSeen = Date.now();
            } catch (error) {
                this.removeConnection(connection.id);
                clearInterval(heartbeat);
            }
        }, this.heartbeatInterval);

        connection.heartbeat = heartbeat;
    }

    removeConnection(id) {
        const connection = this.connections.get(id);
        if (connection) {
            clearInterval(connection.heartbeat);
            this.connections.delete(id);
        }
    }

    broadcast(event, data, filter = null) {
        const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
        let sent = 0;
        let failed = 0;

        this.connections.forEach((connection, id) => {
            if (filter && !filter(connection)) {
                return;
            }

            try {
                connection.res.write(message);
                connection.lastSeen = Date.now();
                sent++;
            } catch (error) {
                this.removeConnection(id);
                failed++;
            }
        });

        return { sent, failed, total: this.connections.size };
    }

    startCleanupJob() {
        setInterval(() => {
            const now = Date.now();
            const toRemove = [];

            this.connections.forEach((connection, id) => {
                if (now - connection.lastSeen > this.connectionTimeout) {
                    toRemove.push(id);
                }
            });

            toRemove.forEach(id => this.removeConnection(id));
            
            if (toRemove.length > 0) {
                console.log(`Cleaned up ${toRemove.length} stale connections`);
            }
        }, 60000); // Run every minute
    }

    getStats() {
        const now = Date.now();
        let totalAge = 0;

        this.connections.forEach(connection => {
            totalAge += now - connection.created;
        });

        return {
            totalConnections: this.connections.size,
            averageAge: this.connections.size > 0 ? totalAge / this.connections.size : 0,
            maxConnections: this.maxConnections
        };
    }
}
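
The manager above relies on try/catch to detect dead connections, but writing to a closed response does not reliably throw in Node.js, and a slow client can let buffered data pile up in memory. A small guard that checks the stream state before writing is one way to cover both cases. The helper name safeWrite, its return values, and the 1 MB default threshold are assumptions chosen for illustration.

```javascript
// Hypothetical helper: write to an SSE response only if it is still usable.
// Returns 'sent', 'closed', or 'overloaded'.
function safeWrite(res, message, maxBufferedBytes = 1024 * 1024) {
    // A response that has ended or been destroyed will never deliver data.
    if (res.writableEnded || res.destroyed) return 'closed';

    // writableLength is the number of bytes queued but not yet flushed;
    // a large value suggests the client is not keeping up.
    if (res.writableLength > maxBufferedBytes) return 'overloaded';

    res.write(message);
    return 'sent';
}

// Possible use inside broadcast():
//   const result = safeWrite(connection.res, message);
//   if (result !== 'sent') this.removeConnection(id);
```

Whether to drop an overloaded client or just skip the message for it is a policy decision that depends on how important each event is.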

Implement message queuing for reliability during connection drops:

class MessageQueue {
    constructor(maxSize = 1000) {
        this.queues = new Map(); // connectionId -> messages[]
        this.maxSize = maxSize;
    }

    enqueue(connectionId, message) {
        if (!this.queues.has(connectionId)) {
            this.queues.set(connectionId, []);
        }

        const queue = this.queues.get(connectionId);
        queue.push({
            ...message,
            timestamp: Date.now()
        });

        // Maintain queue size
        if (queue.length > this.maxSize) {
            queue.shift();
        }
    }

    dequeue(connectionId, count = 10) {
        const queue = this.queues.get(connectionId);
        if (!queue || queue.length === 0) {
            return [];
        }

        return queue.splice(0, count);
    }

    flush(connectionId) {
        this.queues.delete(connectionId);
    }

    getQueueSize(connectionId) {
        const queue = this.queues.get(connectionId);
        return queue ? queue.length : 0;
    }
}

// Integration with connection manager
const messageQueue = new MessageQueue();
const connectionManager = new SSEConnectionManager();

app.get('/events/:userId', (req, res) => {
    const userId = req.params.userId;

    // Register before sending headers so a rejection can still return a 503
    try {
        connectionManager.addConnection(userId, res);
    } catch (error) {
        res.status(503).json({ error: 'Service unavailable' });
        return;
    }

    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
    });

    // Send queued messages
    const queuedMessages = messageQueue.dequeue(userId);
    queuedMessages.forEach(msg => {
        res.write(`event: ${msg.event}\ndata: ${JSON.stringify(msg.data)}\n\n`);
    });

    req.on('close', () => {
        connectionManager.removeConnection(userId);
    });
});

Common Issues and Troubleshooting

SSE implementations often encounter specific problems. Here are solutions for the most frequent issues:

CORS Configuration Problems

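// Comprehensive CORS setup for SSE
// Hypothetical allowlist: replace with the origins your application trusts
const allowedOrigins = new Set(['https://app.example.com']);

app.use((req, res, next) => {
    if (req.path.startsWith('/events')) {
        const origin = req.headers.origin;

        // Echo the origin only when it is trusted; '*' is not valid together with credentials
        if (origin && allowedOrigins.has(origin)) {
            res.header('Access-Control-Allow-Origin', origin);
            res.header('Access-Control-Allow-Credentials', 'true');
            res.header('Vary', 'Origin');
        }
        res.header('Access-Control-Allow-Methods', 'GET, OPTIONS');
        res.header('Access-Control-Allow-Headers', 'Cache-Control, Last-Event-ID');
        
        if (req.method === 'OPTIONS') {
            return res.sendStatus(204);
        }
    }
    next();
});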

Connection Limit Issues

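Over HTTP/1.1, browsers limit concurrent connections per domain (typically 6), and every open EventSource counts against that limit across all tabs. Over HTTP/2 the limit on simultaneous streams is negotiated between client and server and defaults to about 100, so serving SSE over HTTP/2 largely avoids the problem. Where that is not possible, use connection pooling or subdomain distribution: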

// Client-side connection pooling
class SSEConnectionPool {
    constructor(baseUrl, maxConnections = 4) {
        this.baseUrl = baseUrl;
        this.maxConnections = maxConnections;
        this.connections = [];
        this.subscriptions = new Map();
        this.currentIndex = 0;
    }

    subscribe(channel, callback) {
        let connection = this.findAvailableConnection();
        
        if (!connection) {
            if (this.connections.length < this.maxConnections) {
                connection = this.createConnection();
            } else {
                // Use round-robin assignment
                connection = this.connections[this.currentIndex % this.connections.length];
                this.currentIndex++;
            }
        }

        this.subscriptions.set(channel, { connection, callback });
        connection.channels.add(channel);
        
        // Send subscription message
        fetch(`${this.baseUrl}/subscribe`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ channel, connectionId: connection.id })
        });
    }

    createConnection() {
        const connectionId = Date.now() + Math.random();
        const eventSource = new EventSource(`${this.baseUrl}/events?id=${connectionId}`);
        
        const connection = {
            id: connectionId,
            eventSource,
            channels: new Set()
        };

        eventSource.onmessage = (event) => {
            const data = JSON.parse(event.data);
            const subscription = this.subscriptions.get(data.channel);
            
            if (subscription) {
                subscription.callback(data);
            }
        };

        this.connections.push(connection);
        return connection;
    }

    findAvailableConnection() {
        return this.connections.find(conn => conn.channels.size < 10);
    }
}
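
The pool above posts each subscription to a /subscribe endpoint and expects every message to carry a channel property, but the server side is not shown. The sketch below is one way the server could track which connection listens to which channel. The class name ChannelRegistry and its methods are hypothetical, and response objects only need a write method.

```javascript
// Hypothetical server-side counterpart: maps channels to the connections
// subscribed to them.
class ChannelRegistry {
    constructor() {
        this.streams = new Map();  // connectionId -> response object
        this.channels = new Map(); // channel -> Set of connectionIds
    }

    // Called when the EventSource connects (GET /events?id=...).
    register(connectionId, res) {
        this.streams.set(String(connectionId), res);
    }

    // Called when the connection closes.
    unregister(connectionId) {
        const key = String(connectionId);
        this.streams.delete(key);
        this.channels.forEach(ids => ids.delete(key));
    }

    // Called from POST /subscribe. Returns false for unknown connections.
    subscribe(connectionId, channel) {
        const key = String(connectionId);
        if (!this.streams.has(key)) return false;
        if (!this.channels.has(channel)) this.channels.set(channel, new Set());
        this.channels.get(channel).add(key);
        return true;
    }

    // Sends an unnamed event so the client's onmessage handler receives it;
    // the channel name travels inside the JSON payload.
    publish(channel, payload) {
        const ids = this.channels.get(channel) || new Set();
        const message = `data: ${JSON.stringify({ channel, ...payload })}\n\n`;
        ids.forEach(id => this.streams.get(id).write(message));
        return ids.size;
    }
}
```

Connection IDs are normalized to strings because the same ID arrives as a query-string value on /events and as a JSON number on /subscribe.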

Memory Leaks from Unclosed Connections

// Server-side connection tracking with automatic cleanup
class ConnectionTracker {
    constructor() {
        this.connections = new Map();
        this.startMonitoring();
    }

    track(req, res) {
        const connectionId = Date.now() + Math.random();
        const connection = {
            id: connectionId,
            req,
            res,
            created: Date.now(),
            lastActivity: Date.now()
        };

        this.connections.set(connectionId, connection);

        // Set up multiple cleanup triggers
        req.on('close', () => this.cleanup(connectionId));
        req.on('error', () => this.cleanup(connectionId));
        res.on('finish', () => this.cleanup(connectionId));
        res.on('error', () => this.cleanup(connectionId));

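        // Hard cap: close every connection after 5 minutes, whether active or not.
        // EventSource clients reconnect on their own after the stream ends.
        const maxLifetime = setTimeout(() => this.cleanup(connectionId), 300000); // 5 minutes
        res.on('close', () => clearTimeout(maxLifetime));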

        return connectionId;
    }

    cleanup(connectionId) {
        const connection = this.connections.get(connectionId);
        if (connection) {
            try {
                if (!connection.res.destroyed) {
                    connection.res.end();
                }
            } catch (error) {
                // Connection already closed
            }
            this.connections.delete(connectionId);
        }
    }

    startMonitoring() {
        setInterval(() => {
            console.log(`Active SSE connections: ${this.connections.size}`);
            
            // Force cleanup of very old connections
            const cutoff = Date.now() - 600000; // 10 minutes
            this.connections.forEach((connection, id) => {
                if (connection.created < cutoff) {
                    this.cleanup(id);
                }
            });
        }, 60000);
    }
}

const tracker = new ConnectionTracker();

app.get('/events', (req, res) => {
    const connectionId = tracker.track(req, res);
    
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
    });

    res.write(`data: Connected with ID ${connectionId}\n\n`);
});

Handling Authentication and Authorization

const jwt = require('jsonwebtoken');

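// Middleware for SSE authentication
function authenticateSSE(req, res, next) {
    // Native EventSource cannot set headers, so browsers send the token in the query string;
    // other clients may send "Authorization: Bearer <token>"
    const header = req.headers.authorization || '';
    const token = req.query.token || header.replace(/^Bearer\s+/i, '');
    
    if (!token) {
        return res.status(401).json({ error: 'Authentication required' });
    }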

    try {
        const decoded = jwt.verify(token, process.env.JWT_SECRET);
        req.user = decoded;
        next();
    } catch (error) {
        return res.status(401).json({ error: 'Invalid token' });
    }
}

// Secure SSE endpoint
app.get('/secure-events', authenticateSSE, (req, res) => {
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
    });

    const userChannels = getUserChannels(req.user.id);
    
    // Only send events for authorized channels
    const subscription = {
        userId: req.user.id,
        channels: userChannels,
        connection: res
    };

    addSecureSubscription(subscription);

    req.on('close', () => {
        removeSecureSubscription(req.user.id);
    });
});

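// Subscription registry used by the secure endpoint above, keyed by user ID.
// getUserChannels() is application-specific and must be supplied separately.
const secureSubscriptions = new Map();

function addSecureSubscription(subscription) {
    secureSubscriptions.set(subscription.userId, subscription);
}

function removeSecureSubscription(userId) {
    secureSubscriptions.delete(userId);
}

function broadcastToAuthorizedUsers(channel, data) {
    secureSubscriptions.forEach(sub => {
        if (sub.channels.includes(channel)) {
            try {
                sub.connection.write(`data: ${JSON.stringify(data)}\n\n`);
            } catch (error) {
                removeSecureSubscription(sub.userId);
            }
        }
    });
}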
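
Because the native EventSource API cannot set request headers, the token in the example above travels in the URL, where it may end up in server and proxy logs. One possible mitigation is to exchange the long-lived token for a short-lived, single-use ticket through a normal authenticated request, then open the stream with that ticket. This is a sketch only. The class name TicketStore, the 30-second lifetime, and the in-memory storage are all assumptions.

```javascript
const crypto = require('crypto');

// Hypothetical one-time ticket store: the client first calls an
// authenticated endpoint to obtain a ticket, then opens
// new EventSource('/secure-events?ticket=...').
class TicketStore {
    constructor(ttlMs = 30000) {
        this.ttlMs = ttlMs;
        this.tickets = new Map(); // ticket -> { userId, expires }
    }

    // Create a random ticket bound to a user.
    issue(userId, now = Date.now()) {
        const ticket = crypto.randomBytes(16).toString('hex');
        this.tickets.set(ticket, { userId, expires: now + this.ttlMs });
        return ticket;
    }

    // Returns the userId once, then invalidates the ticket.
    // Unknown or expired tickets yield null.
    redeem(ticket, now = Date.now()) {
        const entry = this.tickets.get(ticket);
        if (!entry) return null;
        this.tickets.delete(ticket);
        return entry.expires >= now ? entry.userId : null;
    }
}
```

A leaked ticket is of little use because it expires quickly and works only once. Tickets that are never redeemed stay in the map, so a periodic sweep would be needed in a long-running process.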

Server-Sent Events provide an excellent balance between simplicity and functionality for real-time web applications. The implementation patterns covered here scale effectively to thousands of concurrent connections while maintaining code clarity and debugging capability. For applications requiring bidirectional communication, consider hybrid approaches using SSE for server-to-client streaming combined with standard HTTP POST requests for client-to-server messaging.

Additional resources for deeper implementation details include the MDN Server-Sent Events documentation and the official HTML specification for complete protocol details.


