kafka安装和配置

kafka 4.0.0 (用kraft模式)

1.安装kafka 需要有jdk环境

1
#wget https://downloads.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz

1.1修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
# cat config/server.properties 
node.id=1 #唯一的值
log.dirs=/data/kafka-logs

listeners=PLAINTEXT://192.168.85.128:9092,CONTROLLER://192.168.85.128:9093 #每个节点的ip
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

process.roles=broker,controller
controller.quorum.voters=1@192.168.85.128:9093,2@192.168.85.129:9093,3@192.168.85.130:9093 #固定这样配置

1.2每个节点执行一下(初始化 Kafka 数据目录)

1
2
3
4
5
6
7
#bin/kafka-storage.sh format -t cluster-id-123 -c config/server.properties
#cat /data/kafka-logs/meta.properties
node.id=1
directory.id=T-V8hYIPF4f6O3aJf0dB7A
version=1
cluster.id=cluster-id-123 #确保这个值在三台机子都一样

1.3启动集群

1
#nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

1.4测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#bin/kafka-topics.sh --bootstrap-server 192.168.85.128:9092 --list
出现topic或者没有报错则为成功

#bin/kafka-topics.sh --bootstrap-server 192.168.85.128:9092 --create --topic test-topic --partitions 3 --replication-factor 2
--topic: 指定要创建的主题名称。

--partitions: 指定主题的分区数量。

--replication-factor: 指定副本因子。

启动生产者
#bin/kafka-console-producer.sh --bootstrap-server 192.168.85.128:9092 --topic test-topic #ip随便写哪个
启动消费者
#bin/kafka-console-consumer.sh --bootstrap-server 192.168.85.128:9092 --topic test-topic --from-beginning

配置优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Kafka Broker 配置

# Kafka Broker 的唯一标识符
broker.id=1

# 设置监听的 IP 和端口
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-broker-1:9092

# 日志存储目录
log.dirs=/mnt/data/kafka-logs,/mnt/data2/kafka-logs

# 每个 Kafka 分区默认的分区数
num.partitions=3

# 写入确认机制
acks=all # 所有副本都确认后再返回

# 每个请求最大字节数
fetch.max.bytes=52428800 # 50 MB

# 网络线程数
num.network.threads=3

# I/O 线程数
num.io.threads=8

# 处理每秒钟的最大请求数
queued.max.requests=500

# 日志文件的最大大小
log.segment.bytes=1073741824 # 1 GB

# 日志文件滚动时的最大时间
log.roll.ms=86400000 # 24 hours

# 日志清理策略(删除策略)
log.cleanup.policy=delete

# 设置每个分区的最大保留大小(10GB)
log.retention.bytes=10737418240 # 10 GB

# 设置日志的最大保留时间(7天)
log.retention.hours=168 # 7 days

# 控制每个分区的最大 ISR 数量
min.insync.replicas=2 # 至少2个副本同步成功

# 控制副本拉取数据的最大字节数
replica.fetch.max.bytes=10485760 # 10 MB

# 副本拉取的最大等待时间
replica.fetch.wait.max.ms=500 # 500ms

# 设置网络缓冲区大小
socket.send.buffer.bytes=1048576 # 1 MB
socket.receive.buffer.bytes=1048576 # 1 MB

# 设置最大网络连接空闲时间
connections.max.idle.ms=600000 # 10 minutes

# 服务器的最大连接数
max.connections=10000

# 客户端与服务器之间的最大请求字节数
max.request.size=10485760 # 10 MB

# Kafka 的最大消息批量大小
batch.size=16384 # 16 KB

# Kafka 高级日志压缩设置(消息压缩)
compression.type=snappy

# 增强 Kafka 对事务的支持
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# 允许的最大消息大小(默认为 1 MB)
message.max.bytes=1048576 # 1 MB

# Kafka 使用 G1GC 进行垃圾回收的 JVM 配置
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=200"

# Kafka 使用的堆内存大小
export KAFKA_HEAP_OPTS="-Xms4G -Xmx4G"

# 配置监听协议
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT
listener.name.internals=PLAINTEXT://0.0.0.0:9092

# Kafka 启动时使用的线程池大小
num.recovery.threads.per.data.dir=1

# Kafka 聚合或压缩日志策略
log.cleaner.min.cleanable.ratio=0.5 # 设置日志清理的最小可清理比例

# Kafka 集群的 ZooKeeper 配置(如果是基于 ZooKeeper 的)
zookeeper.connect=192.168.85.128:2181,192.168.85.129:2181,192.168.85.130:2181
zookeeper.connection.timeout.ms=6000

# Kafka 集群的连接超时和心跳配置
socket.connection.setup.timeout.ms=10000
socket.connection.setup.timeout.max.ms=30000

# Kafka 最大队列请求数
queued.max.requests=500

# Kafka 数据传输的最大文件大小
log.max.message.bytes=1000000 # 1 MB

kafka安装和配置
https://www.tiantian123.asia/2025/05/26/kafka_4.0.0_(用kraft模式)/
作者
lht
发布于
2025年5月26日
许可协议