首页
关于
推荐
CSDN
Search
1
文件上传下载-io-流的理解-笔记
128 阅读
2
vue循环指令el-table-column展示图片src路径拼接
121 阅读
3
正则表达式,将字符串分割两部分
111 阅读
4
MySQL数据库练习【一】
109 阅读
5
MySQL数据库练习【三】
92 阅读
默认分类
Mysql
Java基础
一天一练
Mongodb
Nginx
Docker
FastDFS
面试题
云计算基础
linux基础
shell脚本
实验
工具
基础命令
redis
zookeeper
部署
案例
登录
Search
标签搜索
vue
Mysql
IO
面试题
良辰美景好时光
累计撰写
67
篇文章
累计收到
0
条评论
首页
栏目
默认分类
Mysql
Java基础
一天一练
Mongodb
Nginx
Docker
FastDFS
面试题
云计算基础
linux基础
shell脚本
实验
工具
基础命令
redis
zookeeper
部署
案例
页面
关于
推荐
CSDN
搜索到
1
篇与
的结果
2024-08-01
Zookeeper 入门
一、Zookeeper 入门1.1.概述Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的Apache 项目。1.2.Zookeeper工作机制Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。1.3.Zookeeper特点1) Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。2) 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。3) 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。4) 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。5) 数据更新原子性,一次数据更新要么成功,要么失败。6) 实时性,在一定时间范围内,Client能读到最新数据。1.4.数据结构ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。 1.5.应用场景提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。1.5.1.统一命名服务在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。例如:IP不容易记住,而域名容易记住。1.5.2.统一配置管理分布式环境下,配置文件同步非常常见。一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。对配置文件修改后,希望能够快速同步到各个节点上。配置管理可交由ZooKeeper实现。可将配置信息写入ZooKeeper上的一个Znode。各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。1.5.3.统一集群管理分布式环境中,实时掌握每个节点的状态是必要的。可根据节点实时状态做出一些调整。ZooKeeper可以实现实时监控节点状态变化可将节点信息写入ZooKeeper上的一个ZNode。监听这个ZNode可获取它的实时状态变化。1.5.4.服务器动态上下线客户端能实时洞察到服务器上下线的变化1.5.5.软负载均衡在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求1.6.Zookeeper官网Zookeeper官网:https://zookeeper.apache.org/Zookeeper所有版本:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/1.6.1.Zookeeper下载1.6.2.历史版本1.6.3.下载Linux 环境安装的tar包二、Zookeeper安装【Centos7】2.1.环境要求2.1.1.安装JDK2.1.2.上传apache-zookeeper-3.5.7-bin.tar.gz 安装包到/opt/module目录下mkdir -p /opt/module cd /opt/module2.1.3.解压到指定目录tar -xzvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module2.1.4.修改文件夹名称cd /opt/module mv apache-zookeeper-3.5.7-bin zookeeper-3.5.72.2.配置修改2.2.1.将zookeeper-3.5.7/conf 路径下的 zoo_sample.cfg 修改为 zoo.cfgcd /opt/module/zookeeper-3.5.7/conf cp zoo_sample.cfg zoo.cfg2.2.2.修改zookeeper数据文件存放目录vi /opt/module/zookeeper-3.5.7/conf/zoo.cfg # 修改数据存储路径配置 dataDir=/opt/module/zookeeper-3.5.7/zkData2.2.3.创建相关数据文件存放目录mkdir -p /opt/module/zookeeper-3.5.7/zkData/{logs,data}2.3.操作Zookeeper2.3.1.添加到环境变量vim /etc/profile # zookeeper export ZK_HOME=/opt/module/zookeeper-3.5.7 export PATH=$PATH:$ZK_HOME/bin # 输入下面命令让设置的环境变量生效 source /etc/profile2.3.2.启动ZookeeperzkServer.sh start zkServer.sh status zkServer.sh stop zkServer.sh restart #以打印日志方式启动 zkServer.sh start-foreground2.3.3.查看进程是否启动# jps 是 Java Process Status Tool 的简称,它的作用是为了列出所有正在运行中的 Java 虚拟机进程 # 每一个 Java 程序在启动的时候都会为之创建一个Jvm 实例,通过jps可以查看这些进程的相关信息 # jps是Jdk提供的一个工具,它安装在 JAVA_HOME/bin下 jps jps -l2.3.4.ZooKeeper服务端口为2181,查看服务已经启动ps -aux | grep zookeeper netstat -ant | grep 21812.3.5.查看状态zkServer.sh status2.3.6.启动客户端zkCli.sh2.3.7.退出客户端quit2.3.8.停止ZookeeperzkServer.sh stop2.3.9.查看数据文件存放目录zkData2.4.配置参数解读2.4.1.tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒2.4.2.initLimit = 10:LF初始通信时限Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)2.4.3.syncLimit = 5:LF同步通信时限Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。2.4.4.dataDir:保存Zookeeper中的数据注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。2.4.5.clientPort = 2181:客户端连接端口,通常不做修改三、Zookeeper 集群操作3.1.集群操作在ZooKeeper集群服中务中有三个角色:Leader 领导者:处理事务请求集群内部各服务器的调度者Follower 跟随者:处理客户端非事务请求,转发事务请求给Leader服务器参与Leader选举投票Observer 观察者:处理客户端非事务请求,转发事务请求给Leader服务器3.1.1.集群安装3.1.1.1.集群规划在host128、host129 和host130 三个节点上都部署Zookeeper思考:如果是 10 台服务器,需要部署多少台 Zookeeper?3.1.1.2.解压安装# 解压Zookeeper 安装包到/opt/module/目录下 tar -xzvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module cd /opt/module # 修改apache-zookeeper-3.5.7-bin 名称为zookeeper-3.5.7 mv apache-zookeeper-3.5.7-bin zookeeper-3.5.73.1.1.3.配置服务器编号3.1.1.3.1.创建相关数据文件存放目录 zkDatamkdir -p /opt/module/zookeeper-3.5.7/zkData/{logs,data}3.1.1.3.2.创建一个 myid 的文件【!!!】# 在文件中添加与server 对应的编号(注意:上下不要有空行,左右不要有空格) echo 1 >/opt/module/zookeeper-3.5.7/zkData/myid cat /opt/module/zookeeper-3.5.7/zkData/myid3.1.1.3.3.拷贝配置好的 zookeeper 到其他机器上xsync 集群同步工具,需要编辑集群分发脚本xsync.sh zookeeper-3.5.7并分别在host129、host130 上修改myid 文件中内容为 2、33.1.1.4.配置zoo.cfg文件3.1.1.4.1.拷贝配置文件zoo_sample.cfg 为zoo.cfgcd /opt/module/zookeeper-3.5.7/conf # 拷贝配置文件 zoo_sample.cfg 为zoo.cfg cp zoo_sample.cfg zoo.cfg3.1.1.4.2.修改zookeeper配置文件zoo.cfg【!!!】vi /opt/module/zookeeper-3.5.7/conf/zoo.cfg # 修改数据存储路径配置 dataDir=/opt/module/zookeeper-3.5.7/zkData # 增加如下配置 #######################cluster########################## server.1=192.168.147.128:2888:3888 server.2=192.168.147.129:2888:3888 server.3=192.168.147.130:2888:38883.1.1.4.3.配置参数解读【!!!】# tickTime这个时间是作为zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔为2ms,也就是说每隔tickTime时间就会发送一个心跳。 tickTime=2000 # initLimit这个配置项是用来配置zookeeper接受客户端 # (这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到leader的follower 服务器) # 初始化连接时最长能忍受多少个心跳时间间隔数。当初始化连接时间超过该值,则表示连接失败。 # 当已经超过10个心跳的时间(也就是tickTime)长度后 zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。 # 对于从节点最初连接到主节点时的超时时间,单位为tick值的倍数。总的时间长度就是 10*2000。即20ms initLimit=10 # syncLimit这个配置项标识leader与follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度 # 如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。 # 对于主节点与从节点进行同步操作时的超时时间,单位为tick值的倍数。总的时间长度就是5*2000。即10ms syncLimit=5 # dataDir就是zookeeper保存数据库数据快照的位置,默认情况下zookeeper将写数据的日志文件也保存在这个目录里 # 注意:不能使用 /tmp 路径,会被定期清除。使用专用的存储设备能够大大提高系统的性能 # dataDir=/tmp/zookeeper dataLogDir=/opt/module/zookeeper-3.5.7/zkData/logs # 数据文件存放目录 dataDir=/opt/module/zookeeper-3.5.7/zkData # clientPort这个端口就是客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求 clientPort=2181 # 客户端最大链接数 maxClientCnxns=60 # zookeeper在运行过程中会生成快照数据,默认不会自动清理,会持续占用硬盘空间 # 保存3个快照,即3个日志文件 autopurge.snapRetainCount=3 # 间隔1个小时执行一次清理 autopurge.purgeInterval=1 # server.A=B:C:D # 其中A是一个数字,表示这个是第几号服务器 # 集群模式下配置一个文件myid,这个文件在dataDir 目录下,这个文件里面有一个数据就是 A 的值 # Zookeeper 启动时读取此文件,拿到里面的数据与zoo.cfg 里面的配置信息比较从而判断到底是哪个server。 # echo 1 >/opt/module/zookeeper-3.5.7/zkData/myid # B是这个服务器的IP地址 # C第一个端口用来集群成员的信息交换,表示这个服务器Follower与集群中的Leader服务器交换信息的端口 # D标识假如集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口 # 服务器=运行主机:心跳端口:选举端口 # zk集群 # 3888后面位置不能有空格,否则Address unresolved: 192.168.147.128:3888 #######################cluster########################## server.1=192.168.147.128:2888:3888 server.2=192.168.147.129:2888:3888 server.3=192.168.147.130:2888:38883.1.1.4.4.同步zoo.cfg 配置文件cd /opt/module/zookeeper-3.5.7/conf xsync.sh zoo.cfg3.1.1.4.5.修改zkEnv.sh文件并同步,配置java环境变量不修改容易报错:JAVA_HOME is not set and java could not be found in PATH.vi /opt/module/zookeeper-3.5.7/bin/zkEnv.sh #添加 JAVA_HOME="/usr/java/jdk1.8" xsync.sh zkEnv.sh3.1.1.5.集群操作3.1.1.5.1.分别启动ZookeeperzkServer.sh start zkServer.sh status zkServer.sh stop zkServer.sh restart #以打印日志方式启动 zkServer.sh start-foreground3.1.1.5.2.查看状态3.1.2.选举机制(面试重点)3.1.2.1.Zookeeper选举机制——第一次启动(1)服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING;(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;(4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;(5)服务器5启动,同4一样当小弟。名词简介含义SID服务器ID用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致ZXID事务IDZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑有关。Epoch每个Leader任期的代号没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加3.1.2.2.Zookeeper选举机制——非第一次启动(1)当zookeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:服务器初始化启动服务器运行期间无法和leader保持连接(2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:集群中本来就已经存在一个Leader 对于这种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。集群中确实不存在Leader(重点) 假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。最后选了服务器2。3.1.3.ZK 集群启动停止脚本3.1.3.1.在host128 创建脚本vim /usr/bin/zk.sh3.1.3.2.脚本内容#!/bin/bash # 定义一个数组 arr=(host128 host129 host130) case $1 in "start"){ for i in ${arr[@]} do echo ---------- zookeeper $i 启动 ------------ ssh $i "bash /opt/module/zookeeper-3.5.7/bin/zkServer.sh start" done };; "stop"){ for i in ${arr[@]} do echo ---------- zookeeper $i 停止 ------------ ssh $i "bash /opt/module/zookeeper-3.5.7/bin/zkServer.sh stop" done };; "status"){ for i in ${arr[@]} do echo ---------- zookeeper $i 状态 ------------ ssh $i "bash /opt/module/zookeeper-3.5.7/bin/zkServer.sh status" done };; esac3.1.3.3.增加脚本执行权限# u代表所有者user;x代表执行权限;+ 表示增加权限 chmod u+x /usr/bin/zk.sh3.1.3.4.Zookeeper 集群启动zk.sh start3.1.3.5.Zookeeper 集群状态zk.sh status3.1.3.6.Zookeeper 集群停止zk.sh stop3.1.4.显示集群的所有java进程状态jpsall 脚本3.1.4.1.在host128 创建脚本vim /usr/bin/jpsall3.1.4.2.脚本内容#!/bin/bash # 该脚本是用来显示集群的所有java进程状态 # 定义一个数组 list="host128 host129 host130" JAVA_HOME="/usr/java/jdk1.8" echo "显示集群的所有java进程状态" for node in $list do echo =============== $node =============== ssh $node $JAVA_HOME'/bin/jps' done echo "执行结束"3.1.4.3.增加脚本执行权限# u代表所有者user;x代表执行权限;+ 表示增加权限 chmod u+x /usr/bin/jpsall3.1.4.4.执行脚本jpsall3.2. 客户端命令行操作3.2.1.命令行基本语法命令基本语法功能描述help显示所有操作的命令ls path使用 ls 命令来查看当前 znode 的子节点 [可监听] -w 监听子节点变化 <br/>-s 附加次级信息create普通创建-s 含有序列<br/>-e 临时(重启或者超时消失)get path获得节点的值 [可监听] -w 监听节点内容变化 -s 附加次级信息set设置节点的具体值stat查看节点状态delete删除节点deleteall递归删除节点3.2.1.1.启动客户端cd /opt/module/zookeeper-3.5.7 # 指定启动host128的客户端,而不是localhost的 bin/zkCli.sh -server host128:21813.2.1.2.显示所有的操作命令help3.2.2.znode 节点数据信息3.2.2.1.查看当前znode中所包含的内容ls /[zk: host128:2181(CONNECTED) 0] ls / [zookeeper]3.2.2.2.查看当前节点详细数据ls -s /[zk: host128:2181(CONNECTED) 1] ls -s / [zookeeper]cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1创建节点的事务 zxid:每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。 czxid创建节点的事务 zxid(create zxid)ctimeznode 被创建的毫秒数(从 1970 年开始)mzxidznode 最后更新的事务 zxidmtimeznode 最后修改的毫秒数(从 1970 年开始)pZxidznode 最后更新的子节点 zxidcversionznode 子节点变化号,znode 子节点修改次数 dataversionznode 数据变化号aclVersionznode 访问控制列表的变化号ephemeralOwner如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。dataLengthznode 的数据长度numChildrenznode 子节点数量3.2.3.节点类型(持久/短暂/有序号/无序号)持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除说明:创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护注意:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序3.2.3.1.分别创建2个普通节点(永久节点 + 不带序号)create /sanguo "xishi" create /sanguo/shuguo "adou"[zk: host128:2181(CONNECTED) 2] create /sanguo "xishi" Created /sanguo [zk: host128:2181(CONNECTED) 3] create /sanguo/shuguo "adou" Created /sanguo/shuguo ## 注意:创建节点时,要赋值3.2.3.2.获得节点的值get -s /sanguo get -s /sanguo/shuguo[zk: host128:2181(CONNECTED) 4] get -s /sanguo xishi cZxid = 0x400000004 ctime = Tue Feb 27 21:42:39 CST 2024 mZxid = 0x400000004 mtime = Tue Feb 27 21:42:39 CST 2024 pZxid = 0x400000005 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 1 [zk: host128:2181(CONNECTED) 5] get -s /sanguo/shuguo adou cZxid = 0x400000005 ctime = Tue Feb 27 21:43:01 CST 2024 mZxid = 0x400000005 mtime = Tue Feb 27 21:43:01 CST 2024 pZxid = 0x400000005 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 03.2.3.3.创建带序号的节点(永久节点 + 带序号)3.2.3.3.1.先创建一个普通的根节点/sanguo/weiguo[zk: host128:2181(CONNECTED) 6] create /sanguo/weiguo "caocao" Created /sanguo/weiguo3.2.3.3.2.创建带序号的节点如果原来没有序号节点,序号从0 开始依次递增。 如果原节点下已有2个节点,则再排序时从2开始,以此类推。[zk: host128:2181(CONNECTED) 7] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000000 [zk: host128:2181(CONNECTED) 8] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000001 [zk: host128:2181(CONNECTED) 9] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000002 [zk: host128:2181(CONNECTED) 10] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao00000000033.2.3.4.创建短暂节点(短暂节点 + 不带序号 or 带序号)3.2.3.4.1.创建短暂的不带序号的节点[zk: host128:2181(CONNECTED) 0] create -e /sanguo/wuguo "zhouyu" Created /sanguo/wuguo3.2.3.4.2.创建短暂的带序号的节点[zk: host128:2181(CONNECTED) 1] create -e -s /sanguo/wuguo "zhouyu" Created /sanguo/wuguo00000000033.2.3.4.3.在当前客户端是能查看到的[zk: host128:2181(CONNECTED) 2] ls /sanguo [shuguo, weiguo, wuguo, wuguo0000000003]3.2.3.4.4.退出当前客户端然后再重启客户端quit bin/zkCli.sh -server host128:21813.2.3.4.5.再次查看根目录下短暂节点已经删除[zk: host128:2181(CONNECTED) 0] ls /sanguo [shuguo, weiguo]3.2.3.5.修改节点数据值[zk: host128:2181(CONNECTED) 1] set /sanguo/weiguo "kongming" [zk: host128:2181(CONNECTED) 2] get -s /sanguo/weiguo kongming cZxid = 0x400000006 ctime = Tue Feb 27 21:46:26 CST 2024 mZxid = 0x400000011 mtime = Tue Feb 27 21:53:24 CST 2024 pZxid = 0x40000000a cversion = 4 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 43.2.4.监听器原理客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。3.2.4.1.监听原理详解1) 首先要有一个main()线程2) 在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。线程connet负责网络通信连接,连接服务器; 线程Listener负责监听;3) 通过connect线程将注册的监听事件发送给Zookeeper。getChildren("/" , true) ," / "表示监听的是根目录,true表示监听,不监听用false4) 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中,表示这个服务器中的/path,即根目录这个路径被客户端监听了;5) Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。6) listener线程内部调用了process()方法,采取相应的措施,例如更新服务器列表等。3.2.4.2.常见的监听1) 监听节点数据的变化get path [watch]2) 监听子节点增减的变化ls path [watch]3.2.4.3.节点的值变化监听cd /opt/module/zookeeper-3.5.7 # 指定启动host129的客户端,而不是localhost的 bin/zkCli.sh -server host129:2181(1)在host129 主机上注册监听/sanguo 节点数据变化[zk: host129:2181(CONNECTED) 1] get -w /sanguo xishi(2)在host130 主机上修改/sanguo 节点的数据[zk: host130:2181(CONNECTED) 2] set /sanguo "yangyuhuan"(3)观察host129 主机收到数据变化的监听[zk: host129:2181(CONNECTED) 2] WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo注意:在host130 再多次修改/sanguo的值,host129上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。3.2.4.4.节点的子节点变化监听(路径变化)(1)在host129 主机上注册监听/sanguo 节点的子节点变化[zk: host129:2181(CONNECTED) 2] ls -w /sanguo [shuguo, weiguo](2)在host130 主机/sanguo 节点上创建子节点[zk: host130:2181(CONNECTED) 3] create /sanguo/jin "simayi" Created /sanguo/jin(3)观察host129 主机收到子节点变化的监听[zk: host129:2181(CONNECTED) 3] WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。3.2.5.节点删除与查看3.2.5.1.查看节点状态[zk: host129:2181(CONNECTED) 3] stat /sanguo cZxid = 0x400000004 ctime = Tue Feb 27 21:42:39 CST 2024 mZxid = 0x500000005 mtime = Wed Feb 28 10:50:35 CST 2024 pZxid = 0x500000006 cversion = 7 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 10 numChildren = 33.2.5.2.删除节点[zk: host129:2181(CONNECTED) 4] ls /sanguo [jin, shuguo, weiguo] [zk: host129:2181(CONNECTED) 5] delete /sanguo/jin3.2.5.3.递归删除节点[zk: host129:2181(CONNECTED) 6] delete /sanguo Node not empty: /sanguo [zk: host129:2181(CONNECTED) 7] deleteall /sanguo [zk: host129:2181(CONNECTED) 8] [zk: host129:2181(CONNECTED) 8] ls / [zookeeper]四、ZooKeeper-IDEA环境搭建保证三台Zookeeper 集群服务端启动[root@host128 ~]# jpsall 显示集群的所有java进程状态 =============== host128 =============== 66496 Jps 2445 QuorumPeerMain =============== host129 =============== 66162 Jps 2413 QuorumPeerMain =============== host130 =============== 65947 Jps 2383 QuorumPeerMain 执行结束4.1.环境搭建4.1.1.创建maven工程:zookeeper4.1.2.在pom文件添加依赖<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>zookeeper-test01</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <maven.complier.source>8</maven.complier.source> <maven.complier.target>8</maven.complier.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency> </dependencies> </project>4.1.3.在项目的src/main/resources 目录下,新建文件为“log4j.properties”log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n4.1.4.创建包名com.orange.zk4.2.ZooKeeper 客户端API操作4.2.1.初始化ZooKeeper对象/** * Description: zookeeper客户端 */ public class Client { //注意:connectString逗号左右不能有空格,否则连接不上 private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181"; //tickTime为2000,initLimit为10 //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。 private int sessionTimeout = 200000; private ZooKeeper zkClient; /** * 初始化ZooKeeper对象,必须要先创建,再进行创建节点,否则报空指针异常 * * @throws IOException */ @Before public void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } }4.2.2.创建节点 @Test public void create() throws InterruptedException, KeeperException { //"/tang":创建的节点的路径; //"t.avi".getBytes():节点里面的值,需要转化为字节传输; //ZooDefs.Ids.OPEN_ACL_UNSAFE:权限控制,设置访问权限; //CreateMode.PERSISTENT:创建的节点类型,如这个是永久不带序号节点。 String nodeCreated = zkClient.create("/tang", "t.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }# 指定启动host128的客户端,而不是localhost的 /opt/module/zookeeper-3.5.7/bin/zkCli.sh -server host128:2181[zk: host128:2181(CONNECTED) 0] ls / [tang, zookeeper] [zk: host128:2181(CONNECTED) 1] get -s /tang t.avi cZxid = 0x100000002 ctime = Wed Feb 28 12:35:13 CST 2024 mZxid = 0x100000002 mtime = Wed Feb 28 12:35:13 CST 2024 pZxid = 0x100000002 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 04.2.3.获取子节点并监听节点变化/** * Description: zookeeper客户端 */ public class Client { //注意:connectString逗号左右不能有空格,否则连接不上 private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181"; //tickTime为2000,initLimit为10 //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。 private int sessionTimeout = 200000; private ZooKeeper zkClient; /** * 初始化ZooKeeper对象,必须要先创建,再进行创建节点,否则报空指针异常 * * @throws IOException */ @Before public void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //收到时间通知后的回调函数(用户的业务逻辑) System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath()); //再次启动监听 try { System.out.println("==============================="); List<String> children = zkClient.getChildren("/", true); for (String child : children) { System.out.println(child); } System.out.println("==============================="); } catch (Exception e) { e.printStackTrace(); } } }); } @Test public void create() throws InterruptedException, KeeperException { //"/tang":创建的节点的路径; //"t.avi".getBytes():节点里面的值,需要转化为字节传输; //ZooDefs.Ids.OPEN_ACL_UNSAFE:权限控制,设置访问权限; //CreateMode.PERSISTENT:创建的节点类型,如这个是永久不带序号节点。 String nodeCreated = zkClient.create("/tang", "t.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * 监听节点变化信息 */ @Test public void getChildren() throws KeeperException, InterruptedException { System.out.println("-----------------------------"); List<String> children = zkClient.getChildren("/", true); for (String child : children){ System.out.println(child); } System.out.println("-----------------------------"); //延时阻塞 Thread.sleep(Long.MAX_VALUE); } }None--null =============================== tang zookeeper ----------------------------- tang zookeeper ===============================监听器只能监听一次,如果再发生变化需要重新注册监听器,要想每次节点发生变化都能检测到并且在控制台打印,就在初始化监听器里面再注册一个监听器,每次监听完又马上注册一个新的监听器。# 指定启动host128的客户端,而不是localhost的 /opt/module/zookeeper-3.5.7/bin/zkCli.sh -server host128:2181[zk: host128:2181(CONNECTED) 8] create /test01 "test01" Created /test01 [zk: host128:2181(CONNECTED) 9] create /test02 "test02" Created /test02NodeChildrenChanged--/ =============================== tang zookeeper test01 =============================== NodeChildrenChanged--/ =============================== tang test02 zookeeper test01 ===============================4.2.4.判断节点Node是否存在 /** * 判断节点是否存在 */ @Test public void exist() throws InterruptedException, KeeperException { Stat stat = zkClient.exists("/tang", false); System.out.println(stat == null ? "not data" : "exist"); }4.3.客户端向服务端写数据流程4.3.1.写流程之写入请求直接发送给Leader节点1.当client向zookeeper的leader上写数据,发送一个写请求2.这个Leader会把写请求广播给各个server,当各个server写成功后就会通知Leader.3.当Leader收到半数以上server写成功应答,此时认为写成功,Client会收到Leader写成功应答。4.3.2.写流程之写入请求发送给follower节点1.当client向zookeeper集群的某个server上写数据,发送一个写请求2.如果接收到请求的不是Leader,那么server会把请求转发给Leader,因为zookeeper的集群中只有一个是Leader,这个Leader会把写请求广播给各个server,当各个server写成功后就会通知Leader.3.当Leader收到半数以上server写成功应答,此时认为写成功,Leader会告知向他提交申请的server4.Server会进一步将通知Client写成功, 此时就认为写成功了。五、服务器动态上下线监听案例5.1.需求某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。5.2.需求分析--服务器动态上下线5.3.具体实现5.3.1.先在集群上创建/servers 节点[zk: host128:2181(CONNECTED) 10] create /servers "servers" Created /servers5.3.2.创建包名:com.orange.zkcase15.3.3.服务器端向Zookeeper注册代码/** * Description: 服务端和zookeeper集群创建连接 */ public class DistributeServer { //注意:connectString逗号左右不能有空格,否则连接不上 private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181"; //tickTime为2000,initLimit为10 //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。 private int sessionTimeout = 200000; private ZooKeeper zkClient; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributeServer server = new DistributeServer(); //1.连接zookeeper集群,获取zk连接,创建zk server.getConnect(); //2.注册服务器到zk集群 server.regist(args[0]); //3.启动业务逻辑 server.business(); } private void business() throws InterruptedException { //延时阻塞 Thread.sleep(Long.MAX_VALUE); } /** * 注册服务器,创建节点 */ private void regist(String hostname) throws InterruptedException, KeeperException { String create = zkClient.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + " is online"); } /** * 连接上zookeeper集群 */ private void getConnect() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } }5.3.4.客户端代码/** * Description: 客户端监听集群节点的动态变化 */ public class DistributeClient { //注意:connectString逗号左右不能有空格,否则连接不上 private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181"; //tickTime为2000,initLimit为10 //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。 private int sessionTimeout = 200000; private ZooKeeper zkClient; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributeClient client = new DistributeClient(); //1.获取zk连接 client.getConnect(); //2.监听服务器 /servers 下面子节点的增加和删除 client.getServerList(); //获取servers上的所有节点的上线和下线 //3.业务逻辑 client.business(); } private void business() throws InterruptedException { //延时阻塞 Thread.sleep(Long.MAX_VALUE); } private void getServerList() throws InterruptedException, KeeperException { //获取servers下的所有节点信息 List<String> children = zkClient.getChildren("/servers", true);//对父节点监听 ArrayList<String> servers = new ArrayList<String>(); //集合用来存所有的服务器节点 //遍历所有节点 获取节点中的主机名称信息 for (String child : children) { byte[] data = zkClient.getData("/servers/" + child, false, null); servers.add(new String(data)); } //打印服务器列表信息 System.out.println(servers); } // 创建zookeeper客户端 private void getConnect() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //收到事件通知后的回调函数(用户的业务逻辑) try { //再次启动监听,避免只监听一次 getServerList(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }5.4.测试5.4.1.在Linux 命令行上操作增加减少服务器5.4.1.1.启动DistributeClient 客户端5.4.1.2.在host128 上zk 的客户端/servers 目录上创建临时带序号节点[zk: host128:2181(CONNECTED) 0] ls / [zookeeper] [zk: host128:2181(CONNECTED) 1] create /servers "servers" Created /servers [zk: host128:2181(CONNECTED) 2] create -e -s /servers/host128 "hsot128" Created /servers/host1280000000000 [zk: host128:2181(CONNECTED) 3] create -e -s /servers/host129 "hsot129" Created /servers/host1290000000001 [zk: host128:2181(CONNECTED) 4] create -e -s /servers/host130 "hsot130" Created /servers/host13000000000025.4.1.3.观察Idea 控制台变化[] [] [hsot128] [hsot129, hsot128] [hsot130, hsot129, hsot128]5.4.1.4.执行删除操作[zk: host128:2181(CONNECTED) 6] ls /servers [host1280000000000, host1290000000001, host1300000000002] [zk: host128:2181(CONNECTED) 7] delete /servers/host1280000000000 [zk: host128:2181(CONNECTED) 8] delete /servers/host12900000000015.4.1.5.观察Idea 控制台变化[hsot130, hsot129] [hsot130]5.4.2.在Idea 上操作增加减少服务器5.4.2.1.启动DistributeClient 客户端(如果已经启动过,不需要重启)5.4.2.2.启动DistributeServer 服务5.4.2.2.1.点击Edit Configurations…5.4.2.2.2.在弹出的窗口中(Program arguments)输入想启动的主机,例如,host1305.4.2.2.3.回到DistributeServer的main方法,右 键,在弹出的窗口中点击 Run “DistributeServer.main()”5.4.2.2.4.观察DistributeServer 控制台host130 is online5.4.2.2.5.观察DistributeClient 控制台#host130 已经上线 [hsot130]六、ZooKeeper 分布式锁案例6.1.分布式锁比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。6.2.ZooKeeper分布式锁原理核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。客户端获取锁时,在lock节点下创建临时顺序节点。然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。6.3.分布式锁案例分析1)接收到请求后,在/locks节点下创建一个临时顺序节点2)判断自己是不是当前节点下最小的节点:是,获取到锁;不是,对前一个节点进行监听3)获取到锁,处理完业务后,delete节点释放锁,然后下面的节点将收到通知,重复第二步判断6.3.原生Zookeeper 实现分布式锁案例6.3.1.分布式锁实现package com.orange.zkcase2; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * Description: zookeeper分布式锁案例 */ public class DistributedLock { //注意:connectString逗号左右不能有空格,否则连接不上 private final String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181"; //tickTime为2000,initLimit为10 //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。 private final int sessionTimeout = 200000; private final ZooKeeper zkClient; //增加代码健壮性 //zookeeper连接 private CountDownLatch connectLatch = new CountDownLatch(1); //zookeeper等待 private CountDownLatch waitLatch = new CountDownLatch(1); //当前client等待的子节点的路径 private String waitPath; //当前client创建的子节点 private String currentNode; /** * 和zk创建连接,并创建根节点 */ public DistributedLock() throws IOException, InterruptedException, KeeperException { //1.获取连接 建立服务端与客户端连接 zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("-----process-------"); // connectLatch 如果连接上zk 可以释放 // 连接建立时, 打开latch, 唤醒wait在该latch上的线程 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); } // 发生了waitPath的删除事件 需要释放 if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) { waitLatch.countDown(); } } }); //等待zookeeper正常连接后,代码才往下继续执行 connectLatch.await(); //2.判断根节点 /locks 是否存在 Stat stat = zkClient.exists("/locks", false); //如果根节点不存在,则创建根节点,根节点类型为永久节点 if (stat == null) { System.out.println("根节点不存在"); // 创建根节点,根节点必须是永久节点 zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } //对zk 加锁 public void zkLock() { try { //创建对应的临时带序号临时节点,返回值为创建的节点路径 currentNode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + "当前节点为:" + currentNode); //注意, 没有必要监听"/locks"的子节点的变化情况 //判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听它序号前一个节点 List<String> children = zkClient.getChildren("/locks", false); //如果children只要一个子节点,那就直接获取锁; 如果有多个节点,需要判断,谁最小 if (children.size() == 1) { System.out.println(Thread.currentThread().getName() + "对zk 加锁, 当前节点:" + currentNode); return; } else { ///对根节点下的所有临时顺序节点进行从小到大排序,有序递增 Collections.sort(children); //获取当前节点名称 seq-00000000 String thisNode = currentNode.substring("/locks/".length()); System.out.println(Thread.currentThread().getName() + "当前节点名称为:" + thisNode); // 通过seq-00000000 获取该节点在children集合的位置 int index = children.indexOf(thisNode); System.out.println(Thread.currentThread().getName() + "当前节点在集合的位置为:" + index); //判断 if (index == -1) { System.out.println(Thread.currentThread().getName() + "数据异常"); } else if (index == 0) { //只有一个节点,就可以获取锁了 System.out.println(Thread.currentThread().getName() + "对zk 加锁, 当前节点:" + currentNode); return; } else { //获得排名比 currentNode 前 1 位的节点 waitPath = "/locks/" + children.get(index - 1); System.out.println(Thread.currentThread().getName() + "前一个节点为:" + waitPath); //在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper会回调监听器的 process方法 //需要监听 它前一个节点变化 zkClient.getData(waitPath, true, null); //入等待锁状态,等待监听 waitLatch.await(); return; } } } catch (KeeperException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } } //对zk 解锁 public void unZkLock() { try { System.out.println(Thread.currentThread().getName() + "解锁,删除当前节点:" + currentNode); //删除节点 zkClient.delete(currentNode, -1); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (KeeperException e) { throw new RuntimeException(e); } } }6.3.2.分布式锁测试6.3.2.1.创建两个线程package com.orange.zkcase2; import org.apache.zookeeper.KeeperException; import java.io.IOException; /** * Description: 测试分布式锁 */ public class DistributedLockTest { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { // 创建分布式锁 // final修饰的对象必须被初始化,不能被修改。 // 非final的对象可以被重新赋值,锁对象就不受管控了。 // 当一个锁被其他对象占有时,当前线程可以对锁对象重新赋值(相当于从新创建了一个锁对象),从而也拿到了运行的权利。 //创建分布式锁 1 final DistributedLock lock1 = new DistributedLock(); //创建分布式锁 2 final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable() { @Override public void run() { //获取锁对象 try { lock1.zkLock(); System.out.println("线程0 启动,获取到锁"); Thread.sleep(5*1000);//延迟5秒 lock1.unZkLock(); System.out.println("线程0 释放锁"); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); new Thread(new Runnable() { @Override public void run() { //获取锁对象 try { lock2.zkLock(); System.out.println("线程1 启动,获取到锁"); Thread.sleep(5 * 1000);//延迟5秒 lock2.unZkLock(); System.out.println("线程1 释放锁"); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); } } 6.3.2.2.观察控制台变化-----process------- 根节点不存在 -----process------- Thread-1当前节点为:/locks/seq-0000000000 Thread-0当前节点为:/locks/seq-0000000001 Thread-1当前节点名称为:seq-0000000000 Thread-1当前节点在集合的位置为:0 Thread-1对zk 加锁, 当前节点:/locks/seq-0000000000 线程1 启动,获取到锁 Thread-0当前节点名称为:seq-0000000001 Thread-0当前节点在集合的位置为:1 Thread-0前一个节点为:/locks/seq-0000000000 Thread-1解锁,删除当前节点:/locks/seq-0000000000 -----process------- 线程0 启动,获取到锁 线程1 释放锁 Thread-0解锁,删除当前节点:/locks/seq-0000000001 线程0 释放锁6.4.Curator框架实现分布式锁案例6.4.1.Curator有五种锁方案:InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)InterProcessMutex:分布式可重入排它锁InterProcessReadWriteLock:分布式读写锁InterProcessMultiLock:将多个锁作为单个实体管理的容器InterProcessSemaphoreV2:共享信号量6.4.1.原生的Java API 开发存在的问题(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch(2)Watch 需要重复注册,不然就不能生效(3)开发的复杂性还是比较高的(4)不支持多节点删除和创建。需要自己去递归6.4.2.Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题官方文档:https://curator.apache.org/index.html6.4.3.Curator 案例实操6.4.3.1.添加依赖 <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>6.4.3.2.代码实现package com.orange.zkcase3; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; /** * Description: Curator 框架实现分布式锁案例 */ public class CuratorLockTest { public static void main(String[] args) { //创建分布式锁1 InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks"); //创建分布式锁2 InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks"); new Thread(new Runnable() { @Override public void run() { try { //获取到锁 lock1.acquire(); System.out.println("线程1 获取到锁"); //测试锁重入 lock1.acquire(); System.out.println("线程1 再次获取到锁"); Thread.sleep(3 * 1000); //释放锁 lock1.release(); System.out.println("线程1 释放锁"); lock1.release(); System.out.println("线程1 再次释放锁"); } catch (Exception e) { throw new RuntimeException(e); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { //获取到锁 lock2.acquire(); System.out.println("线程2 获取到锁"); //测试锁重入 lock2.acquire(); System.out.println("线程2 再次获取到锁"); Thread.sleep(3 * 1000); //释放锁 lock2.release(); System.out.println("线程2 释放锁"); lock2.release(); System.out.println("线程2 再次释放锁"); } catch (Exception e) { throw new RuntimeException(e); } } }).start(); } /** * 分布式锁初始化 */ private static CuratorFramework getCuratorFramework() { //重试策略,初试时间 3秒,重试3次 ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3); //通过工厂创建Curator CuratorFramework client = CuratorFrameworkFactory .builder() //zookeeper server列表 .connectString("192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181") //connection超时时间 .connectionTimeoutMs(20000) //session超时时间 .sessionTimeoutMs(20000) .retryPolicy(policy).build(); //启动客户端 client.start(); System.out.println("zookeeper 初始化完成..."); return client; } }6.4.3.3.控制台变化zookeeper 初始化完成... zookeeper 初始化完成... 线程1 获取到锁 线程1 再次获取到锁 线程1 释放锁 线程1 再次释放锁 线程2 获取到锁 线程2 再次获取到锁 线程2 释放锁 线程2 再次释放锁七、模拟12306售票案例7.1.代码实现/** * Description: 模拟12306售票案例 */ public class LockTicket implements Runnable { private int tickets = 20;//数据库的票数 private InterProcessMutex lock; public LockTicket() { //重试策略,初试时间 3秒,重试3次 ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3); //通过工厂创建client客户端对象 CuratorFramework client = CuratorFrameworkFactory .builder() //zookeeper server列表 .connectString("192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181") //connection超时时间 .connectionTimeoutMs(20000) //session超时时间 .sessionTimeoutMs(20000) .retryPolicy(policy) .build(); //启动客户端 client.start(); lock = new InterProcessMutex(client, "/locks"); } @Override public void run() { while (true) { try { //获取锁 lock.acquire(3, TimeUnit.SECONDS); if (tickets > 0) { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + ":" + tickets); tickets--; } } catch (Exception e) { e.printStackTrace(); } finally { //释放锁 try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }7.2.测试/** * Description: 模拟12306售票案例 */ public class LockTicketTest { public static void main(String[] args) { LockTicket lockTicket=new LockTicket(); //创建客户端 Thread t1 = new Thread(lockTicket, "携程"); Thread t2 = new Thread(lockTicket, "飞猪"); t1.start(); t2.start(); } }7.3.控制台变化飞猪:20 携程:19 飞猪:18 携程:17 飞猪:16 携程:15 飞猪:14 携程:13 飞猪:12 携程:11 飞猪:10 携程:9 飞猪:8 携程:7 飞猪:6 携程:5 飞猪:4 携程:3 飞猪:2 携程:1八、企业面试真题(面试重点)8.1.选举机制半数机制,超过半数的投票通过,即通过。(1)第一次启动选举规则: 投票过半数时,服务器 id 大的胜出(2)第二次启动选举规则:EPOCH 大的直接胜出EPOCH 相同,事务 id 大的胜出事务 id 相同,服务器 id 大的胜出8.2.生产集群安装多少zk 合适安装奇数台生产经验:10 台服务器:3 台zk20 台服务器:5 台zk100 台服务器:11 台zk200 台服务器:11 台zk服务器台数多:好处,提高可靠性;坏处:提高通信延时 8.3.常用命令ls、get、create、delete endl
2024年08月01日
2 阅读
0 评论
0 点赞