APIs

ZK 的使用场景

ZK 主要用于 单个数据中心 内:

  • 管理集群的配置信息
  • 选举 Leader
  • Test-And-Set 服务的实现

Znode

Zookeeper 的 API 某种程度上来说像是一个文件系统。它有一个层级化的目录结构,有一个根目录(root),之后每个应用程序有自己的子目录。比如说应用程序 1 将自己的文件保存在 APP1 目录下,应用程序 2 将自己的文件保存在 APP2 目录下,这些目录又可以包含文件和其他的目录。

文件和目录都被称为 Znodes。Zookeeper 中包含了 3 种类型的 Znode:

  • Regular Znode:普通 Znode,创建后永久存在,不会自动删除
  • Ephemeral Znode:这种 Znode 与创建它的客户端会话绑定,如果客户端挂掉(超时没有心跳),ZK 会将这个 Znode 删掉
  • Sequential Znode:顺序 Znode,命名格式为 ,当若干个客户端并发的创建名叫 filename 的 Sequential Znode 时,ZK 保证生成的 num 是递增的,不会重复

APIs

ZK 的 API 很少,但很强大,通过组合这些 API 可以实现很多功能,这里直接用 Robert 教授的话 来展示:

  • CREATE(PATH,DATA,FLAG)。入参分别是文件的全路径名 PATH,数据 DATA,和表明 znode 类型的 FLAG。这里有意思的是,CREATE 的语义是排他的。也就是说,如果我向 Zookeeper 请求创建一个文件,如果我得到了 yes 的返回,那么说明这个文件之前不存在,我是第一个创建这个文件的客户端;如果我得到了 no 或者一个错误的返回,那么说明这个文件之前已经存在了。所以,客户端知道文件的创建是排他的。在后面有关锁的例子中,我们会看到,如果有多个客户端同时创建同一个文件,实际成功创建文件(获得了锁)的那个客户端是可以通过 CREATE 的返回知道的。

  • DELETE(PATH,VERSION)。入参分别是文件的全路径名 PATH,和版本号 VERSION。有一件事情我之前没有提到,每一个 znode 都有一个表示当前版本号的 version,当 znode 有更新时,version 也会随之增加。对于 delete 和一些其他的 update 操作,你可以增加一个 version 参数,表明当且仅当 znode 的当前版本号与传入的 version 相同,才执行操作。当存在多个客户端同时要做相同的操作时,这里的参数 version 会非常有帮助(并发操作不会被覆盖)。所以,对于 delete,你可以传入一个 version 表明,只有当 znode 版本匹配时才删除。

  • EXIST(PATH,WATCH)。入参分别是文件的全路径名 PATH,和一个有趣的额外参数 WATCH。通过指定 watch,你可以监听对应文件的变化。不论文件是否存在,你都可以设置 watch 为 true,这样 Zookeeper 可以确保如果文件有任何变更,例如创建,删除,修改,都会通知到客户端。此外,判断文件是否存在和 watch 文件的变化,在 Zookeeper 内是原子操作。所以,当调用 exist 并传入 watch 为 true 时,不可能在 Zookeeper 实际判断文件是否存在,和建立 watch 通道之间,插入任何的创建文件的操作,这对于正确性来说非常重要。

  • GETDATA(PATH,WATCH)。入参分别是文件的全路径名 PATH,和 WATCH 标志位。这里的 watch 监听的是文件的内容的变化。

  • SETDATA(PATH,DATA,VERSION)。入参分别是文件的全路径名 PATH,数据 DATA,和版本号 VERSION。如果你传入了 version,那么 Zookeeper 当且仅当文件的版本号与传入的 version 一致时,才会更新文件。

  • LIST(PATH)。入参是目录的路径名,返回的是路径下的所有文件。

使用 ZooKeeper 实现计数器

