Kafka教程

推荐链接:

https://en.wikipedia.org/wiki/Apache_Kafka

https://github.com/apache/kafka

https://kafka.apache.org/

Raft - 维基百科

MQ

介绍

MQ即MessageQueue,消息对列。我们这次要学习的RabbitMQ就是一种典型的MQ产品。

那么到底什么是MQ呢?可以分两个部分来理解:

  • 消息Message:在不同应用程序之间传递的数据。
  • 队列Queue:一种FIFO先进先出的数据结构。将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。

**MQ产品最直接的作用,是将同步的事件驱动改为异步的消息驱动。**这话什么意思?我们从一个最常见的SpringBoot应用开始说起。

首先搭建一个普通的Maven项目在pom.xml中引入SpringBoot的依赖

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>

然后添加一个监听器类

1
2
3
4
5
6
7
8
9
10
11
public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {
@Override
public void onApplicationEvent(ApplicationEvent event) {
System.out.println("=====> MyApplicationListener: " + event);
}

@Override
public boolean supportsAsyncExecution() {
return ApplicationListener.super.supportsAsyncExecution();
}
}

接下来,添加一个springboot启动类,在启动类中加入自己的监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootApplication
public class ServiceDemoApplication implements CommandLineRunner {

@Resource
private ApplicationContext applicationContext;

public static void main(String[] args) {
SpringApplication application = new SpringApplication(ServiceDemoApplication.class);
application.addListeners(new MyApplicationListener());
application.run(args);
}

@Override
public void run(String... args) throws Exception {
applicationContext.publishEvent(new ApplicationEvent("myEvent") {
});
}
}

好了,不用添加配置文件,直接启动。

image-20241118105439872

从控制台可以看出,SpringBoot框架在启动时,会尝试发布各种ApplicationEvent事件。此时,SpringBoot框架可以称为消息生产者Producer。同样,只要有ApplicationEvent事件发布了,就会触发MyApplicationListener监听器处理这些事件,MyApplicationListener可以称为消息消费者Consumer。

Producer和Consumer的运行状况互不干涉,不管有没有Consumer,Producer一样会发布消息。反过来,不管Producer有没有发布消息,Consumer也一样会监听这些事件。这种方式,实际上就是通过事件中包含的消息在驱动Producer和Consumer工作,这种工作方式也就称为消息驱动。

与消息驱动形成对比的是常见的事件驱动。比如经常写的Controller,只有通过一个事件主动触发,才会调用。

由此可见,SpringBoot内部集成这种消息驱动的机制。但是,这些Producer和Consumer都只能在一个进程中使用。如果需要跨进程调用呢?这就需要独立一个中间服务,才能发布和订阅这些消息。而这个中间服务,就是MQ中间件。

比如在一个大型电商项目中,订单服务完成下单,就可以发布下单事件,而下游的消费者就可以消费这个下单事件,进行一些补充的业务。

image-20241118105946620

应用场景(优势)

**异步处理/异步提速:将非核心、耗时的操作放入队列,让主流程快速返回,从而大幅缩短响应时间。**举例:电商平台用户完成下单支付后,主线程只需记录订单成功。随后将“发送通知短信”、“生成积分”、“扣减库存”等任务丢入消息队列异步执行,提升用户体验。

**应用解耦:上下游系统通过MQ进行交互(即便使用不同开发语言),从而实现系统间的隔离。**举例:当用户在论坛发布新文章时,系统只需将文章内容发至消息队列,由下游的“搜索引擎”、“文章推荐系统”、“积分系统”各自独立消费处理。新增业务模块时,无需修改主发帖逻辑。

**流量削峰/削峰填谷:在上游请求激增时,将瞬时海量请求暂存到消息队列中,下游服务根据自身最大处理能力匀速消费,防止系统被打垮。**举例:电商秒杀活动中,数百万用户同时点击购买,将请求全部打入数据库会导致宕机。使用消息队列可缓冲洪峰,系统依自身能力处理(如每秒处理2000笔),未处理的请求在队列中排队等待。

**分布式事务管理:通过消息的最终一致性,保证分布式环境下各个独立数据库的数据同步。**举例:跨库业务处理。例如用户跨行转账,资金转出成功后产生一条消息,转入方系统消费该消息增加余额,若失败则不断重试,达成事务的最终一致。扩展:分布式事务的主流解决方案

劣势

引入 MQ 属于典型的“用空间换时间”和“用复杂度换并发”的方案。

系统可用性降低(引入单点风险)

  • 原理:系统引入的外部依赖越多,故障概率就越高。
  • 代价:原本系统只需 A 服务直接调用 B 服务即可完成业务。引入 MQ 后,一旦 MQ 出现宕机、磁盘满、或集群网络分区,整个上游的写操作将全部瘫痪。因此,为了保证高可用,必须额外投入精力去维护 MQ 的主从集群或分布式部署。

系统复杂度大幅提高(治理成本高),开发者必须处理一系列由异步带来的棘手问题:

  • 消息丢失问题:如何保证消息在网络闪断、MQ 宕机或消费者挂掉时不丢失?(需要开启生产者确认、持久化、消费者手动 ACK)。
  • 消息重复消费(幂等性问题):由于网络延迟,生产者可能会重复发送消息,或者 MQ 可能会重复投递消息。下游消费者必须实现幂等性设计(如通过唯一业务 ID 去重、加锁),否则会导致重复扣款、重复加积分等严重业务事故。
  • 消息积压问题:当上游发送速度远大于下游消费速度时,MQ 队列会爆满,可能导致磁盘写满或消息过期丢失(需要监控、临时扩容消费者、加速消费)。

数据一致性问题(从强一致变为最终一致)

  • 原理:A 服务执行完本地操作,发完 MQ 消息就直接返回成功给用户了。此时 B 服务还在队列里排队等待消费。
  • 代价:这破坏了传统的强一致性。如果 B 服务在消费时发生代码异常、数据库宕机,导致这条消息最终消费失败,那么 A 和 B 的数据就会产生不一致(例如 A 钱扣了,B 商品没发)。此时必须引入分布式事务、复杂的重试机制或人工对账补偿。

消息顺序性难以保证

  • 原理:分布式 MQ 为了高并发,通常会将消息分散在多个分片/队列(Partition/Queue)中并发处理。
  • 代价:这导致消息的消费顺序可能与发送顺序不一致。例如:某个订单同时产生了“创建订单”和“支付订单”两条消息,如果“支付订单”因为网络快先被消费了,业务逻辑就会报错。保证严格顺序(如使用 Kafka 的 Partition Key 机制)往往需要牺牲系统的并发性能。

可视化工具

https://www.kafkatool.com/

https://github.com/Bronya0/Kafka-King 使用中

https://github.com/kafbat/kafka-ui

历史版本

关于所有历史版本的 发布日期官方博客升级说明发行说明,均可在 下载页面 集中查阅。

大版本 发布时间 核心技术标签 架构依赖 核心商业/运维价值
Kafka 1.0 2017 年 Exactly-Once 落地 强依赖 ZooKeeper 解决数据重复/丢失痛点,流处理正式进入金融级金融/核心业务。
Kafka 2.0 2018 年 安全、合规与运维优化 强依赖 ZooKeeper 引入前缀 ACL 与动态配置,极大降低跨部门、大规模集群的权限和运维成本。
Kafka 3.0 2021 年 KRaft 共识预演 双模式(ZK/KRaft 并存) 宣布弃用 ZK,默认开启最高安全级别数据一致性,为架构平滑过渡打下基础。
Kafka 4.0 2025 年 彻底去 ZK 与纯粹云原生 纯 KRaft 模式(零 ZK 依赖) 运维架构减负 50%;支持单集群百万级分区;彻底解决重平衡(Rebalance)停顿痛点。

1.0

  • **Exactly-Once Semantics (EOS) 落地与完善:**提供了完备的幂等性与事务支持,保证消息在网络波动或 Broker 宕机时,既不重复也不丢失,使 Kafka Streams 能够安全地应用于核心记账、计费系统。
  • **JBOD 磁盘容错优化:**改进了对单节点多磁盘阵列(Just a Bunch Of Disks)的支持。如果某一块磁盘损坏,Broker 能够自动隔离该盘上的分区,而不会导致整个 Broker 节点崩溃。
  • **Controller 指标细化:**大幅增加了对 Controller 内部状态的监控指标,提升了系统黑盒运维的透明度。

2.0

  • **基于前缀的 ACL 访问控制 (KIP-290):**支持通过 * 等通配符对特定前缀的 Topic 进行批量授权,解决了大型组织内成千上万个 Topic 必须逐一赋权的运维死结。
  • **免重启动态配置与安全升级:**支持在不重启 Broker 的情况下,动态修改配置参数并实时重新加载 SSL/TLS 证书,实现了真正意义上的零停机(Zero-Downtime)安全维护。
  • **引入消费者领先指标 Lead Metrics (KIP-223):**引入了消费者位置与分区最前端(Log Start Offset)的距离监控。一旦该指标逼近零,表明消费者面临因数据过期被踢出、丢失数据的风险,为监控告警提供了新维度。

3.0

  • **KRaft 协议预览版登场 (KIP-500):**正式拉开“去 ZK”序幕,允许在实验性场景下不部署 ZK 运行 Kafka。
  • **默认开启最高级别一致性:**生产者的默认确认机制由 acks=1 升级为最高安全级别的 acks=all,且默认启用幂等性(enable.idempotence=true),标志着 Kafka 从“追求极致吞吐”向“默认数据绝对安全”的策略转变。
  • **JDK 生态清洗:**彻底弃用 Java 8 和 Scala 2.12,全面转向对现代化高性能 JDK(如 Java 11/17)的适配。

4.0

  • **100% 移除 ZooKeeper 代码:**4.0 版本彻底删除了所有与 ZK 相关的代码路径,KRaft 成为唯一合法的元数据管理架构。任何既有 ZK 集群必须先迁移至 KRaft 方可升级至 4.0。
  • **新一代消费者组重平衡协议 GA (KIP-848):**彻底重构了 Consumer Group 协议。将成员状态管理和分区分配逻辑完全移至 Broker 端执行。引发全网痛点的 “Stop-the-World” 全组消费暂停现象不复存在,支持极其平滑的增量式重平衡。
  • **Kafka 共享队列模式 - Share Groups (KIP-932):**引入了传统传统消息队列(如 RabbitMQ)的“竞争消费者”概念。打破了过去“一个分区只能由组内一个消费者消费”的强绑定限制,多个消费者可以并发消费同一个分区内的不同消息。
  • Eligible Leader Replicas (ELR, KIP-966) 与 Pre-Vote 机制 (KIP-996):* ELR 防止在极端灾难、ISR 全挂时,由于盲目选举导致严重的数据丢失。
    • Pre-Vote 引入了 Raft 的预投票机制,有效防止因个别网络抖动断连的节点重连时,引发集群无谓的 Controller 重新选举,大幅提升了集群的稳定性。
  • Java 环境门槛:Broker 运行环境及组件(Connect, Tools)强制要求最低 Java 17;客户端 API 最低要求 Java 11

核心概念

介绍系统核心组件。

Controller(控制器/集群大脑)

Controller 是整个 Kafka 集群的最高指挥官。它不负责处理客户端的具体消息读写,只负责管理整个集群元数据(Metadata)(例如:集群里有几台 Broker、有哪些 Topic、每个 Topic 有几个分区、每个分区的 Leader 在谁手上)。

核心职责:

  • 全局生命周期管理:执行 Topic 的创建、删除、参数修改以及分区扩容 等管理指令。
  • 集群成员管理:监控所有 Broker 的心跳,维护集群拓扑(Broker 的加入、退出)。
  • 分区选主(Leader Election):当某个分区的 Leader 副本不可用时,负责从同步副本集(ISR)中选出新的分区 Leader

