Saltar al contenido principal

Communication Layer - Capa de Comunicación en Tiempo Real

La Communication Layer es responsable de toda la comunicación en tiempo real entre el backend y los clientes, y de recibir eventos de sistemas externos.


🔌 WebSocket Service (WS Service)

Comunicación Bidireccional en Tiempo Real

Puerto: 3002
Path WebSocket: /ws
Tecnología: NestJS + Socket.io + Redis Pub/Sub

Propósito

El WS Service es el canal de comunicación en tiempo real entre el backend y todos los clientes (web, móvil, admin). Permite que los datos fluyan instantáneamente sin necesidad de polling.

¿Por qué WebSocket?

Sin WebSocket (Polling tradicional):

[X] Ineficiente, latencia alta, sobrecarga del servidor

Con WebSocket:

WebSocket (ACTUAL):
✓ Eficiente, latencia baja, server push


Arquitectura Multi-Instancia con Redis Pub/Sub

Problema: Si tienes 3 instancias de WS Service, ¿cómo envías un mensaje a un usuario específico?

Solución con Redis Pub/Sub:

  1. Servicio publica mensaje en canal user:user_123
  2. Todas las instancias WS escuchan el canal
  3. Solo la instancia que tiene conectado a user_123 le envía el mensaje

Eventos Soportados

1. Market Data (Datos de Mercado)

Canales disponibles:

  • ticker:{symbol}: Precio en tiempo real
  • orderbook:{symbol}: Libro de órdenes
  • trades:{symbol}: Trades recientes
  • kline:{symbol}_{interval}: Velas (1m, 5m, 1h, etc.)

2. Notificaciones de Usuario

Tipos de notificaciones:

  • transaction_received: Recibiste crypto
  • transaction_sent: Transacción enviada
  • transaction_confirmed: Transacción confirmada
  • order_filled: Orden ejecutada
  • kyc_approved: KYC aprobado
  • kyc_rejected: KYC rechazado
  • withdrawal_completed: Retiro completado
  • system_announcement: Anuncio del sistema

3. Chat en Tiempo Real

Eventos de chat:

  • send_message: Enviar mensaje
  • new_message: Nuevo mensaje recibido
  • typing: Usuario está escribiendo
  • message_read: Mensaje leído
  • thread_closed: Conversación cerrada

4. Balance Updates (Actualización de Balances)

5. Order Updates (Actualización de Órdenes)


Autenticación WebSocket

Opción 1: JWT Token (para usuarios autenticados)

// Cliente
const socket = io('wss://api.swapbits.com/ws', {
auth: {
token: 'eyJhbGciOiJIUzI1NiIsInR5...'
}
});

// Servidor
@WebSocketGateway()
export class ClientSocketGateway {
@WebSocketServer() server: Server;

async handleConnection(client: Socket) {
const token = client.handshake.auth.token;
const user = await this.verifyToken(token);

if (!user) {
client.disconnect();
return;
}

// Asociar socket con usuario
client.data.userId = user.id;
client.join(`user:${user.id}`);
}
}

Opción 2: API Key (para servicios internos)

// Cliente
const socket = io('wss://api.swapbits.com/ws', {
auth: {
apiKey: 'swapbits-api-key-xxx'
}
});

Rooms y Namespaces

Rooms (salas) para agrupar conexiones:

// Usuario se une a sala de un símbolo
socket.join('ticker:BTCUSDT');

// Emitir a todos en la sala
this.server.to('ticker:BTCUSDT').emit('price_update', data);

// Usuario sale de la sala
socket.leave('ticker:BTCUSDT');

Namespaces para separar tipos de conexiones:

/ws            - Conexiones de usuarios
/admin - Conexiones de admins
/internal - Conexiones entre servicios

Responsabilidades Detalladas

AcciónDescripción
Connection ManagementGestionar conexiones WebSocket
AuthenticationVerificar JWT/API Key en handshake
Room ManagementUnir/sacar usuarios de salas
Event BroadcastingEmitir eventos a usuarios específicos
Market Data StreamingStream de precios en tiempo real
NotificationsEnviar notificaciones push
ChatSistema de chat bidireccional
HeartbeatPing/pong para mantener conexión

Endpoints WebSocket

// Eventos que el cliente puede emitir
socket.emit('subscribe', {channel: 'ticker:BTCUSDT'});
socket.emit('unsubscribe', {channel: 'ticker:BTCUSDT'});
socket.emit('send_message', {threadId, message});
socket.emit('typing', {threadId});
socket.emit('ping');

// Eventos que el servidor emite
socket.on('ticker', (data) => { /* precio actualizado */ });
socket.on('notification', (data) => { /* nueva notificación */ });
socket.on('new_message', (data) => { /* mensaje de chat */ });
socket.on('balance_update', (data) => { /* balance actualizado */ });
socket.on('order_update', (data) => { /* orden actualizada */ });
socket.on('pong', () => { /* respuesta a ping */ });

Gestión de Desconexiones

async handleDisconnect(client: Socket) {
const userId = client.data.userId;

// Remover de todas las salas
const rooms = Array.from(client.rooms);
rooms.forEach(room => client.leave(room));

// Notificar a otros servicios
await this.redis.publish('user:disconnected', userId);

// Log
this.logger.log(`User ${userId} disconnected`);
}

Reconexión automática (cliente):

socket.io.on('disconnect', () => {
console.log('Disconnected, reconnecting...');
});

socket.io.on('connect', () => {
console.log('Reconnected!');
// Resubscribirse a canales
socket.emit('subscribe', {channel: 'ticker:BTCUSDT'});
});

