跳转到内容

Kafka分布式消息队列安装部署

课程介绍

本课程将学习Kafka分布式消息队列安装部署的核心知识和实用技巧。

1. Linux分布式消息系统深解

2. Kafka概述

Kafka是一款分布式、去中心化、高吞吐、低延迟订阅模式的消息队列系统。它主要用于大数据体系,专注于数据的吞吐能力,多数运行在分布式(集群化)模式下。

核心特点:

  • 高吞吐:支持每秒百万级消息传输
  • 低延迟:消息传递延迟最低可至毫秒级
  • 分布式架构:支持水平扩展
  • 持久化存储:消息持久化到磁盘,支持数据回放
  • 多副本机制:提供数据高可靠性
  • 实时处理:支持在线实时数据流处理

Kafka集群架构


3. Kafka与RabbitMQ对比

特性KafkaRabbitMQ
适用领域大数据体系后端系统
核心优势高吞吐能力消息延迟和容错
消息模型发布/订阅多种消息模型(队列、主题、交换机等)
持久化基于磁盘的持久化内存和磁盘持久化可选
吞吐量每秒百万级每秒万级
延迟毫秒级微秒级
扩展性水平扩展能力强集群扩展相对复杂
依赖ZooKeeper

4. 安装前准备

环境要求

  • 集群服务器:至少3台Linux服务器(本课程使用node1、node2、node3)
  • JDK环境:已安装JDK 1.8或以上版本
  • ZooKeeper集群:已安装并运行ZooKeeper集群
  • 网络配置
    • 三台服务器之间可以互相通信
    • 已配置主机名映射
    • 已配置SSH免密登录
  • 目录结构
    • 已创建/export/server/目录用于安装软件

检查环境

bash
# 检查JDK版本
java -version

# 检查ZooKeeper状态
/export/server/zookeeper/bin/zkServer.sh status

# 检查主机名映射
cat /etc/hosts

# 测试SSH免密登录
ssh node2
ssh node3

5. Kafka安装步骤

步骤1:下载Kafka安装包

在node1服务器上执行:

bash
# 切换到安装目录
cd /export/server/

# 下载Kafka安装包
wget https://downloads.apache.org/kafka/2.12-2.4.1/kafka_2.12-2.4.1.tgz

步骤2:解压安装包

bash
# 解压安装包
tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/

# 查看解压结果
ls -l /export/server/

步骤3:创建软链接

为了方便后续操作,创建一个指向Kafka安装目录的软链接:

bash
# 创建软链接
ln -s /export/server/kafka_2.12-2.4.1 /export/server/kafka

# 验证软链接
ls -l /export/server/

6. 集群配置

步骤1:修改核心配置文件

bash
# 进入Kafka配置目录
cd /export/server/kafka/config/

# 编辑服务器配置文件
vim server.properties

配置文件内容修改(node1):

properties
# Broker的唯一标识,集群中每个节点必须不同
broker.id=1

# 监听地址和端口
listeners=PLAINTEXT://node1:9092

# 日志存储目录
log.dirs=/export/server/kafka/data

# ZooKeeper连接地址(多个用逗号分隔)
zookeeper.connect=node1:2181,node2:2181,node3:2181

步骤2:将配置复制到其他节点

bash
# 复制Kafka到node2
scp -r /export/server/kafka_2.12-2.4.1/ node2:/export/server/

# 复制Kafka到node3
scp -r /export/server/kafka_2.12-2.4.1/ node3:/export/server/

步骤3:配置其他节点的软链接和broker.id

在node2上执行:

bash
# 创建软链接
ln -s /export/server/kafka_2.12-2.4.1 /export/server/kafka

# 修改broker.id为2
vim /export/server/kafka/config/server.properties

在node2的server.properties中,修改:

properties
broker.id=2
listeners=PLAINTEXT://node2:9092

在node3上执行:

bash
# 创建软链接
ln -s /export/server/kafka_2.12-2.4.1 /export/server/kafka

# 修改broker.id为3
vim /export/server/kafka/config/server.properties

在node3的server.properties中,修改:

properties
broker.id=3
listeners=PLAINTEXT://node3:9092

7. 集群启动与验证

启动Kafka集群

Kafka有两种启动方式:前台启动和后台启动。

方式1:前台启动(不推荐用于生产环境)

在node1上:

bash
/export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties

在node2上:

