kafka集群部署

2025/1/14 Install

# 准备工作

# 1. 搭建设计

# 2. 服务器准备

  • 机器分配

    kafka-broker1(192.168.150.111)
    kafka-broker2(192.168.150.112)
    kafka-broker3(192.168.150.113)
    
    1
    2
    3
  • 修改Linux主机名称

    hostnamectl set-hostname kafka-broker1
    hostnamectl set-hostname kafka-broker2
    hostnamectl set-hostname kafka-broker3
    
    1
    2
    3
  • 配置主机名映射

    #三台服务器都需要执行添加
    #命令
    vim /etc/hosts
    #文件尾部添加
    192.168.150.111 kafka-broker1
    192.168.150.112 kafka-broker2
    192.168.150.113 kafka-broker3
    
    1
    2
    3
    4
    5
    6
    7

# 3.安装包准备

  • JDK1.8

    https://pan.baidu.com/s/1qVp0xx7HrHG3yzF_vtUpfg?pwd=ovrx
    
    1
  • kafka_2.13-2.8.0

    https://pan.baidu.com/s/1zozaxOWA5dCZKbIG2aqGgg?pwd=5s9g
    
    1
  • zookeeper-3.6.3

    https://pan.baidu.com/s/1ajsuPLdGKqmS1fCYACnsrQ?pwd=esq9
    
    1

# 4.配置SSH免密登录

后续安装的集群化软件,多数需要远程登录以及远程执行命令,我们可以简单起见,配置三台Linux服务器之间的免密码互相SSH登陆(为了安全可以不做)

  1. 在每一台机器都执行:

    #一路回车到底即可 
    ssh-keygen -t rsa -b 4096
    
    1
    2
  2. 在每一台机器都执行:

    ssh-copy-id kafka-broker1
    ssh-copy-id kafka-broker2
    ssh-copy-id kafka-broker3
    
    1
    2
    3
  3. 执行完毕后,kafka-broker1、kafka-broker2、kafka-broker3之间将完成root用户之间的免密互通

  4. 测试免密登录 完成以上步骤后,你应该能够使用SSH免密登录到目标服务器了。只需执行以下命令:ssh 用户名@remote_host

    ssh rootmkafka-broker1
    ssh root@kafka-broker2
    ssh root@kafka-broker3
    
    1
    2
    3

# 5.修改时区并配置自动时间同步

#1. 安装ntp软件
yum install -y ntp
#2. 更新时区
rm -f /etc/localtime;sudo ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
#3. 同步时间
ntpdate -u ntp.aliyun.com
#4. 开启ntp服务并设置开机自启
systemctl start ntpd
systemctl enable ntpd
1
2
3
4
5
6
7
8
9

# 配置java环境

三个节点都需要配置JDK安装

# 安装ZooKeeper

三个节点都需要安装配置ZooKeeper

# 1、上传并解压ZooKeeper压缩包

#解压缩文件到指定目录
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz -C /home/saltedFish

#文件目录改名
mv apache-zookeeper-3.6.3-bin/ zookeeper
1
2
3
4
5

# 2、配置服务器编号

  • 在/home/saltedFish/zookeeper/目录下创建zkData目录

    mkdir zkData
    
    1
  • 在zkData目录中创建myid文件

    vim myid
    
    1
  • 在文件中增加内容

    #三台服务器不能重复
    1
    
    1
    2

# 3、修改配置文件

  • 重命名/zookeeper/conf目录下的zoo_sample.cfg文件为zoo.cfg文件

    # 修改文件名称
    mv zoo_sample.cfg zoo.cfg
    
    1
    2
  • 修改zoo.cfg文件

    # 修改文件内容
    vim zoo.cfg
    
    1
    2
    # 以下内容为修改内容(实际的全路径)
    dataDir=/home/saltedFish/zookeeper/zkData
    # 以下内容为新增内容
    ####################### cluster ##########################
    # server.A=B:C:D
    #
    # A是一个数字,表示这个是第几号服务器
    # B是A服务器的主机名
    # C是A服务器与集群中的主服务器(Leader)交换信息的端口
    # D是A服务器用于主服务器(Leader)选举的端口
    #########################################################
    server.1=kafka-broker1:2888:3888
    server.2=kafka-broker2:2888:3888
    server.3=kafka-broker3:2888:3888
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

