
Kafka Source Code Analysis (16): Broker Network Layer: RequestChannel and RequestHandler

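In this chapter I cover the last two components of the network layer: RequestChannel and RequestHandler. Strictly speaking, neither RequestChannel nor RequestHandler belongs to the Network Layer, and they do not belong to the API Layer either.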

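I regard RequestChannel and RequestHandler as middleware that sits between the network layer and the API layer of the Kafka server. In this chapter I analyze their overall architecture and the low-level details of how they handle requests and responses.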

1. RequestChannel

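Let's start with RequestChannel. RequestChannel buffers the Request objects sent over by Processor threads, and it also accepts Response objects from the API layer. Internally it holds two important queues: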

  • one request queue, requestQueue
  • N response queues, responseQueues (one per Processor thread)

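requestQueue is essentially an ArrayBlockingQueue. A single instance is shared by all Processor threads: each Processor wraps an incoming request in a Request object and puts it into this queue.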

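Each responseQueue is essentially a LinkedBlockingQueue, and every Processor thread owns its own. The API layer wraps each response in a Response object and puts it into the queue of the Processor that received the original request.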
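The following minimal sketch makes the two-queue layout concrete. It uses one bounded `ArrayBlockingQueue` shared by all Processors, plus one unbounded `LinkedBlockingQueue` per Processor, indexed by processor id. The `Req`/`Resp` case classes and the `MiniRequestChannel` class are hypothetical simplifications, not Kafka's actual types.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue, TimeUnit}

// Hypothetical, simplified message types (not Kafka's RequestChannel.Request/Response).
final case class Req(processor: Int, payload: String)
final case class Resp(processor: Int, payload: String)

// Hypothetical sketch of RequestChannel's two-queue layout.
final class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  // One bounded request queue shared by every Processor thread.
  private val requestQueue = new ArrayBlockingQueue[Req](queueSize)
  // One unbounded response queue per Processor, indexed by processor id.
  private val responseQueues: Array[LinkedBlockingQueue[Resp]] =
    Array.fill(numProcessors)(new LinkedBlockingQueue[Resp]())

  // Processor side: blocks while the shared request queue is full.
  def sendRequest(request: Req): Unit = requestQueue.put(request)

  // Handler side: waits up to timeoutMs and returns null if nothing arrived.
  def receiveRequest(timeoutMs: Long): Req =
    requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)

  // API-layer side: route the response to the Processor that sent the request.
  def sendResponse(response: Resp): Unit = responseQueues(response.processor).put(response)

  // Processor side: non-blocking, returns null if this Processor's queue is empty.
  def receiveResponse(processor: Int): Resp = responseQueues(processor).poll()
}
```

Any handler thread can take a given request. The matching response, however, is visible only through `receiveResponse` for the processor id carried in the message. This per-Processor routing lets each Processor write responses back on the connections it owns.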

1.1 Internal Structure

The overall structure looks roughly like the figure below:



Accepting a Request sent over by a Processor thread:

// RequestChannel.scala

def sendRequest(request: RequestChannel.Request) {
  // put() blocks while the bounded requestQueue is full
  requestQueue.put(request)
}
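Because `requestQueue` is a bounded `ArrayBlockingQueue`, `put` stalls the calling Processor thread whenever the queue is full, until a handler thread frees a slot. In Kafka the capacity comes from the `queued.max.requests` broker setting. The sketch below demonstrates this blocking behavior with a plain `java.util.concurrent` queue of capacity 2. The `BackPressureDemo` object is hypothetical.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}

// Hypothetical demo of back-pressure from a bounded ArrayBlockingQueue.
object BackPressureDemo {
  // Returns (offerAcceptedWhileFull, putDoneBeforeTake, putDoneAfterTake).
  def demo(): (Boolean, Boolean, Boolean) = {
    val queue = new ArrayBlockingQueue[String](2) // a tiny stand-in for requestQueue
    queue.put("r1")
    queue.put("r2")                               // the queue is now full

    // offer() never blocks: it just reports that no slot is free.
    val accepted = queue.offer("r3")

    // put() blocks, so call it from a separate "Processor" thread.
    val putDone = new CountDownLatch(1)
    val processor = new Thread(new Runnable {
      def run(): Unit = { queue.put("r3"); putDone.countDown() }
    })
    processor.setDaemon(true)
    processor.start()

    // Nothing has been taken yet, so the put cannot have completed.
    val doneBeforeTake = putDone.await(200, TimeUnit.MILLISECONDS)
    queue.take()                                  // a "handler" frees one slot
    val doneAfterTake = putDone.await(5, TimeUnit.SECONDS)
    (accepted, doneBeforeTake, doneAfterTake)
  }
}
```

`BackPressureDemo.demo()` should return `(false, false, true)`. The non-blocking `offer` is rejected, and the blocked `put` completes only after a slot is freed.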

Fetching a Response object and returning it to the Processor thread for processing:

// RequestChannel.scala

def receiveResponse(processor: Int): RequestChannel.Response = {
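  // Non-blocking: poll() returns null when this Processor's queue is empty
  val response = responseQueues(processor).poll()
  // Record when the response left the queue (used for request-timing metrics)
  if (response != null)
    response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds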
  response
}
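`poll()` on a `LinkedBlockingQueue` returns immediately and yields `null` when the queue is empty. That is why `receiveResponse` checks for `null` before touching the response. This suits a Processor thread, which cannot afford to park on one queue while its sockets need servicing. Here is a minimal sketch of a non-blocking drain loop in that style. The `ResponseDrain.drainResponses` helper is hypothetical, and `String` responses stand in for `RequestChannel.Response`.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object ResponseDrain {
  // Hypothetical helper: drain everything currently queued for one Processor
  // without blocking, using the same null check as receiveResponse.
  def drainResponses(queue: LinkedBlockingQueue[String]): List[String] = {
    val drained = ListBuffer.empty[String]
    var curr = queue.poll()            // null right away if the queue is empty
    while (curr != null) {
      drained += curr                  // a real Processor would hand it to the network layer here
      curr = queue.poll()
    }
    drained.toList
  }
}
```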

Accepting a response from the API layer and adding it to the response queue:

// RequestChannel.scala

def sendResponse(response: RequestChannel.Response) {
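  // Route the response to the queue of the Processor that received the request
  responseQueues(response.processor).put(response)
  // Notify registered listeners (SocketServer uses this to wake up that Processor)
  for(onResponse <- responseListeners)
    onResponse(response.processor)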
}
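The listener hook exists because the target Processor may be sitting in its selector poll with nothing to wake it. The minimal sketch below shows the enqueue-then-notify order. It assumes a hypothetical `ResponseSide` class with `String` responses and a listener that merely records the processor id. In a real broker the listener would wake the Processor up instead.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical sketch of the response side of a RequestChannel with listeners.
final class ResponseSide(numProcessors: Int) {
  private val responseQueues: Array[LinkedBlockingQueue[String]] =
    Array.fill(numProcessors)(new LinkedBlockingQueue[String]())
  // Assumed to be registered once at startup, before any response is sent.
  @volatile private var responseListeners: List[Int => Unit] = Nil

  def addResponseListener(onResponse: Int => Unit): Unit =
    responseListeners = onResponse :: responseListeners

  def sendResponse(processor: Int, response: String): Unit = {
    responseQueues(processor).put(response)   // enqueue first, so the woken Processor finds it
    for (onResponse <- responseListeners)     // then tell listeners which Processor has work
      onResponse(processor)
  }

  def receiveResponse(processor: Int): String = responseQueues(processor).poll()
}
```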

2. RequestHandler

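Next, let's look at the other component, RequestHandler. It is created when the Kafka server starts up, and it delegates the actual handling of each request to the API layer: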

// KafkaServer.scala

var apis: KafkaApis = null
var requestHandlerPool: KafkaRequestHandlerPool = null
def startup() {
    try {
        //...
        if (canStartup) {
            // 14. Start the request-handler thread pool
            apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager,
                                 groupCoordinator,kafkaController, zkUtils, config.brokerId,
                                 config, metadataCache, metrics, authorizer, quotaManagers,
                                 clusterId, time)

            requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
                                                             socketServer.requestChannel,
                                                             apis, time, config.numIoThreads)
        }
    }
    catch {
        case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
}

The whole processing flow can be summarized by the following figure:



2.1 KafkaRequestHandlerPool

Let's look inside KafkaRequestHandlerPool. It wraps a group of threads and can be regarded as a thread pool. Once started, each thread keeps running its KafkaRequestHandler task:

// KafkaRequestHandler.scala (KafkaRequestHandlerPool is defined in this file)

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {
  // ... (other members, e.g. aggregateIdleMeter, omitted)
  // Worker threads: numThreads comes from num.io.threads (default 8)
  val threads = new Array[Thread](numThreads)
  // One KafkaRequestHandler task per thread
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    // Start the handler thread
    threads(i).start()
  }
}
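Here is a stripped-down version of the pool. It assumes `String` requests and a hypothetical `MiniHandlerPool` class. The poison-pill shutdown is a choice of this sketch, not something shown in the excerpt above. The sketch keeps the properties visible in the original: N named daemon threads, all pulling from one shared queue and delegating each request to a handler function.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical sketch of a KafkaRequestHandlerPool-like pool of worker threads.
final class MiniHandlerPool(numThreads: Int,
                            queue: ArrayBlockingQueue[String],
                            handle: String => Unit) {
  // Sentinel that tells exactly one worker to exit (an assumption of this sketch).
  // `new String` gives a unique instance, compared by reference below.
  private val PoisonPill = new String("__shutdown__")

  val threads: IndexedSeq[Thread] = (0 until numThreads).map { i =>
    val t = new Thread(new Runnable {
      def run(): Unit = {
        var done = false
        while (!done) {
          val req = queue.poll(300, TimeUnit.MILLISECONDS) // null on timeout
          if (req eq PoisonPill) done = true
          else if (req != null) handle(req)                // delegate, like apis.handle(req)
        }
      }
    }, s"mini-request-handler-$i")
    t.setDaemon(true) // daemon, like Utils.daemonThread: won't keep the JVM alive
    t.start()
    t
  }

  // One pill per worker; FIFO order means every earlier request is taken first.
  def shutdown(): Unit = {
    threads.foreach(_ => queue.put(PoisonPill))
    threads.foreach(_.join())
  }
}
```

Because every worker pulls from the same queue, whichever thread is idle picks up the next request. No request is bound to a particular handler thread; only responses are bound to a particular Processor.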

2.2 KafkaRequestHandler

KafkaRequestHandler is essentially a Runnable task. It repeatedly takes Request objects from RequestChannel's request queue and hands them to the Kafka API layer for processing:

// KafkaRequestHandler.scala

def run() {
  while(true) {
    try {
      var req : RequestChannel.Request = null
      while (req == null) {
        val startSelectTime = time.nanoseconds
        // Take a Request from RequestChannel's request queue (waits up to 300 ms, null on timeout)
        req = requestChannel.receiveRequest(300)
      }
      //...
      // Hand the request over to the API layer
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
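Two details of this loop are worth isolating. First, `receiveRequest(300)` waits at most 300 ms and returns `null` on timeout, which is why the inner `while (req == null)` loop exists. Second, the `catch` on `Throwable` keeps a single failing request from killing the handler thread. Here is a minimal single-iteration sketch, with a hypothetical `HandlerLoop.runOnce` and `String` requests standing in for `RequestChannel.Request`.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical sketch of one iteration of KafkaRequestHandler.run()'s outer loop.
object HandlerLoop {
  def runOnce(queue: ArrayBlockingQueue[String],
              handle: String => Unit,
              onError: (String, Throwable) => Unit): Unit = {
    var req: String = null
    // Keep polling with a 300 ms timeout until a request arrives; a null
    // result only means the timeout expired while the queue was empty.
    while (req == null)
      req = queue.poll(300, TimeUnit.MILLISECONDS)
    // Catch everything so one bad request cannot terminate the handler thread.
    try handle(req)
    catch { case e: Throwable => onError(req, e) }
  }
}
```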

Let me digress briefly to look inside the Kafka API layer. At its core, it simply dispatches on the type of the incoming request:

// KafkaApis.scala

def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    // Dispatch on the request type
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    //...
  } finally
    request.apiLocalCompleteTimeMs = time.milliseconds
}
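The `match` above is plain pattern-matching dispatch. The numeric request id is decoded into an `ApiKeys` value, and each value is routed to its own handler method. Here is a minimal sketch of the same idea, assuming a hypothetical three-key `ApiKey` ADT and a `MiniApis` object. Only the ids 0, 1 and 3 correspond to Kafka's Produce, Fetch and Metadata APIs; the rest is illustrative.

```scala
// Hypothetical, heavily reduced model of ApiKeys + KafkaApis.handle.
sealed abstract class ApiKey(val id: Short)
object ApiKey {
  case object Produce  extends ApiKey(0)
  case object Fetch    extends ApiKey(1)
  case object Metadata extends ApiKey(3)

  private val byId: Map[Short, ApiKey] =
    Seq[ApiKey](Produce, Fetch, Metadata).map(k => k.id -> k).toMap

  // Option-returning lookup (a design choice of this sketch).
  def forId(id: Short): Option[ApiKey] = byId.get(id)
}

object MiniApis {
  // One handler per request type, selected by pattern matching, as in handle().
  def handle(requestId: Short): String = ApiKey.forId(requestId) match {
    case Some(ApiKey.Produce)  => handleProduce()
    case Some(ApiKey.Fetch)    => handleFetch()
    case Some(ApiKey.Metadata) => handleMetadata()
    case None => throw new IllegalArgumentException(s"Unknown api code $requestId")
  }

  private def handleProduce(): String  = "produce handled"
  private def handleFetch(): String    = "fetch handled"
  private def handleMetadata(): String = "metadata handled"
}
```

For example, `MiniApis.handle(0)` returns `"produce handled"`, while an unknown id such as 99 throws.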

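Let's focus on handleProducerRequest, which processes requests from regular producers. When processing completes, it invokes the callback below. As you can see, the final result is wrapped in a Response object and handed to RequestChannel: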

// KafkaApis.scala (defined within handleProducerRequest)

def produceResponseCallback(delayTimeMs: Int) {
  // The request does not require an acknowledgment (acks=0)
  if (produceRequest.acks == 0) {
    //...
  }
  // The request requires an acknowledgment
  else {
    val respBody = request.header.apiVersion match {
      case 0 => new ProduceResponse(mergedResponseStatus.asJava)
      case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
      case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
    }
    // Enqueue the response into RequestChannel
    requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
  }
}
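The callback style matters because the produce path can finish later than the `handle` call that started it. For example, the broker may have to wait for replica acknowledgments before responding. The result is therefore pushed into RequestChannel rather than returned. The sketch below models this flow under simplifying assumptions. `ProduceCallbackDemo`, `appendRecords` and the case classes are hypothetical, and the "append" completes synchronously.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical sketch: the API layer finishes a produce request via a callback
// that routes the response back through per-Processor response queues.
object ProduceCallbackDemo {
  final case class Request(processor: Int, acks: Short, topic: String)
  final case class Response(request: Request, body: String)

  // One response queue per Processor, as in RequestChannel (2 Processors here).
  val responseQueues: Array[LinkedBlockingQueue[Response]] =
    Array.fill(2)(new LinkedBlockingQueue[Response]())

  def sendResponse(r: Response): Unit = responseQueues(r.request.processor).put(r)

  // Stand-in for the log append; invokes the callback when the "append" completes.
  def appendRecords(topic: String, onComplete: String => Unit): Unit =
    onComplete(s"appended to $topic")

  def handleProduceRequest(request: Request): Unit = {
    // Closure over `request`, similar to how produceResponseCallback captures it.
    def produceResponseCallback(status: String): Unit =
      if (request.acks == 0) ()                       // acks=0: the producer expects no response
      else sendResponse(Response(request, status))    // otherwise route it back via the queues
    appendRecords(request.topic, produceResponseCallback)
  }
}
```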

3. Summary

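In this chapter I covered the last two components of the network layer, RequestChannel and RequestHandler. Their overall design is fairly clear and simple. In essence, they use separate in-memory queues and a thread pool to process requests and responses, which decouples network I/O from request handling and improves overall throughput. This pattern is used widely in real production systems. I also covered applications of it in the hands-on part of my "Distributed Systems: From Theory to Practice" series, which interested readers may want to check out.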
