一、概述
zookeeper是一个开源的分布式协调服务,提供分布式数据一致性解决方案,分布式应用程序可以实现数据发布订阅、负载均衡、命名服务、集群管理分布式锁、分布式队列等功能。 是一个类似hdfs的树形文件结构,zookeeper可以用来保证数据在(zk)集群之间的数据的事务性一致、 有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher zookeeper有三个角色:Learner,Follower,Observer zookeeper应用场景:
- 统一命名服务(Name Service)
- 配置管理(Configuration Management)
- 集群管理(Group Membership)
- 共享锁(Locks)
- 队列管理
1.1 结构
zooKeeper使用树形结构管理数据。而且以“/”作为树形结构的根节点。树形结构中的每一个节点都称为“znode”,每个节点上都会保存自己的数据和节点信息。。文件系统中的目录可以存放其他目录和文件,znode中可以存放其他znode,也可以对应一个具体的值(少量(1MB)数据)。znode和它对应的值之间是键值对的关系。
节点的数据:即znode data(节点path, 节点data)的关系就像是java map中(key, value)的关系
节点的子节点children
节点的状态stat:用来描述当前节点的创建、修改记录,包括cZxid、ctime等
1.2 通知机制
使用ZooKeeper的通知机制,项目工程在特定znode上设置Watcher(观察者)来监控当前节点上值的变化。一旦Watcher检测到了数据变化就会立即通知监听者模块,从而自动实现“一处修改,处处生效”的效果。属于异步通知 
ZooKeeper的观察机制是一种异步回调的触发机制。
ZooKeeper支持Watch(观察)机制,客户端可以在znode节点上设置一个Watcher(观察者)。如果被观察的znode结点有变更(数据改变、被删除、子目录节点增加删除)时,Watcher会被触发,zookeeper会通知所属的客户端将接收到节点发生变化的通知,这就是把相应的事件通知给设置过Watcher的Client端。
ZooKeeper里的所有读取操作:getData(),getChildren()和exists()都有设置Watch的选项。
zkServer向客户端发送一个Watch通知,Watch触发一次就失效了。如果想继续Watch的话,需要客户端重新设置Watcher。如果在接收通知后继续得到节点变化通知,就必须另外设置一个新的Watcher继续观察。
节点有不同的改动方式。ZooKeeper维护两个观察列表:数据观察和子节点观察。getData()和exists()设置数据观察。getChildren()设置子节点观察。不同的返回数据有不同的观察。getData()和exists()返回节点的数据,而getChildren()返回子节点列表。
setData()将为znode触发数据观察。create()为创建的节点触发数据观察,为其父节点触发子节点观察。delete()将会为被删除的节点触发数据观察以及子节点观察(因为节点不能再有子节点了),为其父节点触发子节点观察。如果一个节点设置存在观察时尚未创建,并且在断开连接后执行节点创建以及删除操作,那么在节点上设置的观察事件客户端接收不到,事件会丢失。
1.3 节点类型
临时节点:生命周期依赖于创建它们的会话。一旦会话(Session)结束,临 时节点将被自动删除,当然可以也可以手动删除。,ZooKeeper的临时节点不允许拥有子节点。
持久化节点:生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除
| PERSISTENTper | 持久化目录节点。客户端与zookeeper断开连接后,该节点依旧存在 |
|---|---|
| PERSISTENT_SEQUENTIAL | 持久化顺序编号目录节点。客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号 -s |
| EPHEMERAL | 临时目录节点。客户端与zookeeper断开连接后,该节点被删除 -e |
| EPHEMERAL_SEQUENTIAL | 临时顺序编号目录节点。客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号 -es |
1.4 节点状态
znode维护了一个stat结构,这个stat包含数据变化的版本号、访问控制列表变化、还有时间戳。版本号和时间戳一起,可让ZooKeeper验证缓存和协调更新。每次znode的数据发生了变化,版本号就增加。
例如:无论何时客户端检索数据,它也一起检索数据的版本号。并且当客户端执行更新或删除时,客户端必须提供他正在改变的znode的版本号。如果它提供的版本号和真实的数据版本号不一致,更新将会失败
属性
| czxid | 引起这个znode创建的zxid,创建节点的事务的zxid(ZooKeeper Transaction Id) |
|---|---|
| ctime | znode被创建的毫秒数(从1970年开始) |
| mzxid | znode最后更新的zxid |
| mtime | znode最后修改的毫秒数(从1970年开始) |
| pZxid | znode最后更新的子节点zxid |
| cversion | znode子节点变化号,znode子节点修改次数 |
| dataversion | znode数据变化号 |
| aclVersion | znode访问控制列表的变化号 |
| ephemeralOwner | 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。 |
| dataLength | znode的数据长度 |
| numChildren | znode子节点数量 |
1.5 一致性协议 zab协议
zab协议 的全称是 Zookeeper Atomic Broadcast (zookeeper原子广播)。
zookeeper 是通过 zab协议来保证分布式事务的最终一致性 基于zab协议,zookeeper集群中的角色主要有三类
zab广播模式工作原理,通过类似两阶段提交协议的方式解决数据一致性:
- leader从客户端收到一个写请求
- leader生成一个新的事务并为这个事务生成一个唯一的ZXID
- leader将这个事务提议(propose)发送给所有的follows节点
- follower节点将收到的事务请求加入到历史队列(history queue)中,并发送ack给 leader
- 当leader收到大多数follower(半数以上节点)的ack消息,leader会发送commit请求
- 当follower收到commit请求时,从历史队列中将事务请求commit
1.6 zookeeper的leader选举
服务器状态
looking:寻找leader状态。当服务器处于该状态时,它会认为当前集群中没有 leader,因此需要进入leader选举状态。 leading: 领导者状态。表明当前服务器角色是leader。 following: 跟随者状态。表明当前服务器角色是follower。 observing:观察者状态。表明当前服务器角色是observer。
服务器启动时期leader选举
在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成 leader选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都 试图找到leader,于是进入leader选举过程。选举过程如下:
- 每个server发出一个投票。由于是初始情况,server1和server2都会将自己作为 leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用 (myid, zxid)来表示,此时server1的投票为(1, 0),server2的投票为(2, 0),然后各自 将这个投票发给集群中其他机器。
- 集群中的每台服务器接收来自集群中各个服务器的投票。
- 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行pk,pk规则如下优先检查zxid。zxid比较大的服务器优先作为leader。 如果zxid相同,那么就比较myid。myid较大的服务器作为leader服务器。 对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较 两者的zxid,均为0,再比较myid,此时server2的myid最大,于是更新自己的投票 为(2, 0),然后重新投票,对于server2而言,其无须更新自己的投票,只是再次向集 群中所有机器发出上一次投票信息即可。
- 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到 相同的投票信息,对于server1、server2而言,都统计出集群中已经有两台机器接受 了(2, 0)的投票信息,此时便认为已经选出了leader
- 改变服务器状态。一旦确定了leader,每个服务器就会更新自己的状态,如果是 follower,那么就变更为following,如果是leader,就变更为leading。
服务器运行时期leader选举
在zookeeper运行期间,leader与非leader服务器各司其职,即便当有非leader服务器宕机或新加入,此时也不会影响leader,但是一旦leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮leader选举,其过程和启动时期的Leader选举过程基本一致。 假设正在运行的有server1、server2、server3三台服务器,当前leader是 server2,若某一时刻leader挂了,此时便开始Leader选举。选举过程如下:
- 变更状态。leader挂后,余下的服务器都会将自己的服务器状态变更为looking,然 后开始进入leader选举过程。
- 每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定 server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3 都会投自己,产生投票(1, 122),(3, 122),然后各自将投票发送给集群中所有机器。
- 接收来自各个服务器的投票。与启动时过程相同
- 处理投票。与启动时过程相同,此时,server3将会成为leader。
- 统计投票。与启动时过程相同。
- 改变服务器的状态。与启动时过程相同
observer角色及其配置
角色特点: 不参与集群的leader选举 、不参与集群中写数据时的ack反馈 为了使用observer角色,在任何想变成observer角色的配置文件中加入如下配置: peerType=observer 并在所有server的配置文件中,配置成observer模式的server的那行配置追加:observer, 例如 server.3=192.168.60.130:2289:3389:observer
1.7 应用场景
配置中心 在平常的业务开发过程中,我们通常需要将系统的一些通用的全局配置,例如机器列表配置,运行时开关配置,数据库配置信息等统一集中存储,让集群所有机器共享配置信息,系统在启动会首先从配置中心读取配置信息,进行初始化。传统的实现方式将配置存储在本地文件和内存中,一旦机器规模更大,配置变更频繁情况下,本地文件和内存方式的配置维护成本较高,使用zookeeper作为分布式的配置中心就可以解决这个问题。 我们将配置信息存在zk中的一个节点中,同时给该节点注册一个数据节点变更的watcher监听,一旦节点数据发生变更,所有的订阅该节点的客户端都可以获取数据变更通知。
负载均衡 建立server节点,并建立监听器监视servers子节点的状态(用于在服务器增添时及时同步当前集群中服务器列表)。在每个服务器启动时,在servers节点下建立具体服务器地址的子节点,并在对应的字节点下存入服务器的相关信息。这样,我们在zookeeper服务器上可以获取当前集群中的服务器列表及相关信息,可以自定义一个负载均衡算法,在每个请求过来时从zookeeper服务器中获取当前集群服务器列表,根据算法选出其中一个服务器来处理请求。
命名服务 命名服务是分布式系统中的基本功能之一。被命名的实体通常可以是集群中的机器、提供的服务地址或者远程对象,这些都可以称作为名字。常见的就是一些分布式服务框架(RPC、RMI)中的服务地址列表,通过使用名称服务客户端可以获取资源的实体、服务地址和提供者信息。命名服务就是通过一个资源引用的方式来实现对资源的定位和使用。在分布式环境中,上层应用仅仅需要一个全局唯一名称,就像数据库中的主键。 在单库单表系统中可以通过自增ID来标识每一条记录,但是随着规模变大分库分表很常见,那么自增ID有仅能针对单一表生成ID,所以在这种情况下无法依靠这个来标识唯一ID。UUID就是一种全局唯一标识符。但是长度过长不易识别。
在Zookeeper中通过创建顺序节点就可以实现,所有客户端都会根据自己的任务类型来创建一个顺序节点,例如 job-00000001 节点创建完毕后,create()接口会返回一个完整的节点名,例如:job-00000002 拼接type类型和完整节点名作为全局唯一的ID
DNS服务 域名配置 在分布式系统应用中,每一个应用都需要分配一个域名,日常开发中,往往使用本地HOST绑定域名解析,开发阶段可以随时修改域名和IP的映射,大大提高开发的调试效率。如果应用的机器规模达到一定程度后,需要频繁更新域名时,需要在规模的集群中变更,无法保证实时性。所有我们在zk上创建一个节点来进行域名配置
域名解析 应用解析时,首先从zk域名节点中获取域名映射的IP和端口。 域名变更 每个应用都会在在对应的域名节点注册一个数据变更的watcher监听,一旦监听的域名节点数据变更,zk会向所有订阅的客户端发送域名变更通知。
集群管理 随着分布式系统规模日益扩大,集群中机器的数量越来越多。有效的集群管理越来越重要了,zookeeper集群管理主要利用了watcher机制和创建临时节点来实现。以机器上下线和机器监控为例:
机器上下线 新增机器的时候,将Agent部署到新增的机器上,当Agent部署启动时,会向zookeeper指定的节点下创建一个临时子节点,当Agent在zk上创建完这个临时节点后,当关注的节点zookeeper/machines下的子节点新加入新的节点时或删除都会发送通知,这样就对机器的上下线进行监控。
机器监控 在机器运行过程中,Agent会定时将主机的的运行状态信息写入到/machines/hostn主机节点,监控中心通过订阅这些节点的数据变化来获取主机的运行信息。
分布式锁 数据库实现分布式锁
首先我们创建一张锁表,锁表中字段设置唯一约束。 定义锁,实现Lock接口,tryLock()尝试获取锁,从锁表中查询指定的锁记 录,如果查询到记录,说明已经上锁,不能再上锁
在lock方法获取锁之前先调用tryLock()方法尝试获取锁,如果未加锁则向锁表中插入一条锁记录来获取锁,这里我们通过循环,如果上锁我们一致等待锁的释放
释放锁,即是将数据库中对应的锁表记录删除 注意在尝试获取锁的方法tryLock中,存在多个线程同时获取锁的情况,可以简单通过synchronized解 决
redis实现分布式锁 redis分布式锁的实现基于setnx(set if not exists),设置成功,返回1;设置失败,返回0,释放锁的操作通过del指令来完成 如果设置锁后在执行中间过程时,程序抛出异常,导致del指令没有调用,锁永远无法释放,这样就会陷入死锁。所以我们拿到锁之后会给锁加上一个过期时间,这样即使中间出现异常,过期时间到后会自动释放锁。 同时在setnx 和 expire 如果进程挂掉,expire不能执行也会死锁。所以要保证setnx和expire是一个原子性操作即可。redis 2.8之后推出了setnx和expire的组合指令
redis实现分布式锁注意的事项: redis如何避免死锁 lock获取锁方法
释放锁 redis实现分布式锁存在的问题,为了解决redis单点问题,我们会部署redis集群,在 Sentinel 集群中,主节点突然挂掉了。同时主节点中有把锁还没有来得及同步到从节点。这样就会导致系统中同样一把锁被两个客户端同时持有,不安全性由此产生。redis官方为了解决这个问题,推出了Redlock 算法解决这个问题。但是带来的网络消耗较大。
zookeeper实现分布式锁 原理:zookeeper通过创建临时序列节点来实现分布式锁,适用于顺序执行的程序,大体思路就是创建临时序列节点,找出最小的序列节点,获取分布式锁,程序执行完成之后此序列节点消失,通过watch来监控节点的变化,从剩下的节点的找到最小的序列节点,获取分布式锁,执行相应处理,依次类推……
首先在ZkLock的构造方法中,连接zk,创建lock根节点 添加watch监听临时顺序节点的删除 获取锁操作 释放锁
分布式队列
队列特性:FIFO(先入先出),zookeeper实现分布式队列的步骤: 在队列节点下创建临时顺序节点 例如/queue_info/192.168.1.1-0000001 调用getChildren()接口来获取/queue_info节点下所有子节点,获取队列中所有元素 比较自己节点是否是序号最小的节点,如果不是,则等待其他节点出队列,在序号最小的节点注册watcher 获取watcher通知后,重复步骤
二、命令
2.1 常用命令
zk命令
# 服务端命令
<NolebasePageProperties />
#启动服务器:
./zkServer.sh start
#停止服务器:
./zkServer.sh stop
# 查看服务状态
./zkServer.sh status
# 重启服务
./zkServer.sh restart
# 客户端命令
#启动客户端:
./zkCli.sh ./zkCli.sh -server locahost:port
#退出客户端:
[zk: localhost:2181(CONNECTED) 6] quit节点操作
#查看当前znode中所包含的内容
ls
#查看当前节点数据并能看到更新次数等数据
ls2
#查看节点状态
stat
#普通创建:不带有-s、-e参数-s:含有序列-e:临时(重启或者超时消失)
create [-s] [-e] path data acl
#设置节点的具体值 更新 也可以基于版本号进行更改,此时类似于乐观锁机制,当你传入的数据版本号 (dataVersion) 和当前节点的数据版本号不符合时,zookeeper 会拒绝本次修改:
set 节点 value值 [版本号]
#获得节点的值
get 节点
#可以删除指定znode,当该znode拥有子znode时,必须先删除其所有子znode,否则操作将失败。
delete path [version]
# 删除带子节点的节点
deleteall /节点path
#rmr命令可用于代替delete命令,rmr是一个递归删除命令,如果发生指定节点拥有子节点时,rmr命令会首先删除子节点。
rmr监听
# 注册的监听器能够在节点内容发生改变的时候,向客户端发出通知。触发一次后就会立即失效。
get path [watch]
# 在节点状态发生改变的时候,向客 户端发出通知
stat path [watch]
# 注册的监听器能够监听该节点下 所有子节点的增加和删除操作
ls path [watch] 或 ls2 path [watch]2.2 四字监控命令
#ZooKeeper支持某些特定的四字命令,他们大多是用来查询ZooKeeper服务的当前状态及相关信息的,使用时通过telnet或nc向ZooKeeper提交相应命令。
#nc命令需要安装对应的程序才可以使用。yum install -y nc
nc
# 使用
echo mntr | nc localhost 2181
#测试服务是否处于正确状态。如果确实如此,那么服务返回“imok ”,否则不做任何响应
ruok
# 输出关于性能和连接的客户端的列表 输出服务器的详细信息:接收/发送包数量、连接数、模式 (leader/follower)、节点总数、延迟。 所有客户端的列表
stat
# 重置server状态
srst
# 输出相关服务配置的详细信息
conf
# 列出所有连接到服务器的客户端的完全的连接 /会话的详细信息。包括“接受 / 发送”的包数量、会话id 、操作延迟、最后的操作执行等等信息
cons
# 重置当前这台服务器所有连接/会话的统计信息
crst
# 列出未经处理的会话和临时节点
dump
# 输出关于服务环境的详细信息(区别于conf命令)
envi
# 列出未经处理的请求
reqs
# 列出服务器watches的简洁信息:连接总数、watching节点总数和 watches总数
wchs
# 通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表
wchc
# 通过路径列出服务器 watch的详细信息。它输出一个与 session相关的路径
wchp
# 列出集群的健康状态。包括“接受/发送”的包数量、操作延迟、当前服务模 式(leader/follower)、节点总数、watch总数、临时节点总数
mntrconf命令
输出相关服务配置的详细信息 shell终端输入:echo conf| nc localhost 2181
| 属性 | 含义 |
|---|---|
| clientPort | 客户端端口号 |
| dataDir | 数据快照文件目录 默认情况下100000次事务操作生成一次快照 |
| dataLogDir | 事物日志文件目录,生产环境中放在独立的磁盘上 |
| tickTime | 服务器之间或客户端与服务器之间维持心跳的时间间隔(以毫秒为单位) |
| maxClientCnxns | 最大连接数 |
| minSessionTimeout | 最小session超时 minSessionTimeout=tickTime*2 |
| maxSessionTimeout | 最大session超时 maxSessionTimeout=tickTime*20 |
| serverId | 服务器编号 |
| initLimit | 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数 |
| syncLimit | 集群中的follower服务器(F)与leader服务器(L)之间 请求和应答之间能容忍的最多心跳数 |
| electionAlg | 0:基于UDP的LeaderElection 1:基于UDP的FastLeaderElection 2:基于UDP和认证的<br />FastLeaderElection 3:基于TCP的FastLeaderElection 在3.4.10版本中,<br />默认值为3另外三种算法已经被弃用,并且有计划在之后的版本中将它们彻底删除而不再支持 |
| electionPort | 选举端口 |
| quorumPort | 数据通信端口 |
| peerType | 是否为观察者 1为观察者 |
cons命令
列出所有连接到这台服务器的客户端连接/会话的详细信息 shell终端输入:echo cons| nc localhost 2181
| 属性 | 含义 |
|---|---|
| ip | ip地址 |
| port | 端口号 |
| queued | 等待被处理的请求数,请求缓存在队列中 |
| received | 收到的包数 |
| sent | 发送的包数 |
| sid | 会话id |
| lop | 最后的操作 GETD-读取数据 DELE-删除数据 CREA-创建数据 |
| est | 连接时间戳 |
| to | 超时时间 |
| lcxid | 当前会话的操作id |
| lzxid | 最大事务id |
| lresp | 最后响应时间戳 |
| llat | 最后/最新 延时 |
| minlat | 最小延时 |
| maxlat | 最大延时 |
| avglat | 平均延时 |
crst命令
重置当前这台服务器所有连接/会话的统计信息 shell终端输入:echo crst| nc localhost 2181
dump命令
列出未经处理的会话和临时节点 shell终端输入:echo dump| nc localhost 2181 session id :znode path(1对多 , 处于队列中排队的session和临时节点)
envi命令
输出关于服务器的环境配置信息 shell终端输入:echo envi| nc localhost 2181
| 属性 | 含义 |
|---|---|
| zookeeper.version | 版本 |
| host.name | host信息 |
| java.version | java版本 |
| java.vendor | 供应商 |
| java.home | 运行环境所在目录 |
| java.class.path | classpath |
| java.library.path | 第三方库指定非java类包的位置(如:dll,so) |
| java.io.tmpdir | 默认的临时文件路径 |
| java.compiler | JIT 编译器的名称 |
| os.name | Linux |
| os.arch | amd64 |
| os.version | 3.10.0-514.el7.x86_64 |
| user.name | zookeeper |
| user.home | /home/zookeeper |
| user.dir | /home/zookeeper/zookeeper2181/bin |
rouk命令
测试服务是否处于正确运行状态 shell终端输入:echo ruok| nc localhost 2181
stat命令
输出服务器的详细信息与srvr相似,但是多了每个连接的会话信息 shell终端输入:echo stat| nc localhost 2181
| 属性 | 含义 |
|---|---|
| Zookeeper version | 版本 |
| Latency min/avg/max | 延时 |
| Received | 收包 |
| Sent | 发包 |
| Connections | 连接数 |
| Outstanding | 堆积数 |
| Zxid | 最大事物id |
| Mode | 服务器角色 |
| Node count | 节点数 |
srst命令
重置server状态 shell终端输入:echo srst| nc localhost 2181
wchs命令
列出服务器watches的简洁信息 shell终端输入:echo wchs| nc localhost 2181
| 属性 | 含义 |
|---|---|
| connectsions | 连接数 |
| watch-paths | watch节点数 |
| watchers | watcher数量 |
wchc命令
通过session分组,列出watch的所有节点,它的输出的是一个与 watch 相关的会话的节点列表 shell终端输入:echo wchc| nc localhost 2181
# wchc is not executed because it is not in the whitelist.
# 修改启动指令 zkServer.sh
# 注意找到这个信息
else
echo "JMX disabled by user request" >&2
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi
# 下面添加如下信息
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"wchp命令
通过路径分组,列出所有的 watch 的session id信息 shell终端输入:echo wchp| nc localhost 2181
# wchp is not executed because it is not in the whitelist.
# 修改启动指令 zkServer.sh
# 注意找到这个信息
else
echo "JMX disabled by user request" >&2
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi
# 下面添加如下信息
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"mntr命令
列出服务器的健康状态. shell终端输入:echo mntr| nc localhost 2181
| 属性 | 含义 |
|---|---|
| zk_version | 版本 |
| zk_avg_latency | 平均延时 |
| zk_max_latency | 最大延时 |
| zk_min_latency | 最小延时 |
| zk_packets_received | 收包数 |
| zk_packets_sent | 发包数 |
| zk_num_alive_connections | 连接数 |
| zk_outstanding_requests | 堆积请求数 |
| zk_server_state | leader/follower 状态 |
| zk_znode_count | znode数量 |
| zk_watch_count | watch数量 |
| zk_ephemerals_count | 临时节点(znode) |
| zk_approximate_data_size | 数据大小 |
| zk_open_file_descriptor_count | 打开的文件描述符数量 |
| zk_max_file_descriptor_count | 最大文件描述符数量 |
2.3 acl权限控制
acl 权限控制,使用scheme:id:permission 来标识,主要涵盖 3 个方面:
- 权限模式(scheme):授权的策略
- 授权对象(id):授权的对象
- 权限(permission):授予的权限
特性:
- zooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
- 每个znode支持设置多种权限控制方案和多个权限
- 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点
授权对象ID是指,权限赋予的实体,例如:IP 地址或用户
权限模式
| 方案 | 描述 |
|---|---|
| world | 只有一个用户:anyone,戴白哦登录zookeeper的所有人 |
| ip | 对客户端使用的ip地址认证 |
| auth | 使用已添加认证的用户认证 |
| digest | 使用“用户名:密码”方式认证 |
授予的权限
create、delete、read、writer、admin也就是 增、删、改、查、管理权限, 这5种权限简写为cdrwa,注意:这5种权限中,delete是指对子节点的删除权限,其它4种 权限指对自身节点的操作权限
| 权限 | ACL简写 | 描述 |
|---|---|---|
| create | c | 可以创建子节点 |
| delete | d | 可以删除子节点(仅下一级节点) |
| read | r | 可以读取节点数据及显示子节点列表 |
| write | w | 可以设置节点数据 |
| admin | a | 可以设置节点访问控制列表权限 |
授权相关命令
| 命令 | 使用方式 | 描述 |
|---|---|---|
| getAcl | getAcl | 读取ACL权限 |
| setAcl | setAcl | 设置ACL权限 |
| addauth | addauth | 添加认证用户 |
案例
# world授权模式
setAcl <path> world:anyone:<acl>
setAcl /node1 world:anyone:cdrwa
# IP授权模式:
setAcl <path> ip:<ip>:<acl>
setAcl /node2 ip:192.168.60.129:cdrwa
# Auth授权模式:
addauth digest <user>:<password> #添加认证用户
setAcl <path> auth:<user>:<acl>
addauth digest itcast:123456
setAcl /node3 auth:itcast:cdrwa
# Digest授权模式:
setAcl <path> digest:<user>:<password>:<acl>
# 密码是经过SHA1及BASE64处理的密文,在SHELL中可以通过以下命令计算:
echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64
echo -n itheima:123456 | openssl dgst -binary -sha1 | openssl base64
setAcl /node4 digest:itheima:qlzQzCLKhBROghkooLvb+Mlwv4A=:cdrwa
# 多种模式授权
setAcl /node5 ip:192.168.60.129:cdra,auth:itcast:cdrwa,digest:itheima:qlzQzCLKhBROgh kooLvb+Mlwv4A=:cdrwaACL超级管理员
zookeeper的权限管理模式有一种叫做super,该模式提供一个超管可以方便的访问任何权限的节点
# 生成密钥
echo -n super:admin | openssl dgst -binary -sha1 | openssl base64
# 修改/bin/zkServer.sh脚本文件
nohup $JAVA "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "- Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"
# 添加
"-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBv st5y6rkB6HQs="
addauth digest super:admin #添加认证用户三、Java代码操作ZK
3.1 ZKClient连接zk
依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>主要方法
//创建zookeeper对象
//连接zookeeper服务器
public ZooKeeper(
String connectString, // zooKeeper集合主机。多个以,号分隔
int sessionTimeout,
Watcher watcher) throws IOException
{
this(connectString, sessionTimeout, watcher, false);
}
//断开ZooKeeper服务器连接
org.apache.zookeeper.ZooKeeper.close()
// 创建节点
org.apache.zookeeper.ZooKeeper.create(
String, // znode路径
byte[], //存储在节点的数据
List<ACL>, //可以使用Ids.OPEN_ACL_UNSAFE 访问控制列表,可通过ZooDefs.Ids 来获取一些基本的acl列表。
CreateMode//CreateMode本身是一个枚举类型
// 添加后标识为异步方式 方法都可添加
AsyncCallback.StatCallback callBack, Object ctx )//callBack-异步回调接口 ctx-传递上下文参数
// 获取节点上的数据
org.apache.zookeeper.ZooKeeper.getData(
String,
boolean,
Stat)
// 查看子节点
// 同步方式
getChildren(String path, boolean b) // b是否使用连接对象中注册的监视
// 异步方式
getChildren(String path, boolean b,AsyncCallback.ChildrenCallback callBack,Object ctx)
//设置节点上的数据
org.apache.zookeeper.ZooKeeper.setData(
String,
byte[],
int
AsyncCallback.StatCallback callBack, Object ctx
)//使用-1作为版本号的值时可以匹配所有版本号
//判断节点是否存在
org.apache.zookeeper.ZooKeeper.exists(String, boolean) // 是否使用连接对象中注册的监视器。
// 删除节点
// 同步方式
delete(String path, int version)
// 异步方式
delete(String path, int version, AsyncCallback.VoidCallback callBack, Object ctx)代码实现
// 1.连接Zookeeper服务器的信息
String connectString = "192.168.56.150:2181";
int sessionTimeout = 5000;
Watcher watcher = new Watcher() {@Override public void process(WatchedEvent event) {}};
// 2.创建ZooKeeper建立连接
ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
// 3.通过设置节点值测试连接
byte[] data = "BBB".getBytes();
// -1表示不验证版本号
int version = -1;
Stat stat = zooKeeper.setData("/fruit", data, version);
System.out.println(stat.getVersion());
// 4.关闭连接
zooKeeper.close();通知测试代码
// 一次性通知
// 1.准备连接Zookeeper服务器的连接信息
String connectingString = "192.168.56.100:2181";
int sessionTimeout = 5000;
// 2.创建Zookeeper对象
ZooKeeper zooKeeper = new ZooKeeper(connectingString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {}
});
// 3.获取指定节点的值
String path = "/fruit";
byte[] data = zooKeeper.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("/fruit节点的值被修改了!"+Thread.currentThread().getName());
}
}, new Stat());
// 4.将获取节点值时得到的字节数组转化成String类型
String dataString = new String(data);
// 5.打印结果
System.out.println(dataString);
// ※让程序不能停
while(true) {
Thread.sleep(5000);
System.err.println("药不能停,不能停……[代表程序原本要执行的业务逻辑功能……]"+Thread.currentThread().getName());
}
// 6.关闭连接
// zooKeeper.close();持续性通知
// 持续性通知
// 专门封装一个方法用来实现持续的异步监听
public void getNodeData(String path, ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
// 调用获取节点值的方法时设置Watcher进行监控
byte[] data = zooKeeper.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 接收到异步通知时,再次调用获取节点值方法,目的是设置新的Watcher
try {
getNodeData(path, zooKeeper);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new Stat());
String result = new String(data);
System.err.println("当前节点值="+result);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
// 1.建立连接
String connectingString = "192.168.56.100:2181";
int sessionTimeout = 5000;
ZooKeeper zooKeeper = new ZooKeeper(connectingString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {}
});
// 2.获取节点数据
String path = "/fruit";
new AsyncNoticeForever().getNodeData(path, zooKeeper);
// 3.让方法持续运行
while(true) {
Thread.sleep(5000);
System.err.println("药不能停,不能停……[代表程序原本要执行的业务逻辑功能……]"+Thread.currentThread().getName());
}
}3.2 Curator连接ZK
Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、 共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。其解决session会话超时重连 、watcher反复注册 、简化开发api 、遵循Fluent风格的API 、提供了分布式锁服务、共享计数器、缓存机制等机制 官网:http://curator.apache.org/
依赖
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>创建连接
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
public class CuratorConnection {
public static void main(String[] args) {
// session重连策略
/*
3秒后重连一次,只重连1次
RetryPolicy retryPolicy = new RetryOneTime(3000);
*/
/*
每3秒重连一次,重连3次
RetryPolicy retryPolicy = new RetryNTimes(3,3000);
*/
/*
每3秒重连一次,总等待时间超过10秒后停止重连
RetryPolicy retryPolicy=new RetryUntilElapsed(10000,3000);
*/
// baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 创建连接对象
CuratorFramework client= CuratorFrameworkFactory.builder()
// IP地址端口号
.connectString("192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183")
// 会话超时时间
.sessionTimeoutMs(5000)
// 重连机制
.retryPolicy(retryPolicy)
// 命名空间
.namespace("create")
// 构建连接对象
.build();
// 打开连接
client.start();
System.out.println(client.isStarted());
// 关闭连接
client.close();
}
}节点新增
// 新增节点
client.create()
// 递归节点的创建 多节点时使用 如:"/node3/node31
.creatingParentsIfNeeded()
// 节点的类型
.withMode(CreateMode.PERSISTENT)
// 节点的权限列表 world:anyone:cdrwa
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
// 异步回调接口 不使用为同步方式
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 时间类型
System.out.println(curatorEvent.getType());
}
})
// arg1:节点的路径
// arg2:节点的数据
.forPath("/node1", "node1".getBytes());
System.out.println("结束");更新节点
// 更新节点
client.setData()
// 指定版本号
.withVersion(2)
// 异步
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 事件的类型
System.out.println(curatorEvent.getType());
}
})
// arg1:节点的路径
// arg2:节点的数据
.forPath("/node1", "node11".getBytes());
System.out.println("结束");删除节点
// 删除节点
client.delete()
// 删除包含子节点的节点
.deletingChildrenIfNeeded()
// 版本号
.withVersion(0)
// 异步
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
}
})
// 节点的路径
.forPath("/node1");
System.out.println("结束");查看节点
// 读取节点数据
byte [] bys=client.getData()
// 读取属性
.storingStatIn(stat)
// 节点的路径
.forPath("/node1");
System.out.println(new String(bys));
// 异步方式读取节点的数据
client.getData()
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
// 数据
System.out.println(new String(curatorEvent.getData()));
}
})
.forPath("/node1");
Thread.sleep(5000);
System.out.println("结束");查看子节点
// 读取子节点数据
List<String> list = client.getChildren()
// 节点路径
.forPath("/get");
for (String str : list) {
System.out.println(str);
}
// 异步方式读取子节点数据
client.getChildren()
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
// 读取子节点数据
List<String> list=curatorEvent.getChildren();
for (String str : list) {
System.out.println(str);
}
}
})
.forPath("/get");
Thread.sleep(5000);
System.out.println("结束");检查节点是否存在
// 判断节点是否存在
Stat stat= client.checkExists()
// 节点路径
.forPath("/node2");
System.out.println(stat.getVersion());
// 异步方式判断节点是否存在
client.checkExists()
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
System.out.println(curatorEvent.getStat().getVersion());
}
})
.forPath("/node2");
Thread.sleep(5000);
System.out.println("结束");watcherApi curator提供了两种Watcher(Cache)来监听结点的变化 Node Cache : 只是监听某一个特定的节点,监听节点的新增和修改 PathChildren Cache : 监控一个ZNode的子节点. 当一个子节点增加, 更新,删除 时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态
// 监视某个节点的数据变化
// arg1:连接对象
// arg2:监视的节点路径
final NodeCache nodeCache=new NodeCache(client,"/watcher1");
// 启动监视器对象
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
// 节点变化时回调的方法
public void nodeChanged() throws Exception {
System.out.println(nodeCache.getCurrentData().getPath());
System.out.println(new String(nodeCache.getCurrentData().getData()));
}
});
Thread.sleep(100000);
System.out.println("结束");
//关闭监视器对象
nodeCache.close();
// 监视子节点的变化
// arg1:连接对象
// arg2:监视的节点路径
// arg3:事件中是否可以获取节点的数据
PathChildrenCache pathChildrenCache=new PathChildrenCache(client,"/watcher1",true);
// 启动监听
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
// 当子节点方法变化时回调的方法
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
// 节点的事件类型
System.out.println(pathChildrenCacheEvent.getType());
// 节点的路径
System.out.println(pathChildrenCacheEvent.getData().getPath());
// 节点数据
System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
}
});
Thread.sleep(100000);
System.out.println("结束");
// 关闭监听
pathChildrenCache.close();事务
// 开启事务
client.inTransaction()
.create().forPath("/node1","node1".getBytes())
.and()
.create().forPath("/node2","node2".getBytes())
.and()
//事务提交
.commit();分布式锁 InterProcessMutex:分布式可重入排它锁 InterProcessReadWriteLock:分布式读写锁
// 排他锁
// arg1:连接对象
// arg2:节点路径
InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1");
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
Thread.sleep(3000);
System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");
// 读写锁
InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");
// 获取读锁对象
InterProcessLock interProcessLock=interProcessReadWriteLock.readLock();
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
Thread.sleep(3000);
System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");
// 读写锁
InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");
// 获取写锁对象
InterProcessLock interProcessLock=interProcessReadWriteLock.writeLock();
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
Thread.sleep(3000);
System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");3.3 使用案例
配置中心示例
读取zookeeper中的配置信息,注册watcher监听器,存入本地变量
当zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件
重新获取配置信息
import java.util.concurrent.CountDownLatch;
import com.itcast.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
public class MyConfigCenter implements Watcher {
// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接对象
static ZooKeeper zooKeeper;
// 用于本地化存储配置信息
private String url;
private String username;
private String password;
@Override
public void process(WatchedEvent event) {
try {
// 捕获事件状态
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("连接断开!");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("连接超时!");
// 超时后服务器端已经将连接释放,需要重新连接服务器端
zooKeeper = new ZooKeeper("192.168.60.130:2181", 6000,
new ZKConnectionWatcher());
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("验证失败!");
}
// 当配置信息发生变化时
} else if (event.getType() == EventType.NodeDataChanged) {
initValue();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 构造方法
public MyConfigCenter() {
initValue();
}
// 连接zookeeper服务器,读取配置信息
public void initValue() {
try {
// 创建连接对象
zooKeeper = new ZooKeeper(IP, 5000, this);
// 阻塞线程,等待连接的创建成功
countDownLatch.await();
// 读取配置信息
this.url = new String(zooKeeper.getData("/config/url", true, null));
this.username = new String(zooKeeper.getData("/config/username", true, null));
this.password = new String(zooKeeper.getData("/config/password", true, null));
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
try {
MyConfigCenter myConfigCenter = new MyConfigCenter();
for (int i = 1; i <= 20; i++) {
Thread.sleep(5000);
System.out.println("url:"+myConfigCenter.getUrl());
System.out.println("username:"+myConfigCenter.getUsername());
System.out.println("password:"+myConfigCenter.getPassword());
System.out.println("########################################");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}分布式唯一id
指定路径生成临时有序节点 取序列号及为分布式环境下的唯一ID
import java.util.concurrent.CountDownLatch;
import com.itcast.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class GloballyUniqueId implements Watcher {
// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
// 用户生成序号的节点
String defaultPath = "/uniqueId";
// 连接对象
ZooKeeper zooKeeper;
@Override
public void process(WatchedEvent event) {
try {
// 捕获事件状态
if (event.getType() == Watcher.Event.EventType.None) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
System.out.println("连接断开!");
} else if (event.getState() == Watcher.Event.KeeperState.Expired) {
System.out.println("连接超时!");
// 超时后服务器端已经将连接释放,需要重新连接服务器端
zooKeeper = new ZooKeeper(IP, 6000,
new ZKConnectionWatcher());
} else if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
System.out.println("验证失败!");
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 构造方法
public GloballyUniqueId() {
try {
//打开连接
zooKeeper = new ZooKeeper(IP, 5000, this);
// 阻塞线程,等待连接的创建成功
countDownLatch.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 生成id的方法
public String getUniqueId() {
String path = "";
try {
//创建临时有序节点
path = zooKeeper.create(defaultPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception ex) {
ex.printStackTrace();
}
// /uniqueId0000000001
return path.substring(9);
}
public static void main(String[] args) {
GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
for (int i = 1; i <= 5; i++) {
String id = globallyUniqueId.getUniqueId();
System.out.println(id);
}
}
}分布式锁
每个客户端往/Locks下创建临时有序节点/Locks/Lock 000000001 客户端取得/Locks下子节点,并进行排序,判断排在最前面的是否为自己,如果自己的 锁节点在第一位,代表获取锁成功 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点 Lock 000000001 当前一位锁节点(Lock 000000002)的逻辑 监听客户端重新执行第2步逻辑,判断自己是否获得了锁
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class MyLock {
// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//ZooKeeper配置信息
ZooKeeper zooKeeper;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;
// 打开zookeeper连接
public MyLock() {
try {
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功!");
countDownLatch.countDown();
}
}
}
});
countDownLatch.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
//获取锁
public void acquireLock() throws Exception {
//创建锁节点
createLock();
//尝试获取锁
attemptLock();
}
//创建锁节点
private void createLock() throws Exception {
//判断Locks是否存在,不存在创建
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建临时有序节点
lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("节点创建成功:" + lockPath);
}
//监视器对象,监视上一个节点是否被删除
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (this) {
notifyAll();
}
}
}
};
//尝试获取锁
private void attemptLock() throws Exception {
// 获取Locks节点下的所有子节点
List<String> list = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
// 对子节点进行排序
Collections.sort(list);
// /Locks/Lock_000000001
int index = list.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
if (index == 0) {
System.out.println("获取锁成功!");
return;
} else {
// 上一个节点的路径
String path = list.get(index - 1);
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
if (stat == null) {
attemptLock();
} else {
synchronized (watcher) {
watcher.wait();
}
attemptLock();
}
}
}
//释放锁
public void releaseLock() throws Exception {
//删除临时有序节点
zooKeeper.delete(this.lockPath,-1);
zooKeeper.close();
System.out.println("锁已经释放:"+this.lockPath);
}
public static void main(String[] args) {
try {
MyLock myLock = new MyLock();
myLock.createLock();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}四、核心知识
4.1 配置文件说明
#通信心跳数,ZooKeeper服务器心跳时间,单位毫秒
#ZooKeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的
#时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
#用于心跳机制,并且设置最小的session超时时间为两倍心跳时间
#(session的最小超时时间是2*tickTime)。
tickTime=2000
#LF初始通信时限集群中的Follower跟随者服务器(F)与Leader领导者服务器
#(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。
#投票选举新Leader的初始化时间,Follower在启动过程中,会从Leader同步
#所有最新数据,然后确定自己能够对外服务的起始状态。
#Leader允许Follower在initLimit时间内完成这个工作。
initLimit=10
#LF同步通信时限集群中Leader与Follower之间的最大响应时间单位,
#假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,
#从服务器列表中删除Follwer。
#在运行过程中,Leader负责与ZooKeeper集群中所有机器进行通信,
#例如通过一些心跳检测机制,来检测机器的存活状态。
#如果L发出心跳包在syncLimit之后,还没有从F那收到响应,
#那么就认为这个F已经不在线了。
syncLimit=5
#数据文件目录+数据持久化路径
#保存内存数据库快照信息的位置,如果没有其他说明,
#更新的事务日志也保存到数据库。
dataDir=/datatmp/zookeeper/data
dataLogDir=/datatmp/zookeeper/logs
#客户端连接端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html
#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
#2888,3888 are election port
server.1=zookeeper:2888:38888
#2888端口号是服务之间通信的端口,而3888是;zookeeper与其他应用程序通
#信的端口.而zookeeper是在hosts中已映射了本机的IP
server.A = B:C:D :
#A表示这个是第几号服务器,
#B 是这个服务器的 ip 地址;
#C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
#D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader4.2 事件监听机制
zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对 象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变 等),会实时、主动通知所有订阅者 zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力。 watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式
Watcher架构
Watcher实现由三个部分组成: Zookeeper服务端 、Zookeeper客户端 、客户端的ZKWatchManager对象
客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端, 接着客户端的Watch管理器会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程
Watcher特性
| 特性 | 说明 |
|---|---|
| 一次性 | watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册 |
| 客户端顺序回调 | watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数 据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行 |
| 轻量级 | WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点 路径,并不会告诉数据节点变化前后的具体内容; |
| 时效性 | watcher只有在当前session彻底失效时才会无效,若在session有效期内 快速重连成功,则watcher依然存在,仍可接收到通知; |
Watcher通知状态**(KeeperState)
KeeperState是客户端与服务端连接状态发生变化时对应的通知类型。路径为org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,
| 枚举属性 | 说明 |
|---|---|
| SyncConnected | 客户端与服务器正常连接时 |
| Disconnected | 客户端与服务器断开连接时 |
| Expired | 会话session失效时 |
| AuthFailed | 身份认证失败时 |
Watcher事件类型**(EventType)
EventType是数据节点(znode)发生变化时对应的通知类型。EventType变化时 KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时, EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType, 是一个枚举类
| 枚举属性 | 说明 |
|---|---|
| None | 无 |
| NodeCreated | Watcher监听的数据节点被创建时 |
| NodeDeleted | Watcher监听的数据节点被删除时 |
| NodeDataChanged | Watcher监听的数据节点内容发生变更时(无论内容数据 是否变化) |
| NodeChildrenChanged | Watcher监听的数据节点的子节点列表发生变更时 |
客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取;
Watcher捕获相应事件
Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举类:KeeperState、EventType
建立zookeeper的watcher监听。在zookeeper中采用zk.getChildren(path, watch)、zk.exists(path, watch)、zk.getData(path, watcher, stat)
| 注册方式 | Created | ChildrenChanged | Changed | Deleted |
|---|---|---|---|---|
| zk.exists(“/node-x”,watcher) | 可监控 | 可监控 | 可监控 | |
| zk.getData(“/node-x”,watcher) | 可监控 | 可监控 | ||
| zk.getChildren(“/node-x”,watcher) | 可监控 | 可监控 |
// 客服端与服务器的连接状态
//KeeperState 通知状态
//SyncConnected:客户端与服务器正常连接时
//Disconnected:客户端与服务器断开连接时
//Expired:会话session失效时
//AuthFailed:身份认证失败时
//事件类型为:None
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class ZKConnectionWatcher implements Watcher {
// 计数器对象
static CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接对象
static ZooKeeper zooKeeper;
@Override
public void process(WatchedEvent event) {
try {
// 事件类型
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("断开连接!");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("会话超时!");
zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("认证失败!");
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
// 阻塞线程等待连接的创建
countDownLatch.await();
// 会话id
System.out.println(zooKeeper.getSessionId());
// 添加授权用户
zooKeeper.addAuthInfo("digest1","itcast1:1234561".getBytes());
byte [] bs=zooKeeper.getData("/node1",false,null);
System.out.println(new String(bs));
Thread.sleep(50000);
zooKeeper.close();
System.out.println("结束");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZKWatcherExists {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void watcherExists1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用连接对象中的watcher
zooKeeper.exists("/watcher1", true);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherExists2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher对象
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherExists3() throws KeeperException, InterruptedException {
// watcher一次性
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
zooKeeper.exists("/watcher1", this);
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
zooKeeper.exists("/watcher1", watcher);
Thread.sleep(80000);
System.out.println("结束");
}
@Test
public void watcherExists4() throws KeeperException, InterruptedException {
// 注册多个监听器对象
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("1");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("2");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
Thread.sleep(80000);
System.out.println("结束");
}
}五、安装
5.1 使用docker部署zookeeper
#拉取zk镜像
docker pull zookeeper:3.5
#创建容器
docker create --name zk -p 2181:2181 zookeeper:3.5
#启动容器
docker start zk5.2 常规安装方式
安装前提是要有java环境
tar –xf zookeeper-3.4.6.tar.gz解压文件到"/usr/local/zookeeper-3.4.6"- 复制conf目录下的zoo_sample.cfg,并命名为zoo.cfg
- 修改zoo.cfg
5.3 集群安装
前置安装与常规安装方式一样,还需在每个zookeeper的 data 目录下创建一个 myid 文件,这个文件就是记录每个服务器的ID,以123为例
echo 1 >/usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 >/usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 >/usr/local/zookeeper-cluster/zookeeper-3/data/myid在每个zoo.cfg配置集群ip列表
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
# server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口
server.1=192.168.149.135:2881:3881
server.2=192.168.149.135:2882:3882
server.3=192.168.149.135:2883:3883
# 启动后,leader是领导者,follower是跟随者集群注意事项
- 3个节点的集群,2个从服务器都挂掉,主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数
- 当集群中的主服务器挂了,集群中的其他服务器会自动进行选举状态,然后产生新得leader
- 当领导者产生后,再次有新服务器加入集群,不会影响到现任领导者
5.4 taokeeper监控工具的使用
基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件, 安装前要求服务前先配置nc 和 sshd
下载数据库脚本
shwget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql下载主程序
shwget https://github.com/downloads/alibaba/taokeeper/taokeeper- monitor.tar.gz下载配置文件
shwget https://github.com/downloads/alibaba/taokeeper/taokeeper-monitor- config.properties配置taokeeper-monitor-config.properties
properties#Daily systemInfo.envName=DAILY #DBCP dbcp.driverClassName=com.mysql.jdbc.Driver #mysql连接的ip地址端口号 dbcp.dbJDBCUrl=jdbc:mysql://192.168.60.130:3306/taokeeper dbcp.characterEncoding=GBK #用户名 dbcp.username=root #密码 dbcp.password=root dbcp.maxActive=30 dbcp.maxIdle=10 dbcp.maxWait=10000 #SystemConstant #用户存储内部数据的文件夹 #创建/home/zookeeper/taokeeperdata/ZooKeeperClientThroughputStat SystemConstent.dataStoreBasePath=/home/zookeeper/taokeeperdata #ssh用户 SystemConstant.userNameOfSSH=zookeeper #ssh密码 SystemConstant.passwordOfSSH=zookeeper #Optional SystemConstant.portOfSSH=22
5. 安装配置tomcat,修改catalina.sh
```sh
#指向配置文件所在的位置
JAVA_OPTS=-DconfigFilePath="/home/zookeeper/taokeeper-monitor- tomcat/webapps/ROOT/conf/taokeeper-monitor-config.properties"- 启动