4.0 版本的重要变化(KRaft 架构):

  • 去 ZooKeeper 化与 KRaft 仲裁组(Quorum):Kafka 4.0 彻底移除了对 ZooKeeper 的依赖。控制权完全交由基于 Raft 共识算法构建的 Controller 仲裁组。仲裁组通过选举产生一个 Active Controller(主控制器),其余作为 Follower Controller。
  • 元数据“日志化”(__cluster_metadata:全局元数据变更为“事件驱动”模型,持久化在内部主题 __cluster_metadata 中。Active Controller 会将元数据变更编码为 Metadata Record,并追加到 Metadata Log 中,Broker 们像消费者一样去“拉取”消息更新自己的缓存。
  • 预投票机制(Pre-Vote, KIP-996):针对 KRaft 仲裁组引入了预投票机制。当网络发生瞬时抖动时,节点在正式发起 KRaft Leader 选举前会先发起一轮“预探测”。仅在确认能获得多数派响应时才会增加纪元(Epoch)并正式发起选举,大幅降低了集群网络不稳定时的控制平面震荡。
  • 秒级故障恢复与百万分区:消除了外部 ZooKeeper 交互导致的 RPC 瓶颈。当 Active Controller 宕机时,新 Controller 的接管在极短时间内完成。KRaft 架构显著提升了元数据管理能力,使 Kafka 集群能够支持数十万级甚至接近百万级 Partition 的部署规模。

Broker(数据节点/业务员)

Broker 是 Kafka 集群中的实际服务器节点,是底层服务单元,主要负责高并发的消息持久化与 I/O 读写。一个 Broker 就是一台运行着 Kafka 服务的机器。

核心职责:

  • 高吞吐 I/O 处理:基于操作系统的 PageCache、零拷贝技术(Zero-Copy / sendfile)以及磁盘顺序写特性,高效响应客户端的 Produce(写入)与 Fetch(读取)请求。
  • 消息持久化:将生产者发送的消息以追加(Append-only)模式写入本地磁盘的日志段(Log Segment)中。
  • 心跳维持:向 Active Controller 周期性发送心跳。若发生超时,Controller 将判定该 Broker 处于不可用状态(如 Fenced 隔离状态),并剥离 Partition Leader 身份。

4.0 版本的重要变化:

  • 清晰的节点角色(process.roles:节点职责边界更加严格,可配置为 broker(纯数据节点)、controller(纯控制节点)或 broker,controller(混合模式,适用于边缘计算或小型集群)。
  • 被动监听转为主动获取:Broker 不再依赖 ZooKeeper Watch 机制,而是通过 KRaft 元数据同步协议持续获取 Controller 产生的 Metadata Log 增量,并更新本地 Metadata Cache。
  • 新一代重平衡协议落地(KIP-848):4.0 将 Consumer Group(消费者组)的分配算力从客户端收回到了 Broker 端的 Group Coordinator 手中。实现了渐进式、增量式重平衡,彻底终结了过去十几年消费者上下线时导致全组消费卡死的 “Stop-The-World(全局停顿)” 问题。

Topic Partition

Topic 是一个逻辑上的消息分类;Partition(分区)是 Kafka 进行 物理存储、并发读写、高可用复制 的最小工作单元

核心职责与特征:

  • 物理分片与水平扩展:一个 Topic 可划分为多个 Partition,多个 Partition 可以分布在不同 Broker 上,从而突破单机 I/O 吞吐与存储容量的物理上限。Partition 数量决定了 Topic 可达到的理论最大消费并发度。
  • 严格局部有序:单分区内部的消息严格按照写入顺序排列,每个分区的每条消息都会被分配一个唯一的、递增的序列号——Offset(偏移量)Kafka 仅保证 Partition 级别的有序性,不保证 Topic 级别的全局有序。

4.0 版本的重要变化:

  • 消除分区数量天花板:由于 ZooKeeper 无法承受频繁的元数据更新,单个集群的分区总数很难突破几万个。在 4.0 KRaft 模式下,元数据传输极快,单集群可以安全地运行数十万甚至上百万个分区。
  • 引入“共享组”概念(Share Groups, KIP-932):Share Groups 是 Kafka 4.0 引入的新消费模型,与传统 Consumer Group 并存。它允许多个消费者并发处理同一 Partition 中的消息,并通过消息级 ACK/NACK 实现类似传统消息队列的竞争消费模式。

运维指南:Partition 数量与 Linux 系统的文件句柄数(File Descriptors)直接挂钩。海量分区需确保宿主机配置了足够高的 ulimit -n(如 655350),否则进程将因 Too many open files 异常而崩溃。

Leader(领导者/主副本)

为了保证高可用,Kafka 会为每个 Partition 创建多个副本(Replica),在这组副本中,有且仅有一个会被指定为 Leader。也就是说,对于某个 Topic,其每个 Partition 都有且仅有一个 Leader,因此该 Topic 的 Leader Partition 数量等于 Partition 总数。

一个 Broker 上可能存放着成百上千个 Partition 的副本,其中有些是 Leader,有些是 Follower。Leader 的分布应该尽量均衡在不同的 Broker 上,以分摊读写压力。

核心职责:

  • 统一读写入口:在默认情况下,所有的生产者写入请求和消费者读取请求,都必须由该分区的 Leader 副本 亲自处理和响应。(注:虽然 Kafka 支持特定配置下的 Follower 副本读取,但 Leader 依然是绝对的写入核心)。
  • 协调数据同步:Leader 收到写入数据后,会将其追加到本地 Log,并负责监控、协调其他 Follower 副本的数据同步进度。

4.0 版本的重要变化:

  • 预投票机制(KIP-996 Pre-Vote):4.0 引入了 Pre-Vote 机制。当网络出现短暂抖动时,某个节点在正式发起 Leader 选举前会先进行一轮“预热投票”。如果它无法获得大多数节点的响应,就不会触发正式选举。这极大地减少了网络不稳定时的无效选举,让分区 Leader 更加稳定。
  • 合格领导者副本(KIP-966 Eligible Leader Replicas,简称 ELR):Kafka 4.0 中引入了一项重要高可用机制(基于 KIP-966)。它由 KRaft Controller 动态维护,用于在 ISR 不可用时提供经过验证的安全候选副本,从而在保证已提交数据不丢失的前提下缩短 Leader 恢复时间。

Follower(跟随者/从副本)

Leader 之外的所有副本,都叫做 Follower。

核心职责:

  • 数据同步(容灾备份):Follower 副本不接受客户端的写请求。其核心任务是不断地向 Leader 副本发送 Fetch 请求,拉取 Leader 副本最新追加的消息到本地,以保持与 Leader 副本的数据一致性。
  • 故障转移(高可用):当 Leader 副本所在的 Broker 发生故障宕机时,Controller 会从 Follower 中选出一个新的 Leader 以接管读写。

核心机制:ISR(In-Sync Replicas 同步副本集):

  • 动态维护:Leader 根据 Follower 的同步状态向 Controller 上报 ISR 变更请求,由 Controller 更新并发布最新 ISR 状态。ISR 包含 Leader 自身,以及与 Leader 保持足够紧密同步的 Follower(同步滞后时间阈值由 replica.lag.time.max.ms 参数控制)。
  • 选主权限(Leader Election):默认且安全的情况下,仅有处于 ISR 集合中的 Follower 才有资格在 Leader 宕机时接管分区的领导权。若 Follower 同步严重滞后会被踢出 ISR;追平进度后会再次重新加入。 (注:若强制开启 unclean.leader.election.enable=true,则允许非 ISR 副本强行当选,但这将面临数据丢失的风险,生产环境通常禁用此配置。)

脚本文件

https://github.com/apache/kafka/tree/trunk/bin

服务器与集群管理 (Server & Cluster)

这些脚本用于启动、停止和管理 Kafka 节点及存储。

  • kafka-server-start.sh:启动 Kafka 服务器进程(在 4.0 中,可以是 Broker 角色、Controller 角色或两者兼有)。
  • kafka-server-stop.sh:优雅地停止正在运行的 Kafka 服务器。
  • kafka-storage.sh:用于 KRaft 模式 下生成 Cluster ID 和 格式化日志目录。
  • kafka-cluster.sh:集群管理工具。用于查看集群 ID 或注销不再使用的 Broker。
  • kafka-features.sh:管理集群的特性标志(Feature Flags),例如查看或升级集群的元数据版本(Metadata Version)。

主题与分区管理 (Topics & Partitions)

用于对数据流的核心概念(Topic)进行运维。

  • kafka-topics.sh:最常用的脚本。用于 创建、删除、修改、查看 Topic 信息。
  • kafka-reassign-partitions.sh:用于在 Broker 之间同一台 Broker 的不同磁盘路径之间 移动分区和副本(例如节点扩容或缩容时的日志目录重平衡)。
  • kafka-leader-election.sh:手动触发分区的 Leader 选举,通常用于恢复平衡或处理节点故障。
  • kafka-log-dirs.sh:查询各个 Broker 上底层日志目录(Log Directory)的大小和状态。
  • kafka-delete-records.sh:手动删除某个 Topic 中指定 Offset 之前的历史消息记录。

生产者、消费者与组管理 (Clients & Groups)

用于日常的数据发布、订阅以及消费进度管理。

  • kafka-console-producer.sh:控制台生产者。允许你通过终端标准输入直接向 Topic 发送文本消息。
  • kafka-console-consumer.sh:控制台消费者。用于在终端实时读取并打印 Topic 中的消息。
  • kafka-consumer-groups.sh:管理传统的消费者组(Consumer Groups),查看消费积压(Lag)、重置消费位点(Reset Offsets)等。
  • kafka-groups.sh:一个更通用的组管理工具,旨在统一管理不同协议的组。

共享组特性 (Share Groups) 🌟 4.0 新特性

传统 Consumer Group 是按分区(Partition)分配的,而 Share Groups 引入了类似传统消息队列(如 RabbitMQ)的“共享拉取”模式,允许多个消费者并发消费同一个分区的消息。

  • kafka-console-share-consumer.sh:使用全新 Share Group 协议的控制台消费者。
  • kafka-share-groups.sh:用于管理 Share Groups(列出、查看详情、删除)。
  • kafka-share-consumer-perf-test.sh:针对 Share Consumer 模式的性能测试工具。
  • kafka-verifiable-share-consumer.sh:用于系统级自动化测试,验证 Share Consumer 消费行为的正确性。

安全、权限与配置管理 (Security & Configs)

  • kafka-delegation-tokens.sh:管理用于轻量级身份认证的委托令牌(Delegation Tokens)。
  • kafka-acls.sh:管理访问控制列表(ACLs),配置谁(Principal)可以对什么资源(Topic/Group)执行什么操作(读/写)。
  • kafka-configs.sh:动态修改配置工具。可以在不停机的情况下修改 Broker、Topic 或客户端的配置。

KRaft 元数据与底层调试 (Metadata & Debugging)

  • kafka-metadata-quorum.sh:管理和监控 KRaft Controller 的仲裁副本(Quorum),查看 Controller 的 Leader 状态和同步情况。
  • kafka-metadata-shell.sh:一个交互式 Shell 环境,允许你像浏览文件系统一样,浏览 KRaft 集群的内部元数据(替代了以前用 zookeeper-shell.sh 查看 ZK 节点的方式)。
  • kafka-dump-log.sh:底层调试工具。将 Kafka 磁盘上的 .log.index.timeindex 等文件内容导出为可读文本,排查消息损坏或位点错误。
  • kafka-get-offsets.sh:快速查询指定 Topic 分区的 Earliest(最早)或 Latest(最新)Offset(偏移量)。
  • kafka-transactions.sh:用于检查 Kafka 事务状态,或者强制终止卡住的“悬挂事务”(Dangling Transactions)。

Kafka Connect (数据集成)

用于连接外部数据系统(如数据库、Elasticsearch、HDFS 等)的工具。

  • connect-standalone.sh:以单机(Standalone)模式启动 Kafka Connect Worker,适合测试或轻量级任务。
  • connect-distributed.sh:以分布式(Distributed)模式启动 Kafka Connect Worker,支持高可用和弹性扩展(生产环境推荐)。
  • connect-mirror-maker.sh:启动 MirrorMaker 2,用于在不同的 Kafka 集群之间进行数据复制和灾备。
  • connect-plugin-path.sh:帮助管理和排查 Kafka Connect 插件(Plugins)路径配置的工具。

性能测试与自动化验证 (Perf & Testing)

  • kafka-producer-perf-test.sh:生产者压力测试工具,用于测试集群的写入吞吐量(MB/s,Records/s)和延迟。
  • kafka-consumer-perf-test.sh:消费者压力测试工具,用于测试集群的读取吞吐量。
  • kafka-e2e-latency.sh:端到端延迟测试工具,测量一条消息从发出到被消费者接收所需的总时间。
  • kafka-replica-verification.sh:验证不同 Broker 上的副本数据是否一致。
  • kafka-verifiable-producer.sh / kafka-verifiable-consumer.sh:生成或消费确定性事件流的工具,通常不是给普通用户用的,而是供 Apache Kafka 官方进行系统集成测试(System Tests)时验证数据不丢失、不重复使用的。

Kafka Streams (流计算)

  • kafka-streams-application-reset.sh:重置 Kafka Streams 应用的状态。它会清理内部 Topic(如 Changelog 和 Repartition topics),让你能重新处理数据。
  • kafka-streams-groups.sh:用于管理 Kafka Streams(流处理应用)在 Kafka 内部注册的消费组。

通用底座工具 (General Utils)

  • kafka-run-class.sh最底层的包装脚本。几乎所有的脚本,都是通过调用该脚本来执行对应的 Java 类。
  • kafka-jmx.sh:快速查询运行中 Kafka 进程的 JMX 指标的简易命令行工具。
  • kafka-broker-api-versions.sh:查询 Broker 支持的各项 API 版本,用于排查客户端与服务端版本兼容性问题。
  • kafka-client-metrics.sh:管理客户端遥测(Telemetry)指标配置的工具(基于 KIP-714),允许 Broker 动态控制客户端上报哪些监控指标。

安装

下载并解压

二进制下载地址:https://kafka.apache.org/community/downloads/

1
2
3
4
5
6
7
8
9
cd /data/local
tar -zxvf kafka_2.13-4.3.0.tgz
mv kafka_2.13-4.3.0 kafka

# 配置环境变量
grep -qxF 'export PATH=/data/local/kafka/bin:$PATH' /etc/profile || \
echo 'export PATH=/data/local/kafka/bin:$PATH' >> /etc/profile

source /etc/profile

查看版本

1
2
3
kafka-server-start.sh --version
kafka-run-class.sh kafka.Kafka --version
kafka-configs.sh --version

安装依赖

本地必须安装 Java 环境,兼容性:https://kafka.apache.org/43/getting-started/compatibility/

单机-KRaft 混合模式(使用中)

配置

https://github.com/apache/kafka/blob/trunk/config/server.properties

vim /data/local/kafka/config/server-standalone.properties

请修改广播地址 advertised.listeners

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
############################# Server Basics #############################
# 此服务器的角色。设置此项会将我们置于 KRaft 模式。
process.roles=broker,controller
# 与此实例角色关联的节点ID
node.id=1
# 用于连接控制器集群的控制器端点列表。Broker 和 Controller 启动时通过它接入集群,支持动态扩容。
controller.quorum.bootstrap.servers=localhost:9093


############################# Socket Server Settings #############################
# 监听器列表 - 我们将监听的 URI 及其监听器名称的逗号分隔列表。如果监听器名称不是安全协议,则必须同时设置 listener.security.protocol.map。
# 监听器名称和端口号必须是唯一的,除非一个监听器是 IPv4 地址,而另一个监听器是 IPv6 地址(且端口相同)。
# 将主机名指定为 0.0.0.0 可绑定到所有接口。补充:双斜杠后面直接跟冒号,表示主机名为 0.0.0.0。
# 将主机名留空可绑定到默认接口。
# 合法的监听器列表示例:
# listener_name://host_name:port
# PLAINTEXT://myhost:9092,SSL://:9091
# CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
# PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092
# 补充:
# 可以理解为“监听器名称与网络端口之间的映射/绑定”。
# 双斜杠后面直接跟冒号,表示主机名为 0.0.0.0。
# 组合节点(即具有 `process.roles=broker,controller` 的节点)至少必须在此处列出 Controller 监听器。
listeners=INTER_BROKER://:9092,INTER_CONTROLLER://:9093
# 指定 Kafka Broker 将向客户端和其他 Broker 广播的监听地址。
# 当实际的监听器配置地址与客户端应使用的连接地址不一致时(例如在云环境中),此配置非常有用。例如:Kafka Broker 部署在内网或容器内(IP 为 172.31.20.50),但外部公网或客户端需要通过映射后的外网 IP(如 100.10.11.12)来访问。这些地址由控制器发布和管理,Broker 根据需要从控制器拉取这些数据。如果未设置此项,将使用 listeners 的值。与 listeners 不同,广播 0.0.0.0 元地址是无效的。
# 此外,与 listeners 不同,此属性中可以存在重复的端口,从而允许一个监听器被配置为发布另一个监听器的地址。在使用外部负载均衡器的一些场景中,这可能非常有用。
# 补充:Broker 启动时会向 KRaft 控制器发送注册请求,把自己配置的 advertised.listeners 地址上报。KRaft 控制器会将这些地址持久化到元数据中,当有元数据请求时,再返回给客户端和其他 broker 作为实际路由依据。
advertised.listeners=INTER_BROKER://10.25.65.162:9092
# 监听器名称与安全协议之间的映射。每个监听器名称在映射中只能出现一次。
# 要使同一安全协议可在多个端口或 IP 地址上使用,必须定义此映射关系。例如,即使内部和外部流量都需要 SSL,也可以将它们分开。具体来说,用户可以定义名为 INTERNAL 和 EXTERNAL 的监听器,并将此属性设置为:INTERNAL:SSL,EXTERNAL:SSL。
# 可以通过在配置名称前添加规范化前缀(监听器名称小写)来为每个监听器配置不同的安全设置(SSL 和 SASL)。例如,要为 INTERNAL 监听器设置不同的密钥库,可以设置一个名为 listener.name.internal.ssl.keystore.location 的配置,如果未设置监听器名称对应的配置,则该配置将回退到通用配置(即 ssl.keystore.location)。
# 请注意,在 KRaft 中,如果没有提供显式映射并且没有使用其他安全协议,则会假定从 controller.listener.names 定义的监听器名称到 PLAINTEXT 的默认映射。
# 补充:可以理解为“监听器名称的创建”。
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,INTER_CONTROLLER:SASL_PLAINTEXT,INTER_BROKER:SASL_PLAINTEXT

# 用于 Broker 之间通信的监听器名称。如果未设置此属性,监听器名称将由 security.inter.broker.protocol 属性决定。同时设置此属性与 security.inter.broker.protocol 属性会导致错误。
# 补充:专门用于处理 Broker 服务器之间的请求。
inter.broker.listener.name=INTER_BROKER
# 控制器使用的监听器名称列表,以逗号分隔。与控制器集群通信时需要此列表,broker 始终会使用此列表中的第一个监听器。
# 如果 listener.security.protocol.map 中未显式设置映射,则默认使用 PLAINTEXT 协议。如果在 KRaft 模式下运行,则此项为必需项。
controller.listener.names=INTER_CONTROLLER

# 服务器用于接收网络请求和向网络发送响应的线程数。注意:每个监听器(controller监听器除外)都会创建自己的线程池。默认值3。
num.network.threads=3
# 服务器用于处理请求的线程数,其中可能包括磁盘I/O。默认值8。
num.io.threads=8
# 套接字服务器使用的发送缓冲区(SO_SNDBUF)。如果该值为 -1,则使用操作系统的默认值。默认值 100 KB。
socket.send.buffer.bytes=102400
# 套接字服务器使用的接收缓冲区(SO_RCVBUF)。如果该值为 -1,则使用操作系统的默认值。默认值 64 KB。
socket.receive.buffer.bytes=102400
# 套接字(socket)请求的最大字节数,即套接字服务器可接受的最大请求大小(防止内存溢出)。默认值 100 MB。
socket.request.max.bytes=104857600


############################# Log Basics #############################
# 以逗号分隔的日志数据存储目录列表。如果未设置,则使用 log.dir 中的值。默认 /tmp/kraft-combined-logs
# 补充:
# 1、当配置了多个路径(通常对应多个物理磁盘),新建分区会被创建在“最少分区数”的目录。但可能发生磁盘空间倾斜(分区数少,但磁盘空间使用率高),可以利用 kafka-reassign-partitions.sh 工具,在同一台 Broker 的不同磁盘路径之间在线迁移分区,实现盘与盘之间的动态均衡。
# 2、如何定位某个 Topic 分区?
# 启动阶段:全量扫描与内存映射构建。当 Kafka Broker 启动时,它的 LogManager(日志管理器)会启动一个多线程的组件去并行扫描 log.dirs 中配置的所有物理路径,扫描过程中会在内存中构建一个核心的哈希映射表(在源码中通常表现为 ConcurrentHashMap),其结构大致为:Key:TopicPartition(Topic名称-分区号) --> Value:Log 对象(包含该分区的绝对路径、索引等信息)。
# 运行阶段:O(1)复杂度的内存定位。当 Producer 发送请求或 Consumer 拉取请求到达 Broker 时:
# a、请求中会携带目标 Topic 和 Partition ID。
# b、Broker 直接去内存的哈希映射表中进行查找,由于是哈希表,定位速度是 O(1) 级别的。
# c、查到对应的 Log 对象后,直接通过该对象中缓存的绝对路径(如/data/logs/tpoic-0)去操作对应的 .log 数据文件和 .index 索引文件。
# 整个定位过程完全在内存中完成,不需要任何磁盘随机I/O,因此性能极高。
#log.dirs=/data/local/kafka/data/standalone1,/data/local/kafka/data/standalone2,/data/local/kafka/data/standalone3
log.dirs=/data/local/kafka/data/standalone
# 每个主题的默认日志分区数(默认值1)。此配置会影响以下路径:
# 1. 自动创建主题
# 2. 内部流主题的创建
# 3. 通过 AdminClient#createTopics 创建主题时设置分区数为 -1 。补充:或者 kafka-topics.sh --create --partitions -1。
# 对于 (1),仅当新建 Topic 归属的 Broker 节点显式设置时才使用 Broker 配置中的值。如果 broker 配置中未显式配置,则使用 Controller 配置中的值。
# 对于 (2) 和 (3),始终使用 Controller 配置中的值。
# 补充:
# 1、强烈建议全集群保持一致。否则会导致新建 Topic 的初始容量不同。
# 2、分区越多,消费时的并行度越高,但这也将导致各 broker 上的文件数量增加。
num.partitions=1
# 每个数据目录用于启动时日志恢复和关闭时日志刷新的线程数。建议根据安装资源增加此值。
num.recovery.threads.per.data.dir=2


############################# Internal Topic Settings #############################
# 组元数据内部主题“__consumer_offsets”、“__share_group_state”和“__transaction_state”的副本因子。
# 除了开发测试之外,建议使用大于 1 的值以确保可用性,例如 3。

# 偏移量(offsets)主题的副本因子(建议设置为较高值以确保可用性)。在集群规模满足此副本因子要求之前,内部主题的创建将失败。默认值3。
offsets.topic.replication.factor=1
# 共享消费组(share-group)状态主题的副本因子。在集群规模满足此副本因子要求之前,主题创建将失败。默认值3。
share.coordinator.state.topic.replication.factor=1
# 已覆盖 share-group (共享消费组)状态主题的 min.insync.replicas 参数(具体见官网...)。默认值2。
share.coordinator.state.topic.min.isr=1
# 事务主题的副本因子(建议设置为较高值以确保可用性)。在集群规模满足此副本因子要求之前,内部主题的创建将失败。默认值3。
transaction.state.log.replication.factor=1
# 要使对事务主题的写入操作被视为成功,必须至少有指定数量的副本对此进行确认。默认值2。
transaction.state.log.min.isr=1


############################# Log Flush Policy #############################
# 消息会立即写入文件系统,但默认情况下,我们仅使用 fsync() 延迟同步操作系统缓存。以下配置控制数据刷新到磁盘。
# 这里存在几个重要的权衡:
# 1. 持久性:若未启用副本功能,则未刷盘的数据可能会丢失。解读:启用多副本是,数据会同步到其他服务器的内存中。只要不同时断电,数据就是安全的。
# 2. 延迟:过长的刷新间隔可能会导致刷新时出现延迟峰值,因为需要刷新的数据量很大。解读:过长的刷盘间隔会导致当时正在读写请求的客户端遭遇严重的卡顿。
# 3. 吞吐量:刷新通常是最耗时的操作,过短的刷新间隔可能会导致过多的寻道操作。解读:过短的刷盘间隔会降低整个集群的吞吐量。
# 以下设置允许您配置刷新策略,以便在一段时间后或每 N 条消息后(或两者兼有)刷新数据。此设置可以全局设置,也可以针对某个特定的 Topic 单独设置。
# 补充:
# 写入流程:当生产者(Producer)发送消息到 Kafka 时,Kafka 并不是直接把数据写到硬盘的磁道上,而是先写到操作系统的内存缓存区(Page Cache,页缓存)中。
# 什么是 fsync():这是一个系统调用,作用是强制把内存缓存区中的数据真正写入到物理磁盘上。
# 默认行为(Lazily):Kafka 默认不主动调用 fsync(),而是把刷盘时机完全交给操作系统(通常每隔几秒自动刷盘)。这种设计让 Kafka 的写入速度接近操作系统的内存访问速度,极为高效。
# 在绝大多数生产环境中,强烈建议保持默认配置(即不调整这些参数,不强制刷盘)。Kafka 的高可用和数据高可靠应该通过设置合理的 acks=all、min.insync.replicas(最小同步副本数)以及多副本机制来保障,而不是依赖牺牲性能的磁盘同步。

# 在消息被刷新到磁盘之前,日志分区上累积的消息数量。默认 Long.MAX_VALUE。
#log.flush.interval.messages=10000
# 任何主题中的消息在被刷新到磁盘之前,在内存中保留的最长时间(以毫秒为单位,默认null)。如果未设置,则使用 log.flush.scheduler.interval.ms 中的值。
#log.flush.interval.ms=1000
# 日志刷新程序检查是否有日志需要刷新到磁盘的频率(以毫秒为单位,默认 Long.MAX_VALUE)。
#log.flush.scheduler.interval.ms=9223372036854775807


############################# Log Retention Policy #############################
# 以下配置用于控制日志段的处理。策略可设置为在经过一段时间或累积达到指定大小后删除日志段。
# 只要满足上述任一条件,日志段就会被删除。删除操作始终从日志末尾开始进行。

# 日志文件在被删除前保留的小时数(以小时为单位),优先级:log.retention.ms > log.retention.minutes > log.retention.hours。如果设置为 -1,则不应用时间限制。
log.retention.hours=168
# 基于大小的日志保留策略。除非剩余日志段的大小低于 `log.retention.bytes`,否则日志段将被删除。此策略独立于 `log.retention.hours` 运行。默认-1。
#log.retention.bytes=1073741824
# 日志段文件的最大大小。当达到此大小时,将创建一个新的日志段。默认1GB。
log.segment.bytes=1073741824
# 日志清理器(Log Cleaner)检查是否有符合删除条件的日志的频率(以毫秒为单位,默认5分钟)。
log.retention.check.interval.ms=300000


#↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑ 以上是官方示例包含的属性 ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑


############################# 其他 #############################
# 设置为 true 时,AdminClient 可以删除主题。设置为 false 时,Broker 将明确拒绝删除请求。默认true。
delete.topic.enable=true
# 启用服务器上的主题自动创建功能。默认true。
auto.create.topics.enable=true
# 每个主题的默认副本因子(默认值1)。此配置会影响以下路径:
# 1. 自动创建主题
# 2. 内部流主题的创建
# 3. 通过 AdminClient#createTopics 创建主题时设置副本因子为 -1。补充:或者 kafka-topics.sh --create --replication-factor -1。
# 对于 (1),仅当新建 Topic 归属的 Broker 节点显式设置时才使用 Broker 配置中的值。如果 broker 配置中未显式配置,则使用 Controller 配置中的值。
# 对于 (2) 和 (3),始终使用 Controller 配置中的值。
# 补充:
# 1、即每个Topic的每个分区的副本数
# 2、强烈建议全集群保持一致。否则会导致集群行为难以预测、数据分配不均甚至引发故障。
# 3、生产环境推荐设置:acks=all、default.replication.factor>=2、min.insync.replicas>=2
default.replication.factor=1
# 启用自动leader平衡功能。一个后台线程会定期检查分区领导者的分布情况,检查间隔可通过 leader.imbalance.check.interval.seconds 进行配置。如果领导者分布不平衡,系统将触发领导者重新平衡,将其调整为各分区首选的领导者。
auto.leader.rebalance.enable=true
#
leader.imbalance.check.interval.seconds=300
# 对于已订阅的消费者,当满足以下任一条件时,特定分区的已提交偏移量将过期并被丢弃:1) 消费者组失去所有消费者(即变为空组)后,该保留期已过;2) 自该分区上次提交偏移量起,该保留期已过,且该组不再订阅相应的主题。对于独立消费者(使用手动分配),自上次提交以来,若该保留期已过,偏移量将过期。请注意,当通过 delete-group 请求删除一个组时,其已提交的偏移量也将被删除,且不会有额外的保留期;同样,当通过 delete-topic 请求删除一个主题时,在元数据更新传播后,任何组针对该主题的已提交偏移量也将被删除,且不会有额外的保留期。
offsets.retention.minutes=10080
# 偏移量提交主题的分区数(部署后不应更改)。
# 补充:核心作用
# 1、存储消费进度:消费者在消费消息时,需要定期提交自己的消费进度(Offset)。这些进度数据作为普通消息写入 Kafka 内部一个特殊的 Topic,即 __consumer_offsets。
# 2、决定并发与扩展性:该分区数量决定了 Kafka 集群处理偏移量提交请求的并发能力。更多的分区可以将偏移量提交的负载更均匀地分散到不同的 Broker 节点上。
offsets.topic.num.partitions=50


############################# SSL #############################
# none 表示不需要客户端身份验证(不需要客户端提供证书)。
# required 表示需要客户端身份验证(需要客户端提供证书)。
# requested 表示客户端可以选择是否进行身份验证(客户端可以提供证书,也可以不提供)。
# 补充:使用场景
# 1:单向 SSL认证 (ssl.client.auth=none)
# 连接条件:客户端只需要配置 security.protocol=SSL,并持有正确的 TrustStore(信任库)来验证 Broker 的身份即可。如果你的 Broker 证书是由知名公共 CA(如 Let's Encrypt 或 DigiCert)签发的,客户端甚至可能不需要手动配置 TrustStore,系统默认信任链就能生效。
# 谁能连接:任何知道 Broker 地址并拥有合法 TrustStore 的客户端都可以连接。这里的 SSL 仅仅起到了加密网络传输数据的作用,并没有做身份拦截。
# 2:双向 SSL认证 / mTLS (ssl.client.auth=required)
# 连接条件:客户端不仅需要验证 Broker 的身份,还必须提交自己的私钥和证书 (KeyStore) 给 Broker 验证。客户端的证书必须是由 Broker 的 TrustStore 中配置的 CA 所签发的。
# 谁能连接:只有拥有合法且经过签发的客户端证书的应用才能连接。如果客户端没有提供证书,或者证书过期、不匹配,Broker 会在底层的 TLS 握手阶段直接切断连接(客户端会收到类似 SSLHandshakeException 的错误)。
#ssl.client.auth=none
# 密钥库文件的文件格式,默认为 Java 专有的 JKS 格式(现已弃用)。默认的 ssl.engine.factory.class 目前支持的值包括 [JKS, PKCS12, PEM]。
#ssl.keystore.type=PEM
# 密钥存储文件的位置。此项对客户端而言是可选的,可用于客户端的双向身份验证。补充:Broker 节点的私钥+证书。
#ssl.keystore.location=/data/local/key/ecc/test_keystore.pem
# 信任库文件的文件格式,默认为 Java 专有的 JKS 格式(现已弃用)。默认的 ssl.engine.factory.class 目前支持的值包括 [JKS, PKCS12, PEM]。
#ssl.truststore.type=PEM
# 信任库文件的位置(即受信任的证书位置)。补充:信任的 CA(颁发机构)的根证书,多个证书直接拼接即可。
#ssl.truststore.location=/data/local/key/ecc/test_cert.pem


############################# SASL(简单认证与安全层)认证机制 #############################
# Kafka 服务器中启用的 SASL 机制列表。该列表可以包含任何有安全提供程序支持的机制。默认情况下仅启用 GSSAPI。补充:这是全局配置。
#sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI,OAUTHBEARER
listener.name.inter_controller.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
listener.name.inter_broker.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
# 用于与 Controller 通信的 SASL 机制。默认值为GSSAPI。
# 补充: 集群模式下,绝对不能将 Controller 间通信的认证机制配置为依赖动态元数据的 SCRAM-SHA-512 或 SCRAM-SHA-256。
# 第一次冷启动时的顺序:
# [ 步骤 1: 节点启动 ]
# [ 步骤 2: Quorum 投票阶段 ] ──► 各个 Controller 必须通过网络互相发送 VoteRequest (投票) 选出 Leader
# │ ⚠️ 此时要求进行 SASL_PLAINTEXT 认证!
# ▼
# [ 步骤 3: 元数据状态机激活 ] ──► 选出 Leader 后,Controller 才能正式加载并读取底层数据目录中的元数据
# │ (只有到了这一步,Controller 才知道 --add-scram 加进来的用户是谁)
# ▼
# [ 步骤 4: 正常对外提供服务 ]
# 致命的死锁点:在步骤 2时,Controller 1 试图通过 SCRAM-SHA-512 连接 Controller 2。由于此时还没有选出 Leader,Controller 2 还没有权限和能力去读取本地的元数据日志。因此,尽管 --add-scram 确实把 admin 用户写进了磁盘,但 Controller 2 在这一刻根本不知道有这个用户存在,所以认证失败。
sasl.mechanism.controller.protocol=PLAIN
# 用于 Broker 间通信的 SASL 机制。默认值为GSSAPI。
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# 对外部客户端 和 Broker 来说:--add-scram 添加的用户完全有效。因为当外部 Client 和 Broker 连接上来时,集群早就跨过了步骤 2 和 3,Controller 已经正常工作,能够完美识别和校验 SCRAM 用户。
# 对 Controller 内部通信来说:绝对不能将认证机制配置为依赖动态元数据的 SCRAM-SHA-512 或 SCRAM-SHA-256。


############################# JAAS(Java认证与授权服务) #############################
# 适用于 SASL 连接的 JAAS 登录上下文参数,格式与 JAAS 配置文件相同。
# 对于 Broker,配置必须以监听器前缀和全小写的 SASL 机制名称作为前缀:
# 属性名格式:listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
# 属性值格式:loginModuleClass controlFlag (optionName=optionValue)*;
# 如果监听器上配置了多种 SASL 认证机制,则必须为每种机制配置登录模块。
# 下面定义了两个用户 admin 和 alice :
# username 和 password 属性用于 Broker 发起与其他 Broker 的连接。在本例中,admin 是用于 Broker 间通信的用户。
# user__userName_ 属性集定义了连接到该 Broker 的所有用户的密码,Broker 会使用这些属性验证所有客户端连接,包括来自其他 Broker 的连接。
listener.name.inter_controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";
listener.name.inter_broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_broker.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";

初始化数据目录

1
2
3
4
5
6
7
8
9
# 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一)
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)" && KAFKA_CLUSTER_ID="aM5LdSHYQDuLvBKaeTrcQw"

# 初始化数据目录
kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server-standalone.properties \
--add-scram 'SCRAM-SHA-512=[name=admin,password=pCVXLxxVkqX9]' \
--standalone

启动

1
2
3
4
5
6
LOG_DIR="/data/local/kafka/logs/standalone" kafka-server-start.sh /data/local/kafka/config/server-standalone.properties

# --override <String>:可选属性,用于覆盖 server.properties 文件中设置的值(例如,Key=value)。
kafka-server-start.sh /data/local/kafka/config/server-standalone.properties \
--override num.network.threads=3 \
--override num.io.threads=8

自启

vim /etc/systemd/system/kafka.service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[Unit]
Description=Apache Kafka Server (KRaft)
Documentation=https://kafka.apache.org/
After=syslog.target network.target network-online.target
Wants=network-online.target

[Service]
# 前台运行,需配置 Type=simple
# 后台运行,需配置 Type=forking,同时指定 PIDFile
Type=simple
User=root
Group=root

# 先查看 echo $PATH
Environment="JAVA_HOME=/data/local/java/jdk-17.0.16"
Environment="PATH=/data/local/java/jdk-17.0.16/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
Environment="LOG_DIR=/data/local/kafka/logs/standalone"
ExecStart=/data/local/kafka/bin/kafka-server-start.sh /data/local/kafka/config/server-standalone.properties
ExecStop=/data/local/kafka/bin/kafka-server-stop.sh

Restart=on-failure
RestartSec=10s
# 文件描述符的数量
LimitNOFILE=65535

[Install]
WantedBy=multi-user.target

启动

1
2
3
4
5
6
7
# 刷新systemd守护进程,以重新加载systemd服务配置文件
systemctl daemon-reload
systemctl enable kafka.service
systemctl start kafka.service
systemctl status kafka.service
systemctl stop kafka.service
journalctl -u kafka.service -f

停止

1
2
3
4
5
6
7
# 找到你想杀掉的那个 Kafka 进程的 PID
ps -ef | grep kafka
# 使用 kill -15 优雅终止它(严禁直接使用 kill -9)
kill -15 <PID>

# 该脚本默认会停止当前机器上运行的所有 Kafka 实例(无论是纯 Broker、纯 Controller 还是混合节点)。
kafka-server-stop.sh

AdminClient 认证配置

https://kafka.apache.org/43/configuration/admin-configs/

客户端(如 Java 程序、命令行工具)连接时,需要提供对应的账号密码。

编辑 PLAIN 认证机制的客户端配置文件:vim /data/local/kafka/config/admin-plain.properties

1
2
3
4
5
6
# 用于与 Broker 通信的协议。
security.protocol=SASL_PLAINTEXT
# 用于客户端连接的 SASL 机制。这可以是任何一个由安全提供程序支持的机制。GSSAPI 是默认机制。
sasl.mechanism=PLAIN
# 适用于 SASL 连接的 JAAS 登录上下文参数。补充:required 可以改为 sufficient。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="pCVXLxxVkqX9";

编辑 SCRAM 认证机制的客户端配置文件:vim /data/local/kafka/config/admin-scram.properties

1
2
3
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="pCVXLxxVkqX9";

查看集群

1
2
3
4
5
kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 --command-config /data/local/kafka/config/admin-plain.properties describe --status
kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 --command-config /data/local/kafka/config/admin-scram.properties describe --status

kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-plain.properties describe --replication
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties describe --replication

集群-KRaft 混合模式

配置

节点 1 :vim /data/local/kafka/config/server-node-1.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
############################# Server Basics #############################
process.roles=broker,controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9193,localhost:9293,localhost:9393

############################# Socket Server Settings #############################
listeners=INTER_BROKER://:9192,INTER_CONTROLLER://:9193
advertised.listeners=INTER_BROKER://localhost:9192
listener.security.protocol.map=INTER_CONTROLLER:SASL_PLAINTEXT,INTER_BROKER:SASL_PLAINTEXT
inter.broker.listener.name=INTER_BROKER
controller.listener.names=INTER_CONTROLLER

############################# Log Basics #############################
log.dirs=/data/local/kafka/data/node-1

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
share.coordinator.state.topic.replication.factor=3
share.coordinator.state.topic.min.isr=2
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

############################# SASL(简单认证与安全层)认证机制 #############################
listener.name.inter_controller.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
listener.name.inter_broker.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
sasl.mechanism.controller.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

############################# JAAS(Java认证与授权服务) #############################
listener.name.inter_controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";
listener.name.inter_broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_broker.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";

节点 2 :vim /data/local/kafka/config/server-node-2.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
############################# Server Basics #############################
process.roles=broker,controller
node.id=2
controller.quorum.bootstrap.servers=localhost:9193,localhost:9293,localhost:9393

############################# Socket Server Settings #############################
listeners=INTER_BROKER://:9292,INTER_CONTROLLER://:9293
advertised.listeners=INTER_BROKER://localhost:9292
listener.security.protocol.map=INTER_CONTROLLER:SASL_PLAINTEXT,INTER_BROKER:SASL_PLAINTEXT
inter.broker.listener.name=INTER_BROKER
controller.listener.names=INTER_CONTROLLER

############################# Log Basics #############################
log.dirs=/data/local/kafka/data/node-2

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
share.coordinator.state.topic.replication.factor=3
share.coordinator.state.topic.min.isr=2
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

############################# SASL(简单认证与安全层)认证机制 #############################
listener.name.inter_controller.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
listener.name.inter_broker.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
sasl.mechanism.controller.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

############################# JAAS(Java认证与授权服务) #############################
listener.name.inter_controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";
listener.name.inter_broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_broker.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";

节点 3 :vim /data/local/kafka/config/server-node-3.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
############################# Server Basics #############################
process.roles=broker,controller
node.id=3
controller.quorum.bootstrap.servers=localhost:9193,localhost:9293,localhost:9393

############################# Socket Server Settings #############################
listeners=INTER_BROKER://:9392,INTER_CONTROLLER://:9393
advertised.listeners=INTER_BROKER://localhost:9392
listener.security.protocol.map=INTER_CONTROLLER:SASL_PLAINTEXT,INTER_BROKER:SASL_PLAINTEXT
inter.broker.listener.name=INTER_BROKER
controller.listener.names=INTER_CONTROLLER

############################# Log Basics #############################
log.dirs=/data/local/kafka/data/node-3

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
share.coordinator.state.topic.replication.factor=3
share.coordinator.state.topic.min.isr=2
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

############################# SASL(简单认证与安全层)认证机制 #############################
listener.name.inter_controller.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
listener.name.inter_broker.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
sasl.mechanism.controller.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

############################# JAAS(Java认证与授权服务) #############################
listener.name.inter_controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";
listener.name.inter_broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_broker.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";

初始化数据目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一)
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)" && KAFKA_CLUSTER_ID="aM5LdSHYQDuLvBKaeTrcQw"
# 生成三个节点的 Directory ID (各自唯一)
DIR_ID_1="$(kafka-storage.sh random-uuid)" && DIR_ID_1="DnCdSu9uTJejryl97eLw6w"
DIR_ID_2="$(kafka-storage.sh random-uuid)" && DIR_ID_2="hUcTgc7jQzynpOpPW95rOA"
DIR_ID_3="$(kafka-storage.sh random-uuid)" && DIR_ID_3="tUacrDuTTCefMRBAoooqqA"

kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server-node-1.properties \
--add-scram 'SCRAM-SHA-512=[name=admin,password=pCVXLxxVkqX9]' \
--initial-controllers "1@localhost:9193:${DIR_ID_1},2@localhost:9293:${DIR_ID_2},3@localhost:9393:${DIR_ID_3}"

kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server-node-2.properties \
--add-scram 'SCRAM-SHA-512=[name=admin,password=pCVXLxxVkqX9]' \
--initial-controllers "1@localhost:9193:${DIR_ID_1},2@localhost:9293:${DIR_ID_2},3@localhost:9393:${DIR_ID_3}"

kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server-node-3.properties \
--add-scram 'SCRAM-SHA-512=[name=admin,password=pCVXLxxVkqX9]' \
--initial-controllers "1@localhost:9193:${DIR_ID_1},2@localhost:9293:${DIR_ID_2},3@localhost:9393:${DIR_ID_3}"

启动

1
2
3
4
5
LOG_DIR="/data/local/kafka/logs/node-1" kafka-server-start.sh /data/local/kafka/config/server-node-1.properties
LOG_DIR="/data/local/kafka/logs/node-2" kafka-server-start.sh /data/local/kafka/config/server-node-2.properties
LOG_DIR="/data/local/kafka/logs/node-3" kafka-server-start.sh /data/local/kafka/config/server-node-3.properties

