很多并发模型在涉及到线程间通信时,往往采用互斥锁来实现数据的安全性,但这种方式会消耗很多性能

Go 采用的 CSP 并发模型提倡:通过 通信共享内存 而不是通过共享内存而实现通信。

如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们之间的连接。

channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。

channel 类型

channel 是 Go 的一种类型,定义一个 channel:

var <变量名> chan <元素类型>

定义一个 channel 需要指定 channel 存储的元素类型:

var ch_int chan int      // 元素类型为 int
var ch_sl_int chan []int // 元素类型为 int 型的切片

初始化 channel

初始化一个 channel 需要使用 make 函数:

ch := make(chan <元素类型>, [缓冲区大小])

缓冲区大小是一个可选项,默认为 0

没有使用 make 初始化的 channel 的值为 nil

channel 的操作

ch := make(chan int, 2)
ch <- 1                              // 写数据到 channel
ch <- 2                              // 写数据到 channel
fmt.Printf("len(ch): %v\n", len(ch)) // 获取 ch 的元素个数
fmt.Printf("cap(ch): %v\n", cap(ch)) // 获取 ch 的缓冲区大小
i := <-ch                            // 从 channel 读数据
fmt.Printf("i: %v\n", i)
<-ch      // 从 channel 读数据,忽略结果
close(ch) // 关闭通道
ch <- 1   // panic

注意:

不同于文件描述符,channel 可以被垃圾回收,这意味着我们可以不用必须关闭通道

此外,如果要关闭通道,这个操作 一般由发送方执行

关闭后的通道的特点:

  • 对一个关闭的通道再发送值就会导致 panic。
  • 对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  • 关闭一个已经关闭的通道会导致 panic。

channel 的缓冲区

无缓冲

func main() {
	ch := make(chan int, 0)	// 缓冲区大小为 0
	ch <- 1
	fmt.Printf("Success!")
}

上述代码可以编译通过,但是运行时会输出:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /Users/Sky_Lee/Documents/Go/Test/test.go:20 +0x28
exit status 2

发生了死锁!

由于创建的是无缓冲的通道,向通道发送了数据后,却没有其它线程去接收

无缓冲的通道只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段

这样,当前线程就被阻塞了

解决方法:可以创建一个 goroutine 去接收 channel 的数据:

func recv(ch chan int)  {
	i := <- ch
	fmt.Printf("recv: i: %v\n", i)
}

func main() {
	ch := make(chan int, 0)	// 缓冲区大小为 0
	go recv(ch) // 创建一个 goroutine 去接收 channel 的数据
	ch <- 1
	fmt.Printf("Success!")
}

也可以扩大缓冲区的大小,只要缓冲区没满,发送方就不会阻塞

有缓冲

通过在 make 时指定 channel 的缓冲区大小:

func main() {
	ch := make(chan int, 1)	// 缓冲区大小为 1
	ch <- 1					// 不会阻塞
	fmt.Printf("Success!")
}

阻塞时机

通过上述分析,可以得出:

  • 向一个缓冲区已满的 channel 发送数据会阻塞
  • 从一个缓冲区已空的 channel 接收数据会阻塞

可以利用无缓存的 channel 实现协程间同步:

func f(ch chan int) {
	fmt.Printf("In f...\n")
	time.Sleep(time.Second) // 让 f 的执行周期长一些
	<-ch                    // 只有 f 从 channel 接收了数据,main 函数才能继续执行
}

func main() {
	ch := make(chan int) // 缓冲区大小为 0
	go f(ch)
	ch <- 1
	fmt.Printf("In main...\n")
}

多返回值

从 channel 接收值时,怎么知道当前 channel 已经被关闭,从而结束接收数据呢?

事实上,从 channel 接收值时,支持以下多返回值模式:

v, ok := <-ch
  • v: 从 channel 接收到的值
  • ok:布尔值,channel 是否关闭

基于此,可以编写循环读取 channel ,直到关闭:

func f1(ch chan int) {
	for {
		v, ok := <- ch
		if !ok { // channel 已经关闭
			fmt.Printf("Channel closed.\n")
			break
		}
		fmt.Printf("v: %v\n", v)
	}
}

func main() {
	ch := make(chan int) // 缓冲区大小为 0
	go f1(ch)
	for i := 0; i < 10; i++ {
		ch <- i
	}
	// close(ch) // 不关也行,main 函数结束后,垃圾回收,也会关闭通道
	fmt.Printf("Success!\n")
}

注意:不要使用 len(ch) 来判断通道是否关闭!

ch := make(chan int) // 缓冲区大小为 0
fmt.Printf("(len(ch) == 0): %v\n", (len(ch) == 0)) // true, 但 ch 并没有关闭

for range

可以使用 for range 循环从 channel 接收数据:

func f1(ch chan int) {
	for v := range ch {
		fmt.Printf("v: %v\n", v)
	}
	fmt.Printf("Channel closed.\n")
}

单向 channel

