卡夫卡消费者多个主题

问题描述

我们可以使用以下API订阅多个主题:Consumer.subscribe(Arrays.asList(topic1,topic2),ConsumerRebalanceListener obj)

消费者拥有主题信息,我们可以通过如下创建OffsetAndMetadata对象来使用Consumer.commitAsync或Consumer.commitSync()。

ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}

解决方法

我有一个主题列表(目前为10个),其规模将来可能会增加。我知道我们可以在每个主题中产生多个线程(每个主题)使用,但是就我而言,如果主题数量增加,那么从主题中使用的线程数量就会增加,这是我不希望的,因为主题不是太频繁地获取数据,因此线程将处于理想状态。

有没有办法让一个消费者从所有主题中消费?如果是,那我们如何实现呢?另外,Kafka将如何维护偏移量?请提出答案。

猜你在找的技术问答相关文章

如何检查配对的蓝牙设备是打印机还是扫描仪(Android)
是否允许实体正文进行HTTP DELETE请求?
如何将ZipInputStream转换为InputStream?
java.util.logging Java 8中的变量
PowerMockito.doReturn返回null
Java中的RESTful调用
Swing / Java:如何正确使用getText和setText字符串
特殊字符和重音字符
Android Studio中的ndk.dir错误
错误“找不到主类”