Kafka_01安装

推荐:

官方

https://github.com/apache/kafka

https://kafka.apache.org/

可视化工具

UI for Apache Kafka

https://github.com/kafbat/kafka-ui 新版

https://github.com/provectus/kafka-ui 旧版

脚本文件介绍

4.0 版本的一个重大变化是彻底移除了 ZooKeeper,因此所有脚本现在都通过 --bootstrap-server 直接与 Broker 或 Controller 通信。

集群与 Broker 管理

  • kafka-server-start.sh / stop.sh: 启动和停止 Kafka Broker 服务。
  • kafka-storage.sh: (重要) 用于格式化存储目录。在 KRaft 模式下,启动前必须用它生成 Cluster ID 并格式化日志目录。
  • kafka-configs.sh: 动态修改配置(如修改 Topic 参数、用户配额、Broker 配置等)。
  • kafka-broker-api-versions.sh: 查看 Broker 支持的 API 版本信息。
  • kafka-features.sh: 查看或升级 Kafka 集群的功能特性版本(Feature Gates)。
  • kafka-metadata-quorum.sh: 查看 KRaft 控制器(Controller )副本的状态和选举情况。
  • kafka-metadata-shell.sh: 交互式工具,允许像查看文件系统一样查看 Kafka 的内部元数据(类似于旧版的 zkCli)。
  • kafka-cluster.sh: 用于管理集群 ID 和注销已停止的 Broker。

业务操作 (Topic/Data)

  • kafka-topics.sh: 最常用的工具,用于创建、删除、查看和修改 Topic 信息。
  • kafka-console-producer.sh: 控制台生产者,手动向 Topic 发送消息。
  • kafka-console-consumer.sh: 控制台消费者,直接在终端查看消息。
  • kafka-delete-records.sh: 根据指定的偏移量(Offset)物理删除 Topic 中的旧数据。
  • kafka-get-offsets.sh: 获取指定 Topic 各分区的最大/最小偏移量。

分区与副本管理

  • kafka-reassign-partitions.sh: 用于在 Broker 之间迁移分区,实现负载均衡或扩容。
  • kafka-leader-election.sh: 手动触发分区的 Leader 选举。
  • kafka-log-dirs.sh: 查询各 Broker 上日志目录的占用情况。
  • kafka-replica-verification.sh: 验证集群内各副本间的数据是否同步。

消费者组管理

  • kafka-consumer-groups.sh: 管理传统的消费者组(列出组、查看积压、重置 Offset)。
  • kafka-groups.sh: (4.0 新增) 通用组管理工具,支持新的消费组协议。
  • kafka-share-groups.sh: (4.0 核心新特性) 用于管理“共享消费组”(Share Groups),这种组支持类似传统消息队列的竞争消费模式。

安全与权限

  • kafka-acls.sh: 管理访问控制列表(ACL),设置用户读写权限。
  • kafka-delegation-tokens.sh: 管理委托令牌,用于轻量级身份验证。

性能测试与调试

  • kafka-producer-perf-test.sh: 生产者压力测试工具。
  • kafka-consumer-perf-test.sh: 消费者压力测试工具。
  • kafka-dump-log.sh: 查看底层的日志文件(.log)或索引文件(.index)的具体内容。
  • kafka-e2e-latency.sh: 测试端到端的延迟。
  • kafka-jmx.sh: 简单的 JMX 监控工具。
  • trogdor.sh: Kafka 官方的测试框架,用于模拟各种故障和工作负载。

生态组件 (Connect & Streams)

  • connect-distributed.sh / standalone.sh: 分别以分布式模式或单机模式启动 Kafka Connect 任务。
  • connect-mirror-maker.sh: 启动 MirrorMaker 2.0,用于跨集群同步数据。
  • connect-plugin-path.sh: 查看或列出 Connect 插件路径。
  • kafka-streams-application-reset.sh: 重置 Kafka Streams 应用程序的状态(清理本地状态存储和中间 Topic)。
  • kafka-streams-groups.sh: 管理 Kafka Streams 的消费组。

开发验证工具

  • kafka-verifiable-producer.sh / consumer.sh: 带有验证功能的生产/消费工具,常用于集成测试。
  • kafka-run-class.sh: 这是一个底层的辅助脚本,上面所有的脚本本质上都是通过它来启动 Java 类。

配置文件介绍

https://github.com/apache/kafka/tree/trunk/config

https://kafka.apache.org/42/configuration/

服务端配置文件

server.properties:KRaft 混合模式下的 Broker + Controller 配置,不推荐生产环境使用。适用于快速测试或单节点模式。

broker.properties:KRaft 专用模式下的 Broker 配置。专门用于纯 Broker 节点的配置。负责处理客户端的请求和存储数据。

controller.properties:KRaft 专用模式下的 Controller 配置。专门用于纯 Controller 节点的配置。负责管理集群状态和维护元数据。

客户端配置文件

producer.properties:示例文件,用于 kafka-console-producer.sh

consumer.properties:示例文件,用于 kafka-console-consumer.sh

不直接应用于集群,仅供 CLI 工具加载默认值。

Connect 配置文件

connect-standalone.properties:Kafka Connect 独立工作模式配置;

connect-distributed.properties:Kafka Connect 分布式模式配置;

connect-console-source.properties/connect-console-sink.properties:Console 示例连接器,包括数据源和数据目标。

connect-file-source.properties/connect-file-sink.properties:FileStream 示例连接器,包括数据源和数据目标。

MirrorMaker 2.0 配置文件

connect-mirror-maker.properties:定义跨集群数据复制的规则。与旧版 MirrorMaker 不同,MM2 是基于 Kafka Connect 框架构建的,该文件用于配置这种“连接器模式”的运行参数。

使用场景:集群备份、云迁移、数据聚合

日志配置文件(有惊喜)

log4j2.yaml:Kafka broker 和 controller 默认运行日志配置。

