Kafka教程
推荐链接:
https://en.wikipedia.org/wiki/Apache_Kafka
https://github.com/apache/kafka
MQ
介绍
MQ即MessageQueue,消息对列。我们这次要学习的RabbitMQ就是一种典型的MQ产品。
那么到底什么是MQ呢?可以分两个部分来理解:
- 消息Message:在不同应用程序之间传递的数据。
- 队列Queue:一种FIFO先进先出的数据结构。将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。
**MQ产品最直接的作用,是将同步的事件驱动改为异步的消息驱动。**这话什么意思?我们从一个最常见的SpringBoot应用开始说起。
首先搭建一个普通的Maven项目在pom.xml中引入SpringBoot的依赖
1 | <dependencies> |
然后添加一个监听器类
1 | public class MyApplicationListener implements ApplicationListener<ApplicationEvent> { |
接下来,添加一个springboot启动类,在启动类中加入自己的监听器。
1 |
|
好了,不用添加配置文件,直接启动。

从控制台可以看出,SpringBoot框架在启动时,会尝试发布各种ApplicationEvent事件。此时,SpringBoot框架可以称为消息生产者Producer。同样,只要有ApplicationEvent事件发布了,就会触发MyApplicationListener监听器处理这些事件,MyApplicationListener可以称为消息消费者Consumer。
Producer和Consumer的运行状况互不干涉,不管有没有Consumer,Producer一样会发布消息。反过来,不管Producer有没有发布消息,Consumer也一样会监听这些事件。这种方式,实际上就是通过事件中包含的消息在驱动Producer和Consumer工作,这种工作方式也就称为消息驱动。
与消息驱动形成对比的是常见的事件驱动。比如经常写的Controller,只有通过一个事件主动触发,才会调用。
由此可见,SpringBoot内部集成这种消息驱动的机制。但是,这些Producer和Consumer都只能在一个进程中使用。如果需要跨进程调用呢?这就需要独立一个中间服务,才能发布和订阅这些消息。而这个中间服务,就是MQ中间件。
比如在一个大型电商项目中,订单服务完成下单,就可以发布下单事件,而下游的消费者就可以消费这个下单事件,进行一些补充的业务。

应用场景(优势)
**异步处理/异步提速:将非核心、耗时的操作放入队列,让主流程快速返回,从而大幅缩短响应时间。**举例:电商平台用户完成下单支付后,主线程只需记录订单成功。随后将“发送通知短信”、“生成积分”、“扣减库存”等任务丢入消息队列异步执行,提升用户体验。
**应用解耦:上下游系统通过MQ进行交互(即便使用不同开发语言),从而实现系统间的隔离。**举例:当用户在论坛发布新文章时,系统只需将文章内容发至消息队列,由下游的“搜索引擎”、“文章推荐系统”、“积分系统”各自独立消费处理。新增业务模块时,无需修改主发帖逻辑。
**流量削峰/削峰填谷:在上游请求激增时,将瞬时海量请求暂存到消息队列中,下游服务根据自身最大处理能力匀速消费,防止系统被打垮。**举例:电商秒杀活动中,数百万用户同时点击购买,将请求全部打入数据库会导致宕机。使用消息队列可缓冲洪峰,系统依自身能力处理(如每秒处理2000笔),未处理的请求在队列中排队等待。
**分布式事务管理:通过消息的最终一致性,保证分布式环境下各个独立数据库的数据同步。**举例:跨库业务处理。例如用户跨行转账,资金转出成功后产生一条消息,转入方系统消费该消息增加余额,若失败则不断重试,达成事务的最终一致。扩展:分布式事务的主流解决方案
劣势
引入 MQ 属于典型的“用空间换时间”和“用复杂度换并发”的方案。
系统可用性降低(引入单点风险)
- 原理:系统引入的外部依赖越多,故障概率就越高。
- 代价:原本系统只需 A 服务直接调用 B 服务即可完成业务。引入 MQ 后,一旦 MQ 出现宕机、磁盘满、或集群网络分区,整个上游的写操作将全部瘫痪。因此,为了保证高可用,必须额外投入精力去维护 MQ 的主从集群或分布式部署。
系统复杂度大幅提高(治理成本高),开发者必须处理一系列由异步带来的棘手问题:
- 消息丢失问题:如何保证消息在网络闪断、MQ 宕机或消费者挂掉时不丢失?(需要开启生产者确认、持久化、消费者手动 ACK)。
- 消息重复消费(幂等性问题):由于网络延迟,生产者可能会重复发送消息,或者 MQ 可能会重复投递消息。下游消费者必须实现幂等性设计(如通过唯一业务 ID 去重、加锁),否则会导致重复扣款、重复加积分等严重业务事故。
- 消息积压问题:当上游发送速度远大于下游消费速度时,MQ 队列会爆满,可能导致磁盘写满或消息过期丢失(需要监控、临时扩容消费者、加速消费)。
数据一致性问题(从强一致变为最终一致)
- 原理:A 服务执行完本地操作,发完 MQ 消息就直接返回成功给用户了。此时 B 服务还在队列里排队等待消费。
- 代价:这破坏了传统的强一致性。如果 B 服务在消费时发生代码异常、数据库宕机,导致这条消息最终消费失败,那么 A 和 B 的数据就会产生不一致(例如 A 钱扣了,B 商品没发)。此时必须引入分布式事务、复杂的重试机制或人工对账补偿。
消息顺序性难以保证
- 原理:分布式 MQ 为了高并发,通常会将消息分散在多个分片/队列(Partition/Queue)中并发处理。
- 代价:这导致消息的消费顺序可能与发送顺序不一致。例如:某个订单同时产生了“创建订单”和“支付订单”两条消息,如果“支付订单”因为网络快先被消费了,业务逻辑就会报错。保证严格顺序(如使用 Kafka 的 Partition Key 机制)往往需要牺牲系统的并发性能。
可视化工具
https://github.com/Bronya0/Kafka-King 使用中
https://github.com/kafbat/kafka-ui
历史版本
关于所有历史版本的 发布日期、官方博客、升级说明 及 发行说明,均可在 下载页面 集中查阅。
| 大版本 | 发布时间 | 核心技术标签 | 架构依赖 | 核心商业/运维价值 |
|---|---|---|---|---|
| Kafka 1.0 | 2017 年 | Exactly-Once 落地 | 强依赖 ZooKeeper | 解决数据重复/丢失痛点,流处理正式进入金融级金融/核心业务。 |
| Kafka 2.0 | 2018 年 | 安全、合规与运维优化 | 强依赖 ZooKeeper | 引入前缀 ACL 与动态配置,极大降低跨部门、大规模集群的权限和运维成本。 |
| Kafka 3.0 | 2021 年 | KRaft 共识预演 | 双模式(ZK/KRaft 并存) | 宣布弃用 ZK,默认开启最高安全级别数据一致性,为架构平滑过渡打下基础。 |
| Kafka 4.0 | 2025 年 | 彻底去 ZK 与纯粹云原生 | 纯 KRaft 模式(零 ZK 依赖) | 运维架构减负 50%;支持单集群百万级分区;彻底解决重平衡(Rebalance)停顿痛点。 |
1.0
- **Exactly-Once Semantics (EOS) 落地与完善:**提供了完备的幂等性与事务支持,保证消息在网络波动或 Broker 宕机时,既不重复也不丢失,使 Kafka Streams 能够安全地应用于核心记账、计费系统。
- **JBOD 磁盘容错优化:**改进了对单节点多磁盘阵列(Just a Bunch Of Disks)的支持。如果某一块磁盘损坏,Broker 能够自动隔离该盘上的分区,而不会导致整个 Broker 节点崩溃。
- **Controller 指标细化:**大幅增加了对 Controller 内部状态的监控指标,提升了系统黑盒运维的透明度。
2.0
- **基于前缀的 ACL 访问控制 (KIP-290):**支持通过
*等通配符对特定前缀的 Topic 进行批量授权,解决了大型组织内成千上万个 Topic 必须逐一赋权的运维死结。 - **免重启动态配置与安全升级:**支持在不重启 Broker 的情况下,动态修改配置参数并实时重新加载 SSL/TLS 证书,实现了真正意义上的零停机(Zero-Downtime)安全维护。
- **引入消费者领先指标 Lead Metrics (KIP-223):**引入了消费者位置与分区最前端(Log Start Offset)的距离监控。一旦该指标逼近零,表明消费者面临因数据过期被踢出、丢失数据的风险,为监控告警提供了新维度。
3.0
- **KRaft 协议预览版登场 (KIP-500):**正式拉开“去 ZK”序幕,允许在实验性场景下不部署 ZK 运行 Kafka。
- **默认开启最高级别一致性:**生产者的默认确认机制由
acks=1升级为最高安全级别的acks=all,且默认启用幂等性(enable.idempotence=true),标志着 Kafka 从“追求极致吞吐”向“默认数据绝对安全”的策略转变。 - **JDK 生态清洗:**彻底弃用 Java 8 和 Scala 2.12,全面转向对现代化高性能 JDK(如 Java 11/17)的适配。
4.0
- **100% 移除 ZooKeeper 代码:**4.0 版本彻底删除了所有与 ZK 相关的代码路径,KRaft 成为唯一合法的元数据管理架构。任何既有 ZK 集群必须先迁移至 KRaft 方可升级至 4.0。
- **新一代消费者组重平衡协议 GA (KIP-848):**彻底重构了 Consumer Group 协议。将成员状态管理和分区分配逻辑完全移至 Broker 端执行。引发全网痛点的 “Stop-the-World” 全组消费暂停现象不复存在,支持极其平滑的增量式重平衡。
- **Kafka 共享队列模式 - Share Groups (KIP-932):**引入了传统传统消息队列(如 RabbitMQ)的“竞争消费者”概念。打破了过去“一个分区只能由组内一个消费者消费”的强绑定限制,多个消费者可以并发消费同一个分区内的不同消息。
- Eligible Leader Replicas (ELR, KIP-966) 与 Pre-Vote 机制 (KIP-996):* ELR 防止在极端灾难、ISR 全挂时,由于盲目选举导致严重的数据丢失。
- Pre-Vote 引入了 Raft 的预投票机制,有效防止因个别网络抖动断连的节点重连时,引发集群无谓的 Controller 重新选举,大幅提升了集群的稳定性。
- Java 环境门槛:Broker 运行环境及组件(Connect, Tools)强制要求最低 Java 17;客户端 API 最低要求 Java 11。
核心概念
介绍系统核心组件。
Controller(控制器/集群大脑)
Controller 是整个 Kafka 集群的最高指挥官。它不负责处理客户端的具体消息读写,只负责管理整个集群元数据(Metadata)(例如:集群里有几台 Broker、有哪些 Topic、每个 Topic 有几个分区、每个分区的 Leader 在谁手上)。
核心职责:
- 全局生命周期管理:执行 Topic 的创建、删除、参数修改以及分区扩容 等管理指令。
- 集群成员管理:监控所有 Broker 的心跳,维护集群拓扑(Broker 的加入、退出)。
- 分区选主(Leader Election):当某个分区的 Leader 副本不可用时,负责从同步副本集(ISR)中选出新的分区 Leader。
4.0 版本的重要变化(KRaft 架构):
- 去 ZooKeeper 化与 KRaft 仲裁组(Quorum):Kafka 4.0 彻底移除了对 ZooKeeper 的依赖。控制权完全交由基于 Raft 共识算法构建的 Controller 仲裁组。仲裁组通过选举产生一个 Active Controller(主控制器),其余作为 Follower Controller。
- 元数据“日志化”(
__cluster_metadata):全局元数据变更为“事件驱动”模型,持久化在内部主题__cluster_metadata中。Active Controller 会将元数据变更编码为 Metadata Record,并追加到 Metadata Log 中,Broker 们像消费者一样去“拉取”消息更新自己的缓存。 - 预投票机制(Pre-Vote, KIP-996):针对 KRaft 仲裁组引入了预投票机制。当网络发生瞬时抖动时,节点在正式发起 KRaft Leader 选举前会先发起一轮“预探测”。仅在确认能获得多数派响应时才会增加纪元(Epoch)并正式发起选举,大幅降低了集群网络不稳定时的控制平面震荡。
- 秒级故障恢复与百万分区:消除了外部 ZooKeeper 交互导致的 RPC 瓶颈。当 Active Controller 宕机时,新 Controller 的接管在极短时间内完成。KRaft 架构显著提升了元数据管理能力,使 Kafka 集群能够支持数十万级甚至接近百万级 Partition 的部署规模。
Broker(数据节点/业务员)
Broker 是 Kafka 集群中的实际服务器节点,是底层服务单元,主要负责高并发的消息持久化与 I/O 读写。一个 Broker 就是一台运行着 Kafka 服务的机器。
核心职责:
- 高吞吐 I/O 处理:基于操作系统的 PageCache、零拷贝技术(Zero-Copy /
sendfile)以及磁盘顺序写特性,高效响应客户端的Produce(写入)与Fetch(读取)请求。 - 消息持久化:将生产者发送的消息以追加(Append-only)模式写入本地磁盘的日志段(Log Segment)中。
- 心跳维持:向 Active Controller 周期性发送心跳。若发生超时,Controller 将判定该 Broker 处于不可用状态(如 Fenced 隔离状态),并剥离 Partition Leader 身份。
4.0 版本的重要变化:
- 清晰的节点角色(
process.roles):节点职责边界更加严格,可配置为broker(纯数据节点)、controller(纯控制节点)或broker,controller(混合模式,适用于边缘计算或小型集群)。 - 被动监听转为主动获取:Broker 不再依赖 ZooKeeper Watch 机制,而是通过 KRaft 元数据同步协议持续获取 Controller 产生的 Metadata Log 增量,并更新本地 Metadata Cache。
- 新一代重平衡协议落地(KIP-848):4.0 将 Consumer Group(消费者组)的分配算力从客户端收回到了 Broker 端的 Group Coordinator 手中。实现了渐进式、增量式重平衡,彻底终结了过去十几年消费者上下线时导致全组消费卡死的 “Stop-The-World(全局停顿)” 问题。
Topic Partition
Topic 是一个逻辑上的消息分类;Partition(分区)是 Kafka 进行 物理存储、并发读写、高可用复制 的最小工作单元。
核心职责与特征:
- 物理分片与水平扩展:一个 Topic 可划分为多个 Partition,多个 Partition 可以分布在不同 Broker 上,从而突破单机 I/O 吞吐与存储容量的物理上限。Partition 数量决定了 Topic 可达到的理论最大消费并发度。
- 严格局部有序:单分区内部的消息严格按照写入顺序排列,每个分区的每条消息都会被分配一个唯一的、递增的序列号——Offset(偏移量)。Kafka 仅保证 Partition 级别的有序性,不保证 Topic 级别的全局有序。
4.0 版本的重要变化:
- 消除分区数量天花板:由于 ZooKeeper 无法承受频繁的元数据更新,单个集群的分区总数很难突破几万个。在 4.0 KRaft 模式下,元数据传输极快,单集群可以安全地运行数十万甚至上百万个分区。
- 引入“共享组”概念(Share Groups, KIP-932):Share Groups 是 Kafka 4.0 引入的新消费模型,与传统 Consumer Group 并存。它允许多个消费者并发处理同一 Partition 中的消息,并通过消息级 ACK/NACK 实现类似传统消息队列的竞争消费模式。
运维指南:Partition 数量与 Linux 系统的文件句柄数(File Descriptors)直接挂钩。海量分区需确保宿主机配置了足够高的 ulimit -n(如 655350),否则进程将因 Too many open files 异常而崩溃。
Leader(领导者/主副本)
为了保证高可用,Kafka 会为每个 Partition 创建多个副本(Replica),在这组副本中,有且仅有一个会被指定为 Leader。也就是说,对于某个 Topic,其每个 Partition 都有且仅有一个 Leader,因此该 Topic 的 Leader Partition 数量等于 Partition 总数。
一个 Broker 上可能存放着成百上千个 Partition 的副本,其中有些是 Leader,有些是 Follower。Leader 的分布应该尽量均衡在不同的 Broker 上,以分摊读写压力。
核心职责:
- 统一读写入口:在默认情况下,所有的生产者写入请求和消费者读取请求,都必须由该分区的 Leader 副本 亲自处理和响应。(注:虽然 Kafka 支持特定配置下的 Follower 副本读取,但 Leader 依然是绝对的写入核心)。
- 协调数据同步:Leader 收到写入数据后,会将其追加到本地 Log,并负责监控、协调其他 Follower 副本的数据同步进度。
4.0 版本的重要变化:
- 预投票机制(KIP-996 Pre-Vote):4.0 引入了 Pre-Vote 机制。当网络出现短暂抖动时,某个节点在正式发起 Leader 选举前会先进行一轮“预热投票”。如果它无法获得大多数节点的响应,就不会触发正式选举。这极大地减少了网络不稳定时的无效选举,让分区 Leader 更加稳定。
- 合格领导者副本(KIP-966 Eligible Leader Replicas,简称 ELR):Kafka 4.0 中引入了一项重要高可用机制(基于 KIP-966)。它由 KRaft Controller 动态维护,用于在 ISR 不可用时提供经过验证的安全候选副本,从而在保证已提交数据不丢失的前提下缩短 Leader 恢复时间。
Follower(跟随者/从副本)
Leader 之外的所有副本,都叫做 Follower。
核心职责:
- 数据同步(容灾备份):Follower 副本不接受客户端的写请求。其核心任务是不断地向 Leader 副本发送 Fetch 请求,拉取 Leader 副本最新追加的消息到本地,以保持与 Leader 副本的数据一致性。
- 故障转移(高可用):当 Leader 副本所在的 Broker 发生故障宕机时,Controller 会从 Follower 中选出一个新的 Leader 以接管读写。
核心机制:ISR(In-Sync Replicas 同步副本集):
- 动态维护:Leader 根据 Follower 的同步状态向 Controller 上报 ISR 变更请求,由 Controller 更新并发布最新 ISR 状态。ISR 包含 Leader 自身,以及与 Leader 保持足够紧密同步的 Follower(同步滞后时间阈值由
replica.lag.time.max.ms参数控制)。 - 选主权限(Leader Election):默认且安全的情况下,仅有处于 ISR 集合中的 Follower 才有资格在 Leader 宕机时接管分区的领导权。若 Follower 同步严重滞后会被踢出 ISR;追平进度后会再次重新加入。 (注:若强制开启
unclean.leader.election.enable=true,则允许非 ISR 副本强行当选,但这将面临数据丢失的风险,生产环境通常禁用此配置。)
脚本文件
https://github.com/apache/kafka/tree/trunk/bin
服务器与集群管理 (Server & Cluster)
这些脚本用于启动、停止和管理 Kafka 节点及存储。
kafka-server-start.sh:启动 Kafka 服务器进程(在 4.0 中,可以是 Broker 角色、Controller 角色或两者兼有)。kafka-server-stop.sh:优雅地停止正在运行的 Kafka 服务器。kafka-storage.sh:用于 KRaft 模式 下生成 Cluster ID 和 格式化日志目录。kafka-cluster.sh:集群管理工具。用于查看集群 ID 或注销不再使用的 Broker。kafka-features.sh:管理集群的特性标志(Feature Flags),例如查看或升级集群的元数据版本(Metadata Version)。
主题与分区管理 (Topics & Partitions)
用于对数据流的核心概念(Topic)进行运维。
kafka-topics.sh:最常用的脚本。用于 创建、删除、修改、查看 Topic 信息。kafka-reassign-partitions.sh:用于在 Broker 之间 或 同一台 Broker 的不同磁盘路径之间 移动分区和副本(例如节点扩容或缩容时的日志目录重平衡)。kafka-leader-election.sh:手动触发分区的 Leader 选举,通常用于恢复平衡或处理节点故障。kafka-log-dirs.sh:查询各个 Broker 上底层日志目录(Log Directory)的大小和状态。kafka-delete-records.sh:手动删除某个 Topic 中指定 Offset 之前的历史消息记录。
生产者、消费者与组管理 (Clients & Groups)
用于日常的数据发布、订阅以及消费进度管理。
kafka-console-producer.sh:控制台生产者。允许你通过终端标准输入直接向 Topic 发送文本消息。kafka-console-consumer.sh:控制台消费者。用于在终端实时读取并打印 Topic 中的消息。kafka-consumer-groups.sh:管理传统的消费者组(Consumer Groups),查看消费积压(Lag)、重置消费位点(Reset Offsets)等。kafka-groups.sh:一个更通用的组管理工具,旨在统一管理不同协议的组。
共享组特性 (Share Groups) 🌟 4.0 新特性
传统 Consumer Group 是按分区(Partition)分配的,而 Share Groups 引入了类似传统消息队列(如 RabbitMQ)的“共享拉取”模式,允许多个消费者并发消费同一个分区的消息。
kafka-console-share-consumer.sh:使用全新 Share Group 协议的控制台消费者。kafka-share-groups.sh:用于管理 Share Groups(列出、查看详情、删除)。kafka-share-consumer-perf-test.sh:针对 Share Consumer 模式的性能测试工具。kafka-verifiable-share-consumer.sh:用于系统级自动化测试,验证 Share Consumer 消费行为的正确性。
安全、权限与配置管理 (Security & Configs)
kafka-delegation-tokens.sh:管理用于轻量级身份认证的委托令牌(Delegation Tokens)。kafka-acls.sh:管理访问控制列表(ACLs),配置谁(Principal)可以对什么资源(Topic/Group)执行什么操作(读/写)。kafka-configs.sh:动态修改配置工具。可以在不停机的情况下修改 Broker、Topic 或客户端的配置。
KRaft 元数据与底层调试 (Metadata & Debugging)
kafka-metadata-quorum.sh:管理和监控 KRaft Controller 的仲裁副本(Quorum),查看 Controller 的 Leader 状态和同步情况。kafka-metadata-shell.sh:一个交互式 Shell 环境,允许你像浏览文件系统一样,浏览 KRaft 集群的内部元数据(替代了以前用zookeeper-shell.sh查看 ZK 节点的方式)。kafka-dump-log.sh:底层调试工具。将 Kafka 磁盘上的.log、.index、.timeindex等文件内容导出为可读文本,排查消息损坏或位点错误。kafka-get-offsets.sh:快速查询指定 Topic 分区的 Earliest(最早)或 Latest(最新)Offset(偏移量)。kafka-transactions.sh:用于检查 Kafka 事务状态,或者强制终止卡住的“悬挂事务”(Dangling Transactions)。
Kafka Connect (数据集成)
用于连接外部数据系统(如数据库、Elasticsearch、HDFS 等)的工具。
connect-standalone.sh:以单机(Standalone)模式启动 Kafka Connect Worker,适合测试或轻量级任务。connect-distributed.sh:以分布式(Distributed)模式启动 Kafka Connect Worker,支持高可用和弹性扩展(生产环境推荐)。connect-mirror-maker.sh:启动 MirrorMaker 2,用于在不同的 Kafka 集群之间进行数据复制和灾备。connect-plugin-path.sh:帮助管理和排查 Kafka Connect 插件(Plugins)路径配置的工具。
性能测试与自动化验证 (Perf & Testing)
kafka-producer-perf-test.sh:生产者压力测试工具,用于测试集群的写入吞吐量(MB/s,Records/s)和延迟。kafka-consumer-perf-test.sh:消费者压力测试工具,用于测试集群的读取吞吐量。kafka-e2e-latency.sh:端到端延迟测试工具,测量一条消息从发出到被消费者接收所需的总时间。kafka-replica-verification.sh:验证不同 Broker 上的副本数据是否一致。kafka-verifiable-producer.sh/kafka-verifiable-consumer.sh:生成或消费确定性事件流的工具,通常不是给普通用户用的,而是供 Apache Kafka 官方进行系统集成测试(System Tests)时验证数据不丢失、不重复使用的。
Kafka Streams (流计算)
kafka-streams-application-reset.sh:重置 Kafka Streams 应用的状态。它会清理内部 Topic(如 Changelog 和 Repartition topics),让你能重新处理数据。kafka-streams-groups.sh:用于管理 Kafka Streams(流处理应用)在 Kafka 内部注册的消费组。
通用底座工具 (General Utils)
kafka-run-class.sh:最底层的包装脚本。几乎所有的脚本,都是通过调用该脚本来执行对应的 Java 类。kafka-jmx.sh:快速查询运行中 Kafka 进程的 JMX 指标的简易命令行工具。kafka-broker-api-versions.sh:查询 Broker 支持的各项 API 版本,用于排查客户端与服务端版本兼容性问题。kafka-client-metrics.sh:管理客户端遥测(Telemetry)指标配置的工具(基于 KIP-714),允许 Broker 动态控制客户端上报哪些监控指标。
安装
下载并解压
二进制下载地址:https://kafka.apache.org/community/downloads/
1 | cd /data/local |
查看版本
1 | kafka-server-start.sh --version |
安装依赖
本地必须安装 Java 环境,兼容性:https://kafka.apache.org/43/getting-started/compatibility/
单机-KRaft 混合模式(使用中)
配置
https://github.com/apache/kafka/blob/trunk/config/server.properties
vim /data/local/kafka/config/server-standalone.properties
请修改广播地址 advertised.listeners
1 | ############################# Server Basics ############################# |
初始化数据目录
1 | 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一) |
启动
1 | LOG_DIR="/data/local/kafka/logs/standalone" kafka-server-start.sh /data/local/kafka/config/server-standalone.properties |
自启
vim /etc/systemd/system/kafka.service
1 | [Unit] |
启动
1 | 刷新systemd守护进程,以重新加载systemd服务配置文件 |
停止
1 | 找到你想杀掉的那个 Kafka 进程的 PID |
AdminClient 认证配置
https://kafka.apache.org/43/configuration/admin-configs/
客户端(如 Java 程序、命令行工具)连接时,需要提供对应的账号密码。
编辑 PLAIN 认证机制的客户端配置文件:vim /data/local/kafka/config/admin-plain.properties
1 | 用于与 Broker 通信的协议。 |
编辑 SCRAM 认证机制的客户端配置文件:vim /data/local/kafka/config/admin-scram.properties
1 | security.protocol=SASL_PLAINTEXT |
查看集群
1 | kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 --command-config /data/local/kafka/config/admin-plain.properties describe --status |
集群-KRaft 混合模式
配置
节点 1 :vim /data/local/kafka/config/server-node-1.properties
1 | ############################# Server Basics ############################# |
节点 2 :vim /data/local/kafka/config/server-node-2.properties
1 | ############################# Server Basics ############################# |
节点 3 :vim /data/local/kafka/config/server-node-3.properties
1 | ############################# Server Basics ############################# |
初始化数据目录
1 | 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一) |
启动
1 | LOG_DIR="/data/local/kafka/logs/node-1" kafka-server-start.sh /data/local/kafka/config/server-node-1.properties |
集群-KRaft 专用模式
配置
https://github.com/apache/kafka/blob/trunk/config/controller.properties
https://github.com/apache/kafka/blob/trunk/config/broker.properties
Controller 节点:vim /data/local/kafka/config/server-controller.properties
1 | ############################# Server Basics ############################# |
Broker 节点:vim /data/local/kafka/config/server-broker.properties
1 | ############################# Server Basics ############################# |
初始化数据目录
1 | 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一) |
启动
1 | LOG_DIR="/data/local/kafka/logs/controller" kafka-server-start.sh /data/local/kafka/config/server-controller.properties |
配置指南(官网)
https://kafka.apache.org/43/configuration/
包含了 Broker(节点)、Producer(生产者)、Consumer(消费者)、Topic、Kafka Connect 以及 Kafka Streams 的所有参数配置项与默认值。
配置文件模版:https://github.com/apache/kafka/tree/trunk/config
管理员配置文件
https://kafka.apache.org/43/configuration/admin-configs/
官方未给出配置示例,admin.properties 用于控制客户端连接集群的行为(如超时时间、重试次数、加密认证)。
服务器与 KRaft 集群角色配置文件 (Server & KRaft Roles)
在 KRaft 架构下,一个 Kafka 节点可以承担不同的“角色”(Role)。这几个文件就是针对不同角色的启动配置模板。
server.properties最标准的 Kafka 节点配置文件。通常用于配置同时承担 Broker 和 Controller 双重角色的节点,或者根据其中的process.roles参数灵活调整节点职责。broker.properties纯 Broker 模式配置。使用此配置启动的节点只负责处理客户端的数据读写请求和存储消息,不参与集群元数据的管理选举。它需要连接到独立的 Controller 节点。controller.properties纯 Controller 模式配置。使用此配置启动的节点专门作为 KRaft 的仲裁节点(Quorum),负责管理集群状态、Topic 元数据等。它不存储业务消息,也不接受客户端的数据读写请求。server-standalone.properties单机快捷开发配置。专门为本地开发、测试或快速体验设计的轻量级配置文件。它通常预设了单机运行所需的所有参数,无需复杂配置即可一键启动一个功能完整的本地 Kafka 节点。
客户端配置文件 (Clients)
仅供命令行工具调用的客户端配置,kafka-console-producer.sh 、 kafka-console-consumer.sh。
producer.properties生产者的基础配置模板。包含连接集群的地址 (bootstrap.servers)、序列化器设置、压缩类型、ACK 机制等基础参数。consumer.properties消费者的基础配置模板。包含连接集群的地址、反序列化器设置、消费者组 ID (group.id)、自动提交 Offset 的策略等基础参数。
Kafka Connect 数据集成配置文件 (Connectors)
Kafka Connect 用于在 Kafka 和外部数据系统之间流转数据。这些是启动 Connect 工作节点和基础插件的配置文件。
1. 运行模式配置
connect-standalone.properties单机模式运行配置。通常用于本地测试或极轻量级的任务。所有的状态和配置都保存在本地文件中,不具备高可用性。connect-distributed.properties分布式模式运行配置。生产环境的标准用法,支持高可用、负载均衡和动态伸缩。所有的连接器状态、配置和 Offset 都存储在 Kafka 的内部 Topic 中。connect-mirror-maker.propertiesMirrorMaker 2 配置文件。MirrorMaker 基于 Kafka Connect 框架构建,这个文件专门用于配置跨数据中心、跨集群的 Kafka 数据复制与灾备。
2. 连接器插件配置
connect-file-source.properties文件源示例配置,用于读取本地某个文本文件中的每一行内容,并将其发送到 Kafka 指定的 Topic 中。connect-file-sink.properties文件接收器示例配置,用于从 Kafka 指定的 Topic 中消费消息,并按行写入本地文本文件。connect-console-source.properties控制台源配置,用于从标准输入中读取内容(例如终端键盘输入),并将其作为消息发送到 Kafka。connect-console-sink.properties控制台接收器配置,用于从 Kafka 中消费消息,并直接输出到标准输出(终端屏幕)。
日志系统配置文件 (Logging - Log4j2)
log4j2.yamlKafka 服务端的主日志配置。控制 Broker/Controller 运行时的日志输出级别(INFO, DEBUG, ERROR)、日志文件的滚动策略(Rolling)以及输出路径(通常是server.log等)。connect-log4j2.yaml专门为 Kafka Connect 进程准备的日志配置,方便将数据集成任务的日志与 Kafka 服务端日志区分开来。tools-log4j2.yaml命令行工具(如 kafka-topics、kafka-consumer-groups 等)的日志配置。当你在终端运行本时,这个文件确保终端只会输出最关键的信息,而不会被海量的底层 DEBUG 日志淹没。
[!IMPORTANT]
log4j2.yaml文件中日志输出目录为${sys:kafka.logs.dir},带有sys:前缀的变量会直接跳过内部变量配置(Property节点),仅读取 Java 系统属性中定义的值。查看 Kafka 启动脚本:
kafka-server-start.sh->kafka-run-class.sh,发现几个环境变量的使用:
KAFKA_LOG4J_CMD_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"$KAFKA_OPTSexec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"所以,要想改变日志位置,可以在启动之前设置环境变量
export LOG_DIR="/data/local/kafka/logs"。 JVM GC 日志的路径也依赖LOG_DIR。
测试与故障注入框架配置文件 (Testing)
trogdor.conf 这是 Trogdor 的配置文件。Trogdor 是 Kafka 自带的一个分布式系统测试框架,主要用于执行负载测试和故障注入(Fault Injection,例如模拟网络延迟、节点宕机、磁盘满等)。这个文件用于配置 Trogdor 代理(Agent)和协调器(Coordinator)的运行参数,普通用户在日常运维中极少用到。
架构设计(官网)
https://kafka.apache.org/43/design/
阐述了其高吞吐、低延迟的核心逻辑,包括:为什么要用文件系统页面缓存(Pagecache)、为什么坚持顺序读写、零拷贝(Zero-copy)机制、以及推拉模式(Push vs Pull)的权衡。
底层实现(官网)
https://kafka.apache.org/43/implementation/
主要讲解底层的网络协议、文件存储格式(如日志段文件、索引结构)、复制协议的底层细节等。
运维操作(官网)
https://kafka.apache.org/43/operations/
围绕 Kafka 集群的日常维护展开,包括:集群参数配置(Config)、监控(Monitoring)、扩展节点、工具使用、常见故障处理以及 OS 层面的调优。
安全配置(官网)
https://kafka.apache.org/43/security/
主要涵盖 Kafka 的安全合规机制,包括客户端与 Broker 之间的认证(SASL/SSL)、加密(TLS 传输加密)以及权限控制(ACLs 访问控制列表)。
使用 SSL 进行加密和身份验证
https://kafka.apache.org/43/security/encryption-and-authentication-using-ssl/
使用 SASL 进行身份验证
https://kafka.apache.org/43/security/authentication-using-sasl/
- PLAINTEXT:无身份认证,明文传输。默认配置。数据裸奔,谁都能连接,适合开发测试。
- SSL:无身份认证,加密传输。通过 TLS/SSL 证书加密通道,防止数据被窃听。
- SASL_PLAINTEXT:身份认证 + 明文传输。连接时必须对账密,但后续传输的数据是明文。
- SASL_SSL:身份认证 + 加密传输。最安全的安全级别,既要对账密,传输又加密。
通信安全协议(传输层)
传输层决定了客户端与 Broker 之间的数据交互是否被加密。
- PLAINTEXT:所有流经的数据(包括业务消息和部分认证信息)都是明文,内网抓包一览无余。
- SSL:采用 TLS/SSL 协议对全链路数据进行加密,哪怕被中间人拦截,也无法解析出真实内容。
身份认证机制(认证层)
SASL - 维基百科 :简单认证与安全层 (SASL, Simple Authentication and Security Layer ) 是一个在网络协议中用来认证和数据加密的构架。它把认证机制从程序中分离开,理论上使用SASL的程序协议都可以使用SASL所支持的全部认证机制。
SASL(简单认证与安全层,Simple Authentication and Security Layer) 不是具体的加密算法,而是一个外挂式的认证框架。它把认证机制从底层网络通信中抽离出来,Kafka 相当于提供了一个“标准接口”,具体的验证逻辑交由不同的插件机制实现。
- PLAIN(明文密码):一种基于简单文本比对的身份验证机制。客户端通过网络向 Kafka Broker 发送明文的用户名和密码。Kafka Broker 接收到请求后,直接将收到的字符串与本地配置(如 JAAS 文件)或 KRaft 中存储的账密进行一致性比对。由于凭证在传输过程中未加密,必须配合 SSL/TLS 加密通道使用,否则存在严重的明文抓包风险。账密写死配置文件。
- SCRAM(加盐挑战应答):一种基于哈希算法的安全性双向凭证校验机制。客户端与 Kafka Broker 在认证过程中,均不在网络上直接传输真实的密码。通过挑战-应答 (Challenge-Response) 流程,Broker 发送随机数(Salt 和 Nonce),客户端利用该随机数与自身密码进行加盐哈希运算 (如 SHA-256 或 SHA-512) 并返回结果。Kafka Broker 运行相同的哈希算法,通过比对计算结果来确认客户端的身份。该机制既防范了重放攻击,也保护了存储在后端的凭证安全。支持使用
kafka-configs.sh动态增删用户。 - GSSAPI(Kerberos):一种基于密钥分发中心 (KDC) 的强身份验证机制。密钥分发中心 (KDC) 负责对客户端进行身份验证,并向其签发加密的票据 (Ticket)。Broker 仅通过验证客户端持有的票据有效性来确认其身份,不直接处理客户端的登录凭证。该票据具有固定的有效期,一旦过期则自动失效,客户端必须重新向 KDC 申请。
- OAUTHBEARER(OAuth2 Token):一种基于令牌 (Token) 的无状态分布式认证机制。客户端的身份由外部独立的身份提供商 (IDP) 进行集中核实与管理。身份提供商 (IDP) 在验证客户端身份后,向其颁发具有时效性的 OAuth2 访问令牌 (Access Token)。Kafka Broker 作为资源服务器,不参与身份核实,仅负责利用配置的公钥或通过自省接口对令牌进行数字签名验签。只要令牌签名合法且在有效期内,Kafka Broker 即允许该客户端建立连接并进行后续的数据交互。
认证机制实现类(登录模块)
| 维度 | PlainLoginModule (明文) | ScramLoginModule (安全哈希) |
|---|---|---|
| 实现协议 | SASL_PLAINTEXT / SASL_SSL + PLAIN |
SASL_PLAINTEXT / SASL_SSL + SCRAM (常用 SCRAM-SHA-256 / 512 |
| 认证机制 | 明文比对:客户端直接发送账号密码(Base64 编码,等同于明文),Broker 接收后直接比对。 | 挑战-应答 (Challenge-Response):通过加盐随机数和哈希计算比对,真实密码绝不在网络上传输。 |
| 用户存储 | 静态存储:用户必须硬编码在 server.properties 的 JAAS 配置文件中。 |
动态存储:用户数据保存在 Kafka 集群内部(KRaft 元数据日志或 ZooKeeper 中)。 |
| 账号管理 | 死板:增删改用户必须修改配置文件并重启所有 Broker。 | 灵活:支持使用 kafka-configs.sh 脚本动态增删改查用户,即时生效,无需重启。 |
| 安全级别 | 极低:若未开启 SSL 加密,内网抓包可瞬间截获管理员明文密码。 | 高:即使在非加密的 SASL_PLAINTEXT 链路上,黑客抓包也无法还原密码。 |
| 配置复杂度 | 冗长:必须在 Broker 的 JAAS 配置中列出所有允许登录的账号明细 user_xxx="pwd"。 |
清爽:Broker 的 JAAS 仅需配置自身用于集群内通信的账号,外部客户端账号直接走元数据查询。 |
| 适用场景 | 仅限本地开发测试环境,或必须搭配严格 SSL 传输加密的纯内网环境。 | 生产环境首选。安全、灵活,完美契合 KRaft 模式的动态元数据管理。 |
组合方案
将传输层与认证层结合,我们可以推导出适用于不同业务场景的经典安全组合方案:
| 组合方案(传输 + 认证) | 适用场景 | 核心优点 | 核心缺点 |
|---|---|---|---|
| PLAINTEXT (无 SASL) | 内网、本地开发测试环境。 | 零配置,性能达到物理极限。 | 毫无安全性,数据与权限完全裸奔。 |
| SASL_PLAINTEXT + PLAIN | 互信的内网隔离环境,初衷仅为防止不同业务线“误操作”。 | 配置极简,学习成本低。 | 账密在网络中是明文传输,防君子不防小人。 |
| SASL_PLAINTEXT + SCRAM | 预算/性能有限的超大规模内网,但需要严格的多租户账号权限隔离。 | 安全性优于 PLAIN,压榨性能的同时实现了动态权限控制。 | 实际业务消息(Payload)在网络中仍是明文。 |
| SASL_SSL + SCRAM | 中小型企业生产环境 / 跨公网消费首选。 | 账密不外泄,业务数据全链路加密,安全性极高。 | 引入 SSL 证书管理成本;Broker 会有约 15%~30% 的非对称加密 CPU 损耗。 |
| SASL_SSL + GSSAPI | 银行、大厂等已有完善 Kerberos 体系的 Hadoop 大数据生态。 | 满足极度严苛的合规审查,实现企业级跨组件统一认证。 | 运维犹如走钢丝(Ticket 过期、Keytab 分发、KDC 故障等)。 |
| SASL_SSL + OAUTHBEARER | SaaS 云服务、全面推行零信任的 K8s 云原生微服务架构。 | 契合现代化单点登录(SSO),完美融入企业级 IAM 体系。 | 依赖外部 IDP 的稳定性,系统链路变长。 |
授权和访问控制列表(ACLs)
https://kafka.apache.org/43/security/authorization-and-acls/
Kafka 的 ACL 规则核心就是一句话:“允许/拒绝 某个用户 从 某个IP 对 某个资源 执行 某种操作”。
1 | # 对于 KRaft 集群,请在所有节点上使用标准授权器以开启权限控制。默认不开启权限控制。 |
kafka-acls.sh 使用示例:绝对禁止在生产中授予普通用户 Create 和 Alter 操作权限。
1 | 查看当前集群所有的权限列表 |
kafka-storage.sh
kafka-storage.sh 是 Kafka 提供的存储工具,用于 KRaft 模式 下生成 Cluster ID 和 格式化日志目录。
命令行参数
完整帮助列表:kafka-storage.sh --help 。
| 位置参数 | 参数 | 描述 |
|---|---|---|
random-uuid |
打印一个随机的 UUID。 | |
format |
格式化此节点上的 Kafka 日志目录。 | |
--cluster-id CLUSTER_ID, -t CLUSTER_ID |
要使用的集群 ID。 | |
--config CONFIG, -c CONFIG |
要使用的 Kafka 配置文件。 | |
--add-scram ADD_SCRAM, -S ADD_SCRAM |
要添加到 __cluster_metadata(集群元数据) 日志中的 SCRAM_CREDENTIAL(SCRAM 凭证),例如:SCRAM-SHA-256=[name=alice,password=pCVXLxxVkqX9]SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]补充:在集群启动前把 SCRAM 用户的账号密码写入 KRaft 元数据。如果你的集群被配置为 “代理间通信(Inter-broker)必须使用 SASL/SCRAM 认证”,那么在没有任何用户凭据时,节点之间根本无法建立连接并启动集群。此参数允许你在集群首次启动之前注入超级管理员凭据,确保节点在 Bootstrap(引导启动)阶段就能通过身份验证,顺利组成集群。 |
|
--ignore-formatted, -g |
如果传入此选项,format 命令将跳过已格式化的目录,而不是失败。 | |
--standalone, -s |
用于将控制器初始化为单节点动态仲裁组。设置此标志时,不得设置 controller.quorum.voters 配置项,而应设置 controller.quorum.bootstrap.servers。补充:格式化时明确定义当前节点是唯一的初始投票者。用于快速创建单节点 KRaft 集群。 |
|
--initial-controllers INITIAL_CONTROLLERS, -I INITIAL_CONTROLLERS |
用于使用指定的动态仲裁初始化服务器。参数是一个以逗号分隔的列表,格式为 id@hostname:port:directory。所有节点必须使用相同的值。例如: 0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA。设置此标志时,不得设置 controller.quorum.voters 配置,而应设置 controller.quorum.bootstrap.servers。补充:格式化时明确定义集群的最初投票者列表。用于首次创建 KRaft 集群。 |
|
--no-initial-controllers, -N |
用于初始化服务器而不指定动态仲裁。设置此标志时,不应设置 controller.quorum.voters 配置,而应设置 controller.quorum.bootstrap.servers。补充:格式化时明确定义该节点不是集群的最初投票者。用于向现有集群添加新 Controller 节点(动态扩缩容)前的格式化。 |
|
--release-version RELEASE_VERSION, -r RELEASE_VERSION |
用于指定集群初始化时的元数据版本(Metadata Version)。它决定了 __cluster_metadata 日志能够使用哪些功能特性、支持哪些元数据记录格式,以及集群内 Controller 和 Broker 之间通信的协议版本。kafka-features.sh 可以动态升级元数据版本。 |
|
--feature FEATURE, -f FEATURE |
命令的 --feature 参数用于在集群初始化格式化时,显式设定特定独立特性的版本级别(Feature Level),格式固定为 feature=level(特性名称=级别数字)。例如:format --feature kraft.version=1format --feature group.version=1format --feature transaction.version=2 |
|
info |
获取有关此节点上 Kafka 日志目录的信息。 | |
--config CONFIG, -c CONFIG |
要使用的 Kafka 配置文件。 例: kafka-storage.sh info --config /data/local/kafka/config/server-standalone.properties |
|
version-mapping |
查找给定元数据版本对应的功能。使用该命令时不带 --release-version 参数,将返回最新稳定元数据版本的映射关系。 |
|
--release-version RELEASE_VERSION, -r RELEASE_VERSION |
查询并列出该指定元数据版本(Metadata Version)下,所有独立特性的默认映射级别(Feature Levels)。 例: kafka-storage.sh version-mapping --release-version 4.3-IV0 |
|
feature-dependencies |
查找指定功能版本的依赖项。如果功能未知或版本尚未定义,则会抛出错误。可以指定多个功能。 | |
--feature FEATURE, -f FEATURE |
查询指定特性在特定级别(Level/Version)下,所依赖的其他前置特性和最低元数据版本。格式为 feature=version。例如:metadata.version=5。例: kafka-storage.sh feature-dependencies --feature metadata.version=30 |
KRaft 动态控制器仲裁引导参数
执行 bin/kafka-storage.sh format 格式化集群存储目录时,如何定义和初始化**动态控制器仲裁集群(Dynamic Controller Quorum)**的投票者结构?
- **首次建群:**若为多节点高可用集群,使用
--initial-controllers明确定义初始投票者;若为单机测试环境,使用--standalone快速创建。 - **后期扩容:**向已有集群添加新控制器或更换故障磁盘时,使用
--no-initial-controllers以“观察者”身份加入。待新节点从集群的 Leader 同步完元数据快照和日志后,管理员可手动运行kafka-metadata-quorum.sh add-controller命令,正式将新节点从“观察者”提升为“投票者”(即正式加入控制器仲裁集)。
kafka-configs.sh
kafka-configs.sh 是 Kafka 提供的动态配置管理工具,用于在不重启 Kafka Broker 的情况下,动态修改、查看或删除集群组件的各类配置参数。
命令行参数
完整帮助列表:kafka-configs.sh --help 。
| 参数 | 描述 |
|---|---|
--bootstrap-controller |
要连接的 Kafka 控制器。 |
--bootstrap-server |
要连接的 Kafka 服务器。 用于与 Kafka 集群建立初始连接的主机/端口对列表。客户端使用该列表来引导初始化并发现完整的 Kafka Broker 集群。虽然列表中服务器的顺序无关紧要,但我们建议包含多个服务器,以确保在某些服务器宕机时具备容错能力。此列表无需包含全部 Broker,因为 Kafka 客户端会自动高效地管理和更新与集群的连接。此列表必须采用 host1:port1,host2:port2,… 的格式。 |
--command-config |
包含要传递给 AdminClient 的配置信息的属性文件。此文件仅与 --bootstrap-server 选项一起使用,用于描述和修改代理配置。有效配置见 Admin Configs。补充:该属性文件里包含了脚本工具作为“客户端”去连接 Kafka 集群时所需的配置。文件通常命名为 admin.properties,常用于 SASL/PLAIN 安全认证、双向 SSL / TLS 加密与认证、网络与超时调优 等。 |
--entity-type |
实体类型(topics/clients/users/brokers/broker-loggers/ips/client-metrics/groups)。 |
--entity-name |
实体名称。 |
--entity-default |
clients/users/brokers/ips 的默认实体名称(适用于相应的实体类型)。 补充:表示集群级别的“全局动态默认配置”,能让全集群当前以及未来新增的所有相同类型的实体都继承这个配置。 |
--describe |
列出给定实体的配置。 补充:仅包括 运行期间修改过的动态配置。 |
--all |
列出给定实体的所有配置。 补充:包括 运行期间修改过的动态配置 + 继承自系统默认的静态配置。不能独立运行,必须配合 --describe。 |
--alter |
修改实体的配置。 |
--add-config |
要添加的配置键值对。方括号可用于分组包含逗号的值:‘k1=v1,k2=[v1,v2,v2],k3=v3’。 关于实体类型 topics、brokers、users、clients、ips、client-metrics、groups 有效配置的列表见 --help 实体类型“users”和“clients”可以同时指定,以更新特定用户的客户端配置。 |
--add-config-file |
包含待添加配置的属性文件路径。有关有效配置的列表,请参阅 --add-config。 |
--delete-config |
要删除的配置项,例如 “k1,k2”。 补充:用于删除已经存在的动态配置、将其恢复为系统默认值的核心参数。 |
示例
topics
1 | 修改 Topic 的日志清理策略 |
brokers
1 | 修改在消息被刷新到磁盘之前,日志分区上累积的消息数量 |
clients
1 | 限制指定 client.id 下所有连接的生产速率和消费速率(单位:字节/秒),默认无限制。 |
ips
1 | 限制指定 IP 的连接创建速率。每秒钟允许单个具体实体(如特定 IP 或全局默认)创建的最大 TCP 连接数,默认无限制。 |
users
1 | 动态创建或修改认证密码。PLAIN 机制不支持动态创建或修改。 |
kafka-topics.sh(主题)
kafka-topics.sh 是 Kafka 提供的主题管理工具。主要负责 Topic 的创建、查看、修改、删除和扩容。
快速入门(官网)
Kafka 是一个分布式事件流平台,允许您跨多台机器读取、写入、存储和处理事件(在文档中也称为记录或消息)。
例如,事件包括支付交易、来自手机的地理位置更新、发货订单、来自物联网设备或医疗设备的传感器测量数据等等。这些事件被组织并存储在主题中。简单来说,主题类似于文件系统中的文件夹,而事件就是该文件夹中的文件。
因此,在编写第一个事件之前,您必须先创建一个主题。打开另一个终端会话并运行:
1 | 查看服务器中的所有主题 |
命令行参数
完整帮助列表:kafka-topics.sh --help 。
| 参数 | 描述 |
|---|---|
--bootstrap-server <String: server toconnect to> |
必需:要连接的 Kafka 服务器。 补充:Broker的主机名称和端口号,逗号分隔。 |
--command-config <String: command config property file> |
包含要传递给 AdminClient 的配置信息的属性文件。有效配置见 Admin Configs。 |
--topic <String: topic> |
要 create, alter, describe 或 delete 的主题。它也接受正则表达式,但 --create 选项除外。 |
--create |
创建主题。 |
--delete |
删除主题。 |
--alter |
修改分区数和副本分配。(要修改主题配置,可以使用 kafka-configs 工具。) |
--list |
列出所有可用主题。 |
--describe |
列出指定主题的详细信息。 |
--partitions <Integer: # of partitions> |
要创建或更改的主题的分区数。如果创建主题(--create)时没有指定该参数,Kafka 会自动使用集群默认的分区数(属性 num.partitions)。警告:如果一个主题已经包含了带有键(Key)的消息,千万不要随意增加分区数量,否则 分区逻辑 和 消息顺序 将受到影响。。 当消息带有 Key 时,Kafka 默认使用 哈希取模算法( Hash(Key) % 分区数)来决定消息发送到哪一个分区。1、分区逻辑:改变分区总数后,相同的 Key 计算出来的分区编号会改变。新消息会被发送到不同的分区。 2、消息乱序:Kafka 只能保证单个分区内部的消息顺序。如果同一个 Key 的消息分散到了不同的分区,消费者(Consumer)就无法再按照发送的先后顺序读取这些消息。 补充:修改分区数时只能增加,不能减少。 |
--replication-factor <Integer: replication factor> |
要创建的主题中每个分区的副本因子。若未提供,则该主题使用集群默认值(属性 default.replication.factor)。补充:同一个分区的多个副本必须分布在不同的 Broker 上。副本数不能通过命令行修改,标准的官方做法是通过 kafka-reassign-partitions.sh 脚本,手动编写或生成 JSON 副本重分配计划来执行修改。 |
--config <String: name=value> |
用于覆盖当前创建主题的配置。有效配置见 Topic Configs: 有关主题配置的详细信息,请参阅 Kafka 文档。该功能仅在与 --create 参数结合使用时才受支持。(若要修改主题配置,可使用 kafka-configs 工具。) |
kafka-console-producer.sh(生产者)
kafka-console-producer.sh 是 Kafka 提供的控制台生产者工具。它允许用户直接在终端向指定主题(Topic)发送消息,常用于快速验证、开发调试或通过管道(Pipe)批量传输数据。
快速入门(官网)
Kafka 客户端通过网络与 Kafka broker 通信,以写入(或读取)事件。broker 接收到事件后,会以持久且容错的方式存储这些事件,存储时间长短完全由您决定,甚至可以永久保存。
运行控制台生产者客户端,向主题写入一些事件。默认情况下,您输入的每一行都会生成一个独立的事件并写入主题。
1 | 发送消息。开启 auto.create.topics.enable 以支持自动创建主题 |
您可以随时按 Ctrl+C 停止生产者客户端。
命令行参数
完整帮助列表:kafka-console-producer.sh --help 。
| 参数 | 描述 |
|---|---|
--bootstrap-server <String: server toconnect to> |
连接的Kafka Broker主机名称和端口号。 |
--command-config <String: config file> |
生产者配置属性文件。请注意,--command-property 的优先级高于此配置。当需要传递大量、敏感或复杂的客户端配置时,这些配置最好保存在一个独立的属性文件中(如 producer.properties)。如:生产环境的安全认证、复用企业统一配置。 |
--command-property <String: producer_prop> |
生产者配置属性,格式为 key=value。 当需要在命令行中临时、快速地修改或测试一两个特定参数时使用。如:临时改变消息可靠性(acks)、调大客户端的请求超时时间(request.timeout.ms)、限制单次请求的最大字节数(max.request.size)、批处理大小(batch.size)。 |
--topic <String: topic> |
操作的topic名称。 |
配置属性
https://kafka.apache.org/43/configuration/producer-configs/
| 属性 | 默认值 | 描述 |
|---|---|---|
bootstrap.servers |
用于与 Kafka 集群建立初始连接的主机/端口对列表。客户端使用此列表来引导并发现所有 Kafka Broker。虽然列表中服务器的顺序无关紧要,但我们建议包含多个服务器,以确保在某些服务器宕机时仍能保持系统弹性。此列表无需包含所有 Broker,因为 Kafka 客户端会自动高效地管理和更新与集群的连接。此列表必须采用 host1:port1,host2:port2,…. 的格式。 | |
key.serializer |
用于键的序列化器类,该类实现了 org.apache.kafka.common.serialization.Serializer 接口。 | |
value.serializer |
用于值的序列化器类,该类实现了 org.apache.kafka.common.serialization.Serializer 接口。 | |
partitioner.class |
null | 确定在生成记录时将记录发送至哪个分区。可用选项包括: 1、如果未设置,则使用默认的分区逻辑。该策略会将记录发送到某个分区,直到该分区接收到的数据量至少达到 batch.size 字节为止。它与以下策略配合使用: a、如果未指定分区但存在键,则根据键的哈希值选择分区。 b、如果既未指定分区也未提供键,请选择当该分区接收的数据量达到至少 batch.size 字节时才会发生变化的粘性分区。 2、org.apache.kafka.clients.producer.RoundRobinPartitioner:一种分区策略,其中连续记录序列中的每条记录都会被发送到不同的分区,无论是否提供了“键”,直到分区用尽,然后该过程重新开始。注意:存在一个已知问题,即创建新批次时会导致分布不均。详情请参阅 KAFKA-9965。 实现 org.apache.kafka.clients.producer.Partitioner 接口可让您接入自定义分区器。 |
batch.size |
16384 | 当向同一分区发送多条记录时,生产端会尝试将这些记录批量合并,以减少请求次数。这有助于提升客户端和服务器端的性能。此配置用于控制默认的批处理大小(以字节为单位)。 |
linger.ms |
5 | 生产者会将两次请求传输之间收到的所有记录合并为一个批处理请求。通常,这种情况仅在高负载下发生,即记录到达速度快于发送速度时。但在某些情况下,即使在中等负载下,客户端也可能希望减少请求数量。**此设置通过添加少量人工延迟来实现这一目标——也就是说,生产者不会立即发送记录,而是会等待最长为给定延迟的时间,以便将其他记录也发送出去,从而将这些发送操作批量处理。**这可以类比为 TCP 中的纳格尔算法。**该设置为批处理的延迟提供了上限:一旦某个分区积累了与 batch.size 相当的记录,无论此设置如何,都将立即发送;但如果该分区积累的字节数少于此值,则会“滞留”指定时间,等待更多记录出现。该设置默认值为 5(即 5 毫秒延迟)。**例如,将 linger.ms 设为 50 会减少发送的请求数量,但在无负载情况下,会使记录的延迟累计增加 50 毫秒。Apache Kafka 4.0 中该默认值从 0 改为 5,因为尽管 linger 值增大,但更大的批次通常能带来效率提升,从而使生产者延迟保持在相同或更低的水平。 |
buffer.memory |
33554432 | 生产者可用于缓冲待发送至服务器的记录的总内存字节数。如果记录的发送速度快于其被传递至服务器的速度,生产者将阻塞 max.block.ms 毫秒,之后将抛出异常并失败。此设置应大致对应生产者将使用的总内存,但并非硬性限制,因为生产者使用的内存并非全部用于缓冲。部分额外内存将用于压缩(如果启用了压缩)以及维护正在传输中的请求。 默认 32 M。 |
compression.type |
none | 生产者生成的所有数据的压缩类型。默认值为 none(即不进行压缩)。有效值为 none、gzip、snappy、lz4 或 zstd。压缩操作针对完整的数据批次进行,因此批处理的效率也会影响压缩率(批处理次数越多,压缩效果越好)。 |
acks |
all | 消息确认模式 生产者要求领导者收到多少个确认后,才将请求视为已完成。这控制了所发送记录的持久性。允许使用以下设置: acks=0 如果设置为零,则生产者将完全不等待服务器的任何确认。记录将立即添加到套接字缓冲区中,并被视为已发送。在此情况下,无法保证服务器已收到该记录,且重试配置将不会生效(因为客户端通常无法得知任何失败情况)。每条记录返回的偏移量将始终设置为 -1。 acks=1 这意味着 leader 会将记录写入其本地日志,但在未等待所有 follower 完全确认的情况下便会做出响应。在这种情况下,如果 leader 在确认记录后立即发生故障,且此时 follower 尚未完成复制,则该记录将会丢失。 一般用于传输普通日志,允许丢个别数据。 acks=all 这意味着主节点将等待所有同步的副本都确认该记录。这保证了只要至少有一个同步的副本仍处于活动状态,该记录就不会丢失。这是目前可用的最强保证。这相当于 acks=-1 的设置。 一般用于传输和钱相关的数据,对可靠性要求比较高的场景。 请注意,要启用幂等性,此配置值必须设为“all”。如果设置了冲突的配置且未显式启用幂等性,则幂等性将被禁用。 注意:即便设置为 1 或 all,无法保证单节点消息不丢失,因为消息会先写到操作系统的内存缓存区(Page Cache,页缓存),详见 Broker 配置中的 Log Flush Policy 部分。 |
min.insync.replicas |
1 | 指定当发布者将 acks 设置为“all”(或“-1”)时,写入操作成功所需的最小同步副本数量(包括领导者)。在 acks=all 的情况下,每个同步副本都必须确认写入操作,该操作才被视为成功。例如,如果某个主题的 replication.factor 为 3,且 ISR 集合包含所有三个副本,那么即使 min.insync.replicas 恰好小于 3,这三个副本也必须全部确认 acks=all 的写入操作,该操作才算成功。 补充:当 ISR 缩水到只剩 1 个(Leader)时,Leader 发现 1 < min.insync.replicas ,就拒绝 Producer 的写入。不是用来决定“正常情况下要等几个副本确认”,而是用来建立“最坏情况下的安全熔断底线”。 |
retries |
2147483647 | 因瞬时错误导致请求失败时的重试次数。设置大于零的值将导致客户端重新发送任何因潜在瞬时错误而发送失败的记录。请求将重试此次数,直到成功、因非瞬时错误失败,或 delivery.timeout.ms 超时为止。请注意,此自动重试功能仅会在收到错误时重新发送相同的记录。将该值设为零将禁用此自动重试行为,此时瞬时错误将传递给应用程序进行处理。通常建议用户不设置此配置项,而是使用 delivery.timeout.ms 来控制重试行为。要启用幂等性,此配置值必须大于 0。如果设置了冲突的配置且未显式启用幂等性,则幂等性将被禁用。 在将 enable.idempotence 设置为 false 且 max.in.flight.requests.per.connection 设置为大于 1 的情况下允许重试,可能会改变记录的排序顺序。因为如果向单个分区发送两个批次,且第一个批次失败并被重试,而第二个批次成功,那么第二个批次中的记录可能会出现在前面。 |
delivery.timeout.ms |
120000 (2 minutes) | 调用 send() 返回后,报告成功或失败的时限上限。这限制了记录在发送前被延迟的总时间、等待代理确认的时间(如果需要确认),以及允许重试发送失败的时间。如果遇到不可恢复的错误、重试次数已用尽,或者记录被添加到已达到较早投递过期截止时间的批次中,生产者可能会在此配置时间之前报告记录发送失败。此配置的值应大于或等于 request.timeout.ms 和 linger.ms 的总和。 |
request.timeout.ms |
30000 (30 seconds) | 该配置控制客户端等待请求响应的最长时间。如果在超时结束前未收到响应,客户端将在必要时重新发送请求;若重试次数已用尽,则会将请求标记为失败。该值应大于 replica.lag.time.max.ms(一个代理配置项),以减少因生产者不必要的重试而导致消息重复的可能性。 |
enable.idempotence |
true | 当设置为“true”时,生产者将确保每个消息在流中仅写入一份。如果设置为“false”,则生产者因 Broker 故障等原因进行重试时,可能会在流中写入重试消息的重复副本。请注意,启用幂等性要求:max.in.flight.requests.per.connection<=5(对于任何允许的值,消息顺序均被保持),重试次数大于 0,且确认模式必须为 ‘all’。 |
max.in.flight.requests.per.connection |
5 | 客户端在阻塞前,单个连接上发送的未确认请求的最大数量。请注意,如果此配置值大于 1,且 enable.idempotence 设置为 false,则在发送失败后,由于重试(即如果启用了重试),消息可能会发生重新排序;如果禁用了重试,或者 enable.idempotence 设置为 true,则消息顺序将得到保留。此外,启用幂等性要求此配置的值小于或等于 5,因为消息代理(broker)为每个生产者最多仅保留 5 个批次。如果该值大于 5,消息代理端可能会移除之前的批次。 |
transactional.id |
null | 用于事务性投递的 TransactionalId。这启用了跨越多个生产者会话的可靠性语义,因为它允许客户端确保在使用相同 TransactionalId 的事务完成之前,不会启动任何新事务。如果未提供 TransactionalId,则生产者仅限于幂等投递。如果配置了 TransactionalId,则默认启用 idempotence。默认情况下未配置 TransactionalId,这意味着无法使用事务。请注意,默认情况下,事务需要至少包含三个代理的集群,这是生产环境的推荐设置;在开发环境中,您可以通过调整代理设置 transaction.state.log.replication.factor 来更改此配置。 |
消息推送流程
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