# 关闭最后一个节点,只能 skill -9

集群-KRaft 专用模式

配置

https://github.com/apache/kafka/blob/trunk/config/controller.properties

https://github.com/apache/kafka/blob/trunk/config/broker.properties

Controller 节点:vim /data/local/kafka/config/server-controller.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
############################# Server Basics #############################
process.roles=controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9093

############################# Socket Server Settings #############################
listeners=INTER_CONTROLLER://:9093
advertised.listeners=INTER_CONTROLLER://localhost:9093
listener.security.protocol.map=INTER_CONTROLLER:SASL_PLAINTEXT
controller.listener.names=INTER_CONTROLLER

############################# Log Basics #############################
log.dirs=/data/local/kafka/data/controller

############################# SASL(简单认证与安全层)认证机制 #############################
listener.name.inter_controller.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
listener.name.inter_broker.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
sasl.mechanism.controller.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

############################# JAAS(Java认证与授权服务) #############################
listener.name.inter_controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";
listener.name.inter_broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_broker.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";

Broker 节点:vim /data/local/kafka/config/server-broker.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
############################# Server Basics #############################
process.roles=broker
node.id=2
controller.quorum.bootstrap.servers=localhost:9093

############################# Socket Server Settings #############################
listeners=INTER_BROKER://localhost:9092
advertised.listeners=INTER_BROKER://localhost:9092
listener.security.protocol.map=INTER_CONTROLLER:SASL_PLAINTEXT,INTER_BROKER:SASL_PLAINTEXT
inter.broker.listener.name=INTER_BROKER
controller.listener.names=INTER_CONTROLLER

############################# Log Basics #############################
log.dirs=/data/local/kafka/data/broker

############################# SASL(简单认证与安全层)认证机制 #############################
listener.name.inter_controller.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
listener.name.inter_broker.sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
sasl.mechanism.controller.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

############################# JAAS(Java认证与授权服务) #############################
listener.name.inter_controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";
listener.name.inter_broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="pCVXLxxVkqX9" \
user_admin="pCVXLxxVkqX9" \
user_alice="pCVXLxxVkqX9";
listener.name.inter_broker.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="pCVXLxxVkqX9";

初始化数据目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一)
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)" && KAFKA_CLUSTER_ID="aM5LdSHYQDuLvBKaeTrcQw"

# controller 初始化。为了方便演示,只配置一个 Controller 节点,所以使用 --standalone
kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server-controller.properties \
--add-scram 'SCRAM-SHA-512=[name=admin,password=pCVXLxxVkqX9]' \
--standalone

# broker 初始化。
kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server-broker.properties \
--add-scram 'SCRAM-SHA-512=[name=admin,password=pCVXLxxVkqX9]'

启动

1
2
3
4
5
LOG_DIR="/data/local/kafka/logs/controller" kafka-server-start.sh /data/local/kafka/config/server-controller.properties

LOG_DIR="/data/local/kafka/logs/broker" kafka-server-start.sh /data/local/kafka/config/server-broker.properties

# 先关闭 broker,再关闭 controller。或者 暴力停止 `skill -9`

配置指南(官网)

https://kafka.apache.org/43/configuration/

包含了 Broker(节点)、Producer(生产者)、Consumer(消费者)、Topic、Kafka Connect 以及 Kafka Streams 的所有参数配置项与默认值。

配置文件模版:https://github.com/apache/kafka/tree/trunk/config

管理员配置文件

https://kafka.apache.org/43/configuration/admin-configs/

官方未给出配置示例,admin.properties 用于控制客户端连接集群的行为(如超时时间、重试次数、加密认证)。

服务器与 KRaft 集群角色配置文件 (Server & KRaft Roles)

在 KRaft 架构下,一个 Kafka 节点可以承担不同的“角色”(Role)。这几个文件就是针对不同角色的启动配置模板。

  • server.properties 最标准的 Kafka 节点配置文件。通常用于配置同时承担 Broker 和 Controller 双重角色的节点,或者根据其中的 process.roles 参数灵活调整节点职责。
  • broker.properties 纯 Broker 模式配置。使用此配置启动的节点只负责处理客户端的数据读写请求和存储消息,不参与集群元数据的管理选举。它需要连接到独立的 Controller 节点。
  • controller.properties 纯 Controller 模式配置。使用此配置启动的节点专门作为 KRaft 的仲裁节点(Quorum),负责管理集群状态、Topic 元数据等。它不存储业务消息,也不接受客户端的数据读写请求。
  • server-standalone.properties 单机快捷开发配置。专门为本地开发、测试或快速体验设计的轻量级配置文件。它通常预设了单机运行所需的所有参数,无需复杂配置即可一键启动一个功能完整的本地 Kafka 节点。

客户端配置文件 (Clients)

仅供命令行工具调用的客户端配置,kafka-console-producer.shkafka-console-consumer.sh

  • producer.properties 生产者的基础配置模板。包含连接集群的地址 (bootstrap.servers)、序列化器设置、压缩类型、ACK 机制等基础参数。
  • consumer.properties 消费者的基础配置模板。包含连接集群的地址、反序列化器设置、消费者组 ID (group.id)、自动提交 Offset 的策略等基础参数。

Kafka Connect 数据集成配置文件 (Connectors)

Kafka Connect 用于在 Kafka 和外部数据系统之间流转数据。这些是启动 Connect 工作节点和基础插件的配置文件。

1. 运行模式配置

  • connect-standalone.properties 单机模式运行配置。通常用于本地测试或极轻量级的任务。所有的状态和配置都保存在本地文件中,不具备高可用性。
  • connect-distributed.properties 分布式模式运行配置。生产环境的标准用法,支持高可用、负载均衡和动态伸缩。所有的连接器状态、配置和 Offset 都存储在 Kafka 的内部 Topic 中。
  • connect-mirror-maker.properties MirrorMaker 2 配置文件。MirrorMaker 基于 Kafka Connect 框架构建,这个文件专门用于配置跨数据中心、跨集群的 Kafka 数据复制与灾备。

2. 连接器插件配置

  • connect-file-source.properties 文件源示例配置,用于读取本地某个文本文件中的每一行内容,并将其发送到 Kafka 指定的 Topic 中。
  • connect-file-sink.properties 文件接收器示例配置,用于从 Kafka 指定的 Topic 中消费消息,并按行写入本地文本文件。
  • connect-console-source.properties 控制台源配置,用于从标准输入中读取内容(例如终端键盘输入),并将其作为消息发送到 Kafka。
  • connect-console-sink.properties 控制台接收器配置,用于从 Kafka 中消费消息,并直接输出到标准输出(终端屏幕)。

日志系统配置文件 (Logging - Log4j2)

  • log4j2.yaml Kafka 服务端的主日志配置。控制 Broker/Controller 运行时的日志输出级别(INFO, DEBUG, ERROR)、日志文件的滚动策略(Rolling)以及输出路径(通常是 server.log 等)。
  • connect-log4j2.yaml 专门为 Kafka Connect 进程准备的日志配置,方便将数据集成任务的日志与 Kafka 服务端日志区分开来。
  • tools-log4j2.yaml 命令行工具(如 kafka-topics、kafka-consumer-groups 等)的日志配置。当你在终端运行本时,这个文件确保终端只会输出最关键的信息,而不会被海量的底层 DEBUG 日志淹没。

[!IMPORTANT]

log4j2.yaml 文件中日志输出目录为 ${sys:kafka.logs.dir} ,带有 sys: 前缀的变量会直接跳过内部变量配置(Property 节点),仅读取 Java 系统属性中定义的值。

查看 Kafka 启动脚本:kafka-server-start.sh -> kafka-run-class.sh ,发现几个环境变量的使用:

  • KAFKA_LOG4J_CMD_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
  • $KAFKA_OPTS
  • exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"

所以,要想改变日志位置,可以在启动之前设置环境变量 export LOG_DIR="/data/local/kafka/logs" 。 JVM GC 日志的路径也依赖 LOG_DIR

测试与故障注入框架配置文件 (Testing)

trogdor.conf 这是 Trogdor 的配置文件。Trogdor 是 Kafka 自带的一个分布式系统测试框架,主要用于执行负载测试和故障注入(Fault Injection,例如模拟网络延迟、节点宕机、磁盘满等)。这个文件用于配置 Trogdor 代理(Agent)和协调器(Coordinator)的运行参数,普通用户在日常运维中极少用到。

架构设计(官网)

https://kafka.apache.org/43/design/

阐述了其高吞吐、低延迟的核心逻辑,包括:为什么要用文件系统页面缓存(Pagecache)、为什么坚持顺序读写、零拷贝(Zero-copy)机制、以及推拉模式(Push vs Pull)的权衡。

消费者

推送与拉取

共享消费者

底层实现(官网)

https://kafka.apache.org/43/implementation/

主要讲解底层的网络协议、文件存储格式(如日志段文件、索引结构)、复制协议的底层细节等。

运维操作(官网)

https://kafka.apache.org/43/operations/

围绕 Kafka 集群的日常维护展开,包括:集群参数配置(Config)、监控(Monitoring)、扩展节点、工具使用、常见故障处理以及 OS 层面的调优。

扩展集群

安全配置(官网)

https://kafka.apache.org/43/security/

主要涵盖 Kafka 的安全合规机制,包括客户端与 Broker 之间的认证(SASL/SSL)、加密(TLS 传输加密)以及权限控制(ACLs 访问控制列表)。

