Saltar al contenido principal

Sistema WebSocket en Tiempo Real

Este documento explica la arquitectura de WebSocket para comunicación en tiempo real entre el servidor y los clientes.


Arquitectura WebSocket


1. Conexión de Cliente

Flujo de Conexión

Implementación del Cliente (Flutter)

// Conectar al WebSocket
import 'package:socket_io_client/socket_io_client.dart' as IO;

class WebSocketService {
IO.Socket? socket;
String userId;
String accessToken;

void connect() {
socket = IO.io(
'wss://api.swapbits.com',
IO.OptionBuilder()
.setTransports(['websocket']) // Solo WebSocket
.setAuth({'token': accessToken}) // Enviar token
.enableAutoConnect()
.enableReconnection() // Auto-reconectar
.setReconnectionDelay(1000) // 1s
.setReconnectionAttempts(5) // 5 intentos
.build()
);

socket!.onConnect((_) {
print('WebSocket connected');
});

socket!.on('connected', (data) {
print('Authenticated as user: ${data['userId']}');
});

socket!.on('transaction_confirmed', (data) {
// Actualizar UI
showNotification('Transaction confirmed!');
updateBalance(data);
});

socket!.onDisconnect((_) {
print('WebSocket disconnected');
});

socket!.onReconnect((_) {
print('WebSocket reconnected');
});
}

void disconnect() {
socket?.disconnect();
socket?.dispose();
}
}

2. Rooms y Namespaces

Organización de Conexiones

// ws-service/src/socket/namespaces.ts

// Namespace por defecto
io.on('connection', (socket) => {
const userId = socket.data.userId;

// Unirse a room personal (solo este usuario)
socket.join(`user:${userId}`);

// Unirse a rooms según suscripciones
socket.on('join_room', (roomName) => {
if (isAllowedRoom(roomName, userId)) {
socket.join(roomName);
}
});
});

// Namespace de precios en tiempo real
const pricesNamespace = io.of('/prices');

pricesNamespace.on('connection', (socket) => {
// Suscribir a precios de coins específicas
socket.on('subscribe_prices', (coins: string[]) => {
coins.forEach(coin => {
socket.join(`price:${coin}`);
});
});

socket.on('unsubscribe_prices', (coins: string[]) => {
coins.forEach(coin => {
socket.leave(`price:${coin}`);
});
});
});

Rooms Disponibles

const SocketRooms = {
// Personal
USER: (userId) => `user:${userId}`,

// Precios
PRICE: (coin) => `price:${coin}`,
PRICES_ALL: 'prices:all',

// Trading
ORDERBOOK: (pair) => `orderbook:${pair}`,
TRADES: (pair) => `trades:${pair}`,

// Admin
ADMIN_DASHBOARD: 'admin:dashboard',
ADMIN_CHAT: 'admin:chat',

// Sistema
SYSTEM_ALERTS: 'system:alerts'
};

3. Eventos del Sistema

Eventos Cliente → Servidor

// Eventos que el cliente puede enviar
socket.on('ping', () => {
socket.emit('pong');
});

// Suscribirse a actualizaciones de precios
socket.on('subscribe_prices', (coins: string[]) => {
// Validar límite de suscripciones
if (coins.length > 50) {
return socket.emit('error', { message: 'Max 50 coins' });
}

coins.forEach(coin => {
socket.join(`price:${coin}`);
});

socket.emit('subscribed', { coins });
});

// Desuscribirse
socket.on('unsubscribe_prices', (coins: string[]) => {
coins.forEach(coin => {
socket.leave(`price:${coin}`);
});

socket.emit('unsubscribed', { coins });
});

// Typing indicator (chat)
socket.on('typing', (data) => {
const { chatId } = data;
socket.to(`chat:${chatId}`).emit('user_typing', {
userId: socket.data.userId,
chatId
});
});

// Mark message as read
socket.on('mark_read', async (messageId) => {
await markMessageAsRead(messageId, socket.data.userId);
socket.to(`chat:${chatId}`).emit('message_read', {
messageId,
userId: socket.data.userId
});
});

Eventos Servidor → Cliente

// Eventos que el servidor envía al cliente

// Transacciones
io.to(`user:${userId}`).emit('transaction_pending', {
txHash: '0x123...',
amount: 0.5,
coin: 'ETH',
type: 'sent'
});

io.to(`user:${userId}`).emit('transaction_confirmed', {
txHash: '0x123...',
confirmations: 12,
status: 'confirmed'
});

