- Published on
A Simple Spring-Kafka Usage Example
- Authors
- Name
- Jason Yang
- @yangjinlong86
To send messages to a specified topic, use the KafkaTemplate provided by Spring Kafka. The steps are as follows:
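Add the dependency: make sure the project includes the Spring Kafka dependency. If you use Maven, add the following to pom.xml. No version is given here, which relies on the version being managed elsewhere, for example by the Spring Boot parent POM:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>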
Configure KafkaTemplate: define the KafkaTemplate bean in a Spring configuration class. For example:

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class KafkaProducerConfig {

        @Bean
        public ProducerFactory<String, Object> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            // Address of the Kafka broker(s) to connect to
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Keys are sent as plain strings, values are serialized to JSON
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, Object> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
Use KafkaTemplate in KafkaService to send messages:

    package com.example.springboot.service;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaService {

        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;

        public void sendMessage(String topic, Object message) {
            // send() is asynchronous; the future it returns is ignored here
            kafkaTemplate.send(topic, message);
        }
    }
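The sendMessage method above discards the value returned by kafkaTemplate.send, so a failed send goes unnoticed. In Spring Kafka 3.x, send returns a CompletableFuture (2.x returns a ListenableFuture instead), and a callback can be attached to it. The following is only a minimal sketch of that pattern using the JDK alone: MessageSender, sendAndLog, and the "topic@0" result string are hypothetical stand-ins invented for illustration, not Spring Kafka types, so the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncSendSketch {

    /** Hypothetical stand-in for the send method of KafkaTemplate; not a Spring Kafka type. */
    interface MessageSender {
        CompletableFuture<String> send(String topic, Object message);
    }

    /** Sends a message and records the outcome once the returned future completes. */
    static CompletableFuture<String> sendAndLog(MessageSender sender, String topic,
                                                Object message, List<String> log) {
        return sender.send(topic, message)
                .whenComplete((result, error) -> {
                    if (error != null) {
                        // The send failed: record the reason instead of dropping it silently
                        log.add("FAILED " + topic + ": " + error.getMessage());
                    } else {
                        log.add("SENT " + result);
                    }
                });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // A sender that always succeeds and one that always fails (both assumptions for the demo)
        MessageSender ok = (topic, msg) -> CompletableFuture.completedFuture(topic + "@0");
        MessageSender broken = (topic, msg) ->
                CompletableFuture.failedFuture(new IllegalStateException("broker unavailable"));

        sendAndLog(ok, "orders", "hello", log);
        sendAndLog(broken, "orders", "hello", log);

        log.forEach(System.out::println);
    }
}
```

With a real KafkaTemplate the callback typically runs on a producer thread rather than the calling thread, so a shared log like the ArrayList here would need to be thread-safe.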
Send messages with KafkaService: inject KafkaService into another service or controller and call its sendMessage method to send a message.

    import com.example.springboot.service.KafkaService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class MessageController {

        @Autowired
        private KafkaService kafkaService;

        // POST /send?topic=<name> with the message as the request body
        @PostMapping("/send")
        public String sendMessage(@RequestParam String topic, @RequestBody Object message) {
            kafkaService.sendMessage(topic, message);
            return "Message sent to topic: " + topic;
        }
    }