Saltar al contenido principal

Comunicación Inter-Servicios

Este documento explica cómo se comunican los 13 microservicios de SwapBits entre sí.


Arquitectura de Comunicación


Patrones de Comunicación

1. HTTP REST (Síncrono)

Cuándo usar:

  • ✅ Operaciones que requieren respuesta inmediata
  • ✅ Validaciones (tokens, permisos, etc.)
  • ✅ Consultas de datos
  • ✅ Transacciones que requieren confirmación

Ejemplo real:

// Wallet Service llama a Exchange Service
async function getQuote(from: string, to: string, amount: number) {
const response = await axios.post(
`${EXCHANGE_SERVICE_URL}/internal/quote`,
{ from, to, amount },
{
headers: {
'X-Internal-Service': 'wallet-service',
'X-Service-Token': INTERNAL_SERVICE_TOKEN
},
timeout: 5000 // 5 segundos
}
);

return response.data;
}

2. Redis Pub/Sub (Asíncrono)

Cuándo usar:

  • ✅ Notificaciones en tiempo real
  • ✅ Eventos que no requieren respuesta
  • ✅ Broadcasting a múltiples servicios
  • ✅ Desacoplar servicios

Ejemplo real:

// Publisher (Wallet Service)
async function notifyTransactionConfirmed(tx: Transaction) {
await redisClient.publish('transaction:confirmed', JSON.stringify({
userId: tx.userId,
txHash: tx.txHash,
amount: tx.amount,
coin: tx.coin
}));
}

// Subscriber (WebSocket Service)
redisClient.subscribe('transaction:confirmed');

redisClient.on('message', (channel, message) => {
const data = JSON.parse(message);

// Encontrar socket del usuario
const userSocket = activeConnections.get(data.userId);

if (userSocket) {
userSocket.emit('transaction_confirmed', data);
}
});

3. Event-Driven (Eventos de Dominio)

Cuándo usar:

  • ✅ Acciones que disparan múltiples efectos
  • ✅ Auditoría de eventos
  • ✅ Procesamiento asíncrono
  • ✅ Histórico de eventos

Eventos principales:

// Eventos del sistema
const DomainEvents = {
// Auth
'user.registered': { userId, email, timestamp },
'user.logged_in': { userId, ip, device },
'user.password_reset': { userId, timestamp },

// Wallet
'wallet.created': { userId, walletId, coin },
'wallet.deleted': { userId, walletId },

// Transaction
'transaction.pending': { userId, txHash, amount },
'transaction.confirmed': { userId, txHash, confirmations },
'transaction.failed': { userId, txHash, reason },

// Trading
'swap.initiated': { userId, from, to, amount },
'swap.completed': { userId, swapId, txHash },

// Banking
'deposit.received': { userId, amount, currency },
'withdrawal.completed': { userId, amount, bankAccount },

// KYC
'kyc.submitted': { userId, level },
'kyc.approved': { userId, level },
'kyc.rejected': { userId, reason }
};

Autenticación Entre Servicios

Service-to-Service Auth

Implementación:

// Middleware de validación (Service B)
async function validateInternalRequest(req, res, next) {
const serviceToken = req.headers['x-service-token'];
const serviceName = req.headers['x-internal-service'];

if (!serviceToken) {
return res.status(401).json({ error: 'Service token required' });
}

// Validar contra secret almacenado
const expectedToken = await getSecret('INTERNAL_SERVICE_TOKEN');

if (serviceToken !== expectedToken) {
logger.warn(`Invalid service token from ${serviceName}`);
return res.status(401).json({ error: 'Invalid service token' });
}

// Agregar metadata al request
req.callerService = serviceName;
next();
}

// Uso en rutas internas
app.post('/internal/validate-token',
validateInternalRequest, // Middleware
async (req, res) => {
// Solo servicios internos pueden acceder
const { token } = req.body;
const valid = await validateJWT(token);
res.json({ valid, userId: valid ? decoded.userId : null });
}
);