在生产者消费者模型中,我们可能希望:

  • 生产者只向 channel 写数据
  • 消费者只从 channel 读数据

这种场景下,就要使用到单向 channel:

<name> <-chan <type> // 只接收通道
<name> chan<- <type> // 只发送通道

例如:

var ch1 <-chan int
// 这里为了演示,没有给 channel 分配内存
<- ch1   // 正确
ch1 <- 1 // 错误,cannot send to receive-only channel ch1

利用单向 channel,实现一个简易的生产者消费者模型:

package main

import (
	"fmt"
	"strconv"
	"sync"
)

func producer(ch chan<- int)  {
	defer wg.Done()
	// 生产 100 个数据
	for i := 0; i < 100; i++ {
		ch <- i
	}
	close(ch) // 注意,这里需要主动关闭,否则消费者会一直等待,导致死锁
}

func consumer(ch <-chan int, name string) {
	defer wg.Done()
	for v := range ch {
		fmt.Printf("%s: recv data %d\n", name, v)
	}
}

var wg sync.WaitGroup

func main()  {
	ch := make(chan int, 5)

	// 1 个生产者,多个消费者
	wg.Add(1)
	go producer(ch)

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go consumer(ch, "consumer " + strconv.Itoa(i))
	}

	wg.Wait() // 等待所有协程退出
}

不难看出:在函数传参时,双向 channel 可以直接赋值给单向 channel,但不能反向转换

select 多路“复用”

没有接触 select 前,如果想要从多个 channel 获取值,可能会这样写:

for {
	data0, ok0 := <- ch1

	data1, ok1 := <- ch2

	// ...
}

但是这样的效率很低,如果任意一个操作阻塞,那么即使其它 channel 是可用的,也无法读取

select 就是用来解决这个问题的

select 是 Go 语言中用于处理并发通信的重要控制结构之一。select 语句用于在多个通道操作中选择一个可用的操作并执行它。它允许在不阻塞的情况下等待多个通道中的任何一个产生数据。

select {
	case v := <- ch0 // ch0 可以读取
	// ...
	case ch1 <- 1   // ch1 可以写入
	// ...
	case <- ch2		// ch2 可以读取
	// ...
	default
	// 默认分支,可以省略
}

结构与 switch 语句很像

select 语句的特点:

  • 可以同时处理一个或多个 channel 的操作
  • 如果有多个分支满足,随机 选择一个分支执行

select 的执行流程如下:

  • 每一个 case 应该是通道的读/写操作
  • 从上到下,从左到右执行所有的 case
  • 从所有可用的 case 中 随机 选择一个
  • 如果没有可用的 case,执行 default 分支

例如:

var channels = [3]chan int{
	nil,
	make(chan int),
	nil,
}

var numbers = []int{1, 2, 3}

func getNumber(i int) int {
	fmt.Printf("numbers[%d]\n", i)
	return numbers[i]
}

func getChan(i int) chan int {
	fmt.Printf("channels[%d]\n", i)
	return channels[i]
}

func test5()  {
	select {
	case getChan(0) <- getNumber(0):
		fmt.Println("The first candidate case is selected.")
	case getChan(1) <- getNumber(1):
		fmt.Println("The second candidate case is selected.")
	case getChan(2) <- getNumber(2):
		fmt.Println("The third candidate case is selected")
	default:
		fmt.Println("No candidate case is selected!")
	}
}

输出:

channels[1]
numbers[1]
channels[0]
numbers[0]
channels[2]
numbers[2]
No candidate case is selected!

元素值在经过通道传递时会被复制,那么这个复制是是浅拷贝还是深拷贝呢

发送操作包括了「复制元素值」和「放置副本到通道内部」这两个步骤。

接收操作通常包含了「复制通道内的元素值」「放置副本到接收方」「删掉原值」三个步骤。

那么这里的「复制」是浅拷贝还是深拷贝呢?

做个实验看看:

// 验证进入 channel 的过程是深拷贝还是浅拷贝
func test2() {
	a0 := []int{1, 2, 3, 4, 5}

	ch := make(chan []int, 1)
	ch <- a0

	a0[0] = 6

	a1 := <-ch
	fmt.Printf("a1[0]: %v\n", a1[0]) // 如果为 6,说明为浅拷贝
}

// 验证出 channel 的过程是深拷贝还是浅拷贝
func test3() {
	a0 := []int{1, 2, 3, 4, 5}

	ch := make(chan []int, 1)
	ch <- a0

	a1 := <-ch

	a1[0] = 6

	fmt.Printf("a0[0]: %v\n", a0[0]) // 如果为 6,说明为浅拷贝
}

输出均为 6

由于切片是引用类型,如果发生浅拷贝,那么对拷贝后的切片的修改会反映到原切片,这里输出为 6,正好符合浅拷贝

这里插一嘴:Go 语言默认情况下,所有的拷贝都是浅拷贝

浅拷贝相对深拷贝,拷贝时不需要重新分配大量内存,性能更好

当然浅拷贝有些时候不是我们期望的,这个时候往往需要自己深拷贝