connect-log4j2.yaml:Kafka Connect 和 MirrorMaker 运行日志配置。

tools-log4j2.yaml:Kafka CLI 工具(如 kafka-topics、kafka-consumer-groups 等)运行日志配置。

[!IMPORTANT]

log4j2.yaml 文件中日志输出目录为 ${sys:kafka.logs.dir} ,带有 sys: 前缀的变量会直接跳过内部变量配置(Property 节点),仅读取 Java 系统属性中定义的值。

查看 Kafka 启动脚本:kafka-server-start.sh -> kafka-run-class.sh ,发现几个环境变量的使用:

  • KAFKA_LOG4J_CMD_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
  • $KAFKA_OPTS
  • exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"

所以,要想改变日志位置,可以在启动之前设置环境变量 export LOG_DIR="/data/local/kafka/logs" 。 JVM GC 日志的路径也依赖 LOG_DIR

Trogdor 配置文件

trogdor.conf:Trogdor 分布式测试框架配置(定义 Coordinator 和 Agent 节点)

Linux下安装

下载并解压

二进制下载地址:https://kafka.apache.org/community/downloads/

1
2
3
4
5
6
7
8
9
cd /data/local
tar -zxvf kafka_2.13-4.2.0.tgz
mv kafka_2.13-4.2.0 kafka

# 配置环境变量
grep -qxF 'export PATH=/data/local/kafka/bin:$PATH' /etc/profile || \
echo 'export PATH=/data/local/kafka/bin:$PATH' >> /etc/profile

source /etc/profile

安装依赖

本地必须安装 Java 环境,兼容性:https://kafka.apache.org/42/getting-started/compatibility/

KRaft 动态控制器仲裁的格式化参数

--standalone

  • 定义: 格式化时自动将当前节点自身定义为唯一的初始投票者
  • 场景: 快速创建单节点 KRaft 集群时使用。它省去了手动指定控制器列表的麻烦,系统会自动识别本地配置并完成初始化。
  • 作用: 简化单机模式的配置流程。它相当于执行了 --initial-controllers,但不需要手动列出节点信息,直接确立当前节点为仲裁中的唯一领袖(Leader)。

--initial-controllers

  • 定义: 格式化时明确定义集群的最初投票者列表(Controller 节点)
  • 场景: 首次创建 KRaft 集群时使用。必须确保此处的列表与配置文件中的 controller.quorum.bootstrap.servers 一致,否则可能无法正确初始化。
  • 作用: 设定初始的仲裁列表,定义谁拥有投票权。

--no-initial-controllers

  • 定义: 在格式化新节点时使用,表示该节点不是集群的最初投票者
  • 场景:现有已运行的 KRaft 集群添加新 Controller 节点(动态扩缩容)。
  • 作用: 它允许新节点加入,但不需要其参与最初的投票者元数据生成,通常配合动态控制器功能(该功能允许在不重启整个集群或修改所有节点配置文件的情况下,动态地添加或删除控制器)进行动态成员变更。 当新节点从集群的 Leader 同步完元数据快照和日志后,管理员可手动运行 kafka-metadata-quorum.sh add-controller 命令,正式将新节点从“观察者”提升为“投票者”(即正式加入控制器仲裁集)。

总结:

  • 首次建群: 若为多节点高可用集群,使用 --initial-controllers 明确定义初始投票者;若为单机测试环境,使用 --standalone 快速自立门户。
  • 后期扩容: 向已有集群添加新控制器或更换故障磁盘时,使用 --no-initial-controllers 以“观察者”身份加入,避免元数据冲突,待同步完成后再动态提升为投票者。

单机-KRaft 混合模式

配置

混合节点: cp /data/local/kafka/config/server.properties /data/local/kafka/config/server-standalone.properties

https://github.com/apache/kafka/blob/trunk/config/server.properties

只需要修改 log.dirs ,其他默认值即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
############################# Server Basics #############################
# 此服务器的角色。设置此项会将我们置于 KRaft 模式。
process.roles=broker,controller
# 与此实例角色关联的节点ID
node.id=1
# 控制器集群列表(连接入口),用于节点启动时发现整个控制器集群,支持动态扩容。Broker 和 Controller 启动时通过它接入集群。
controller.quorum.bootstrap.servers=localhost:9093


############################# Socket Server Settings #############################
# 用于 broker 之间通信的监听器名称。
inter.broker.listener.name=BROKER
# 控制器所使用的监听器名称的逗号分隔列表。在与控制器仲裁组通信时,此项为必填项;消息代理将始终使用该列表中的第一个监听器。
# 如果配置了多个 CONTROLLER 监听器名称,代表该节点可以监听多个“控制器通道”,必须在 listeners 中有对应定义。
# 如果 listener.security.protocol.map 中未显式设置映射,则默认使用 PLAINTEXT 协议。如果在 KRaft 模式下运行,则此参数为必需项。
# 官方推荐只定义一个,而不是在同一个节点上开两个控制器监听器,推荐使用 controller.quorum.bootstrap.servers。
controller.listener.names=CONTROLLER
# 将 监听器名称 映射到 安全协议。若未提供映射,则默认安全协议未 PLAINTEXT
listener.security.protocol.map=SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT
# Broker 实际绑定的网卡地址和端口。它告诉 Kafka 在哪些接口上开启监听服务。双斜杠后面直接跟冒号,表示绑定到 0.0.0.0(所有网卡)。
listeners=BROKER://:9092,CONTROLLER://:9093
# Broker 对外暴露(宣布)的地址。这是写入 KRaft 控制器,并在元数据请求中返回给客户端的地址。如果未设置,则使用 "listeners" 的值。
advertised.listeners=BROKER://localhost:9092
# 服务器用于接收网络请求并向网络发送响应的线程数量。注意:每个监听器(controller监听器除外)都会创建自己的线程池。
num.network.threads=3
# 服务器用于处理请求的线程数,其中可能包括磁盘I/O
num.io.threads=8
# 套接字服务器的 SO_SNDBUF 缓冲区。如果该值为 -1,则使用操作系统的默认值。
socket.send.buffer.bytes=102400
# 套接字服务器的 SO_RCVBUF 缓冲区。如果该值为 -1,则使用操作系统的默认值。
socket.receive.buffer.bytes=102400
# 套接字请求的最大字节数/套接字服务器可接受的最大请求大小(防止内存溢出)
socket.request.max.bytes=104857600


