Kafka

Kafka消息队列

Kafka server:消息系统的中间件,接收生产者产生的消息,接收消费者的订阅,将消息交付给消费者处理

producer:生产者

consumer:消费者,多个消费者形成一个组

zookeeper:缓存(用来接收生产者的消息)相当于exchange

parttion:分区,排列数据,通过offset偏移量排列,offset对分区数据进行记录

topic:对消息进行分类,一个类型一个主题

zookeeper:键值对的 用来存放meta信息(原始数据,最底层的数据)还有watch发现机制
1.broken node registry :borken注册节点,会生成znode的临时节点保存信息
2.broken topic registry: 当一个zookeeper启动时会注册自己持有的topic和partition的信息
3.consumer and consumer group : 主要是用来做负载均衡
4.consumer id registry: 每个consumer都有唯一的id号,用来标记消费者的信息
5.consumer offset tracking 用来跟踪每个concumer消费的最大的offset
6.partition owner registry: 用来标记partition被那个consumer所消费

消息传送机制

1、at most once:消息最多发送一次,无论成败,不再发送

2、at lease once:消息最少发送一次

部署Kafka集群

leader:是真正工作的

follower:是复制leader的信息,做备份的

Kafka的优点

1、保证消息的先来后到(offset)

2、当消息被消费,数据不丢失

3、分布式存储(可以做集群)

4、容量比较大,Kafka的容量取决于硬盘的大小

5、数量的大小不会影响到Kafka的性能

实验环境

三台服务器

192.168.1.1

192.168.1.4

192.168.1.6

实验步骤

第一步

首先部署zookeeper的集群(拖包)192.168.1.1

确认java环境
[root@r1 ~]# java -version
openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)

[root@r1 ~]# tar zxf zookeeper-3.3.6.tar.gz -C /usr/src
[root@r1 ~]# mv /usr/src/zookeeper-3.3.6/ /usr/local/zookeeper
[root@r1 ~]# cd /usr/local/zookeeper/conf/
[root@r1 conf]# cp zoo_sample.cfg zoo.cfg   # 复制配置文件

第二步

调整配置文件

[root@r1 conf]# vim zoo.cfg 
dataDir=/usr/local/zookeeper/data    # 数据存放目录
dataLogDir=/usr/local/zookeeper/datalog    # 日志存放目录
# 集群信息
server.1=192.168.1.1:2888:3888   # 节点之间通讯的端口和节点之间选取leader的端口
server.2=192.168.1.4:2888:3888
server.3=192.168.1.6:2888:3888

配置文件释义

# The number of milliseconds of each tick
tickTime=2000        # 节点之间发送心跳包的时间,单位毫秒
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10        # 单位个,乘以心跳包就是时间,新加入节点初始化的时间
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5    # 单位个,乘以心跳包就是时间,节点连接的超时等待时间

第三步

创建刚才配置文件指定的数据目录和日志目录

[root@r1 ~]# cd /usr/local/zookeeper/
[root@r1 zookeeper]# mkdir data
[root@r1 zookeeper]# mkdir datalog
# 添加节点标识
[root@r1 zookeeper]# cd data
[root@r1 data]# echo 1 > myid

第四步

将做好的zookeeper使用scp传送到其他两台服务器

scp -r /usr/local/zookeeper/ root@192.168.1.4:/usr/local/
scp -r /usr/local/zookeeper/ root@192.168.1.6:/usr/local/

第五步

修改节点标识,也就是刚才做的myid

192.168.1.4

echo 2 > /usr/local/zookeeper/data/myid

192.168.1.6

echo 3 > /usr/local/zookeeper/data/myid

第六步

命令路径优化(三台都做),并启动zookeeper

ln -s /usr/local/zookeeper/bin/* /usr/local/bin/
zkServer.sh start  # 启动zookeeper
zkServer.sh status   # 查看状态,要确定有两个follower,一个leader

第七步

开始搭建Kafka集群(拖包),安装Kafka,不需要编译,先在192.168.1.1进行搭建

[root@r1 ~]# tar zxf kafka_2.11-1.0.1.tgz -C /usr/src
[root@r1 ~]# cd /usr/src
[root@r1 src]# mv kafka_2.11-1.0.1/ /usr/local/kafka

第八步

调整Kafka配置文件

 vim /usr/local/kafka/config/server.properties 
修改:
# 节点标识,与myid对应
broker.id=1
listeners=PLAINTEXT://192.168.1.1:9092
log.dirs=/usr/local/kafka/data
zookeeper.connect=192.168.1.1:2181,192.168.1.4:2181,192.168.1.6:2181


添加:
message.max.byte=1024000    # 单个消息最大字节
default.replication.factor=2  # 默认follower的个数
replica.fetch.max.bytes=1024000
num.rtition=1        # 每个parttion创建一个队列

第九步

创建数据目录

[root@r1 ~]# cd /usr/local/kafka/
[root@r1 kafka]# mkdir data

第十步

传送到其他两台服务器

scp -r /usr/local/kafka/ root@192.168.1.4:/usr/local/
scp -r /usr/local/kafka/ root@192.168.1.6:/usr/local/

第十一步

调整另外两台的部分配置

[root@r2 ~]# vim /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.1.4:9092

[root@r3 ~]# vim /usr/local/kafka/config/server.properties 
broker.id=3
listeners=PLAINTEXT://192.168.1.6:9092

第十二步

命令路径优化(三台都做)

ln -s /usr/local/kafka/bin/* /usr/local/bin

第十三步

启动服务(三台)

[root@r1 ~]# cd /usr/local/kafka/bin
[root@r1 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@r2 ~]# cd /usr/local/kafka/bin
[root@r2 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 
[root@r3 ~]# cd /usr/local/kafka/bin/
[root@r3 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 

netstat -anpt | grep 9092

第十四步

创建topic测试

192.168.1.1

[root@r1 bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.1:2181 --partitions 1 --replication-factor 2 -topic logs
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-12 02:02:41,506] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "logs".
[root@r1 bin]# ./kafka-topics.sh --list --zookeeper 192.168.1.1:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-12 02:10:32,364] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
logs

第十五步

模拟生产者

[root@r1 bin]# ./kafka-console-producer.sh --broker-list 192.168.1.4:9092 --topic logs
>
# 进入阻塞模式,等待发消息,接收消息的消费者模拟出来才可以发消息

模拟消费者

[root@r2 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --topic logs --from-beginning
# --zookeeper 192.168.1.1:2181
# --from-beginning 从哪里读消息,从开始读消息
# 也同样是阻塞模式

这个时候在生产者的>后面开始生产消息

>sf
在去看模拟消费者的pc中,是否接收到了sf,接收到则成功

评论




正在载入...
PoweredHexo
HostedAliyun
DNSAliyun
ThemeVolantis
UV
PV
BY-NC-SA 4.0