主题
Kafka分布式消息队列安装部署
课程介绍
本课程将学习Kafka分布式消息队列安装部署的核心知识和实用技巧。
1. Linux分布式消息系统深解
2. Kafka概述
Kafka是一款分布式、去中心化、高吞吐、低延迟订阅模式的消息队列系统。它主要用于大数据体系,专注于数据的吞吐能力,多数运行在分布式(集群化)模式下。
核心特点:
- 高吞吐:支持每秒百万级消息传输
- 低延迟:消息传递延迟最低可至毫秒级
- 分布式架构:支持水平扩展
- 持久化存储:消息持久化到磁盘,支持数据回放
- 多副本机制:提供数据高可靠性
- 实时处理:支持在线实时数据流处理

3. Kafka与RabbitMQ对比
| 特性 | Kafka | RabbitMQ |
|---|---|---|
| 适用领域 | 大数据体系 | 后端系统 |
| 核心优势 | 高吞吐能力 | 消息延迟和容错 |
| 消息模型 | 发布/订阅 | 多种消息模型(队列、主题、交换机等) |
| 持久化 | 基于磁盘的持久化 | 内存和磁盘持久化可选 |
| 吞吐量 | 每秒百万级 | 每秒万级 |
| 延迟 | 毫秒级 | 微秒级 |
| 扩展性 | 水平扩展能力强 | 集群扩展相对复杂 |
| 依赖 | ZooKeeper | 无 |
4. 安装前准备
环境要求
- 集群服务器:至少3台Linux服务器(本课程使用node1、node2、node3)
- JDK环境:已安装JDK 1.8或以上版本
- ZooKeeper集群:已安装并运行ZooKeeper集群
- 网络配置:
- 三台服务器之间可以互相通信
- 已配置主机名映射
- 已配置SSH免密登录
- 目录结构:
- 已创建
/export/server/目录用于安装软件
- 已创建
检查环境
bash
# 检查JDK版本
java -version
# 检查ZooKeeper状态
/export/server/zookeeper/bin/zkServer.sh status
# 检查主机名映射
cat /etc/hosts
# 测试SSH免密登录
ssh node2
ssh node35. Kafka安装步骤
步骤1:下载Kafka安装包
在node1服务器上执行:
bash
# 切换到安装目录
cd /export/server/
# 下载Kafka安装包
wget https://downloads.apache.org/kafka/2.12-2.4.1/kafka_2.12-2.4.1.tgz步骤2:解压安装包
bash
# 解压安装包
tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
# 查看解压结果
ls -l /export/server/步骤3:创建软链接
为了方便后续操作,创建一个指向Kafka安装目录的软链接:
bash
# 创建软链接
ln -s /export/server/kafka_2.12-2.4.1 /export/server/kafka
# 验证软链接
ls -l /export/server/6. 集群配置
步骤1:修改核心配置文件
bash
# 进入Kafka配置目录
cd /export/server/kafka/config/
# 编辑服务器配置文件
vim server.properties配置文件内容修改(node1):
properties
# Broker的唯一标识,集群中每个节点必须不同
broker.id=1
# 监听地址和端口
listeners=PLAINTEXT://node1:9092
# 日志存储目录
log.dirs=/export/server/kafka/data
# ZooKeeper连接地址(多个用逗号分隔)
zookeeper.connect=node1:2181,node2:2181,node3:2181步骤2:将配置复制到其他节点
bash
# 复制Kafka到node2
scp -r /export/server/kafka_2.12-2.4.1/ node2:/export/server/
# 复制Kafka到node3
scp -r /export/server/kafka_2.12-2.4.1/ node3:/export/server/步骤3:配置其他节点的软链接和broker.id
在node2上执行:
bash
# 创建软链接
ln -s /export/server/kafka_2.12-2.4.1 /export/server/kafka
# 修改broker.id为2
vim /export/server/kafka/config/server.properties在node2的server.properties中,修改:
properties
broker.id=2
listeners=PLAINTEXT://node2:9092在node3上执行:
bash
# 创建软链接
ln -s /export/server/kafka_2.12-2.4.1 /export/server/kafka
# 修改broker.id为3
vim /export/server/kafka/config/server.properties在node3的server.properties中,修改:
properties
broker.id=3
listeners=PLAINTEXT://node3:90927. 集群启动与验证
启动Kafka集群
Kafka有两种启动方式:前台启动和后台启动。
方式1:前台启动(不推荐用于生产环境)
在node1上:
bash
/export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties在node2上:
bash
/export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties在node3上:
bash
/export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties方式2:后台启动(推荐用于生产环境)
在node1上:
bash
nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties > /export/server/kafka/kafka-server.log 2>&1 &在node2上:
bash
nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties > /export/server/kafka/kafka-server.log 2>&1 &在node3上:
bash
nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties > /export/server/kafka/kafka-server.log 2>&1 &验证集群启动状态
查看Kafka进程:
bash
# 在任意节点执行
jps | grep Kafka查看日志:
bash
# 在任意节点执行
tail -f /export/server/kafka/kafka-server.log创建测试主题:
bash
# 在node1上执行
/export/server/kafka/bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --replication-factor 3 --partitions 3 --topic test-topic查看主题列表:
bash
# 在任意节点执行
/export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092Kafka基本使用演示
启动生产者(在node1上):
bash
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test-topic启动消费者(在node2上):
bash
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test-topic --from-beginning现在可以在生产者窗口输入消息,然后在消费者窗口查看是否能接收到消息。
| 参数名 | 作用说明 |
|---|---|
| broker.id | Broker的唯一标识,集群中每个节点必须不同 |
| listeners | 监听地址和端口,格式:PLAINTEXT://主机名:端口 |
| log.dirs | 日志存储目录,用于存储Kafka的消息日志 |
| zookeeper.connect | ZooKeeper连接地址,多个用逗号分隔 |
| num.partitions | 默认分区数,创建主题时如果不指定分区数则使用此值 |
| default.replication.factor | 默认副本数,创建主题时如果不指定副本数则使用此值 |
| log.retention.hours | 日志保留时间(小时),超过此时间的日志将被删除 |
| log.segment.bytes | 每个日志段的大小,达到此大小后将创建新的日志段 |
8. 命令总结表
| 功能描述 | 命令示例 |
|---|---|
| 下载Kafka | wget https://downloads.apache.org/kafka/2.12-2.4.1/kafka_2.12-2.4.1.tgz |
| 解压安装包 | tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/ |
| 创建软链接 | ln -s /export/server/kafka_2.12-2.4.1 /export/server/kafka |
| 修改配置文件 | vim /export/server/kafka/config/server.properties |
| 复制到其他节点 | scp -r /export/server/kafka_2.12-2.4.1/ node2:/export/server/ |
| 前台启动Kafka | /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties |
| 后台启动Kafka | nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.properties > /export/server/kafka/kafka-server.log 2>&1 & |
| 查看Kafka进程 | `jps |
| 创建主题 | /export/server/kafka/bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --replication-factor 3 --partitions 3 --topic test-topic |
| 查看主题列表 | /export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092 |
| 启动生产者 | /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test-topic |
| 启动消费者 | /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test-topic --from-beginning |
| 停止Kafka | /export/server/kafka/bin/kafka-server-stop.sh |
9. 常见问题与解决方案
| 问题描述 | 可能原因 | 解决方案 |
|---|---|---|
启动时报错:ZooKeeper connection timeout | ZooKeeper未启动或连接地址错误 | 检查ZooKeeper状态和连接地址配置 |
启动时报错:Address already in use | 端口已被占用 | 检查端口是否被占用或修改listeners配置 |
| 集群节点无法加入 | broker.id重复 | 确保每个节点的broker.id唯一 |
| 消费者无法接收消息 | 主题未创建或分区配置错误 | 检查主题是否存在和分区配置 |
| 日志文件过大 | 日志保留时间过长 | 修改log.retention.hours参数 |
扩展阅读
NOTE
课后请完成所有练习,并尝试搭建一个Kafka集群,为后续学习大数据处理框架做好准备。
课程总结
这节课我们学了Kafka分布式消息队列的安装和部署。
安装前提: 需要先安装JDK和ZooKeeper,Kafka依赖ZooKeeper做协调。
安装步骤:
- 下载Kafka安装包
- 解压到/export/server目录
- 配置server.properties文件
核心配置:
- broker.id - Broker唯一标识
- listeners - 监听地址和端口
- log.dirs - 日志目录
- zookeeper.connect - ZK集群地址(2181端口)
启动Kafka: kafka-server-start.sh config/server.properties & - 启动服务 kafka-topics.sh --list --bootstrap-server localhost:9092 - 查看主题
Kafka是分布式消息队列,支持高吞吐、低延迟的消息传输。 常用端口: 9092(客户端), 9093(JMX), 9094(broker)
课后练习
基础练习:在三台服务器上独立安装Kafka并配置成集群。
进阶练习:创建一个具有5个分区和3个副本的主题,并测试消息的生产和消费。
综合练习:模拟Kafka节点故障,观察集群的自动恢复能力。