# 4、启停脚本

ZooKeeper软件的启动和停止比较简单,但是每一次如果都在不同服务器节点执行相应指令,也会有点麻烦,所以我们这里将指令封装成脚本文件,方便我们的调用。

  1. 在虚拟机kafka-broker1的/root/bin目录下创建zk.sh脚本文件(在/root/bin这个目录下存放的脚本,root用户可以在系统任何地方直接执行)

    # 进入/root/bin目录
    cd /root/bin
    # 创建zk.sh脚本文件
    vim zk.sh
    
    1
    2
    3
    4
  2. 在脚本中增加内容:

    #!/bin/bash
    case $1 in
    "start"){
    	for i in kafka-broker1 kafka-broker2 kafka-broker3
    	do
            echo ---------- zookeeper $i 启动 ------------
    		ssh $i "/home/saltedFish/zookeeper/bin/zkServer.sh start"
    	done
    };;
    "stop"){
    	for i in kafka-broker1 kafka-broker2 kafka-broker3
    	do
            echo ---------- zookeeper $i 停止 ------------    
    		ssh $i "/home/saltedFish/zookeeper/bin/zkServer.sh stop"
    	done
    };;
    "status"){
    	for i in kafka-broker1 kafka-broker2 kafka-broker3
    	do
            echo ---------- zookeeper $i 状态 ------------    
    		ssh $i "/home/saltedFish/zookeeper/bin/zkServer.sh status"
    	done
    };;
    esac
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
  3. 增加脚本文件权限

    # 给zk.sh文件授权
    chmod 777 zk.sh
    
    1
    2
  4. 脚本调用方式

    # 启动ZK服务
    zk.sh start
    # 查看ZK服务状态
    zk.sh status
    # 停止ZK服务
    zk.sh stop
    
    1
    2
    3
    4
    5
    6

# 安装Kafka

# 1、上传并解压Kafka压缩包

# 解压缩文件到指定目录
tar -zxvf kafka_2.13-2.8.0.tgz -C /home/saltedFish
# 修改文件目录名称
mv kafka_2.13-2.8.0/ kafka
1
2
3
4

# 2、修改配置文件

#切换目录 /kafka/config文件目录
cd /kafka/config

# 修改配置文件
vim server.properties
1
2
3
4
5

输入以下内容(主要修改上面四项配置):

#broker的全局唯一编号,每个服务节点不能重复,只能是数字。
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://kafka-broker1:9092
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔(实际的全路径)
log.dirs=/home/saltedFish/kafka/datas
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=kafka-broker1:2181,kafka-broker2:2181,kafka-broker3:2181/kafka
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 3、配置环境变量

  • 修改 /etc/profile.d/my_env.sh文件

    vim /etc/profile.d/my_env.sh
    
    1
  • 添加内容

    #KAFKA_HOME
    export KAFKA_HOME=实际的全路径/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
    1
    2
    3
  • 让环境变量生效

    source /etc/profile.d/my_env.sh
    
    1

# 4、启停脚本

启动前请先启动ZooKeeper服务

  • 在虚拟机kafka-broker1的/root/bin目录下创建kfk.sh脚本文件,对kafka服务的启动停止等指令进行封装

    # 进入/root/bin目录
    cd /root/bin
    # 创建kfk.sh脚本文件
    vim kfk.sh
    
    1
    2
    3
    4
  • 在脚本中增加内容:

    #! /bin/bash
    case $1 in
    "start"){
        for i in kafka-broker1 kafka-broker2 kafka-broker3
        do
            echo " --------启动 $i Kafka-------"
            ssh $i "/home/saltedFish/kafka/bin/kafka-server-start.sh -daemon /home/saltedFish/kafka/config/server.properties"
        done
    };;
    "stop"){
        for i in kafka-broker1 kafka-broker2 kafka-broker3
        do
            echo " --------停止 $i Kafka-------"
            ssh $i "/home/saltedFish/kafka/bin/kafka-server-stop.sh "
        done
    };;
    esac
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
  • 增加脚本文件权限

    # 给文件授权
    chmod 777 kfk.sh
    
    1
    2
  • 脚本调用方式

    # 启动kafka
    kfk.sh start
    # 停止Kafka
    kfk.sh stop
    
    1
    2
    3
    4

注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

# 5、联合脚本

因为Kafka启动前,需要先启动ZooKeeper,关闭时,又需要将所有Kafka全部关闭后,才能关闭ZooKeeper,这样操作起来感觉比较麻烦,所以可以将之前的2个脚本再做一次封装。

  • 在虚拟机kafka-broker1的/root/bin目录下创建xcall脚本文件

    # 创建xcall文件
    vim /root/bin/xcall
    
    1
    2
  • 在脚本中增加内容:

    #! /bin/bash
     
    for i in kafka-broker1 kafka-broker2 kafka-broker3
    do
        echo --------- $i ----------
        ssh $i "$*"
    done
    
    1
    2
    3
    4
    5
    6
    7
  • 增加脚本文件权限

    # 增加权限
    chmod 777 /root/bin/xcall
    
    1
    2
  • 在虚拟机kafka-broker1的/root/bin目录下创建cluster.sh脚本文件

    # 创建cluster.sh脚本文件
    vim /root/bin/cluster.sh
    
    1
    2
  • 在脚本中增加内容

    #!/bin/bash
    case $1 in
    "start"){
            echo ================== 启动 Kafka集群 ==================
            #启动 Zookeeper集群
            zk.sh start
            #启动 Kafka采集集群
            kfk.sh start
            };;
    "stop"){
            echo ================== 停止 Kafka集群 ==================
            #停止 Kafka采集集群
            kfk.sh stop
    		#循环直至 Kafka 集群进程全部停止
    		kafka_count=$(xcall jps | grep Kafka | wc -l)
    		while [ $kafka_count -gt 0 ]
    		do
    			sleep 1
    			kafka_count=$(xcall | grep Kafka | wc -l)
                echo "当前未停止的 Kafka 进程数为 $kafka_count"
    		done
            #停止 Zookeeper集群
            zk.sh stop
    };;
    esac
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
  • 增加脚本文件权限

    # 增加权限
    chmod 777 /root/bin/cluster.sh
    
    1
    2
  • 脚本调用方式

    # 集群启动
    cluster.sh start
    # 集群关闭
    cluster.sh stop
    
    1
    2
    3
    4

# 测试集群

  • 启动Kafka集群

    # 启动集群
    cluster.sh start
    
    1
    2
  • 输入指令查看进程

    # xcall 后面跟着linux指令操作,可以同时对多个服务器节点同时执行相同指令
    xcall jps
    
    1
    2
  • topic(只需要再一个节点)

    ## 创建
    ./kafka-topics.sh --create --topic wechat --partitions 10 --replication-factor 1 --zookeeper 192.168.150.111:2181,192.168.150.112:2181,192.168.150.113:2181
    ## 查看
    ./kafka-topics.sh  --topic wechat --describe --zookeeper 192.168.150.111:2181,192.168.150.112:2181,192.168.150.113:2181
    ## 删除
    ./kafka-topics.sh --delete --topic wechat --zookeeper 192.168.150.111:2181,192.168.150.112:2181,192.168.150.113:2181
    
    1
    2
    3
    4
    5
    6
  • 关闭Kafka集群

    # 关闭集群
    cluster.sh stop
    # 查看进程
    xcall jps
    
    1
    2
    3
    4