Hoje, vou falar um pouco da nossa abordagem para resolver o desafio do “dual write” ao lidar com o envio de eventos e destacar a importância desse processo na prevenção de inconsistências. Além disso, vou mostrar por que é crucial ter mecanismos que permitam o reprocessamento e o replay eficientes desses eventos. Antes de entrar nesses detalhes, vamos explorar um pouco o contexto e entender por que escolhemos esse caminho.
Estamos em 2016, e eu dou os primeiros passos no desenvolvimento das primeiras linhas de código da Monkey, no nosso primeiro escritório, onde passava longas horas. Desde o início, mesmo sendo o único membro da equipe na época, decidi optar por microservices.
Mas por que escolhi esse caminho? Reconhecia o potencial do projeto e entendia a importância de minimizar erros. Ter deploys separados proporciona uma maior segurança operacional. Para ilustrar, o core do projeto, responsável por funções críticas como o cálculo de juros, não sofre alterações há 42 dias. Isso significa que a parte mais vital do sistema, que traz receita para empresa, continua operando sem interferências de deploys não relacionados a ele, garantindo estabilidade e consistência.
E o exemplo acima é apenas um dos motivos, mas temos outros por exemplo a capacidade de escalar o sistema nos pontos realmente necessários resulta em uma redução de custos significativa. Além disso, a possibilidade de atualizar a versão de frameworks, como o Spring, em projetos de menor impacto, observando seu comportamento em produção antes de aplicar em projetos críticos, proporciona tranquilidade para toda a equipe, garantindo noites de sono mais tranquilas. 🙂
Vamos direto ao ponto. Para compreender o desafio do “dual write”, imagine que um usuário realizou uma operação de crédito na plataforma. Agora, é necessário enviar essa informação para o nosso broker, garantindo que outros microservices recebam a atualização. Nesse cenário, não podemos nos dar ao luxo de “perder” uma mensagem, especialmente considerando a integração crucial com a instituição financeira responsável pela operação. Portanto, é imprescindível que a escrita na base de dados e a publicação da mensagem sejam executadas de maneira consistente.
Agora que nós temos a visão do problema, vamos focar na solução, então, dado o problema eu preciso que: Uma única transação com a base de dados armazene os dados da operação e também armazene e dispare a operação para a base de dados.
Vou abstrair a classe da operação e vou focar apenas na camada de eventos, nós temos 2 classes que representam eventos:
StoredDomainEvent: É a classe que representa um evento enviado de um microservice, então ele sempre salva esses dados antes de enviar a mensagem.
@Entity
public class StoredDomainEvent implements Auditable {
@Id
@Getter
private String id;
private String relationId;
@Getter
private String content;
private boolean sent;
private Instant eventTimestamp;
private String eventType;
@CreatedBy
private String createUserId;
@CreatedIp
private String createUserIp;
...
}
ReceivedDomainEvent: É a classe que representa um evento recebido por um dos nossos microserives, então ele sempre salva esses dados quando inicia o consumo da mensagem do broker.
@Entity
public class ReceivedDomainEvent implements Auditable {
@Id
private String id;
private String relationId;
@Getter
private String content;
private boolean replay;
private Instant eventTimestamp;
@Getter
private String eventType;
@Getter
@CreatedBy
@Column(updatable = false)
private String createUserId;
@Setter
@CreatedIp
@Column(updatable = false)
private String createUserIp;
...
}
Agora que entendemos a estrutura dos nossos eventos, vamos examinar a minha classe abstrata. Todas as entidades utilizam essa classe abstrata para registrar os metadados (IP, usuário e datas), além de empregar o método registerEvent
. Essa função salva o StoredDomainEvent
e faz o registro do evento no mecanismo de Application Events do Spring.
public abstract class MonkeyAbstractEntity<T> implements Auditable, Serializable {
@CreatedDate
@Column(updatable = false)
private Instant createdAt;
@CreatedBy
@Column(updatable = false)
private String createUserId;
@CreatedIp
@Column(updatable = false)
private String createUserIp;
@LastModifiedDate
private Instant updatedAt;
@LastModifiedBy
private String lastUserId;
@LastModifiedIp
private String lastUserIp;
public T registerEvent(DomainEvent event) {
BeanUtil.getBean(DomainEventsStorage.class).save(new StoredDomainEvent(event));
BeanUtil.getBean(MonkeyEventPublisher.class).publishEvent(event);
return (T) this;
}
...
}
A classe StoredDomainEvent
é sempre salva com o atributo “sent” omo “false”. Esse valor só é atualizado quando a mensagem é efetivamente enviada para o broker. Essa abordagem permite realizar o reprocessamento em caso de falhas de entrega para o broker. Veja um exemplo de uma classe responsável por “escutar” o evento e enviá-lo para o broker:
@Component
@AllArgsConstructor
class PurchaseProducer {
private final PurchaseMessageChannel messageChannel;
@TransactionalEventListener(PurchaseCreatedEvent.class)
public void produce(OperationCreatedEvent event) {
messageChannel.send(event);
}
}
Agora, vamos analisar como fica nossa classe de “service” que realiza a persistência de uma oferta e registra o evento para que seja enviado para o broker:
@Service
@Transactional(rollbackFor = Exception.class)
class OperationCommand {
private final OperationRepository repository;
public Operation create(Operation operation) {
return repository.save(operation).registerEvent(operation);
}
}
Até este ponto, tudo está fluindo bem. No entanto, quem é responsável por definir o atributo “sent” da StoredDomainEvent
como “true”? Aqui é onde entra um pouco de magia. Temos um BeanPostProcessor
que intercepta todos os nossos métodos anotados com TransactionalEventListener
, desde que não estejam também marcados com CompletableEventOff
, que utilizamos para desativar o comportamento quando não é necessário. Abaixo, um exemplo de como realizamos essa interceptação:
static boolean needToComplete(Method method) {
CompletableEventOff completableEventOff = AnnotatedElementUtils.getMergedAnnotation(method,
CompletableEventOff.class);
if (completableEventOff != null) {
return false;
}
TransactionalEventListener annotation = AnnotatedElementUtils.getMergedAnnotation(method,
TransactionalEventListener.class);
return annotation != null
&& (annotation.phase().equals(AFTER_COMMIT) || annotation.phase().equals(AFTER_ROLLBACK));
}
Caso você não tenha familiaridade com o BeanPostProcessor
e com proxys, você pode entender um pouco aqui nesse artigo e facilmente criar um interceptor no seu projeto 🙂
Portanto, todos os métodos que disparam eventos, agora encapsulados pelo proxy, passarão pelo código responsável por marcar o evento como enviado. Confira o trecho de código abaixo:
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
try {
Object result = invocation.proceed();
if (needToComplete(invocation.getMethod())) {
if (invocation.getArguments()[0] instanceof DomainEvent event) {
if (StringUtils.isNotEmpty(event.getStoredEventId())) {
storedDomainEventRepository.markUncompleted(event.getStoredEventId());
}
}
}
return result;
}
catch (Exception e) {
LOGGER.all()
.logKey("error-on-publish-message")
.value("Invocation of listener {" + invocation.getMethod()
+ "} failed. Leaving event publication uncompleted.")
.logKey("exception")
.value(getStackTrace(e))
.asError();
throw e;
}
}
Agora, tenho a garantia de que o evento foi enviado e, caso contrário, o atributo “sent” permanecerá como false. Um schedule, que é executado a cada 5 minutos, verifica todos os eventos com “sent” igual a false e os reenvia para o broker.
@Component
@AllArgsConstructor
public class MessageReprocessorSchedule {
private final MonkeyReprocessorCommand command;
private final MonkeyReplayCommand replayCommand;
@Scheduled(cron = "0 0/5 * * * *", zone = "${internationalization.timezone}")
@SchedulerLock(name = "event-reprocessor", lockAtMostFor = "PT5M", lockAtLeastFor = "PT5M")
public void process() {
command.process();
replayCommand.process();
}
}
Para não me estender muito, vou abordar brevemente a classe ReceivedDomainEvent
. Essa classe é essencialmente utilizada para armazenar todos os eventos quando são consumidos do broker. Por que fazemos isso? A ideia principal aqui é possibilitar o reprocessamento de um evento em um microserviço sem a necessidade de reenviá-lo para o broker, evitando que seja consumido por todos os microservices novamente.
Imagine um cenário em que, ao criar um usuário, envio um e-mail de boas-vindas e adiciono seus dados ao meu backoffice. Se, por algum motivo, o backoffice não conseguir consumir o evento, posso simplesmente fazer o replay, sem a necessidade de reenviar o e-mail de boas-vindas ao usuário.
Isso é realizado pelo mesmo schedule mostrado acima que busca todos os eventos marcados com o atributo “replay” igual a true. Esses eventos são então enviados ao mecanismo EventListener
do Spring e reprocessados.
É isso, o texto ficou um pouco extenso, mas acredito que consegui transmitir a ideia. Se tiverem dúvidas ou comentários, fiquem à vontade para me contatar nas redes sociais ou deixar um comentário aqui. 🙂