本文主要针对的是单节点单Broker的部署
准备工作:安装zookeeper
1. kafka在使用时依赖于zookeeper,所以需要先按照好,如果有了可以直接进行下一步。
2. 通过链接下载zookeeper,并把压缩包传上服务器
3. 解压zookeeper
tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz
4. 然后添加环境变量:
# Zookeeper Environment Variable
export ZOOKEEPER_HOME=你的路径/zookeeper-3.8.1
export PATH=$PATH:$ZOOKEEPER_HOME/bin
5. 进入conf目录下,复制zoo_sample.cfg文件重命名为zoo.cfg,修改如下配置:
# 数据的存放目录
dataDir=你自己的路径/zookeeper-3.8.1/zkdata
# 端口,默认就是2181
clientPort=2181
#zookeeper服务默认使用的端口是8080,但是该端口往往会被占用,注意修改,我用的是8081
admin.serverPort=8081
6. 启动zookeeper,若通过jps可以看到QuorumPeerMain进程,说明启动成功
zkServer.sh start
zkServer.sh stop
部署Kafka:单节点单Broker
1. 去以下网址下载压缩包,在服务器上解压Apache Downloads
2. 进入kafka的config目录下,修改server.properties的如下配置:
# broker的全局唯一编号,不能重复
broker.id=0
# 监听
listeners=PLAINTEXT://:9092
# 日志目录,根据自己喜好填,我是直接放在和kafka同级的目录
log.dirs=xxx/kafka
# 配置zookeeper的连接(如果不是本机,需要改为ip或主机名)
zookeeper.connect=localhost:2181
需要注意的是,以上的配置是一个普通的server.properties,只能供一个Broker使用。如果要使用多个Broker,就需要创建准备多份server.properties,每一份有不同的id号,并监听不同的端口,使用不同的日志。这里就不展开叙述。
3. 配置环境变量~/.bashrc,添加kafka所在路径:(可以通过命令pwd获取当前路径)
#kafka
export KAFKA_HOME=你的kafka路径/kafka
export PATH=$KAFKA_HOME/bin:$PATH
并使用source ~/.bashrc使配置生效
4. 在启动kafka之前,记得先启动zookeeper,然后通过如下命令启动kafka:
kafka-server-start.sh $KAFKA_HOME/config/server.properties
如果在打印信息的倒数几行能看到如下信息,jps中有kafka,说明成功启动
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
注意:需要创建其他窗口来使用命令,该窗口将会一直启动服务不可用。有一个比较好的方法就是使用Byobu进行窗口管理,这个模块有空再补上。
Kafka的使用
kafka使用时需要通过topic管理,而生产者和消费者作为两个重要概念贯穿了kafka 的使用(后续会补上原理的学习)。
创建topic
(maple是我设置的host,映射到本地IP)
kafka-topics.sh --create --bootstrap-server maple:9092 --replication-factor 1 --partitions 1 --topic test1
以前版本用的是–zookeeper,如今这个方法在新版本不再使用,转为使用–bootstrap-server,端口号为前面配置中listener设置的端口号。
- –replication-factor:指定副本数量
- –partitions:指定分区数量
- –topic:主题名称
查看所有topic信息
kafka-topics.sh --list --bootstrap-server maple:9092
启动生产者
启动生产者之后,该窗口将用于生产信息,如若要使用其他命令,记得新建窗口
kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动消费者
启动消费者之后,该窗口将用于消费信息,如若要使用其他命令,记得新建窗口
kafka-console-consumer.sh --bootstrap-server maple:9092 --topic test --from-beginning
该窗口显示的会是被消费的信息。
注意参数“–from-beginning”,该参数表示从最开始消费数据,所有被生产的数据都会被消费;而若没有该参数,则只会消费在该消费者启动之后生产的数据。
多节点
启动多个结点的配置和普通配置差不多,但是注意brokerid要唯一
1.创建主题,多切片多备份kafka-topics.sh –create –bootstrap-server Master:9092 –replication-factor 3 –partitions 3 –topic BITest
2.启动多个生产者kafka-console-producer.sh –broker-list Master:9092,Slave1:9092,Slabe2:9092,Slave3:9092 –topic BITest
hello
hello kafka cluster
test
hello storm
3.在多台机器上分别启动消费者kafka-console-consumer.sh –bootstrap-server Slave2:9092 –topic BI_News –from-beginning
在生产者中输入数据:
每台消费者都可以看到:
kafka脚本设计
kafka启动脚本
#! /bin/bash
if [ $# -lt 1 ]
then
echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic]|list |delete [topic] |describe [topic]}"
exit
fi
case $1 in
"start"){
for i in Master Slave1 Slave2 Slave3
do
echo " --------启动 $i Kafka-------"
ssh $i "/home/hadoop/BI/kafka/bin/kafka-server-start.sh -daemon /home/hadoop/BI/kafka/config/server.properties"
done
};;
"stop"){
for i in Master Slave1 Slave2 Slave3
do
echo " --------停止 $i Kafka-------"
ssh $i "/home/hadoop/BI/kafka/bin/kafka-server-stop.sh stop"
done
};;
"kc"){
if [ $2 ]
then
kafka-console-consumer.sh --bootstrap-server Master:9092,Slave1:9092,Slave2:9092,Slave3:9092 --topic $2
else
echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic]|list |delete [topic] |describe [topic]}"
fi
};;
"kp"){
if [ $2 ]
then
kafka-console-producer.sh --broker-list Master:9092,Slave1:9092,Slave2:9092,Slave3:9092 --topic $2
else
echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic]|list |delete [topic] |describe [topic]}"
fi
};;
"list"){
kafka-topics.sh --list --bootstrap-server
Master:9092,Slave1:9092,Slave2:9092,Slave3:9092
};;
"describe"){
if [ $2 ]
then
kafka-topics.sh --describe --bootstrap-server Master:9092,Slave1:9092,Slave2:9092,Slave3:9092 --topic $2
else
echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic]|list |delete [topic] |describe [topic]}"
fi
};;
"delete"){
if [ $2 ]
then
kafka-topics.sh --delete --bootstrap-server Master:9092,Slave1:9092,Slave2:9092,Slave3:9092 --topic $2
else
echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic]|list |delete [topic] |describe [topic]}"
fi
};;
*)
echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic]|list |delete [topic] |describe [topic]}"
exit
;;
esac