目 录CONTENT

文章目录

Go学习系列14-协程与通道

cplinux98
2022-11-19 / 0 评论 / 0 点赞 / 853 阅读 / 3,029 字 / 正在检测是否收录...

什么是并发

并发的优势

  • 可以充分利用CPU核心的优势,提高程序的执行效率
  • 并发能充分利用CPU与其他硬件设备的异步性,如文件操作等

几种并发模式

  • 多进程
    • 操作系统层面的并发模式
    • 优点:进程间互不影响
    • 缺点:开销大
  • 多线程
    • 系统层面的并发模式,使用最多、最有效的一种模式
    • 可以理解为轻量级别的进程
    • 优点:比进程开销小
    • 缺点:开销仍然较大
  • goroutine
    • 用户态线程,不需要操作系统进行抢占式调度
    • 优点:因为他们的创建和销毁不需要通过操作系统去做,所以速度很快,可以提高任务并发性,编程简单,结构清晰,不用系统调度,很容易控制,并且很灵活

协程并发模型

go语言是通过通信的方式来共享数据。

go语言中有调度器来调度goroutine、对接系统级线程,它是Go语言运行时系统的重要组成部分,主要负责统筹调配Go并发编程模型中的三个主要元素G(goroutine)、P(processor)、M(machine)。

  • M
    • 系统级线程
  • P
    • 一个中介,可以引用若干个G,能够使G在恰当的时机与M对接,并得到运行
    • 负责G的运行安排:G阻塞时分离G与M、G运行时对接G与M、申请M资源、销毁M资源
  • G
    • goroutine
package main

import "fmt"

func main() {
	for i := 0; i < 10; i++ {
		i := i
		go func() {
			fmt.Println(i)
		}()
	}
}

上面的程序并没有任何输出结果,是因为:

  • go里面的G的准备是需要一些时间的,所以通过go关键字触发的协程是慢于go语句的执行时间的
  • 只要Go语句本身执行完毕,Go程序不会等待Go函数的执行,他会立刻执行后边的语句,这就是异步并发执行
  • 主Goroutine(main函数)执行完毕后,程序就会结束运行,无论其他的goroutine是否运行,都不会再执行了

如果在for语句块后面暂停一下main函数,看下效果

package main

import (
	"fmt"
	"time"
)

func main() {
	for i := 0; i < 10; i++ {
		i := i
		go func() {
			fmt.Println(i)
		}()
	}
	time.Sleep(1 * time.Second)
}
//4
//9
//5
//6
//7
//8
//0
//3
//2
//1

虽然可以让其他goroutine并发执行,但是这个暂停时间我们掌控不好,时间长了降低程序运行效率,时间短了有的goroutine还没有执行完成,所以需要一个合适的方式来解决该问题。

channel(通道)

这是一种goroutine间的通信方式,channel是一个容器,只能存放特定类型,可以通过make来创建,作用就是写入和读取

make(chan 类型)
ch <- 类型元素  // 发送,发送的是元素的副本
var hh 类型
hh <- ch // 接收  可以不用接收变量

初始值是nil,对它所作得发送操作和接收操作会永远处于阻塞状态,它们所属的goroutine都不会执行。

当关闭一个channel后,再向里面发送数据,会panic;

当关闭一个channel后,再关闭channel,会panic。

