APIs
ZK 的使用场景
ZK 主要用于 单个数据中心 内:
- 管理集群的配置信息
- 选举 Leader
- Test-And-Set 服务的实现
- …
Znode
Zookeeper 的 API 某种程度上来说像是一个文件系统。它有一个层级化的目录结构,有一个根目录(root),之后每个应用程序有自己的子目录。比如说应用程序 1 将自己的文件保存在 APP1 目录下,应用程序 2 将自己的文件保存在 APP2 目录下,这些目录又可以包含文件和其他的目录。
文件和目录都被称为 Znodes。Zookeeper 中包含了 3 种类型的 Znode:
- Regular Znode:普通 Znode,创建后永久存在,不会自动删除
- Ephemeral Znode:这种 Znode 与创建它的客户端会话绑定,如果客户端挂掉(超时没有心跳),ZK 会将这个 Znode 删掉
- Sequential Znode:顺序 Znode,命名格式为
,当若干个客户端并发的创建名叫 filename 的 Sequential Znode 时,ZK 保证生成的 num 是递增的,不会重复
APIs
ZK 的 API 很少,但很强大,通过组合这些 API 可以实现很多功能,这里直接用 Robert 教授的话 来展示:
CREATE(PATH,DATA,FLAG)。入参分别是文件的全路径名 PATH,数据 DATA,和表明 znode 类型的 FLAG。这里有意思的是,CREATE 的语义是排他的。也就是说,如果我向 Zookeeper 请求创建一个文件,如果我得到了 yes 的返回,那么说明这个文件之前不存在,我是第一个创建这个文件的客户端;如果我得到了 no 或者一个错误的返回,那么说明这个文件之前已经存在了。所以,客户端知道文件的创建是排他的。在后面有关锁的例子中,我们会看到,如果有多个客户端同时创建同一个文件,实际成功创建文件(获得了锁)的那个客户端是可以通过 CREATE 的返回知道的。
DELETE(PATH,VERSION)。入参分别是文件的全路径名 PATH,和版本号 VERSION。有一件事情我之前没有提到,每一个 znode 都有一个表示当前版本号的 version,当 znode 有更新时,version 也会随之增加。对于 delete 和一些其他的 update 操作,你可以增加一个 version 参数,表明当且仅当 znode 的当前版本号与传入的 version 相同,才执行操作。当存在多个客户端同时要做相同的操作时,这里的参数 version 会非常有帮助(并发操作不会被覆盖)。所以,对于 delete,你可以传入一个 version 表明,只有当 znode 版本匹配时才删除。
EXIST(PATH,WATCH)。入参分别是文件的全路径名 PATH,和一个有趣的额外参数 WATCH。通过指定 watch,你可以监听对应文件的变化。不论文件是否存在,你都可以设置 watch 为 true,这样 Zookeeper 可以确保如果文件有任何变更,例如创建,删除,修改,都会通知到客户端。此外,判断文件是否存在和 watch 文件的变化,在 Zookeeper 内是原子操作。所以,当调用 exist 并传入 watch 为 true 时,不可能在 Zookeeper 实际判断文件是否存在,和建立 watch 通道之间,插入任何的创建文件的操作,这对于正确性来说非常重要。
GETDATA(PATH,WATCH)。入参分别是文件的全路径名 PATH,和 WATCH 标志位。这里的 watch 监听的是文件的内容的变化。
SETDATA(PATH,DATA,VERSION)。入参分别是文件的全路径名 PATH,数据 DATA,和版本号 VERSION。如果你传入了 version,那么 Zookeeper 当且仅当文件的版本号与传入的 version 一致时,才会更新文件。
LIST(PATH)。入参是目录的路径名,返回的是路径下的所有文件。
使用 ZooKeeper 实现计数器
因为 ZK 的读取不满足线性一致,要想正确实现计数器递增操作,需要保证 count 的 读取与写入 是一个原子操作,以保证多个客户端并发读写的正确性:
for {
// 获取原始 count 和对应的版本号
count, version := GetData("count-file", true)
// 设置 count 为 count + 1,当且仅当此时 count-file 的 version 与此前一致才更新
if SetData("count-file", count+1, version) == true {
break
}
}
时间复杂度为 $O(n^2)$,因为一次循环,所有客户端中只有一个能够成功,其它全部都会失败,假设有 1000 个客户端,在最坏的条件下,一个客户端执行一次递增操作,需要 1000 次循环
因此仅适合低负载场景
使用 ZooKeeper 实现非扩展锁
这里讨论的有点像「分布式锁」的实现
func Lock() {
for {
// 尝试创建锁文件,该文件与客户端会话绑定
if Create("lock", data, O_Ephemeral) == true {
// 如果自己是第一个创建的,会返回 true,说明拿到了锁
return
}
// 设置 watch,如果此时存在 lock,那么等待 ZK 通知 lock 文件被删除
if Exist("lock", true) == true {
wait()
}
// 否则说明在 Create 到 Exist 调用期间,锁被释放,重新尝试
}
}
func Unlock(version int) {
Delete("lock", version)
}
注意 Exist 的调用,除了设置 watch 以外,还相当于双重判断了 lock 到底存不存在
但这种实现方式存在「惊群效应」,因此被叫做「非扩展锁」
为什么会有「惊群效应」?因为当一个客户端释放锁时,ZK 会通知所有 watch 了这个 lock file 的客户端
也就是说,剩余的客户端被 几乎同时唤醒,重试获取锁
使用 ZooKeeper 实现扩展锁
为了解决上面提到的「惊群效应」,这里提供另一种实现:
// 以 Ephemeral、Sequential 模式创建锁文件
// 这里假设文件名为 lock-6
Create("lock", data, O_Ephemeral | O_Sequential)
for {
// 获取以 lock 开头的所有文件
List("lock*")
// 如果没有比自己创建的文件名更小的(即 lock-1 ... lock-5)
// 说明自己是第一个创建的,获取成功
if no lower-file {
break
}
// 如果存在比自己小的下一个文件(即 lock-5)
// 等待,直到 ZK 通知
if Exist(next-lower-file, true) {
wait()
}
// continue
}
上面的代码,理想情况下,最多循环两次即可获取锁,因为 watch 的对象仅仅是一个,因此避免了「惊群效应」
这里讲一下 List:
List 得到了文件的列表,我们就知道了比自己序列号更小的下一个锁文件。Zookeeper 可以确保,一旦一个序列号,比如说 27,被使用了,那么之后创建的 Sequential 文件不会使用更小的序列号。所以,我们可以确定第一次 LIST 之后,不会有序列号低于 27 的锁文件被创建,那为什么在重试的时候要再次 LIST 文件?为什么不直接跳过?
答案是,持有更低序列号 Sequential 文件的客户端,可能在我们没有注意的时候就释放了锁,也可能已经挂了。比如说,我们是排在第 27 的客户端,但是排在第 26 的客户端在它获得锁之前就挂了。因为它挂了,Zookeeper 会自动的删除它的锁文件(因为创建锁文件时,同时也指定了 ephemeral=TRUE)。所以这时,我们要等待的是序列号 25 的锁文件释放。所以,尽管不可能再创建序列号更小的锁文件,但是排在前面的锁文件可能会有变化,所以我们需要在循环的最开始再次调用 LIST,以防在等待锁的队列里排在我们前面的客户端挂了。
只要不存在比自己序号更低的锁文件,就成功获取到了锁
这种实现方式感觉实现了一种「排序」等待机制,客户端获取锁的顺序,与第一次创建锁文件的顺序是一致的