原生的 Map 有并发访问的安全问题:
var m map[int]int = make(map[int]int, 4)
func Store(key, val int) {
m[key] = val
}
func Load(key int) (int, bool) {
val, ok := m[key]
return val, ok
}
func main() {
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func(key int) {
defer wg.Done()
Store(key, key)
Load(key)
}(i)
}
wg.Wait()
}
输出:
fatal error: concurrent map writes
map 不允许并发写,可以使用读写锁解决:
func Store(key, val int) {
defer rwlock.Unlock()
rwlock.Lock()
m[key] = val
}
func Load(key int) (int, bool) {
defer rwlock.RUnlock()
rwlock.RLock()
val, ok := m[key]
return val, ok
}
sync 包也提供了一个 开箱即用、并发访问安全 的 map:
使用 sync.Map 修改上面的代码:
var m sync.Map
func Store(key, val int) {
m.Store(key, val)
}
func Load(key int) (int, bool) {
val, ok := m.Load(key)
if ok { // 确实有 key
res, ok2 := val.(int)
if ok2 { // 检查 val 的类型
return res, ok
}
}
return -1, false
}
sync.Map 底层是怎么实现的
数据结构
type Map struct {
mu Mutex // 保护 dirty
read atomic.Pointer[readOnly] // 允许并发读
dirty map[any]*entry // 非 goroutine 安全的原始 map
misses int // 记录了读取 read,miss 的次数
}
再来看看 readOlny 是个啥:
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[any]*entry
amended bool // true if the dirty map contains some key not in m.(如果 dirty 包含一些不存在于 m 中的 key,那么为 true)
}
可以发现:
- Map 使用了两个原生的 map,分别是 read.m、dirty
- read 的类型是
atomic.Pointer[readOnly],可以并发地读。但如果需要更新 read,则需要加锁保护 - dirty 是一个原始 map,新增加的 key 都存储在这里
- mu 是互斥锁,用于保护 dirty 的读写操作
- 如果一个 key 在 dirty 而不在 read 中(这个 key 是后添加的),那么递增 misses
无论是 read,还是 dirty,其 value 类型都是指向 entry 的指针,那么 entry 是个啥?
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
p atomic.Pointer[any]
}
entry 实际上使用原子指针封装了真实 value
这里借用 深度解密 Go 语言之 sync.map 的一张图片来帮助理解:
Store
Store 方法用于向 sync.Map 存储数据
可以先想想实现 Store 的大方向:
- 判断 key 是否存在
- 如果存在,那么修改 value 即可
- 否则,新增一个 entry 到 map 中
来看看 sync 包是怎么实现的:
func (m *Map) Store(key, value any) {
_, _ = m.Swap(key, value)
}
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
read := m.loadReadOnly()
// 如果 read 中,包含这个 key,那么直接修改即可(这个修改的过程在 trySwap 中,基于 CAS 实现原子修改 entry)
if e, ok := read.m[key]; ok {
if v, ok := e.trySwap(&value); ok {
if v == nil {
return nil, false
}
return *v, true
}
}
// 准备操作 dirty,加锁
m.mu.Lock()
read = m.loadReadOnly() // 双重检查(double-check)
if e, ok := read.m[key]; ok {
// 如果 read map 中存在该 key,但 p == expunged,则说明 m.dirty != nil 并且 m.dirty 中不存在该 key 值,直接修改 dirty
if e.unexpungeLocked() {
m.dirty[key] = e
}
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else if e, ok := m.dirty[key]; ok {
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else {
if !read.amended { // 说明 dirty 为 nil
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked() // 给 dirty 分配内存
m.read.Store(&readOnly{m: read.m, amended: true}) // 修改 amended 为 true
}
m.dirty[key] = newEntry(value) // 在 dirty 中添加新的 key-value
}
m.mu.Unlock()
return previous, loaded
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
// 给 dirty 分配内存
m.dirty = make(map[any]*entry, len(read.m))
// 将 read 的所有元素拷贝到 dirty 中
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
大致步骤如下:
- 检查 read 中是否有这个 key,如果有,直接原子修改 entry 即可(这个修改在 read 和 dirty 都是可见的),返回
- 第一步没有成功:要么 read 中没有这个 key,要么 key 被标记为删除。则先加锁,再进行后续的操作
- 双重检查 read,如果 read map 中存在该 key,但 p == expunged,则说明 m.dirty != nil 并且 m.dirty 中不存在该 key 值(被删除了),直接修改 dirty,返回
- 否则,检查 dirty 是否为 nil,如果为 nil,分配内存,修改 amended 为 true
- 在 dirty 中添加新的 key-value
可以发现,新增的 key 都是加在 dirty 中的
在给 dirty 分配内存时,需要将 read 的所有数据(除了删除过的)拷贝到 dirty 中,以便后续快速将 dirty 提升为 read
Load
还是先想想实现 Load 的大方向:
- 判断这个 key 是否存在:
- 如果存在,那么返回
- 否则返回对应类型的零值
核心就在判断 key 是否存在上面
我们来看看 sync 包是怎么实现的:
func (m *Map) Load(key any) (value any, ok bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended { // 如果 read 中没有,并且 dirty 不为空
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended { // 双重检查
e, ok = m.dirty[key] // 在 dirty 中获取 key 对应的 value
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked() // 无论 key 对应的 value 是否存在,都要执行 missLocked(因为访问了 dirty)
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
Load 的实现很简单,主要分为两个大的步骤:在 read 中查找,以及在 dirty 中查找:
- 在 read 中查找,如果有,直接返回(相当于 cache hit)
- 如果 read 中没有,并且 dirty 不为空,尝试在 dirty 查找(相当于 cache miss)
- 双重检查一波,如果确实 read 没有这个 key,那么在 dirty 获取
- 因为访问了 dirty,无论 key 对应的 value 是否存在,执行 missLocked
- 返回结果
missLocked 的实现如下:
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(&readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
这个函数主要做了两件事:
- 递增 misses
- 如果 misses >= dirty 中的 key 总个数,说明出现了较严重的 cache miss,会影响 Store 和 Load 的性能,需要:
- 将 dirty 提升为 read
- 将原 dirty 置为 nil
从 Store 和 Load 的源码分析可以看出:sync.Map 还是比较 适合读多写少的场景,如果写比较多(造成主要的数据都在 dirty 中),会导致部分读取要先读 read,再读 dirty,性能不佳
Delete
func (m *Map) Delete(key any) {
m.LoadAndDelete(key) // 实际上调用的还是 LoadAndDelete
}
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended { // 如果 read 不存在,并且 dirty 不为 nil
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended { // 双重检查
e, ok = m.dirty[key]
delete(m.dirty, key) // 在 dirty 中删除 key
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked() // misses++,进一步的,可能提升 dirty 为 read
}
m.mu.Unlock()
}
if ok {
return e.delete() // 原子删除
}
return nil, false
}
func (e *entry) delete() (value any, ok bool) {
for {
p := e.p.Load()
if p == nil || p == expunged {
return nil, false
}
// p 不为 nil,并且 p 之前没有被删除过
if e.p.CompareAndSwap(p, nil) {
return *p, true
}
}
}
总结
- sync.map 是线程安全的
- 读取,插入,删除保持常数级的时间复杂度。
- 通过读写分离(read 负责读,dirty 负责读与写),降低锁时间 来提高效率,适用于 读多写少 的场景。
- 调用 Load 函数时,如果在 read 中没有找到 key,则会将 misses 值原子地增加 1,当 misses 增加到和 dirty 的长度相等时,会将 dirty 提升为 read,减少「读 miss」。
- 新写入的 key 会保存到 dirty 中,如果这时 dirty 为 nil,就会先新创建一个 dirty,并将 read 中未被删除的元素拷贝到 dirty。