使用 SSL 进行加密和身份验证

https://kafka.apache.org/43/security/encryption-and-authentication-using-ssl/

使用 SASL 进行身份验证

https://kafka.apache.org/43/security/authentication-using-sasl/

  • PLAINTEXT无身份认证,明文传输。默认配置。数据裸奔,谁都能连接,适合开发测试。
  • SSL无身份认证,加密传输。通过 TLS/SSL 证书加密通道,防止数据被窃听。
  • SASL_PLAINTEXT身份认证 + 明文传输。连接时必须对账密,但后续传输的数据是明文。
  • SASL_SSL身份认证 + 加密传输。最安全的安全级别,既要对账密,传输又加密。

通信安全协议(传输层)

传输层决定了客户端与 Broker 之间的数据交互是否被加密。

  • PLAINTEXT:所有流经的数据(包括业务消息和部分认证信息)都是明文,内网抓包一览无余。
  • SSL:采用 TLS/SSL 协议对全链路数据进行加密,哪怕被中间人拦截,也无法解析出真实内容。

身份认证机制(认证层)

SASL - 维基百科简单认证与安全层 (SASL, Simple Authentication and Security Layer ) 是一个在网络协议中用来认证数据加密构架。它把认证机制从程序中分离开,理论上使用SASL的程序协议都可以使用SASL所支持的全部认证机制。

SASL(简单认证与安全层,Simple Authentication and Security Layer) 不是具体的加密算法,而是一个外挂式的认证框架。它把认证机制从底层网络通信中抽离出来,Kafka 相当于提供了一个“标准接口”,具体的验证逻辑交由不同的插件机制实现。

  • PLAIN(明文密码):一种基于简单文本比对的身份验证机制。客户端通过网络向 Kafka Broker 发送明文的用户名和密码Kafka Broker 接收到请求后,直接将收到的字符串与本地配置(如 JAAS 文件)或 KRaft 中存储的账密进行一致性比对。由于凭证在传输过程中未加密,必须配合 SSL/TLS 加密通道使用,否则存在严重的明文抓包风险。账密写死配置文件。
  • SCRAM(加盐挑战应答):一种基于哈希算法的安全性双向凭证校验机制。客户端与 Kafka Broker 在认证过程中,均不在网络上直接传输真实的密码。通过挑战-应答 (Challenge-Response) 流程,Broker 发送随机数(Salt 和 Nonce),客户端利用该随机数与自身密码进行加盐哈希运算 (如 SHA-256 或 SHA-512) 并返回结果。Kafka Broker 运行相同的哈希算法,通过比对计算结果来确认客户端的身份。该机制既防范了重放攻击,也保护了存储在后端的凭证安全。支持使用 kafka-configs.sh 动态增删用户。
  • GSSAPI(Kerberos):一种基于密钥分发中心 (KDC) 的强身份验证机制。密钥分发中心 (KDC) 负责对客户端进行身份验证,并向其签发加密的票据 (Ticket)Broker 仅通过验证客户端持有的票据有效性来确认其身份,不直接处理客户端的登录凭证。该票据具有固定的有效期,一旦过期则自动失效,客户端必须重新向 KDC 申请。
  • OAUTHBEARER(OAuth2 Token):一种基于令牌 (Token) 的无状态分布式认证机制。客户端的身份由外部独立的身份提供商 (IDP) 进行集中核实与管理。身份提供商 (IDP) 在验证客户端身份后,向其颁发具有时效性的 OAuth2 访问令牌 (Access Token)Kafka Broker 作为资源服务器,不参与身份核实,仅负责利用配置的公钥或通过自省接口对令牌进行数字签名验签。只要令牌签名合法且在有效期内,Kafka Broker 即允许该客户端建立连接并进行后续的数据交互。

认证机制实现类(登录模块)

维度 PlainLoginModule (明文) ScramLoginModule (安全哈希)
实现协议 SASL_PLAINTEXT / SASL_SSL + PLAIN SASL_PLAINTEXT / SASL_SSL + SCRAM (常用 SCRAM-SHA-256 / 512
认证机制 明文比对:客户端直接发送账号密码(Base64 编码,等同于明文),Broker 接收后直接比对。 挑战-应答 (Challenge-Response):通过加盐随机数和哈希计算比对,真实密码绝不在网络上传输
用户存储 静态存储:用户必须硬编码在 server.properties 的 JAAS 配置文件中。 动态存储:用户数据保存在 Kafka 集群内部(KRaft 元数据日志或 ZooKeeper 中)。
账号管理 死板:增删改用户必须修改配置文件并重启所有 Broker。 灵活:支持使用 kafka-configs.sh 脚本动态增删改查用户,即时生效,无需重启。
安全级别 极低:若未开启 SSL 加密,内网抓包可瞬间截获管理员明文密码。 :即使在非加密的 SASL_PLAINTEXT 链路上,黑客抓包也无法还原密码。
配置复杂度 冗长:必须在 Broker 的 JAAS 配置中列出所有允许登录的账号明细 user_xxx="pwd" 清爽:Broker 的 JAAS 仅需配置自身用于集群内通信的账号,外部客户端账号直接走元数据查询。
适用场景 仅限本地开发测试环境,或必须搭配严格 SSL 传输加密的纯内网环境。 生产环境首选。安全、灵活,完美契合 KRaft 模式的动态元数据管理。

组合方案

将传输层与认证层结合,我们可以推导出适用于不同业务场景的经典安全组合方案:

组合方案(传输 + 认证) 适用场景 核心优点 核心缺点
PLAINTEXT (无 SASL) 内网、本地开发测试环境。 零配置,性能达到物理极限。 毫无安全性,数据与权限完全裸奔。
SASL_PLAINTEXT + PLAIN 互信的内网隔离环境,初衷仅为防止不同业务线“误操作”。 配置极简,学习成本低。 账密在网络中是明文传输,防君子不防小人。
SASL_PLAINTEXT + SCRAM 预算/性能有限的超大规模内网,但需要严格的多租户账号权限隔离。 安全性优于 PLAIN,压榨性能的同时实现了动态权限控制。 实际业务消息(Payload)在网络中仍是明文。
SASL_SSL + SCRAM 中小型企业生产环境 / 跨公网消费首选 账密不外泄,业务数据全链路加密,安全性极高。 引入 SSL 证书管理成本;Broker 会有约 15%~30% 的非对称加密 CPU 损耗。
SASL_SSL + GSSAPI 银行、大厂等已有完善 Kerberos 体系的 Hadoop 大数据生态。 满足极度严苛的合规审查,实现企业级跨组件统一认证。 运维犹如走钢丝(Ticket 过期、Keytab 分发、KDC 故障等)。
SASL_SSL + OAUTHBEARER SaaS 云服务、全面推行零信任的 K8s 云原生微服务架构。 契合现代化单点登录(SSO),完美融入企业级 IAM 体系。 依赖外部 IDP 的稳定性,系统链路变长。

授权和访问控制列表(ACLs)

https://kafka.apache.org/43/security/authorization-and-acls/

Kafka 的 ACL 规则核心就是一句话:“允许/拒绝 某个用户 从 某个IP 对 某个资源 执行 某种操作”

1
2
3
4
5
6
7
8
9
10
# 对于 KRaft 集群,请在所有节点上使用标准授权器以开启权限控制。默认不开启权限控制。
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

############ 请将以下配置添加到 server.properties 文件。它们是 Authorizer(授权器) 的专属配置,而不是常规的 Broker 配置。
# 当某个 Topic 没有配置任何一条 ACL 规则时的默认行为(安全兜底策略):
# false(推荐,生产安全):除了超级管理员,拒绝所有人的访问。
# true (适合测试):允许所有人访问。一旦有人为这个 Topic 建了第一条 ACL,就会立刻转为严格模式(只允许该 ACL 允许的人访问)。
allow.everyone.if.no.acl.found=false
# 设置超级管理员。被加入此白名单的用户拥有对所有 Topic 和集群资源的最高控制权,完全绕过任何 ACL 规则限制。名字区分大小写,多个用户用分号隔开。
super.users=User:admin;User:alice

kafka-acls.sh 使用示例:绝对禁止在生产中授予普通用户 Create 和 Alter 操作权限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 查看当前集群所有的权限列表
kafka-acls.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties --list

# 生产者所需的权限:允许用户 alice 向主题 test-topic 写入数据
kafka-acls.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--add \
--allow-principal User:alice \
--operation Write \
--operation Describe \
--topic test-topic

# 消费者所需的权限:允许用户 alice 从主题 test-topic 读取数据。
# --resource-pattern-type prefixed:使用前缀匹配简化配置
# --group:要添加或移除访问控制列表 (ACL) 的消费者组。值“*”表示 ACL 应用于所有组。补充:消费组级别。
kafka-acls.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--add \
--allow-principal User:alice \
--operation Read \
--operation Describe \
--topic test-topic- --resource-pattern-type prefixed
--group mars-service-group

# 运维与监控组件所需的权限
# --cluster:添加/删除集群访问控制列表 (ACL)。补充:集群级别。
kafka-acls.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--add \
--allow-principal User:alice \
--operation Describe \
--operation DescribeConfigs \
--cluster

kafka-storage.sh

kafka-storage.sh 是 Kafka 提供的存储工具,用于 KRaft 模式 下生成 Cluster ID 和 格式化日志目录。

命令行参数

完整帮助列表:kafka-storage.sh --help

位置参数 参数 描述
random-uuid 打印一个随机的 UUID。
format 格式化此节点上的 Kafka 日志目录。
--cluster-id CLUSTER_ID, -t CLUSTER_ID 要使用的集群 ID。
--config CONFIG, -c CONFIG 要使用的 Kafka 配置文件。
--add-scram ADD_SCRAM, -S ADD_SCRAM 要添加到 __cluster_metadata(集群元数据) 日志中的 SCRAM_CREDENTIAL(SCRAM 凭证),例如:
SCRAM-SHA-256=[name=alice,password=pCVXLxxVkqX9]
SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]
补充:在集群启动前把 SCRAM 用户的账号密码写入 KRaft 元数据。如果你的集群被配置为 “代理间通信(Inter-broker)必须使用 SASL/SCRAM 认证”,那么在没有任何用户凭据时,节点之间根本无法建立连接并启动集群。此参数允许你在集群首次启动之前注入超级管理员凭据,确保节点在 Bootstrap(引导启动)阶段就能通过身份验证,顺利组成集群。
--ignore-formatted, -g 如果传入此选项,format 命令将跳过已格式化的目录,而不是失败。
--standalone, -s 用于将控制器初始化为单节点动态仲裁组。设置此标志时,不得设置 controller.quorum.voters 配置项,而应设置 controller.quorum.bootstrap.servers
补充:格式化时明确定义当前节点是唯一的初始投票者。用于快速创建单节点 KRaft 集群。
--initial-controllers INITIAL_CONTROLLERS, -I INITIAL_CONTROLLERS 用于使用指定的动态仲裁初始化服务器。参数是一个以逗号分隔的列表,格式为 id@hostname:port:directory所有节点必须使用相同的值。
例如:0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA
设置此标志时,不得设置 controller.quorum.voters 配置,而应设置 controller.quorum.bootstrap.servers
补充:格式化时明确定义集群的最初投票者列表。用于首次创建 KRaft 集群。
--no-initial-controllers, -N 用于初始化服务器而不指定动态仲裁。设置此标志时,不应设置 controller.quorum.voters 配置,而应设置 controller.quorum.bootstrap.servers
补充:格式化时明确定义该节点不是集群的最初投票者。用于向现有集群添加新 Controller 节点(动态扩缩容)前的格式化。
--release-version RELEASE_VERSION, -r RELEASE_VERSION 用于指定集群初始化时的元数据版本(Metadata Version)。它决定了 __cluster_metadata 日志能够使用哪些功能特性、支持哪些元数据记录格式,以及集群内 Controller 和 Broker 之间通信的协议版本。kafka-features.sh 可以动态升级元数据版本。
--feature FEATURE, -f FEATURE 命令的 --feature 参数用于在集群初始化格式化时,显式设定特定独立特性的版本级别(Feature Level),格式固定为 feature=level(特性名称=级别数字)。例如:
format --feature kraft.version=1
format --feature group.version=1
format --feature transaction.version=2
info 获取有关此节点上 Kafka 日志目录的信息。
--config CONFIG, -c CONFIG 要使用的 Kafka 配置文件。
例:kafka-storage.sh info --config /data/local/kafka/config/server-standalone.properties
version-mapping 查找给定元数据版本对应的功能。使用该命令时不带 --release-version 参数,将返回最新稳定元数据版本的映射关系。
--release-version RELEASE_VERSION, -r RELEASE_VERSION 查询并列出该指定元数据版本(Metadata Version)下,所有独立特性的默认映射级别(Feature Levels)。
例:kafka-storage.sh version-mapping --release-version 4.3-IV0
feature-dependencies 查找指定功能版本的依赖项。如果功能未知或版本尚未定义,则会抛出错误。可以指定多个功能。
--feature FEATURE, -f FEATURE 查询指定特性在特定级别(Level/Version)下,所依赖的其他前置特性和最低元数据版本。格式为 feature=version。例如:metadata.version=5
例:kafka-storage.sh feature-dependencies --feature metadata.version=30

KRaft 动态控制器仲裁引导参数

执行 bin/kafka-storage.sh format 格式化集群存储目录时,如何定义和初始化**动态控制器仲裁集群(Dynamic Controller Quorum)**的投票者结构?

  • **首次建群:**若为多节点高可用集群,使用 --initial-controllers 明确定义初始投票者;若为单机测试环境,使用 --standalone 快速创建。
  • **后期扩容:**向已有集群添加新控制器或更换故障磁盘时,使用 --no-initial-controllers 以“观察者”身份加入。待新节点从集群的 Leader 同步完元数据快照和日志后,管理员可手动运行 kafka-metadata-quorum.sh add-controller 命令,正式将新节点从“观察者”提升为“投票者”(即正式加入控制器仲裁集)。

kafka-configs.sh

kafka-configs.sh 是 Kafka 提供的动态配置管理工具,用于在不重启 Kafka Broker 的情况下,动态修改、查看或删除集群组件的各类配置参数。

命令行参数

完整帮助列表:kafka-configs.sh --help

参数 描述
--bootstrap-controller 要连接的 Kafka 控制器。
--bootstrap-server 要连接的 Kafka 服务器。
用于与 Kafka 集群建立初始连接的主机/端口对列表。客户端使用该列表来引导初始化并发现完整的 Kafka Broker 集群。虽然列表中服务器的顺序无关紧要,但我们建议包含多个服务器,以确保在某些服务器宕机时具备容错能力。此列表无需包含全部 Broker,因为 Kafka 客户端会自动高效地管理和更新与集群的连接。此列表必须采用 host1:port1,host2:port2,… 的格式。
--command-config 包含要传递给 AdminClient 的配置信息的属性文件。此文件仅与 --bootstrap-server 选项一起使用,用于描述和修改代理配置。有效配置见 Admin Configs
补充:该属性文件里包含了脚本工具作为“客户端”去连接 Kafka 集群时所需的配置。文件通常命名为 admin.properties,常用于 SASL/PLAIN 安全认证、双向 SSL / TLS 加密与认证、网络与超时调优 等。
--entity-type 实体类型(topics/clients/users/brokers/broker-loggers/ips/client-metrics/groups)。
--entity-name 实体名称。
--entity-default clients/users/brokers/ips 的默认实体名称(适用于相应的实体类型)。
补充:表示集群级别的“全局动态默认配置”,能让全集群当前以及未来新增的所有相同类型的实体都继承这个配置。
--describe 列出给定实体的配置。
补充:仅包括 运行期间修改过的动态配置。
--all 列出给定实体的所有配置。
补充:包括 运行期间修改过的动态配置 + 继承自系统默认的静态配置。不能独立运行,必须配合 --describe
--alter 修改实体的配置。
--add-config 要添加的配置键值对。方括号可用于分组包含逗号的值:‘k1=v1,k2=[v1,v2,v2],k3=v3’。
关于实体类型 topics、brokers、users、clients、ips、client-metrics、groups 有效配置的列表见 --help
实体类型“users”和“clients”可以同时指定,以更新特定用户的客户端配置。
--add-config-file 包含待添加配置的属性文件路径。有关有效配置的列表,请参阅 --add-config
--delete-config 要删除的配置项,例如 “k1,k2”。
补充:用于删除已经存在的动态配置、将其恢复为系统默认值的核心参数。

