成员属性
- endpoints
网卡集合,Kafka支持机器多网卡,可以同时监听多个端口。这个类中封装了host,port,还有传输协议。
object EndPoint {
private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r
private[kafka] val DefaultSecurityProtocolMap: Map[ListenerName, SecurityProtocol] =
SecurityProtocol.values.map(sp => ListenerName.forSecurityProtocol(sp) -> sp).toMap
def createEndPoint(connectionString: String, securityProtocolMap: Option[Map[ListenerName, SecurityProtocol]]): EndPoint = {
val protocolMap = securityProtocolMap.getOrElse(DefaultSecurityProtocolMap)
def securityProtocol(listenerName: ListenerName): SecurityProtocol =
protocolMap.getOrElse(listenerName,
throw new IllegalArgumentException(s"No security protocol defined for listener ${listenerName.value}"))
connectionString match {
case uriParseExp(listenerNameString, "", port) =>
val listenerName = ListenerName.normalised(listenerNameString)
new EndPoint(null, port.toInt, listenerName, securityProtocol(listenerName))
case uriParseExp(listenerNameString, host, port) =>
val listenerName = ListenerName.normalised(listenerNameString)
new EndPoint(host, port.toInt, listenerName, securityProtocol(listenerName))
case _ => throw new KafkaException(s"Unable to parse $connectionString to a broker endpoint")
}
}
}
abstract case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
def connectionString: String = {
val hostport =
if (host == null)
":"+port
else
Utils.formatAddress(host, port)
listenerName.value + "://" + hostport
}
}
- numProcessorThreads
Processor线程的个数
- maxQueuedRequests
在RequestChannel中最大缓存的个数
- totalProcessorThreads
Processor线程的总个数,看启动方法可以得知该值=endpoints.size*numProcessorThreads
- maxConnectionsPerIp
每个IP上最大的连接数
- maxConnectionsPerIpOverrides
这个是控制ip的最大连接数,Map[String,Int]类型。该值会覆盖maxConnectionsPerIp的值
- requestChannel
Processor线程和Handler线程的共享队列。
- processors
Processor线程集合
- acceptors
Acceptor对象集合,一个Endpoint对象对应一个Acceptor对象
- connectionQuotas
ConnectionQuotas的对象,底层也还是使用Map对象去配合maxConnectionsPerIpOverrides在创建新链接的时候去校验,超过最大数则报错,由于有多个Acceptor线程所以Map需要同步访问。
构造方法
private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
private val numProcessorThreads = config.numNetworkThreads
private val maxQueuedRequests = config.queuedMaxRequests
private val totalProcessorThreads = numProcessorThreads * endpoints.size
private val maxConnectionsPerIp = config.maxConnectionsPerIp
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
private val processors = new Array[Processor](totalProcessorThreads)
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _
/ register the processor threads for notification of responses
requestChannel.addResponseListener(id => processors(id).wakeup())
在创建SocketServer
对象的时候,主要进行了成员字段的赋值初始化,主要看下在startup方法之下的最后一行,负责向requestChannel添加一个监听器,当Handler线程向requestChannel中的ResponseQuene写入的时候,去唤醒对应的Processors线程回写数据。此处负责把Processor线程和Handler线程关联起来。
启动方法
/**
* Start the socket server
*/
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
var processorBeginIndex = 0
config.listeners.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val processorEndIndex = processorBeginIndex + numProcessorThreads
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
}
}
newGauge("NetworkProcessorAvgIdlePercent",
new Gauge[Double] {
private val ioWaitRatioMetricNames = processors.map { p =>
metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
}
def value = ioWaitRatioMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(_.value)
}.sum / totalProcessorThreads
}
)
info("Started " + acceptors.size + " acceptor threads")
}
启动方法首先去创建ConnectionQuotas
用于控制ip连接数,初始化接受和发送Buffer,然后遍历Endpoints结合去创建Acceptor线程并启动,观察Acceptor线程构造函数传入了N个Processor线程,在创建Acceptor线程的时候遍历启动了Processor线程,阻塞主线程,直到Acceptor线程启动完毕之后(run方法执行startupComplete()
之后),遍历下一个Endpoint。最后打印启动了多少个Acceptor线程。
关闭方法
/**
* Shutdown the socket server
*/
def shutdown() = {
info("Shutting down")
this.synchronized {
acceptors.values.foreach(_.shutdown)
processors.foreach(_.shutdown)
}
info("Shutdown completed")
}
先遍历所有的Acceptor线程集合acceptors执行关闭,然后遍历所有的Processor线程集合processors执行关闭,最后打印日志。
线程介绍
先列一下Kafka的网络层实现所需的三种线程,在SocketServer中方便理解
数量 | 线程 | 备注 |
---|---|---|
1 | kafka-socket-acceptor_%x | 一个EndPoint对应一个acceptor,监听网卡处的Client的请求 |
N | kafka-network-thread_%d | Processor线程,负责对Socket进行读写 |
M | kafka-request-handler-_%d | Handler线程,处理具体的业务逻辑并生成Response返回给Processor线程。 |
Acceptor
上图是Endpoint和Acceptor和Processor三者的对应关系。Endpoint类在介绍成员变量时候提到过,现在来分析下Acceptor类,从该类的定义class Acceptor(val endPoint: EndPoint,val sendBufferSize: Int,val recvBufferSize: Int,brokerId: Int,processors:Array[Processor],connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup
来看,可以看到该类继承了AbstractServerThread
,Scala中封装之后的抽象线程类,观察内部的关键字段只有4个
- alive
标示当前的线程是否存活,startup时候设置为true,shutdown时候为false
- shutdownLatch
count等于1的倒置锁,标示当前线程是否shutdown
- startupLatch
count等一1的倒置锁,标示当前线程是否启动
- connectionQuotas
SocketChannel打开新链接的配额,每打开一个对该配额实行原子递增。close方法进行原子递减
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
private val startupLatch = new CountDownLatch(1)
// `shutdown()` is invoked before `startupComplete` and `shutdownComplete` if an exception is thrown in the constructor
// (e.g. if the address is already in use). We want `shutdown` to proceed in such cases, so we first assign an open
// latch and then replace it in `startupComplete()`.
@volatile private var shutdownLatch = new CountDownLatch(0)
private val alive = new AtomicBoolean(true)
def wakeup(): Unit
/**
* Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
*/
def shutdown(): Unit = {
alive.set(false)
wakeup()
shutdownLatch.await()
}
/**
* Wait for the thread to completely start up
*/
def awaitStartup(): Unit = startupLatch.await
/**
* Record that the thread startup is complete
*/
protected def startupComplete(): Unit = {
// Replace the open latch with a closed one
shutdownLatch = new CountDownLatch(1)
startupLatch.countDown()
}
/**
* Record that the thread shutdown is complete
*/
protected def shutdownComplete(): Unit = shutdownLatch.countDown()
/**
* Is the server still running?
*/
protected def isRunning: Boolean = alive.get
/**
* Close the connection identified by `connectionId` and decrement the connection count.
*/
def close(selector: KSelector, connectionId: String): Unit = {
val channel = selector.channel(connectionId)
if (channel != null) {
debug(s"Closing selector connection $connectionId")
val address = channel.socketAddress
if (address != null)
connectionQuotas.dec(address)
selector.close(connectionId)
}
}
/**
* Close `channel` and decrement the connection count.
*/
def close(channel: SocketChannel): Unit = {
if (channel != null) {
debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
connectionQuotas.dec(channel.socket.getInetAddress)
swallowError(channel.socket().close())
swallowError(channel.close())
}
}
}
Acceptor线程负责接收客户端的请求,创建Socket连接,并且把请求交给Processor去处理。分析Kafka 源码分析的第一篇Kafka 源码分析之服务端中无曾写过一个Java版本的单线程版本的NIO范例。看过这一篇的话很容易理解Acceptor线程的构造方法。
private val nioSelector = NSelector.open()
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
this.synchronized {
processors.foreach { processor =>
Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor, false).start()
}
}
openServerSocket方法
/*
* Create a server socket to listen for connections on.
*/
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
val socketAddress =
if(host == null || host.trim.isEmpty)
new InetSocketAddress(port)
else
new InetSocketAddress(host, port)
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
try {
serverChannel.socket.bind(socketAddress)
info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
} catch {
case e: SocketException =>
throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)
}
serverChannel
}
第一步去创建了Selector,然后创建ScoketChannel,然后就是遍历启动Processor线程。核心呢我们要去看Acceptor线程的run方法。
/**
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable)
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request. We don't want
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket and selector.")
swallowError(serverChannel.close())
swallowError(nioSelector.close())
shutdownComplete()
}
}
首先将SocketChannel注册到selector准备接受客户端的连接,然后就是释放倒置锁,主线程继续进行下一个的Endpoint的Acceptor线程的创建和启动。Acceptor线程进行自旋等待客户端的连接事件,然后当接收到客户端的请求之后,判断是否就绪,就绪之后调用accept方法,让Processor去处理,多个请求到来之后,轮询Processor线程。然后就是当调用了Acceptor线程的shutdown方法之后,自旋被释放,然后去关闭serverChannel
,selector
,然后释放shutdownLatch
。这就是Acceptor的Run方法的执行过程。
Processor
接着Acceptor线程的分析,和Processor线程的交互在于accept(key, processors(currentProcessor))
方法。accept
方法首先基于ServerSocketChannel
创建SocketChannel
交给Processor线程处理,同时还实现connectionQuotas中连接数的原子自增。然后就是设置socket
的连接属性。在这里要介绍一下Scala中isInstanceOf
和asInstanceOf
,理解为Java的类型检查和转换,instanceof
关键字和类强转。对应关系如下:
Scala | Java |
---|---|
obj.isInstanceOf[T] | obj instanceof T |
obj.asInstanceOf[T] | (T)obj |
classOf[T] | T.class |
还有就是在socket的配置的地方,配置了TcpNoDelay和
KeepAlive,关键我说下TcpNoDelay,这个配置是针对Tcp传输进行优化,这个配置需要提到Nagle
算法。简单点说就是有数据就直接发送,俩次发送的间隔时间只在于TCP确认延迟机制的时间默认是40ms。
TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。(一个连接会设置MSS参数,因此,TCP/IP希望每次都能够以MSS尺寸的数据块来发送数据)。Nagle算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
Nagle算法的基本定义是任意时刻,最多只能有一个未被确认的小段。 所谓“小段”,指的是小于MSS尺寸的数据块,所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的ACK确认该数据已收到。
Nagle算法的规则(可参考tcp_output.c文件里tcp_nagle_check函数注释):
(1)如果包长度达到MSS,则允许发送;
(2)如果该包含有FIN,则允许发送;
(3)设置了TCP_NODELAY选项,则允许发送;
(4)未设置TCP_CORK选项时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
(5)上述条件都未满足,但发生了超时(一般为200ms),则立即发送。
伪代码:
if there is new data to send #有数据要发送
# 发送窗口缓冲区和队列数据 >=mss,队列数据(available data)为原有的队列数据加上新到来的数据
# 也就是说缓冲区数据超过mss大小,nagle算法尽可能发送足够大的数据包
if the window size >= MSS and available data is >= MSS
send complete MSS segment now # 立即发送
else
if there is unconfirmed data still in the pipe # 前一次发送的包没有收到ack
# 将该包数据放入队列中,直到收到一个ack再发送缓冲区数据
enqueue data in the buffer until an acknowledge is received
else
send data immediately # 立即发送
end if
end if
end if
Nagle算法只允许一个未被ACK的包存在于网络,它并不管包的大小,因此它事实上就是一个扩展的停-等协议,只不过它是基于包停-等的,而不是基于字节停-等的。Nagle算法完全由TCP协议的ACK机制决定,这会带来一些问题,比如如果对端ACK回复很快的话,Nagle事实上不会拼接太多的数据包,虽然避免了网络拥塞,网络总体的利用率依然很低。
默认启用Nagle算法,提高了网络的吞吐量,但是降低了实时性。kafka中Socket属性中关闭TcpNoDelay,为了实现实时性。
/*
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
.format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
socketChannel.socket.getSendBufferSize, sendBufferSize,
socketChannel.socket.getReceiveBufferSize, recvBufferSize))
processor.accept(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
close(socketChannel)
}
}
Processor线程主要做读取Socket请求和Hander线程处理的响应写回操作,不参与业务逻辑的处理。
/**
* Queue up a new connection for reading
*/
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}
从Acceptor的accept关联到Processor的accept方法。负责把socketChannel
添加到连接队列内部,并调用KSelector.wakeup方法,底层调用selector.wakeup,该方法用于唤醒阻塞在select方法上的线程,在linux上就是创建一个管道并加入poll的fd(文件描述符)集合,wakeup就是往管道里写一个字节,那么阻塞的poll方法有数据可读就立即返回。从该方法进入之后的操作暂时由Processor线程接管。来看run
方法
override def run() {
1:startupComplete()
while (isRunning) {
try {
// setup any new connections that have been queued up
2:configureNewConnections()
// register any new responses for writing
3:processNewResponses()
4:poll()
5:processCompletedReceives()
6:processCompletedSends()
7:processDisconnected()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
// or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
case e: ControlThrowable => throw e
case e: Throwable =>
error("Processor got uncaught exception.", e)
}
}
debug("Closing selector - processor " + id)
8:swallowError(closeAll())
9:shutdownComplete()
}
- startupComplete
分析Acceptor线程也遇到了该方法,同样这里也是唤醒阻塞在启动Processor线程的地方,告知已启动,释放阻塞信号。
- configureNewConnections
该方法从socketChannel队列中取出来Channel连接,然后把SelectionKey.OP_READ事件注册在selector上。并且把连接中的远程主机端口和本地主机端口,映射成为key,value是该channel,然后存放在
org.apache.kafka.common.network.Selector
的channels内。
private def configureNewConnections() {
while (!newConnections.isEmpty) {
val channel = newConnections.poll()
try {
debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
val localHost = channel.socket().getLocalAddress.getHostAddress
val localPort = channel.socket().getLocalPort
val remoteHost = channel.socket().getInetAddress.getHostAddress
val remotePort = channel.socket().getPort
val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
selector.register(connectionId, channel)
} catch {
// We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
// throwables will be caught in processor and logged as uncaught exceptions.
case NonFatal(e) =>
val remoteAddress = channel.getRemoteAddress
// need to close the channel here to avoid a socket leak.
close(channel)
error(s"Processor $id closed connection from $remoteAddress", e)
}
}
}
- processNewResponses
从response队列获取第一个队首的数据,然后获取response的responseAction的值。作
case match
匹配,当类型是NoOpAction
,给当前Canannel添加SelectionKey.OP_READ事件,当类型是SendAction
,给当前Canannel添加SelectionKey.OP_WRITE事件,当类型是CloseConnectionAction
,关闭当前Channel,并且释放连接数的占用。同时删除在org.apache.kafka.common.network.Selector
的channels的channel。
private def processNewResponses() {
var curr = requestChannel.receiveResponse(id)
while (curr != null) {
try {
curr.responseAction match {
case RequestChannel.NoOpAction =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
updateRequestMetrics(curr.request)
trace("Socket server received empty response to send, registering for read: " + curr)
val channelId = curr.request.connectionId
if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
selector.unmute(channelId)
case RequestChannel.SendAction =>
val responseSend = curr.responseSend.getOrElse(
throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
sendResponse(curr, responseSend)
case RequestChannel.CloseConnectionAction =>
updateRequestMetrics(curr.request)
trace("Closing socket connection actively according to the response code.")
close(selector, curr.request.connectionId)
}
} finally {
curr = requestChannel.receiveResponse(id)
}
}
}
- poll
调用
org.apache.kafka.common.network.Selector
的poll方法channel。先把上一次poll得到的Acceptor线程的请求从对应的队列中移除,然后把本次的读取请求,写回请求,断开的连接请求的都放入对应的队列中。这个KSelector是服务端和客户端公用的。所以会对进行if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
的判断,客户端使用immediatelyConnectedKeys
,对服务端而言,如果本次poll的stagedReceives
Map中的channel已经就绪直接返回就绪的key,反之需要阻塞300ms获取。pollSelectionKeys
方法内部可以看到读取上面获取的所有的就绪key,判断读就绪就加入本阶段的stagedReceives
队列,判断写就绪则增加在completedSends
队列,如果该代表断开连接则从连接队列中移除该channel,同时把该channel加入到closingChannels
队列.同时在过期连接队列中删除该channel。addToCompletedReceives
方法负责把本次就绪读的所有请求填充在completedReceives
队列。
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
clear();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
// Add to completedReceives after closing expired connections to avoid removing
// channels with completed receives until all staged receives are completed.
addToCompletedReceives();
}
- processCompletedReceives
处理读取请求队列。遍历队列内的元素,填充RequestChannel的Request对象,并且添加中RequestChannel的requestQueue队列内。取消队列channel的读事件。
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
val openChannel = selector.channel(receive.source)
// Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
buffer = receive.payload, startTimeNanos = time.nanoseconds,
listenerName = listenerName, securityProtocol = securityProtocol)
requestChannel.sendRequest(req)
selector.mute(receive.source)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error(s"Closing socket for ${receive.source} because of error", e)
close(selector, receive.source)
}
}
}
- processCompletedSends
处理写回请求队列,遍历队列内的元素,
inflightResponses
代表已发出但是没有收到响应的数据,在处理写回请求的时候,需要先把inflightResponses
中的清除,用于释放空间,支撑消息的堆积容错。取消写回队列内channel的写事件。重新注册OP_READ事件。让其可以重新接受写回请求。
private def processCompletedSends() {
selector.completedSends.asScala.foreach { send =>
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
updateRequestMetrics(resp.request)
selector.unmute(send.destination)
}
}
- processDisconnected
处理断开连接请求的队列,处理的是dischannel队列。这个比较暴力,遍历队列内的元素,发现该channel在
inflightResponses
中的就去清除,然后还释放连接数,用于下一次的连接新建。
private def processDisconnected() {
selector.disconnected.keySet.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(InetAddress.getByName(remoteHost))
}
}
- swallowError
打印信息关闭连接,释放连接数。
- shutdownComplete
释放倒置锁,告诉执行完毕,释放资源,等待下一次
篇幅有点长了,RequestChannel的分析放在另一篇中进行。