Kafka Source Code Analysis: The Server Side


Starting from the server's entry class, Kafka.scala, the main method does the following: read the configuration file and load it into memory, register a shutdown hook for graceful shutdown, start the Kafka server, and then block on kafkaServerStartable.awaitShutdown(), which is essentially backed by a CountDownLatch.

def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
        override def run(): Unit = kafkaServerStartable.shutdown()
      })

      kafkaServerStartable.startup()
      kafkaServerStartable.awaitShutdown()
    }
    catch {
      case e: Throwable =>
        fatal(e)
        Exit.exit(1)
    }
    Exit.exit(0)
  }
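The blocking in awaitShutdown() can be modeled in a few lines. This is a simplified sketch of the latch pattern, not the actual KafkaServerStartable source:

import java.util.concurrent.CountDownLatch

// Simplified model of the latch behind awaitShutdown()/shutdown().
class ServerStartableSketch {
  private val shutdownLatch = new CountDownLatch(1)

  // Called by the shutdown hook: stop components, then release the latch.
  def shutdown(): Unit = shutdownLatch.countDown()

  // Blocks the main thread until shutdown() has counted the latch down.
  def awaitShutdown(): Unit = shutdownLatch.await()
}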

The configuration-reading method, getPropsFromArgs, provides two functions: it reads server.properties, and it lets command-line arguments override configuration items from server.properties. Note that any extra argument not given in --override property=value form is left as a non-option argument, and the broker exits with a usage message instead of starting.

  def getPropsFromArgs(args: Array[String]): Properties = {
    val optionParser = new OptionParser(false)
    val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
      .withRequiredArg()
      .ofType(classOf[String])

    if (args.length == 0) {
      CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
    }
    val props = Utils.loadProps(args(0))
    if(args.length > 1) {
      val options = optionParser.parse(args.slice(1, args.length): _*)

      if(options.nonOptionArguments().size() > 0) {
        CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
      }
      props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala))
    }
    props
  }
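As the usage string shows, extra properties are passed as --override property=value pairs after the properties file. A hypothetical launch via the standard start script (the property values here are purely illustrative):

bin/kafka-server-start.sh config/server.properties --override broker.id=1 --override num.io.threads=16

Because props.putAll runs after Utils.loadProps, an override always wins over the value in server.properties.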

The real startup work happens in kafkaServerStartable.startup(). Before analyzing it, a brief look at the design of Kafka's server-side network layer will make KafkaServer easier to understand. On the client side (producer/consumer), the demands for high concurrency and low latency are modest, so the NetworkClient component is enough to manage connections; the server side, by contrast, must deliver extreme concurrency at low latency, so it is implemented with the Reactor pattern (event-driven).
(Figure: the single-threaded Reactor model)

Java NIO provides the APIs needed to implement the Reactor pattern. The figure above shows the common single-threaded model; the code looks like this:

Server side
// This excerpt assumes java.nio.channels.* and java.util.* imports, plus a
// Selector field and accept/read/write helper methods defined elsewhere in
// the class (accept(key) takes the new connection, configures it as
// non-blocking and registers OP_READ on the selector).
private Selector selector;

public void start() throws IOException {
    // open the server socket channel
    ServerSocketChannel ssc = ServerSocketChannel.open();
    // configure non-blocking (asynchronous) IO
    ssc.configureBlocking(false);
    // bind a local port
    ssc.bind(new InetSocketAddress(3400));
    // create the selector
    selector = Selector.open();
    // register ssc with the selector, interested in accept events
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    // keep running until the current thread is interrupted
    while (!Thread.currentThread().isInterrupted()) {
        int total = selector.select();
        System.out.println(total + " channel(s) ready");
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = keys.iterator();
        // handle the ready keys
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            // remove the key first so an invalid key is not revisited
            keyIterator.remove();
            if (!key.isValid()) {
                continue;
            }
            if (key.isAcceptable()) {
                // a client has connected
                accept(key);
            }
            if (key.isReadable()) {
                // the client sent data to read
                read(key);
            }
            if (key.isWritable()) {
                // the channel is ready for a response
                write(key);
            }
        }
    }
}
Client side
// This excerpt assumes java.nio.* and java.util.* imports, plus the two
// buffer fields below defined on the enclosing class.
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

public void start() throws IOException {
    // open the socket channel
    SocketChannel sc = SocketChannel.open();
    sc.configureBlocking(false);
    sc.connect(new InetSocketAddress("localhost", 3400));
    // create the selector
    Selector selector = Selector.open();
    // register the channel with the selector, interested in connect events
    sc.register(selector, SelectionKey.OP_CONNECT);

    Scanner scanner = new Scanner(System.in);
    while (true) {
        selector.select();
        Set<SelectionKey> keys = selector.selectedKeys();
        System.out.println("keys:" + keys.size());
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            // is a connect operation pending on this channel?
            if (key.isConnectable()) {
                sc.finishConnect();
                // switch to write interest now that we are connected
                sc.register(selector, SelectionKey.OP_WRITE);
                System.out.println("server connected...");
                break;
            } else if (key.isWritable()) {
                System.out.println("please input message:");
                String message = scanner.nextLine();
                writeBuffer.clear();
                writeBuffer.put(message.getBytes());
                // flip the buffer: put() advanced the position, so it must be
                // reset before the channel can read the data out to the server
                writeBuffer.flip();
                sc.write(writeBuffer);

                // re-register interest; each register() call replaces the
                // previous interest set, so the last call wins. To listen for
                // more than one event, OR the constants together, e.g.
                // int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
                sc.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
            } else if (key.isReadable()) {
                System.out.print("receive message:");
                SocketChannel client = (SocketChannel) key.channel();
                // clear the buffer before the next read
                readBuffer.clear();
                int num = client.read(readBuffer);
                System.out.println(new String(readBuffer.array(), 0, num));
                // switch back to write interest for the next message
                sc.register(selector, SelectionKey.OP_WRITE);
            }
        }
    }
}

  • Server side

Create a ServerSocketChannel and configure it for non-blocking IO, bind a local port, open a Selector, then register the ServerSocketChannel with the selector for SelectionKey.OP_ACCEPT and wait for connections. When a client connects, the server-side selector detects the OP_ACCEPT event and the Acceptor thread handles it; here the Acceptor is simply the thread running the start method. That thread accepts all requests and dispatches processing according to the event type, e.g. the OP_READ and OP_WRITE events.

  • Client side

Create a SocketChannel and configure it for non-blocking IO, connect to the local port, open a Selector, then register the SocketChannel with the selector for SelectionKey.OP_CONNECT. Once the connection to the server completes, the client-side selector detects the OP_CONNECT event, and the same thread then handles whatever events the channel signals, dispatching by event type, e.g. OP_READ and OP_WRITE.

This is the single-threaded Reactor pattern: all event-handling logic runs in one thread. The producer's Sender and the consumer's KafkaConsumer work exactly this way. Used on the server, however, a single blocked event handler would stall the entire server. The remedy is multiple threads, multiple selectors, and queues between them; in short, fully exploiting the cores of a multi-core server to avoid a single performance bottleneck.
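Before looking at Kafka's concrete implementation, here is a minimal, illustrative sketch of that multi-threaded layout. The class names, the round-robin assignment, and the String request type are simplifications of the idea, not Kafka's actual code:

import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue}

// One Acceptor accepts every connection and assigns it round-robin to a
// Processor; Handler threads take requests off a shared queue.
class Acceptor(port: Int, processors: Array[Processor]) extends Runnable {
  private val selector = Selector.open()
  private val serverChannel = ServerSocketChannel.open()
  serverChannel.configureBlocking(false)
  serverChannel.bind(new InetSocketAddress(port))
  serverChannel.register(selector, SelectionKey.OP_ACCEPT)

  override def run(): Unit = {
    var next = 0
    while (!Thread.currentThread().isInterrupted) {
      if (selector.select(500) > 0) {
        val keys = selector.selectedKeys().iterator()
        while (keys.hasNext) {
          val key = keys.next(); keys.remove()
          if (key.isAcceptable) {
            val channel = key.channel().asInstanceOf[ServerSocketChannel].accept()
            channel.configureBlocking(false)
            processors(next % processors.length).assign(channel) // round-robin
            next += 1
          }
        }
      }
    }
  }
}

// Each Processor owns a private selector: it registers the connections it was
// assigned, reads requests into the shared queue, and writes responses back.
class Processor(requestQueue: ArrayBlockingQueue[String]) extends Runnable {
  private val selector = Selector.open()
  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

  def assign(channel: SocketChannel): Unit = {
    newConnections.add(channel)
    selector.wakeup() // interrupt select() so the channel is registered promptly
  }

  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      // register connections handed over by the Acceptor
      var ch = newConnections.poll()
      while (ch != null) {
        ch.register(selector, SelectionKey.OP_READ)
        ch = newConnections.poll()
      }
      selector.select(500)
      // ... read complete requests here, requestQueue.put(request),
      // and write queued responses back to their channels
    }
  }
}

// Handler threads do the actual request processing, decoupled from the I/O.
class Handler(requestQueue: ArrayBlockingQueue[String]) extends Runnable {
  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      val request = requestQueue.take() // blocks until a request arrives
      // ... process the request and enqueue a response for a Processor
    }
  }
}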

Kafka's server side takes exactly this multi-threaded, multi-selector, multi-queue approach, much like the sketch above: an Acceptor thread receives all connections (accepting never blocks); the Acceptor feeds multiple Processor threads, each with its own selector, which read requests from the connections and write responses back; and multiple Handler threads communicate with the Processor threads through the RequestChannel queue. Starting from Kafka's startup() entry point, the core class is KafkaServer. First, its key fields:

  // whether startup has completed
  private val startupComplete = new AtomicBoolean(false)
  // whether the server is shutting down
  private val isShuttingDown = new AtomicBoolean(false)
  // whether startup is in progress
  private val isStartingUp = new AtomicBoolean(false)
  // latch that blocks awaitShutdown() until the server shuts down
  private var shutdownLatch = new CountDownLatch(1)
  // JMX prefix
  private val jmxPrefix: String = "kafka.server"
  // Kafka metrics
  var metrics: Metrics = null
  // broker state machine
  val brokerState: BrokerState = new BrokerState
  // the Kafka API layer
  var apis: KafkaApis = null
  // authorization
  var authorizer: Option[Authorizer] = None
  // the core server network class
  var socketServer: SocketServer = null
  // handler thread pool
  var requestHandlerPool: KafkaRequestHandlerPool = null
  // log storage
  var logManager: LogManager = null
  // replica management
  var replicaManager: ReplicaManager = null

  var adminManager: AdminManager = null
  // dynamic config handlers
  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
  // dynamic config manager
  var dynamicConfigManager: DynamicConfigManager = null
  var credentialProvider: CredentialProvider = null
  // group coordinator
  var groupCoordinator: GroupCoordinator = null
  // transaction coordinator
  var transactionCoordinator: TransactionCoordinator = null
  // controller
  var kafkaController: KafkaController = null

  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  // health check (broker registration / heartbeat in ZooKeeper)
  var kafkaHealthcheck: KafkaHealthcheck = null
  var metadataCache: MetadataCache = null
  var quotaManagers: QuotaFactory.QuotaManagers = null
  // ZooKeeper utility class
  var zkUtils: ZkUtils = null
  val correlationId: AtomicInteger = new AtomicInteger(0)
  val brokerMetaPropsFile = "meta.properties"
  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap

  private var _clusterId: String = null
  private var _brokerTopicStats: BrokerTopicStats = null

A quick look at the startup method, reproduced here in full:

 def startup() {
    try {
      info("starting")
      if(isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if(startupComplete.get)
        return

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        brokerState.newState(Starting)

        /* start scheduler */
        kafkaScheduler.startup()

        /* setup zookeeper */
        zkUtils = initZk()

        /* Get or create cluster_id */
        _clusterId = getOrGenerateClusterId(zkUtils)
        info(s"Cluster ID = $clusterId")

        /* generate brokerId */
        config.brokerId =  getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "

        /* create and configure metrics */
        val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
            Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
        reporters.add(new JmxReporter(jmxPrefix))
        val metricConfig = KafkaServer.metricConfig(config)
        metrics = new Metrics(metricConfig, reporters, time, true)

        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats

        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
        notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)

        /* start log manager */
        logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats)
        logManager.startup()

        metadataCache = new MetadataCache(config.brokerId)
        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)

        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup()

        /* start replica manager */
        replicaManager = createReplicaManager(isShuttingDown)
        replicaManager.startup()

        /* start kafka controller */
        kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
        kafkaController.startup()

        adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

        /* start group coordinator */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
        groupCoordinator.startup()

        /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM)
        transactionCoordinator.startup()

        /* Get the authorizer and initialize it if one is specified.*/
        authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
        }

        /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
          brokerTopicStats, clusterId, time)

        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
          config.numIoThreads)

        Mx4jLoader.maybeLoad()

        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
        dynamicConfigManager.startup()

        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map { endpoint =>
          if (endpoint.port == 0)
            endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
          else
            endpoint
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

        // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
        checkpointBrokerId(config.brokerId)

        brokerState.newState(RunningAsBroker)
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
  }

Stepping through startup(): a log line is written first, then the broker's state is validated. If the broker is still in the middle of shutting down, the IllegalStateException above is thrown; if startup has already completed, the method returns immediately. Otherwise a CAS flips isStartingUp, and on success the broker state is set to Starting (value 1, see the BrokerStates trait) and the components are brought up in order:

  • start the background scheduler (KafkaScheduler) and connect to ZooKeeper; get or create the cluster-id node there (created on first startup and read thereafter; it is a persistent node)
  • resolve the broker id from the configuration and add it to the log prefix; create and configure the Kafka metrics
  • start LogManager (log storage), create the MetadataCache, and start the core network class SocketServer
  • start the replica manager, the KafkaController, the group coordinator, and then the transaction coordinator, which loads transaction logs on its own single-threaded background scheduler
  • configure the authorizer, create the KafkaApis processing layer and the KafkaRequestHandlerPool
  • start dynamic config management, then publish the advertised listeners and start KafkaHealthcheck (the heartbeat), which registers the broker in ZooKeeper
  • checkpoint the broker id, set the broker state to RunningAsBroker (value 3, see the BrokerStates trait), flip the atomic flags, and log that startup is complete
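For reference, the state values cited above come from the BrokerStates trait; an abridged sketch, limited to the states this walkthrough touches:

// Abridged sketch of the BrokerStates case objects, limited to the
// states referenced above (Starting = 1, RunningAsBroker = 3).
sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }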

That is the main flow of the startup method. The detailed logic lives in SocketServer, KafkaApis, KafkaRequestHandlerPool and LogManager; next we turn to the core network class, SocketServer.