MessageBatchingService¶
This docs was updated at: 2026-02-23
com.paragon.messaging.batching.MessageBatchingService · Class
Serviço principal de batching e rate limiting de mensagens.
Funcionalidades:
- Batching adaptativo com timeout e silence threshold
- Rate limiting híbrido (Token Bucket + Sliding Window)
- Deduplicação via MessageStore
- Backpressure handling configurável
- Error handling com retry exponencial
- Pre/post hooks extensíveis
- Processamento assíncrono com virtual threads
Exemplo de uso:
MessageBatchingService service = MessageBatchingService.builder()
.config(batchingConfig)
.processor(messageProcessor)
.addPreHook(loggingHook)
.addPostHook(metricsHook)
.build();
// No webhook do WhatsApp
service.receiveMessage(userId, messageId, content, Instant.now());
Since: 1.0
Methods¶
builder¶
Builder para MessageBatchingService.
receiveMessage¶
Recebe mensagem nova do usuário para batching.
Thread Safety: Este método é thread-safe.
Fluxo:
- Verifica deduplicação (se MessageStore configurado)
- Aplica rate limiting
- Adiciona ao buffer do usuário
- Agenda processamento adaptativo
Parameters
| Name | Description |
|---|---|
userId |
ID do usuário (ex: número WhatsApp) |
message |
mensagem completa recebida |
isDuplicate¶
Verifica se mensagem é duplicata.
isRateLimited¶
Verifica se usuário excedeu rate limit.
scheduleAdaptiveProcessing¶
Agenda processamento adaptativo do buffer.
Usa silence threshold: se usuário para de enviar por X segundos, processa imediatamente (não espera timeout completo).
checkAndProcessIfSilent¶
Verifica se houve silêncio e processa se sim.
processIfPending¶
Processa buffer se ainda houver mensagens pendentes (timeout atingido).
processBatch¶
Processa batch de mensagens em virtual thread.
processInVirtualThread¶
Processamento real em virtual thread (com retry).
createHookContext¶
private HookContext createHookContext(
String userId, List<InboundMessage> messages, int retryCount)
Cria HookContext para hooks.
executeHooks¶
Executa lista de hooks.
markAsProcessed¶
Marca mensagens como processadas (deduplicação).
handleProcessingError¶
private void handleProcessingError(
String userId,
List<InboundMessage> messages,
int retryCount,
Exception error,
HookContext context)
Trata erro de processamento (retry com exponential backoff).
handleBackpressure¶
private void handleBackpressure(
String userId, UserMessageBuffer buffer, InboundMessage newMessage)
Trata backpressure quando buffer cheio.
handleRateLimitExceeded¶
Trata rate limit excedido.
notifyUser¶
Notifica usuário (deve ser implementado pelo consumidor da API).
getOrCreateBuffer¶
Obtém ou cria buffer para usuário.
getRateLimiter¶
Obtém ou cria rate limiter para usuário.
shutdown¶
Shutdown graceful do serviço.
getStats¶
Retorna estatísticas do serviço (útil para monitoramento).