// Precios en tiempo real
io.to(`price:ETH`).emit('price_update', {
coin: 'ETH',
price: 2000.50,
change24h: -50.25,
timestamp: Date.now()
});

// Órdenes de trading
io.to(`user:${userId}`).emit('order_filled', {
orderId: 'order_123',
pair: 'ETH/USDT',
amount: 1.0,
price: 2000
});

// Depósitos/Retiros
io.to(`user:${userId}`).emit('deposit_confirmed', {
amount: 500,
currency: 'USD',
cryptoAmount: 500,
cryptoCurrency: 'USDT'
});

io.to(`user:${userId}`).emit('withdrawal_sent', {
amount: 1000,
bankAccount: '****1234',
estimatedArrival: '1-3 business days'
});

// Chat
io.to(`chat:${chatId}`).emit('new_message', {
messageId: 'msg_123',
chatId,
senderId: 'admin',
text: 'Hello, how can I help?',
timestamp: Date.now()
});

// Sistema
io.to('system:alerts').emit('system_alert', {
type: 'maintenance',
message: 'Scheduled maintenance in 1 hour',
severity: 'warning'
});

4. Redis Pub/Sub para Escalado Horizontal

Sincronización entre Instancias

Implementación:

// ws-service/src/redis/subscriber.ts

import Redis from 'ioredis';

const subscriber = new Redis({
host: process.env.REDIS_HOST,
port: 6379
});

// Suscribirse a canales relevantes
subscriber.subscribe(
'transaction:*',
'deposit:*',
'withdrawal:*',
'order:*',
'price:*',
'chat:*'
);

subscriber.on('message', (channel, message) => {
const data = JSON.parse(message);

// Extraer userId del canal
const [eventType, userId] = channel.split(':');

// Enviar a socket del usuario (si está conectado en esta instancia)
const socketId = activeConnections.get(userId);

if (socketId) {
const socket = io.sockets.sockets.get(socketId);

if (socket) {
socket.emit(eventType, data);
}
}
});

// Publicar evento (desde otro servicio)
import Redis from 'ioredis';

const publisher = new Redis({
host: process.env.REDIS_HOST,
port: 6379
});

async function notifyUser(userId: string, event: string, data: any) {
await publisher.publish(
`${event}:${userId}`,
JSON.stringify(data)
);
}

// Uso en Wallet Service
await notifyUser(userId, 'transaction_confirmed', {
txHash: tx.txHash,
amount: tx.amount,
coin: tx.coin
});

5. Autenticación de WebSocket

Validación de Token

// Middleware de autenticación
io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token;

if (!token) {
return next(new Error('Authentication token required'));
}

// Validar token con Auth Service
const response = await axios.post(
`${AUTH_SERVICE_URL}/internal/validate-token`,
{ token },
{
headers: {
'X-Internal-Service': 'ws-service',
'X-Service-Token': INTERNAL_SERVICE_TOKEN
},
timeout: 2000
}
);

if (!response.data.valid) {
return next(new Error('Invalid token'));
}

// Guardar userId en socket
socket.data.userId = response.data.userId;
socket.data.email = response.data.email;

next();

} catch (error) {
logger.error('WebSocket auth error', error);
next(new Error('Authentication failed'));
}
});

6. Heartbeat y Reconexión

Keep-Alive

// Servidor: Configurar ping/pong
io.on('connection', (socket) => {
const userId = socket.data.userId;

// Enviar ping cada 25 segundos
const pingInterval = setInterval(() => {
socket.emit('ping');
}, 25000);

// Limpiar al desconectar
socket.on('disconnect', () => {
clearInterval(pingInterval);
});

// Recibir pong del cliente
socket.on('pong', () => {
// Cliente está vivo
socket.data.lastPong = Date.now();
});
});

// Detectar conexiones muertas
setInterval(() => {
const now = Date.now();

io.sockets.sockets.forEach((socket) => {
const lastPong = socket.data.lastPong || socket.data.connectedAt;

// Si no respondió en 60 segundos, desconectar
if (now - lastPong > 60000) {
logger.warn(`Disconnecting dead socket: ${socket.id}`);
socket.disconnect(true);
}
});
}, 30000); // Cada 30 segundos

Reconexión Automática (Cliente)

// Flutter: Configurar reconexión
socket = IO.io(
wsUrl,
IO.OptionBuilder()
.enableReconnection()
.setReconnectionDelay(1000) // 1s inicial
.setReconnectionDelayMax(10000) // Máximo 10s
.setReconnectionAttempts(5) // 5 intentos
.setRandomizationFactor(0.5) // Jitter para evitar thundering herd
.build()
);