Java API
异步发送
需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker。
1 | import org.apache.kafka.clients.producer.KafkaProducer; |
异步发送(带回调函数)
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
1 | import org.apache.kafka.clients.producer.*; |
同步发送
只需在异步发送的基础上,再调用一下get()方法即可。
1 | for (int i = 0; i < 100; i++) { |
生产者分区
分区的好处
1、便于合理使用存储资源,每个 partition 在一个 broker 上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台 broker 上。合理控制分区的任务,可以实现负载均衡的效果。
2、提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

发送消息的分区策略
默认的分区器
默认的分区器:DefaultPartitioner
分区原则:ProducerRecord

示例一
将数据发往指定partition的情况下,例如,将所有数据发往分区1中。
1 | for (int i = 0; i < 100; i++) { |
示例二
没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值。
1 | for (int i = 0; i < 100; i++) { |
自定义分区器
**需求:**例如我们实现一个分区器实现,发送过来的数据中如果包含Kafka,就发往0号分区,不包含Kafka,就发往1号分区。
实现:
1 | import org.apache.kafka.clients.producer.Partitioner; |
使用分区器的方法,在生产者的配置中添加分区器参数。
1 | // 1. 创建kafka生产者的配置对象 |
提高吞吐量
提高 Kafka 生产者吞吐量相关参数:batch.size 、linger.ms 、buffer.memory 、compression.type。
1 | // 1. 创建kafka生产者的配置对象 |
消息有序

Topic 单分区:消息有序的条件如下:
- 未开启幂等性:
max.in.flight.requests.per.connection=1。 - 开启幂等性:原理见下文【幂等性】。
Topic 多分区:每个分区消息有序,分区与分区间消息无序。
- 如何做到多分区间消息有序呢?方案:同一个消费者拉取所有分区的数据,然后进行全排序,效率低,建议使用单分区。
消息传递语义
https://kafka.apache.org/43/design/design/#message-delivery-semantics
- 最多一次(At Most Once):消息可能会丢失,但绝不会重复写入。
ACK级别设置为0。 - 至少一次(At Least Once):消息绝不会丢失,但可能会重复写入。
消息确认模式为all+主题的分区副本数>=2+ISR里应答的最小同步副本数>=2。 - 精确一次(Exactly Once):消息既不会丢失,也不会重复写入。
至少一次+幂等性。
至少一次
消息不丢失(发送可靠消息)的条件:
- 生产者端:
acks=all - Broker端:
default.replication.factor>=2、min.insync.replicas>=2

消息可能重复:acks=all 时,生产者发送消息,leader 收到消息且与 ISR 队列里的所有 follower 同步完成,在响应生产者的一瞬间 leader 挂了,导致 ack 失败。然后生产者会再选择一个 leader 重新发消息,此时消息重复(因为上次发送的消息已经同步完成)。虽然概率很低,但确实有可能发生。
幂等性
幂等性是指无论 Producer 将同一条消息重复发送多少次,Broker 在日志中都只会持久化一次。即生产者重试(Retries)不会引发消息重复写入问题。
在生产者端开启幂等性:enable.idempotence=true 、max.in.flight.requests.per.connection<=5 、retries>0 、acks=all 。
Kafka 实现幂等性的核心原理是引入了 PID(Producer ID) 和 Sequence Number(序列号) 机制。以下是其运作的具体步骤:
核心概念:
- PID:当开启幂等性后,每个 Producer 在初始化启动时,都会向 Broker 申请并被分配一个唯一的 PID。这个过程对用户是完全透明的。
- Sequence Number:Producer 发送的每一个消息批次 RecordBatch 都会携带一个从 0 开始递增的序列号,Broker 端会在内存中为每个
<PID, Topic, Partition>记录一个当前已接收的最大序列号。
Broker 端去重逻辑:
当生产者重试发送消息时,Broker 会对比接收到的 新序列号 与 最大序列号 :
Broker 接收到消息后,会在内存中为每个 <PID, Topic, Partition> 维护一个状态,记录其对应的最新已提交序列号(假设记录为 SN_broker)。当 Broker 收到新消息时(带有序列号 SN_new),会执行以下校验逻辑:
- 如果
新序列号 == 最大序列号 + 1: 这是预期的正常情况,说明消息是连续的,Broker 正常接收并写入,同时更新内存中的最大序列号。 - 如果
新序列号 <= 最大序列号: 这可能是网络延迟导致 Producer 误以为失败而发起的重试,Broker 判定为消息重复,直接丢弃并向 Producer 返回成功 ACK,让 Producer 停止重试。所以,开启幂等性会保证单会话单分区消息不重复。 - 如果
新序列号 > 最大序列号 + 1: 序列号出现跳跃,Broker 认为消息乱序。此时,Kafka Broker 最多缓存生产者发来的最近max.in.flight.requests.per.connection个消息批次,直到前序号码到来并重排序,然后落盘。所以,开启幂等性会保证单会话单分区消息有序。- 若跳跃幅度在
max.in.flight.requests.per.connection(默认 ≤ 5)范围内,Broker 会将其视为暂时的网络乱序,允许将该消息暂存内存中等待前序号码到来并重排序。 - 若前序消息因【发送失败且重试耗尽】等原因被客户端永久放弃(形成真实空洞),或跳跃幅度超过了窗口上限(> 5),则意味着数据流一致性遭到破坏。此时 Broker 会拒绝该消息,并抛出
OutOfOrderSequenceException异常,通常会导致 Producer 进入致命错误状态。
- 若跳跃幅度在
幂等性的局限性(作用范围):
- 只能保证单会话 (Single Session):PID 是 Producer 进程启动时动态分配的。**如果 Producer 应用崩溃重启,它会获得一个新的 PID。**对于 Broker 来说,这完全是一个新的生产者,之前的序列号状态失效,因此无法防范应用重启导致的重复发送。
- 只能保证单分区 (Single Partition):幂等性的序列号是绑定在特定 Partition 上的。如果消息被路由到了不同的 Partition,幂等性无法生效。
若需要跨会话、跨分区进行精确一次的原子操作(例如读写多个 Topic 时),必须结合 Kafka 事务(Transactions) 才能实现完整的端到精确一次。
生产者事务
解决 跨会话、跨分区 的原子性问题。开启事务必须要开启幂等性。
事务原理:

Kafka的事务一共有如下 5 个API:
1 | // 1、初始化事务 |
单个Producer,使用事务保证消息的仅一次发送:
1 | import org.apache.kafka.clients.producer.KafkaProducer; |
kafka-console-consumer.sh(消费者)
kafka-console-consumer.sh 是 Kafka 提供的控制台消费者工具。它用于从指定主题中读取并实时打印消息数据,是开发调试、故障排查和临时查看数据的常用利器。
快速入门(官网)
打开另一个终端会话,运行控制台消费者客户端以读取您刚刚创建的事件:
1 | 消费主题中的新事件 |
您可以随时按 Ctrl-C 停止消费者客户端。
请随意尝试:例如,切换回生产端终端(上一步),写入更多事件,然后观察这些事件如何立即显示在消费者端终端上。
由于事件会被持久化存储在 Kafka 中(默认保留 7 天),因此可以被任意多次读取,且支持任意数量的消费者进行读取。您只需再打开一个终端会话并重新运行之前的命令,即可轻松验证这一点。
命令行参数
完整帮助列表:kafka-console-consumer.sh --help 。
| 参数 | 描述 |
|---|---|
--bootstrap-server <String: server toconnect to> |
连接的Kafka Broker主机名称和端口号。 |
--command-config <String: config file> |
消费者配置属性文件。请注意,--command-property 的优先级高于此配置。 |
--command-property <String: consumer_prop> |
消费者配置属性,格式为 key=value。 |
--group <String: consumer group id> |
消费者的消费者组ID。 补充: 1、若未指定 group id,则 Kafka 的 组协调器(Group Coordinator)会自动创建并初始化。 2、删除消费者组是一个不可逆的操作。一旦删除,该组在 Kafka 里的所有消费进度(Offsets)会被彻底清空。如果再次启动改组,由于找不到 established offset(已建立的偏移量),它会重新根据 auto.offset.reset(如 earliest 或 latest)去重新初始化位置,这极有可能导致你的业务系统发生重复消费或历史数据漏读。 |
--topic <String: topic> |
操作的 Topic 名称。 |
--from-beginning |
如果消费者还没有建立要消费的偏移量,则从日志中最早的消息开始消费,而不是从最新的消息开始消费。 |
配置属性
| 属性 | 默认值 | 描述 |
|---|---|---|
bootstrap.servers |
用于与 Kafka 集群建立初始连接的主机/端口对列表。客户端使用此列表来引导并发现所有 Kafka Broker。虽然列表中服务器的顺序无关紧要,但我们建议包含多个服务器,以确保在某些服务器宕机时仍能保持系统弹性。此列表无需包含所有 Broker,因为 Kafka 客户端会自动高效地管理和更新与集群的连接。此列表必须采用 host1:port1,host2:port2,…. 的格式。 | |
key.deserializer |
用于键的反序列化器类,该类实现了 org.apache.kafka.common.serialization.Deserializer 接口。 | |
value.deserializer |
用于值的反序列化器类,该类实现了 org.apache.kafka.common.serialization.Deserializer 接口。 | |
group.id |
null | 用于标识该消费者所属消费者组的唯一字符串。如果消费者使用 subscribe(topic) 实现的组管理功能,或者使用基于 Kafka 的偏移量管理策略,则此属性为必填项。 |
enable.auto.commit |
true | 如果为 true,消费者的偏移量将在后台定期提交。 |
auto.commit.interval.ms |
5000 (5 seconds) | 当 enable.auto.commit 设置为 true 时,消费者偏移量自动提交到 Kafka 的频率(以毫秒为单位)。 |
auto.offset.reset |
latest | 当 Kafka 中没有初始偏移量,或者服务器上当前偏移量已不存在时(例如因数据已被删除),应如何处理: 1、earliest:自动将偏移量重置到最早的偏移量 2、latest:自动将偏移量重置到最新的偏移量 3、by_duration:自动将偏移量重置到距离当前时间戳指定“时长”的位置。时长必须以 ISO8601 格式(PnDTnHnMn.nS)指定。不允许使用负时长。 4、none:如果找不到消费者组的任何先前偏移量,则向消费者抛出异常 5、其他任何值:直接向消费者抛出异常。 请注意,在将此配置设置为 latest 时,更改分区号可能会导致消息丢失,因为生产者可能会在消费者重置其偏移量(开始消费)之前开始向新添加的分区发送消息(即,尚不存在初始偏移量)。 补充: 1、Offset 归属于 消费者组(Consumer Group)+ 主题分区(Topic Partition),而不是单个消费者。 2、 当配置为 latest 时,动态增加分区数量可能会导致消息丢失。 原因:新分区刚创建时,消费者组在服务端没有它的偏移量记录,从而触发 latest 重置策略。如果生产者在消费者完成重置之前,就已经向新分区写入了数据,消费者重置偏移量时会直接定位到新分区的最新末尾,导致重置前写入的那部分历史消息被直接跳过(漏读)。 |
offsets.topic.num.partitions |
50 | 偏移量提交主题的分区数(部署后不应更改)。 补充:这是 Broker 的配置。 |
group.protocol |
classic | 消费者使用的组协议。支持的值为 classic 或 consumer。 补充:通过将其设置为 consumer,可以启用新一代的消费者重平衡协议(基于 KIP-848),大幅缩短由于节点加入或退出导致的集群停顿时间。 |
heartbeat.interval.ms |
3000 (3 seconds) | 使用 Kafka 的组管理功能时,向消费者协调器发送心跳的预期时间间隔。心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。此配置仅在 group.protocol 设置为“classic”时受支持。在这种情况下,该值必须小于 session.timeout.ms,但通常不应高于该值的 1/3。可以将其调整得更低,以控制正常重新平衡的预期时间。如果 group.protocol 设置为“consumer”,则不支持此配置,因为心跳间隔由代理通过 group.consumer.heartbeat.interval.ms 控制。 |
group.consumer.heartbeat.interval.ms |
5000 (5 seconds) | 分配给消费者组成员的心跳间隔。 补充:这是 Broker 的配置。 |
session.timeout.ms |
45000 (45 seconds) | 在使用 Kafka 的组管理功能时,用于检测客户端故障的超时时间。客户端会定期向 Broker 发送心跳信号,以表明其处于活动状态。如果在该会话超时到期前 Broker 未收到任何心跳信号,则 Broker 将把该客户端从组中移除并启动重新平衡。请注意,该值必须在由 broker 配置中的 group.min.session.timeout.ms 和 group.max.session.timeout.ms 设定的允许范围内。另请注意,当 group.protocol 设置为“consumer”时,不支持此客户端配置。在这种情况下,会话超时由 Broker 配置中的 group.consumer.session.timeout.ms 控制。 |
group.consumer.session.timeout.ms |
45000 (45 seconds) | 使用 consumer 组协议时,用于检测客户端故障的超时时间。 补充:这是 Broker 的配置。 |
max.poll.interval.ms |
300000 (5 minutes) | 使用消费组管理时,两次调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以处于空闲状态的时间设置了上限。如果在该超时时间到期前未调用 poll(),则认为该消费者已失败,并且消费组将进行重新平衡(Rebalance),以便将分区重新分配给另一个成员。对于使用非空 group.instance.id 且触发此超时的消费者,分区不会立即被重新分配。相反,消费者将停止发送心跳,并且分区将在会话超时(Session Timeout)到期后被重新分配。此处的会话超时时间取决于所使用的协议:1、如果使用经典重平衡协议(Classic rebalance protocol),由客户端配置 session.timeout.ms 决定。 2、如果使用消费者协议(Consumer protocol),由服务端配置 group.consumer.session.timeout.ms 决定。这与已关闭的静态消费者的行为完全一致。 |
max.poll.records |
500 | 单次调用 poll() 时返回的最大记录条数。请注意,max.poll.records 并不会影响底层的数据拉取(Fetching)行为。消费者会缓存每次拉取请求(Fetch Request)获取到的记录,并在每次调用 poll() 时将它们递增地(分批)返回。 |
fetch.max.wait.ms |
500 | 服务器在响应获取请求之前,如果数据量不足以立即满足 fetch.min.bytes 指定的要求,将阻塞的最长时间。此配置仅用于本地日志获取。要调整远程获取的最大等待时间,请参阅代理配置中的 remote.fetch.max.wait.ms。 |
fetch.min.bytes |
1 | 服务器针对获取请求应返回的最小数据量。如果可用数据不足,请求将等待该数量的数据累积完毕后再进行响应。默认设置为 1 字节,这意味着一旦有该数量的字节数据可用,系统就会立即响应获取请求;否则,获取请求将在等待数据到达时超时。将此值设置为较大数值将导致服务器等待积累更多数据后再响应,这可以在一定程度上提高服务器吞吐量,但会带来额外的延迟。即使 Broker 中可用的总数据量超过了 fetch.min.bytes,由于每个分区的限制 max.partition.fetch.bytes 以及最大返回限制 fetch.max.bytes,实际返回的大小仍可能小于该值。 |
fetch.max.bytes |
52428800 (50 MiB) | 服务器针对一次获取请求应返回的数据最大量。消费者会分批获取记录;如果获取操作中第一个非空分区中的首个记录批次大于此值,该记录批次仍会被返回,以确保消费者能够继续处理。因此,这并非绝对上限。Broker 接受的最大记录批次大小通过 message.max.bytes(Broker 配置)或 max.message.bytes(主题配置)定义。一个获取请求由多个分区组成,还有另一个设置用于控制获取请求中每个分区返回的数据量——请参阅 max.partition.fetch.bytes。请注意,消费者会并行执行多次获取操作。 |
max.partition.fetch.bytes |
1048576 (1 MiB) | 服务器返回的每个分区最大数据量。消费者以批次形式获取记录。如果获取操作中第一个非空分区中的第一个记录批次大于此限制,该批次仍将被返回,以确保消费者能够继续处理。Broker 接受的最大记录批次大小通过 message.max.bytes(Broker 配置)或 max.message.bytes(主题配置)进行定义。有关限制消费者请求大小的信息,请参阅 fetch.max.bytes。补充:1 MiB(百萬位元組):基于二进制,等于 2²⁰ 字节(1,048,576 字节)。 |
message.max.bytes |
1048588 | Kafka 允许的最大记录批处理大小(如果启用了压缩,则为压缩后的大小)。可以通过主题级别的 max.message.bytes 配置项按主题进行设置。补充:这是 Broker 的配置。 |
partition.assignment.strategy |
class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor | 支持的分区分配策略的类名或类类型列表(按首选项排序)。当使用组管理时,客户端将使用这些策略在消费者实例之间分配分区所有权。可用选项包括: org.apache.kafka.clients.consumer.RangeAssignor:按主题分配分区。 org.apache.kafka.clients.consumer.RoundRobinAssignor:以轮询方式将分区分配给消费者。 org.apache.kafka.clients.consumer.StickyAssignor:在尽可能保留现有分区分配的同时,保证分配结果达到最大平衡。 org.apache.kafka.clients.consumer.CooperativeStickyAssignor:遵循与 StickyAssignor 相同的逻辑,但允许进行协作式重平衡。 默认的分配器为 [RangeAssignor, CooperativeStickyAssignor]。它默认会使用 RangeAssignor,但允许通过一次滚动重启(Rolling Bounce)来移除列表中的 RangeAssignor,从而直接升级到 CooperativeStickyAssignor。实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口允许您插入自定义的分配策略。 |
消费模型
| 维度 | 传统消费者组 (Consumer Group) | 共享组 (Share Group - Kafka 4.0) |
|---|---|---|
| 消费模型 | 流处理模型:基于**发布-订阅(Publish-Subscribe)**模式,侧重事件流、状态计算及严格顺序消费。 | 消息队列模型:基于传统的**消息队列(Message Queue / Point-to-Point)**模式(类似于 RabbitMQ 的 Work Queues),侧重异步任务、Worker 派发及高并发处理。 |
| 并发粒度 | 分区级独占: 1、在同一个消费者组内,一个分区在同一时间只能被一个消费者消费; 2、如果组内消费者数大于分区数,多出来的消费者将会处于**闲置(Idle)**状态。 |
消息级共享: 1、解除了“分区绑定”的限制。同一个分区内的消息可以被组内的多个消费者并发消费。 2、消息就像传送带上的零件,哪个消费者有空闲,谁就拉取一条进行处理。 |
| 最大并发数 | 受限于分区数量:最大有效消费者数等于 Topic 分区总数。 | 理论上无限制:按需横向扩展消费者,消费者数量可远超分区数量。 |
| 消息顺序性 | 严格保证单分区内消息顺序。因为单个分区只由一个消费者处理,所以能严格保证单分区内消息的**先进先出(FIFO)**顺序。 | 不保证顺序。由于多条消息并发分配给不同的消费者,先分配的消息不一定先处理完。 |
| 消息确认机制 | 提交分区偏移量(Offset)。消费者提交的是当前分区消费到的最高偏移量(例如:分区 0 消费到了第 100 条)。 | 单条消息确认 (Ack / Nack)。每个消费者对单条消息进行确认(Ack)或拒绝(Nack)。未成功处理的消息会触发超时并投递给其他消费者(重试机制)。 |
| 重平衡 (Rebalance) | 经典的“分区所有权”模型。在现代机制下(如增量协作与 KIP-848),增减消费者时,Broker 服务端会通过增量协议进行局部拓扑微调。只有受影响的分区会发生所有权转移,其余分区的消费完全不间断,极大地优化了重平衡抖动。 | 底层彻底解耦了分区所有权。组内所有消费者共同面对所有分区,由 Broker 采用拉取驱动的消息级锁(Message-level Lock)动态分发数据。谁有空闲,谁就直接拉取单条消息。这种设计从根本上规避了分区分配的概念,因此彻底杜绝了任何形式的消费者重平衡。 |
| 适用场景 | 需要严格保证消息顺序(如:银行流水、日志审计)。 流式计算与状态处理(如:Kafka Streams、Flink 任务)。 |
大任务/重度异步处理: 比如每条消息都需要调用第三方 API 或进行耗时的 CPU 计算。 传统 MQ 迁移: 方便将原本运行在 RabbitMQ、ActiveMQ 的工作队列业务平滑迁移到 Kafka。 |
Java API
独立消费者示例(订阅主题)
需求:创建一个独立消费者,消费first主题中数据。
1 | import org.apache.kafka.clients.consumer.ConsumerConfig; |
独立消费者示例(订阅分区)
需求:创建一个独立消费者,消费first主题0号分区的数据。
1 | import org.apache.kafka.clients.consumer.ConsumerConfig; |
消费者组示例
需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
示例:
- 复制 CustomConsumer 类的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者。
- 启动代码中的生产者发送消息,在 IDEA 控制台即可看到两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码Thread.sleep(2);)。
- 重新发送到一个全新的主题中,由于创建的主题分区数为1,可以看到只能有一个消费者消费到数据。
分区分配策略
partition.assignment.strategy=class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Range 以及再平衡
原理:

原理验证:
修改主题 first 为7个分区
复制 CustomConsumer 类,创建三个消费者 CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”,同时启动 3 个消费者。
启动 CustomProducerCallback 生产者,随机发送消息到不同的分区。
观看 3 个消费者分别消费哪些分区的数据。
0号消费者:消费到0、1、2号分区数据。
1号消费者:消费到3、4号分区数据
2号消费者:消费到5、6号分区数据。
触发重平衡:
停掉0号消费者,快速重发消息,观看结果(尽量在 45s 以内,因为默认
session.timeout.ms=45000)。1号消费者:消费到3、4号分区数据,外加0、1、2号分区数据。
2号消费者:消费到5、6号分区数据。
再次重新发送消息观看结果(45s 以后),已发生重平衡。
1号消费者:消费到0、1、2、3号分区数据
2号消费者:消费到4、5、6号分区数据。
总结:Broker 未在 45s 内收到0号消费者的心跳,则把0号消费者从消费者组中移除并将它的任务整体交给其他 Broker执行,45s 以后按照 Range 方式重平衡。
RoundRobin 以及再平衡
原理:

原理验证:
依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。
1
2// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());重启 3 个消费者,重复发送消息的步骤,观看分区结果。
0号消费者:消费到0、3、6号分区数据。
1号消费者:消费到1、4号分区数据
2号消费者:消费到2、5号分区数据。
触发重平衡:
停掉0号消费者,快速重发消息,观看结果(尽量在 45s 以内,因为默认
session.timeout.ms=45000)。1号消费者:消费到1、4号分区数据,外加0、6号分区数据。
2号消费者:消费到2、5号分区数据,外加3号分区数据。
再次重新发送消息观看结果(45s 以后),已发生重平衡。
1号消费者:消费到0、2、4、6号分区数据
2号消费者:消费到1、3、5号分区数据。
总结:Broker 未在 45s 内收到0号消费者的心跳,则把0号消费者从消费者组中移除并将它的任务轮训交给其他 Broker执行,45s 以后按照 RoundRobin 方式重平衡。
Sticky 以及再平衡
Sticky(粘性)分区分配策略是 Kafka 0.11.x 版本引入的一种高效的分区分配算法。它的核心目标是在保证分区均匀分配的同时,尽可能保留上一次的分配结果(粘性),从而显著减少再平衡(Rebalance)的开销。
原理验证:
依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 Sticky。
1
2// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());重启 3 个消费者,重复发送消息的步骤,观看分区结果。
每次重启都不同。
触发重平衡:
- 停掉0号消费者,快速重发消息,观看结果(尽量在 45s 以内,因为默认
session.timeout.ms=45000)。 - 再次重新发送消息观看结果(45s 以后),已发生重平衡。
总结:Broker 未在 45s 内收到0号消费者的心跳,则把0号消费者从消费者组中移除并将它的任务均匀分配给其他 Broker执行,45s 以后按照 Sticky 方式重平衡(会尽量保留现有的分配关系,基本不变)。
offset 偏移量
使用 Kafka Connect 将数据作为事件流导入/导出
您可能在现有系统(例如关系数据库或传统消息系统)中拥有大量数据,并且许多应用程序已经在使用这些系统。Kafka Connect 允许您持续地将数据从外部系统导入 Kafka Topic,反之亦然。运行连接器是一个可扩展的工具,这些连接器实现了与外部系统交互的自定义逻辑。因此,将现有系统与 Kafka 集成非常容易。为了进一步简化此过程,Kafka 提供了数百个现成的连接器。
下面了解如何使用简单的连接器运行 Kafka Connect,实现将数据从文件导入到 Kafka Topic,以及将数据从 Kafka Topic 导出到文件。
请确保将 connect-file-4.2.0.jar 添加到 Connect Worker 进程配置中的 plugin.path 属性。
1 | cd /data/local/kafka |
接下来,先创建一些用于测试的初始数据:
1 | echo foo > test.txt |
接下来,我们将启动两个以独立模式运行的连接器,这意味着它们运行在单个本地专用进程中。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,其中包含通用配置,例如要连接的 Kafka broker 和数据序列化格式。其余配置文件分别指定要创建的连接器。这些文件包含唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。
1 | 三个配置文件分别是 Worker进程配置、数据源(Source)配置、数据目标(Sink)配置 |
Kafka 附带的这些示例配置文件使用您之前启动的默认本地集群配置,并创建两个连接器:
- 源连接器,它从文件中读取行,并将每一行作为一个事件写入 Kafka Topic。
- 接收器连接器,它从 Kafka Topic 中读取事件,并将每个事件作为一行输出到文件。
启动过程中,您会看到一些日志消息,其中一些指示连接器正在实例化。Kafka Connect 进程启动后,源连接器应开始从 test.txt 读取行并将其写入主题 connect-test,而接收器连接器应开始从主题 connect-test 中读取事件并将其输出到文件 test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传输:more test.sink.txt 。
请注意,数据正存储在 Kafka 主题 connect-test 中,因此我们还可以运行一个控制台消费者来查看该主题中的数据(或使用自定义消费者代码进行处理):
1 | kafka-console-consumer.sh --topic connect-test --from-beginning --bootstrap-server localhost:9092 |
连接器会持续处理数据,因此我们可以向文件中添加数据 echo "Another line" >> test.txt,并观察数据在管道中的流动。控制台消费者 和 接收文件 应该能看到该行数据。
为什么用它?(与直接写代码的区别)
零代码/低代码:只需通过 JSON 配置文件即可启动连接器(Connector),无需为每个数据库编写重复的同步逻辑。
高可靠性:自动管理 offset(偏移量),如果同步过程中断,重启后能从断点继续,保证数据不丢不重。
横向扩展:支持集群模式(Distributed Mode),可以轻松增加节点来应对海量数据的处理压力。
转换能力 (SMTs):可以在数据流动过程中进行简单的转换,比如隐藏敏感字段、给数据打标签或格式转换。
常见应用场景
构建数据仓库/湖:把业务库(MySQL/Oracle)的数据实时同步到数据湖(Iceberg/HDFS)或数仓。
实时搜索:将数据库的更新实时推送到 Elasticsearch 供前端搜索。
多云/异构系统集成:连接 AWS S3、Google Cloud Pub/Sub、MongoDB 等各种云服务和中间件。
使用 Kafka Streams 处理事件
Kafka Streams 是一个 Java 库(Jar 包)。它让你的 Java 程序具备了实时处理海量数据的能力,而你只需要像写普通的 Java 代码一样去调用它的 API。
Kafka Streams = Kafka 读写能力 + 强大的逻辑处理(计算/转换/聚合) + 自动容错。
终止 Kafka 环境
终止 Ctrl-C 。
若想删除本地 Kafka 环境中的所有数据,包括创建的所有事件,请运行以下命令:
1 | 默认目录 |