Rutas Internas vs Públicas

Separación de Endpoints

// auth-service/src/app.ts

// Rutas PÚBLICAS (accesibles desde internet)
app.use('/api/v1/auth', publicAuthRouter);
// POST /api/v1/auth/register
// POST /api/v1/auth/login
// POST /api/v1/auth/logout

// Rutas INTERNAS (solo servicios internos)
app.use('/internal', validateInternalRequest, internalRouter);
// POST /internal/validate-token
// POST /internal/verify-permissions
// GET /internal/user/:id/roles

Configuración de red:

# docker-compose.yml
services:
auth-service:
networks:
- public_network # Internet-facing
- internal_network # Solo entre servicios
ports:
- "3001:3000" # Puerto público
expose:
- "8080" # Puerto interno (no mapeado)

Circuit Breaker Pattern

Protección contra Fallos en Cascada

Implementación:

import CircuitBreaker from 'opossum';

// Configurar circuit breaker para Exchange Service
const exchangeServiceBreaker = new CircuitBreaker(callExchangeService, {
timeout: 5000, // 5 segundos
errorThresholdPercentage: 50, // Abrir si > 50% fallan
resetTimeout: 30000, // Reintentar después de 30s
rollingCountTimeout: 10000 // Ventana de 10s para calcular %
});

// Usar circuit breaker
async function getQuote(from: string, to: string, amount: number) {
try {
const quote = await exchangeServiceBreaker.fire(from, to, amount);
return quote;
} catch (error) {
// Circuit breaker abierto o servicio falló
logger.error('Exchange service unavailable', error);

// Fallback: usar última cotización en cache
return getCachedQuote(from, to, amount);
}
}

// Eventos del circuit breaker
exchangeServiceBreaker.on('open', () => {
logger.warn('Circuit breaker OPEN - Exchange service down');
alerting.notify('Exchange service circuit breaker opened');
});

exchangeServiceBreaker.on('halfOpen', () => {
logger.info('Circuit breaker HALF-OPEN - Testing exchange service');
});

exchangeServiceBreaker.on('close', () => {
logger.info('Circuit breaker CLOSED - Exchange service recovered');
});

Retry Strategies

Reintentos Inteligentes

import retry from 'async-retry';

async function callServiceWithRetry(url: string, data: any) {
return await retry(
async (bail, attempt) => {
try {
logger.debug(`Attempt ${attempt} to call ${url}`);

const response = await axios.post(url, data, {
timeout: 5000
});

return response.data;

} catch (error) {
// No reintentar en errores 4xx (client errors)
if (error.response?.status >= 400 && error.response?.status < 500) {
bail(error); // Detener reintentos
return;
}

// Reintentar en errores 5xx o network errors
logger.warn(`Attempt ${attempt} failed, retrying...`);
throw error;
}
},
{
retries: 3, // 3 reintentos
factor: 2, // Backoff exponencial (2x)
minTimeout: 1000, // Mínimo 1s entre reintentos
maxTimeout: 10000, // Máximo 10s
randomize: true // Jitter para evitar thundering herd
}
);
}

Estrategias por tipo de operación:

const RetryStrategies = {
// Lectura (idempotente)
read: {
retries: 5,
minTimeout: 500,
maxTimeout: 5000
},

// Escritura (NO idempotente)
write: {
retries: 2, // Menos reintentos
minTimeout: 2000,
maxTimeout: 8000,
onRetry: () => {
// Verificar si ya se procesó antes de reintentar
}
},

// Crítico (no puede fallar)
critical: {
retries: 10,
minTimeout: 1000,
maxTimeout: 30000,
forever: false
}
};

Rate Limiting Entre Servicios

Protección de Sobrecarga

// Implementar rate limiting para llamadas internas
import rateLimit from 'express-rate-limit';
import RedisStore from 'rate-limit-redis';