channel的特性:

  • 对于同一个channel,发送操作之间是互斥的,接收操作之间也是互斥的,
    • 也就是说同一时间只能执行同一个发送操作,直到这个元素被完全复制进channel后,该channel才可以继续接收其他元素。
    • 同理,同一时刻只执行一次接收操作,直到这个元素被完全移出该channel后,才会依次执行其他接收操作
  • 对于channel里面的元素来说,发送操作和接收操作之间也是互斥的
    • 还没复制到channel里面的元素,这个时候是不能查看或接收的
  • 发送操作和接收操作中,对元素的处理都是原子(不可分割)的
    • 发送操作只有两种状态
      • 没复制元素值
      • 已经复制完毕
      • 不会出现只复制了一部分的情况,channel会保证元素值的完整性和操作channel的唯一性
      • channel中的同一个元素,要么被一个发送操作放入,要么被一个接收操作取出,不会出现同一个值,又被发送,又被取出
  • 发送操作在完全成功之前会被阻塞,接收操作也是如此
    • 发送步骤:
      • 复制元素值
      • 放置副本到channel内部
    • 接收操作
      • 复制channel内的元素值
      • 放置副本到接收方
      • 删除原值
    • 发送和接收都有可能被阻塞,甚至是长时间阻塞

两种channel类型

  • 缓冲channel
    • 初始化的时候定义channel元素的数量
    • 在channel已经满了的情况下,全部发送操作会被阻塞,直到channel中有元素被取走
      • 等待发送操作的goroutine会顺序的进入到一个等待队列,保证进入的顺序是相等的
    • 在channel是空的情况下,所有接收操作都会被阻塞,直到channel内部有元素出现
      • 等待接收操作的goroutine会顺序的进入到一个等待队列,保证取出的顺序是相等的
    • 缓冲是用异步的方式传递数据的
    • 缓冲channel是发送方和接收方的桥梁,元素值先从发送方复制到缓冲channel中,再由缓冲channel复制给接收方
    • 使用cap来获取channel容量,len来获取channel内的元素个数
    • 举例:有个快递,快递小哥可以放在快递柜子里面,我再从快递柜取出快递
  • 无缓冲channel
    • 初始化的时候不定义channel元素的数量
    • 无论发送还是接收,一旦开始执行就会阻塞,直到配对的操作也开始执行,才会继续传递,这是一种同步的方式传递数据,只有收发双方接上了,数据才会被传递。
    • 数据是直接从发送方复制到接收方,中间不会用无缓冲channel做中转
    • 举例:有个快递,必须要当面签收,只有和快递小哥约好时间,才能当面取到快递

单向channel

当一个channel作为参数时,我们可以让他同时发送和接收消息,这会使调用者产生困惑,所以遵循最小原则,只执行一种操作,要么发送chan <- string,要么接收<-channel,以此来避免误用。

package main

import "fmt"

func first(c chan<- string) {
	c <- "买菜"
	close(c)
}

func second(c1 <-chan string, c2 chan<- string) {
	r := <-c1
	c2 <- r + " 买肉"
	close(c2)
}

func Cook(c <-chan string) {
	for r := range c {
		fmt.Println(r + "已经准备好,吃顿好的!")
	}
}

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)
	go first(ch1)
	go second(ch1, ch2)
	Cook(ch2)
	fmt.Println("洗碗。。")
}

无缓冲channel

package main

import "fmt"

func Eat(foodName string, c chan bool) {
	fmt.Println("我正在吃" + foodName)
	c <- true
}

func main() {
	fmt.Println("主goroutine开始运行")
	c := make(chan bool)
	go Eat("生蚝", c)
	fmt.Println("主goroutine运行结束")
	<-c
	close(c)
}

//主goroutine开始运行
//主goroutine运行结束
//我正在吃生蚝

select

select是用来监听与channel有关的I/0操作的,当发生I/O操作时,会触发相应的动作。

与switch用法几乎相同,由case语句和一个默认分支组成。case语句必须是一个针对channel的操作。

每个case语句指定一次通信,select会一直等待,直到有通信通知到某个case可以执行为止。

  • 如果有case执行,其他的case语句不会被触发,
  • 如果没有default分支,select会永远等待。
  • 如果有多个case同时被通知,会随机选择一个执行
  • channel为nil时,发送操作和接收操作会被永远阻塞,case为nil时,它将永远不会被选择
  • 所有channel表达式都会被求值,所有的case语句也都会被求值,顺序是:自上而下,从左到右