############################# Log Basics #############################
# 业务数据日志目录,Kafka 存储消息(Topic 数据)的地方。默认 /tmp/kraft-combined-logs
log.dirs=/data/local/kafka/data
# 每个主题的默认日志分区数。更多分区可以提高并行处理能力,但也会导致各个代理上的文件数量增加。
# 强烈建议保持全集群配置一致。如果不一致,会导致新 Topic 的初始容量变得不可控。
num.partitions=1
# 每个数据目录用于启动时日志恢复和关闭时日志刷新的线程数。建议根据安装资源增加此值。
num.recovery.threads.per.data.dir=2


############################# Internal Topic Settings #############################
# 偏移量(offsets)主题的副本因子(建议设置为较高值以确保可用性)。在集群规模达到此副本因子要求之前,内部主题的创建将会失败。
offsets.topic.replication.factor=1
# 共享消费组(share-group)状态主题的副本因子。在集群规模(节点数)达到此副本因子要求之前,主题创建将失败。
share.coordinator.state.topic.replication.factor=1
# 针对 share-group(共享消费组)状态主题,覆盖(重写)了通用的 min.insync.replicas 配置。
share.coordinator.state.topic.min.isr=1
# 事务主题的副本因子(建议设置为较高值以确保可用性)。在集群规模达到此副本因子要求之前,内部主题的创建将会失败。
transaction.state.log.replication.factor=1
# 要使对事务主题的写入操作被视为成功,必须至少有指定数量的副本对此写入操作进行确认。
transaction.state.log.min.isr=1


############################# Log Retention Policy #############################
# 在删除日志文件之前保留该文件的时长(以小时为单位),该设置优先级低于 log.retention.ms 属性
log.retention.hours=168


############################# 其他 #############################
delete.topic.enable=true
# 在服务器上启用主题自动创建功能。
auto.create.topics.enable=true
# 每个主题的默认副本数。自动创建的主题、将 --replication-factor 设为 -1 创建的主题 的默认副本数量。
default.replication.factor=1

初始化数据目录

1
2
3
4
5
6
7
# 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一)
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)" && KAFKA_CLUSTER_ID="aM5LdSHYQDuLvBKaeTrcQw"

kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server-standalone.properties \
--standalone

启动

1
2
3
4
# 前台运行
kafka-server-start.sh /data/local/kafka/config/server-standalone.properties
# 后台运行
kafka-server-start.sh -daemon /data/local/kafka/config/server-standalone.properties

查看集群

1
2
kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication

集群-KRaft 混合模式

配置

节点 1 (127.0.0.1): cp /data/local/kafka/config/server.properties /data/local/kafka/config/server1.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
############################# Server Basics #############################
process.roles=broker,controller
node.id=1
controller.quorum.bootstrap.servers=127.0.0.1:9193,127.0.0.2:9293,127.0.0.3:9393

############################# Socket Server Settings #############################
inter.broker.listener.name=BROKER
controller.listener.names=CONTROLLER
listener.security.protocol.map=SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=BROKER://:9192,CONTROLLER://:9193
advertised.listeners=BROKER://127.0.0.1:9192

############################# Log Basics #############################
log.dirs=/data/local/kafka/data1

节点 2 (127.0.0.2): cp /data/local/kafka/config/server1.properties /data/local/kafka/config/server2.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
############################# Server Basics #############################
process.roles=broker,controller
node.id=2
controller.quorum.bootstrap.servers=127.0.0.1:9193,127.0.0.2:9293,127.0.0.3:9393

############################# Socket Server Settings #############################
inter.broker.listener.name=BROKER
controller.listener.names=CONTROLLER
listener.security.protocol.map=SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=BROKER://:9292,CONTROLLER://:9293
advertised.listeners=BROKER://127.0.0.2:9292

############################# Log Basics #############################
log.dirs=/data/local/kafka/data2

节点 3 (127.0.0.3): cp /data/local/kafka/config/server2.properties /data/local/kafka/config/server3.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
############################# Server Basics #############################
process.roles=broker,controller
node.id=3
controller.quorum.bootstrap.servers=127.0.0.1:9193,127.0.0.2:9293,127.0.0.3:9393

############################# Socket Server Settings #############################
inter.broker.listener.name=BROKER
controller.listener.names=CONTROLLER
listener.security.protocol.map=SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=BROKER://:9392,CONTROLLER://:9393
advertised.listeners=BROKER://127.0.0.3:9392

############################# Log Basics #############################
log.dirs=/data/local/kafka/data3

初始化数据目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一)
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)" && KAFKA_CLUSTER_ID="aM5LdSHYQDuLvBKaeTrcQw"
# 生成三个节点的 Directory ID (各自唯一)
DIR_ID_1="$(kafka-storage.sh random-uuid)" && DIR_ID_1="DnCdSu9uTJejryl97eLw6w"
DIR_ID_2="$(kafka-storage.sh random-uuid)" && DIR_ID_2="hUcTgc7jQzynpOpPW95rOA"
DIR_ID_3="$(kafka-storage.sh random-uuid)" && DIR_ID_3="tUacrDuTTCefMRBAoooqqA"

kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server1.properties \
--initial-controllers "1@127.0.0.1:9193:${DIR_ID_1},2@127.0.0.2:9293:${DIR_ID_2},3@127.0.0.3:9393:${DIR_ID_3}"

kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server2.properties \
--initial-controllers "1@127.0.0.1:9193:${DIR_ID_1},2@127.0.0.2:9293:${DIR_ID_2},3@127.0.0.3:9393:${DIR_ID_3}"

kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/server3.properties \
--initial-controllers "1@127.0.0.1:9193:${DIR_ID_1},2@127.0.0.2:9293:${DIR_ID_2},3@127.0.0.3:9393:${DIR_ID_3}"

启动

1
2
3
4
5
6
7
8
export LOG_DIR="/data/local/kafka/logs/server1"
kafka-server-start.sh /data/local/kafka/config/server1.properties
export LOG_DIR="/data/local/kafka/logs/server2"
kafka-server-start.sh /data/local/kafka/config/server2.properties
export LOG_DIR="/data/local/kafka/logs/server3"
kafka-server-start.sh /data/local/kafka/config/server3.properties

# 关闭最后一个节点,只能 skill -9

查看集群

1
2
3
kafka-metadata-quorum.sh --bootstrap-controller 127.0.0.1:9193 describe --status
kafka-metadata-quorum.sh --bootstrap-controller 127.0.0.2:9293 describe --status
kafka-metadata-quorum.sh --bootstrap-controller 127.0.0.3:9393 describe --status

集群-KRaft 专用模式

配置

Controller 节点: cp /data/local/kafka/config/controller.properties /data/local/kafka/config/controller1.properties

https://github.com/apache/kafka/blob/trunk/config/controller.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
############################# Server Basics #############################
process.roles=controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9093

############################# Socket Server Settings #############################
controller.listener.names=CONTROLLER
listener.security.protocol.map=SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=CONTROLLER://:9093
advertised.listeners=CONTROLLER://localhost:9093

############################# Log Basics #############################
log.dirs=/data/local/kafka/data1

Broker 节点: cp /data/local/kafka/config/broker.properties /data/local/kafka/config/broker2.properties

https://github.com/apache/kafka/blob/trunk/config/broker.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
############################# Server Basics #############################
process.roles=broker
node.id=2
controller.quorum.bootstrap.servers=localhost:9093

############################# Socket Server Settings #############################
inter.broker.listener.name=BROKER
listener.security.protocol.map=SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=BROKER://localhost:9092
advertised.listeners=BROKER://localhost:9092

############################# Log Basics #############################
log.dirs=/data/local/kafka/data2

初始化数据目录

1
2
3
4
5
6
7
8
9
10
11
12
13
# 生成集群 UUID,所有机器必须使用同一个 cluster-id(全局唯一)
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)" && KAFKA_CLUSTER_ID="aM5LdSHYQDuLvBKaeTrcQw"

# controller 初始化。为了方便演示,只配置一个 Controller 节点,所以使用 --standalone
kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/controller1.properties \
--standalone

# broker 初始化。
kafka-storage.sh format \
-t $KAFKA_CLUSTER_ID \
-c /data/local/kafka/config/broker2.properties

启动

1
2
3
4
5
6
export LOG_DIR="/data/local/kafka/logs/controller1"
kafka-server-start.sh /data/local/kafka/config/controller1.properties
export LOG_DIR="/data/local/kafka/logs/broker2"
kafka-server-start.sh /data/local/kafka/config/broker2.properties

# 先关闭 broker,再关闭 controller。或者 暴力停止 `skill -9`

查看集群

1
kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status

快速入门-官网

从主题读取事件-消费者

消费者命令行参数:

完整帮助列表:kafka-console-consumer.sh --help

参数 描述
--bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
--topic <String: topic> 操作的topic名称。
--from-beginning 从头开始消费。
--group <String: consumer group id> 指定消费者组名称。

消费者配置文件中的重要属性:

属性 默认值 描述

打开另一个终端会话,运行控制台消费者客户端以读取您刚刚创建的事件:

1
2
3
4
# 消费主题中的新事件
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-events
# 把主题中所有的事件都读出来(包括历史事件)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-events --from-beginning

您可以随时按 Ctrl-C 停止消费者客户端。

请随意尝试:例如,切换回生产端终端(上一步),写入更多事件,然后观察这些事件如何立即显示在消费者端终端上。

由于事件会被持久化存储在 Kafka 中(默认保留 7 天),因此可以被任意多次读取,且支持任意数量的消费者进行读取。您只需再打开一个终端会话并重新运行之前的命令,即可轻松验证这一点。

使用 Kafka Connect 将数据作为事件流导入/导出

您可能在现有系统(例如关系数据库或传统消息系统)中拥有大量数据,并且许多应用程序已经在使用这些系统。Kafka Connect 允许您持续地将数据从外部系统导入 Kafka Topic,反之亦然。运行连接器是一个可扩展的工具,这些连接器实现了与外部系统交互的自定义逻辑。因此,将现有系统与 Kafka 集成非常容易。为了进一步简化此过程,Kafka 提供了数百个现成的连接器。

下面了解如何使用简单的连接器运行 Kafka Connect,实现将数据从文件导入到 Kafka Topic,以及将数据从 Kafka Topic 导出到文件

请确保将 connect-file-4.2.0.jar 添加到 Connect Worker 进程配置中的 plugin.path 属性。

1
2
cd /data/local/kafka
echo "plugin.path=libs/connect-file-4.2.0.jar" >> config/connect-standalone.properties

接下来,先创建一些用于测试的初始数据:

1
2
echo foo > test.txt
echo bar >> test.txt

接下来,我们将启动两个以独立模式运行的连接器,这意味着它们运行在单个本地专用进程中。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,其中包含通用配置,例如要连接的 Kafka broker 和数据序列化格式。其余配置文件分别指定要创建的连接器。这些文件包含唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。

1
2
# 三个配置文件分别是 Worker进程配置、数据源(Source)配置、数据目标(Sink)配置
connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Kafka 附带的这些示例配置文件使用您之前启动的默认本地集群配置,并创建两个连接器:

  • 源连接器,它从文件中读取行,并将每一行作为一个事件写入 Kafka Topic。
  • 接收器连接器,它从 Kafka Topic 中读取事件,并将每个事件作为一行输出到文件。