const internalRateLimiter = rateLimit({
store: new RedisStore({
client: redisClient,
prefix: 'internal_rl:'
}),
windowMs: 60000, // 1 minuto
max: (req) => {
// Límites diferentes por servicio
const limits = {
'wallet-service': 1000, // 1000 req/min
'auth-service': 2000, // 2000 req/min
'monitor-btc': 500 // 500 req/min (menos frecuente)
};

return limits[req.callerService] || 100; // Default: 100 req/min
},
message: 'Too many requests from this service',
standardHeaders: true,
legacyHeaders: false
});

// Aplicar a rutas internas
app.use('/internal', internalRateLimiter);

Message Queue (Futuro)

RabbitMQ para Procesamiento Asíncrono

Casos de uso futuros:

// Procesamiento de KYC (puede tardar minutos)
await queue.publish('kyc.verification', {
userId: user._id,
documents: uploadedDocs,
priority: 'high'
});

// Envío de emails masivos
await queue.publish('email.campaign', {
recipients: userList,
template: 'newsletter',
priority: 'low'
});

// Generación de reportes
await queue.publish('report.generate', {
userId: admin._id,
type: 'monthly_summary',
format: 'pdf'
});

Logging Distribuido

Correlación de Requests

import { v4 as uuidv4 } from 'uuid';

// Middleware para generar request ID
function requestIdMiddleware(req, res, next) {
req.id = req.headers['x-request-id'] || uuidv4();
res.setHeader('X-Request-ID', req.id);
next();
}

// Al llamar otro servicio, propagar request ID
async function callOtherService(url: string, data: any, req: Request) {
const response = await axios.post(url, data, {
headers: {
'X-Request-ID': req.id, // Propagar ID
'X-Internal-Service': SERVICE_NAME,
'X-Service-Token': INTERNAL_TOKEN
}
});

return response.data;
}

// Logger que incluye request ID
logger.info({
requestId: req.id,
service: 'wallet-service',
action: 'create_wallet',
userId: user._id,
coin: 'ETH'
});

// En otro servicio, mismo request ID
logger.info({
requestId: req.id, // Mismo ID!
service: 'lambda-service',
action: 'generate_keypair',
coin: 'ETH'
});

Beneficio: Rastrear un request completo a través de múltiples servicios.


Health Checks

Monitoreo de Servicios

// health-check endpoint en cada servicio
app.get('/health', async (req, res) => {
const health = {
status: 'healthy',
timestamp: new Date().toISOString(),
service: SERVICE_NAME,
version: process.env.VERSION,
uptime: process.uptime(),

dependencies: {
mongodb: await checkMongoDB(),
redis: await checkRedis(),
externalAPIs: await checkExternalAPIs()
}
};

// Status code basado en dependencias
const allHealthy = Object.values(health.dependencies).every(d => d.healthy);

res.status(allHealthy ? 200 : 503).json(health);
});

// Verificar MongoDB
async function checkMongoDB(): Promise<{ healthy: boolean; latency: number }> {
const start = Date.now();

try {
await mongoose.connection.db.admin().ping();
return { healthy: true, latency: Date.now() - start };
} catch (error) {
return { healthy: false, latency: -1 };
}
}

// Verificar Redis
async function checkRedis(): Promise<{ healthy: boolean; latency: number }> {
const start = Date.now();

try {
await redisClient.ping();
return { healthy: true, latency: Date.now() - start };
} catch (error) {
return { healthy: false, latency: -1 };
}
}

Service Discovery (Docker)

Descubrimiento de Servicios en Docker

# docker-compose.yml
version: '3.8'

services:
auth-service:
image: swapbits/auth-service
networks:
- swapbits-network
# DNS automático: auth-service.swapbits-network

