1.前言
从Yarn RPC Server到Kafka Server,凡是有高并发需求的服务端,无一例外采用了基于了Reactor设计模式。在我的博客《Hadoop RPC Server基于Reactor模式和Java NIO 的架构和原理》中,分析了Yarn的基于Reactor设计模式和Java NIO实现的RPC Server的架构和设计,而Kafka的Server端网络层也同样使用了Reactor设计模式。Reactor模式有以下显著特定:
1. 通过事件派发、分离IO操作,来提高系统的并发特性
2. 提供粗粒度的并发控制,使用单线程实现,避免了复杂的并发处理
下文中,我们就从Kafka Server端网络层代码入手,分析和讲解Kafka基于Ractor模式的具体网络层实现以及网络层和具体的业务层的衔接逻辑。在我的另外一篇博客《基于Java NIO的Kafka底层网络层源码和架构》中,已经详细讲解了Kafka使用Java NIO实现的底层网络架构,本文讲解Reactor设计模式,将忽略具体的网络层,而将注意力放在NIO的上层,即网络事件的派发、请求和响应的处理上面。
同样,事先说明,Kafka自己对NIO的Selector进行了封装,放在了org.apache.kafka.common.network.Selector
,为了与java原生的java.nio.channels.Selector
区分,我参照《Apache Kafka源码剖析》一书的叫法,将org.apache.kafka.common.network.Selector
叫做KSelector
。Kafka的客户端与服务器端的通信以及服务器之间的通信,底层都是用KSelector
进行。
为了便于在代码层理解,我花了整个KafkaServer从网络层消息到业务处理,以及相反,业务处理结果通过网络层返回的基于Reactor模式的处理流如下:
从网络层接收请求到交付给业务处理流程示意图:
从业务处理层返回结果,到将结果通过网络层返回给客户端的流程图:
2. SocketServer初始化和启动
Kafka的网络层入口类是SocketServer。
我们知道,kafka.Kafka
是Kafka Broker的入口类,kafka.Kafka.main()
是Kafka Server的main()方法,即Kafka Broker的启动入口。我们跟踪代码,即沿着方法调用栈kafka.Kafka.main() -> KafkaServerStartable() -> KafkaServer().startup
可以从main()
方法入口一直跟踪到SocketServer即网络层对象的创建,这意味着Kafka Server启动的时候会初始化并启动SocketServer,即我们所熟知的打开9092端口,这样,SocketServer启动完毕以后,就可以通过9092端口监听、解析、处理所有请求。
SocketServer的核心任务,是完成9092端口的绑定,监听并处理客户端或者其它Kafka Broker的请求,将请求分派,然后将相应返回给发起请求的对应客户端或者其它的Kafka Broker。核心角色,是同基本Reactor模式一致的负责接收请求的Acceptor角色、负责具体请求管理的Processor角色、负责请求和响应队列的RequestChannel角色以及负责管理、限制整个网络负载的ConnectionQuotas角色。
private val endpoints = config.listeners
private val numProcessorThreads = config.numNetworkThreads
private val maxQueuedRequests = config.queuedMaxRequests
private val totalProcessorThreads = numProcessorThreads * endpoints.size
private val maxConnectionsPerIp = config.maxConnectionsPerIp
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
//创建RequestChannel,有totalProcessorThreads个responseQueue队列,
val requestChannel = new RequestChannel(totalProcessorThreads,maxQueuedRequests)
//所有的processers,长度为totalProcessorThreads
private val processors = new Array[Processor](totalProcessorThreads)
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp,maxConnectionsPerIpOverrides)
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
var processorBeginIndex = 0
endpoints.values.foreach { endpoint =>//遍历endPoint集合,对于每一个endpoint,创建一个acceptor和多个processor
val protocol = endpoint.protocolType
val processorEndIndex = processorBeginIndex + numProcessorThreads //按照序号,创建process
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i,connectionQuotas,protocol)
val acceptor = new Acceptor(endpoint,sendBufferSize,recvBufferSize,brokerId,processors.slice(processorBeginIndex,processorEndIndex),connectionQuotas)
acceptors.put(endpoint,acceptor)//保存endr的point和accepto对应关系
//起一个名字叫做"kafka-socket-acceptor-%s-%d".format(protocol.toString,endpoint.port)的acceptor线程,非deamon
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString,endpoint.port),acceptor,false).start()
acceptor.awaitStartup()//一直等到acceptor的run()方法的第一条语句开始执行,才证明已经启动
processorBeginIndex = processorEndIndex
}
}
SocketServer的启动方法中,会遍历本机的所有EndPoint(一个EndPoint一般对应一个网卡),为每一个EndPoint创建一个唯一独立的SocketServer.Acceptor()对象,负责处理这个EndPoint上的所有请求。我们从代码private val endpoints = config.listeners
可以看到EndPoint是通过server.properties的listeners
中进行配置的,用户可以配置一个或者多个listener。
- numProcessorThreads:通过num.network.threads进行配置,单个Acceptor所管理的Processor对象的数量;
- maxQueuedRequests:通过queued.max.requests进行配置,请求队列所允许的最大的未响应请求的数量,用来给ConnectionQuotas进行请求限额控制,避免Kafka Server产生过大的网络负载;
- totalProcessorThreads:计算方式为numProcessorThreads * endpoints.size,即单台机器总的Processor的数量;
- maxConnectionsPerIp:配置项为max.connections.per.ip,单个IP上的最大连接数,用来给ConnectionQuotas控制连接数;
- maxConnectionsPerIpOverrides:针对每个IP进行的个性化限制,因为每个IP的最大连接数限额可能会差异化,因此可以通过此项覆盖maxConnectionsPerIp中的全局统一配置,这是在KAFKA-1512中增加的improvement;
同时,负责创建一个空的SocketServer.Acceptor数组,用来放置所有的Processor线程对象,但是并未实际创建Processor对象。Processor是由Acceptor直接管理,因此也是由SocketServer.Acceptor负责创建的,这符合角色分层的原则。角色分层在Yarn的代码中也体现得非常好,这样做可以让一个复杂的系统的每个功能模块都变得清晰可控。每层角色只会负责自己还有自己直接管理的直接下层角色的初始化和启动,绝对不会去触碰不是自己所直接管理的角色。
根据Acceptor的数量,将这些线程对象平均分配给Acceptor。然后,一直等待Acceptor启动完成,SocketServer.startup()会一直阻塞等待启动完成才会退出代表SocketServer完成启动。阻塞方式我们在SocketServer.Acceptor的讲解中会进行分析。总之,下层多个服务全部启动完成,上层服务才算启动完成,这也是职责分层的原则。
3.SocketServer.Acceptor
每一个SocketServer.Acceptor负责唯一一个endpoint上的网络请求和响应的管理。正常情况下,我们会给的Kafka Server配置唯一的一个Endpoint。但是有些服务器具有多个网卡驱动,因此可以配置多个EndPoint。旧版本的Kafka使用host.name来标识集群中的主机身份,到了新版本的kakfa,则改为使用listeners和advertised.listeners 进行配置,这主要用于复杂网络环境下的Kafka配置,比如,我们的Kafka Server安装在亚马逊云上,三台服务器,每台服务器都有一个内网IP和外网IP,Server之间的通信走内网IP以节省网络流量,而外网用户的访问则只能使用公网IP,这时候我们就可以将内网IP的host配置在listeners
中,而将外网IP配置在advertised.listeners
中。在下文中,我们默认一台机器只有一个网卡的基本情况,以避免不必要的麻烦。
private[kafka] class Acceptor(val endPoint: EndPoint,val sendBufferSize: Int,val recvBufferSize: Int,brokerId: Int,processors: Array[Processor],connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private val nioSelector = NSelector.open()
val serverChannel = openServerSocket(endPoint.host,endPoint.port)//创建一个ServerSocketChannel,监听endPoint.host,endPoint.port套接字
//Acceptor被构造的时候就会启动所有的processor线程
this.synchronized {
//每个processor创建一个单独线程
processors.foreach { processor =>
Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId,endPoint.protocolType.toString,processor.id),processor,false).start()
}
}
Acceptor的构造方法中,首先通过openServerSocket()打开自己负责的EndPoint的Socket,即打开端口并启动监听。
然后,Acceptor会负责构造自己管理的一个或者多个Processor对象。其实,每一个Processor都是一个独立线程,下面在介绍Processor的时候会说到。
Acceptor使用CountDownLatch
对象来标记启动或者关闭是否完成:
当Acceptor()完成启动,会调用
/** * Record that the thread startup is complete */
protected def startupComplete() = {
startupLatch.countDown()
}
方法将startupLatch置位为0,通知上层SocketServer自己已经完成启动。此时上层SocketServer正通过调用startupLatch.await一直阻塞等待当前acceptor完成初始化。当SocketServer管理的所有Acceptor均完成了启动,SocketServer完成启动。
def run() {
//向selector注册channel,可以接收ACCEPT事件,只有非阻塞的serverChannel才可以注册给Selector
serverChannel.register(nioSelector,SelectionKey.OP_ACCEPT)
startupComplete() //启动完成的标记,放在run()方法的第一行,说明当确认线程开始运行,则认为启动成功,启动过以后就是运行了
try {
var currentProcessor = 0
while (isRunning) {///无限循环,持续等待OP_ACCEPT事件发生
try {
val ready = nioSelector.select(500)
if (ready > 0) {//已经有对应的Accept事件发生
val keys = nioSelector.selectedKeys()//取出发了了对应事件的事件的key,即有连接事件发生
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()//NIO的通用做法,取出一个有相关事件发生的channel以后,必须remove掉对应的SelectionKey,防止下次重复取出
if (key.isAcceptable)//
accept(key,processors(currentProcessor))//通过round-robin的方式取出一个acceptor进行处理
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
//round-robin序号增1,下一个连接会取出下一个processor
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection",e)
}
}
}
}
catch { //略 }
}
} finally { //略 }
}
Acceptor线程的run()方法,是不断监听对应ServerChannel上的连接请求,如果有新的连接请求,就选择出一个Processor,用来处理这个请求,将这个新连接交付给Processor是在方法Acceptor.accept()
中:
def accept(key: SelectionKey,processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//取出channel
val socketChannel = serverSocketChannel.accept()//创建socketChannel,专门负责与这个客户端的连接
try {
//socketChannel参数设置
processor.accept(socketChannel)//将SocketChannel交给process进行处理
} catch {
//异常处理
}
}
//Processor.accept():
/** * Queue up a new connection for reading */
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}
Processor将这个新的SocketChannel加入到自己维护的ConcurrentLinkedQueue[SocketChannel] newConnections
中。
下面介绍Processor().
4.Processor
Processor负责不断检查Acceptor是否有交付给自己新的连接,如果有,就负责这个channel上消息的读写操作,即:监听自己维护的所有channel上的读请求,解析并交付给对应的业务处理逻辑;监听新的写操作,把服务端的响应数据通过SocketChannel准确地发送给客户端。
注意,每一个Processor都维护了一个单独的KSelector对象,这个KSelector只负责这个Processor上所有channel的监听。这样最大程度上保证了不同Processor线程之间的完全并行和业务隔离,尽管,在异步IO情况下,一个Selector负责成百上千个socketChannel的状态监控也不会带来效率问题。
override def run() {
startupComplete()//表示初始化流程已经结束,通过这个CountDownLatch代表初始化已经结束,这个Processor已经开始正常运行了
while (isRunning) {
try {
// setup any new connections that have been queued up
configureNewConnections()//为已经接受的请求注册OR_READ事件
// register any new responses for writing
processNewResponses()//处理响应队列,这个响应队列是Handler线程处理以后的结果,会交付给RequestChannel.responseQueue.同时调用unmute,开始接受请求
poll() //调用KSelector.poll(),进行真正的数据读写
processCompletedReceives()//调用mute,停止接受新的请求
processCompletedSends()
processDisconnected()
} catch {
//异常处理 略
}
debug("Closing selector - processor " + id)
swallowError(closeAll())
shutdownComplete()
}
在Processor.run()
方法中,Processor主要进行以下几步操作:
1. 新连接的处理:通过configureNewConnections()
来处理Acceptor交付给自己的新的连接请求,即:把在自己的Selector上注册这个SocketChannel,并监听SelectionKey.OP_READ
操作:
2. 响应处理:Processor如何将响应交付给业务层进行处理的我在后面会讲到,在这里,Processor通过processNewResponses()
方法,从RequestChannel.responseQueue
中取出当前Processor的一个Response,通过对应的SocketChannel,发送给Client。注意,这里的”发送”,其实并没有真正发送,只是通过调用KafkaChannel.send()
方法,将待发送数据交付给对应的Channel,真正的数据发送是调用KafkaChannel.write()
方法,这是在下面第三步的poll()
中进行的;关于RequestChannel的职责,下文中会详细讲解。
3. 响应处理完毕,开始进行真正的读写操作,即监控自己负责的所有SocketChannel,是否有新的读请求进来,如果有,就进行相应的处理,主要是对请求进行解析并交付给业务逻辑进行具体调用,如果有待写数据,就执行写操作,真正将数据发送给远程;如果有新的连接,则放到connected中,如果有断开的连接,则放入到disconnected中:
- List<String> connected:已经建立连接的KafkaChannel的id的list
- List<String> disconnected:出现异常的KafkaChannel的id的list
- List<Send> completedSends:已经完成的发送请求,服务端可以通过查询completedSends,知道哪些请求已经发送出去
- List<NetworkReceive> completedReceives:已经接收到的数据,服务端拿到这些已经接收到的数据以后,会将这些数据交付给具体数据的处理者进行处理
我们先来看Processor对于已经收到的请求,是如何交付给业务端进行处理的:
* 将completedReceived中的对象进行封装,交付给requestQueue.completRequets
*/
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>//每一个receive是一个NetworkReceivedui'xiagn
try {
//receive.source代表了这个请求的发送者的身份,KSelector保存了channel另一端的身份和对应的SocketChannel之间的对应关系
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE,channel.principal.getName),channel.socketAddress)
val req = RequestChannel.Request(processor = id,connectionId = receive.source,session = session,buffer = receive.payload,startTimeMs = time.milliseconds,securityProtocol = protocol)
requestChannel.sendRequest(req)//将请求通过RequestChannel.requestQueue交付给Handler
selector.mute(receive.source)//不再接受Read请求,发送响应之前,不可以再接收任何请求
} catch {
//异常处理 略
}
}
}
Processor.processCompletedReceives()
通过遍历completedReceives
,对于每一个已经完成接收的数据,对数据进行解析和封装,交付给RequestChannel,RequestChannel会交付给具体的业务处理层进行处理。
从上面的分析可以知道,Processor从RequetsChannel中获取业务层的处理结果然后将结果发送给客户端,同时,收到远程的某些请求,也是交付给RequetsChannel进行处理。可见,RequetsChannel负责了网络层和业务层的数据交付。下文中,我们一起来看RequestChannel的工作机制。
5. RequestChannel
RequestChannel负责消息从网络层转接到业务层,以及将业务层的处理结果交付给网络层进而返回给客户端。每一个SocketServer只有一个RequestChannel对象,在SocketServer中构造。
//创建RequestChannel,maxQueuedRequests)
我们一起来看RequestChannel的构造方法:
class RequestChannel(val numProcessors: Int,val queueSize: Int) extends KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
//request存放了所有Processor接收到的远程请求,负责把requestQueue中的请求交付给具体业务逻辑进行处理
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
//responseQueues存放了所有Processor的带出来的response,即每一个Processor都有一个response queue
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors) //初始化responseQueues
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
//一些metrics用来监控request和response的数量,代码略
}
RequestChannel构造方法中初始化了requestQueue,用来存放网络层接收到的请求,这些请求即将交付给业务层进行处理。同时,初始化了responseQueues,为每一个Processor建立了一个response队列,用来存放这个Processor的一个或者多个Response,这些response即将交付给网络层返回给客户端。
6. KafkaRequestHandler线程和KafkaRequestHandlerPool线程池
在上面讲到Processor.processCompletedReceives()
方法时,可以看到Processor调用了RequestChannel.sendRequest()
方法将请求交付给了RequestChannel。而从RequestChannel取出请求并交付业务层处理的逻辑,是在独立线程中完成的,它是KafkaRequestHandler。并且,KafkaRequestHandler是由 KafkaRequestHandlerPool线程池进行管理的。与我们文章最开始提到的SocketServer创建相同,KafkaRequestHandlerPool也是在KafkaServer.startup()
中创建的:
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel,replicaManager,groupCoordinator,kafkaController,zkUtils,config.brokerId,config,MetadataCache,metrics,authorizer)
//KafkaRequestHandlerPool线程池,用来管理所有KafkaRequestHandler线程
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,socketServer.requestChannel,apis,config.numIoThreads)
这里,KafkaApis是Kafka的API接口层,可以理解为一个工具类,职责就是解析请求然后获取请求类型,根据请求类型将请求交付给对应的业务层,可见,KafkaApis是实现了网络层到业务层的真正映射关系,下文会详解。
class KafkaRequestHandlerPool(val brokerId: Int,val requestChannel: RequestChannel,val apis: KafkaApis,numThreads: Int) extends Logging with KafkaMetricsGroup {
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent","percent",TimeUnit.NANOSECONDS)
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "],"
val threads = new Array[Thread](numThreads)
//初始化由KafkaRequestHandler线程构成的线程数组
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i,aggregateIdleMeter,numThreads,requestChannel,apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i,runnables(i))
threads(i).start()
}
KafkaRequestHandlerPool构造方法中初始化并启动了多个KafkaRequestHandler线程对象,线程池大小通过Kafka配置文件配置项num.io.threads
进行配置。
KafkaRequestHandlerPool线程池中的所有KafkaRequestHandler,通过竞争方式从RequestChannel.requestQueue
中获取请求进行处理。由于requestQueue
的类型是ArrayBlockingQueue,通过调用ArrayBlockingQueue.poll()
方法取出请求。我们看ArrayBlockingQueue.poll()
方法源码,可以看到ArrayBlockingQueue.poll()
方法线程安全,因此多个KafkaRequestHandler线程竞争requestQueue
不会出现线程安全问题。
这是KafkaRequestHandler.run()
方法,就是不断从requestQueue
中取出请求,调用API层业务处理逻辑进行处理。
def run() {
while(true) {
try {
var req : RequestChannel.Request = null
while (req == null) {
//略
req = requestChannel.receiveRequest(300)//从RequestChannel.requestQueue中取出请求
//略
apis.handle(req)//调用KafkaApi.handle(),将请求交付给业务
} catch {}
}
}
6.KafkaApis
KafkaApis类似一个工具类,解析用户请求并将请求交付给业务层,我们可以把它看做Kafka的API层。从上面KafkaRequestHandler.run()
方法可以看到,这是通过调用KafkaApis.handle()
方法完成的。
def handle(request: RequestChannel.Request) {
try {
ApiKeys.forId(request.requestId) match {
case ApiKeys.PRODUCE => handleProducerRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
case ApiKeys.MetaDATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
//其它ApiKeys,略
}
} catch { //异常处理,略 }
} finally{
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
}
通过Switch-Case代码块,根据请求中的requestId,将请求交付给handleProducerRequest()
、handleFetchRequest()
等等handle*Request()
方法。具体每个handle*Request()
方法的处理流程不在本文描述范围之内,但是都是在获取返回结果以后,调用requestChannel.sendResponse()
将response交付给RequestChannel。这里只是交付给RequestChannel保存在RequestChannel.responseQueues
中。
在上文讲解Processor的时候说过,Procossor.processNewResponses()
就是从requestChannel.responseQueues
取出属于自己的连接上的响应,准备返回给客户端。
结束
这样,我们通过对Acceptor、Processor、RequestChannel、KafkaRequestHandler以及KafkaApis多个角色的解析,完成了整个Kafka的消息流通闭环,即从客户端建立连接、发送请求给Kafka Server进行处理、Kafka Server将请求交付给具体业务进行处理、业务将处理结果返回给网络层、网络层将结果通过NIO返回给客户端的整个流程。同时,由于多Processor线程、以及KafkaRequestHandlerPoll线程池的存在,通过交付-获取的方式而不是阻塞等待的方式,让整个消息处理实现完全的异步化,各个角色各司其职,模块之间无耦合,线程之间或者相互竞争任务,或者被上层安排处理部分任务,整个效率非常高,结构也相当清晰。Processor线程的数量、KafkaRequestHandlerPool线程池可配置,因此可以根据cpu以及内存性能,合理调整Kafka Server的并行程度和处理能力。对Kafka基于Reactor模式的网络层的理解,以及消息从网络层到业务层交付逻辑的理解,非常有利于我们对Kafka集群的管理以及对Kafka问题原因的排查诊断,同时,也可以基于各个不同的角色暴露出来的一个java metrics,我们可以对Kafka进行有效的调整优化。