
Building Real-Time Apps with Node.js Server-Sent Events
Server-Sent Events (SSE) in Node.js provide a straightforward way to push real-time data from server to client without the complexity of WebSockets. Unlike traditional polling or complex messaging protocols, SSE creates a persistent HTTP connection that streams data to browsers using the native EventSource API. This tutorial walks through building production-ready real-time applications using SSE, covering implementation patterns, performance optimization, troubleshooting common issues, and comparing SSE with alternative solutions.
How Server-Sent Events Work
SSE operates over standard HTTP connections using the text/event-stream
content type. The server keeps the connection open and periodically sends formatted data chunks to connected clients. Each message follows a specific format with optional fields for event type, data payload, and unique identifiers.
The protocol uses simple text-based formatting:
data: Hello World\n\n
event: userUpdate
data: {"userId": 123, "status": "online"}
id: 1001
retry: 3000
data: Multi-line messages
data: work by sending multiple
data: data fields together
Browsers automatically handle connection management, including reconnection with exponential backoff when connections drop. The EventSource API provides event handlers for message reception, connection opening, and error handling.
Step-by-Step Implementation Guide
Here’s a complete implementation starting with a basic Express.js server setup:
const express = require('express');
const cors = require('cors');
const app = express();
// Enable CORS for SSE endpoints
app.use(cors({
origin: '*',
credentials: true
}));
// Store active connections
const connections = new Set();
// SSE endpoint
app.get('/events', (req, res) => {
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
});
// Send initial connection confirmation
res.write('data: Connected to event stream\n\n');
// Store connection for broadcasting
connections.add(res);
// Handle client disconnect
req.on('close', () => {
connections.delete(res);
console.log('Client disconnected');
});
// Keep connection alive with periodic heartbeats
const heartbeat = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);
req.on('close', () => {
clearInterval(heartbeat);
});
});
// Broadcast function for sending data to all connected clients
function broadcast(event, data) {
const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
connections.forEach(res => {
try {
res.write(message);
} catch (error) {
// Remove broken connections
connections.delete(res);
}
});
}
// Example API endpoint that triggers broadcasts
app.post('/api/notify', express.json(), (req, res) => {
const { message, type } = req.body;
broadcast(type || 'notification', {
message,
timestamp: new Date().toISOString()
});
res.json({ success: true, clients: connections.size });
});
app.listen(3000, () => {
console.log('SSE server running on port 3000');
});
Client-side implementation using the EventSource API:
// Basic EventSource setup
const eventSource = new EventSource('http://localhost:3000/events');
// Handle different event types
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('Notification:', data.message);
updateUI(data);
});
eventSource.addEventListener('userUpdate', (event) => {
const userData = JSON.parse(event.data);
updateUserStatus(userData);
});
// Connection management
eventSource.onopen = () => {
console.log('Connected to event stream');
setConnectionStatus('connected');
};
eventSource.onerror = (error) => {
console.error('EventSource error:', error);
setConnectionStatus('disconnected');
};
// Custom reconnection logic with authentication
function createAuthenticatedEventSource(token) {
const eventSource = new EventSource(`/events?token=${token}`);
eventSource.onerror = () => {
if (eventSource.readyState === EventSource.CLOSED) {
// Attempt to refresh token and reconnect
setTimeout(() => {
refreshAuthToken().then(newToken => {
createAuthenticatedEventSource(newToken);
});
}, 5000);
}
};
return eventSource;
}
Real-World Examples and Use Cases
SSE excels in scenarios requiring one-way server-to-client communication. Here are production-ready implementations for common use cases:
Live Dashboard with System Metrics
const os = require('os');
const fs = require('fs');
// System monitoring class
class SystemMonitor {
constructor() {
this.connections = new Set();
this.startMonitoring();
}
addConnection(res) {
this.connections.add(res);
}
removeConnection(res) {
this.connections.delete(res);
}
startMonitoring() {
setInterval(() => {
const metrics = {
cpu: this.getCPUUsage(),
memory: this.getMemoryUsage(),
uptime: os.uptime(),
timestamp: Date.now()
};
this.broadcast('metrics', metrics);
}, 2000);
}
getCPUUsage() {
const cpus = os.cpus();
let totalIdle = 0;
let totalTick = 0;
cpus.forEach(cpu => {
for (let type in cpu.times) {
totalTick += cpu.times[type];
}
totalIdle += cpu.times.idle;
});
return 100 - ~~(100 * totalIdle / totalTick);
}
getMemoryUsage() {
const total = os.totalmem();
const free = os.freemem();
return {
total,
used: total - free,
percentage: Math.round(((total - free) / total) * 100)
};
}
broadcast(event, data) {
const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
this.connections.forEach(connection => {
try {
connection.write(message);
} catch (error) {
this.connections.delete(connection);
}
});
}
}
const monitor = new SystemMonitor();
app.get('/metrics', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
monitor.addConnection(res);
req.on('close', () => {
monitor.removeConnection(res);
});
});
Chat Application with Message Broadcasting
class ChatRoom {
constructor(roomId) {
this.roomId = roomId;
this.connections = new Map(); // userId -> response object
this.messageHistory = [];
this.maxHistorySize = 100;
}
addUser(userId, res) {
this.connections.set(userId, res);
// Send recent message history
this.messageHistory.slice(-10).forEach(msg => {
this.sendToUser(userId, 'message', msg);
});
// Notify others about user joining
this.broadcast('userJoined', {
userId,
timestamp: new Date().toISOString(),
activeUsers: Array.from(this.connections.keys())
}, userId);
}
removeUser(userId) {
this.connections.delete(userId);
this.broadcast('userLeft', {
userId,
timestamp: new Date().toISOString(),
activeUsers: Array.from(this.connections.keys())
});
}
sendMessage(fromUserId, message) {
const messageData = {
id: Date.now(),
fromUserId,
message,
timestamp: new Date().toISOString()
};
// Store in history
this.messageHistory.push(messageData);
if (this.messageHistory.length > this.maxHistorySize) {
this.messageHistory.shift();
}
// Broadcast to all users
this.broadcast('message', messageData);
}
sendToUser(userId, event, data) {
const connection = this.connections.get(userId);
if (connection) {
try {
connection.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
} catch (error) {
this.connections.delete(userId);
}
}
}
broadcast(event, data, excludeUserId = null) {
this.connections.forEach((connection, userId) => {
if (userId !== excludeUserId) {
this.sendToUser(userId, event, data);
}
});
}
}
const chatRooms = new Map();
app.get('/chat/:roomId', (req, res) => {
const { roomId } = req.params;
const userId = req.query.userId;
if (!userId) {
return res.status(400).json({ error: 'userId required' });
}
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// Get or create chat room
if (!chatRooms.has(roomId)) {
chatRooms.set(roomId, new ChatRoom(roomId));
}
const room = chatRooms.get(roomId);
room.addUser(userId, res);
req.on('close', () => {
room.removeUser(userId);
// Clean up empty rooms
if (room.connections.size === 0) {
chatRooms.delete(roomId);
}
});
});
app.post('/chat/:roomId/message', express.json(), (req, res) => {
const { roomId } = req.params;
const { userId, message } = req.body;
const room = chatRooms.get(roomId);
if (room) {
room.sendMessage(userId, message);
res.json({ success: true });
} else {
res.status(404).json({ error: 'Room not found' });
}
});
Comparison with Alternative Solutions
Feature | Server-Sent Events | WebSockets | Long Polling | Socket.IO |
---|---|---|---|---|
Connection Type | HTTP (unidirectional) | TCP (bidirectional) | HTTP (request/response) | Multiple transports |
Browser Support | 95%+ (IE/Edge 12+) | 97%+ (IE 10+) | 100% | 99%+ (with fallbacks) |
Implementation Complexity | Low | Medium | Low | Low (abstracted) |
Server Resource Usage | Low-Medium | Medium | High (frequent requests) | Medium-High |
Reconnection Handling | Automatic | Manual implementation | Not applicable | Automatic |
Proxy/Firewall Friendly | High (standard HTTP) | Medium (some issues) | High | High (fallback support) |
Client-to-Server Messaging | Separate HTTP requests | Built-in | Standard HTTP | Built-in |
Performance comparison based on 1000 concurrent connections:
Metric | SSE | WebSockets | Long Polling |
---|---|---|---|
Memory Usage (MB) | 45-60 | 55-75 | 80-120 |
CPU Usage (%) | 8-12 | 10-15 | 25-40 |
Network Overhead | Low | Very Low | High |
Latency (ms) | 50-100 | 10-30 | 200-500 |
Best Practices and Performance Optimization
Connection management is crucial for scalable SSE implementations. Implement proper cleanup and resource monitoring:
class SSEConnectionManager {
constructor(options = {}) {
this.connections = new Map();
this.maxConnections = options.maxConnections || 10000;
this.heartbeatInterval = options.heartbeatInterval || 30000;
this.connectionTimeout = options.connectionTimeout || 300000; // 5 minutes
this.startCleanupJob();
}
addConnection(id, res, metadata = {}) {
if (this.connections.size >= this.maxConnections) {
throw new Error('Maximum connections reached');
}
const connection = {
id,
res,
metadata,
lastSeen: Date.now(),
created: Date.now()
};
this.connections.set(id, connection);
this.setupHeartbeat(connection);
return connection;
}
setupHeartbeat(connection) {
const heartbeat = setInterval(() => {
try {
connection.res.write(': heartbeat\n\n');
connection.lastSeen = Date.now();
} catch (error) {
this.removeConnection(connection.id);
clearInterval(heartbeat);
}
}, this.heartbeatInterval);
connection.heartbeat = heartbeat;
}
removeConnection(id) {
const connection = this.connections.get(id);
if (connection) {
clearInterval(connection.heartbeat);
this.connections.delete(id);
}
}
broadcast(event, data, filter = null) {
const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
let sent = 0;
let failed = 0;
this.connections.forEach((connection, id) => {
if (filter && !filter(connection)) {
return;
}
try {
connection.res.write(message);
connection.lastSeen = Date.now();
sent++;
} catch (error) {
this.removeConnection(id);
failed++;
}
});
return { sent, failed, total: this.connections.size };
}
startCleanupJob() {
setInterval(() => {
const now = Date.now();
const toRemove = [];
this.connections.forEach((connection, id) => {
if (now - connection.lastSeen > this.connectionTimeout) {
toRemove.push(id);
}
});
toRemove.forEach(id => this.removeConnection(id));
if (toRemove.length > 0) {
console.log(`Cleaned up ${toRemove.length} stale connections`);
}
}, 60000); // Run every minute
}
getStats() {
const now = Date.now();
let totalAge = 0;
this.connections.forEach(connection => {
totalAge += now - connection.created;
});
return {
totalConnections: this.connections.size,
averageAge: this.connections.size > 0 ? totalAge / this.connections.size : 0,
maxConnections: this.maxConnections
};
}
}
Implement message queuing for reliability during connection drops:
class MessageQueue {
constructor(maxSize = 1000) {
this.queues = new Map(); // connectionId -> messages[]
this.maxSize = maxSize;
}
enqueue(connectionId, message) {
if (!this.queues.has(connectionId)) {
this.queues.set(connectionId, []);
}
const queue = this.queues.get(connectionId);
queue.push({
...message,
timestamp: Date.now()
});
// Maintain queue size
if (queue.length > this.maxSize) {
queue.shift();
}
}
dequeue(connectionId, count = 10) {
const queue = this.queues.get(connectionId);
if (!queue || queue.length === 0) {
return [];
}
return queue.splice(0, count);
}
flush(connectionId) {
this.queues.delete(connectionId);
}
getQueueSize(connectionId) {
const queue = this.queues.get(connectionId);
return queue ? queue.length : 0;
}
}
// Integration with connection manager
const messageQueue = new MessageQueue();
const connectionManager = new SSEConnectionManager();
app.get('/events/:userId', (req, res) => {
const userId = req.params.userId;
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
try {
connectionManager.addConnection(userId, res);
// Send queued messages
const queuedMessages = messageQueue.dequeue(userId);
queuedMessages.forEach(msg => {
res.write(`event: ${msg.event}\ndata: ${JSON.stringify(msg.data)}\n\n`);
});
} catch (error) {
res.status(503).json({ error: 'Service unavailable' });
return;
}
req.on('close', () => {
connectionManager.removeConnection(userId);
});
});
Common Issues and Troubleshooting
SSE implementations often encounter specific problems. Here are solutions for the most frequent issues:
CORS Configuration Problems
// Comprehensive CORS setup for SSE
app.use((req, res, next) => {
if (req.path.startsWith('/events')) {
res.header('Access-Control-Allow-Origin', req.headers.origin || '*');
res.header('Access-Control-Allow-Methods', 'GET, OPTIONS');
res.header('Access-Control-Allow-Headers', 'Cache-Control, Last-Event-ID');
res.header('Access-Control-Allow-Credentials', 'true');
if (req.method === 'OPTIONS') {
return res.sendStatus(200);
}
}
next();
});
Connection Limit Issues
Browsers limit concurrent connections per domain (typically 6). Use connection pooling or subdomain distribution:
// Client-side connection pooling
class SSEConnectionPool {
constructor(baseUrl, maxConnections = 4) {
this.baseUrl = baseUrl;
this.maxConnections = maxConnections;
this.connections = [];
this.subscriptions = new Map();
this.currentIndex = 0;
}
subscribe(channel, callback) {
let connection = this.findAvailableConnection();
if (!connection) {
if (this.connections.length < this.maxConnections) {
connection = this.createConnection();
} else {
// Use round-robin assignment
connection = this.connections[this.currentIndex % this.connections.length];
this.currentIndex++;
}
}
this.subscriptions.set(channel, { connection, callback });
connection.channels.add(channel);
// Send subscription message
fetch(`${this.baseUrl}/subscribe`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ channel, connectionId: connection.id })
});
}
createConnection() {
const connectionId = Date.now() + Math.random();
const eventSource = new EventSource(`${this.baseUrl}/events?id=${connectionId}`);
const connection = {
id: connectionId,
eventSource,
channels: new Set()
};
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
const subscription = this.subscriptions.get(data.channel);
if (subscription) {
subscription.callback(data);
}
};
this.connections.push(connection);
return connection;
}
findAvailableConnection() {
return this.connections.find(conn => conn.channels.size < 10);
}
}
Memory Leaks from Unclosed Connections
// Server-side connection tracking with automatic cleanup
class ConnectionTracker {
constructor() {
this.connections = new Map();
this.startMonitoring();
}
track(req, res) {
const connectionId = Date.now() + Math.random();
const connection = {
id: connectionId,
req,
res,
created: Date.now(),
lastActivity: Date.now()
};
this.connections.set(connectionId, connection);
// Set up multiple cleanup triggers
req.on('close', () => this.cleanup(connectionId));
req.on('error', () => this.cleanup(connectionId));
res.on('finish', () => this.cleanup(connectionId));
res.on('error', () => this.cleanup(connectionId));
// Set timeout for inactive connections
setTimeout(() => {
if (this.connections.has(connectionId)) {
this.cleanup(connectionId);
}
}, 300000); // 5 minutes
return connectionId;
}
cleanup(connectionId) {
const connection = this.connections.get(connectionId);
if (connection) {
try {
if (!connection.res.destroyed) {
connection.res.end();
}
} catch (error) {
// Connection already closed
}
this.connections.delete(connectionId);
}
}
startMonitoring() {
setInterval(() => {
console.log(`Active SSE connections: ${this.connections.size}`);
// Force cleanup of very old connections
const cutoff = Date.now() - 600000; // 10 minutes
this.connections.forEach((connection, id) => {
if (connection.created < cutoff) {
this.cleanup(id);
}
});
}, 60000);
}
}
const tracker = new ConnectionTracker();
app.get('/events', (req, res) => {
const connectionId = tracker.track(req, res);
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
res.write(`data: Connected with ID ${connectionId}\n\n`);
});
Handling Authentication and Authorization
const jwt = require('jsonwebtoken');
// Middleware for SSE authentication
function authenticateSSE(req, res, next) {
const token = req.query.token || req.headers.authorization;
if (!token) {
return res.status(401).json({ error: 'Authentication required' });
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
req.user = decoded;
next();
} catch (error) {
return res.status(401).json({ error: 'Invalid token' });
}
}
// Secure SSE endpoint
app.get('/secure-events', authenticateSSE, (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
const userChannels = getUserChannels(req.user.id);
// Only send events for authorized channels
const subscription = {
userId: req.user.id,
channels: userChannels,
connection: res
};
addSecureSubscription(subscription);
req.on('close', () => {
removeSecureSubscription(req.user.id);
});
});
function broadcastToAuthorizedUsers(channel, data) {
secureSubscriptions.forEach(sub => {
if (sub.channels.includes(channel)) {
try {
sub.connection.write(`data: ${JSON.stringify(data)}\n\n`);
} catch (error) {
removeSecureSubscription(sub.userId);
}
}
});
}
Server-Sent Events provide an excellent balance between simplicity and functionality for real-time web applications. The implementation patterns covered here scale effectively to thousands of concurrent connections while maintaining code clarity and debugging capability. For applications requiring bidirectional communication, consider hybrid approaches using SSE for server-to-client streaming combined with standard HTTP POST requests for client-to-server messaging.
Additional resources for deeper implementation details include the MDN Server-Sent Events documentation and the official HTML specification for complete protocol details.

This article incorporates information and material from various online sources. We acknowledge and appreciate the work of all original authors, publishers, and websites. While every effort has been made to appropriately credit the source material, any unintentional oversight or omission does not constitute a copyright infringement. All trademarks, logos, and images mentioned are the property of their respective owners. If you believe that any content used in this article infringes upon your copyright, please contact us immediately for review and prompt action.
This article is intended for informational and educational purposes only and does not infringe on the rights of the copyright owners. If any copyrighted material has been used without proper credit or in violation of copyright laws, it is unintentional and we will rectify it promptly upon notification. Please note that the republishing, redistribution, or reproduction of part or all of the contents in any form is prohibited without express written permission from the author and website owner. For permissions or further inquiries, please contact us.