Dual Write: Mantendo a Consistência

Hoje, vou falar um pouco da nossa abordagem para resolver o desafio do “dual write” ao lidar com o envio de eventos e destacar a importância desse processo na prevenção de inconsistências. Além disso, vou mostrar por que é crucial ter mecanismos que permitam o reprocessamento e o replay eficientes desses eventos. Antes de entrar nesses detalhes, vamos explorar um pouco o contexto e entender por que escolhemos esse caminho. 

Estamos em 2016, e eu dou os primeiros passos no desenvolvimento das primeiras linhas de código da Monkey, no nosso primeiro escritório, onde passava longas horas. Desde o início, mesmo sendo o único membro da equipe na época, decidi optar por microservices. 

Mas por que escolhi esse caminho? Reconhecia o potencial do projeto e entendia a importância de minimizar erros. Ter deploys separados proporciona uma maior segurança operacional. Para ilustrar, o core do projeto, responsável por funções críticas como o cálculo de juros, não sofre alterações há 42 dias. Isso significa que a parte mais vital do sistema, que traz receita para empresa, continua operando sem interferências de deploys não relacionados a ele, garantindo estabilidade e consistência. 

E o exemplo acima é apenas um dos motivos, mas temos outros por exemplo a capacidade de escalar o sistema nos pontos realmente necessários resulta em uma redução de custos significativa. Além disso, a possibilidade de atualizar a versão de frameworks, como o Spring, em projetos de menor impacto, observando seu comportamento em produção antes de aplicar em projetos críticos, proporciona tranquilidade para toda a equipe, garantindo noites de sono mais tranquilas. 🙂 

Vamos direto ao ponto. Para compreender o desafio do “dual write”, imagine que um usuário realizou uma operação de crédito na plataforma. Agora, é necessário enviar essa informação para o nosso broker, garantindo que outros microservices recebam a atualização. Nesse cenário, não podemos nos dar ao luxo de “perder” uma mensagem, especialmente considerando a integração crucial com a instituição financeira responsável pela operação. Portanto, é imprescindível que a escrita na base de dados e a publicação da mensagem sejam executadas de maneira consistente. 

Agora que nós temos a visão do problema, vamos focar na solução, então, dado o problema eu preciso que: Uma única transação com a base de dados armazene os dados da operação e também armazene e dispare a operação para a base de dados. 

Vou abstrair a classe da operação e vou focar apenas na camada de eventos, nós temos 2 classes que representam eventos: 

StoredDomainEvent: É a classe que representa um evento enviado de um microservice, então ele sempre salva esses dados antes de enviar a mensagem. 

@Entity
public class StoredDomainEvent implements Auditable {

	@Id
	@Getter
	private String id;

	private String relationId;

	@Getter
	private String content;

	private boolean sent;

	private Instant eventTimestamp;

	private String eventType;

	@CreatedBy
	private String createUserId;

	@CreatedIp
	private String createUserIp;
   
    ...
}

ReceivedDomainEvent: É a classe que representa um evento recebido por um dos nossos microserives, então ele sempre salva esses dados quando inicia o consumo da mensagem do broker. 

@Entity
public class ReceivedDomainEvent implements Auditable {

	@Id
	private String id;

	private String relationId;

	@Getter
	private String content;

	private boolean replay;

	private Instant eventTimestamp;

	@Getter
	private String eventType;

	@Getter
	@CreatedBy
	@Column(updatable = false)
	private String createUserId;

	@Setter
	@CreatedIp
	@Column(updatable = false)
	private String createUserIp;

    ...

}


Agora que entendemos a estrutura dos nossos eventos, vamos examinar a minha classe abstrata. Todas as entidades utilizam essa classe abstrata para registrar os metadados (IP, usuário e datas), além de empregar o método registerEvent. Essa função salva o StoredDomainEvent e faz o registro do evento no mecanismo de Application Events do Spring.

public abstract class MonkeyAbstractEntity<T> implements Auditable, Serializable {

	@CreatedDate
	@Column(updatable = false)
	private Instant createdAt;

	@CreatedBy
	@Column(updatable = false)
	private String createUserId;

	@CreatedIp
	@Column(updatable = false)
	private String createUserIp;

	@LastModifiedDate
	private Instant updatedAt;

	@LastModifiedBy
	private String lastUserId;

	@LastModifiedIp
	private String lastUserIp;

	public T registerEvent(DomainEvent event) {
		BeanUtil.getBean(DomainEventsStorage.class).save(new StoredDomainEvent(event));
		BeanUtil.getBean(MonkeyEventPublisher.class).publishEvent(event);
		return (T) this;
	}

    ...

}

