Starting from the server entry class Kafka.scala, the main steps are: read the configuration file and load it into memory, register a shutdown hook for graceful shutdown, start the Kafka server, and then block in kafkaServerStartable.awaitShutdown(). Under the hood that wait is a CountDownLatch.
def main(args: Array[String]): Unit = {
  try {
    val serverProps = getPropsFromArgs(args)
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
      override def run(): Unit = kafkaServerStartable.shutdown()
    })

    kafkaServerStartable.startup()
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    case e: Throwable =>
      fatal(e)
      Exit.exit(1)
  }
  Exit.exit(0)
}
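The wait above is driven by a CountDownLatch initialized to 1: awaitShutdown() parks the main thread on the latch, and shutdown() counts it down. A minimal sketch of the pattern (illustrative Java, not the actual KafkaServerStartable source):

import java.util.concurrent.CountDownLatch;

public class Startable {
    // initialized to 1; counted down exactly once when shutdown finishes
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    public void startup() {
        // start background threads here; this call returns immediately
    }

    public void shutdown() {
        // stop background threads here, then release everyone blocked in awaitShutdown()
        shutdownLatch.countDown();
    }

    public void awaitShutdown() throws InterruptedException {
        // blocks the calling thread (main, in Kafka.scala) until shutdown() runs
        shutdownLatch.await();
    }
}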
The config-loading method does two things: it reads server.properties, and it lets --override command-line arguments replace values from server.properties. Any extra positional argument that is not an --override option makes the method print usage and exit, i.e. the Kafka broker fails to start.
def getPropsFromArgs(args: Array[String]): Properties = {
  val optionParser = new OptionParser(false)
  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])

  if (args.length == 0) {
    CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
  }

  val props = Utils.loadProps(args(0))

  if (args.length > 1) {
    val options = optionParser.parse(args.slice(1, args.length): _*)
    if (options.nonOptionArguments().size() > 0) {
      CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
    }
    props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala))
  }
  props
}
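For example (assuming the stock kafka-server-start.sh wrapper, which forwards its arguments to this main method), `bin/kafka-server-start.sh config/server.properties --override broker.id=2` starts the broker with broker.id forced to 2 regardless of the value in the file: parseKeyValueArgs turns each --override pair into a Properties entry that is put over the one loaded from disk.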
The real startup work happens in kafkaServerStartable.startup(). Before analyzing it, a quick look at the design of Kafka's server-side network layer will make KafkaServer easier to follow. On the client side (producer/consumer), the demands on concurrency and latency are moderate, so the NetworkClient component is sufficient; the server side, however, must sustain extreme concurrency at low latency, so it is built on the Reactor pattern (event-driven).
Java NIO provides the APIs for implementing the Reactor pattern. The code below shows the common single-threaded version:
Server side
public void start() throws IOException {
    // open the server socket channel
    ServerSocketChannel ssc = ServerSocketChannel.open();
    // configure it as non-blocking, i.e. asynchronous I/O
    ssc.configureBlocking(false);
    // bind the local port
    ssc.bind(new InetSocketAddress(3400));
    // create the selector (an instance field)
    selector = Selector.open();
    // register ssc with the selector, ready to accept connections
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    // loop until the current thread is interrupted
    while (!Thread.currentThread().isInterrupted()) {
        int total = selector.select();
        System.out.println(total + " channels are ready for I/O");
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = keys.iterator();
        // handle client events
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            // remove here so an invalid key is not left behind in the selected set
            keyIterator.remove();
            if (!key.isValid()) {
                continue;
            }
            if (key.isAcceptable()) {
                // runs after a client connects
                accept(key);
            }
            if (key.isReadable()) {
                // the client sent data to read
                read(key);
            }
            if (key.isWritable()) {
                // the channel is ready for writing
                write(key);
            }
        }
    }
}
Client side
public void start() throws IOException {
    // open the socket channel
    SocketChannel sc = SocketChannel.open();
    sc.configureBlocking(false);
    sc.connect(new InetSocketAddress("localhost", 3400));
    // create the selector
    Selector selector = Selector.open();
    // register the channel with the selector
    sc.register(selector, SelectionKey.OP_CONNECT);
    Scanner scanner = new Scanner(System.in);
    while (true) {
        selector.select();
        Set<SelectionKey> keys = selector.selectedKeys();
        System.out.println("keys:" + keys.size());
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            // is this channel in the middle of connecting?
            if (key.isConnectable()) {
                sc.finishConnect();
                // register interest in writing
                sc.register(selector, SelectionKey.OP_WRITE);
                System.out.println("server connected...");
                break;
            } else if (key.isWritable()) {
                System.out.println("please input message:");
                String message = scanner.nextLine();
                writeBuffer.clear();
                writeBuffer.put(message.getBytes());
                // flip resets the buffer's markers: after put() the buffer must be
                // flipped before its contents can be read and written to the server
                writeBuffer.flip();
                sc.write(writeBuffer);
                // re-registering replaces the interest set: each channel has a single
                // interest set per selector, and the last registration wins.
                // To express interest in more than one event, OR the constants together:
                // int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
                sc.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
            } else if (key.isReadable()) {
                System.out.print("receive message:");
                SocketChannel client = (SocketChannel) key.channel();
                // clear the buffer before the next read
                readBuffer.clear();
                int num = client.read(readBuffer);
                System.out.println(new String(readBuffer.array(), 0, num));
                // switch back to write interest for the next round
                sc.register(selector, SelectionKey.OP_WRITE);
            }
        }
    }
}
- Server side: create a ServerSocketChannel, configure it as non-blocking, bind the local port, open a Selector, and register the ServerSocketChannel with the selector for the SelectionKey.OP_ACCEPT event, then wait for connections. When a client connects, the server's selector picks up the OP_ACCEPT event and the Acceptor thread handles it. That Acceptor is simply the thread running this start method: it accepts every request and dispatches handling according to the event type, e.g. OP_READ and OP_WRITE.
- Client side: create a SocketChannel, configure it as non-blocking, connect to the local port, open a Selector, and register the SocketChannel with the selector for the SelectionKey.OP_CONNECT event. Once the connection to the server is established, the client's selector picks up the OP_CONNECT event, and the same thread then dispatches handling according to subsequent events such as OP_READ and OP_WRITE.
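One detail worth noting in the client code: calling sc.register(...) on a channel that is already registered with the selector does not add a second registration; it replaces the interest set on the existing SelectionKey. The idiomatic way to switch events is to mutate the key directly. A small sketch, reusing sc and selector from the example above:

// look up the existing key and replace its interest set in place
SelectionKey key = sc.keyFor(selector);
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);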
This is the single-threaded Reactor pattern: all event-handling logic runs on one thread. The producer's Sender and the consumer's KafkaConsumer work in essentially the same way. On the server side, however, this is dangerous: if handling one event blocks, the entire server loses the ability to process anything. The remedy is multiple threads, multiple selectors, and queues between them; in short, use all the cores of the machine and avoid a single-threaded bottleneck.
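To make that remedy concrete, here is a minimal sketch of a multi-threaded Reactor in plain Java NIO: one acceptor thread, a few processor threads each with a private selector, and a shared blocking queue standing in for a request channel. Every name in it (MiniReactor, Processor, handOver, requestChannel) is made up for illustration; this is not Kafka's SocketServer.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MiniReactor {
    // stands in for Kafka's RequestChannel: processors enqueue, handlers dequeue
    static final BlockingQueue<SelectionKey> requestChannel = new ArrayBlockingQueue<>(1024);

    static class Processor implements Runnable {
        private final Selector selector = Selector.open();
        // connections handed over by the acceptor thread
        private final ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();

        Processor() throws IOException { }

        void handOver(SocketChannel ch) {
            newConnections.add(ch);
            selector.wakeup(); // unblock select() so the new channel is registered promptly
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    SocketChannel ch;
                    while ((ch = newConnections.poll()) != null) {
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ);
                    }
                    selector.select(300);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isValid() && key.isReadable()) {
                            key.interestOps(0);      // mute until a handler has dealt with it
                            requestChannel.put(key); // hand the request to a handler thread
                        }
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Processor[] processors = new Processor[3];
        for (int i = 0; i < processors.length; i++) {
            processors[i] = new Processor();
            new Thread(processors[i], "processor-" + i).start();
        }
        // one handler thread standing in for the handler pool
        new Thread(() -> {
            try {
                while (true) {
                    SelectionKey key = requestChannel.take();
                    // a real handler would read the request and queue a response here
                    System.out.println("handling request on " + key.channel());
                }
            } catch (InterruptedException ignored) { }
        }, "handler-0").start();
        // the acceptor: a dedicated thread, so a blocking accept() is fine here
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(9999));
        int next = 0;
        while (true) {
            SocketChannel ch = ssc.accept();
            processors[next].handOver(ch); // round-robin connections over processors
            next = (next + 1) % processors.length;
        }
    }
}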
The Kafka server takes exactly this approach: multiple threads, multiple selectors, and queues. One Acceptor thread receives all incoming connections (accepting never blocks). The Acceptor feeds multiple Processor threads, each with its own selector, which read requests off the connections and write responses back. In turn, multiple Handler threads communicate with the Processor threads through the RequestChannel queue. Looking at the startup() entry point, the core class is KafkaServer. Let's first look at its core fields.
// whether startup has completed
private val startupComplete = new AtomicBoolean(false)
// whether the server is shutting down
private val isShuttingDown = new AtomicBoolean(false)
// whether startup is currently in progress
private val isStartingUp = new AtomicBoolean(false)
// latch that awaitShutdown() blocks on until shutdown completes
private var shutdownLatch = new CountDownLatch(1)
// JMX prefix
private val jmxPrefix: String = "kafka.server"
// Kafka metrics
var metrics: Metrics = null
// broker state
val brokerState: BrokerState = new BrokerState
// the Kafka API layer
var apis: KafkaApis = null
// authorization
var authorizer: Option[Authorizer] = None
// the server's core network class ✨
var socketServer: SocketServer = null
// Handler thread pool
var requestHandlerPool: KafkaRequestHandlerPool = null
// log storage
var logManager: LogManager = null
// replica management
var replicaManager: ReplicaManager = null
var adminManager: AdminManager = null
// dynamic config handlers
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
// dynamic config manager
var dynamicConfigManager: DynamicConfigManager = null
var credentialProvider: CredentialProvider = null
// group coordinator
var groupCoordinator: GroupCoordinator = null
// transaction coordinator
var transactionCoordinator: TransactionCoordinator = null
// kafka controller
var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
// health check (the broker's heartbeat in ZooKeeper)
var kafkaHealthcheck: KafkaHealthcheck = null
var metadataCache: MetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
// ZooKeeper utility class
var zkUtils: ZkUtils = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
private var _clusterId: String = null
private var _brokerTopicStats: BrokerTopicStats = null
Here is the startup method in full; we will then walk through it.
def startup() {
  try {
    info("starting")

    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if (startupComplete.get)
      return

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      brokerState.newState(Starting)

      /* start scheduler */
      kafkaScheduler.startup()

      /* setup zookeeper */
      zkUtils = initZk()

      /* Get or create cluster_id */
      _clusterId = getOrGenerateClusterId(zkUtils)
      info(s"Cluster ID = $clusterId")

      /* generate brokerId */
      config.brokerId = getBrokerId
      this.logIdent = "[Kafka Server " + config.brokerId + "], "

      /* create and configure metrics */
      val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
        Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
      reporters.add(new JmxReporter(jmxPrefix))
      val metricConfig = KafkaServer.metricConfig(config)
      metrics = new Metrics(metricConfig, reporters, time, true)

      /* register broker metrics */
      _brokerTopicStats = new BrokerTopicStats

      quotaManagers = QuotaFactory.instantiate(config, metrics, time)
      notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)

      /* start log manager */
      logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats)
      logManager.startup()

      metadataCache = new MetadataCache(config.brokerId)
      credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)

      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      socketServer.startup()

      /* start replica manager */
      replicaManager = createReplicaManager(isShuttingDown)
      replicaManager.startup()

      /* start kafka controller */
      kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
      kafkaController.startup()

      adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

      /* start group coordinator */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
      groupCoordinator.startup()

      /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM)
      transactionCoordinator.startup()

      /* Get the authorizer and initialize it if one is specified.*/
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
        kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
        brokerTopicStats, clusterId, time)

      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
        config.numIoThreads)

      Mx4jLoader.maybeLoad()

      /* start dynamic config manager */
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
        ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
        ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
        ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      /* tell everyone we are alive */
      val listeners = config.advertisedListeners.map { endpoint =>
        if (endpoint.port == 0)
          endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
        else
          endpoint
      }
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
        config.interBrokerProtocolVersion)
      kafkaHealthcheck.startup()

      // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
      checkpointBrokerId(config.brokerId)

      brokerState.newState(RunningAsBroker)
      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}
Walking through startup(): it first logs, then validates the broker's state. If a previous stop is still in progress, it throws the IllegalStateException shown above; if the broker has already started, it simply returns. Otherwise it tries a CAS on isStartingUp; if the CAS wins, it sets the broker state to Starting (value 1, see the BrokerStates trait) and proceeds: start the background scheduler, connect to ZooKeeper, get or create the clusterId node in ZooKeeper (a persistent node, created on first startup and read thereafter), log the broker id from the configuration, configure Kafka metrics, start the LogManager, create the metadata cache, start the core network class SocketServer, start the replica manager, the controller, the group coordinator, and the transaction coordinator (which handles transaction log loading and expiration on its own single-threaded background scheduler), set up the authorizer, wire up request processing via KafkaApis and create the requestHandlerPool, start the dynamic config manager, announce all configured listeners, and start the health check. Finally it resets the atomic flags and sets the broker state to RunningAsBroker (value 3, running, see the BrokerStates trait), meaning the broker has fully started.
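The isStartingUp.compareAndSet(false, true) guard at the top is a standard idiom for making startup idempotent and safe against concurrent callers. A stripped-down sketch of the same state machine (illustrative Java, not Kafka's code; the error path is simplified to a finally block):

import java.util.concurrent.atomic.AtomicBoolean;

public class GuardedStartup {
    private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
    private final AtomicBoolean startupComplete = new AtomicBoolean(false);
    private final AtomicBoolean isStartingUp = new AtomicBoolean(false);

    public void startup() {
        if (isShuttingDown.get())
            throw new IllegalStateException("still shutting down, cannot re-start");
        if (startupComplete.get())
            return;                                    // already running: calling again is a no-op
        if (isStartingUp.compareAndSet(false, true)) { // exactly one concurrent caller wins
            try {
                // ... bring up subsystems here ...
                startupComplete.set(true);
            } finally {
                isStartingUp.set(false);               // clear the flag on success and on failure
            }
        }
    }
}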
That is the main execution flow of the startup method. The detailed logic is implemented in SocketServer, KafkaApis, KafkaRequestHandlerPool, and LogManager. Next we turn to the core network class, SocketServer.