Kafka — Inside of Broker

Jinhan Choi
5 min read · Sep 29, 2022


Project — kafka:core

  • github: https://github.com/apache/kafka
  • The project containing Kafka's core behavior and common logic, the Protocol used for communication, the Broker code, and so on.
  • Implementation language: Scala

Server (Starting the Broker)

Startup

Using the shell script

e.g. sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
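
The stock script also supports a -daemon flag, e.g. sudo /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties, which runs the broker in the background instead of attaching it to the terminal.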

Running the main function (for study purposes)

Run the def main(args: Array[String]): Unit function in the kafka.scala file
- server.properties must be supplied in the run configuration (it is read from the program arguments)
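
For orientation, the entry point in Kafka.scala is short. The sketch below is a condensed paraphrase of the Kafka 3.x source, with error handling elided, not the verbatim code:

object Kafka extends Logging {
  def main(args: Array[String]): Unit = {
    // read the server.properties path from the program arguments
    val serverProps = getPropsFromArgs(args)
    // choose the broker implementation (see buildServer below)
    val server = buildServer(serverProps)

    // shut down cleanly on terminating signals as well as normal termination
    Exit.addShutdownHook("kafka-shutdown-hook", server.shutdown())

    server.startup()
    server.awaitShutdown()
    Exit.exit(0)
  }
}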

buildServer

Depending on the config.requiresZookeeper setting, a different Broker implementation is built (part of the effort to make Kafka independent of ZooKeeper).

-> KafkaServer, KafkaRaftServer
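
The dispatch itself is compact. Roughly, paraphrasing buildServer from Kafka.scala (the enableForwarding argument is simplified here):

private def buildServer(props: Properties): Server = {
  val config = KafkaConfig.fromProps(props, false)
  if (config.requiresZookeeper) {
    // classic ZooKeeper-based broker
    new KafkaServer(config, Time.SYSTEM, threadNamePrefix = None, enableForwarding = false)
  } else {
    // KRaft mode: broker and/or controller roles in one process
    new KafkaRaftServer(config, Time.SYSTEM, threadNamePrefix = None)
  }
}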

private def parseProcessRoles(): Set[ProcessRole] = {
  val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
    case "broker" => BrokerRole
    case "controller" => ControllerRole
    case role => throw new ConfigException(s"Unknown process role '$role'" +
      " (only 'broker' and 'controller' are allowed roles)")
  }
  ...
}
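
And requiresZookeeper is derived from process.roles: when no roles are configured, the broker assumes the classic ZooKeeper deployment. In the 3.x KafkaConfig this is essentially:

def requiresZookeeper: Boolean = processRoles.isEmpty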

KafkaServer : KafkaBroker

BrokerState

  • NOT_RUNNING
  • STARTING
  • RECOVERY
  • RUNNING
  • PENDING_CONTROLLED_SHUTDOWN
  • SHUTTING_DOWN
  • UNKNOWN
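
Reading startup() and shutdown(), the usual order appears to be NOT_RUNNING → STARTING → RECOVERY (only if the logs need recovery) → RUNNING, and on a controlled shutdown PENDING_CONTROLLED_SHUTDOWN → SHUTTING_DOWN → NOT_RUNNING; UNKNOWN seems to be a fallback for state values that cannot be mapped.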
class KafkaServer(
  val config: KafkaConfig,
  time: Time = Time.SYSTEM,
  threadNamePrefix: Option[String] = None,
  enableForwarding: Boolean = false
) extends KafkaBroker with Server {

  private val startupComplete = new AtomicBoolean(false)
  private val isShuttingDown = new AtomicBoolean(false)
  private val isStartingUp = new AtomicBoolean(false)

  @volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
  private var shutdownLatch = new CountDownLatch(1)
  private var logContext: LogContext = _

  private val kafkaMetricsReporters: Seq[KafkaMetricsReporter] =
    KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
  var kafkaYammerMetrics: KafkaYammerMetrics = _
  var metrics: Metrics = _

  @volatile var dataPlaneRequestProcessor: KafkaApis = _
  var controlPlaneRequestProcessor: KafkaApis = _

  var authorizer: Option[Authorizer] = None
  @volatile var socketServer: SocketServer = _
  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
  ...
  @volatile private var _kafkaController: KafkaController = _
  var quotaManagers: QuotaFactory.QuotaManagers = _

def startup()

override def startup(): Unit = {
  try {
    info("starting")

    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if (startupComplete.get)
      return

    // check whether the server is in a startable state
    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      _brokerState = BrokerState.STARTING

      /* setup zookeeper */
      initZkClient(time)
      configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))

      /* Get or create cluster_id */
      _clusterId = getOrGenerateClusterId(zkClient)
      info(s"Cluster ID = $clusterId")

      /* load metadata */
      /* check cluster id */
      /* generate brokerId */

      // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
      // applied after ZkConfigManager starts.

      /* start scheduler */
      kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      kafkaScheduler.startup()

      /* create and configure metrics */
      /* register broker metrics */
      quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))

      /* start log manager */
      _logManager = LogManager(
        ...

      /* initialize feature change listener */

      // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
      // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
      tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
      credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

      /* start forwarding manager */

      // Create and start the socket server acceptor threads so that the bound port is known.
      // Delay starting processors until the end of the initialization sequence to ensure
      // that credentials have been loaded before processing authentications.
      //
      // Note that we allow the use of KRaft mode controller APIs when forwarding is enabled
      // so that the Envelope request is exposed. This is only used in testing currently.
      socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

      // Start alter partition manager based on the IBP version
      // Start replica manager
      /* start token manager */
      /* start kafka controller */
      /* start group coordinator */
      /* create producer ids manager */
      /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
      /* start auto topic creation manager */
      /* start processing requests */
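
Note the ordering the comments spell out: the SocketServer acceptors are created early so the bound port is known, but request processing is only enabled at the very end of startup (the enableRequestProcessing call shown further below), after credentials and the authorizer have been loaded.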

SCRAM authentication hands-on (Korean): https://devidea.tistory.com/102
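
For context, ScramMechanism.mechanismNames above covers SCRAM-SHA-256 and SCRAM-SHA-512; a broker that actually enables SCRAM would typically set sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512 together with a SASL listener and sasl.mechanism.inter.broker.protocol in server.properties (illustrative settings, not taken from this article).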

dataPlaneRequestProcessor : KafkaApis
dataPlaneRequestHandlerPool

def createKafkaApis(requestChannel: RequestChannel): KafkaApis = new KafkaApis(
  requestChannel = requestChannel,
  metadataSupport = zkSupport,
  replicaManager = replicaManager,
  groupCoordinator = groupCoordinator,
  txnCoordinator = transactionCoordinator,
  autoTopicCreationManager = autoTopicCreationManager,
  brokerId = config.brokerId,
  config = config,
  configRepository = configRepository,
  metadataCache = metadataCache,
  metrics = metrics,
  authorizer = authorizer,
  quotas = quotaManagers,
  fetchManager = fetchManager,
  brokerTopicStats = brokerTopicStats,
  clusterId = clusterId,
  time = time,
  tokenManager = tokenManager,
  apiVersionManager = apiVersionManager)

dataPlaneRequestProcessor = createKafkaApis(socketServer.dataPlaneRequestChannel)

dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
  config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix)

socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
  controlPlaneRequestProcessor = createKafkaApis(controlPlaneRequestChannel)
  controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
    1, s"${ControlPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", ControlPlaneAcceptor.ThreadPrefix)
}

...

socketServer.enableRequestProcessing(authorizerFutures)

_brokerState = BrokerState.RUNNING
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())

KafkaApis: what does it actually do? (the center of request processing)

/**
 * Logic to handle the various Kafka requests
 */
class KafkaApis(val requestChannel: RequestChannel,
                val metadataSupport: MetadataSupport,
                val replicaManager: ReplicaManager,
                val groupCoordinator: GroupCoordinator,
                val txnCoordinator: TransactionCoordinator,
                val autoTopicCreationManager: AutoTopicCreationManager,
                val brokerId: Int,
                val config: KafkaConfig,
                val configRepository: ConfigRepository,
                val metadataCache: MetadataCache,
                val metrics: Metrics,
                val authorizer: Option[Authorizer],
                val quotas: QuotaManagers,
                val fetchManager: FetchManager,
                brokerTopicStats: BrokerTopicStats,
                val clusterId: String,
                time: Time,
                val tokenManager: DelegationTokenManager,
                val apiVersionManager: ApiVersionManager) extends ApiRequestHandler

Logic to handle the various Kafka requests

This is the point where all the logic converges, and it carries a large number of dependencies.

It is an implementation of the ApiRequestHandler trait (a kind of interface):

def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit

So we can expect that, whenever a request arrives, something somewhere will call handle.
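
The trait itself is tiny; in the 3.x source it is essentially just:

trait ApiRequestHandler {
  def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
}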

KafkaRequestHandlerPool

Looking at the constructor logic: it builds KafkaRequestHandler instances from apis, keeps a reference to each in the pool, and start()s them.

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: ApiRequestHandler,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
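
Tying this back to startup(): the data-plane pool is sized by num.io.threads (config.numIoThreads in the code shown earlier), while the control-plane pool, when a separate control-plane listener is configured, is created with a single thread.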

KafkaRequestHandler is a Runnable implementation. It runs on its own thread, and it is the place where apis.handle gets called.

In other words, when a request from a producer or a consumer arrives, it ends up here.

def run(): Unit = {
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds

    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        completeShutdown()
        return

      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          apis.handle(request, requestLocal)

KafkaApis.handle()

/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")

    if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
      // The socket server will reject APIs which are not exposed in this scope and close the connection
      // before handing them to the request handler, so this path should not be exercised in practice
      throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
    }

    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
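
Each handleXxxRequest method validates and authorizes the request, delegates to the right component (ReplicaManager for PRODUCE/FETCH, GroupCoordinator for the group APIs, and so on), and eventually sends the response back through the RequestChannel, where the SocketServer's processor threads write it out to the network.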

to be continued… (KafkaController)