socket!.onReconnectAttempt((attempt) {
print('Reconnection attempt: $attempt');
});

socket!.onReconnect((_) {
print('Reconnected successfully');

// Re-suscribirse a rooms
socket!.emit('subscribe_prices', ['BTC', 'ETH']);
});

socket!.onReconnectError((error) {
print('Reconnection error: $error');
});

socket!.onReconnectFailed((_) {
print('Reconnection failed after 5 attempts');
showErrorDialog('Connection lost. Please check your internet.');
});

7. Rate Limiting

Protección contra Spam

import rateLimit from 'express-rate-limit';
import RedisStore from 'rate-limit-redis';

// Rate limiting por evento
const eventRateLimits = new Map();

socket.on('*', async (eventName, ...args) => {
const userId = socket.data.userId;
const key = `ws:${userId}:${eventName}`;

// Límites por tipo de evento
const limits = {
'subscribe_prices': { max: 10, window: 60000 }, // 10/min
'mark_read': { max: 100, window: 60000 }, // 100/min
'typing': { max: 30, window: 60000 }, // 30/min
'default': { max: 50, window: 60000 } // 50/min default
};

const limit = limits[eventName] || limits.default;

// Verificar rate limit en Redis
const count = await redisClient.incr(key);

if (count === 1) {
await redisClient.pexpire(key, limit.window);
}

if (count > limit.max) {
socket.emit('error', {
message: 'Rate limit exceeded',
event: eventName,
retryAfter: await redisClient.pttl(key)
});

return; // Bloquear evento
}

// Continuar con el evento
// ...
});

8. Monitoreo de Conexiones

Métricas en Tiempo Real

interface WebSocketMetrics {
// Conexiones
totalConnections: number; // 5,000
activeConnections: number; // 4,800
connectionsPerSecond: number; // 50

// Por usuario
onlineUsers: number; // 4,500
multiDeviceUsers: number; // 300 (usuarios con 2+ conexiones)

// Eventos
eventsPerSecond: number; // 500
eventsByType: {
transaction_confirmed: 100,
price_update: 300,
order_filled: 50,
new_message: 30
};

// Performance
averageLatency: number; // 50ms
reconnections: number; // 200/min

// Errors
authenticationFailures: number; // 10/min
rateLimitExceeded: number; // 5/min
disconnectsByError: number; // 3/min
}

// Exponer métricas
app.get('/metrics/websocket', async (req, res) => {
const metrics = {
totalConnections: io.sockets.sockets.size,
activeConnections: await getActiveConnectionsCount(),
onlineUsers: await redisClient.scard('online:users'),
// ...
};

res.json(metrics);
});

Dashboard de Conexiones

// Admin endpoint para ver conexiones activas
app.get('/admin/connections', validateAdmin, async (req, res) => {
const connections = [];

io.sockets.sockets.forEach((socket) => {
connections.push({
socketId: socket.id,
userId: socket.data.userId,
email: socket.data.email,
connectedAt: socket.data.connectedAt,
lastPong: socket.data.lastPong,
rooms: Array.from(socket.rooms),
ip: socket.handshake.address
});
});

res.json({
total: connections.length,
connections
});
});

9. Broadcast Eficiente

Estrategias de Broadcasting

// 1. Broadcast a UN usuario específico
io.to(`user:${userId}`).emit('transaction_confirmed', data);

// 2. Broadcast a MÚLTIPLES usuarios
userIds.forEach(userId => {
io.to(`user:${userId}`).emit('system_alert', data);
});

// 3. Broadcast a TODOS excepto uno
socket.broadcast.emit('user_joined', { userId: socket.data.userId });

// 4. Broadcast a un ROOM
io.to(`chat:${chatId}`).emit('new_message', message);

// 5. Broadcast a MÚLTIPLES ROOMS
io.to('price:BTC').to('price:ETH').emit('market_update', data);

// 6. Broadcast GLOBAL (evitar si es posible)
io.emit('system_maintenance', {
message: 'Scheduled maintenance in 1 hour',
startTime: maintenanceStartTime
});

Optimización para grandes broadcasts:

// En lugar de iterar sobre 10,000 usuarios
// NO HACER:
for (const userId of allUserIds) {
io.to(`user:${userId}`).emit('announcement', data);
}

// MEJOR: Usar room global
io.to('announcements').emit('announcement', data);

// Usuarios se unen al room al conectar
socket.on('connection', (socket) => {
socket.join('announcements');
});

10. Seguridad

Medidas de Seguridad WebSocket

