Kafka之消费与激情
副标题[/!--empirenews.page--]
首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的数据。一个简单的Kafka消费实例代码如下所示: public class JConsumerSubscribe extends Thread { public static void main(String[] args) { JConsumerSubscribe jconsumer = new JConsumerSubscribe(); jconsumer.start(); } /** 初始化Kafka集群信息. */ private Properties configure() { Properties props = new Properties(); props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址 props.put("group.id", "ke");// 指定消费者组 props.put("enable.auto.commit", "true");// 开启自动提交 props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔 // 反序列化消息主键 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 反序列化消费记录 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } /** 实现一个单线程消费者. */ @Override public void run() { // 创建一个消费者实例对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure()); // 订阅消费主题集合 consumer.subscribe(Arrays.asList("test_kafka_topic")); // 实时消费标识 boolean flag = true; while (flag) { // 获取主题消息数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) // 循环打印消息记录 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } // 出现异常关闭消费者对象 consumer.close(); }} 上述代码我们就可以非常便捷地拿到Topic中的数据。但是,当我们调用poll方法拉取数据的时候,Kafka Broker Server做了那些事情。接下来,我们可以去看看源代码的实现细节。核心代码如下: org.apache.kafka.clients.consumer.KafkaConsumer private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) { acquireAndEnsureOpen(); try { (编辑:沧州站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |