原生的 Map 有并发访问的安全问题:

var m map[int]int = make(map[int]int, 4)

func Store(key, val int)  {
    m[key] = val
}

func Load(key int) (int, bool) {
    val, ok := m[key]
    return val, ok
}


func main()  {
    var wg sync.WaitGroup
    wg.Add(100)
    for i := 0; i < 100; i++ {
        go func(key int) {
            defer wg.Done()
            Store(key, key)
            Load(key)
        }(i)
    }
    wg.Wait()
}

输出:

fatal error: concurrent map writes

map 不允许并发写,可以使用读写锁解决:

func Store(key, val int)  {
    defer rwlock.Unlock()
    rwlock.Lock()
    m[key] = val
}

func Load(key int) (int, bool) {
    defer rwlock.RUnlock()
    rwlock.RLock()
    val, ok := m[key]
    return val, ok
}

sync 包也提供了一个 开箱即用、并发访问安全 的 map:

image

使用 sync.Map 修改上面的代码:

var m sync.Map

func Store(key, val int) {
	m.Store(key, val)
}

func Load(key int) (int, bool) {
	val, ok := m.Load(key)
	if ok { // 确实有 key
		res, ok2 := val.(int)
		if ok2 { // 检查 val 的类型
			return res, ok
		}
	}
    return -1, false
}

sync.Map 底层是怎么实现的

数据结构

type Map struct {
	mu Mutex // 保护 dirty

	read atomic.Pointer[readOnly] // 允许并发读

	dirty map[any]*entry // 非 goroutine 安全的原始 map

	misses int // 记录了读取 read,miss 的次数
}

再来看看 readOlny 是个啥:

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
	m       map[any]*entry
	amended bool // true if the dirty map contains some key not in m.(如果 dirty 包含一些不存在于 m 中的 key,那么为 true)
}

可以发现:

  • Map 使用了两个原生的 map,分别是 read.m、dirty
  • read 的类型是 atomic.Pointer[readOnly],可以并发地读。但如果需要更新 read,则需要加锁保护
  • dirty 是一个原始 map,新增加的 key 都存储在这里
  • mu 是互斥锁,用于保护 dirty 的读写操作
  • 如果一个 key 在 dirty 而不在 read 中(这个 key 是后添加的),那么递增 misses

无论是 read,还是 dirty,其 value 类型都是指向 entry 的指针,那么 entry 是个啥?

// An entry is a slot in the map corresponding to a particular key.
type entry struct {
	p atomic.Pointer[any]
}

entry 实际上使用原子指针封装了真实 value

这里借用 深度解密 Go 语言之 sync.map 的一张图片来帮助理解:

image

Store

Store 方法用于向 sync.Map 存储数据

可以先想想实现 Store 的大方向:

  • 判断 key 是否存在
    • 如果存在,那么修改 value 即可
    • 否则,新增一个 entry 到 map 中

来看看 sync 包是怎么实现的:

func (m *Map) Store(key, value any) {
	_, _ = m.Swap(key, value)
}

func (m *Map) Swap(key, value any) (previous any, loaded bool) {
	read := m.loadReadOnly()
    // 如果 read 中,包含这个 key,那么直接修改即可(这个修改的过程在 trySwap 中,基于 CAS 实现原子修改 entry)
	if e, ok := read.m[key]; ok {
		if v, ok := e.trySwap(&value); ok {
			if v == nil {
				return nil, false
			}
			return *v, true
		}
	}

    // 准备操作 dirty,加锁
	m.mu.Lock()
	read = m.loadReadOnly() // 双重检查(double-check)
	if e, ok := read.m[key]; ok {
        // 如果 read map 中存在该 key,但 p == expunged,则说明 m.dirty != nil 并且 m.dirty 中不存在该 key 值,直接修改 dirty
        if e.unexpungeLocked() {
			m.dirty[key] = e
		}
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	} else if e, ok := m.dirty[key]; ok {
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	} else {
		if !read.amended { // 说明 dirty 为 nil
			// We're adding the first new key to the dirty map.
			// Make sure it is allocated and mark the read-only map as incomplete.
			m.dirtyLocked() // 给 dirty 分配内存
			m.read.Store(&readOnly{m: read.m, amended: true}) // 修改 amended 为 true
		}
		m.dirty[key] = newEntry(value) // 在 dirty 中添加新的 key-value
	}
	m.mu.Unlock()
	return previous, loaded
}

