与传统的加锁控制并发的对比
在访问临界资源时,可以使用传统的加锁方式,但如果项目变得复杂,这种方式容易出现 bug,编码困难
Go 使用 channel 配合 select 来做并发控制,使得编写并发代码变得很简单
使用 channel 可以实现 goroutine 间通信,甚至可以将多个 channel 的数据汇总到一个 channel 中
当然传统的加锁 + 共享内存也可以做到,但实现较复杂,我们不得不把精力放在如何编写「正确」的并发代码上,反而忽略了业务逻辑的处理
Go 的并发原则非常优秀,目标就是简单:尽量使用 channel;把 goroutine 当作免费的资源,随便用。
channel 实现原理
下面简要分析一下 channel 实现原理,基于 Go 1.21.1
数据结构
channel 的源码位于 /src/runtime/chan.go,结构定义如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
挑几个重要的说明一下:
- buf:channel 的缓冲区(针对有缓冲的 channel,无缓冲的 channel 这个字段为 nil)
- closed:标记 channel 是否关闭
- sendx、recvx:分别指向 buf 可发送位置、可接收位置
- sendq、recvq:分别指向因发送阻塞的 goroutine 队列,以及因接收阻塞的 goroutine 队列
具体的,waitq 的定义如下:
type waitq struct {
first *sudog
last *sudog
}
waitq 实际上是一个双向链表,每一个元素都是 sudog 类型
sudog 封装了 goroutine:
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
g *g // goroutine
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// ...
}
引用 什么是 channel 的一张图来加以说明:
创建 channel
创建 channel,需要使用 make:
ch := make(chan struct{}, cap)
该方法底层会调用 makechan:
func makechan(t *chantype, size int64) *hchan {
var c *hchan
// ...
}
当我们分配不带缓冲区的 channel 时,makechan 仅会涉及一次内存分配
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
如果 channel 带缓冲区,那么就还要单独为 buf 分配一次内存:
// 涉及两次内存分配
c = new(hchan)
c.buf = newarray(elem, int(size)) // 为 buf 分配内存
接收数据
当我们尝试从 channel 接收数据时,根据是否带 ok ,底层分别调用 chanrecv1、chanrecv2 函数:
// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
其中:
- c 是指向 channel 的指针
- elem 是指向待赋值元素的指针
如果成功从 channel 获取了值,那么 received 为 true
无论哪个方法,都会调用 chanrecv 函数:
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
}
- 如果 channel 有可用数据:
- 如果 ep 为 nil,说明调用者忽略了读取的数据
- 返回 true,true
- 如果 channel 没有可用数据:
- 如果是非阻塞调用,返回 false,false
- 否则,阻塞当前 goroutine
- 如果 channel 已被关闭:
- 如果 ep 不为 nil,给 ep 赋上一个对应类型的 零值
- 返回 true,false
为什么有 block 参数?channel 还有非阻塞的吗?
这种情况通常发生在使用 select 的情况下,比如:
select {
case value := <-ch:
// 从通道中成功接收到值
default:
// 通道空,接收操作无法立即进行
}
在这种情况下,对于 ch 的读取操作,编译器就会转译成 chanrecv(c, &value, true)
如何从 channel 读取数据?
首先,怎么才能判断 channel 有数据呢?
先来判断 sendq 是否为空
如果不为空,只有两种情况:
- channel 不带缓冲区
- channel 带缓冲区,但是缓冲区满了
无论哪种情况,都说明 channel 有可用数据
在 sendq 不为空的情况,实际上会调用 recv 函数:
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
那么 recv 做了什么事情呢?
简单来说:
- 如果是不带缓冲区的 channel:直接拷贝数据,从 sender goroutine -> receiver goroutine
- 如果是带缓冲区的 channel:
- 将 buf 队头的数据拷贝给 ep
- 将 sender 发送的数据追加到 buf 队尾
- 唤醒 sender
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ...
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// ...
}
如果 sendq 为空,就要看看 buf 是否为空:
- 如果为空,说明 channel 没有可用数据,阻塞
- 如果不为空,说明 channel 有可用数据,根据 recvx 来确定读取位置,再读取数据
// buf 不为空
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
阻塞当前 goroutine,怎么实现的?
直接上源码:
// no sender available: block on this channel.
gp := getg() // 获取当前 goroutine 的虚拟内存地址
mysg := acquireSudog() // 创建一个 sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 保存 ep 的地址到 mysg(sudog)中
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 进入到当前 channel 的 recvq 等待队列
// 阻塞当前 goroutine
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
当前 goroutine 唤醒后,怎么读取 channel 的值呢?
接着上源码:
// someone woke us up(某个 goroutine 将我们唤醒)
// 执行一些扫尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
似乎没有看到 recv 操作
还记得前面提到的 保存 ep 的地址到 mysg(sudog)中 这个操作嘛?
实际上,当一个 sender 发送数据时,如果 recvq 不为空,会将数据直接拷贝到 mysg.elem 中(这个过程下文会讲),也就相当于保存到了 ep 中
发送数据
当我们尝试向一个 channel 发送数据,底层实际上调用了 chansend1 函数:
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
chansend1 又会调用 chansend 函数(套娃了属于是):
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
}
有了 chanrecv 的基础,理解 chansend 就比较简单了:
- 如果 recvq 不为空,说明有 goroutine 正在等待,将数据直接拷贝到 recvq 的队头的 goroutine
- 如果 channel 不带缓冲区:将当前 goroutine 挂到 sendq,阻塞 goroutine
- 如果 channel 带缓冲区:
- 如果 buf 没满,那么将当前数据 append 到 buf 队尾
- 否则,将当前 goroutine 挂到 sendq,阻塞 goroutine
- 如果 channel 已经关闭,直接 panic
还是来看看源码:
当 recvq 不为空,说明有 goroutine 正在等待,会调用 send 函数:
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
send 函数做了什么事?
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ...省略
// 如果 sudog 的 elem 字段不为 nil
// 将数据直接拷贝到 sudog 的 goroutine 的栈区
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒 goroutine
goready(gp, skip+1)
}
当 buf 足以存放新的数据,基于 sendx 来存放数据到 buf:
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
如果阻塞发送数据,并且 recvq 为空,并且 buf 不够存放新的数据,阻塞当前 goroutine:
// Block on the channel. Some receiver will complete our operation for us.
gp := getg() // 获取指向当前 goroutine 的指针
mysg := acquireSudog() // 创建 sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep // 保存 ep 的地址到 mysg(sudog)中
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 将当前 goroutine 挂到 sendq 的队尾
// 阻塞操作
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
// 防止 ep 被 GC
KeepAlive(ep)
当前 goroutine 唤醒之后:
// someone woke us up.
// 两种情况:
// 1. 接收数据
// 2. 关闭 channel
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
// 检查当前 channel 是否被关闭
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel")) // 如果被关闭,panic
}
return true
注意:当前 goroutine 唤醒之后,如果 channel 关闭了,会 panic 掉
关闭 channel
关闭 channel 的过程就比较简单了:
- 如果当前 channel 已经被关闭,或者为 nil,panic
- 给 closed 字段赋值为 1
- 唤醒所有的 reader(返回值为对应类型的零值)
- 唤醒所有的 sender(它们会 panic)
源码如下:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
channel 进阶
发送与接收数据的本质
在 之前的文章 提到了一个问题:元素值在经过通道传递时会被复制,那么这个复制是是浅拷贝还是深拷贝呢
那篇文章已经给出了解答:这个过程是 浅拷贝
深入 channel 底层是这样回答的:
Remember all transfer of value on the go channels happens with the copy of value.
channel 的发送和接收操作本质上都是浅拷贝,无论是从 sender 的栈到 chan buf,还是从 chan buf 到 receiver,或者是直接从 sender 到 receiver
happened before
给出维基百科的定义:
In computer science, the happened-before relation (denoted: → ) is a relation between the result of two events, such that if one event should happen before another event, the result must reflect that, even if those events are in reality executed out of order (usually to optimize program flow). This involves ordering events based on the potential causal relationship of pairs of events in a concurrent system, especially asynchronous distributed systems.
翻译成中文就是:
在计算机科学中,happened before(记作:→)是两个事件结果之间的一种关系,它指的是如果一个事件应该在另一个事件之前发生,那么结果必须反映这一点,即使这些事件在现实中是以不同顺序执行的(通常是为了优化程序流程)。这涉及到根据并发系统中事件对的潜在因果关系来对事件进行排序,特别是在异步分布式系统中。
这点怎么在 Go 中体现呢?
基于 happened before,我们可以使用 channel 实现 goroutine 之间的同步:
var ch chan struct{}
var x int
func init() {
ch = make(chan struct{})
x = -1
}
func write(val int) {
time.Sleep(time.Second * 3)
x = val
}
func read() int {
return x
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
// 我们希望 read 在 write 完毕后执行
go func() {
defer wg.Done()
write(114514)
ch <- struct{}{}
}()
go func() {
defer wg.Done()
<-ch
fmt.Printf("read(): %v\n", read())
}()
wg.Wait()
}
输出:
Sky_Lee@SkyLeeMacBook-Pro test % go build
Sky_Lee@SkyLeeMacBook-Pro test % ./test
read(): 114514
优雅关闭 channel
在分析 closechan 的源码时,提到了:关闭一个已经关闭的 channel,会 panic
分析 chansend 的源码时,提到了:写入一个关闭的 channel,会 panic
那么应该如何关闭 channel,避免 panic 呢?
一种暴力的方法是使用 defer + recover:你 panic 呗,哥们直接给你 recover 掉
但是这种方式不符合我们的「优雅」关闭
这里给出关闭 channel 的原则:
don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
也就是:
- 不要在 receiver 关闭 channel
- 当有多个 sender 时,不要关闭 channel
实际上我们可以不用关闭 channel,当没有任何一个 goroutine 使用 channel 时,它会被 GC 掉
channel 的应用
- 停止信号
- 任务定时
- 控制并发数
停止信号
func goroutineA(ch chan struct{}) {
for range time.Tick(time.Second) {
select {
case <- ch: // 停止信号
fmt.Println("goroutine A is stopped.")
return
default:
}
fmt.Println("goroutine A is running...")
}
}
func main() {
ch := make(chan struct{})
go goroutineA(ch)
time.Sleep(5 * time.Second)
ch <- struct{}{} // 发送停止信号
time.Sleep(1 * time.Second)
fmt.Println("main goroutine is exiting...")
}
任务定时
func main() {
ch := time.Tick(time.Second)
i := 0
for {
<- ch // 1s 执行一次
fmt.Printf("i = %v\n", i)
i++
if i == 5 {
break
}
}
}
控制并发
package main
import (
"fmt"
"sync/atomic"
"time"
)
var activeGoroutines int32
func init() {
activeGoroutines = 0
}
func work(id int) {
atomic.AddInt32(&activeGoroutines, 1)
defer atomic.AddInt32(&activeGoroutines, -1)
// fmt.Printf("goroutine-%d is working...\n", id)
time.Sleep(time.Second * 1)
}
// 限流
func limitRate(id int, ch chan struct{}) {
ch <- struct{}{}
work(id)
<-ch
}
func main() {
limit := 10
ch := make(chan struct{}, limit) // 控制并发量为 10
for i := 0; i < 100; i++ { // 100 个 goroutine 并发执行 work
go limitRate(i, ch)
}
// 每秒拉取一次活跃 goroutine 的数量
for range time.Tick(time.Second) {
fmt.Printf("Active goroutines: %d\n", atomic.LoadInt32(&activeGoroutines))
}
}