目录

Golang为并发而生

https://image.xfeng.io/QmcBo633Tw6s8xJzxkKUzy3tQ8iaU2wcHsQHA1DiSi3PHZ.png

1. 概述

Google一开始写Golang的时候就是为了解决Google内部业务的高并发需求,而且Golang的一大特点就是高并发,所以本文就介绍与Golang高并发相关的原理,概念以及技术点。

我会首先介绍一些概念,如:并行和并发,进程、线程和协程以及它们的区别,然后介绍Golang里面的goroutine和channel,它们是Golang实现高并发的关键,在聊一下select,定时器,runtime和同步锁,最后介绍Go的并发优势,并发模型和Go的调度器。

2. 并行、并发

学过操作系统的话,应该对并行和并发不陌生

https://image.xfeng.io/QmYLEJF5L5obQnRvfgY5EKTbgVMd3CJGGZQAJ1fP7NtBUK.png

在同一时刻,有多条指令在多个处理器上同时执行

在同一时刻只能有一条指令执行,但多个进程指令被快速轮换执行(根据不同的情况有不同的轮换算法)

并行与并发的区别:

  • 并行在多处理器系统中存在,而并发可以在但处理器和多处理器系统中存在
  • 并行要求程序能够同时执行多个操作,而并发只要求程序假装同时执行多个操作(一个时间片执行一个操作,再轮换多个操作)

3. 进程、线程、协程

https://image.xfeng.io/QmZXJ55wTw2X61TssdSuWaAF1TF4wAx1JY2oH1TvAUZ5nj.png

是包含计算机指令,用户数据和系统数据的程序执行环境,以及包含其允许时获得的其他类型资源

相对进程是更加小巧而轻量的实体,线程有进程创建且包含自己的控制流和栈,进程和线程的区别在于:进程是正在执行的二进制文件,而线程是进程的子集

协程(goroutine)是Go程序并发执行的最小单元,因goroutine不像Unix进行那样是自治的实体,goroutine主要优点是非常轻巧,轻松运行成千上万个都没问题,goroutine比线程还轻量,goroutine需要一个进程的环境才能存在,创建goroutine的时候,需要一个进程且这个进程至少有一个线程。协程是一种用户态的轻量级线程,协程的调度完全由用户控制,协程间的切换只需要保保存任务的上下文,没有内核的开销。线程栈空间通常是 2M,Goroutine 栈空间最小 2K

4. goroutine

上面介绍了协程(下文统一用goroutine)的概念,下面介绍一下goroutine的实际语法。

在Go语言中使用go关键字后跟函数名称或定义完整的匿名函数即可开启一个新的goroutine,使用go关键字调用函数后会立即返回,该函数在后台作为goroutine运行,程序的其余部分会继续执行。

创建一个goroutine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main
import (
	"fmt"
	"time"
)

func main()  {
	go function()
	go func() {
		for i := 10; i < 20; i++ {
			fmt.Print(i, " ")
		}
	}()
	time.Sleep(1 * time.Second)
}

func function() {
	for i := 0; i < 10; i++ {
		fmt.Print(i)
	}
	fmt.Println()
}

你可能会发现上面的输出不是固定的(main函数可能会提前结束),我们可以用sync包来解决这个问题。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main
import (
	"flag"
	"fmt"
	"sync"
)

func main() {
	n := flag.Int("n", 20, "Number of goroutines")
	flag.Parse()
	count := *n
	fmt.Printf("Going to create %d goroutines.\\n", count)
	var waitGroup sync.WaitGroup //定义sync.WaitGroup类型的变量

	fmt.Printf("%#v\\n", waitGroup)
	for i := 0; i < count; i++ { //使用for循环创建所需数量的goroutine
		waitGroup.Add(1) //每次调用都会增加sync.WaitGroup变量中的计数器,防止出现任何竞争条件
		go func(x int) {
			defer waitGroup.Done() //sync.WaitGroup变量减一
			fmt.Printf("%d ", x)
		}(i)
	}

	fmt.Printf("%#v\\n", waitGroup)
	waitGroup.Wait() //sync.Wait调用将阻塞,直到sync.WaitGroup变量中的计数器为0,从而保证所有groutine能执行完成
	fmt.Println("\\nExiting...")
}

5. channel

channel(通道)是Go共的一种通信机制,允许goroutine之间进行数据传输。

一些明确的规定:

  • 每个channel只允许交换指定类型的数据,也就是通道的元素类型
  • 要是channel正常运行,需要保证通道有数据接受方法

使用chan 关键字即可声明一个channel,可以使用close()函数来关闭通道

当使用channel作为函数时,可以指定其为单向channel

https://image.xfeng.io/QmP3kRBjtqR2ZjxGpusJ2HLR6txUyVtsEzK1JUqqc9w6TL.png

5.1 channel的写入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package main
import (
	"fmt"
	"time"
)
func main() {
	c := make(chan int)
	go writeToChannel(c, 10)
	time.Sleep(1 * time.Second)
}
func writeToChannel(c chan int, x int) {
	fmt.Println(x)
	c <- x
	close(c)
	fmt.Println(x)
}

5.2 从channel接受数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main
import (
	"fmt"
	"time"
)
func main() {
	c := make(chan int)
	go writeToChannel(c, 10)
	time.Sleep(1 * time.Second)
	fmt.Println("Read:", <-c)
	time.Sleep(1 * time.Second)
	_, ok := <-c
	if ok {
		fmt.Println("Channel is open!")
	}else {
		fmt.Println("Channel is closed!")
	}
}
func writeToChannel(c chan int, x int) {
	fmt.Println("l", x)
	c <- x
	close(c)
	fmt.Println("2", x)
}

5.3 channel作为函数参数传递

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
	"fmt"
	//"time"
)
func main() {
	c := make(chan bool, 1)
	for i := 0; i < 10; i++ {
		go Go(c, i)
	}

	<-c
}
func Go(c chan bool, index int) {
	sum := 0
	for i := 0; i < 1000000; i++ {
		sum += i
	}
	fmt.Println(sum)
	c <- true
}

6. select

Go中select语句看起来像channels的switch语句,实际上,select允许goroutine等待多个通信操作,因此,使用select的主要好处是:select可以处理多个channels,进行非阻塞操作。

注意:使用channels和select的最大问题是 死锁 。为了解锁死锁问题,后面会介绍同步锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main
import(
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"time"
)
func main() {
	rand.Seed(time.Now().Unix())
	createNumber := make(chan int)
	end := make(chan bool)
	if len(os.Args) != 2 {
		fmt.Println("Please give me an integer!")
		return
	}
	n, _ := strconv.Atoi(os.Args[1])
	fmt.Printf("Going to create %d random numbers.\\n", n)
	go gen(0, 2*n, createNumber, end)
	for i := 0; i < n; i++ {
		fmt.Printf("%d ", <-createNumber)
	}
    time.Sleep(5 * time.Second)  //给gen()函数中的time.After()函数足够时间返回,从而激活select分支
	fmt.Println("Exting...")
    end <- true  //激活gen()里面的select语句中的case->end 分支来终止程序并执行相关代码
}
func gen(min, max int, createNumber chan int, end chan bool) {
	for {
		select {
		case createNumber <- rand.Intn(max-min) + min:
		case <- end:
			close(end)
			return
		case <- time.After(4 * time.Second): //time.After函数在指定时间过后返回,因此它将在其他channels被阻塞时解锁select语句
			fmt.Println("\\ntime.After()!") //可以把这个case当作default分支
		}
	}
}

注意:select语句不需要default分支

select语句不是按顺序计算的,因为所有的channels都是同时检查的

如果select语句中没有channels是准备好的,那么select语句就会 阻塞 ,直到有channels准备好,Go运行时就会在这些准备好的channels之间做 随机选择 ,做到公平一致

select最大的优点是:可以连接、编排、管理多个channels

当channels连接goroutine的时候,select连接那些连接goroutine的channels

7. 定时器