bash
/export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties

在node3上:

bash
/export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties

方式2:后台启动(推荐用于生产环境)

在node1上:

bash
nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties > /export/server/kafka/kafka-server.log 2>&1 &

在node2上:

bash
nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties > /export/server/kafka/kafka-server.log 2>&1 &

在node3上:

bash
nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties > /export/server/kafka/kafka-server.log 2>&1 &

验证集群启动状态

查看Kafka进程:

bash
# 在任意节点执行
jps | grep Kafka

查看日志:

bash
# 在任意节点执行
tail -f /export/server/kafka/kafka-server.log

创建测试主题:

bash
# 在node1上执行
/export/server/kafka/bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --replication-factor 3 --partitions 3 --topic test-topic

查看主题列表:

bash
# 在任意节点执行
/export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092

Kafka基本使用演示

启动生产者(在node1上):

bash
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test-topic

启动消费者(在node2上):

bash
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test-topic --from-beginning

现在可以在生产者窗口输入消息,然后在消费者窗口查看是否能接收到消息。


参数名作用说明
broker.idBroker的唯一标识,集群中每个节点必须不同
listeners监听地址和端口,格式:PLAINTEXT://主机名:端口
log.dirs日志存储目录,用于存储Kafka的消息日志
zookeeper.connectZooKeeper连接地址,多个用逗号分隔
num.partitions默认分区数,创建主题时如果不指定分区数则使用此值
default.replication.factor默认副本数,创建主题时如果不指定副本数则使用此值
log.retention.hours日志保留时间(小时),超过此时间的日志将被删除
log.segment.bytes每个日志段的大小,达到此大小后将创建新的日志段

8. 命令总结表

功能描述命令示例
下载Kafkawget https://downloads.apache.org/kafka/2.12-2.4.1/kafka_2.12-2.4.1.tgz
解压安装包tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
创建软链接ln -s /export/server/kafka_2.12-2.4.1 /export/server/kafka
修改配置文件vim /export/server/kafka/config/server.properties
复制到其他节点scp -r /export/server/kafka_2.12-2.4.1/ node2:/export/server/
前台启动Kafka/export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties
后台启动Kafkanohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties > /export/server/kafka/kafka-server.log 2>&1 &
查看Kafka进程`jps
创建主题/export/server/kafka/bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --replication-factor 3 --partitions 3 --topic test-topic
查看主题列表/export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
启动生产者/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test-topic
启动消费者/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test-topic --from-beginning
停止Kafka/export/server/kafka/bin/kafka-server-stop.sh

9. 常见问题与解决方案

问题描述可能原因解决方案
启动时报错:ZooKeeper connection timeoutZooKeeper未启动或连接地址错误检查ZooKeeper状态和连接地址配置
启动时报错:Address already in use端口已被占用检查端口是否被占用或修改listeners配置
集群节点无法加入broker.id重复确保每个节点的broker.id唯一
消费者无法接收消息主题未创建或分区配置错误检查主题是否存在和分区配置
日志文件过大日志保留时间过长修改log.retention.hours参数










扩展阅读

NOTE

课后请完成所有练习,并尝试搭建一个Kafka集群,为后续学习大数据处理框架做好准备。


课程总结

这节课我们学了Kafka分布式消息队列的安装和部署。

安装前提: 需要先安装JDK和ZooKeeper,Kafka依赖ZooKeeper做协调。

安装步骤:

  1. 下载Kafka安装包
  2. 解压到/export/server目录
  3. 配置server.properties文件

核心配置:

  • broker.id - Broker唯一标识
  • listeners - 监听地址和端口
  • log.dirs - 日志目录
  • zookeeper.connect - ZK集群地址(2181端口)

启动Kafka: kafka-server-start.sh config/server.properties & - 启动服务 kafka-topics.sh --list --bootstrap-server localhost:9092 - 查看主题

Kafka是分布式消息队列,支持高吞吐、低延迟的消息传输。 常用端口: 9092(客户端), 9093(JMX), 9094(broker)

课后练习

  • 基础练习:在三台服务器上独立安装Kafka并配置成集群。

  • 进阶练习:创建一个具有5个分区和3个副本的主题,并测试消息的生产和消费。

  • 综合练习:模拟Kafka节点故障,观察集群的自动恢复能力。


评论区

专业的Linux技术学习平台,从入门到精通的完整学习路径