Kafka消息队列
Kafka server:消息系统的中间件,接收生产者产生的消息,接收消费者的订阅,将消息交付给消费者处理
producer:生产者
consumer:消费者,多个消费者形成一个组
zookeeper:缓存(用来接收生产者的消息)相当于exchange
parttion:分区,排列数据,通过offset偏移量排列,offset对分区数据进行记录
zookeeper:键值对的 用来存放meta信息(原始数据,最底层的数据)还有watch发现机制
1.broken node registry :borken注册节点,会生成znode的临时节点保存信息
2.broken topic registry: 当一个zookeeper启动时会注册自己持有的topic和partition的信息
3.consumer and consumer group : 主要是用来做负载均衡
4.consumer id registry: 每个consumer都有唯一的id号,用来标记消费者的信息
5.consumer offset tracking 用来跟踪每个concumer消费的最大的offset
6.partition owner registry: 用来标记partition被那个consumer所消费
消息传送机制
1、at most once:消息最多发送一次,无论成败,不再发送
2、at lease once:消息最少发送一次
部署Kafka集群
leader:是真正工作的
follower:是复制leader的信息,做备份的
Kafka的优点
1、保证消息的先来后到(offset)
2、当消息被消费,数据不丢失
3、分布式存储(可以做集群)
4、容量比较大,Kafka的容量取决于硬盘的大小
5、数量的大小不会影响到Kafka的性能
实验环境
三台服务器
192.168.1.1
192.168.1.4
192.168.1.6
实验步骤
第一步
首先部署zookeeper的集群(拖包)192.168.1.1
确认java环境
[root@r1 ~]# java -version
openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)
[root@r1 ~]# tar zxf zookeeper-3.3.6.tar.gz -C /usr/src
[root@r1 ~]# mv /usr/src/zookeeper-3.3.6/ /usr/local/zookeeper
[root@r1 ~]# cd /usr/local/zookeeper/conf/
[root@r1 conf]# cp zoo_sample.cfg zoo.cfg # 复制配置文件
第二步
调整配置文件
[root@r1 conf]# vim zoo.cfg
dataDir=/usr/local/zookeeper/data # 数据存放目录
dataLogDir=/usr/local/zookeeper/datalog # 日志存放目录
# 集群信息
server.1=192.168.1.1:2888:3888 # 节点之间通讯的端口和节点之间选取leader的端口
server.2=192.168.1.4:2888:3888
server.3=192.168.1.6:2888:3888
配置文件释义
# The number of milliseconds of each tick
tickTime=2000 # 节点之间发送心跳包的时间,单位毫秒
# The number of ticks that the initial
# synchronization phase can take
initLimit=10 # 单位个,乘以心跳包就是时间,新加入节点初始化的时间
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5 # 单位个,乘以心跳包就是时间,节点连接的超时等待时间
第三步
创建刚才配置文件指定的数据目录和日志目录
[root@r1 ~]# cd /usr/local/zookeeper/
[root@r1 zookeeper]# mkdir data
[root@r1 zookeeper]# mkdir datalog
# 添加节点标识
[root@r1 zookeeper]# cd data
[root@r1 data]# echo 1 > myid
第四步
将做好的zookeeper使用scp传送到其他两台服务器
scp -r /usr/local/zookeeper/ root@192.168.1.4:/usr/local/
scp -r /usr/local/zookeeper/ root@192.168.1.6:/usr/local/
第五步
修改节点标识,也就是刚才做的myid
192.168.1.4
echo 2 > /usr/local/zookeeper/data/myid
192.168.1.6
echo 3 > /usr/local/zookeeper/data/myid
第六步
命令路径优化(三台都做),并启动zookeeper
ln -s /usr/local/zookeeper/bin/* /usr/local/bin/
zkServer.sh start # 启动zookeeper
zkServer.sh status # 查看状态,要确定有两个follower,一个leader
第七步
开始搭建Kafka集群(拖包),安装Kafka,不需要编译,先在192.168.1.1进行搭建
[root@r1 ~]# tar zxf kafka_2.11-1.0.1.tgz -C /usr/src
[root@r1 ~]# cd /usr/src
[root@r1 src]# mv kafka_2.11-1.0.1/ /usr/local/kafka
第八步
调整Kafka配置文件
vim /usr/local/kafka/config/server.properties
修改:
# 节点标识,与myid对应
broker.id=1
listeners=PLAINTEXT://192.168.1.1:9092
log.dirs=/usr/local/kafka/data
zookeeper.connect=192.168.1.1:2181,192.168.1.4:2181,192.168.1.6:2181
添加:
message.max.byte=1024000 # 单个消息最大字节
default.replication.factor=2 # 默认follower的个数
replica.fetch.max.bytes=1024000
num.rtition=1 # 每个parttion创建一个队列
第九步
创建数据目录
[root@r1 ~]# cd /usr/local/kafka/
[root@r1 kafka]# mkdir data
第十步
传送到其他两台服务器
scp -r /usr/local/kafka/ root@192.168.1.4:/usr/local/
scp -r /usr/local/kafka/ root@192.168.1.6:/usr/local/
第十一步
调整另外两台的部分配置
[root@r2 ~]# vim /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.1.4:9092
[root@r3 ~]# vim /usr/local/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.1.6:9092
第十二步
命令路径优化(三台都做)
ln -s /usr/local/kafka/bin/* /usr/local/bin
第十三步
启动服务(三台)
[root@r1 ~]# cd /usr/local/kafka/bin
[root@r1 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@r2 ~]# cd /usr/local/kafka/bin
[root@r2 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@r3 ~]# cd /usr/local/kafka/bin/
[root@r3 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
netstat -anpt | grep 9092
第十四步
创建topic测试
192.168.1.1
[root@r1 bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.1:2181 --partitions 1 --replication-factor 2 -topic logs
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-12 02:02:41,506] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "logs".
[root@r1 bin]# ./kafka-topics.sh --list --zookeeper 192.168.1.1:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-12 02:10:32,364] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
logs
第十五步
模拟生产者
[root@r1 bin]# ./kafka-console-producer.sh --broker-list 192.168.1.4:9092 --topic logs
>
# 进入阻塞模式,等待发消息,接收消息的消费者模拟出来才可以发消息
模拟消费者
[root@r2 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --topic logs --from-beginning
# --zookeeper 192.168.1.1:2181
# --from-beginning 从哪里读消息,从开始读消息
# 也同样是阻塞模式
这个时候在生产者的>后面开始生产消息
>sf
在去看模拟消费者的pc中,是否接收到了sf,接收到则成功