0. 前言
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。 Kafka 是一种高吞吐量的 分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。本章介绍SpringBoot集成Kafka 收发消息。本文介绍了SpringBoot集成Kafka。
1. 环境
- spring-boot: 2.6.6
- spring-kafka: 2.8.4
2. 配置文件
2.1 yml配置文件
spring:
application:
name: springboot-kafka
kafka:
bootstrap-servers: 192.168.30.130:9092
producer:
# 序列化key的类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 序列化value的类
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
partitioner:
# 指定分区规则
class: com.engrz.lab.springboot.kafka.CustomPartitioner
spring:
json:
value:
default:
type:
trusted:
# 允许json反序列化的包
packages: com.engrz.lab.*
add:
type:
headers: false
consumer:
#bootstrap-servers: 192.168.30.130:9092 # 会覆盖 spring.kafka.bootstrap-servers 配置
# 消费者所属消息组
group-id: group_1
# 反序列化key的类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 反序列化value的类
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
template:
default-topic: kwz_1
2.2 KafkaProducerConfig
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String producerBootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
2.3 KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String consumerBootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(3);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
3. 生产消费者配置
3.1 producer
@Component
public class Producer {
@Value("${spring.kafka.template.default-topic}")
private String defaultTopic;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send(defaultTopic, message);
}
public void sendMessage(String key, String message) {
kafkaTemplate.send(defaultTopic, key, message);
}
}
3.2 consumer
@Component
@Slf4j
public class Consumer {
@KafkaListener(topics = "kwz_1")
public void listener(ConsumerRecord<String, String> record) {
String value = record.value();
log.info("【receive】:{}", value);
}
}