Kafka Source Code Analysis: SocketServer

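Member fields
  • endpoints

The set of listeners. Kafka supports machines with multiple network interfaces and can listen on several ports at the same time. The EndPoint class encapsulates the host, the port, the listener name, and the security protocol.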

object EndPoint {
  private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r
  private[kafka] val DefaultSecurityProtocolMap: Map[ListenerName, SecurityProtocol] =
    SecurityProtocol.values.map(sp => ListenerName.forSecurityProtocol(sp) -> sp).toMap

  def createEndPoint(connectionString: String, securityProtocolMap: Option[Map[ListenerName, SecurityProtocol]]): EndPoint = {
    val protocolMap = securityProtocolMap.getOrElse(DefaultSecurityProtocolMap)
    def securityProtocol(listenerName: ListenerName): SecurityProtocol =
      protocolMap.getOrElse(listenerName,
        throw new IllegalArgumentException(s"No security protocol defined for listener ${listenerName.value}"))
    connectionString match {
      // Empty host (e.g. "PLAINTEXT://:9092"): host is null, so the socket binds to the wildcard address
      case uriParseExp(listenerNameString, "", port) =>
        val listenerName = ListenerName.normalised(listenerNameString)
        new EndPoint(null, port.toInt, listenerName, securityProtocol(listenerName))
      case uriParseExp(listenerNameString, host, port) =>
        val listenerName = ListenerName.normalised(listenerNameString)
        new EndPoint(host, port.toInt, listenerName, securityProtocol(listenerName))
      case _ => throw new KafkaException(s"Unable to parse $connectionString to a broker endpoint")
    }
  }
}

case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
  def connectionString: String = {
    val hostport =
      if (host == null)
        ":"+port
      else
        Utils.formatAddress(host, port)
    listenerName.value + "://" + hostport
  }
}
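To see what the regular expression in createEndPoint captures, here is a minimal standalone sketch. It reuses the same pattern but returns plain tuples instead of Kafka's EndPoint; the names ListenerParseSketch and parse are made up for this illustration and are not part of Kafka.

```scala
object ListenerParseSketch {
  // Same pattern as EndPoint.uriParseExp: listener name, optional (possibly bracketed) host, port.
  private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r

  /** Returns (listenerName, host, port); host is None when the string has no host part.
    * Hypothetical helper for illustration only. */
  def parse(connectionString: String): Option[(String, Option[String], Int)] =
    connectionString match {
      case uriParseExp(name, "", port)   => Some((name, None, port.toInt))
      case uriParseExp(name, host, port) => Some((name, Some(host), port.toInt))
      case _                             => None
    }
}
```

For example, parse("SSL://:9093") yields a listener without a host, and the optional brackets let an IPv6 literal such as "PLAINTEXT://[::1]:9092" be parsed with host "::1".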
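  • numProcessorThreads

The number of Processor threads per endpoint.

  • maxQueuedRequests

The maximum number of requests that can be queued in the RequestChannel.

  • totalProcessorThreads

The total number of Processor threads. As the constructor and the startup method show, it equals endpoints.size * numProcessorThreads.

  • maxConnectionsPerIp

The maximum number of connections allowed per IP address.

  • maxConnectionsPerIpOverrides

Per-IP overrides of the connection limit, of type Map[String, Int]. An entry here takes precedence over maxConnectionsPerIp for that address.

  • requestChannel

The queue shared between the Processor threads and the Handler threads.

  • processors

The array of Processor threads.

  • acceptors

The collection of Acceptor objects. Each EndPoint has exactly one Acceptor.

  • connectionQuotas

A ConnectionQuotas object. Internally it keeps a Map of connection counts and checks it against maxConnectionsPerIp and maxConnectionsPerIpOverrides whenever a new connection is created, raising an error when the limit is exceeded. Because there are multiple Acceptor threads, access to the Map must be synchronized.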
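The per-IP quota check described above can be sketched roughly as follows. This is a simplified illustration, not Kafka's ConnectionQuotas class: it keys the counts by IP string rather than InetAddress, and the names ConnectionQuotaSketch, Quotas, TooManyConnections, and count are assumptions made for the example.

```scala
import scala.collection.mutable

object ConnectionQuotaSketch {
  final class TooManyConnections(val ip: String, val max: Int)
    extends RuntimeException(s"$ip already has $max connections")

  /** Hypothetical, simplified quota tracker keyed by IP string. */
  final class Quotas(defaultMax: Int, overrides: Map[String, Int]) {
    private val counts = mutable.Map.empty[String, Int]

    /** Count a new connection; throws if the address is already at its limit. */
    def inc(ip: String): Unit = counts.synchronized {
      val max = overrides.getOrElse(ip, defaultMax) // an override wins over the default
      val current = counts.getOrElse(ip, 0)
      if (current >= max) throw new TooManyConnections(ip, max)
      counts(ip) = current + 1
    }

    /** Release a slot when a connection from this address is closed. */
    def dec(ip: String): Unit = counts.synchronized {
      val current = counts.getOrElse(ip, 0)
      require(current > 0, s"no open connections for $ip")
      if (current == 1) counts -= ip else counts(ip) = current - 1
    }

    def count(ip: String): Int = counts.synchronized(counts.getOrElse(ip, 0))
  }
}
```

Every access goes through counts.synchronized, which reflects the point made above: several Acceptor threads may call inc concurrently, so the map cannot be touched without a lock.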

Constructor
  private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
  private val numProcessorThreads = config.numNetworkThreads
  private val maxQueuedRequests = config.queuedMaxRequests
  private val totalProcessorThreads = numProcessorThreads * endpoints.size

  private val maxConnectionsPerIp = config.maxConnectionsPerIp
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "

  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
  private val processors = new Array[Processor](totalProcessorThreads)

  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
  private var connectionQuotas: ConnectionQuotas = _
  // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())

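Creating a SocketServer mostly just initializes its member fields. The line worth noting is the last one, right above the startup method: it registers a listener on the requestChannel. When a Handler thread writes to a response queue in the requestChannel, the listener wakes up the corresponding Processor thread so that it can write the data back. This is what ties the Processor threads to the Handler threads.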

Startup method
/**
   * Start the socket server
   */
  def startup() {
    this.synchronized {

      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      val brokerId = config.brokerId

      var processorBeginIndex = 0
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        val securityProtocol = endpoint.securityProtocol
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)

        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }

    newGauge("NetworkProcessorAvgIdlePercent",
      new Gauge[Double] {
        private val ioWaitRatioMetricNames = processors.map { p =>
          metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
        }

        def value = ioWaitRatioMetricNames.map { metricName =>
          Option(metrics.metric(metricName)).fold(0.0)(_.value)
        }.sum / totalProcessorThreads
      }
    )

    info("Started " + acceptors.size + " acceptor threads")
  }

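The startup method first creates the ConnectionQuotas used to limit connections per IP and reads the send and receive buffer sizes. It then iterates over the endpoints, creating and starting one Acceptor thread for each. Note that the Acceptor constructor receives N Processors; the constructor itself iterates over them and starts the Processor threads. The calling thread then blocks in awaitStartup() until the Acceptor thread has started (that is, until its run method has called startupComplete()) before moving on to the next endpoint. Finally it logs how many Acceptor threads were started.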
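The way startup carves the shared processors array into one slice per listener can be illustrated with a small sketch. The object and method names below are hypothetical; only the index arithmetic mirrors processorBeginIndex and processorEndIndex from the code above.

```scala
object ProcessorLayoutSketch {
  /** For each listener, the half-open index range [begin, end) of its processors
    * in the shared array. Hypothetical helper for illustration only. */
  def processorRanges(listeners: Seq[String], numProcessorThreads: Int): Seq[(String, Range)] =
    listeners.zipWithIndex.map { case (listener, i) =>
      val begin = i * numProcessorThreads
      listener -> (begin until begin + numProcessorThreads)
    }
}
```

With two listeners and numProcessorThreads = 3, the first listener owns indices 0 to 2 and the second owns 3 to 5, which adds up to totalProcessorThreads = 6.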

Shutdown method
  /**
   * Shutdown the socket server
   */
  def shutdown() = {
    info("Shutting down")
    this.synchronized {
      acceptors.values.foreach(_.shutdown)
      processors.foreach(_.shutdown)
    }
    info("Shutdown completed")
  }

Shutdown first iterates over acceptors and shuts down every Acceptor thread, then iterates over processors and shuts down every Processor thread, and finally logs a message.

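Thread overview

To make SocketServer easier to follow, here are the three kinds of threads that Kafka's network layer relies on.

Count | Thread | Notes
1 | kafka-socket-acceptor_%x | One Acceptor per EndPoint; listens on the network interface for client connections
N | kafka-network-thread_%d | Processor thread; reads from and writes to the sockets
M | kafka-request-handler-_%d | Handler thread; runs the actual business logic and produces the Response that is handed back to a Processor thread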
Acceptor

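[Figure: how Endpoint, Acceptor, and Processor relate to each other]
The figure above shows the relationship between Endpoint, Acceptor, and Processor. The EndPoint class was covered with the member fields, so let us look at the Acceptor class. Its definition, class Acceptor(val endPoint: EndPoint,val sendBufferSize: Int,val recvBufferSize: Int,brokerId: Int,processors:Array[Processor],connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup, shows that it extends AbstractServerThread, an abstract thread class written in Scala. That base class has only four key fields: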

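  • alive

Indicates whether the thread is still running. It is initialized to true and set to false by shutdown.

  • shutdownLatch

A CountDownLatch that signals whether the thread has finished shutting down. It starts with a count of 0 and is replaced by a latch with a count of 1 in startupComplete().

  • startupLatch

A CountDownLatch with a count of 1 that signals whether the thread has started.

  • connectionQuotas

The quota for open connections. The count is incremented for every connection that is accepted and decremented in the close methods.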

private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {

  private val startupLatch = new CountDownLatch(1)

  // `shutdown()` is invoked before `startupComplete` and `shutdownComplete` if an exception is thrown in the constructor
  // (e.g. if the address is already in use). We want `shutdown` to proceed in such cases, so we first assign an open
  // latch and then replace it in `startupComplete()`.
  @volatile private var shutdownLatch = new CountDownLatch(0)

  private val alive = new AtomicBoolean(true)

  def wakeup(): Unit

  /**
   * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
   */
  def shutdown(): Unit = {
    alive.set(false)
    wakeup()
    shutdownLatch.await()
  }

  /**
   * Wait for the thread to completely start up
   */
  def awaitStartup(): Unit = startupLatch.await

  /**
   * Record that the thread startup is complete
   */
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }

  /**
   * Record that the thread shutdown is complete
   */
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()

  /**
   * Is the server still running?
   */
  protected def isRunning: Boolean = alive.get

  /**
   * Close the connection identified by `connectionId` and decrement the connection count.
   */
  def close(selector: KSelector, connectionId: String): Unit = {
    val channel = selector.channel(connectionId)
    if (channel != null) {
      debug(s"Closing selector connection $connectionId")
      val address = channel.socketAddress
      if (address != null)
        connectionQuotas.dec(address)
      selector.close(connectionId)
    }
  }

  /**
   * Close `channel` and decrement the connection count.
   */
  def close(channel: SocketChannel): Unit = {
    if (channel != null) {
      debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
      connectionQuotas.dec(channel.socket.getInetAddress)
      swallowError(channel.socket().close())
      swallowError(channel.close())
    }
  }
}  
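The interplay of the two latches is easier to see in isolation. The following is a minimal sketch under stated assumptions: Worker is a hypothetical stand-in for a subclass of AbstractServerThread, and its loop simply sleeps instead of selecting on a socket, so no wakeup() call is needed.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

object LatchHandshakeSketch {
  /** Hypothetical worker that mimics the startup/shutdown protocol of AbstractServerThread. */
  final class Worker extends Runnable {
    private val startupLatch = new CountDownLatch(1)
    // Open latch at first, so shutdown() does not block if run() never started.
    @volatile private var shutdownLatch = new CountDownLatch(0)
    private val alive = new AtomicBoolean(true)

    /** Blocks the caller until run() has signalled that startup is complete. */
    def awaitStartup(): Unit = startupLatch.await()

    /** Asks the loop to stop and waits until run() has finished. */
    def shutdown(): Unit = {
      alive.set(false)
      shutdownLatch.await()
    }

    override def run(): Unit = {
      shutdownLatch = new CountDownLatch(1) // as in startupComplete(): arm the shutdown latch,
      startupLatch.countDown()              // then release whoever waits in awaitStartup()
      try {
        while (alive.get) Thread.sleep(10)  // stand-in for the select loop
      } finally shutdownLatch.countDown()   // as in shutdownComplete()
    }
  }
}
```

A caller would start the worker on a thread, call awaitStartup(), and later call shutdown(). Calling shutdown() on a worker that was never started returns immediately, which is the case the comment on shutdownLatch in the Kafka source is guarding against.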

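The Acceptor thread accepts client connections, creates the socket connection, and hands it over to a Processor. The first article in this series, "Kafka Source Code Analysis: The Server Side", included a single-threaded NIO example written in Java; if you have read it, the Acceptor constructor will be easy to follow.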

private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)

  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor, false).start()
    }
  }
The openServerSocket method
/*
   * Create a server socket to listen for connections on.
   */
  private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if(host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      serverChannel.socket().setReceiveBufferSize(recvBufferSize)

    try {
      serverChannel.socket.bind(socketAddress)
      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
    } catch {
      case e: SocketException =>
        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)
    }
    serverChannel
  }

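The constructor first creates the Selector, then the ServerSocketChannel, and then iterates over the Processors and starts their threads. The core of the class is the Acceptor's run method.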

 /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(nioSelector.close())
      shutdownComplete()
    }
  }

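run first registers the ServerSocketChannel with the selector for OP_ACCEPT so that client connections can be accepted, and then releases the startup latch, which lets the main thread go on to create and start the Acceptor for the next Endpoint. The Acceptor thread then loops, waiting up to 500 ms per select call for connection events. For each ready key it checks that the key is acceptable and calls accept, handing the connection to a Processor; successive connections are distributed over the Processor threads in round-robin order. Once the Acceptor's shutdown method has been called, the loop exits, serverChannel and the selector are closed, and shutdownLatch is released. That is the whole life cycle of the Acceptor's run method.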
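The round-robin assignment in the accept loop boils down to a counter and a modulo. The sketch below isolates just that arithmetic; RoundRobinSketch and assign are illustrative names, and connections are represented only by their arrival order.

```scala
object RoundRobinSketch {
  /** Returns, for each accepted connection in arrival order, the index of the
    * processor that receives it. Hypothetical helper for illustration only. */
  def assign(numConnections: Int, numProcessors: Int): Seq[Int] = {
    var currentProcessor = 0
    (0 until numConnections).map { _ =>
      val chosen = currentProcessor
      // round robin to the next processor, wrapping around at the end of the array
      currentProcessor = (currentProcessor + 1) % numProcessors
      chosen
    }
  }
}
```

With three processors, five connections land on processors 0, 1, 2, 0, 1. The scheme balances the number of connections, not their traffic, so one busy connection can still load a single Processor more than the others.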

Processor

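Continuing from the Acceptor analysis: the Acceptor interacts with a Processor through the call accept(key, processors(currentProcessor)). The accept method obtains a SocketChannel from the ServerSocketChannel and hands it to the Processor thread, incrementing the connection count in connectionQuotas along the way. It also sets the connection's socket options. A brief note on Scala's isInstanceOf and asInstanceOf: they correspond to Java's type check and cast, that is, the instanceof keyword and a class cast. The mapping is as follows:

Scala | Java
obj.isInstanceOf[T] | obj instanceof T
obj.asInstanceOf[T] | (T)obj
classOf[T] | T.class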
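A small sketch of the three constructs from the table, next to the pattern-matching form that Scala code usually prefers. The object and method names are made up for the example.

```scala
object TypeCheckSketch {
  /** Explicit check and cast, the direct counterpart of Java's instanceof plus a cast. */
  def lengthViaCast(obj: Any): Int =
    if (obj.isInstanceOf[String]) obj.asInstanceOf[String].length else -1

  /** The same logic written with a typed pattern, the more idiomatic Scala form. */
  def lengthViaMatch(obj: Any): Int = obj match {
    case s: String => s.length
    case _         => -1
  }

  /** classOf[String] corresponds to String.class in Java. */
  val stringClass: Class[String] = classOf[String]
}
```

In the accept method shown later, key.channel().asInstanceOf[ServerSocketChannel] is an unchecked cast of this kind: it is safe there only because the Acceptor registered nothing but its own ServerSocketChannel with the selector.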

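The socket configuration sets TcpNoDelay and KeepAlive. TcpNoDelay deserves a closer look: it is a TCP transmission optimization and is best explained through the Nagle algorithm. Put simply, with TcpNoDelay set, data is sent as soon as there is data to send; the interval between two sends then depends only on TCP's delayed-ACK mechanism, whose default delay is 40 ms.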

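  In TCP/IP, every piece of data, no matter how small, is sent with protocol headers in front of it, and the receiver has to send an ACK to confirm receipt. To make the best use of network bandwidth, TCP prefers to send data in chunks that are as large as possible. (Each connection has an MSS parameter, so TCP/IP would like to send data in MSS-sized segments.) The Nagle algorithm exists to send large segments where possible and to keep the network from filling up with many small ones.

  The basic rule of the Nagle algorithm is that at any moment there may be at most one unacknowledged small segment. A "small segment" is a block of data smaller than the MSS, and "unacknowledged" means the block has been sent but the peer's ACK confirming receipt has not yet arrived.

  The rules of the Nagle algorithm (see the comment on the tcp_nagle_check function in tcp_output.c):
(1) If the packet length reaches the MSS, it may be sent;

(2) If the packet carries a FIN, it may be sent;

(3) If the TCP_NODELAY option is set, it may be sent;

(4) If the TCP_CORK option is not set and all previously sent small packets (shorter than the MSS) have been acknowledged, it may be sent;

(5) If none of the above holds but a timeout occurs (typically 200 ms), it is sent immediately.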

Pseudocode:

if there is new data to send
    # Both the send window and the queued data are >= MSS; the queued data (available data)
    # is the previously queued data plus the newly arrived data.
    # In other words, once a full MSS is buffered, Nagle sends a full-sized segment.
    if the window size >= MSS and available data is >= MSS 
        send complete MSS segment now
    else
        if there is unconfirmed data still in the pipe # an earlier packet has not been ACKed yet
            # queue the data and send the buffer only once an ACK arrives
            enqueue data in the buffer until an acknowledge is received 
        else
            send data immediately
        end if
    end if
end if 
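The pseudocode above can be written as a small pure function. This is only a model of the decision logic for illustration, not an implementation of a TCP stack; NagleSketch, Decision, and decide are names introduced here.

```scala
object NagleSketch {
  sealed trait Decision
  case object SendFullSegment extends Decision // a complete MSS-sized segment goes out now
  case object SendNow extends Decision         // small data, nothing unacknowledged: send it
  case object Buffer extends Decision          // small data while an ACK is pending: queue it

  /** Mirrors the branches of the pseudocode, assuming there is new data to send. */
  def decide(windowSize: Int, availableData: Int, mss: Int, unackedInFlight: Boolean): Decision =
    if (windowSize >= mss && availableData >= mss) SendFullSegment
    else if (unackedInFlight) Buffer
    else SendNow
}
```

For instance, 100 bytes of available data with an MSS of 1460 are buffered while an earlier packet is still unacknowledged, and sent right away once nothing is in flight.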

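The Nagle algorithm allows only one unacknowledged packet in the network, regardless of its size, so it is in effect an extended stop-and-wait protocol, one that stops and waits per packet rather than per byte. It is driven entirely by TCP's ACK mechanism, which causes some problems: if the peer ACKs quickly, Nagle ends up coalescing very little data, so although congestion is avoided, overall network utilization stays low.

The Nagle algorithm is enabled by default; it improves network throughput at the cost of latency. Kafka sets TcpNoDelay to true on its sockets, which disables the Nagle algorithm, in order to keep latency low.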

 /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

      debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))

      processor.accept(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }

A Processor thread reads requests from the sockets and writes back the responses produced by the Handler threads. It takes no part in the business logic.

/**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

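The Acceptor's accept leads into the Processor's accept method. That method adds the socketChannel to the new-connection queue and calls KSelector.wakeup, which in turn calls wakeup on the underlying NIO selector. wakeup wakes a thread that is blocked in select. On Linux this is implemented by creating a pipe and adding it to the set of file descriptors being polled; wakeup writes one byte to the pipe, so the blocked poll call sees readable data and returns immediately. From this point on the connection is handled by the Processor thread. Let us look at its run method.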
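The queue-plus-wakeup handoff can be tried out with a plain java.nio Selector. The sketch below is a simplified stand-in for a Processor: MiniProcessor and pollOnce are hypothetical names, connections are represented by strings, and a single method both blocks in select and drains the queue, whereas Kafka splits these across loop iterations.

```scala
import java.nio.channels.Selector
import java.util.concurrent.ConcurrentLinkedQueue

object WakeupSketch {
  /** Hypothetical stand-in for a Processor: a queue of pending items plus an NIO selector. */
  final class MiniProcessor {
    private val selector = Selector.open()
    private val newConnections = new ConcurrentLinkedQueue[String]()

    /** Called from the acceptor side: enqueue, then interrupt a blocking select. */
    def accept(connection: String): Unit = {
      newConnections.add(connection)
      selector.wakeup()
    }

    /** One loop iteration: block in select for up to timeoutMs, then drain the queue. */
    def pollOnce(timeoutMs: Long): List[String] = {
      selector.select(timeoutMs)
      Iterator.continually(newConnections.poll()).takeWhile(_ != null).toList
    }

    def close(): Unit = selector.close()
  }
}
```

If wakeup() is called while no select is in progress, the next select call returns immediately, so a connection queued just before the Processor blocks is not left waiting for the full timeout.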

override def run() {
    startupComplete()                 // (1)
    while (isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()     // (2)
        // register any new responses for writing
        processNewResponses()         // (3)
        poll()                        // (4)
        processCompletedReceives()    // (5)
        processCompletedSends()       // (6)
        processDisconnected()         // (7)
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
        // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
        case e: ControlThrowable => throw e
        case e: Throwable =>
          error("Processor got uncaught exception.", e)
      }
    }

    debug("Closing selector - processor " + id)
    swallowError(closeAll())          // (8)
    shutdownComplete()                // (9)
  }
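  1. startupComplete

We already met this method in the Acceptor. Here too it counts down the startup latch, signalling to any thread blocked in awaitStartup() that the Processor thread has started.

  2. configureNewConnections

This method takes the queued channels off the newConnections queue and registers each one with the selector for SelectionKey.OP_READ. It builds a connection id from the local host and port and the remote host and port; that id is the key, and the channel the value, of an entry stored in the channels map of org.apache.kafka.common.network.Selector.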

 private def configureNewConnections() {
 while (!newConnections.isEmpty) {
   val channel = newConnections.poll()
   try {
     debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
     val localHost = channel.socket().getLocalAddress.getHostAddress
     val localPort = channel.socket().getLocalPort
     val remoteHost = channel.socket().getInetAddress.getHostAddress
     val remotePort = channel.socket().getPort
     val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
     selector.register(connectionId, channel)
   } catch {
     // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
     // throwables will be caught in processor and logged as uncaught exceptions.
     case NonFatal(e) =>
       val remoteAddress = channel.getRemoteAddress
       // need to close the channel here to avoid a socket leak.
       close(channel)
       error(s"Processor $id closed connection from $remoteAddress", e)
   }
 }
}
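A connection id built from the four address parts has to be convertible back, because processDisconnected later recovers remoteHost from it with ConnectionId.fromString. The sketch below shows one way such a round trip could work. The "localHost:localPort-remoteHost:remotePort" string format is an assumption made for this example, since the text does not show the format Kafka uses; ConnId, HostPort, and fromString are illustrative names.

```scala
object ConnectionIdSketch {
  /** Hypothetical connection id; the string format is assumed, not taken from the article. */
  final case class ConnId(localHost: String, localPort: Int, remoteHost: String, remotePort: Int) {
    override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort"
  }

  // Greedy host part, so the port is whatever follows the last colon.
  private val HostPort = """(.+):(\d+)""".r

  /** Parses the assumed format back into its parts; None if the string does not match. */
  def fromString(s: String): Option[ConnId] = s.split("-") match {
    case Array(HostPort(lh, lp), HostPort(rh, rp)) => Some(ConnId(lh, lp.toInt, rh, rp.toInt))
    case _                                         => None
  }
}
```

Returning an Option matches how the result is consumed in processDisconnected, where a missing value is turned into an IllegalStateException with getOrElse.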
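  3. processNewResponses

Takes responses from the head of this Processor's response queue, one at a time until the queue is empty, and matches on each response's responseAction. For NoOpAction, the channel is unmuted, which re-registers it for SelectionKey.OP_READ. For SendAction, the response is queued for sending, which registers the channel for SelectionKey.OP_WRITE. For CloseConnectionAction, the channel is closed and its slot in the connection quota is released; the channel is also removed from the channels map of org.apache.kafka.common.network.Selector.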

private def processNewResponses() {
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
      try {
        curr.responseAction match {
          case RequestChannel.NoOpAction =>
            // There is no response to send to the client, we need to read more pipelined requests
            // that are sitting in the server's socket buffer
            updateRequestMetrics(curr.request)
            trace("Socket server received empty response to send, registering for read: " + curr)
            val channelId = curr.request.connectionId
            if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                selector.unmute(channelId)
          case RequestChannel.SendAction =>
            val responseSend = curr.responseSend.getOrElse(
              throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
            sendResponse(curr, responseSend)
          case RequestChannel.CloseConnectionAction =>
            updateRequestMetrics(curr.request)
            trace("Closing socket connection actively according to the response code.")
            close(selector, curr.request.connectionId)
        }
      } finally {
        curr = requestChannel.receiveResponse(id)
      }
    }
  }
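  4. poll

Calls the poll method of org.apache.kafka.common.network.Selector. poll first clears the results left over from the previous poll out of their queues, then puts the reads, completed writes, and disconnections of this round into the corresponding queues. This KSelector is shared by the server and the client code, which is why it checks if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()): immediatelyConnectedKeys is used on the client side, while on the server side, if channels in stagedReceives already hold data, the ready keys are returned right away; otherwise the call blocks for up to 300 ms. Inside pollSelectionKeys, every ready key obtained above is examined: a read-ready key has its receive added to the stagedReceives queue for this round, a write-ready key whose send finishes is added to completedSends, and a key that represents a disconnect has its channel removed from the connection map, added to closingChannels, and deleted from the expired-connection tracking. addToCompletedReceives then fills completedReceives with the requests that were read in this round.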

public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        clear();
        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;
        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        maybeCloseOldestConnection(endSelect);
        // Add to completedReceives after closing expired connections to avoid removing
        // channels with completed receives until all staged receives are completed.
        addToCompletedReceives();
 }
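  5. processCompletedReceives

Handles the queue of completed reads. For each element it builds a RequestChannel.Request, adds it to the requestQueue of the RequestChannel, and then mutes the channel, which removes its read interest.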

private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        val openChannel = selector.channel(receive.source)
        // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
        val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
        requestChannel.sendRequest(req)
        selector.mute(receive.source)
      } catch {
        case e @ (_: InvalidRequestException | _: SchemaException) =>
          // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
          error(s"Closing socket for ${receive.source} because of error", e)
          close(selector, receive.source)
      }
    }
  }
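  6. processCompletedSends

Handles the queue of completed sends. inflightResponses holds responses that have been handed to the selector but whose send has not yet been confirmed as complete. For each completed send, the matching entry is first removed from inflightResponses, which frees space and keeps the map from growing as messages pile up. The channel is then unmuted: its write interest has been dropped and OP_READ is registered again, so the connection can accept the next request.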

private def processCompletedSends() {
    selector.completedSends.asScala.foreach { send =>
      val resp = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      updateRequestMetrics(resp.request)
      selector.unmute(send.destination)
    }
  }
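Taken together, the mute in processCompletedReceives and the unmute in processCompletedSends mean that a connection has at most one request being processed at a time. The sketch below models that cycle for a single connection. It is a toy model rather than Kafka's Selector; Connection, bytesArrive, receive, and sendCompleted are hypothetical names.

```scala
import scala.collection.mutable

object MuteCycleSketch {
  /** Hypothetical model of one connection's read interest, not Kafka's Selector. */
  final class Connection(val id: String) {
    private var muted = false
    private val pending = mutable.Queue.empty[String]

    /** Requests sitting in the socket buffer, waiting to be read. */
    def bytesArrive(request: String): Unit = pending.enqueue(request)

    /** Like processCompletedReceives: hand over at most one request, then mute. */
    def receive(): Option[String] =
      if (muted || pending.isEmpty) None
      else { muted = true; Some(pending.dequeue()) }

    /** Like processCompletedSends: the response is out, so reading may resume. */
    def sendCompleted(): Unit = { muted = false }
  }
}
```

Because the second request is not read until the first response has been sent, responses on one connection go out in the same order as the requests came in.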
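  7. processDisconnected

Handles the queue of disconnected connections (selector.disconnected). The approach is blunt: for each connection id, any entry for that channel in inflightResponses is removed, and the connection's quota slot is released so that a new connection can take it.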

private def processDisconnected() {
    selector.disconnected.keySet.asScala.foreach { connectionId =>
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
     inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
      // the channel has been closed by the selector but the quotas still need to be updated
     connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
  }
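  8. swallowError

Closes all connections via closeAll(), logging rather than propagating any error, and releases their quota slots.

  9. shutdownComplete

Counts down the shutdown latch to signal that the thread has finished and released its resources.

This post is already rather long, so the analysis of RequestChannel is left for a separate article.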