maven如何实现kafka消息消费

   2025-02-22 6830
核心提示:要使用 Maven 实现 Kafka 消息消费,需要按照以下步骤进行操作:在 Maven 项目的 pom.xml 文件中添加 Kafka 客户端的依赖:depen

要使用 Maven 实现 Kafka 消息消费,需要按照以下步骤进行操作:

在 Maven 项目的 pom.xml 文件中添加 Kafka 客户端的依赖:
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.1</version></dependency></dependencies>
创建一个 Kafka 消费者:
import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092";String groupId = "my-group";String topic = "my-topic";Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singleton(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value());}}}}

在上述代码中,需要根据实际情况修改 bootstrapServersgroupIdtopic 的值。

运行上述代码,即可启动 Kafka 消费者并开始消费消息。

以上就是使用 Maven 实现 Kafka 消息消费的步骤。

 
 
更多>同类维修知识
推荐图文
推荐维修知识
点击排行
网站首页  |  关于我们  |  联系方式  |  用户协议  |  隐私政策  |  网站留言