Notes on Installing, Configuring, and Testing Kafka on Ubuntu


    Introduction to Kafka

    Kafka is an open-source stream-processing platform developed by the Apache Software Foundation. It is designed to handle large-scale data streams and provides a highly reliable, high-throughput, low-latency messaging system. Kafka can be used to build real-time data pipelines and streaming applications, enabling efficient data exchange and communication between different applications, systems, and data sources.


    Kafka's core concepts include the following (the sketch after this list shows how they map onto Kafka's CLI tools):

    1. Message: Kafka is a publish/subscribe messaging system that organizes messages into topics. Producers publish messages to topics, and consumers subscribe to one or more topics to receive them.

    2. Topic: A topic is a category of messages, and each topic can contain one or more partitions. Messages published to a topic are distributed among its partitions according to configurable rules.

    3. Partition: A topic can be divided into multiple partitions, each of which is an ordered, durable sequence of message records. Partitions let Kafka scale horizontally and allow multiple consumers to process messages in parallel.

    4. Producer: A producer publishes messages to Kafka topics.

    5. Consumer: A consumer subscribes to Kafka topics and processes the messages.

    6. Broker: A Kafka cluster consists of multiple brokers; each broker is an independent Kafka server responsible for storing data and handling messages.
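
    These concepts map directly onto the command-line tools that ship with Kafka. As a minimal sketch, assuming a broker is already running on localhost:9092 (the steps below set one up) and using demo as an illustrative topic name:

    # List every topic the broker knows about
    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    # Show a topic's partitions, the leader broker for each, and the replicas
    bin/kafka-topics.sh --describe --topic demo --bootstrap-server localhost:9092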

    Kafka's key characteristics include:

    • Durability: Kafka persists messages to disk, so messages are not lost.

    • High throughput: Kafka processes large volumes of data while keeping latency low, making it suitable for large-scale data processing and analytics.

    • Scalability: Kafka can be scaled horizontally to handle more data and higher load.

    • Fault tolerance: A Kafka cluster replicates data across brokers, so data remains reliable and available even when some nodes fail.

    Kafka is widely used in stream processing, real-time log processing, metrics monitoring, and similar areas; many companies rely on it to build real-time data pipelines and process data at scale.

    How to Install Kafka (with KRaft) on Ubuntu

    Kafka can be installed on Ubuntu with the following steps. Note that these instructions cover Kafka version 3.6.0. Before installing, make sure Java 8 or later is installed.


    About KRaft

    Kafka 2.8 introduced KRaft (Kafka Raft) as a new way to manage Kafka's metadata, replacing the original ZooKeeper-based approach. KRaft is a metadata management system built on the Raft consensus protocol, and with it Kafka no longer depends on ZooKeeper.

    Kafka with KRaft uses the Raft protocol to manage and maintain Kafka's metadata, including partition assignments and cluster configuration. This simplifies deploying and operating Kafka, since a separate ZooKeeper cluster no longer has to be maintained.
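
    In KRaft mode, these settings live in config/kraft/server.properties. The excerpt below shows the key entries of the default single-node configuration shipped with Kafka 3.6; they match the KafkaConfig values printed in the startup log later in this note:

    # A node may act as broker, controller, or both ("combined" mode)
    process.roles=broker,controller
    # This node's ID; it must be listed in controller.quorum.voters
    node.id=1
    # The Raft quorum, given as <node.id>@<host>:<port> for each controller
    controller.quorum.voters=1@localhost:9093
    # Client traffic on port 9092, controller traffic on port 9093
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093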

    Steps

    1. Install Java

    Check whether Java is already installed:

    java -version
    

    If Java is not installed or needs to be updated, install OpenJDK with:

    sudo apt update
    sudo apt install default-jdk
    
    2. Download Kafka

    Download the Kafka version you need, for example 3.6.0, from the official Apache Kafka website.


    Kafka release packages are named in the format <Scala version>-<Kafka version>. For example, in kafka_2.13-3.6.0.tgz, 3.6.0 is the Kafka version, while 2.13 indicates that this build was compiled with Scala 2.13. The release package already contains the compiled Scala code, so you only need to follow the installation steps below; there is no need to install Scala separately.

    wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
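
    Optionally, verify the download against the checksum that Apache publishes next to the release tarball (a .sha512 file; compare its contents with the locally computed hash):

    wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz.sha512
    # Compute the local checksum and compare it with the published one
    sha512sum kafka_2.13-3.6.0.tgz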
    
    3. Extract and Move Kafka

    Extract the downloaded archive:

    tar -xzf kafka_2.13-3.6.0.tgz
    

    Move the extracted directory to the desired location, for example the /opt directory:

    sudo mv kafka_2.13-3.6.0 /opt/kafka
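
    The commands in the remaining steps use paths relative to the installation directory (such as bin/kafka-storage.sh), so change into it first:

    cd /opt/kafka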
    
    4. Start Kafka in KRaft Mode

    Generate a cluster UUID:

    KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
    

    Use the bin/kafka-storage.sh format command to format the log directories for the Kafka with KRaft cluster:

    bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
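
    Formatting writes a meta.properties file containing the cluster ID and node ID into the configured log directory, which is /tmp/kraft-combined-logs in the default KRaft configuration (the startup log below shows the same path). You can inspect it with:

    cat /tmp/kraft-combined-logs/meta.properties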
    

    Start the Kafka server:

    # Run in the foreground
    bin/kafka-server-start.sh config/kraft/server.properties
    # Or run it in the background
    nohup bin/kafka-server-start.sh config/kraft/server.properties > my_kafka_run.log 2>&1 &
    

    Once the Kafka server has started successfully, you have a basic Kafka environment and can start using it.
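
    A quick way to confirm the broker is up is to check its listening ports (9092 for clients, 9093 for the controller) or, when running in the background, to tail the log file:

    # Check the listening ports
    ss -ltn | grep -E '9092|9093'
    # Watch the startup log when running via nohup
    tail -f my_kafka_run.log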

    Startup output:
    [2023-11-28 07:46:27,307] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [2023-11-28 07:46:27,603] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
    [2023-11-28 07:46:27,761] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
    [2023-11-28 07:46:27,764] INFO [ControllerServer id=1] Starting controller (kafka.server.ControllerServer)
    [2023-11-28 07:46:27,782] INFO authorizerStart completed for endpoint CONTROLLER. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)
    [2023-11-28 07:46:28,132] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
    [2023-11-28 07:46:28,165] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(CONTROLLER) (kafka.network.SocketServer)
    [2023-11-28 07:46:28,166] INFO [SharedServer id=1] Starting SharedServer (kafka.server.SharedServer)
    [2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
    [2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding producer state from offset 0 (kafka.log.UnifiedLog$)
    [2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
    [2023-11-28 07:46:28,262] INFO Initialized snapshots with IDs SortedSet() from /tmp/kraft-combined-logs/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
    [2023-11-28 07:46:28,301] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
    [2023-11-28 07:46:28,490] INFO [RaftManager id=1] Completed transition to Unattached(epoch=0, voters=[1], electionTimeoutMs=1226) from null (org.apache.kafka.raft.QuorumState)
    [2023-11-28 07:46:28,563] INFO [RaftManager id=1] Completed transition to CandidateState(localId=1, epoch=1, retries=1, voteStates={1=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1279) from Unattached(epoch=0, voters=[1], electionTimeoutMs=1226) (org.apache.kafka.raft.QuorumState)
    [2023-11-28 07:46:28,572] INFO [RaftManager id=1] Completed transition to Leader(localId=1, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) from CandidateState(localId=1, epoch=1, retries=1, voteStates={1=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1279) (org.apache.kafka.raft.QuorumState)
    [2023-11-28 07:46:28,596] INFO [kafka-1-raft-outbound-request-thread]: Starting (kafka.raft.RaftSendThread)
    [2023-11-28 07:46:28,596] INFO [kafka-1-raft-io-thread]: Starting (kafka.raft.KafkaRaftManager$RaftIoThread)
    [2023-11-28 07:46:28,617] INFO [RaftManager id=1] High watermark set to LogOffsetMetadata(offset=1, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=91)]) for the first time for epoch 1 based on indexOfHw 0 and voters [ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=1, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=91)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)] (org.apache.kafka.raft.LeaderState)
    [2023-11-28 07:46:28,619] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:28,620] INFO [ControllerServer id=1] Waiting for controller quorum voters future (kafka.server.ControllerServer)
    [2023-11-28 07:46:28,621] INFO [ControllerServer id=1] Finished waiting for controller quorum voters future (kafka.server.ControllerServer)
    [2023-11-28 07:46:28,659] INFO [controller-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2023-11-28 07:46:28,660] INFO [controller-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2023-11-28 07:46:28,661] INFO [controller-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2023-11-28 07:46:28,662] INFO [controller-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2023-11-28 07:46:28,678] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,686] INFO [ControllerServer id=1] Waiting for the controller metadata publishers to be installed (kafka.server.ControllerServer)
    [2023-11-28 07:46:28,686] INFO [ControllerServer id=1] Finished waiting for the controller metadata publishers to be installed (kafka.server.ControllerServer)
    [2023-11-28 07:46:28,686] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:28,686] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)
    [2023-11-28 07:46:28,690] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)
    [2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
    [2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
    [2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
    [2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
    [2023-11-28 07:46:28,698] INFO [BrokerServer id=1] Transition from SHUTDOWN to STARTING (kafka.server.BrokerServer)
    [2023-11-28 07:46:28,699] INFO [BrokerServer id=1] Starting broker (kafka.server.BrokerServer)
    [2023-11-28 07:46:28,706] INFO [broker-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2023-11-28 07:46:28,724] INFO [BrokerServer id=1] Waiting for controller quorum voters future (kafka.server.BrokerServer)
    [2023-11-28 07:46:28,724] INFO [BrokerServer id=1] Finished waiting for controller quorum voters future (kafka.server.BrokerServer)
    [2023-11-28 07:46:28,729] INFO [broker-1-to-controller-forwarding-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:28,731] INFO [broker-1-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:28,755] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
    [2023-11-28 07:46:28,760] INFO [SocketServer listenerType=BROKER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
    [2023-11-28 07:46:28,764] INFO [broker-1-to-controller-alter-partition-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:28,764] INFO [broker-1-to-controller-alter-partition-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:28,782] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,783] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,784] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,786] INFO [ExpirationReaper-1-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,786] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:28,786] INFO [ExpirationReaper-1-RemoteFetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,801] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,804] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,836] INFO [broker-1-to-controller-heartbeat-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:28,836] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:28,837] INFO [BrokerLifecycleManager id=1] Incarnation rXokDA-kRI2e0TCw3qUr4g of broker 1 in cluster ktQqKm60RwiR-s4Dts0HDg is now STARTING. (kafka.server.BrokerLifecycleManager)
    [2023-11-28 07:46:28,857] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Waiting for the broker metadata publishers to be installed (kafka.server.BrokerServer)
    [2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Finished waiting for the broker metadata publishers to be installed (kafka.server.BrokerServer)
    [2023-11-28 07:46:28,877] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)
    [2023-11-28 07:46:28,920] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
    [2023-11-28 07:46:28,921] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:28,972] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:28,977] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:28,979] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
    [2023-11-28 07:46:28,979] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,029] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,035] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
    [2023-11-28 07:46:29,036] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,077] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,086] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,091] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
    [2023-11-28 07:46:29,091] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,141] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,147] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
    [2023-11-28 07:46:29,147] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,178] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,197] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,202] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
    [2023-11-28 07:46:29,202] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,253] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,270] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
    [2023-11-28 07:46:29,271] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,276] INFO [RaftManager id=1] Registered the listener org.apache.kafka.image.loader.MetadataLoader@382374793 (org.apache.kafka.raft.KafkaRaftClient)
    [2023-11-28 07:46:29,276] INFO [RaftManager id=1] Registered the listener org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@859950147 (org.apache.kafka.raft.KafkaRaftClient)
    [2023-11-28 07:46:29,281] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,288] INFO [MetadataLoader id=1] maybePublishMetadata(LOG_DELTA): The loader is still catching up because we have not loaded a controller record as of offset 0 and high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,320] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:46:29,332] INFO [MetadataLoader id=1] maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current high water mark of 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,360] INFO [BrokerLifecycleManager id=1] Successfully registered broker 1 with broker epoch 5 (kafka.server.BrokerLifecycleManager)
    [2023-11-28 07:46:29,382] INFO [BrokerLifecycleManager id=1] The broker has caught up. Transitioning from STARTING to RECOVERY. (kafka.server.BrokerLifecycleManager)
    [2023-11-28 07:46:29,383] INFO [BrokerServer id=1] Finished waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,383] INFO [BrokerServer id=1] Waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing SnapshotGenerator with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing FeaturesPublisher with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DynamicConfigPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,388] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DynamicClientQuotaPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,389] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ScramPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,390] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DelegationTokenPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,392] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ControllerMetadataMetricsPublisher with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,393] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing AclPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,394] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing BrokerMetadataPublisher with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
    [2023-11-28 07:46:29,394] INFO [BrokerMetadataPublisher id=1] Publishing initial metadata at offset OffsetAndEpoch(offset=5, epoch=1) with metadata.version 3.6-IV2. (kafka.server.metadata.BrokerMetadataPublisher)
    [2023-11-28 07:46:29,395] INFO [BrokerLifecycleManager id=1] The broker is in RECOVERY. (kafka.server.BrokerLifecycleManager)
    [2023-11-28 07:46:29,397] INFO Loading logs from log dirs ArraySeq(/tmp/kraft-combined-logs) (kafka.log.LogManager)
    [2023-11-28 07:46:29,402] INFO No logs found to be loaded in /tmp/kraft-combined-logs (kafka.log.LogManager)
    [2023-11-28 07:46:29,409] INFO Loaded 0 logs in 12ms (kafka.log.LogManager)
    [2023-11-28 07:46:29,410] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    [2023-11-28 07:46:29,415] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    [2023-11-28 07:46:29,555] INFO [kafka-log-cleaner-thread-0]: Starting (kafka.log.LogCleaner$CleanerThread)
    [2023-11-28 07:46:29,556] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
    [2023-11-28 07:46:29,557] INFO [AddPartitionsToTxnSenderThread-1]: Starting (kafka.server.AddPartitionsToTxnManager)
    [2023-11-28 07:46:29,557] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
    [2023-11-28 07:46:29,561] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
    [2023-11-28 07:46:29,562] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
    [2023-11-28 07:46:29,563] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
    [2023-11-28 07:46:29,563] INFO [BrokerMetadataPublisher id=1] Updating metadata.version to 14 at offset OffsetAndEpoch(offset=5, epoch=1). (kafka.server.metadata.BrokerMetadataPublisher)
    [2023-11-28 07:46:29,566] INFO [TxnMarkerSenderThread-1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
    [2023-11-28 07:46:29,568] INFO [BrokerServer id=1] Finished waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,570] INFO KafkaConfig values:
    	advertised.listeners = PLAINTEXT://localhost:9092
    	alter.config.policy.class.name = null
    	alter.log.dirs.replication.quota.window.num = 11
    	alter.log.dirs.replication.quota.window.size.seconds = 1
    	authorizer.class.name =
    	auto.create.topics.enable = true
    	auto.include.jmx.reporter = true
    	auto.leader.rebalance.enable = true
    	background.threads = 10
    	broker.heartbeat.interval.ms = 2000
    	broker.id = 1
    	broker.id.generation.enable = true
    	broker.rack = null
    	broker.session.timeout.ms = 9000
    	client.quota.callback.class = null
    	compression.type = producer
    	connection.failed.authentication.delay.ms = 100
    	connections.max.idle.ms = 600000
    	connections.max.reauth.ms = 0
    	control.plane.listener.name = null
    	controlled.shutdown.enable = true
    	controlled.shutdown.max.retries = 3
    	controlled.shutdown.retry.backoff.ms = 5000
    	controller.listener.names = CONTROLLER
    	controller.quorum.append.linger.ms = 25
    	controller.quorum.election.backoff.max.ms = 1000
    	controller.quorum.election.timeout.ms = 1000
    	controller.quorum.fetch.timeout.ms = 2000
    	controller.quorum.request.timeout.ms = 2000
    	controller.quorum.retry.backoff.ms = 20
    	controller.quorum.voters = [1@localhost:9093]
    	controller.quota.window.num = 11
    	controller.quota.window.size.seconds = 1
    	controller.socket.timeout.ms = 30000
    	create.topic.policy.class.name = null
    	default.replication.factor = 1
    	delegation.token.expiry.check.interval.ms = 3600000
    	delegation.token.expiry.time.ms = 86400000
    	delegation.token.master.key = null
    	delegation.token.max.lifetime.ms = 604800000
    	delegation.token.secret.key = null
    	delete.records.purgatory.purge.interval.requests = 1
    	delete.topic.enable = true
    	early.start.listeners = null
    	fetch.max.bytes = 57671680
    	fetch.purgatory.purge.interval.requests = 1000
    	group.consumer.assignors = [org.apache.kafka.coordinator.group.assignor.RangeAssignor]
    	group.consumer.heartbeat.interval.ms = 5000
    	group.consumer.max.heartbeat.interval.ms = 15000
    	group.consumer.max.session.timeout.ms = 60000
    	group.consumer.max.size = 2147483647
    	group.consumer.min.heartbeat.interval.ms = 5000
    	group.consumer.min.session.timeout.ms = 45000
    	group.consumer.session.timeout.ms = 45000
    	group.coordinator.new.enable = false
    	group.coordinator.threads = 1
    	group.initial.rebalance.delay.ms = 3000
    	group.max.session.timeout.ms = 1800000
    	group.max.size = 2147483647
    	group.min.session.timeout.ms = 6000
    	initial.broker.registration.timeout.ms = 60000
    	inter.broker.listener.name = PLAINTEXT
    	inter.broker.protocol.version = 3.6-IV2
    	kafka.metrics.polling.interval.secs = 10
    	kafka.metrics.reporters = []
    	leader.imbalance.check.interval.seconds = 300
    	leader.imbalance.per.broker.percentage = 10
    	listener.security.protocol.map = CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    	listeners = PLAINTEXT://:9092,CONTROLLER://:9093
    	log.cleaner.backoff.ms = 15000
    	log.cleaner.dedupe.buffer.size = 134217728
    	log.cleaner.delete.retention.ms = 86400000
    	log.cleaner.enable = true
    	log.cleaner.io.buffer.load.factor = 0.9
    	log.cleaner.io.buffer.size = 524288
    	log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    	log.cleaner.max.compaction.lag.ms = 9223372036854775807
    	log.cleaner.min.cleanable.ratio = 0.5
    	log.cleaner.min.compaction.lag.ms = 0
    	log.cleaner.threads = 1
    	log.cleanup.policy = [delete]
    	log.dir = /tmp/kafka-logs
    	log.dirs = /tmp/kraft-combined-logs
    	log.flush.interval.messages = 9223372036854775807
    	log.flush.interval.ms = null
    	log.flush.offset.checkpoint.interval.ms = 60000
    	log.flush.scheduler.interval.ms = 9223372036854775807
    	log.flush.start.offset.checkpoint.interval.ms = 60000
    	log.index.interval.bytes = 4096
    	log.index.size.max.bytes = 10485760
    	log.local.retention.bytes = -2
    	log.local.retention.ms = -2
    	log.message.downconversion.enable = true
    	log.message.format.version = 3.0-IV1
    	log.message.timestamp.after.max.ms = 9223372036854775807
    	log.message.timestamp.before.max.ms = 9223372036854775807
    	log.message.timestamp.difference.max.ms = 9223372036854775807
    	log.message.timestamp.type = CreateTime
    	log.preallocate = false
    	log.retention.bytes = -1
    	log.retention.check.interval.ms = 300000
    	log.retention.hours = 168
    	log.retention.minutes = null
    	log.retention.ms = null
    	log.roll.hours = 168
    	log.roll.jitter.hours = 0
    	log.roll.jitter.ms = null
    	log.roll.ms = null
    	log.segment.bytes = 1073741824
    	log.segment.delete.delay.ms = 60000
    	max.connection.creation.rate = 2147483647
    	max.connections = 2147483647
    	max.connections.per.ip = 2147483647
    	max.connections.per.ip.overrides =
    	max.incremental.fetch.session.cache.slots = 1000
    	message.max.bytes = 1048588
    	metadata.log.dir = null
    	metadata.log.max.record.bytes.between.snapshots = 20971520
    	metadata.log.max.snapshot.interval.ms = 3600000
    	metadata.log.segment.bytes = 1073741824
    	metadata.log.segment.min.bytes = 8388608
    	metadata.log.segment.ms = 604800000
    	metadata.max.idle.interval.ms = 500
    	metadata.max.retention.bytes = 104857600
    	metadata.max.retention.ms = 604800000
    	metric.reporters = []
    	metrics.num.samples = 2
    	metrics.recording.level = INFO
    	metrics.sample.window.ms = 30000
    	min.insync.replicas = 1
    	node.id = 1
    	num.io.threads = 8
    	num.network.threads = 3
    	num.partitions = 1
    	num.recovery.threads.per.data.dir = 1
    	num.replica.alter.log.dirs.threads = null
    	num.replica.fetchers = 1
    	offset.metadata.max.bytes = 4096
    	offsets.commit.required.acks = -1
    	offsets.commit.timeout.ms = 5000
    	offsets.load.buffer.size = 5242880
    	offsets.retention.check.interval.ms = 600000
    	offsets.retention.minutes = 10080
    	offsets.topic.compression.codec = 0
    	offsets.topic.num.partitions = 50
    	offsets.topic.replication.factor = 1
    	offsets.topic.segment.bytes = 104857600
    	password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
    	password.encoder.iterations = 4096
    	password.encoder.key.length = 128
    	password.encoder.keyfactory.algorithm = null
    	password.encoder.old.secret = null
    	password.encoder.secret = null
    	principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
    	process.roles = [broker, controller]
    	producer.id.expiration.check.interval.ms = 600000
    	producer.id.expiration.ms = 86400000
    	producer.purgatory.purge.interval.requests = 1000
    	queued.max.request.bytes = -1
    	queued.max.requests = 500
    	quota.window.num = 11
    	quota.window.size.seconds = 1
    	remote.log.index.file.cache.total.size.bytes = 1073741824
    	remote.log.manager.task.interval.ms = 30000
    	remote.log.manager.task.retry.backoff.max.ms = 30000
    	remote.log.manager.task.retry.backoff.ms = 500
    	remote.log.manager.task.retry.jitter = 0.2
    	remote.log.manager.thread.pool.size = 10
    	remote.log.metadata.custom.metadata.max.bytes = 128
    	remote.log.metadata.manager.class.name = org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
    	remote.log.metadata.manager.class.path = null
    	remote.log.metadata.manager.impl.prefix = rlmm.config.
    	remote.log.metadata.manager.listener.name = null
    	remote.log.reader.max.pending.tasks = 100
    	remote.log.reader.threads = 10
    	remote.log.storage.manager.class.name = null
    	remote.log.storage.manager.class.path = null
    	remote.log.storage.manager.impl.prefix = rsm.config.
    	remote.log.storage.system.enable = false
    	replica.fetch.backoff.ms = 1000
    	replica.fetch.max.bytes = 1048576
    	replica.fetch.min.bytes = 1
    	replica.fetch.response.max.bytes = 10485760
    	replica.fetch.wait.max.ms = 500
    	replica.high.watermark.checkpoint.interval.ms = 5000
    	replica.lag.time.max.ms = 30000
    	replica.selector.class = null
    	replica.socket.receive.buffer.bytes = 65536
    	replica.socket.timeout.ms = 30000
    	replication.quota.window.num = 11
    	replication.quota.window.size.seconds = 1
    	request.timeout.ms = 30000
    	reserved.broker.max.id = 1000
    	sasl.client.callback.handler.class = null
    	sasl.enabled.mechanisms = [GSSAPI]
    	sasl.jaas.config = null
    	sasl.kerberos.kinit.cmd = /usr/bin/kinit
    	sasl.kerberos.min.time.before.relogin = 60000
    	sasl.kerberos.principal.to.local.rules = [DEFAULT]
    	sasl.kerberos.service.name = null
    	sasl.kerberos.ticket.renew.jitter = 0.05
    	sasl.kerberos.ticket.renew.window.factor = 0.8
    	sasl.login.callback.handler.class = null
    	sasl.login.class = null
    	sasl.login.connect.timeout.ms = null
    	sasl.login.read.timeout.ms = null
    	sasl.login.refresh.buffer.seconds = 300
    	sasl.login.refresh.min.period.seconds = 60
    	sasl.login.refresh.window.factor = 0.8
    	sasl.login.refresh.window.jitter = 0.05
    	sasl.login.retry.backoff.max.ms = 10000
    	sasl.login.retry.backoff.ms = 100
    	sasl.mechanism.controller.protocol = GSSAPI
    	sasl.mechanism.inter.broker.protocol = GSSAPI
    	sasl.oauthbearer.clock.skew.seconds = 30
    	sasl.oauthbearer.expected.audience = null
    	sasl.oauthbearer.expected.issuer = null
    	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    	sasl.oauthbearer.jwks.endpoint.url = null
    	sasl.oauthbearer.scope.claim.name = scope
    	sasl.oauthbearer.sub.claim.name = sub
    	sasl.oauthbearer.token.endpoint.url = null
    	sasl.server.callback.handler.class = null
    	sasl.server.max.receive.size = 524288
    	security.inter.broker.protocol = PLAINTEXT
    	security.providers = null
    	server.max.startup.time.ms = 9223372036854775807
    	socket.connection.setup.timeout.max.ms = 30000
    	socket.connection.setup.timeout.ms = 10000
    	socket.listen.backlog.size = 50
    	socket.receive.buffer.bytes = 102400
    	socket.request.max.bytes = 104857600
    	socket.send.buffer.bytes = 102400
    	ssl.cipher.suites = []
    	ssl.client.auth = none
    	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    	ssl.endpoint.identification.algorithm = https
    	ssl.engine.factory.class = null
    	ssl.key.password = null
    	ssl.keymanager.algorithm = SunX509
    	ssl.keystore.certificate.chain = null
    	ssl.keystore.key = null
    	ssl.keystore.location = null
    	ssl.keystore.password = null
    	ssl.keystore.type = JKS
    	ssl.principal.mapping.rules = DEFAULT
    	ssl.protocol = TLSv1.3
    	ssl.provider = null
    	ssl.secure.random.implementation = null
    	ssl.trustmanager.algorithm = PKIX
    	ssl.truststore.certificates = null
    	ssl.truststore.location = null
    	ssl.truststore.password = null
    	ssl.truststore.type = JKS
    	transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
    	transaction.max.timeout.ms = 900000
    	transaction.partition.verification.enable = true
    	transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
    	transaction.state.log.load.buffer.size = 5242880
    	transaction.state.log.min.isr = 1
    	transaction.state.log.num.partitions = 50
    	transaction.state.log.replication.factor = 1
    	transaction.state.log.segment.bytes = 104857600
    	transactional.id.expiration.ms = 604800000
    	unclean.leader.election.enable = false
    	unstable.api.versions.enable = false
    	zookeeper.clientCnxnSocket = null
    	zookeeper.connect = null
    	zookeeper.connection.timeout.ms = null
    	zookeeper.max.in.flight.requests = 10
    	zookeeper.metadata.migration.enable = false
    	zookeeper.session.timeout.ms = 18000
    	zookeeper.set.acl = false
    	zookeeper.ssl.cipher.suites = null
    	zookeeper.ssl.client.enable = false
    	zookeeper.ssl.crl.enable = false
    	zookeeper.ssl.enabled.protocols = null
    	zookeeper.ssl.endpoint.identification.algorithm = HTTPS
    	zookeeper.ssl.keystore.location = null
    	zookeeper.ssl.keystore.password = null
    	zookeeper.ssl.keystore.type = null
    	zookeeper.ssl.ocsp.enable = false
    	zookeeper.ssl.protocol = TLSv1.2
    	zookeeper.ssl.truststore.location = null
    	zookeeper.ssl.truststore.password = null
    	zookeeper.ssl.truststore.type = null
     (kafka.server.KafkaConfig)
    [2023-11-28 07:46:29,577] INFO [BrokerServer id=1] Waiting for the broker to be unfenced (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,612] INFO [BrokerLifecycleManager id=1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)
    [2023-11-28 07:46:29,661] INFO [BrokerServer id=1] Finished waiting for the broker to be unfenced (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,662] INFO authorizerStart completed for endpoint PLAINTEXT. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)
    [2023-11-28 07:46:29,663] INFO [SocketServer listenerType=BROKER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)
    [2023-11-28 07:46:29,663] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
    [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
    [2023-11-28 07:46:29,665] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
    [2023-11-28 07:46:29,665] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
    [2023-11-28 07:46:29,665] INFO Kafka startTimeMs: 1701157589664 (org.apache.kafka.common.utils.AppInfoParser)
    [2023-11-28 07:46:29,666] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
    [2023-11-28 07:53:16,542] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2023-11-28 07:53:16,543] INFO [BrokerLifecycleManager id=1] Unable to send a heartbeat because the RPC got timed out before it could be sent. (kafka.server.BrokerLifecycleManager)
    
    5. Test Kafka

    Create a topic and send/receive some messages to test Kafka. For example, create a topic named test-topic:

    bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
    

    Start a console producer to send messages to the topic (each line you type is sent as a separate message; press Ctrl+C to exit):

    bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
    

    In another terminal window, start a consumer to receive the messages:

    bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
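
    For a non-interactive smoke test, you can also pipe a message into the console producer and read it back with --max-messages:

    # Send a single message without an interactive prompt
    echo "hello kafka" | bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
    # Read exactly one message from the beginning, then exit
    bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning --max-messages 1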
    

    With these steps you can install and start Kafka on Ubuntu and run a simple test to confirm that it is working.
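
    When you are done, the broker can be shut down cleanly with the bundled stop script (or by stopping the nohup background process):

    bin/kafka-server-stop.sh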