func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read := m.loadReadOnly()
	// 给 dirty 分配内存
	m.dirty = make(map[any]*entry, len(read.m))
	// 将 read 的所有元素拷贝到 dirty 中
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

大致步骤如下:

  • 检查 read 中是否有这个 key,如果有,直接原子修改 entry 即可(这个修改在 read 和 dirty 都是可见的),返回
  • 第一步没有成功:要么 read 中没有这个 key,要么 key 被标记为删除。则先加锁,再进行后续的操作
  • 双重检查 read,如果 read map 中存在该 key,但 p == expunged,则说明 m.dirty != nil 并且 m.dirty 中不存在该 key 值(被删除了),直接修改 dirty,返回
  • 否则,检查 dirty 是否为 nil,如果为 nil,分配内存,修改 amended 为 true
  • 在 dirty 中添加新的 key-value

可以发现,新增的 key 都是加在 dirty 中的

在给 dirty 分配内存时,需要将 read 的所有数据(除了删除过的)拷贝到 dirty 中,以便后续快速将 dirty 提升为 read

Load

还是先想想实现 Load 的大方向:

  • 判断这个 key 是否存在:
    • 如果存在,那么返回
    • 否则返回对应类型的零值

核心就在判断 key 是否存在上面

我们来看看 sync 包是怎么实现的:

func (m *Map) Load(key any) (value any, ok bool) {
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended { // 如果 read 中没有,并且 dirty 不为空
		m.mu.Lock()
		// Avoid reporting a spurious miss if m.dirty got promoted while we were
		// blocked on m.mu. (If further loads of the same key will not miss, it's
		// not worth copying the dirty map for this key.)
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended { // 双重检查
			e, ok = m.dirty[key] // 在 dirty 中获取 key 对应的 value
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked() // 无论 key 对应的 value 是否存在,都要执行 missLocked(因为访问了 dirty)
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

Load 的实现很简单,主要分为两个大的步骤:在 read 中查找,以及在 dirty 中查找:

  • 在 read 中查找,如果有,直接返回(相当于 cache hit)
  • 如果 read 中没有,并且 dirty 不为空,尝试在 dirty 查找(相当于 cache miss)
  • 双重检查一波,如果确实 read 没有这个 key,那么在 dirty 获取
  • 因为访问了 dirty,无论 key 对应的 value 是否存在,执行 missLocked
  • 返回结果

missLocked 的实现如下:

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

这个函数主要做了两件事:

  • 递增 misses
  • 如果 misses >= dirty 中的 key 总个数,说明出现了较严重的 cache miss,会影响 Store 和 Load 的性能,需要:
    • 将 dirty 提升为 read
    • 将原 dirty 置为 nil

从 Store 和 Load 的源码分析可以看出:sync.Map 还是比较 适合读多写少的场景,如果写比较多(造成主要的数据都在 dirty 中),会导致部分读取要先读 read,再读 dirty,性能不佳

Delete

func (m *Map) Delete(key any) {
	m.LoadAndDelete(key) // 实际上调用的还是 LoadAndDelete
}

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended { // 如果 read 不存在,并且 dirty 不为 nil
		m.mu.Lock()
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended { // 双重检查
			e, ok = m.dirty[key]
			delete(m.dirty, key) // 在 dirty 中删除 key
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked() // misses++,进一步的,可能提升 dirty 为 read
		}
		m.mu.Unlock()
	}
	if ok {
		return e.delete() // 原子删除
	}
	return nil, false
}

func (e *entry) delete() (value any, ok bool) {
	for {
		p := e.p.Load()
		if p == nil || p == expunged {
			return nil, false
		}
		// p 不为 nil,并且 p 之前没有被删除过
		if e.p.CompareAndSwap(p, nil) {
			return *p, true
		}
	}
}

总结

  • sync.map 是线程安全的
  • 读取,插入,删除保持常数级的时间复杂度。
  • 通过读写分离(read 负责读,dirty 负责读与写),降低锁时间 来提高效率,适用于 读多写少 的场景。
  • 调用 Load 函数时,如果在 read 中没有找到 key,则会将 misses 值原子地增加 1,当 misses 增加到和 dirty 的长度相等时,会将 dirty 提升为 read,减少「读 miss」
  • 新写入的 key 会保存到 dirty 中,如果这时 dirty 为 nil,就会先新创建一个 dirty,并将 read 中未被删除的元素拷贝到 dirty