Escalabilidad

Problema: WebSocket es stateful (mantiene conexiones abiertas)

Solución:

  1. Sticky Sessions: Load balancer envía requests del mismo usuario a la misma instancia
  2. Redis Pub/Sub: Coordina mensajes entre instancias
  3. Horizontal Scaling: Agregar más instancias según necesidad

🪝 Webhook Service

Recepción de Eventos Externos

Puerto: 3009
Responsabilidad: Recibir y procesar webhooks de servicios externos
Tecnología: NestJS + Queue Processing

Propósito

El Webhook Service es el receptor de eventos de sistemas externos como exchanges, bancos y proveedores KYC.


¿Qué es un Webhook?

Un webhook es una llamada HTTP POST que un servicio externo hace a tu API cuando ocurre un evento.


Webhooks Soportados

1. Webhooks de Bybit (Exchange)

Eventos:

  • order.created: Orden creada
  • order.partially_filled: Orden parcialmente ejecutada
  • order.filled: Orden completamente ejecutada
  • order.cancelled: Orden cancelada
  • deposit.confirmed: Depósito confirmado
  • withdrawal.completed: Retiro completado

Verificación de firma:

function verifyBybitSignature(body: string, signature: string): boolean {
const secret = process.env.BYBIT_WEBHOOK_SECRET;
const computedSignature = crypto
.createHmac('sha256', secret)
.update(body)
.digest('hex');

return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(computedSignature)
);
}

2. Webhooks de Manteca (Banco)

Eventos:

  • deposit.pending: Depósito iniciado
  • deposit.completed: Depósito confirmado
  • deposit.failed: Depósito fallido
  • withdrawal.processing: Retiro en proceso
  • withdrawal.completed: Retiro completado
  • withdrawal.rejected: Retiro rechazado

Estructura típica:

{
"event": "deposit.completed",
"timestamp": "2025-10-20T15:30:00Z",
"data": {
"orderId": "order_123",
"userId": "user_456",
"amount": 100.00,
"currency": "USD",
"status": "completed"
},
"signature": "abc123..."
}

3. Webhooks de Bridge (Banco Alternativo)

Similar a Manteca pero con estructura diferente.

4. Webhooks de Proveedores KYC

Eventos:

  • verification.pending: Verificación iniciada
  • verification.completed: Verificación completada
  • verification.approved: Usuario aprobado
  • verification.rejected: Usuario rechazado

Flujo de Procesamiento

Por qué usar cola:

  • El procesamiento puede ser lento
  • El servicio externo espera respuesta rápida (<5s)
  • Si falla, se puede reintentar

Implementación de Cola

@Processor('webhooks')
export class WebhookProcessor {
@Process('bybit.order.filled')
async processOrderFilled(job: Job) {
const { orderId, userId, price, quantity } = job.data;

try {
// 1. Actualizar orden en DB
await this.orderService.updateStatus(orderId, 'filled');

// 2. Actualizar balance
await this.walletService.creditBalance(userId, quantity);

// 3. Notificar usuario
await this.notificationService.send(userId, {
type: 'order_filled',
data: { orderId, price, quantity }
});

return { success: true };
} catch (error) {
// Si falla, Bull reintentará automáticamente
throw error;
}
}
}

Reintentos Automáticos

@Process({
name: 'manteca.deposit.completed',
attempts: 5, // Reintentar hasta 5 veces
backoff: {
type: 'exponential',
delay: 1000 // 1s, 2s, 4s, 8s, 16s
}
})
async processDeposit(job: Job) {
// ...
}

Idempotencia

Problema: ¿Qué pasa si el mismo webhook llega dos veces?

Solución: Usar el event_id único del webhook

async processWebhook(webhook: WebhookDto) {
// Verificar si ya procesamos este evento
const exists = await this.db.webhookEvents.findOne({
eventId: webhook.event_id
});

if (exists) {
// Ya procesado, ignorar
return { status: 'duplicate' };
}

// Guardar para marcar como procesado
await this.db.webhookEvents.create({
eventId: webhook.event_id,
processed: true,
timestamp: new Date()
});

// Procesar
await this.processEvent(webhook);
}

Endpoints

POST   /webhooks/bybit      - Webhook de Bybit
POST /webhooks/manteca - Webhook de Manteca
POST /webhooks/bridge - Webhook de Bridge
POST /webhooks/kyc - Webhook de KYC provider
GET /webhooks/status - Estado del servicio (health)

Monitoreo

// Métricas importantes
- webhooks_received_total (contador)
- webhooks_processed_success (contador)
- webhooks_processed_failure (contador)
- webhook_processing_duration (histograma)
- webhook_queue_size (gauge)

Alertas:

  • Si webhook_queue_size > 1000: Sistema sobrecargado
  • Si webhooks_processed_failure > 10% por hora: Investigar

Comunicación Entre Servicios


Resumen

ServicioPuertoFunción Principal
WebSocket Service3002Comunicación tiempo real bidireccional
Webhook Service3009Recibir eventos de servicios externos

Para Desarrolladores

Communication Layer es crítica para UX. Sin WebSocket, los usuarios tendrían que refrescar la página constantemente.

Debugging tip WebSocket:

  • Usa herramienta de dev tools del navegador (pestaña WS)
  • Revisa logs de Redis Pub/Sub
  • Verifica que el usuario esté en la room correcta

Debugging tip Webhooks:

  • SIEMPRE verifica la firma antes de procesar
  • Guarda webhooks crudos en DB para debugging
  • Usa Ngrok en desarrollo para recibir webhooks localmente