wallet-service:
image: swapbits/wallet-service
networks:
- swapbits-network
environment:
- AUTH_SERVICE_URL=http://auth-service:3000
- EXCHANGE_SERVICE_URL=http://exchange-service:3000
# Puede resolver "auth-service" por DNS

networks:
swapbits-network:
driver: bridge

En el código:

// No hardcodear IPs, usar nombres de servicio
const AUTH_SERVICE_URL = process.env.AUTH_SERVICE_URL || 'http://auth-service:3000';
const EXCHANGE_SERVICE_URL = process.env.EXCHANGE_SERVICE_URL || 'http://exchange-service:3000';

Timeouts y Deadlines

Configuración de Timeouts

const ServiceTimeouts = {
// Servicios internos rápidos
auth: {
validateToken: 2000, // 2s
getUserRoles: 1000 // 1s
},

// Servicios con lógica de negocio
wallet: {
createWallet: 10000, // 10s (llama Lambda)
getBalance: 5000, // 5s (llama blockchain)
sendTransaction: 15000 // 15s (llama Lambda)
},

exchange: {
getQuote: 3000, // 3s
executeSwap: 30000 // 30s (puede tardar)
},

// Servicios externos
blockchain: {
getBlock: 10000,
getTransaction: 10000,
estimateGas: 5000
},

// Servicios de terceros
bybit: {
placeOrder: 5000,
getOrderbook: 3000
}
};

Canales de Redis Pub/Sub

Organización de Canales

const RedisChannels = {
// Transacciones
'transaction:pending': 'Nueva transacción pendiente',
'transaction:confirmed': 'Transacción confirmada',
'transaction:failed': 'Transacción fallida',

// Wallets
'wallet:created': 'Wallet creada',
'wallet:balance_updated': 'Balance actualizado',

// Trading
'swap:initiated': 'Swap iniciado',
'swap:completed': 'Swap completado',
'order:filled': 'Orden ejecutada',

// Banking
'deposit:received': 'Depósito recibido',
'withdrawal:completed': 'Retiro completado',

// Notificaciones
'notification:*': 'Notificaciones por userId',

// Sistema
'system:alert': 'Alertas del sistema',
'system:health': 'Health check events'
};

Métricas de Comunicación

Monitoreo de Llamadas entre Servicios

interface InterServiceMetrics {
// Llamadas HTTP
totalRequests: number; // 10,000
successfulRequests: number; // 9,800
failedRequests: number; // 200
averageLatency: number; // 45ms
p95Latency: number; // 120ms
p99Latency: number; // 250ms

// Por servicio
byService: {
'auth-service': {
requests: 5000,
errors: 10,
avgLatency: 20
},
'exchange-service': {
requests: 2000,
errors: 50,
avgLatency: 80
}
};

// Circuit breakers
circuitBreakerOpen: string[]; // ['exchange-service']

// Pub/Sub
messagesPublished: number; // 50,000
messagesConsumed: number; // 49,950
messagesLost: number; // 50
}

Best Practices

Para Comunicación Inter-Servicios:

  1. Siempre usar timeouts - Nunca hacer requests sin timeout
  2. Implementar circuit breakers - Proteger contra fallos en cascada
  3. Propagar request IDs - Para debugging distribuido
  4. Validar service tokens - Autenticar servicios internos
  5. Usar rate limiting - Proteger servicios de sobrecarga
  6. Loggear todas las llamadas - Para auditoría y debugging
  7. Implementar health checks - Monitorear disponibilidad
  8. Usar retry con backoff - Reintentos inteligentes
  9. Separar rutas públicas/internas - Seguridad
  10. Preferir asíncrono cuando sea posible - Desacoplar servicios

Anti-Patterns a evitar:

  • ❌ Llamadas síncronas encadenadas largas (> 3 servicios)
  • ❌ Hardcodear URLs o IPs
  • ❌ No manejar errores de servicios externos
  • ❌ Reintentar indefinidamente
  • ❌ No propagar metadata (request ID, user context)