介绍select的时候也用到了定时器,那么什么是定时器呢?

定时器是一种通过设置一项任务,在未来的某个时刻执行该任务的机制

定时器有两种:

  • 只执行一次的延时模式
  • 每隔一段时间执行一次的间隔模式

Go语言中的定时器比较完善,所有的API都在time包中

7.1 延时模式

延迟执行有两种:time.After 和 time.Sleep

7.1.1 time.After

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	timeAfterTrigger := time.After(1 * time.Second)
	<-timeAfterTrigger
	fmt.Println("2")
}

time包提供了运算好的几个int类型常量

1
2
3
4
5
6
7
8
const (
	Nanosecond  Duration = 1
	Microsecond          = 1000 * Nanosecond
	Millisecond          = 1000 * Microsecond
	Second               = 1000 * Millisecond
	Minute               = 60 * Second
	Hour                 = 60 * Minute
)

7.1.2 time.Sleep

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	time.Sleep(1 * time.Second)
	fmt.Println("2")
}

两者的区别是:time.Sleep 是阻塞当前协程的,而time.After是基于channel实现的,可以在不同协程中传递

7.2 间隔模式

间隔模式有分为两种:一种是执行N次后结束,另一种是程序不停休的执行

7.2.1 time.NewTicker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	count := 0
	timeTicker := time.NewTicker(1 * time.Second)
	for {
		<-timeTicker.C
		fmt.Println("每隔 1 秒输出 2")
		count++
		if count >= 5 {
			timeTicker.Stop()
		}
	}
}

7.2.2 time.Tick

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
package main

import (
	"fmt"
	"time"
)
func main() {
	t := time.Tick(1 * time.Second)
	for {
		<-t
		fmt.Println("每隔 1 秒输出一次")
	}
}

7.3 控制定时器

定时器提供了Stop方法和Reset方法

  • Stop方法的作用是停止定时器
  • Reset方法的作用是改变定时器的间隔时间

7.3.1 time.Stop

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTimer(time.Second * 6)
	go func() {
		<-timer.C
		fmt.Println("时间到")
	}()
	timer.Stop()
}

7.3.2 time.Reset

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("1")
	count := 0
	timeTicker := time.NewTicker(1 * time.Second)
	for {
		<-timeTicker.C
		fmt.Println("2")
		count++
		if count >= 3 {
			timeTicker.Reset(2 * time.Second)
		}
	}
}

8. runtime

runtime是Go语言运行所需要的基础设施,如:控制goroutine的功能,debug,pprof、trace、race进行检测的支持,内存分配,系统操作和CPU相关操作的封装(信号处理、系统调用、寄存器操作、原子操作等),map、channel、string等内置类型及反射的实现

与Java、python中的runtime不同,Java、python的runtime是虚拟机的,而Go的runtime是和用户代码一起编译到一个可执行文件中的

runtime发展历程:

https://image.xfeng.io/QmUs5dJJwJXmpd1ySZEUrwix3EA2p3RykomTp5UrSeXUfq.png

9. 同步锁

上文提高 channels和select的最大问题是 死锁 ,这小节介绍解决死锁的问题–同步锁

Go语言同步锁有两种方式:原子锁,互斥锁

9.1 原子锁

可以借助某个信号向所有的goroutine发送消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	shotdown int64 // 该标志向多个goroutine通知状态
	wg       sync.WaitGroup
)

func main() {
	wg.Add(2)

	go doWork("A")
	go doWork("B")

	time.Sleep(1 * time.Second)

	atomic.StoreInt64(&shotdown, 1) // 修改
	wg.Wait()
}

func doWork(s string) {
	defer wg.Done()

	for {
		fmt.Printf("Doing homework %s\\n", s)
		time.Sleep(2 * time.Second)

		if atomic.LoadInt64(&shotdown) == 1 { // 读取
			fmt.Printf("Shotdown home work %s\\n", s)
			break
		}
	}
}

9.2 互斥锁

