Kafka — Inside of Broker
Project — kafka:core
- github: https://github.com/apache/kafka
- The project that contains Kafka's core behavior, common logic, the wire Protocol, the Broker code, and so on.
- Language: Scala
Server (starting the Broker)
- server.properties (https://kafka.apache.org/documentation/#brokerconfigs)
- Copy the kafka/config/server.properties file and adapt it (a minimal example follows below).
- The path to the config file is passed explicitly when the server starts.
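A minimal, illustrative server.properties for a single ZooKeeper-mode broker (values are placeholders, not recommendations; every key is documented at the link above):
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
num.io.threads=8
zookeeper.connect=localhost:2181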
Startup
Using the shell script
ex) sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
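The script is a thin wrapper: after option handling it launches the kafka.Kafka main class via kafka-run-class.sh (abridged from bin/kafka-server-start.sh):
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"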
Running the main function directly (for study)
Run the def main(args: Array[String]): Unit function in Kafka.scala from the IDE.
- The server.properties path must be supplied in the run configuration (Kafka.main reads it from its program arguments).
buildServer
config.requiresZookeeper decides which Broker implementation is built (part of the effort to break the dependency on ZooKeeper). requiresZookeeper is simply processRoles.isEmpty, and processRoles is parsed by parseProcessRoles below.
-> KafkaServer (ZooKeeper mode), KafkaRaftServer (KRaft mode)
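For reference, buildServer in Kafka.scala looks roughly like this (abridged from the Kafka 3.x source; exact constructor arguments vary by version):
// Abridged sketch of kafka.Kafka.buildServer
private def buildServer(props: Properties): Server = {
  val config = KafkaConfig.fromProps(props, false)
  if (config.requiresZookeeper) {
    new KafkaServer(config, Time.SYSTEM, threadNamePrefix = None, enableForwarding = false)
  } else {
    new KafkaRaftServer(config, Time.SYSTEM, threadNamePrefix = None)
  }
}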
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
case "broker" => BrokerRole
case "controller" => ControllerRole
case role => throw new ConfigException(s"Unknown process role '$role'" +
" (only 'broker' and 'controller' are allowed roles)")
}
// remainder of the function: reject duplicate roles
val distinctRoles: Set[ProcessRole] = roles.toSet
if (distinctRoles.size != roles.size) {
throw new ConfigException(s"Duplicate role names found in `${KafkaConfig.ProcessRolesProp}`: $roles")
}
distinctRoles
}
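So a KRaft node must set process.roles (for example process.roles=broker,controller for a combined node), while a ZooKeeper-mode broker leaves it empty, which is exactly what makes requiresZookeeper true.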
KafkaServer : KafkaBroker
BrokerState
- NOT_RUNNING,
- STARTING,
- RECOVERY,
- RUNNING,
- PENDING_CONTROLLED_SHUTDOWN,
- SHUTTING_DOWN,
- UNKNOWN
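These values mirror the org.apache.kafka.metadata.BrokerState enum (in the metadata module) and are exposed through the kafka.server:type=KafkaServer,name=BrokerState metric.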
class KafkaServer(
val config: KafkaConfig,
time: Time = Time.SYSTEM,
threadNamePrefix: Option[String] = None,
enableForwarding: Boolean = false
) extends KafkaBroker with Server {
private val startupComplete = new AtomicBoolean(false)
private val isShuttingDown = new AtomicBoolean(false)
private val isStartingUp = new AtomicBoolean(false)
@volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
private var shutdownLatch = new CountDownLatch(1)
private var logContext: LogContext = _
private val kafkaMetricsReporters: Seq[KafkaMetricsReporter] =
KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
var kafkaYammerMetrics: KafkaYammerMetrics = _
var metrics: Metrics = _
@volatile var dataPlaneRequestProcessor: KafkaApis = _
var controlPlaneRequestProcessor: KafkaApis = _
var authorizer: Option[Authorizer] = None
@volatile var socketServer: SocketServer = _
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
...
@volatile private var _kafkaController: KafkaController = _
var quotaManagers: QuotaFactory.QuotaManagers = _
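Almost every collaborator is a mutable field initialized with Scala's placeholder (= _) and wired up later inside startup(); the AtomicBoolean and @volatile fields exist because startup() and shutdown() may race on different threads.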
def startup()
override def startup(): Unit = {
try {
info("starting")
if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
if (startupComplete.get)
return

// check whether startup can proceed
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
_brokerState = BrokerState.STARTING
/* setup zookeeper */
initZkClient(time)
configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
/* Get or create cluster_id */
_clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
/* load metadata */
/* check cluster id */
/* generate brokerId */
// initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
// applied after ZkConfigManager starts.
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
/* create and configure metrics */
/* register broker metrics */
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
/* start log manager */
_logManager = LogManager(
...)
/* initialize feature change listener */
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
/* start forwarding manager */
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
//
// Note that we allow the use of KRaft mode controller APIs when forwarding is enabled
// so that the Envelope request is exposed. This is only used in testing currently.
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
// Start alter partition manager based on the IBP version
// Start replica manager
/* start token manager */
/* start kafka controller */
/* start group coordinator */
/* create producer ids manager */
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
/* start auto topic creation manager */
/* start processing requests */
SCRAM authentication walkthrough (Korean): https://devidea.tistory.com/102
dataPlaneRequestProcessor : KafkaApis
dataPlaneRequestHandlerPool
def createKafkaApis(requestChannel: RequestChannel): KafkaApis = new KafkaApis(
requestChannel = requestChannel,
metadataSupport = zkSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.brokerId,
config = config,
configRepository = configRepository,
metadataCache = metadataCache,
metrics = metrics,
authorizer = authorizer,
quotas = quotaManagers,
fetchManager = fetchManager,
brokerTopicStats = brokerTopicStats,
clusterId = clusterId,
time = time,
tokenManager = tokenManager,
apiVersionManager = apiVersionManager)
dataPlaneRequestProcessor = createKafkaApis(socketServer.dataPlaneRequestChannel)
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix)
socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
controlPlaneRequestProcessor = createKafkaApis(controlPlaneRequestChannel)
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
1, s"${ControlPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", ControlPlaneAcceptor.ThreadPrefix)
}
...
socketServer.enableRequestProcessing(authorizerFutures)
_brokerState = BrokerState.RUNNING
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
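At this point the broker is serving traffic. The request path, pieced together from the code above: client -> SocketServer (Processor threads) -> RequestChannel -> KafkaRequestHandler (pool) -> KafkaApis.handle(). Note the pool sizes: the data plane gets config.numIoThreads (num.io.threads) handler threads, the control plane exactly one.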
KafkaApis: what does it actually do? (the hub of request processing)
/**
* Logic to handle the various Kafka requests
*/
class KafkaApis(val requestChannel: RequestChannel,
val metadataSupport: MetadataSupport,
val replicaManager: ReplicaManager,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val autoTopicCreationManager: AutoTopicCreationManager,
val brokerId: Int,
val config: KafkaConfig,
val configRepository: ConfigRepository,
val metadataCache: MetadataCache,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
val fetchManager: FetchManager,
brokerTopicStats: BrokerTopicStats,
val clusterId: String,
time: Time,
val tokenManager: DelegationTokenManager,
val apiVersionManager: ApiVersionManager) extends ApiRequestHandler
Logic to handle the various Kafka requests
This is the point where all of the broker's logic converges.
It has a huge number of dependencies.
It is an implementation of the ApiRequestHandler trait (a kind of interface):
def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
So whenever a request comes in, we can expect that something, somewhere, calls handle.
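The trait itself is tiny (abridged from kafka.server in the 3.x sources):
trait ApiRequestHandler {
  def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
}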
KafkaRequestHandlerPool
Looking at the constructor logic: it uses apis to build the KafkaRequestHandlers, keeps a reference to each one in the pool (runnables), and calls start() on them.
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: ApiRequestHandler,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}
def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}
KafkaRequestHandler is a Runnable implementation. It runs on a separate daemon thread and contains the call site for apis.handle.
In other words, when a request from a producer or a consumer arrives, it ends up here.
def run(): Unit = {
while (!stopped) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
req match {
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
completeShutdown()
return
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
apis.handle(request, requestLocal)
} catch {
case e: FatalExitError =>
completeShutdown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
request.releaseBuffer() // the request object must not be used after this
}
case null => // continue
}
}
completeShutdown()
}
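Note: requestLocal is a RequestLocal with thread-confined caching, created per handler thread (RequestLocal.withThreadConfinedCaching in the 3.x sources), so handlers can reuse buffers without locking. A FatalExitError terminates the process; any other exception is just logged and the loop keeps polling.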
KafkaApis.handle()
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
// The socket server will reject APIs which are not exposed in this scope and close the connection
// before handing them to the request handler, so this path should not be exercised in practice
throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
}
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
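... (the remaining ApiKeys cases are elided)
Every case maps one entry of org.apache.kafka.common.protocol.ApiKeys to a handler method; a producer send, for example, arrives as ApiKeys.PRODUCE and lands in handleProduceRequest.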
to be continued… (KafkaController)