Communication Layer - Capa de Comunicación en Tiempo Real
La Communication Layer es responsable de toda la comunicación en tiempo real entre el backend y los clientes, y de recibir eventos de sistemas externos.
🔌 WebSocket Service (WS Service)
Comunicación Bidireccional en Tiempo Real
Puerto: 3002
Path WebSocket: /ws
Tecnología: NestJS + Socket.io + Redis Pub/Sub
Propósito
El WS Service es el canal de comunicación en tiempo real entre el backend y todos los clientes (web, móvil, admin). Permite que los datos fluyan instantáneamente sin necesidad de polling.
¿Por qué WebSocket?
Sin WebSocket (Polling tradicional):
[X] Ineficiente, latencia alta, sobrecarga del servidor
Con WebSocket:
WebSocket (ACTUAL):
✓ Eficiente, latencia baja, server push
Arquitectura Multi-Instancia con Redis Pub/Sub
Problema: Si tienes 3 instancias de WS Service, ¿cómo envías un mensaje a un usuario específico?
Solución con Redis Pub/Sub:
- Servicio publica mensaje en canal
user:user_123 - Todas las instancias WS escuchan el canal
- Solo la instancia que tiene conectado a
user_123le envía el mensaje
Eventos Soportados
1. Market Data (Datos de Mercado)
Canales disponibles:
ticker:{symbol}: Precio en tiempo realorderbook:{symbol}: Libro de órdenestrades:{symbol}: Trades recienteskline:{symbol}_{interval}: Velas (1m, 5m, 1h, etc.)
2. Notificaciones de Usuario
Tipos de notificaciones:
transaction_received: Recibiste cryptotransaction_sent: Transacción enviadatransaction_confirmed: Transacción confirmadaorder_filled: Orden ejecutadakyc_approved: KYC aprobadokyc_rejected: KYC rechazadowithdrawal_completed: Retiro completadosystem_announcement: Anuncio del sistema
3. Chat en Tiempo Real
Eventos de chat:
send_message: Enviar mensajenew_message: Nuevo mensaje recibidotyping: Usuario está escribiendomessage_read: Mensaje leídothread_closed: Conversación cerrada
4. Balance Updates (Actualización de Balances)
5. Order Updates (Actualización de Órdenes)
Autenticación WebSocket
Opción 1: JWT Token (para usuarios autenticados)
// Cliente
const socket = io('wss://api.swapbits.com/ws', {
auth: {
token: 'eyJhbGciOiJIUzI1NiIsInR5...'
}
});
// Servidor
@WebSocketGateway()
export class ClientSocketGateway {
@WebSocketServer() server: Server;
async handleConnection(client: Socket) {
const token = client.handshake.auth.token;
const user = await this.verifyToken(token);
if (!user) {
client.disconnect();
return;
}
// Asociar socket con usuario
client.data.userId = user.id;
client.join(`user:${user.id}`);
}
}
Opción 2: API Key (para servicios internos)
// Cliente
const socket = io('wss://api.swapbits.com/ws', {
auth: {
apiKey: 'swapbits-api-key-xxx'
}
});
Rooms y Namespaces
Rooms (salas) para agrupar conexiones:
// Usuario se une a sala de un símbolo
socket.join('ticker:BTCUSDT');
// Emitir a todos en la sala
this.server.to('ticker:BTCUSDT').emit('price_update', data);
// Usuario sale de la sala
socket.leave('ticker:BTCUSDT');
Namespaces para separar tipos de conexiones:
/ws - Conexiones de usuarios
/admin - Conexiones de admins
/internal - Conexiones entre servicios
Responsabilidades Detalladas
| Acción | Descripción |
|---|---|
| Connection Management | Gestionar conexiones WebSocket |
| Authentication | Verificar JWT/API Key en handshake |
| Room Management | Unir/sacar usuarios de salas |
| Event Broadcasting | Emitir eventos a usuarios específicos |
| Market Data Streaming | Stream de precios en tiempo real |
| Notifications | Enviar notificaciones push |
| Chat | Sistema de chat bidireccional |
| Heartbeat | Ping/pong para mantener conexión |
Endpoints WebSocket
// Eventos que el cliente puede emitir
socket.emit('subscribe', {channel: 'ticker:BTCUSDT'});
socket.emit('unsubscribe', {channel: 'ticker:BTCUSDT'});
socket.emit('send_message', {threadId, message});
socket.emit('typing', {threadId});
socket.emit('ping');
// Eventos que el servidor emite
socket.on('ticker', (data) => { /* precio actualizado */ });
socket.on('notification', (data) => { /* nueva notificación */ });
socket.on('new_message', (data) => { /* mensaje de chat */ });
socket.on('balance_update', (data) => { /* balance actualizado */ });
socket.on('order_update', (data) => { /* orden actualizada */ });
socket.on('pong', () => { /* respuesta a ping */ });
Gestión de Desconexiones
async handleDisconnect(client: Socket) {
const userId = client.data.userId;
// Remover de todas las salas
const rooms = Array.from(client.rooms);
rooms.forEach(room => client.leave(room));
// Notificar a otros servicios
await this.redis.publish('user:disconnected', userId);
// Log
this.logger.log(`User ${userId} disconnected`);
}
Reconexión automática (cliente):
socket.io.on('disconnect', () => {
console.log('Disconnected, reconnecting...');
});
socket.io.on('connect', () => {
console.log('Reconnected!');
// Resubscribirse a canales
socket.emit('subscribe', {channel: 'ticker:BTCUSDT'});
});
Escalabilidad
Problema: WebSocket es stateful (mantiene conexiones abiertas)
Solución:
- Sticky Sessions: Load balancer envía requests del mismo usuario a la misma instancia
- Redis Pub/Sub: Coordina mensajes entre instancias
- Horizontal Scaling: Agregar más instancias según necesidad
🪝 Webhook Service
Recepción de Eventos Externos
Puerto: 3009
Responsabilidad: Recibir y procesar webhooks de servicios externos
Tecnología: NestJS + Queue Processing
Propósito
El Webhook Service es el receptor de eventos de sistemas externos como exchanges, bancos y proveedores KYC.
¿Qué es un Webhook?
Un webhook es una llamada HTTP POST que un servicio externo hace a tu API cuando ocurre un evento.
Webhooks Soportados
1. Webhooks de Bybit (Exchange)
Eventos:
order.created: Orden creadaorder.partially_filled: Orden parcialmente ejecutadaorder.filled: Orden completamente ejecutadaorder.cancelled: Orden canceladadeposit.confirmed: Depósito confirmadowithdrawal.completed: Retiro completado
Verificación de firma:
function verifyBybitSignature(body: string, signature: string): boolean {
const secret = process.env.BYBIT_WEBHOOK_SECRET;
const computedSignature = crypto
.createHmac('sha256', secret)
.update(body)
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(computedSignature)
);
}
2. Webhooks de Manteca (Banco)
Eventos:
deposit.pending: Depósito iniciadodeposit.completed: Depósito confirmadodeposit.failed: Depósito fallidowithdrawal.processing: Retiro en procesowithdrawal.completed: Retiro completadowithdrawal.rejected: Retiro rechazado
Estructura típica:
{
"event": "deposit.completed",
"timestamp": "2025-10-20T15:30:00Z",
"data": {
"orderId": "order_123",
"userId": "user_456",
"amount": 100.00,
"currency": "USD",
"status": "completed"
},
"signature": "abc123..."
}
3. Webhooks de Bridge (Banco Alternativo)
Similar a Manteca pero con estructura diferente.
4. Webhooks de Proveedores KYC
Eventos:
verification.pending: Verificación iniciadaverification.completed: Verificación completadaverification.approved: Usuario aprobadoverification.rejected: Usuario rechazado
Flujo de Procesamiento
Por qué usar cola:
- El procesamiento puede ser lento
- El servicio externo espera respuesta rápida (<5s)
- Si falla, se puede reintentar
Implementación de Cola
@Processor('webhooks')
export class WebhookProcessor {
@Process('bybit.order.filled')
async processOrderFilled(job: Job) {
const { orderId, userId, price, quantity } = job.data;
try {
// 1. Actualizar orden en DB
await this.orderService.updateStatus(orderId, 'filled');
// 2. Actualizar balance
await this.walletService.creditBalance(userId, quantity);
// 3. Notificar usuario
await this.notificationService.send(userId, {
type: 'order_filled',
data: { orderId, price, quantity }
});
return { success: true };
} catch (error) {
// Si falla, Bull reintentará automáticamente
throw error;
}
}
}
Reintentos Automáticos
@Process({
name: 'manteca.deposit.completed',
attempts: 5, // Reintentar hasta 5 veces
backoff: {
type: 'exponential',
delay: 1000 // 1s, 2s, 4s, 8s, 16s
}
})
async processDeposit(job: Job) {
// ...
}
Idempotencia
Problema: ¿Qué pasa si el mismo webhook llega dos veces?
Solución: Usar el event_id único del webhook
async processWebhook(webhook: WebhookDto) {
// Verificar si ya procesamos este evento
const exists = await this.db.webhookEvents.findOne({
eventId: webhook.event_id
});
if (exists) {
// Ya procesado, ignorar
return { status: 'duplicate' };
}
// Guardar para marcar como procesado
await this.db.webhookEvents.create({
eventId: webhook.event_id,
processed: true,
timestamp: new Date()
});
// Procesar
await this.processEvent(webhook);
}
Endpoints
POST /webhooks/bybit - Webhook de Bybit
POST /webhooks/manteca - Webhook de Manteca
POST /webhooks/bridge - Webhook de Bridge
POST /webhooks/kyc - Webhook de KYC provider
GET /webhooks/status - Estado del servicio (health)
Monitoreo
// Métricas importantes
- webhooks_received_total (contador)
- webhooks_processed_success (contador)
- webhooks_processed_failure (contador)
- webhook_processing_duration (histograma)
- webhook_queue_size (gauge)
Alertas:
- Si
webhook_queue_size> 1000: Sistema sobrecargado - Si
webhooks_processed_failure> 10% por hora: Investigar
Comunicación Entre Servicios
Resumen
| Servicio | Puerto | Función Principal |
|---|---|---|
| WebSocket Service | 3002 | Comunicación tiempo real bidireccional |
| Webhook Service | 3009 | Recibir eventos de servicios externos |
Para Desarrolladores
Communication Layer es crítica para UX. Sin WebSocket, los usuarios tendrían que refrescar la página constantemente.
Debugging tip WebSocket:
- Usa herramienta de dev tools del navegador (pestaña WS)
- Revisa logs de Redis Pub/Sub
- Verifica que el usuario esté en la room correcta
Debugging tip Webhooks:
- SIEMPRE verifica la firma antes de procesar
- Guarda webhooks crudos en DB para debugging
- Usa Ngrok en desarrollo para recibir webhooks localmente