启动过程中,您会看到一些日志消息,其中一些指示连接器正在实例化。Kafka Connect 进程启动后,源连接器应开始从 test.txt 读取行并将其写入主题 connect-test,而接收器连接器应开始从主题 connect-test 中读取事件并将其输出到文件 test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传输:more test.sink.txt

请注意,数据正存储在 Kafka 主题 connect-test 中,因此我们还可以运行一个控制台消费者来查看该主题中的数据(或使用自定义消费者代码进行处理):

1
2
3
kafka-console-consumer.sh --topic connect-test --from-beginning --bootstrap-server localhost:9092
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

连接器会持续处理数据,因此我们可以向文件中添加数据 echo "Another line" >> test.txt,并观察数据在管道中的流动。控制台消费者 和 接收文件 应该能看到该行数据

为什么用它?(与直接写代码的区别)

零代码/低代码:只需通过 JSON 配置文件即可启动连接器(Connector),无需为每个数据库编写重复的同步逻辑。

高可靠性:自动管理 offset(偏移量),如果同步过程中断,重启后能从断点继续,保证数据不丢不重。

横向扩展:支持集群模式(Distributed Mode),可以轻松增加节点来应对海量数据的处理压力。

转换能力 (SMTs):可以在数据流动过程中进行简单的转换,比如隐藏敏感字段、给数据打标签或格式转换。

常见应用场景

构建数据仓库/湖:把业务库(MySQL/Oracle)的数据实时同步到数据湖(Iceberg/HDFS)或数仓。

实时搜索:将数据库的更新实时推送到 Elasticsearch 供前端搜索。

多云/异构系统集成:连接 AWS S3、Google Cloud Pub/Sub、MongoDB 等各种云服务和中间件。

使用 Kafka Streams 处理事件

Kafka Streams 是一个 Java 库(Jar 包)。它让你的 Java 程序具备了实时处理海量数据的能力,而你只需要像写普通的 Java 代码一样去调用它的 API。

Kafka Streams = Kafka 读写能力 + 强大的逻辑处理(计算/转换/聚合) + 自动容错。

终止 Kafka 环境

终止 Ctrl-C

若想删除本地 Kafka 环境中的所有数据,包括创建的所有事件,请运行以下命令:

1
2
# 默认目录
rm -rf /tmp/kafka-logs /tmp/kraft-combined-logs

主题

主题命令行参数:

完整帮助列表:kafka-topics.sh --help

参数 描述
--bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号,逗号分隔。
--topic <String: topic> 操作的topic名称。
--create 创建主题。
--delete 删除主题。
--alter 修改主题。
--list 查看所有主题。
--describe 查看主题详细信息。
--partitions <Integer: # of partitions> 设置分区数。
若未指定,则使用集群默认配置(参数 default.replication.factor)。
修改分区数时只能增加,不能减少。
--replication-factor<Integer: replication factor> 设置分区副本数,即数据的总份数,包含“主本”在内。
若未指定,则使用集群默认配置(参数 default.replication.factor)。
不能通过命令行修改。
--config <String: name=value> 更新系统默认的配置。

主题配置文件中的重要属性:

属性 默认值 描述

Kafka 是一个分布式事件流平台,允许您跨多台机器读取、写入、存储和处理事件(在文档中也称为记录或消息)。

例如,事件包括支付交易、来自手机的地理位置更新、发货订单、来自物联网设备或医疗设备的传感器测量数据等等。这些事件被组织并存储在主题中。简单来说,主题类似于文件系统中的文件夹,而事件就是该文件夹中的文件。

因此,在编写第一个事件之前,您必须先创建一个主题。打开另一个终端会话并运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 查看服务器中的所有主题
kafka-topics.sh --bootstrap-server localhost:9092 --list
# 创建主题
kafka-topics.sh --bootstrap-server localhost:9092 --topic test-events --create # --partitions 1 --replication-factor 1
# 查看主题详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --topic test-events --describe
# 查看主题的分区目录
ll -d /data/local/kafka/data/test-events*

# 修改分区数(注意:分区数只能增加,不能减少)
kafka-topics.sh --bootstrap-server localhost:9092 --topic test-events --alter --partitions 3
# 再次查看主题详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --topic test-events --describe
# 查看主题的分区目录
ll -d /data/local/kafka/data/test-events*

# 删除主题
kafka-topics.sh --bootstrap-server localhost:9092 --topic test-events --delete

生产者-向主题写入事件

生产者命令行参数

完整帮助列表:kafka-console-producer.sh --help

参数 描述
--bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
--topic <String: topic> 操作的topic名称。
--command-config 指定配置文件。
适合配置较多或包含敏感信息(如 SASL 认证的用户名和密码)的情况。
--command-property key=value 的形式传递单项配置
适合快速临时修改单个非敏感参数,例如调整 acks 等级或 batch.size

生产者重要属性

属性 默认值 描述
bootstrap.servers 用于与 Kafka 集群建立初始连接的主机/端口对列表。客户端使用此列表进行初始化并发现完整的 Kafka 代理集。虽然列表中服务器的顺序无关紧要,但我们建议包含多个服务器,以确保在任何服务器不可用时具备容错能力。此列表无需包含全部代理,因为 Kafka 客户端会自动高效地管理和更新与集群的连接。此列表必须采用 host1:port1,host2:port2,…. 的格式。
key.serializer 用于键的序列化器类,该类实现了 org.apache.kafka.common.serialization.Serializer 接口。
value.serializer 用于值的序列化器类,该类实现了 org.apache.kafka.common.serialization.Serializer 接口。
partitioner.class null 确定在生成记录时将记录发送至哪个分区。可用选项包括:
1、如果未设置,则使用默认的分区逻辑。该策略会将记录发送到某个分区,直到该分区接收到的数据量至少达到 batch.size 字节为止。它与以下策略配合使用:
 a、如果未指定分区但存在键,则根据键的哈希值选择分区。
 b、如果既未指定分区也未提供键,请选择当该分区接收的数据量达到至少 batch.size 字节时才会发生变化的粘性分区。
