// BlankEventKafkaPublisher.java
package com.blanksystem.blank.service.message.publisher.kafka;
import com.blanksystem.blank.service.domain.config.BlankServiceConfigData;
import com.blanksystem.blank.service.domain.entity.Blank;
import com.blanksystem.blank.service.domain.event.BlankCreatedEvent;
import com.blanksystem.blank.service.domain.ports.output.message.publisher.BlankMessagePublisher;
import com.blanksystem.blank.service.domain.valueobject.BlankId;
import com.blanksystem.blank.service.message.mapper.BlankMessagingDataMapper;
import com.blanksystem.blank.service.message.model.avro.BlankAvroModel;
import com.lg5.spring.kafka.producer.KafkaMessageHelper;
import com.lg5.spring.kafka.producer.service.KafkaProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * {@link BlankMessagePublisher} implementation that forwards {@link BlankCreatedEvent}s
 * to Kafka as {@link BlankAvroModel} records.
 *
 * <p>The target topic is read from {@link BlankServiceConfigData#getTopic()} on every call,
 * and the Avro model's id is used as the record key.
 */
@Slf4j
@Component
public class BlankEventKafkaPublisher implements BlankMessagePublisher {

    // NOTE(review): the "customer" prefix on these names looks like a leftover from a
    // template; the types are all Blank-specific. Kept as-is to avoid touching wiring.
    private final BlankMessagingDataMapper customerMessagingDataMapper;
    private final KafkaProducer<String, BlankAvroModel> kafkaProducer;
    private final BlankServiceConfigData customerServiceConfigData;
    private final KafkaMessageHelper kafkaMessageHelper;

    /**
     * Creates the publisher with its collaborators.
     *
     * @param customerMessagingDataMapper maps the domain event to its Avro representation
     * @param kafkaProducer               producer used to send the Avro record
     * @param customerServiceConfigData   configuration supplying the target topic name
     * @param kafkaMessageHelper          supplies the callback passed to the producer
     */
    public BlankEventKafkaPublisher(BlankMessagingDataMapper customerMessagingDataMapper,
                                    KafkaProducer<String, BlankAvroModel> kafkaProducer,
                                    BlankServiceConfigData customerServiceConfigData,
                                    KafkaMessageHelper kafkaMessageHelper) {
        this.customerMessagingDataMapper = customerMessagingDataMapper;
        this.kafkaProducer = kafkaProducer;
        this.customerServiceConfigData = customerServiceConfigData;
        this.kafkaMessageHelper = kafkaMessageHelper;
    }

    /**
     * Maps the event to a {@link BlankAvroModel} and hands it to the Kafka producer.
     *
     * <p>Any exception raised while mapping or sending is caught and logged (with its stack
     * trace); it is not propagated to the caller.
     *
     * @param blankCreatedEvent the event to publish; its blank and the blank's id are
     *                          dereferenced before the try block, so they must be non-null
     */
    @Override
    public void publish(BlankCreatedEvent blankCreatedEvent) {
        final Blank blank = blankCreatedEvent.getBlank();
        final BlankId blankId = blank.getId();
        log.info("Received BlankCreatedEvent for blank id: {}", blankId.getValue());
        try {
            final BlankAvroModel blankAvroModel =
                    customerMessagingDataMapper.customerCreatedEventToCustomerRequestAvroModel(blankCreatedEvent);
            kafkaProducer.send(
                    customerServiceConfigData.getTopic(),
                    blankAvroModel.getId(),
                    blankAvroModel,
                    kafkaMessageHelper.getCallback(blankAvroModel.getId(), blankAvroModel));
            // NOTE(review): if KafkaProducer.send is asynchronous, this line only confirms
            // the hand-off to the producer, not broker acknowledgement - confirm.
            log.info("BlankCreatedEvent sent to kafka for blank id: {} ", blankAvroModel.getId());
        } catch (Exception e) {
            // Best-effort publish: the failure is logged rather than rethrown. The exception
            // itself is passed as the trailing argument so SLF4J records its type and stack
            // trace instead of only the (possibly null) message text.
            log.error("Error while sending BlankCreatedEvent to kafka for blank id: {},"
                    + " error: {}", blankId.getValue(), e.getMessage(), e);
        }
    }
}