通过 mutex ,能够将一段临界区间包含起来,只运行单个goroutine执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	counter int
	wg      sync.WaitGroup
	mutex   sync.Mutex // 定义临界区
)

func main() {
	wg.Add(2)

	go incCount(1)
	go incCount(2)

	wg.Wait()
	fmt.Printf("Final Counter: %d\\n", counter)
}

func incCount(i int) {
	defer wg.Done()

	for count := 0; count < 2; count++ {
		mutex.Lock()
		{
			value := counter
			runtime.Gosched()
			value++
			counter = value
		}
		mutex.Unlock()
	}
}

10. Go并发优势

Go语言为并发编程而内置的上层API基于CSP(communicating sequential processes, 顺序通信进程)模型。这就意味着显式锁都是可以避免的,因为Go语言通过相册安全的通道发送和接受数据以实现同步,这大大地简化了并发程序的编写。

一般情况下,一个普通的桌面计算机跑十几二十个线程就有点负载过大了,但是同样这台机器却可以轻松地让成百上千甚至过万个goroutine进行资源竞争

11. Go并发模型

Go语言实现了两种并发形式:

  • 多线程共享内存(通过共享内存来通信)
  • CSP(communicating sequential processes)并发模型(以通信的方式来共享内存)

Do not communicate by sharing memory; instead, share memory by communicating

Java、C++、python它们的线程都是通过共享内存来通信的

Go的CSP并发模型通过 goroutine 和 channel 来实现

goroutine 与 channel 结合使用案例:

https://image.xfeng.io/QmNT22wG73r73oehEFHTni7E1kozu81gsnqi7JyZ2zHScY.png

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main
import (
	"fmt"
)


//write Data
func writeData(intChan chan int) {
	for i := 1; i <= 50; i++ {
		//放入数据
		intChan<- i //
		fmt.Println("writeData ", i)
	}
	close(intChan) //关闭
}

//read data
func readData(intChan chan int, exitChan chan bool) {

	for {
		v, ok := <-intChan
		if !ok {
			break
		}
		fmt.Printf("readData 读到数据=%v\\n", v)
	}
	//readData 读取完数据后,即任务完成
	exitChan<- true
	close(exitChan)

}

func main() {

	//创建两个管道
	intChan := make(chan int, 10)
	exitChan := make(chan bool, 1)

	go writeData(intChan)
	go readData(intChan, exitChan)

	for {
		_, ok := <-exitChan
		if !ok {
			break
		}
	}
}

12 Go调度器

GO语言的调度器使用了三种结构:

G代表goroutine,每个 Goroutine 对应一个 G 结构体,G 存储 Goroutine 的运行堆栈、状态以及任务函数,可重用

M代表内核线程,代表着真正执行计算的资源,在绑定有效的 P 后,进入 schedule 循环;而 schedule 循环的机制大致是从 Global 队列、P 的 Local 队列以及 wait 队列中获取

P代表逻辑处理器,表示调度的上下文。可以把它看作是一个局部的调度器,让Go代码跑在一个单独的线程上。这是让Go从一个N:1调度器映射到一个M:N调度器的关键。

对 G 来说,P 相当于 CPU 核,G 只有绑定到 P 才能被调度。

对 M 来说,P 提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等

P 的数量决定了系统内最大可并行的 G 的数量(前提:物理 CPU 核数 >= P 的数量)。

P 的数量由用户设置的 GoMAXPROCS 决定,但是不论 GoMAXPROCS 设置为多大,P 的数量最大为256

用经典的 地鼠推车搬砖 的模型来说明三者关系

https://image.xfeng.io/Qmbe5r4ZDDJth1wtS25K3auZUZSs2nLKaURabN9WmjeMZQ.png

地鼠的工作任务是:工地上有若干砖头,地鼠借助小车把砖头运送到火种上

13. 总结

本文介绍了与Golang并发相关的一些知识,从最开始的一些基础概念,包括:并行、并发和进程、线程、协程,到Golang并发的一些实际用法,包括:goroutine、channel、select、定时器和同步锁,也简单的介绍了runtime,最后介绍了Go的调度器模型。