BlankKafkaListener.java
package com.blanksystem.blank.service.message.listener.kafka;
import com.blanksystem.blank.service.domain.exception.BlankApplicationServiceException;
import com.blanksystem.blank.service.domain.ports.input.message.listener.blank.BlankMessageListener;
import com.blanksystem.blank.service.message.mapper.BlankMessagingDataMapper;
import com.blanksystem.blank.service.message.model.avro.BlankAvroModel;
import com.lg5.spring.kafka.consumer.KafkaConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
public class BlankKafkaListener implements KafkaConsumer<BlankAvroModel> {
private final BlankMessageListener blankMessageListener;
private final BlankMessagingDataMapper mapper;
public BlankKafkaListener(BlankMessageListener blankMessageListener, BlankMessagingDataMapper mapper) {
this.blankMessageListener = blankMessageListener;
this.mapper = mapper;
}
@Override
@KafkaListener(
id = "${blanksystem.blank.events.journal.blank.consumer.group}",
topics = "${blanksystem.blank.events.journal.blank.topic}"
)
public void receive(@Payload List<BlankAvroModel> messages,
@Header(KafkaHeaders.RECEIVED_KEY) List<String> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
log.info("{} number of blank create messages received with keys {}, partitions {} and offsets {}"
+ ", sending for blank",
messages.size(),
keys.toString(),
partitions.toString(),
offsets.toString());
messages.forEach(blankAvroModel -> {
try {
log.info("Processing blank created for id: {}", blankAvroModel.getId());
blankMessageListener.blankCreated(mapper
.blankAvroModelToBlankModel(blankAvroModel));
} catch (Exception e) {
throw new BlankApplicationServiceException("Throwing DataAccessException in"
+ " BlankKafkaListener: " + e.getMessage(), e);
}
});
}
}