因为 ZK 的读取不满足线性一致,要想正确实现计数器递增操作,需要保证 count 的 读取与写入 是一个原子操作,以保证多个客户端并发读写的正确性:

for {
    // 获取原始 count 和对应的版本号
    count, version := GetData("count-file", true)
    // 设置 count 为 count + 1,当且仅当此时 count-file 的 version 与此前一致才更新
    if SetData("count-file", count+1, version) == true {
        break
    }
}

时间复杂度为 $O(n^2)$,因为一次循环,所有客户端中只有一个能够成功,其它全部都会失败,假设有 1000 个客户端,在最坏的条件下,一个客户端执行一次递增操作,需要 1000 次循环

因此仅适合低负载场景

使用 ZooKeeper 实现非扩展锁

这里讨论的有点像「分布式锁」的实现

func Lock() {
    for {
        // 尝试创建锁文件,该文件与客户端会话绑定
        if Create("lock", data, O_Ephemeral) == true {
            // 如果自己是第一个创建的,会返回 true,说明拿到了锁
            return
        }
        // 设置 watch,如果此时存在 lock,那么等待 ZK 通知 lock 文件被删除
        if Exist("lock", true) == true {
            wait()
        }
        // 否则说明在 Create 到 Exist 调用期间,锁被释放,重新尝试
    }
}

func Unlock(version int) {
    Delete("lock", version)
}

注意 Exist 的调用,除了设置 watch 以外,还相当于双重判断了 lock 到底存不存在

但这种实现方式存在「惊群效应」,因此被叫做「非扩展锁」

为什么会有「惊群效应」?因为当一个客户端释放锁时,ZK 会通知所有 watch 了这个 lock file 的客户端

也就是说,剩余的客户端被 几乎同时唤醒,重试获取锁

使用 ZooKeeper 实现扩展锁

为了解决上面提到的「惊群效应」,这里提供另一种实现:

// 以 Ephemeral、Sequential 模式创建锁文件
// 这里假设文件名为 lock-6
Create("lock", data, O_Ephemeral | O_Sequential)
for {
    // 获取以 lock 开头的所有文件
    List("lock*")

    // 如果没有比自己创建的文件名更小的(即 lock-1 ... lock-5)
    // 说明自己是第一个创建的,获取成功
    if no lower-file {
        break
    }

    // 如果存在比自己小的下一个文件(即 lock-5)
    // 等待,直到 ZK 通知
    if Exist(next-lower-file, true) {
        wait()
    }

    // continue
}

上面的代码,理想情况下,最多循环两次即可获取锁,因为 watch 的对象仅仅是一个,因此避免了「惊群效应」

这里讲一下 List:

List 得到了文件的列表,我们就知道了比自己序列号更小的下一个锁文件。Zookeeper 可以确保,一旦一个序列号,比如说 27,被使用了,那么之后创建的 Sequential 文件不会使用更小的序列号。所以,我们可以确定第一次 LIST 之后,不会有序列号低于 27 的锁文件被创建,那为什么在重试的时候要再次 LIST 文件?为什么不直接跳过?

答案是,持有更低序列号 Sequential 文件的客户端,可能在我们没有注意的时候就释放了锁,也可能已经挂了。比如说,我们是排在第 27 的客户端,但是排在第 26 的客户端在它获得锁之前就挂了。因为它挂了,Zookeeper 会自动的删除它的锁文件(因为创建锁文件时,同时也指定了 ephemeral=TRUE)。所以这时,我们要等待的是序列号 25 的锁文件释放。所以,尽管不可能再创建序列号更小的锁文件,但是排在前面的锁文件可能会有变化,所以我们需要在循环的最开始再次调用 LIST,以防在等待锁的队列里排在我们前面的客户端挂了。

只要不存在比自己序号更低的锁文件,就成功获取到了锁

这种实现方式感觉实现了一种「排序」等待机制,客户端获取锁的顺序,与第一次创建锁文件的顺序是一致的

参考资料