2、org.apache.kafka.clients.producer.RoundRobinPartitioner:一种分区策略,其中连续记录序列中的每条记录都会被发送到不同的分区,无论是否提供了“键”,直到分区用尽,然后该过程重新开始。注意:存在一个已知问题,即创建新批次时会导致分布不均。详情请参阅 KAFKA-9965。
实现 org.apache.kafka.clients.producer.Partitioner 接口可让您接入自定义分区器。
batch.size 16384 当向同一分区发送多条记录时,生产端会尝试将这些记录批量合并,以减少请求次数。这有助于提升客户端和服务器端的性能。此配置用于控制默认的批处理大小(以字节为单位)。
linger.ms 5 生产者会将两次请求传输之间收到的所有记录合并为一个批处理请求。通常,这种情况仅在高负载下发生,即记录到达速度快于发送速度时。但在某些情况下,即使在中等负载下,客户端也可能希望减少请求数量。**此设置通过添加少量人工延迟来实现这一目标——也就是说,生产者不会立即发送记录,而是会等待最长为给定延迟的时间,以便将其他记录也发送出去,从而将这些发送操作批量处理。**这可以类比为 TCP 中的纳格尔算法。**该设置为批处理的延迟提供了上限:一旦某个分区积累了与 batch.size 相当的记录,无论此设置如何,都将立即发送;但如果该分区积累的字节数少于此值,则会“滞留”指定时间,等待更多记录出现。该设置默认值为 5(即 5 毫秒延迟)。**例如,将 linger.ms 设为 50 会减少发送的请求数量,但在无负载情况下,会使记录的延迟累计增加 50 毫秒。Apache Kafka 4.0 中该默认值从 0 改为 5,因为尽管 linger 值增大,但更大的批次通常能带来效率提升,从而使生产者延迟保持在相同或更低的水平。
buffer.memory 33554432 生产者可用于缓冲待发送至服务器的记录的总内存字节数。如果记录的发送速度快于其被传递至服务器的速度,生产者将阻塞 max.block.ms 毫秒,之后将抛出异常并失败。
此设置应大致对应生产者将使用的总内存,但并非硬性限制,因为生产者使用的内存并非全部用于缓冲。部分额外内存将用于压缩(如果启用了压缩)以及维护正在传输中的请求。
默认 32 M。
compression.type none 生产者生成的所有数据的压缩类型。默认值为 none(即不进行压缩)。有效值为 none、gzip、snappy、lz4 或 zstd。压缩操作针对完整的数据批次进行,因此批处理的效率也会影响压缩率(批处理次数越多,压缩效果越好)。
acks all 生产者要求领导者收到多少个确认后,才将请求视为已完成。这控制了所发送记录的持久性。允许使用以下设置:
acks=0 如果设置为零,则生产者将完全不等待服务器的任何确认。记录将立即添加到套接字缓冲区中,并被视为已发送。在此情况下,无法保证服务器已收到该记录,且重试配置将不会生效(因为客户端通常无法得知任何失败情况)。每条记录返回的偏移量将始终设置为 -1。
acks=1 这意味着 leader 会将记录写入其本地日志,但在未等待所有 follower 完全确认的情况下便会做出响应。在这种情况下,如果 leader 在确认记录后立即发生故障,且此时 follower 尚未完成复制,则该记录将会丢失。 一般用于传输普通日志,允许丢个别数据。
acks=all 这意味着主节点将等待所有同步的副本都确认该记录。这保证了只要至少有一个同步的副本仍处于活动状态,该记录就不会丢失。这是目前可用的最强保证。这相当于 acks=-1 的设置。 一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
请注意,要启用幂等性,此配置值必须设为“all”。如果设置了冲突的配置且未显式启用幂等性,则幂等性将被禁用。
retries 2147483647 因瞬时错误导致请求失败时的重试次数。设置大于零的值将导致客户端重新发送任何因潜在瞬时错误而发送失败的记录。请求将重试此次数,直到成功、因非瞬时错误失败,或 delivery.timeout.ms 超时为止。请注意,此自动重试功能仅会在收到错误时重新发送相同的记录。将该值设为零将禁用此自动重试行为,此时瞬时错误将传递给应用程序进行处理。通常建议用户不设置此配置项,而是使用 delivery.timeout.ms 来控制重试行为。
要启用幂等性,此配置值必须大于 0。如果设置了冲突的配置且未显式启用幂等性,则幂等性将被禁用。
在将 enable.idempotence 设置为 falsemax.in.flight.requests.per.connection 设置为大于 1 的情况下允许重试,可能会改变记录的排序顺序。因为如果向单个分区发送两个批次,且第一个批次失败并被重试,而第二个批次成功,那么第二个批次中的记录可能会出现在前面。
delivery.timeout.ms 120000 (2 minutes) 调用 send() 返回后,报告成功或失败的时限上限。这限制了记录在发送前被延迟的总时间、等待代理确认的时间(如果需要确认),以及允许重试发送失败的时间。如果遇到不可恢复的错误、重试次数已用尽,或者记录被添加到已达到较早投递过期截止时间的批次中,生产者可能会在此配置时间之前报告记录发送失败。此配置的值应大于或等于 request.timeout.ms 和 linger.ms 的总和。
request.timeout.ms 30000 (30 seconds) 该配置控制客户端等待请求响应的最长时间。如果在超时结束前未收到响应,客户端将在必要时重新发送请求;若重试次数已用尽,则会将请求标记为失败。该值应大于 replica.lag.time.max.ms(一个代理配置项),以减少因生产者不必要的重试而导致消息重复的可能性。
enable.idempotence true 当设置为“true”时,发布者将确保每个消息在流中仅写入一份。如果设置为“false”,则发布者因代理故障等原因进行的重试可能会在流中写入重试消息的重复副本。请注意,启用幂等性需要满足以下条件:max.in.flight.requests.per.connection 小于或等于 5(对于任何允许的值,消息顺序均需保持),重试次数大于 0,且确认模式必须为 ‘all’。
max.in.flight.requests.per.connection 5 客户端在阻塞前,单个连接上发送的未确认请求的最大数量。请注意,如果此配置值大于 1,且 enable.idempotence 设置为 false,则在发送失败后,由于重试(即如果启用了重试),消息可能会发生重新排序;如果禁用了重试,或者 enable.idempotence 设置为 true,则消息顺序将得到保留。此外,启用幂等性要求此配置的值小于或等于 5,因为消息代理(broker)为每个生产者最多仅保留 5 个批次。如果该值大于 5,消息代理端可能会移除之前的批次。
transactional.id null 用于事务性投递的 TransactionalId。这启用了跨越多个生产者会话的可靠性语义,因为它允许客户端确保在使用相同 TransactionalId 的事务完成之前,不会启动任何新事务。如果未提供 TransactionalId,则生产者仅限于幂等投递。如果配置了 TransactionalId,则默认启用 idempotence。默认情况下未配置 TransactionalId,这意味着无法使用事务。请注意,默认情况下,事务需要至少包含三个代理的集群,这是生产环境的推荐设置;在开发环境中,您可以通过调整代理设置 transaction.state.log.replication.factor 来更改此配置。