// 1. Validar origen
io.use((socket, next) => {
const origin = socket.handshake.headers.origin;

const allowedOrigins = [
'https://swapbits.com',
'https://app.swapbits.com',
'http://localhost:3000' // Solo en desarrollo
];

if (!allowedOrigins.includes(origin)) {
return next(new Error('Origin not allowed'));
}

next();
});

// 2. Limitar tamaño de mensajes
io.use((socket, next) => {
socket.on('*', (eventName, ...args) => {
const messageSize = JSON.stringify(args).length;

if (messageSize > 10000) { // 10KB max
socket.emit('error', { message: 'Message too large' });
return;
}
});

next();
});

// 3. Sanitizar inputs
socket.on('send_message', (data) => {
const sanitizedText = sanitizeHtml(data.text, {
allowedTags: [], // Sin HTML
allowedAttributes: {}
});

// Validar longitud
if (sanitizedText.length > 500) {
return socket.emit('error', { message: 'Message too long' });
}

// Procesar mensaje
// ...
});

// 4. Prevenir room hijacking
socket.on('join_room', (roomName) => {
const userId = socket.data.userId;

// Validar que el usuario puede unirse a este room
if (!canJoinRoom(userId, roomName)) {
return socket.emit('error', { message: 'Access denied' });
}

socket.join(roomName);
});

11. Desconexión y Limpieza

Manejo de Desconexión

socket.on('disconnect', async (reason) => {
const userId = socket.data.userId;

logger.info(`User ${userId} disconnected: ${reason}`);

// 1. Remover de memoria
activeConnections.delete(userId);

// 2. Actualizar estado en Redis
await redisClient.srem('online:users', userId);
await redisClient.del(`user:socket:${userId}`);

// 3. Notificar a otros usuarios (si es relevante)
socket.broadcast.emit('user_offline', { userId });

// 4. Limpiar timers/intervals
if (socket.data.pingInterval) {
clearInterval(socket.data.pingInterval);
}

// 5. Log para analytics
await logDisconnection({
userId,
socketId: socket.id,
reason,
duration: Date.now() - socket.data.connectedAt,
timestamp: new Date()
});
});

12. Testing WebSocket

Tests de Integración

import { io as ioClient } from 'socket.io-client';

describe('WebSocket Service', () => {
let clientSocket;
let accessToken;

beforeAll(async () => {
// Obtener token de test
accessToken = await getTestUserToken();
});

afterAll(() => {
clientSocket?.disconnect();
});

test('should connect with valid token', (done) => {
clientSocket = ioClient('ws://localhost:3000', {
auth: { token: accessToken }
});

clientSocket.on('connected', (data) => {
expect(data.userId).toBeDefined();
done();
});
});

test('should receive transaction event', (done) => {
clientSocket.on('transaction_confirmed', (data) => {
expect(data.txHash).toBeDefined();
expect(data.status).toBe('confirmed');
done();
});

// Simular evento desde otro servicio
publishRedisEvent('transaction:testuser', {
txHash: '0x123',
status: 'confirmed'
});
});

test('should reject connection with invalid token', (done) => {
const badClient = ioClient('ws://localhost:3000', {
auth: { token: 'invalid_token' }
});

badClient.on('connect_error', (error) => {
expect(error.message).toContain('Authentication failed');
badClient.disconnect();
done();
});
});
});

Limitaciones y Consideraciones

Limitaciones de WebSocket:

  1. No es HTTP: No puedes usar herramientas HTTP normales (Postman, curl)
  2. Stateful: Requiere mantener conexión abierta (más recursos)
  3. Sticky Sessions: Load balancer debe enviar mismo usuario a misma instancia
  4. Firewall Issues: Algunos firewalls corporativos bloquean WebSocket

Soluciones:

  • HTTP Fallback: Socket.io puede usar long polling como fallback
  • Redis Pub/Sub: Sincronizar entre instancias para evitar sticky sessions
  • Health Checks: Monitorear conexiones activas y latencia
  • Graceful Shutdown: Desconectar limpiamente antes de restart

Cuándo NO usar WebSocket:

  • ❌ Para APIs RESTful normales (usa HTTP)
  • ❌ Para transferir archivos grandes (usa HTTP con multipart)
  • ❌ Para operaciones batch (usa HTTP POST)

Cuándo SÍ usar WebSocket:

  • ✅ Notificaciones en tiempo real
  • ✅ Chat en vivo
  • ✅ Actualizaciones de precios
  • ✅ Indicadores de "typing..."
  • ✅ Alertas del sistema