Kafka:使用Java更改特定主题的分区数

问题描述

是的,有可能。您必须在中访问AdminUtilsscala类kafka_2.11-0.9.0.1.jar添加分区。

AdminUtils支持主题中的分区数只能增加。您可能需要kafka_2.11-0.9.0.1.jarzk- client-0.8.jarscala-library-2.11.8.jarscala-parser- combinators_2.11-1.0.4.jar在类路径罐子。

以下代码的某些部分是从kafka-cloudera示例中借用/启发的。

package org.apache.kafka.examples;

import java.io.Closeable;

import org.I0Itec.zkclient.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import kafka.admin.AdminOperationException;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode.Enforced$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class Test {

    static final Logger logger = LogManager.getLogger();

    public Test() {
        // TODO Auto-generated constructor stub
    }

    public static void addPartitions(String zkServers, String topic, int partitions) {

        try (AutoZkClient zkClient = new AutoZkClient(zkServers)) {
            ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

            if (AdminUtils.topicExists(zkUtils, topic)) {
                logger.info("Altering topic {}", topic); 
                try {
                    AdminUtils.addPartitions(zkUtils, topic, partitions, "", true, Enforced$.MODULE$);
                    logger.info("Topic {} altered with partitions : {}", topic, partitions); 
                } catch (AdminOperationException aoe) {
                    logger.info("Error while altering partitions for topic : {}", topic, aoe); 
                } 
            } else {
                logger.info("Topic {} doesn't exists", topic); 
            } 
        } 
    }

    // Just exists for Closeable convenience 
    private static final class AutoZkClient extends ZkClient implements Closeable {

        static int sessionTimeout = 30_000;
        static int connectionTimeout = 6_000;

        AutoZkClient(String zkServers) {
            super(zkServers, sessionTimeout, connectionTimeout, ZKStringSerializer$.MODULE$);
        }
    }

    public static void main(String[] args) {

        addPartitions("localhost:2181", "hello", 20);
    }
}

解决方法

我是Kafka的新手,正在使用新的KafkaProducer和KafkaConsumer,版本:0.9.0.1

在创建特定主题之后,java中是否有任何方法可以更改/更新特定主题的分区数。

我没有使用Zookeeper创建主题。当发布请求到达时,我的KafkaProducer会自动创建主题。

如果还不够,我还可以提供更多详细信息

猜你在找的技术问答相关文章

如何检查配对的蓝牙设备是打印机还是扫描仪(Android)
是否允许实体正文进行HTTP DELETE请求?
如何将ZipInputStream转换为InputStream?
java.util.logging Java 8中的变量
PowerMockito.doReturn返回null
Java中的RESTful调用
Swing / Java:如何正确使用getText和setText字符串
特殊字符和重音字符
Android Studio中的ndk.dir错误
错误“找不到主类”