select {
    case <- ch:
    // 语句块1
    case ch <- 1:
    // 语句块2
    default:
    // 默认执行语句
}
package main

import (
	"fmt"
	"time"
)

func GetFood1(c chan string) {
	time.Sleep(3 * time.Second)
	close(c)
}

func GetFood2(c chan string) {
	c <- "清蒸鱼好了" + time.Now().String()
	time.Sleep(2 * time.Second)
}

func GetFood3(c chan string) {
	c <- "烤生蚝好了" + time.Now().String()
	time.Sleep(2 * time.Second)
}

func main() {
	c1 := make(chan string)
	c2 := make(chan string)
	c3 := make(chan string)

	go GetFood1(c1)
	go GetFood2(c2)
	go GetFood3(c3)

	select {
	case r := <-c1:
		fmt.Println(r)
	case r := <-c2:
		fmt.Println(r)
	case r := <-c3:
		fmt.Println(r)
		//default:
		//	fmt.Println("当前菜还没做好" + time.Now().String())

	}
}

  • 不注释掉default
    • 因为执行到default前,程序时间没到2s,所以程序会执行default操作,结束主goroutine,其他的子goroutine就不会被再执行了
  • 注释掉default
    • select会被阻塞,等待有case被满足,但上面有2个暂停2s的程序,所以会随机选择一个执行(多次执行就能看出随机结果)

select同样可以使用break来结束select

select {
    case r:= <-c3:
    fmt.Println(r)
    break
}

验证求值顺序

package main

import "fmt"

var ch1 chan string
var ch2 chan string
var ch3 chan string

var chs = []chan string{ch1, ch2, ch3}
var foods = []string{"红烧肉", "清蒸鱼", "烤生蚝"}

func getFood1(i int) string {
	fmt.Printf("Foods [%d]\n", i)
	return foods[i]
}

func getChan(i int) chan string {
	fmt.Printf("chs [%d]\n", i)
	return chs[i]
}

func main() {
	select {
	case getChan(0) <- getFood1(2):
		fmt.Println("1th case is selected.")
	case getChan(1) <- getFood1(1):
		fmt.Println("2th case is selected.")
	default:
		fmt.Println("default.")
	}
}
//chs [0]
//Foods [2]
//chs [1]
//Foods [1]
//default.


虽然上面的select看着会执行default,但每个case语句都会被求值执行。

关闭channel

channel会设置一个标志位来提示当前发送操作已经完毕,这个channel后面就没有元素了。

当channel不再使用时,必须要进行关闭close(ch)

当使用for range来循环channel的值时,如果channel的值为nil,那么这条for语句会永远地阻塞在for关键字那一行上。

判断channel是否关闭:

v, ok := <-ch
// ok == false 就是关闭

关闭示例

package main

import (
	"fmt"
	"time"
)

func MakeFood(c chan string) {
	foods := []string{"1", "2", "3", "4", "5"}
	for _, item := range foods {
		c <- item
        fmt.Println("现在放的菜是: " + item)
		// time.Sleep(3 * time.Second)
	}
	fmt.Println("菜都放完了")
	close(c)
}

func main() {
	ch := make(chan string)
	go MakeFood(ch)
	for i := range ch {
		fmt.Println(i + "菜来了")
	}
	fmt.Println("菜上完了")
}

上面的示例的结果可能会有些迷惑

  • 取消注释sleep时,每次都会出现2个放菜,再出现2个上菜
    • 看着像无缓冲channel里面放入了2个元素,但其实并不是,这里面还是一个放一个拿的
    • 打印顺序异常是因为打印属于io操作,io操作会比正常的计算操作慢一点,所以打印的结果才不一样
  • 打开注释sleep时,放菜和上菜的顺序是按照同步来的
    • 是因为sleep阻塞住了,有了打印io操作的时间,所以打印结果才顺序输出
0

评论区