示例

topics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 修改 Topic 的日志清理策略
# 修改 Topic 允许的最大记录批处理大小(如果启用了压缩,则指压缩后的大小)。调整为10MB,默认1MB。
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type topics \
--entity-name mars-user-registered-events \
--alter --add-config cleanup.policy=delete,max.message.bytes=10485760
# 查看动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type topics \
--entity-name mars-user-registered-events \
--describe
# 删除动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type topics \
--entity-name mars-user-registered-events \
--alter --delete-config cleanup.policy,max.message.bytes

brokers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 修改在消息被刷新到磁盘之前,日志分区上累积的消息数量
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type brokers \
--entity-name 1 \
--alter --add-config log.flush.interval.messages=10000
# 查看动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type brokers \
--entity-name 1 \
--describe
# 删除动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type brokers \
--entity-name 1 \
--alter --delete-config log.flush.interval.messages

# 修改集群级别的全局默认配置(对所有 Broker 生效),使用 --entity-default 代替 --entity-name
# 修改用于日志清理的后台线程数为2
# 修改服务器用于接收网络请求和向网络发送响应的线程数为6
# 修改服务器用于处理请求的线程数为16
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type brokers \
--entity-default \
--alter --add-config log.cleaner.threads=2,num.network.threads=6,num.io.threads=16
# 查看动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type brokers \
--entity-default \
--describe
# 删除动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type brokers \
--entity-default \
--alter --delete-config log.cleaner.threads,num.network.threads,num.io.threads

clients

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 限制指定 client.id 下所有连接的生产速率和消费速率(单位:字节/秒),默认无限制。
# 为了防止客户端把 Kafka 压垮,通常会让同一种业务服务共享同一个 client.id。如果 SpringBoot 项目强行不配置 client-id,那就没法限制了。。。
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type clients \
--entity-name mars-service-client \
--alter --add-config producer_byte_rate=10485760,consumer_byte_rate=10485760
# 查看动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type clients \
--entity-name mars-service-client \
--describe
# 删除动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type clients \
--entity-name mars-service-client \
--alter --delete-config producer_byte_rate,consumer_byte_rate

# 全局限制所有客户端的流量
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type clients \
--entity-default \
--alter --add-config producer_byte_rate=10485760,consumer_byte_rate=10485760
# 查看动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type clients \
--entity-default \
--describe
# 删除动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type clients \
--entity-default \
--alter --delete-config producer_byte_rate,consumer_byte_rate

ips

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 限制指定 IP 的连接创建速率。每秒钟允许单个具体实体(如特定 IP 或全局默认)创建的最大 TCP 连接数,默认无限制。
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type ips \
--entity-name 192.168.0.107 \
--alter --add-config connection_creation_rate=50
# 查看动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type ips \
--entity-name 192.168.0.107 \
--describe
# 删除动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type ips \
--entity-name 192.168.0.107 \
--alter --delete-config connection_creation_rate

users

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 动态创建或修改认证密码。PLAIN 机制不支持动态创建或修改。
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type users \
--entity-name zhaolq \
--alter --add-config 'SCRAM-SHA-512=[password=pCVXLxxVkqX9]'
# 查看动态配置
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type users \
--entity-name zhaolq \
--describe
# 删除某个用户的 SCRAM 认证凭证
kafka-configs.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--entity-type users \
--entity-name zhaolq \
--alter --delete-config SCRAM-SHA-512

kafka-topics.sh(主题)

kafka-topics.sh 是 Kafka 提供的主题管理工具。主要负责 Topic 的创建、查看、修改、删除和扩容

快速入门(官网)

Kafka 是一个分布式事件流平台,允许您跨多台机器读取、写入、存储和处理事件(在文档中也称为记录或消息)。

例如,事件包括支付交易、来自手机的地理位置更新、发货订单、来自物联网设备或医疗设备的传感器测量数据等等。这些事件被组织并存储在主题中。简单来说,主题类似于文件系统中的文件夹,而事件就是该文件夹中的文件。

因此,在编写第一个事件之前,您必须先创建一个主题。打开另一个终端会话并运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 查看服务器中的所有主题
kafka-topics.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties --list

# 创建主题
kafka-topics.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--topic mars-user-registered-events \
--create \
--partitions 1 \
--replication-factor 1
# 查看主题详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--topic mars-user-registered-events \
--describe
# 查看主题的分区目录
ll -d /data/local/kafka/data/standalone/mars-user-registered-events*

# 修改分区数(注意:分区数只能增加,不能减少)
kafka-topics.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--topic mars-user-registered-events \
--alter \
--partitions 3
# 再次查看主题详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--topic mars-user-registered-events \
--describe
# 查看主题的分区目录
ll -d /data/local/kafka/data/standalone/mars-user-registered-events*

# 删除主题
kafka-topics.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--topic mars-user-registered-events \
--delete

命令行参数

完整帮助列表:kafka-topics.sh --help

参数 描述
--bootstrap-server <String: server toconnect to> 必需:要连接的 Kafka 服务器。
补充:Broker的主机名称和端口号,逗号分隔。
--command-config <String: command config property file> 包含要传递给 AdminClient 的配置信息的属性文件。有效配置见 Admin Configs
--topic <String: topic> 要 create, alter, describe 或 delete 的主题。它也接受正则表达式,但 --create 选项除外。
--create 创建主题。
--delete 删除主题。
--alter 修改分区数和副本分配。(要修改主题配置,可以使用 kafka-configs 工具。)
--list 列出所有可用主题。
--describe 列出指定主题的详细信息。
--partitions <Integer: # of partitions> 要创建或更改的主题的分区数。如果创建主题(--create)时没有指定该参数,Kafka 会自动使用集群默认的分区数(属性 num.partitions)。
警告:如果一个主题已经包含了带有键(Key)的消息,千万不要随意增加分区数量,否则 分区逻辑 和 消息顺序 将受到影响。。
当消息带有 Key 时,Kafka 默认使用 哈希取模算法Hash(Key) % 分区数)来决定消息发送到哪一个分区。
1、分区逻辑:改变分区总数后,相同的 Key 计算出来的分区编号会改变。新消息会被发送到不同的分区。
2、消息乱序:Kafka 只能保证
单个分区内部
的消息顺序。如果同一个 Key 的消息分散到了不同的分区,消费者(Consumer)就无法再按照发送的先后顺序读取这些消息。
补充:修改分区数时只能增加,不能减少。
--replication-factor <Integer: replication factor> 要创建的主题中每个分区的副本因子。若未提供,则该主题使用集群默认值(属性 default.replication.factor)。
补充:同一个分区的多个副本必须分布在不同的 Broker 上。副本数不能通过命令行修改,标准的官方做法是通过 kafka-reassign-partitions.sh 脚本,手动编写或生成 JSON 副本重分配计划来执行修改
--config <String: name=value> 用于覆盖当前创建主题的配置。有效配置见 Topic Configs
有关主题配置的详细信息,请参阅 Kafka 文档。该功能仅在与 --create 参数结合使用时才受支持。(若要修改主题配置,可使用 kafka-configs 工具。)

kafka-console-producer.sh(生产者)

kafka-console-producer.sh 是 Kafka 提供的控制台生产者工具。它允许用户直接在终端向指定主题(Topic)发送消息,常用于快速验证、开发调试或通过管道(Pipe)批量传输数据。

快速入门(官网)

Kafka 客户端通过网络与 Kafka broker 通信,以写入(或读取)事件。broker 接收到事件后,会以持久且容错的方式存储这些事件,存储时间长短完全由您决定,甚至可以永久保存。

运行控制台生产者客户端,向主题写入一些事件。默认情况下,您输入的每一行都会生成一个独立的事件并写入主题。

1
2
3
4
5
# 发送消息。开启 auto.create.topics.enable 以支持自动创建主题
kafka-console-producer.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--topic mars-user-registered-events
>hello world
>123 456 789

您可以随时按 Ctrl+C 停止生产者客户端。

命令行参数

完整帮助列表:kafka-console-producer.sh --help

参数 描述
--bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
--command-config <String: config file> 生产者配置属性文件。请注意,--command-property 的优先级高于此配置。
当需要传递大量、敏感或复杂的客户端配置时,这些配置最好保存在一个独立的属性文件中(如 producer.properties。如:生产环境的安全认证、复用企业统一配置。
--command-property <String: producer_prop> 生产者配置属性,格式为 key=value。
当需要在命令行中临时、快速地修改或测试一两个特定参数时使用。如:临时改变消息可靠性(acks)、调大客户端的请求超时时间(request.timeout.ms)、限制单次请求的最大字节数(max.request.size)、批处理大小(batch.size)。
--topic <String: topic> 操作的topic名称。

配置属性

https://kafka.apache.org/43/configuration/producer-configs/

属性 默认值 描述
bootstrap.servers 用于与 Kafka 集群建立初始连接的主机/端口对列表。客户端使用此列表来引导并发现所有 Kafka Broker。虽然列表中服务器的顺序无关紧要,但我们建议包含多个服务器,以确保在某些服务器宕机时仍能保持系统弹性。此列表无需包含所有 Broker,因为 Kafka 客户端会自动高效地管理和更新与集群的连接。此列表必须采用 host1:port1,host2:port2,…. 的格式。
key.serializer 用于键的序列化器类,该类实现了 org.apache.kafka.common.serialization.Serializer 接口。
value.serializer 用于值的序列化器类,该类实现了 org.apache.kafka.common.serialization.Serializer 接口。
partitioner.class null 确定在生成记录时将记录发送至哪个分区。可用选项包括:
1、如果未设置,则使用默认的分区逻辑。该策略会将记录发送到某个分区,直到该分区接收到的数据量至少达到 batch.size 字节为止。它与以下策略配合使用:
 a、如果未指定分区但存在键,则根据键的哈希值选择分区。
 b、如果既未指定分区也未提供键,请选择当该分区接收的数据量达到至少 batch.size 字节时才会发生变化的粘性分区。
2、org.apache.kafka.clients.producer.RoundRobinPartitioner:一种分区策略,其中连续记录序列中的每条记录都会被发送到不同的分区,无论是否提供了“键”,直到分区用尽,然后该过程重新开始。注意:存在一个已知问题,即创建新批次时会导致分布不均。详情请参阅 KAFKA-9965。
实现 org.apache.kafka.clients.producer.Partitioner 接口可让您接入自定义分区器。
batch.size 16384 当向同一分区发送多条记录时,生产端会尝试将这些记录批量合并,以减少请求次数。这有助于提升客户端和服务器端的性能。此配置用于控制默认的批处理大小(以字节为单位)。
linger.ms 5 生产者会将两次请求传输之间收到的所有记录合并为一个批处理请求。通常,这种情况仅在高负载下发生,即记录到达速度快于发送速度时。但在某些情况下,即使在中等负载下,客户端也可能希望减少请求数量。**此设置通过添加少量人工延迟来实现这一目标——也就是说,生产者不会立即发送记录,而是会等待最长为给定延迟的时间,以便将其他记录也发送出去,从而将这些发送操作批量处理。**这可以类比为 TCP 中的纳格尔算法。**该设置为批处理的延迟提供了上限:一旦某个分区积累了与 batch.size 相当的记录,无论此设置如何,都将立即发送;但如果该分区积累的字节数少于此值,则会“滞留”指定时间,等待更多记录出现。该设置默认值为 5(即 5 毫秒延迟)。**例如,将 linger.ms 设为 50 会减少发送的请求数量,但在无负载情况下,会使记录的延迟累计增加 50 毫秒。Apache Kafka 4.0 中该默认值从 0 改为 5,因为尽管 linger 值增大,但更大的批次通常能带来效率提升,从而使生产者延迟保持在相同或更低的水平。
buffer.memory 33554432 生产者可用于缓冲待发送至服务器的记录的总内存字节数。如果记录的发送速度快于其被传递至服务器的速度,生产者将阻塞 max.block.ms 毫秒,之后将抛出异常并失败。
此设置应大致对应生产者将使用的总内存,但并非硬性限制,因为生产者使用的内存并非全部用于缓冲。部分额外内存将用于压缩(如果启用了压缩)以及维护正在传输中的请求。
默认 32 M。
compression.type none 生产者生成的所有数据的压缩类型。默认值为 none(即不进行压缩)。有效值为 none、gzip、snappy、lz4 或 zstd。压缩操作针对完整的数据批次进行,因此批处理的效率也会影响压缩率(批处理次数越多,压缩效果越好)。
acks all 消息确认模式
生产者要求领导者收到多少个确认后,才将请求视为已完成。这控制了所发送记录的持久性。允许使用以下设置:
acks=0 如果设置为零,则生产者将完全不等待服务器的任何确认。记录将立即添加到套接字缓冲区中,并被视为已发送。在此情况下,无法保证服务器已收到该记录,且重试配置将不会生效(因为客户端通常无法得知任何失败情况)。每条记录返回的偏移量将始终设置为 -1。
acks=1 这意味着 leader 会将记录写入其本地日志,但在未等待所有 follower 完全确认的情况下便会做出响应。在这种情况下,如果 leader 在确认记录后立即发生故障,且此时 follower 尚未完成复制,则该记录将会丢失。 一般用于传输普通日志,允许丢个别数据。
acks=all 这意味着主节点将等待所有同步的副本都确认该记录。这保证了只要至少有一个同步的副本仍处于活动状态,该记录就不会丢失。这是目前可用的最强保证。这相当于 acks=-1 的设置。 一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
请注意,要启用幂等性,此配置值必须设为“all”。如果设置了冲突的配置且未显式启用幂等性,则幂等性将被禁用。
注意:即便设置为 1 或 all,无法保证单节点消息不丢失,因为消息会先写到操作系统的内存缓存区(Page Cache,页缓存),详见 Broker 配置中的 Log Flush Policy 部分
min.insync.replicas 1 指定当发布者将 acks 设置为“all”(或“-1”)时,写入操作成功所需的最小同步副本数量(包括领导者)。在 acks=all 的情况下,每个同步副本都必须确认写入操作,该操作才被视为成功。例如,如果某个主题的 replication.factor 为 3,且 ISR 集合包含所有三个副本,那么即使 min.insync.replicas 恰好小于 3,这三个副本也必须全部确认 acks=all 的写入操作,该操作才算成功。
补充:当 ISR 缩水到只剩 1 个(Leader)时,Leader 发现 1 < min.insync.replicas ,就拒绝 Producer 的写入。不是用来决定“正常情况下要等几个副本确认”,而是用来建立“最坏情况下的安全熔断底线”。
retries 2147483647 因瞬时错误导致请求失败时的重试次数。设置大于零的值将导致客户端重新发送任何因潜在瞬时错误而发送失败的记录。请求将重试此次数,直到成功、因非瞬时错误失败,或 delivery.timeout.ms 超时为止。请注意,此自动重试功能仅会在收到错误时重新发送相同的记录。将该值设为零将禁用此自动重试行为,此时瞬时错误将传递给应用程序进行处理。通常建议用户不设置此配置项,而是使用 delivery.timeout.ms 来控制重试行为。
要启用幂等性,此配置值必须大于 0。如果设置了冲突的配置且未显式启用幂等性,则幂等性将被禁用。
在将 enable.idempotence 设置为 falsemax.in.flight.requests.per.connection 设置为大于 1 的情况下允许重试,可能会改变记录的排序顺序。因为如果向单个分区发送两个批次,且第一个批次失败并被重试,而第二个批次成功,那么第二个批次中的记录可能会出现在前面。
delivery.timeout.ms 120000 (2 minutes) 调用 send() 返回后,报告成功或失败的时限上限。这限制了记录在发送前被延迟的总时间、等待代理确认的时间(如果需要确认),以及允许重试发送失败的时间。如果遇到不可恢复的错误、重试次数已用尽,或者记录被添加到已达到较早投递过期截止时间的批次中,生产者可能会在此配置时间之前报告记录发送失败。此配置的值应大于或等于 request.timeout.ms 和 linger.ms 的总和。
request.timeout.ms 30000 (30 seconds) 该配置控制客户端等待请求响应的最长时间。如果在超时结束前未收到响应,客户端将在必要时重新发送请求;若重试次数已用尽,则会将请求标记为失败。该值应大于 replica.lag.time.max.ms(一个代理配置项),以减少因生产者不必要的重试而导致消息重复的可能性。
enable.idempotence true 当设置为“true”时,生产者将确保每个消息在流中仅写入一份。如果设置为“false”,则生产者因 Broker 故障等原因进行重试时,可能会在流中写入重试消息的重复副本。请注意,启用幂等性要求:max.in.flight.requests.per.connection<=5(对于任何允许的值,消息顺序均被保持),重试次数大于 0,且确认模式必须为 ‘all’。
max.in.flight.requests.per.connection 5 客户端在阻塞前,单个连接上发送的未确认请求的最大数量。请注意,如果此配置值大于 1,且 enable.idempotence 设置为 false,则在发送失败后,由于重试(即如果启用了重试),消息可能会发生重新排序;如果禁用了重试,或者 enable.idempotence 设置为 true,则消息顺序将得到保留。此外,启用幂等性要求此配置的值小于或等于 5,因为消息代理(broker)为每个生产者最多仅保留 5 个批次。如果该值大于 5,消息代理端可能会移除之前的批次。
transactional.id null 用于事务性投递的 TransactionalId。这启用了跨越多个生产者会话的可靠性语义,因为它允许客户端确保在使用相同 TransactionalId 的事务完成之前,不会启动任何新事务。如果未提供 TransactionalId,则生产者仅限于幂等投递。如果配置了 TransactionalId,则默认启用 idempotence。默认情况下未配置 TransactionalId,这意味着无法使用事务。请注意,默认情况下,事务需要至少包含三个代理的集群,这是生产环境的推荐设置;在开发环境中,您可以通过调整代理设置 transaction.state.log.replication.factor 来更改此配置。

消息推送流程

在消息发送的过程中,涉及到了两个线程——main线程Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

image-20260429105135307

Java API

异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducer {

public static void main(String[] args) throws InterruptedException {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// 配置SASL认证
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"pCVXLxxVkqX9\";";
properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 4. 调用send方法,发送消息
for (int i = 0; i < 100; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "Kafka " + i));
// 延迟一会会看到数据发往不同分区
Thread.sleep(20);
}

// 5. 关闭资源
kafkaProducer.flush();
kafkaProducer.close();
}
}

