Sistema WebSocket en Tiempo Real
Este documento explica la arquitectura de WebSocket para comunicación en tiempo real entre el servidor y los clientes.
Arquitectura WebSocket
1. Conexión de Cliente
Flujo de Conexión
Implementación del Cliente (Flutter)
// Conectar al WebSocket
import 'package:socket_io_client/socket_io_client.dart' as IO;
class WebSocketService {
IO.Socket? socket;
String userId;
String accessToken;
void connect() {
socket = IO.io(
'wss://api.swapbits.com',
IO.OptionBuilder()
.setTransports(['websocket']) // Solo WebSocket
.setAuth({'token': accessToken}) // Enviar token
.enableAutoConnect()
.enableReconnection() // Auto-reconectar
.setReconnectionDelay(1000) // 1s
.setReconnectionAttempts(5) // 5 intentos
.build()
);
socket!.onConnect((_) {
print('WebSocket connected');
});
socket!.on('connected', (data) {
print('Authenticated as user: ${data['userId']}');
});
socket!.on('transaction_confirmed', (data) {
// Actualizar UI
showNotification('Transaction confirmed!');
updateBalance(data);
});
socket!.onDisconnect((_) {
print('WebSocket disconnected');
});
socket!.onReconnect((_) {
print('WebSocket reconnected');
});
}
void disconnect() {
socket?.disconnect();
socket?.dispose();
}
}
2. Rooms y Namespaces
Organización de Conexiones
// ws-service/src/socket/namespaces.ts
// Namespace por defecto
io.on('connection', (socket) => {
const userId = socket.data.userId;
// Unirse a room personal (solo este usuario)
socket.join(`user:${userId}`);
// Unirse a rooms según suscripciones
socket.on('join_room', (roomName) => {
if (isAllowedRoom(roomName, userId)) {
socket.join(roomName);
}
});
});
// Namespace de precios en tiempo real
const pricesNamespace = io.of('/prices');
pricesNamespace.on('connection', (socket) => {
// Suscribir a precios de coins específicas
socket.on('subscribe_prices', (coins: string[]) => {
coins.forEach(coin => {
socket.join(`price:${coin}`);
});
});
socket.on('unsubscribe_prices', (coins: string[]) => {
coins.forEach(coin => {
socket.leave(`price:${coin}`);
});
});
});
Rooms Disponibles
const SocketRooms = {
// Personal
USER: (userId) => `user:${userId}`,
// Precios
PRICE: (coin) => `price:${coin}`,
PRICES_ALL: 'prices:all',
// Trading
ORDERBOOK: (pair) => `orderbook:${pair}`,
TRADES: (pair) => `trades:${pair}`,
// Admin
ADMIN_DASHBOARD: 'admin:dashboard',
ADMIN_CHAT: 'admin:chat',
// Sistema
SYSTEM_ALERTS: 'system:alerts'
};
3. Eventos del Sistema
Eventos Cliente → Servidor
// Eventos que el cliente puede enviar
socket.on('ping', () => {
socket.emit('pong');
});
// Suscribirse a actualizaciones de precios
socket.on('subscribe_prices', (coins: string[]) => {
// Validar límite de suscripciones
if (coins.length > 50) {
return socket.emit('error', { message: 'Max 50 coins' });
}
coins.forEach(coin => {
socket.join(`price:${coin}`);
});
socket.emit('subscribed', { coins });
});
// Desuscribirse
socket.on('unsubscribe_prices', (coins: string[]) => {
coins.forEach(coin => {
socket.leave(`price:${coin}`);
});
socket.emit('unsubscribed', { coins });
});
// Typing indicator (chat)
socket.on('typing', (data) => {
const { chatId } = data;
socket.to(`chat:${chatId}`).emit('user_typing', {
userId: socket.data.userId,
chatId
});
});
// Mark message as read
socket.on('mark_read', async (messageId) => {
await markMessageAsRead(messageId, socket.data.userId);
socket.to(`chat:${chatId}`).emit('message_read', {
messageId,
userId: socket.data.userId
});
});
Eventos Servidor → Cliente
// Eventos que el servidor envía al cliente
// Transacciones
io.to(`user:${userId}`).emit('transaction_pending', {
txHash: '0x123...',
amount: 0.5,
coin: 'ETH',
type: 'sent'
});
io.to(`user:${userId}`).emit('transaction_confirmed', {
txHash: '0x123...',
confirmations: 12,
status: 'confirmed'
});
// Precios en tiempo real
io.to(`price:ETH`).emit('price_update', {
coin: 'ETH',
price: 2000.50,
change24h: -50.25,
timestamp: Date.now()
});
// Órdenes de trading
io.to(`user:${userId}`).emit('order_filled', {
orderId: 'order_123',
pair: 'ETH/USDT',
amount: 1.0,
price: 2000
});
// Depósitos/Retiros
io.to(`user:${userId}`).emit('deposit_confirmed', {
amount: 500,
currency: 'USD',
cryptoAmount: 500,
cryptoCurrency: 'USDT'
});
io.to(`user:${userId}`).emit('withdrawal_sent', {
amount: 1000,
bankAccount: '****1234',
estimatedArrival: '1-3 business days'
});
// Chat
io.to(`chat:${chatId}`).emit('new_message', {
messageId: 'msg_123',
chatId,
senderId: 'admin',
text: 'Hello, how can I help?',
timestamp: Date.now()
});
// Sistema
io.to('system:alerts').emit('system_alert', {
type: 'maintenance',
message: 'Scheduled maintenance in 1 hour',
severity: 'warning'
});
4. Redis Pub/Sub para Escalado Horizontal
Sincronización entre Instancias
Implementación:
// ws-service/src/redis/subscriber.ts
import Redis from 'ioredis';
const subscriber = new Redis({
host: process.env.REDIS_HOST,
port: 6379
});
// Suscribirse a canales relevantes
subscriber.subscribe(
'transaction:*',
'deposit:*',
'withdrawal:*',
'order:*',
'price:*',
'chat:*'
);
subscriber.on('message', (channel, message) => {
const data = JSON.parse(message);
// Extraer userId del canal
const [eventType, userId] = channel.split(':');
// Enviar a socket del usuario (si está conectado en esta instancia)
const socketId = activeConnections.get(userId);
if (socketId) {
const socket = io.sockets.sockets.get(socketId);
if (socket) {
socket.emit(eventType, data);
}
}
});
// Publicar evento (desde otro servicio)
import Redis from 'ioredis';
const publisher = new Redis({
host: process.env.REDIS_HOST,
port: 6379
});
async function notifyUser(userId: string, event: string, data: any) {
await publisher.publish(
`${event}:${userId}`,
JSON.stringify(data)
);
}
// Uso en Wallet Service
await notifyUser(userId, 'transaction_confirmed', {
txHash: tx.txHash,
amount: tx.amount,
coin: tx.coin
});
5. Autenticación de WebSocket
Validación de Token
// Middleware de autenticación
io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token;
if (!token) {
return next(new Error('Authentication token required'));
}
// Validar token con Auth Service
const response = await axios.post(
`${AUTH_SERVICE_URL}/internal/validate-token`,
{ token },
{
headers: {
'X-Internal-Service': 'ws-service',
'X-Service-Token': INTERNAL_SERVICE_TOKEN
},
timeout: 2000
}
);
if (!response.data.valid) {
return next(new Error('Invalid token'));
}
// Guardar userId en socket
socket.data.userId = response.data.userId;
socket.data.email = response.data.email;
next();
} catch (error) {
logger.error('WebSocket auth error', error);
next(new Error('Authentication failed'));
}
});
6. Heartbeat y Reconexión
Keep-Alive
// Servidor: Configurar ping/pong
io.on('connection', (socket) => {
const userId = socket.data.userId;
// Enviar ping cada 25 segundos
const pingInterval = setInterval(() => {
socket.emit('ping');
}, 25000);
// Limpiar al desconectar
socket.on('disconnect', () => {
clearInterval(pingInterval);
});
// Recibir pong del cliente
socket.on('pong', () => {
// Cliente está vivo
socket.data.lastPong = Date.now();
});
});
// Detectar conexiones muertas
setInterval(() => {
const now = Date.now();
io.sockets.sockets.forEach((socket) => {
const lastPong = socket.data.lastPong || socket.data.connectedAt;
// Si no respondió en 60 segundos, desconectar
if (now - lastPong > 60000) {
logger.warn(`Disconnecting dead socket: ${socket.id}`);
socket.disconnect(true);
}
});
}, 30000); // Cada 30 segundos
Reconexión Automática (Cliente)
// Flutter: Configurar reconexión
socket = IO.io(
wsUrl,
IO.OptionBuilder()
.enableReconnection()
.setReconnectionDelay(1000) // 1s inicial
.setReconnectionDelayMax(10000) // Máximo 10s
.setReconnectionAttempts(5) // 5 intentos
.setRandomizationFactor(0.5) // Jitter para evitar thundering herd
.build()
);
socket!.onReconnectAttempt((attempt) {
print('Reconnection attempt: $attempt');
});
socket!.onReconnect((_) {
print('Reconnected successfully');
// Re-suscribirse a rooms
socket!.emit('subscribe_prices', ['BTC', 'ETH']);
});
socket!.onReconnectError((error) {
print('Reconnection error: $error');
});
socket!.onReconnectFailed((_) {
print('Reconnection failed after 5 attempts');
showErrorDialog('Connection lost. Please check your internet.');
});
7. Rate Limiting
Protección contra Spam
import rateLimit from 'express-rate-limit';
import RedisStore from 'rate-limit-redis';
// Rate limiting por evento
const eventRateLimits = new Map();
socket.on('*', async (eventName, ...args) => {
const userId = socket.data.userId;
const key = `ws:${userId}:${eventName}`;
// Límites por tipo de evento
const limits = {
'subscribe_prices': { max: 10, window: 60000 }, // 10/min
'mark_read': { max: 100, window: 60000 }, // 100/min
'typing': { max: 30, window: 60000 }, // 30/min
'default': { max: 50, window: 60000 } // 50/min default
};
const limit = limits[eventName] || limits.default;
// Verificar rate limit en Redis
const count = await redisClient.incr(key);
if (count === 1) {
await redisClient.pexpire(key, limit.window);
}
if (count > limit.max) {
socket.emit('error', {
message: 'Rate limit exceeded',
event: eventName,
retryAfter: await redisClient.pttl(key)
});
return; // Bloquear evento
}
// Continuar con el evento
// ...
});
8. Monitoreo de Conexiones
Métricas en Tiempo Real
interface WebSocketMetrics {
// Conexiones
totalConnections: number; // 5,000
activeConnections: number; // 4,800
connectionsPerSecond: number; // 50
// Por usuario
onlineUsers: number; // 4,500
multiDeviceUsers: number; // 300 (usuarios con 2+ conexiones)
// Eventos
eventsPerSecond: number; // 500
eventsByType: {
transaction_confirmed: 100,
price_update: 300,
order_filled: 50,
new_message: 30
};
// Performance
averageLatency: number; // 50ms
reconnections: number; // 200/min
// Errors
authenticationFailures: number; // 10/min
rateLimitExceeded: number; // 5/min
disconnectsByError: number; // 3/min
}
// Exponer métricas
app.get('/metrics/websocket', async (req, res) => {
const metrics = {
totalConnections: io.sockets.sockets.size,
activeConnections: await getActiveConnectionsCount(),
onlineUsers: await redisClient.scard('online:users'),
// ...
};
res.json(metrics);
});
Dashboard de Conexiones
// Admin endpoint para ver conexiones activas
app.get('/admin/connections', validateAdmin, async (req, res) => {
const connections = [];
io.sockets.sockets.forEach((socket) => {
connections.push({
socketId: socket.id,
userId: socket.data.userId,
email: socket.data.email,
connectedAt: socket.data.connectedAt,
lastPong: socket.data.lastPong,
rooms: Array.from(socket.rooms),
ip: socket.handshake.address
});
});
res.json({
total: connections.length,
connections
});
});
9. Broadcast Eficiente
Estrategias de Broadcasting
// 1. Broadcast a UN usuario específico
io.to(`user:${userId}`).emit('transaction_confirmed', data);
// 2. Broadcast a MÚLTIPLES usuarios
userIds.forEach(userId => {
io.to(`user:${userId}`).emit('system_alert', data);
});
// 3. Broadcast a TODOS excepto uno
socket.broadcast.emit('user_joined', { userId: socket.data.userId });
// 4. Broadcast a un ROOM
io.to(`chat:${chatId}`).emit('new_message', message);
// 5. Broadcast a MÚLTIPLES ROOMS
io.to('price:BTC').to('price:ETH').emit('market_update', data);
// 6. Broadcast GLOBAL (evitar si es posible)
io.emit('system_maintenance', {
message: 'Scheduled maintenance in 1 hour',
startTime: maintenanceStartTime
});
Optimización para grandes broadcasts:
// En lugar de iterar sobre 10,000 usuarios
// NO HACER:
for (const userId of allUserIds) {
io.to(`user:${userId}`).emit('announcement', data);
}
// MEJOR: Usar room global
io.to('announcements').emit('announcement', data);
// Usuarios se unen al room al conectar
socket.on('connection', (socket) => {
socket.join('announcements');
});
10. Seguridad
Medidas de Seguridad WebSocket
// 1. Validar origen
io.use((socket, next) => {
const origin = socket.handshake.headers.origin;
const allowedOrigins = [
'https://swapbits.com',
'https://app.swapbits.com',
'http://localhost:3000' // Solo en desarrollo
];
if (!allowedOrigins.includes(origin)) {
return next(new Error('Origin not allowed'));
}
next();
});
// 2. Limitar tamaño de mensajes
io.use((socket, next) => {
socket.on('*', (eventName, ...args) => {
const messageSize = JSON.stringify(args).length;
if (messageSize > 10000) { // 10KB max
socket.emit('error', { message: 'Message too large' });
return;
}
});
next();
});
// 3. Sanitizar inputs
socket.on('send_message', (data) => {
const sanitizedText = sanitizeHtml(data.text, {
allowedTags: [], // Sin HTML
allowedAttributes: {}
});
// Validar longitud
if (sanitizedText.length > 500) {
return socket.emit('error', { message: 'Message too long' });
}
// Procesar mensaje
// ...
});
// 4. Prevenir room hijacking
socket.on('join_room', (roomName) => {
const userId = socket.data.userId;
// Validar que el usuario puede unirse a este room
if (!canJoinRoom(userId, roomName)) {
return socket.emit('error', { message: 'Access denied' });
}
socket.join(roomName);
});
11. Desconexión y Limpieza
Manejo de Desconexión
socket.on('disconnect', async (reason) => {
const userId = socket.data.userId;
logger.info(`User ${userId} disconnected: ${reason}`);
// 1. Remover de memoria
activeConnections.delete(userId);
// 2. Actualizar estado en Redis
await redisClient.srem('online:users', userId);
await redisClient.del(`user:socket:${userId}`);
// 3. Notificar a otros usuarios (si es relevante)
socket.broadcast.emit('user_offline', { userId });
// 4. Limpiar timers/intervals
if (socket.data.pingInterval) {
clearInterval(socket.data.pingInterval);
}
// 5. Log para analytics
await logDisconnection({
userId,
socketId: socket.id,
reason,
duration: Date.now() - socket.data.connectedAt,
timestamp: new Date()
});
});
12. Testing WebSocket
Tests de Integración
import { io as ioClient } from 'socket.io-client';
describe('WebSocket Service', () => {
let clientSocket;
let accessToken;
beforeAll(async () => {
// Obtener token de test
accessToken = await getTestUserToken();
});
afterAll(() => {
clientSocket?.disconnect();
});
test('should connect with valid token', (done) => {
clientSocket = ioClient('ws://localhost:3000', {
auth: { token: accessToken }
});
clientSocket.on('connected', (data) => {
expect(data.userId).toBeDefined();
done();
});
});
test('should receive transaction event', (done) => {
clientSocket.on('transaction_confirmed', (data) => {
expect(data.txHash).toBeDefined();
expect(data.status).toBe('confirmed');
done();
});
// Simular evento desde otro servicio
publishRedisEvent('transaction:testuser', {
txHash: '0x123',
status: 'confirmed'
});
});
test('should reject connection with invalid token', (done) => {
const badClient = ioClient('ws://localhost:3000', {
auth: { token: 'invalid_token' }
});
badClient.on('connect_error', (error) => {
expect(error.message).toContain('Authentication failed');
badClient.disconnect();
done();
});
});
});
Limitaciones y Consideraciones
Limitaciones de WebSocket:
- No es HTTP: No puedes usar herramientas HTTP normales (Postman, curl)
- Stateful: Requiere mantener conexión abierta (más recursos)
- Sticky Sessions: Load balancer debe enviar mismo usuario a misma instancia
- Firewall Issues: Algunos firewalls corporativos bloquean WebSocket
Soluciones:
- HTTP Fallback: Socket.io puede usar long polling como fallback
- Redis Pub/Sub: Sincronizar entre instancias para evitar sticky sessions
- Health Checks: Monitorear conexiones activas y latencia
- Graceful Shutdown: Desconectar limpiamente antes de restart
Cuándo NO usar WebSocket:
- ❌ Para APIs RESTful normales (usa HTTP)
- ❌ Para transferir archivos grandes (usa HTTP con multipart)
- ❌ Para operaciones batch (usa HTTP POST)
Cuándo SÍ usar WebSocket:
- ✅ Notificaciones en tiempo real
- ✅ Chat en vivo
- ✅ Actualizaciones de precios
- ✅ Indicadores de "typing..."
- ✅ Alertas del sistema