Skip to content

kafka 集群部署(3 个节点)

一、集群规划

在 node1、node2 和 node3 三个节点上都部署 kafka。

二、下载

shell
# 下载kafka-华为镜像站下载的3.0.0版本
cd ~ && wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.12-3.0.0.tgz --no-check-certificate

二、安装

shell
# 解压
tar -zxvf kafka_2.12-3.0.0.tgz -C /usr/local/
# 修改文件夹名称
mv /usr/local/kafka_2.12-3.0.0/ /usr/local/kafka

三、配置、启动

1、修改配置文件共同配置

shell
mkdir -p /usr/local/kafka/logdirs
#注释掉原有的配置
sed "s/^log.dirs=.*/#&/" /usr/local/kafka/config/server.properties  -i
#加入新配置
grep -q '^#log.dirs=' /usr/local/kafka/config/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs' /usr/local/kafka/config/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs' >> /usr/local/kafka/config/server.properties

2、修改 zookeeper 配置(可选)

如果需要,也可以修改 zookeeper 的配置

properties
# zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
# zookeeper.connection.timeout.ms=18000
#注释掉原有的配置
sed "s/^zookeeper.connect=.*/#&/" /usr/local/kafka/config/server.properties  -i
#加入新配置
grep -q '^#zookeeper.connect=' /usr/local/kafka/config/server.properties && sed -i '/^#zookeeper.connect=.*/a\zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka' /usr/local/kafka/config/server.properties || echo 'zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka' >> /usr/local/kafka/config/server.properties

3、不同的服务器不同配置

(1)配置服务器编号:broker.id 分别设置为 0、1、2

shell
sed "s/^broker.id=.*/#&/" /usr/local/kafka/config/server.properties  -i
# node1
sed -i '/^#broker.id=.*/a\broker.id=0' /usr/local/kafka/config/server.properties
# node2
sed -i '/^#broker.id=.*/a\broker.id=1' /usr/local/kafka/config/server.properties
# node3
sed -i '/^#broker.id=.*/a\broker.id=2' /usr/local/kafka/config/server.properties

4、启动 zookeeper 和 kafka

shell
# 如果需要启动zookeeper的话
cd /usr/local/kafka && ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
# 启动命令,如果需要后台启动,则加上 -daemon 参数即可
cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/server.properties

5、关闭 kafka

shell
# 关闭命令
 cd /usr/local/kafka && ./bin/kafka-server-stop.sh

四、Kraft 模式

1、集群规划

在 node1、node2 和 node3 三个节点上都作为 broker, controller 节点。

如果后续还有 node4、和 node5 节点,这两个节点配置为 broker 节点。

2、修改配置文件共同配置

shell
mkdir -p /usr/local/kafka/logdirs2
#注释掉原有的配置
sed "s/^controller.quorum.voters=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
sed "s/^log.dirs=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
#加入新配置
grep -q '^#controller.quorum.voters=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#controller.quorum.voters=.*/a\controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093' /usr/local/kafka/config/kraft/server.properties || echo 'controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#log.dirs=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs2' /usr/local/kafka/config/kraft/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs2' >> /usr/local/kafka/config/kraft/server.properties

3、修改配置文件不同的服务器不同配置

shell
sed "s/^node.id=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
sed "s/^advertised.listeners=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
#node1
grep -q '^#node.id=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#node.id=.*/a\node.id=1' /usr/local/kafka/config/kraft/server.properties || echo 'node.id=1' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://node1:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://node1:9092' >> /usr/local/kafka/config/kraft/server.properties

#node2
grep -q '^#node.id=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#node.id=.*/a\node.id=2' /usr/local/kafka/config/kraft/server.properties || echo 'node.id=2' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://node3:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://node3:9092' >> /usr/local/kafka/config/kraft/server.properties

#node3
grep -q '^#node.id=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#node.id=.*/a\node.id=3' /usr/local/kafka/config/kraft/server.properties || echo 'node.id=3' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://node3:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://node3:9092' >> /usr/local/kafka/config/kraft/server.properties

4、初始化集群数据目录

生成存储目录唯一 ID 并格式化 kafka 存储目录。

properties
#单机可以直接执行改命令,集群需要分开执行,集群所有机子的唯一id要相同
/usr/local/kafka/bin/kafka-storage.sh random-uuid

/usr/local/kafka/bin/kafka-storage.sh format -t i6O4t5ABTR6FfZrfkGBVEw -c /usr/local/kafka/config/kraft/server.properties

5、启动 kafka

shell
# 启动命令,如果需要后台启动,则加上 -daemon 参数即可
 cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

6、关闭 kafka

shell
# 关闭命令
 cd /usr/local/kafka && ./bin/kafka-server-stop.sh

五、验证

1.首先创建一个名为 chenlj 的 topic :

shell
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chenlj

2.创建完成以后,可以使⽤命令来列出⽬前已有的 topic 列表

shell
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

3.查看 chenlj 主题的详情:

shell
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092  --topic chenlj

4.接下来创建一个生产者,用于在 chenlj 这个 topic 上生产消息:

shell
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic chenlj

5.接下来创建一个消费者,用于在 chenlj 这个 topic 上获取消息:

shell
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chenlj

此时生产者发出的消息,在消费者端可以获取到。

六、配置环境变量(全局可以使用 kafka 命令)

shell
cat >>/etc/profile.d/kafka.sh <<EOF
export KAFKA_HOME=/usr/local/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF
source /etc/profile

七、配置远程访问

1、开启防火墙端口

shell
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload

八、设置开机自启

shell
#待完善