快速入门

Kafka 客户端通过网络与 Kafka broker 通信,以写入(或读取)事件。broker 接收到事件后,会以持久且容错的方式存储这些事件,存储时间长短完全由您决定,甚至可以永久保存。

运行控制台生产者客户端,向主题写入一些事件。默认情况下,您输入的每一行都会生成一个独立的事件并写入主题。

1
2
3
4
# 发送消息。开启 auto.create.topics.enable 以支持自动创建主题
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-events
>hello world
>123 456 789

您可以随时按 Ctrl+C 停止生产者客户端。

生产者消息发送流程

在消息发送的过程中,涉及到了两个线程——main线程Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

image-20260429105135307

异步发送API

普通异步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducer {

public static void main(String[] args) throws InterruptedException {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 4. 调用send方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","Ablaze " + i));
}

// 5. 关闭资源
kafkaProducer.close();
}
}

带回调函数的异步发送

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallback {

public static void main(String[] args) throws InterruptedException {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 4. 调用send方法,发送消息
for (int i = 0; i < 5; i++) {

// 添加回调
kafkaProducer.send(new ProducerRecord<>("first", "Ablaze " + i), new Callback() {

// 该方法在Producer收到ack时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {

if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});

// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}

// 5. 关闭资源
kafkaProducer.close();
}
}

同步发送API

只需在异步发送的基础上,再调用一下get()方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerSync {

public static void main(String[] args) throws InterruptedException, ExecutionException {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 4. 调用send方法,发送消息
for (int i = 0; i < 10; i++) {

// 异步发送 默认
// kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
// 同步发送
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();

}

// 5. 关闭资源
kafkaProducer.close();
}
}

生产者分区

分区的好处

1、便于合理使用存储资源,每个 partition 在一个 broker 上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台 broker 上。合理控制分区的任务,可以实现负载均衡的效果。

2、提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据

image-20260429110446318

生产者发送消息的分区策略

默认的分区器

默认的分区器:DefaultPartitioner

分区原则:ProducerRecord

image-20260429114438999

案例一

将数据发往指定partition的情况下,例如,将所有数据发往分区1中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CustomProducerCallbackPartitions {

public static void main(String[] args) {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

for (int i = 0; i < 5; i++) {
// 指定数据发送到1号分区,key为空(IDEA中ctrl + p查看参数)
kafkaProducer.send(new ProducerRecord<>("first", 1, "", "Ablaze " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}

kafkaProducer.close();
}
}

案例二

没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallback {

public static void main(String[] args) {

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

for (int i = 0; i < 5; i++) {
// 依次指定key值为a,b,f ,数据key的hash值与3个分区求余,分别发往1、2、0
kafkaProducer.send(new ProducerRecord<>("first", "a", "Ablaze " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}

kafkaProducer.close();
}
}

自定义分区器

研发人员可以根据企业需求,自己重新实现分区器。

**需求:**例如我们实现一个分区器实现,发送过来的数据中如果包含Ablaze,就发往0号分区,不包含Ablaze,就发往1号分区。

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
* 1. 实现接口Partitioner
* 2. 实现3个方法:partition,close,configure
* 3. 编写partition方法,返回分区号
*/
public class MyPartitioner implements Partitioner {

/**
* 返回信息对应的分区
* @param topic 主题
* @param key 消息的key
* @param keyBytes 消息的key序列化后的字节数组
* @param value 消息的value
* @param valueBytes 消息的value序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

// 获取消息
String msgValue = value.toString();

// 创建partition
int partition;

// 判断消息是否包含Ablaze
if (msgValue.contains("Ablaze")){
partition = 0;
}else {
partition = 1;
}

// 返回分区号
return partition;
}

// 关闭资源
@Override
public void close() {

}

// 配置方法
@Override
public void configure(Map<String, ?> configs) {

}
}

使用分区器的方法,在生产者的配置中添加分区器参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CustomProducerCallbackPartitions {

public static void main(String[] args) throws InterruptedException {

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.Ablaze.kafka.producer.MyPartitioner");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

for (int i = 0; i < 5; i++) {

kafkaProducer.send(new ProducerRecord<>("first", "Ablaze " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}

kafkaProducer.close();
}
}

生产经验

生产者如何提高吞吐量

提高 Kafka 生产者吞吐量相关参数:batch.sizelinger.msbuffer.memorycompression.type

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducerParameters {

public static void main(String[] args) throws InterruptedException {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// batch.size:批次大小,默认16384(16KB)。
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// linger.ms:等待时间,默认5ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 50);

// RecordAccumulator:缓冲区大小,默认33554432(32MB)。由属性 buffer.memory 控制。
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);

// compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd。
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 4. 调用send方法,发送消息
for (int i = 0; i < 5; i++) {

kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));

}

// 5. 关闭资源
kafkaProducer.close();
}
}

发送可靠消息

生产者配置属性 acks=all 的场景分析:

image-20260429123843906

消息去重

消息重复分析

acks=all 时,生产者发送消息,leader 收到消息且与 ISR 队列里的所有 follower 同步完成,在响应生产者的一瞬间 leader 挂了,导致 ack 失败。然后生产者会再选择一个 leader 重新发消息,此时消息重复(因为上次发送的消息已经同步完成)。虽然概率很低,但确实有可能发生。

消息传递语义

  • 至少一次(At Least Once):ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2保证消息不丢失,但可能重复。
  • 最多一次(At Most Once):ACK级别设置为0保证消息不重复,但可能丢失。
  • 精确一次(Exactly Once):幂等性 + 至少一次保证消息既不重复也不丢失

幂等性

幂等性是指无论 Producer 向 Broker 发送多少次相同的消息,Broker 端都只会持久化一条,保证消息不重复。

Kafka 实现幂等性的核心原理是引入了 PID(Producer ID)Sequence Number(序列号) 机制。以下是其运作的具体步骤:

核心概念:

  • PID:当开启幂等性(配置 enable.idempotence=true)后,每个 Producer 在初始化启动时,都会向 Broker 申请并被分配一个唯一的 PID。这个过程对用户是完全透明的。
  • Sequence Number:Producer 发送的每一条消息(更准确地说是每一个消息批次 RecordBatch)都会被分配一个从 0 开始单调递增的序列号。这个序列号的作用域是 <PID, Topic, Partition> 三元组。也就是说,针对同一个 Producer 的同一个 Topic 的同一个 Partition,序列号从 0 开始严格递增。

Broker 端去重逻辑:

Broker 接收到消息后,会在内存中为每个 <PID, Topic, Partition> 维护一个状态,记录其对应的最新已提交序列号(假设记录为 SN_broker)。当 Broker 收到新消息时(带有序列号 SN_new),会执行以下校验逻辑:

  • 如果 SN_new == SN_broker + 1: 这是预期的正常情况,说明消息是连续的。Broker 正常接收并追加该消息,同时更新内存中的 SN_broker
  • 如果 SN_new <= SN_broker: 这说明该序列号的消息已经被 Broker 成功接收并写入过了。这是典型的网络延迟导致 Producer 误以为失败而发起的重试。此时,Broker 会丢弃这条重复消息,但依然会向 Producer 返回 ACK (确认成功),让 Producer 停止重试。
  • 如果 SN_new > SN_broker + 1: 这说明序列号出现了跳跃,意味着中间有消息丢失了(可能之前的消息发送失败且重试耗尽)。此时,Broker 会拒绝该消息,并抛出 OutOfOrderSequenceException 异常,通常会导致 Producer 进入致命错误状态。

幂等性的局限性(作用范围):

  • 只能保证单分区 (Single Partition):幂等性的序列号是绑定在特定 Partition 上的。如果消息被路由到了不同的 Partition,幂等性无法生效。
  • 只能保证单会话 (Single Session):PID 是 Producer 进程启动时动态分配的。**如果 Producer 应用崩溃重启,它会获得一个新的 PID。**对于 Broker 来说,这完全是一个新的生产者,之前的序列号状态失效,因此无法防范应用重启导致的重复发送。

如果你的业务有着更严格的一致性要求——比如要求 “即使 Producer 崩溃重启,也不能发送重复消息”,或者要求 “同时向多个 Partition 发送消息时,必须全部成功或全部失败”——那么仅靠 Kafka 的幂等性是无法满足的。这时,你需要配合使用 Kafka 的事务机制 (Transactions)。事务机制要求开发者为 Producer 指定一个全局唯一的、固定不变的 Transactional.id (TID)。Kafka 底层会将这个固定的 TID 与动态的 PID 绑定。这样一来,即使 Producer 发生重启,Kafka 也能通过 TID 认出这个“老客户端”,从而接续之前的防重状态;同时,事务还能保证跨多个 Partition 写入时的原子性。

开启幂等性 enable.idempotence=true

生产者事务

开启事务必须要开启幂等性。

事务原理:

image-20260430110136191

Kafka的事务一共有如下 5 个API:

1
2
3
4
5
6
7
8
9
10
11
// 1、初始化事务
void initTransactions();
// 2、开启事务
void beginTransaction() throws ProducerFencedException;
// 3、在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4、提交事务
void commitTransaction() throws ProducerFencedException;
// 5、放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

单个Producer,使用事务保证消息的仅一次发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducerTransactions {

public static void main(String[] args) throws InterruptedException {

// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();

// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

// key,value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置事务id(必须),事务id任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");

// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try {
// 4. 调用send方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("first", "Ablaze " + i));
}

// 这里会抛出异常,导致事务回滚
int i = 1 / 0;

// 提交事务
kafkaProducer.commitTransaction();

} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 5. 关闭资源
kafkaProducer.close();
}
}
}

消息有序

Topic 单分区:消息有序,但条件如下:

  • 未开启幂等性:max.in.flight.requests.per.connection 需要设置为 =1
  • 开启幂等性:max.in.flight.requests.per.connection 需要设置为 <=5 (见官方属性说明)。

Topic 多分区:每个分区消息有序,分区间消息无序。

  • 如何做到多分区间消息有序呢?方案:同一个消费者拉取所有分区的数据,然后进行全排序,效率低,建议使用单分区。

Kafka Broker

扩展集群

https://kafka.apache.org/42/operations/basic-kafka-operations/#expanding-your-cluster