A classe StoredDomainEvent é sempre salva com o atributo “sent” omo “false”. Esse valor só é atualizado quando a mensagem é efetivamente enviada para o broker. Essa abordagem permite realizar o reprocessamento em caso de falhas de entrega para o broker. Veja um exemplo de uma classe responsável por “escutar” o evento e enviá-lo para o broker:

@Component
@AllArgsConstructor
class PurchaseProducer {

	private final PurchaseMessageChannel messageChannel;

	@TransactionalEventListener(PurchaseCreatedEvent.class)
	public void produce(OperationCreatedEvent event) {
		messageChannel.send(event);
	}
}

Agora, vamos analisar como fica nossa classe de “service” que realiza a persistência de uma oferta e registra o evento para que seja enviado para o broker:

@Service
@Transactional(rollbackFor = Exception.class)
class OperationCommand {
    
    private final OperationRepository repository;

    public Operation create(Operation operation) {
        return repository.save(operation).registerEvent(operation);
    }
}


Até este ponto, tudo está fluindo bem. No entanto, quem é responsável por definir o atributo “sent” da StoredDomainEvent como “true”? Aqui é onde entra um pouco de magia. Temos um BeanPostProcessor que intercepta todos os nossos métodos anotados com TransactionalEventListener, desde que não estejam também marcados com CompletableEventOff, que utilizamos para desativar o comportamento quando não é necessário. Abaixo, um exemplo de como realizamos essa interceptação:

static boolean needToComplete(Method method) {

				CompletableEventOff completableEventOff = AnnotatedElementUtils.getMergedAnnotation(method,
						CompletableEventOff.class);

				if (completableEventOff != null) {
					return false;
				}
				
				TransactionalEventListener annotation = AnnotatedElementUtils.getMergedAnnotation(method,
						TransactionalEventListener.class);

				return annotation != null
						&& (annotation.phase().equals(AFTER_COMMIT) || annotation.phase().equals(AFTER_ROLLBACK));
}

Caso você não tenha familiaridade com o BeanPostProcessor e com proxys, você pode entender um pouco aqui nesse artigo e facilmente criar um interceptor no seu projeto 🙂 


Portanto, todos os métodos que disparam eventos, agora encapsulados pelo proxy, passarão pelo código responsável por marcar o evento como enviado. Confira o trecho de código abaixo:

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
    try {
        Object result = invocation.proceed();
        if (needToComplete(invocation.getMethod())) {
            if (invocation.getArguments()[0] instanceof DomainEvent event) {
                if (StringUtils.isNotEmpty(event.getStoredEventId())) {
                    storedDomainEventRepository.markUncompleted(event.getStoredEventId());
                }
            }
        }
        return result;
    }
    catch (Exception e) {
        LOGGER.all()
                .logKey("error-on-publish-message")
                .value("Invocation of listener {" + invocation.getMethod()
                        + "} failed. Leaving event publication uncompleted.")
                .logKey("exception")
                .value(getStackTrace(e))
                .asError();
        throw e;
    }
}

Agora, tenho a garantia de que o evento foi enviado e, caso contrário, o atributo “sent” permanecerá como false. Um schedule, que é executado a cada 5 minutos, verifica todos os eventos com “sent” igual a false e os reenvia para o broker.

@Component
@AllArgsConstructor
public class MessageReprocessorSchedule {

	private final MonkeyReprocessorCommand command;

	private final MonkeyReplayCommand replayCommand;

	@Scheduled(cron = "0 0/5 * * * *", zone = "${internationalization.timezone}")
	@SchedulerLock(name = "event-reprocessor", lockAtMostFor = "PT5M", lockAtLeastFor = "PT5M")
	public void process() {
		command.process();
		replayCommand.process();
	}

}


Para não me estender muito, vou abordar brevemente a classe ReceivedDomainEvent. Essa classe é essencialmente utilizada para armazenar todos os eventos quando são consumidos do broker. Por que fazemos isso? A ideia principal aqui é possibilitar o reprocessamento de um evento em um microserviço sem a necessidade de reenviá-lo para o broker, evitando que seja consumido por todos os microservices novamente.

Imagine um cenário em que, ao criar um usuário, envio um e-mail de boas-vindas e adiciono seus dados ao meu backoffice. Se, por algum motivo, o backoffice não conseguir consumir o evento, posso simplesmente fazer o replay, sem a necessidade de reenviar o e-mail de boas-vindas ao usuário.

Isso é realizado pelo mesmo schedule mostrado acima que busca todos os eventos marcados com o atributo “replay” igual a true. Esses eventos são então enviados ao mecanismo EventListener do Spring e reprocessados.

É isso, o texto ficou um pouco extenso, mas acredito que consegui transmitir a ideia. Se tiverem dúvidas ou comentários, fiquem à vontade para me contatar nas redes sociais ou deixar um comentário aqui. 🙂

Dual Write: Mantendo a Consistência