异步发送(带回调函数)

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallback {

public static void main(String[] args) throws InterruptedException {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// 配置SASL认证
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"pCVXLxxVkqX9\";";
properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 4. 调用send方法,发送消息
for (int i = 0; i < 100; i++) {
// 添加回调。key的hash值与分区个数求余,分别发往不同分区。
kafkaProducer.send(new ProducerRecord<>("first", "key_" + i, "Kafka " + i), new Callback() {
// 该方法在Producer收到ack时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
}

// 5. 关闭资源
kafkaProducer.flush();
kafkaProducer.close();
}
}

同步发送

只需在异步发送的基础上,再调用一下get()方法即可。

1
2
3
4
5
6
7
for (int i = 0; i < 100; i++) {
// 异步发送 默认
// kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i));
// 同步发送
kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get();

}

生产者分区

分区的好处

1、便于合理使用存储资源,每个 partition 在一个 broker 上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台 broker 上。合理控制分区的任务,可以实现负载均衡的效果。

2、提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据

image-20260429110446318

发送消息的分区策略

默认的分区器

默认的分区器:DefaultPartitioner

分区原则:ProducerRecord

image-20260429114438999

示例一

将数据发往指定partition的情况下,例如,将所有数据发往分区1中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (int i = 0; i < 100; i++) {
// 指定数据发送到1号分区,key为空(IDEA中ctrl + p查看参数)
kafkaProducer.send(new ProducerRecord<>("first", 1, "", "Kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}

示例二

没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (int i = 0; i < 100; i++) {
// key的hash值与分区个数求余,分别发往不同分区
kafkaProducer.send(new ProducerRecord<>("first", "key_" + i, "Kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}

自定义分区器

**需求:**例如我们实现一个分区器实现,发送过来的数据中如果包含Kafka,就发往0号分区,不包含Kafka,就发往1号分区。

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
* 1. 实现接口Partitioner
* 2. 实现3个方法:partition,close,configure
* 3. 编写partition方法,返回分区号
*/
public class MyPartitioner implements Partitioner {

/**
* 返回信息对应的分区
* @param topic 主题
* @param key 消息的key
* @param keyBytes 消息的key序列化后的字节数组
* @param value 消息的value
* @param valueBytes 消息的value序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

// 获取消息
String msgValue = value.toString();

// 创建partition
int partition;

// 判断消息是否包含Kafka
if (msgValue.contains("Kafka")){
partition = 0;
}else {
partition = 1;
}

// 返回分区号
return partition;
}

// 关闭资源
@Override
public void close() {

}

// 配置方法
@Override
public void configure(Map<String, ?> configs) {

}
}

使用分区器的方法,在生产者的配置中添加分区器参数。

1
2
3
4
5
// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.mars.kafka.producer.MyPartitioner");

提高吞吐量

提高 Kafka 生产者吞吐量相关参数:batch.sizelinger.msbuffer.memorycompression.type

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// batch.size:批次大小,默认16384(16KB)。
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// linger.ms:等待时间,默认5ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 50);

// RecordAccumulator:缓冲区大小,默认33554432(32MB)。由属性 buffer.memory 控制。
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);

// compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd。
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

消息有序

image-20260519170735804

Topic 单分区:消息有序的条件如下

  • 未开启幂等性:max.in.flight.requests.per.connection=1
  • 开启幂等性:原理见下文【幂等性】。

Topic 多分区:每个分区消息有序,分区与分区间消息无序。

  • 如何做到多分区间消息有序呢?方案:同一个消费者拉取所有分区的数据,然后进行全排序,效率低,建议使用单分区。

消息传递语义

https://kafka.apache.org/43/design/design/#message-delivery-semantics

  • 最多一次(At Most Once):消息可能会丢失,但绝不会重复写入。ACK级别设置为0
  • 至少一次(At Least Once):消息绝不会丢失,但可能会重复写入。消息确认模式为all + 主题的分区副本数>=2 + ISR里应答的最小同步副本数>=2
  • 精确一次(Exactly Once):消息既不会丢失,也不会重复写入。至少一次 + 幂等性

至少一次

消息不丢失(发送可靠消息)的条件:

  • 生产者端: acks=all
  • Broker端: default.replication.factor>=2min.insync.replicas>=2

image-20260429123843906

消息可能重复:acks=all 时,生产者发送消息,leader 收到消息且与 ISR 队列里的所有 follower 同步完成,在响应生产者的一瞬间 leader 挂了,导致 ack 失败。然后生产者会再选择一个 leader 重新发消息,此时消息重复(因为上次发送的消息已经同步完成)。虽然概率很低,但确实有可能发生。

幂等性

幂等性是指无论 Producer 将同一条消息重复发送多少次,Broker 在日志中都只会持久化一次。即生产者重试(Retries)不会引发消息重复写入问题。

在生产者端开启幂等性:enable.idempotence=truemax.in.flight.requests.per.connection<=5retries>0acks=all

Kafka 实现幂等性的核心原理是引入了 PID(Producer ID)Sequence Number(序列号) 机制。以下是其运作的具体步骤:

核心概念:

  • PID:当开启幂等性后,每个 Producer 在初始化启动时,都会向 Broker 申请并被分配一个唯一的 PID。这个过程对用户是完全透明的。
  • Sequence Number:Producer 发送的每一个消息批次 RecordBatch 都会携带一个从 0 开始递增的序列号,Broker 端会在内存中为每个 <PID, Topic, Partition> 记录一个当前已接收的最大序列号。

Broker 端去重逻辑:

生产者重试发送消息时,Broker 会对比接收到的 新序列号 与 最大序列号 :

Broker 接收到消息后,会在内存中为每个 <PID, Topic, Partition> 维护一个状态,记录其对应的最新已提交序列号(假设记录为 SN_broker)。当 Broker 收到新消息时(带有序列号 SN_new),会执行以下校验逻辑:

  • 如果 新序列号 == 最大序列号 + 1: 这是预期的正常情况,说明消息是连续的,Broker 正常接收并写入,同时更新内存中的最大序列号。
  • 如果 新序列号 <= 最大序列号: 这可能是网络延迟导致 Producer 误以为失败而发起的重试,Broker 判定为消息重复,直接丢弃并向 Producer 返回成功 ACK,让 Producer 停止重试。所以,开启幂等性会保证单会话单分区消息不重复。
  • 如果 新序列号 > 最大序列号 + 1: 序列号出现跳跃,Broker 认为消息乱序。此时,Kafka Broker 最多缓存生产者发来的最近 max.in.flight.requests.per.connection 个消息批次,直到前序号码到来并重排序,然后落盘。所以,开启幂等性会保证单会话单分区消息有序。
    • 若跳跃幅度在 max.in.flight.requests.per.connection(默认 ≤ 5)范围内,Broker 会将其视为暂时的网络乱序,允许将该消息暂存内存中等待前序号码到来并重排序
    • 若前序消息因【发送失败且重试耗尽】等原因被客户端永久放弃(形成真实空洞),或跳跃幅度超过了窗口上限(> 5),则意味着数据流一致性遭到破坏。此时 Broker 会拒绝该消息,并抛出 OutOfOrderSequenceException 异常,通常会导致 Producer 进入致命错误状态。

幂等性的局限性(作用范围):

  • 只能保证单会话 (Single Session):PID 是 Producer 进程启动时动态分配的。**如果 Producer 应用崩溃重启,它会获得一个新的 PID。**对于 Broker 来说,这完全是一个新的生产者,之前的序列号状态失效,因此无法防范应用重启导致的重复发送。
  • 只能保证单分区 (Single Partition):幂等性的序列号是绑定在特定 Partition 上的。如果消息被路由到了不同的 Partition,幂等性无法生效。

若需要跨会话、跨分区进行精确一次的原子操作(例如读写多个 Topic 时),必须结合 Kafka 事务(Transactions) 才能实现完整的端到精确一次。

生产者事务

解决 跨会话、跨分区 的原子性问题。开启事务必须要开启幂等性。

事务原理:

image-20260430110136191

Kafka的事务一共有如下 5 个API:

1
2
3
4
5
6
7
8
9
10
11
// 1、初始化事务
void initTransactions();
// 2、开启事务
void beginTransaction() throws ProducerFencedException;
// 3、在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4、提交事务
void commitTransaction() throws ProducerFencedException;
// 5、放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

单个Producer,使用事务保证消息的仅一次发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducerTransactions {

public static void main(String[] args) throws InterruptedException {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// key,value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置事务id(必须),事务id任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");

// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try {
// 4. 调用send方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("first", "Kafka " + i));
}

// 这里会抛出异常,导致事务回滚
int i = 1 / 0;

// 提交事务
kafkaProducer.commitTransaction();

} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 5. 关闭资源
kafkaProducer.flush();
kafkaProducer.close();
}
}
}

kafka-console-consumer.sh(消费者)

kafka-console-consumer.sh 是 Kafka 提供的控制台消费者工具。它用于从指定主题中读取并实时打印消息数据,是开发调试、故障排查和临时查看数据的常用利器。

快速入门(官网)

打开另一个终端会话,运行控制台消费者客户端以读取您刚刚创建的事件:

1
2
3
4
5
6
7
8
9
# 消费主题中的新事件
kafka-console-consumer.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--group mars-service-admin \
--topic mars-user-registered-events
# 把主题中所有的事件都读出来(包括历史事件)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --command-config /data/local/kafka/config/admin-scram.properties \
--group mars-service-admin \
--topic mars-user-registered-events \
--from-beginning

您可以随时按 Ctrl-C 停止消费者客户端。

请随意尝试:例如,切换回生产端终端(上一步),写入更多事件,然后观察这些事件如何立即显示在消费者端终端上。

由于事件会被持久化存储在 Kafka 中(默认保留 7 天),因此可以被任意多次读取,且支持任意数量的消费者进行读取。您只需再打开一个终端会话并重新运行之前的命令,即可轻松验证这一点。

命令行参数

完整帮助列表:kafka-console-consumer.sh --help

参数 描述
--bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
--command-config <String: config file> 消费者配置属性文件。请注意,--command-property 的优先级高于此配置。
--command-property <String: consumer_prop> 消费者配置属性,格式为 key=value。
--group <String: consumer group id> 消费者的消费者组ID。
补充:
1、若未指定 group id,则 Kafka 的 组协调器(Group Coordinator)会自动创建并初始化。
2、删除消费者组是一个
不可逆
的操作。一旦删除,该组在 Kafka 里的所有消费进度(Offsets)会被彻底清空。如果再次启动改组,由于找不到 established offset(已建立的偏移量),它会重新根据 auto.offset.reset(如 earliest 或 latest)去重新初始化位置,这极有可能导致你的业务系统发生重复消费历史数据漏读
--topic <String: topic> 操作的 Topic 名称。
--from-beginning 如果消费者还没有建立要消费的偏移量,则从日志中最早的消息开始消费,而不是从最新的消息开始消费。

配置属性

属性 默认值 描述
bootstrap.servers 用于与 Kafka 集群建立初始连接的主机/端口对列表。客户端使用此列表来引导并发现所有 Kafka Broker。虽然列表中服务器的顺序无关紧要,但我们建议包含多个服务器,以确保在某些服务器宕机时仍能保持系统弹性。此列表无需包含所有 Broker,因为 Kafka 客户端会自动高效地管理和更新与集群的连接。此列表必须采用 host1:port1,host2:port2,…. 的格式。
key.deserializer 用于键的反序列化器类,该类实现了 org.apache.kafka.common.serialization.Deserializer 接口。
value.deserializer 用于值的反序列化器类,该类实现了 org.apache.kafka.common.serialization.Deserializer 接口。
group.id null 用于标识该消费者所属消费者组的唯一字符串。如果消费者使用 subscribe(topic) 实现的组管理功能,或者使用基于 Kafka 的偏移量管理策略,则此属性为必填项。
enable.auto.commit true 如果为 true,消费者的偏移量将在后台定期提交。
auto.commit.interval.ms 5000 (5 seconds) enable.auto.commit 设置为 true 时,消费者偏移量自动提交到 Kafka 的频率(以毫秒为单位)。
auto.offset.reset latest 当 Kafka 中没有初始偏移量,或者服务器上当前偏移量已不存在时(例如因数据已被删除),应如何处理:
1、earliest:自动将偏移量重置到最早的偏移量
2、latest:自动将偏移量重置到最新的偏移量
3、by_duration:自动将偏移量重置到距离当前时间戳指定“时长”的位置。时长必须以 ISO8601 格式(PnDTnHnMn.nS)指定。不允许使用负时长。
4、none:如果找不到消费者组的任何先前偏移量,则向消费者抛出异常
5、其他任何值:直接向消费者抛出异常。
请注意,在将此配置设置为 latest 时,更改分区号可能会导致消息丢失,因为生产者可能会在消费者重置其偏移量(开始消费)之前开始向新添加的分区发送消息(即,尚不存在初始偏移量)。
补充:
1、Offset 归属于 消费者组(Consumer Group)+ 主题分区(Topic Partition),而不是单个消费者。
2、 当配置为 latest 时,动态增加分区数量可能会导致消息丢失。 原因:新分区刚创建时,消费者组在服务端没有它的偏移量记录,从而触发 latest 重置策略。如果生产者在消费者完成重置之前,就已经向新分区写入了数据,消费者重置偏移量时会直接定位到新分区的最新末尾,导致重置前写入的那部分历史消息被直接跳过(漏读)。
offsets.topic.num.partitions 50 偏移量提交主题的分区数(部署后不应更改)。
补充:这是 Broker 的配置。
group.protocol classic 消费者使用的组协议。支持的值为 classic 或 consumer。
补充:通过将其设置为 consumer,可以启用新一代的消费者重平衡协议(基于 KIP-848),大幅缩短由于节点加入或退出导致的集群停顿时间。
heartbeat.interval.ms 3000 (3 seconds) 使用 Kafka 的组管理功能时,向消费者协调器发送心跳的预期时间间隔。心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。此配置仅在 group.protocol 设置为“classic”时受支持。在这种情况下,该值必须小于 session.timeout.ms,但通常不应高于该值的 1/3。可以将其调整得更低,以控制正常重新平衡的预期时间。如果 group.protocol 设置为“consumer”,则不支持此配置,因为心跳间隔由代理通过 group.consumer.heartbeat.interval.ms 控制。
group.consumer.heartbeat.interval.ms 5000 (5 seconds) 分配给消费者组成员的心跳间隔。
补充:这是 Broker 的配置。
session.timeout.ms 45000 (45 seconds) 在使用 Kafka 的组管理功能时,用于检测客户端故障的超时时间。客户端会定期向 Broker 发送心跳信号,以表明其处于活动状态。如果在该会话超时到期前 Broker 未收到任何心跳信号,则 Broker 将把该客户端从组中移除并启动重新平衡。请注意,该值必须在由 broker 配置中的 group.min.session.timeout.ms 和 group.max.session.timeout.ms 设定的允许范围内。另请注意,当 group.protocol 设置为“consumer”时,不支持此客户端配置。在这种情况下,会话超时由 Broker 配置中的 group.consumer.session.timeout.ms 控制。
group.consumer.session.timeout.ms 45000 (45 seconds) 使用 consumer 组协议时,用于检测客户端故障的超时时间。
补充:这是 Broker 的配置。
max.poll.interval.ms 300000 (5 minutes) 使用消费组管理时,两次调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以处于空闲状态的时间设置了上限。如果在该超时时间到期前未调用 poll(),则认为该消费者已失败,并且消费组将进行重新平衡(Rebalance),以便将分区重新分配给另一个成员。对于使用非空 group.instance.id 且触发此超时的消费者,分区不会立即被重新分配。相反,消费者将停止发送心跳,并且分区将在会话超时(Session Timeout)到期后被重新分配。此处的会话超时时间取决于所使用的协议:
1、如果使用经典重平衡协议(Classic rebalance protocol),由客户端配置 session.timeout.ms 决定。
2、如果使用消费者协议(Consumer protocol),由服务端配置 group.consumer.session.timeout.ms 决定。
这与已关闭的静态消费者的行为完全一致。
max.poll.records 500 单次调用 poll() 时返回的最大记录条数。请注意,max.poll.records 并不会影响底层的数据拉取(Fetching)行为。消费者会缓存每次拉取请求(Fetch Request)获取到的记录,并在每次调用 poll() 时将它们递增地(分批)返回。
fetch.max.wait.ms 500 服务器在响应获取请求之前,如果数据量不足以立即满足 fetch.min.bytes 指定的要求,将阻塞的最长时间。此配置仅用于本地日志获取。要调整远程获取的最大等待时间,请参阅代理配置中的 remote.fetch.max.wait.ms
fetch.min.bytes 1 服务器针对获取请求应返回的最小数据量。如果可用数据不足,请求将等待该数量的数据累积完毕后再进行响应。默认设置为 1 字节,这意味着一旦有该数量的字节数据可用,系统就会立即响应获取请求;否则,获取请求将在等待数据到达时超时。将此值设置为较大数值将导致服务器等待积累更多数据后再响应,这可以在一定程度上提高服务器吞吐量,但会带来额外的延迟。即使 Broker 中可用的总数据量超过了 fetch.min.bytes,由于每个分区的限制 max.partition.fetch.bytes 以及最大返回限制 fetch.max.bytes,实际返回的大小仍可能小于该值。
fetch.max.bytes 52428800 (50 MiB) 服务器针对一次获取请求应返回的数据最大量。消费者会分批获取记录;如果获取操作中第一个非空分区中的首个记录批次大于此值,该记录批次仍会被返回,以确保消费者能够继续处理。因此,这并非绝对上限。Broker 接受的最大记录批次大小通过 message.max.bytes(Broker 配置)或 max.message.bytes(主题配置)定义。一个获取请求由多个分区组成,还有另一个设置用于控制获取请求中每个分区返回的数据量——请参阅 max.partition.fetch.bytes。请注意,消费者会并行执行多次获取操作。
max.partition.fetch.bytes 1048576 (1 MiB) 服务器返回的每个分区最大数据量。消费者以批次形式获取记录。如果获取操作中第一个非空分区中的第一个记录批次大于此限制,该批次仍将被返回,以确保消费者能够继续处理。Broker 接受的最大记录批次大小通过 message.max.bytes(Broker 配置)或 max.message.bytes(主题配置)进行定义。有关限制消费者请求大小的信息,请参阅 fetch.max.bytes
补充:1 MiB(百萬位元組):基于二进制,等于 2²⁰ 字节(1,048,576 字节)。
message.max.bytes 1048588 Kafka 允许的最大记录批处理大小(如果启用了压缩,则为压缩后的大小)。可以通过主题级别的 max.message.bytes 配置项按主题进行设置。
补充:这是 Broker 的配置。
partition.assignment.strategy class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor 支持的分区分配策略的类名或类类型列表(按首选项排序)。当使用组管理时,客户端将使用这些策略在消费者实例之间分配分区所有权。可用选项包括:
org.apache.kafka.clients.consumer.RangeAssignor:按主题分配分区。
org.apache.kafka.clients.consumer.RoundRobinAssignor:以轮询方式将分区分配给消费者。
org.apache.kafka.clients.consumer.StickyAssignor:在尽可能保留现有分区分配的同时,保证分配结果达到最大平衡。
org.apache.kafka.clients.consumer.CooperativeStickyAssignor:遵循与 StickyAssignor 相同的逻辑,但允许进行协作式重平衡。
默认的分配器为 [RangeAssignor, CooperativeStickyAssignor]。它默认会使用 RangeAssignor,但允许通过一次滚动重启(Rolling Bounce)来移除列表中的 RangeAssignor,从而直接升级到 CooperativeStickyAssignor。
实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口允许您插入自定义的分配策略。

消费模型

维度 传统消费者组 (Consumer Group) 共享组 (Share Group - Kafka 4.0)
消费模型 流处理模型:基于**发布-订阅(Publish-Subscribe)**模式,侧重事件流、状态计算及严格顺序消费。 消息队列模型:基于传统的**消息队列(Message Queue / Point-to-Point)**模式(类似于 RabbitMQ 的 Work Queues),侧重异步任务、Worker 派发及高并发处理。
并发粒度 分区级独占:
1、在同一个消费者组内,一个分区在同一时间只能被一个消费者消费;
2、如果组内消费者数大于分区数,多出来的消费者将会处于**闲置(Idle)**状态。
消息级共享:
1、解除了“分区绑定”的限制。同一个分区内的消息可以被组内的多个消费者并发消费
2、消息就像传送带上的零件,哪个消费者有空闲,谁就拉取一条进行处理。
最大并发数 受限于分区数量:最大有效消费者数等于 Topic 分区总数。 理论上无限制:按需横向扩展消费者,消费者数量可远超分区数量。
消息顺序性 严格保证单分区内消息顺序。因为单个分区只由一个消费者处理,所以能严格保证单分区内消息的**先进先出(FIFO)**顺序。 不保证顺序。由于多条消息并发分配给不同的消费者,先分配的消息不一定先处理完。
消息确认机制 提交分区偏移量(Offset)。消费者提交的是当前分区消费到的最高偏移量(例如:分区 0 消费到了第 100 条)。 单条消息确认 (Ack / Nack)。每个消费者对单条消息进行确认(Ack)或拒绝(Nack)。未成功处理的消息会触发超时并投递给其他消费者(重试机制)。
重平衡 (Rebalance) 经典的“分区所有权”模型。在现代机制下(如增量协作与 KIP-848),增减消费者时,Broker 服务端会通过增量协议进行局部拓扑微调。只有受影响的分区会发生所有权转移,其余分区的消费完全不间断,极大地优化了重平衡抖动。 底层彻底解耦了分区所有权。组内所有消费者共同面对所有分区,由 Broker 采用拉取驱动的消息级锁(Message-level Lock)动态分发数据。谁有空闲,谁就直接拉取单条消息。这种设计从根本上规避了分区分配的概念,因此彻底杜绝了任何形式的消费者重平衡。
适用场景 需要严格保证消息顺序(如:银行流水、日志审计)。
流式计算与状态处理(如:Kafka Streams、Flink 任务)。
大任务/重度异步处理: 比如每条消息都需要调用第三方 API 或进行耗时的 CPU 计算。
传统 MQ 迁移: 方便将原本运行在 RabbitMQ、ActiveMQ 的工作队列业务平滑迁移到 Kafka。

Java API

独立消费者示例(订阅主题)

需求:创建一个独立消费者,消费first主题中数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {

public static void main(String[] args) {

// 1.创建消费者的配置对象
Properties properties = new Properties();

// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// 配置SASL认证
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"pCVXLxxVkqX9\";";
properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 配置消费者组(必须),名字任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

// 订阅要消费的主题(可以订阅多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

// 拉取数据打印
while (true) {
// 设置1s中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}

独立消费者示例(订阅分区)

需求:创建一个独立消费者,消费first主题0号分区的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;

public class CustomConsumerPartition {

public static void main(String[] args) {

// 1.创建消费者的配置对象
Properties properties = new Properties();

// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 配置消费者组(必须),名字任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

// 分配要消费的主题分区
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);

// 拉取数据打印
while (true){
// 设置1s中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}

消费者组示例

需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

示例:

  1. 复制 CustomConsumer 类的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者。
  2. 启动代码中的生产者发送消息,在 IDEA 控制台即可看到两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码Thread.sleep(2);)。
  3. 重新发送到一个全新的主题中,由于创建的主题分区数为1,可以看到只能有一个消费者消费到数据。

分区分配策略

partition.assignment.strategy=class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Range 以及再平衡

原理:

image-20260630151945029

原理验证:

  1. 修改主题 first 为7个分区

  2. 复制 CustomConsumer 类,创建三个消费者 CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”,同时启动 3 个消费者。

  3. 启动 CustomProducerCallback 生产者,随机发送消息到不同的分区。

  4. 观看 3 个消费者分别消费哪些分区的数据。

    0号消费者:消费到0、1、2号分区数据。

    1号消费者:消费到3、4号分区数据

    2号消费者:消费到5、6号分区数据。

触发重平衡:

  1. 停掉0号消费者,快速重发消息,观看结果(尽量在 45s 以内,因为默认 session.timeout.ms=45000)。

    1号消费者:消费到3、4号分区数据,外加0、1、2号分区数据。

    2号消费者:消费到5、6号分区数据。

  2. 再次重新发送消息观看结果(45s 以后),已发生重平衡。

    1号消费者:消费到0、1、2、3号分区数据

    2号消费者:消费到4、5、6号分区数据。

总结:Broker 未在 45s 内收到0号消费者的心跳,则把0号消费者从消费者组中移除并将它的任务整体交给其他 Broker执行,45s 以后按照 Range 方式重平衡。

RoundRobin 以及再平衡

原理:

image-20260630173839761

原理验证:

  1. 依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。

    1
    2
    // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
  2. 重启 3 个消费者,重复发送消息的步骤,观看分区结果。

    0号消费者:消费到0、3、6号分区数据。

    1号消费者:消费到1、4号分区数据

    2号消费者:消费到2、5号分区数据。

触发重平衡:

  1. 停掉0号消费者,快速重发消息,观看结果(尽量在 45s 以内,因为默认 session.timeout.ms=45000)。

    1号消费者:消费到1、4号分区数据,外加0、6号分区数据。

    2号消费者:消费到2、5号分区数据,外加3号分区数据。

  2. 再次重新发送消息观看结果(45s 以后),已发生重平衡。

    1号消费者:消费到0、2、4、6号分区数据

    2号消费者:消费到1、3、5号分区数据。

总结:Broker 未在 45s 内收到0号消费者的心跳,则把0号消费者从消费者组中移除并将它的任务轮训交给其他 Broker执行,45s 以后按照 RoundRobin 方式重平衡。

Sticky 以及再平衡

Sticky(粘性)分区分配策略是 Kafka 0.11.x 版本引入的一种高效的分区分配算法。它的核心目标是在保证分区均匀分配的同时,尽可能保留上一次的分配结果(粘性),从而显著减少再平衡(Rebalance)的开销。

原理验证:

  1. 依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 Sticky。

    1
    2
    // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
  2. 重启 3 个消费者,重复发送消息的步骤,观看分区结果。

    每次重启都不同。

触发重平衡:

  1. 停掉0号消费者,快速重发消息,观看结果(尽量在 45s 以内,因为默认 session.timeout.ms=45000)。
  2. 再次重新发送消息观看结果(45s 以后),已发生重平衡。

总结:Broker 未在 45s 内收到0号消费者的心跳,则把0号消费者从消费者组中移除并将它的任务均匀分配给其他 Broker执行,45s 以后按照 Sticky 方式重平衡(会尽量保留现有的分配关系,基本不变)。

offset 偏移量

使用 Kafka Connect 将数据作为事件流导入/导出

您可能在现有系统(例如关系数据库或传统消息系统)中拥有大量数据,并且许多应用程序已经在使用这些系统。Kafka Connect 允许您持续地将数据从外部系统导入 Kafka Topic,反之亦然。运行连接器是一个可扩展的工具,这些连接器实现了与外部系统交互的自定义逻辑。因此,将现有系统与 Kafka 集成非常容易。为了进一步简化此过程,Kafka 提供了数百个现成的连接器。

下面了解如何使用简单的连接器运行 Kafka Connect,实现将数据从文件导入到 Kafka Topic,以及将数据从 Kafka Topic 导出到文件

请确保将 connect-file-4.2.0.jar 添加到 Connect Worker 进程配置中的 plugin.path 属性。

1
2
cd /data/local/kafka
echo "plugin.path=libs/connect-file-4.2.0.jar" >> config/connect-standalone.properties

接下来,先创建一些用于测试的初始数据:

1
2
echo foo > test.txt
echo bar >> test.txt

接下来,我们将启动两个以独立模式运行的连接器,这意味着它们运行在单个本地专用进程中。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,其中包含通用配置,例如要连接的 Kafka broker 和数据序列化格式。其余配置文件分别指定要创建的连接器。这些文件包含唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。

1
2
# 三个配置文件分别是 Worker进程配置、数据源(Source)配置、数据目标(Sink)配置
connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Kafka 附带的这些示例配置文件使用您之前启动的默认本地集群配置,并创建两个连接器:

  • 源连接器,它从文件中读取行,并将每一行作为一个事件写入 Kafka Topic。
  • 接收器连接器,它从 Kafka Topic 中读取事件,并将每个事件作为一行输出到文件。

启动过程中,您会看到一些日志消息,其中一些指示连接器正在实例化。Kafka Connect 进程启动后,源连接器应开始从 test.txt 读取行并将其写入主题 connect-test,而接收器连接器应开始从主题 connect-test 中读取事件并将其输出到文件 test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传输:more test.sink.txt

请注意,数据正存储在 Kafka 主题 connect-test 中,因此我们还可以运行一个控制台消费者来查看该主题中的数据(或使用自定义消费者代码进行处理):

1
2
3
kafka-console-consumer.sh --topic connect-test --from-beginning --bootstrap-server localhost:9092
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

连接器会持续处理数据,因此我们可以向文件中添加数据 echo "Another line" >> test.txt,并观察数据在管道中的流动。控制台消费者 和 接收文件 应该能看到该行数据

为什么用它?(与直接写代码的区别)

零代码/低代码:只需通过 JSON 配置文件即可启动连接器(Connector),无需为每个数据库编写重复的同步逻辑。

高可靠性:自动管理 offset(偏移量),如果同步过程中断,重启后能从断点继续,保证数据不丢不重。

横向扩展:支持集群模式(Distributed Mode),可以轻松增加节点来应对海量数据的处理压力。

转换能力 (SMTs):可以在数据流动过程中进行简单的转换,比如隐藏敏感字段、给数据打标签或格式转换。

常见应用场景

构建数据仓库/湖:把业务库(MySQL/Oracle)的数据实时同步到数据湖(Iceberg/HDFS)或数仓。

实时搜索:将数据库的更新实时推送到 Elasticsearch 供前端搜索。

多云/异构系统集成:连接 AWS S3、Google Cloud Pub/Sub、MongoDB 等各种云服务和中间件。

使用 Kafka Streams 处理事件

Kafka Streams 是一个 Java 库(Jar 包)。它让你的 Java 程序具备了实时处理海量数据的能力,而你只需要像写普通的 Java 代码一样去调用它的 API。

Kafka Streams = Kafka 读写能力 + 强大的逻辑处理(计算/转换/聚合) + 自动容错。

终止 Kafka 环境

终止 Ctrl-C

若想删除本地 Kafka 环境中的所有数据,包括创建的所有事件,请运行以下命令:

1
2
# 默认目录
rm -rf /tmp/kafka-logs /tmp/kraft-combined-logs