Go语言并发编程基础

多线程(1)

Go线程基本用法

使用go关键字便可以创建一个新的Go线程(goroutine),goruntine是由Go运行时进行管理的轻量级线程,执行

1
go f( x , y , z )

会启动一个新的Go线程并执行

1
f( x , y , z )

其中f,x,y和z的求值发生在当前的线程中,而f的执行发生在新的线程中

Go线程在相同的地址空间中运行,因此在访问共享的内存时必须进行同步

示例1:goroutine.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"time"
)

func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}

func main() {
go say("world")
say("hello")
}

编译运行

1
2
3
4
5
6
7
8
9
10
world
hello
hello
world
world
hello
hello
world
world
hello

可以看到两个线程是并行的

信道

信道的基本用法

既然有了多线程,那么程序要如何在多个线程之间通信,我们需要一种在多个线程之间交换数据的手段,这便是信道

信道是带有类型的管道,我们可以使用<-操作符来使用管道发送或接收值

1
2
ch <- v //将v发送至信道ch
v := <-ch //从ch接收值并赋值给v

与映射和切片一样,信道在使用前必须先创建

1
ch := make(chan int) //声明一个存储int类型的信道ch并初始化

在默认情况下,发送和接收操作在另一端准备好之前都会阻塞,这样可以保证在没有显式锁或条件变量(condition variables)的情况下保持同步

示例2:channels.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "fmt"

func sum(s []int, c chan int) {
sum := 0
for _, v := range s{
sum += v
}
c <-sum //将sum送入c
}

func main() {
s := []int{7,2,8,-9,4,0}

c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c

fmt.Println( x, y, x+y )
}

编译运行

1
-5 17 12

信道可以是带缓冲的,同样使用make()我们将缓冲长度作为第二个参数来初始化一个带缓冲信道

1
ch := make(chan int, 100)

上面说过在默认情况下多个线程是同步的,而使用带缓冲的信道则可以实现线程之间的异步:

  • 仅当信道的缓冲区被填满后,再向其发送数据才会阻塞
  • 当信道缓冲区为空时,接收方会阻塞

示例3:buffered-channels.go

1
2
3
4
5
6
7
8
9
10
11
package main

import "fmt"

func main() {
ch := make(chan int,2)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}

编译运行

1
2
1
2

示例中初始化了一个缓冲区长度为2的信道,我们向这个信道同时发送了两个数据而没有阻塞


对示例进行一下修改

1
2
3
4
5
6
7
8
func main() {
ch := make(chan int,2)
ch <- 1
ch <- 2
ch <- 3
fmt.Println(<-ch)
fmt.Println(<-ch)
}

编译运行

1
fatal error: all goroutines are asleep - deadlock!

可以看到在缓冲区被填满后向信道发送数据的行为会报错。同样的,从一个空信道中读取数据也会报错

1
2
3
4
5
6
func main() {
ch := make(chan int,2)
ch <- 1
fmt.Println(<-ch)
fmt.Println(<-ch)
}

编译运行

1
2
1
fatal error: all goroutines are asleep - deadlock!

信道的遍历和关闭

信道可以在发送者没有需要发送的值后通过close被关闭。接收者可以通过为接收表达式分配第二个参数来测试信道是否被关闭,如果没有值可接收且信道已被关闭,则在执行完

1
v , ok := <-ch

之后,ok的值会被设置为false

遍历一个信道中缓存的所有值可以使用for循环,形式与一般的使用range的循环并没有区别

1
for i := range c

循环会不断地从信道中接收值,直到这个信道被关闭

注意

只有发送者才能关闭通道,而接收者不能

向一个关闭的信道发送数据会引发程序恐慌(panic)

信道与文件不同,通常情况下无需关闭它们,只有在必须告诉接收者不再有需要发送的值时3才有必要关闭,例如终止一个range循环

示例4:range-and-close.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import "fmt"

func fibonacci(n int,c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y , x + y
}
close(c)
}

func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}

编译运行

1
2
3
4
5
6
7
8
9
10
0
1
1
2
3
5
8
13
21
34

多线程(2)

select的用法

使用select语句可以使线程等待多个通信操作,select会阻塞直到其中的某个分支可以继续执行为止,这时便会执行该分支

1
2
3
4
5
6
7
8
9
10
11
12
13
select {
case case1:
...
case case2:
...

...

case casen:
...
default:
...
}

当多个分支都准备好时会随机选择一个执行

示例5:select.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import "fmt"

func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}

func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}

编译运行

1
2
3
4
5
6
7
8
9
10
11
0
1
1
2
3
5
8
13
21
34
quit

当select中的其他分支均未准备好时,会执行default分支,使用这种方法可以在尝试发送或接收时不发生阻塞

1
2
3
4
5
6
select {
case i := <-c:
// 使用i
default:
// 从c中接收会阻塞时执行
}

下面的示例就展示了这种用法

示例6:default-selection.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}

编译运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    .
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
.
.
BOOM!

练习

练习1:等价二叉查找树

不同二叉树的叶子节点上可以保存相同的值序列,例如下面两隔二叉树都保存了序列‘1,1,2,3,5,8,13’
二叉树
在大多数语言中检查两个二叉树是否保存了相同序列的函数都相当复杂。

我们将使用Go的并发和信道来编写一个简单的解法


本例使用了tree包,他定义了类型

1
2
3
4
5
type Tree struct {
Left *Tree
Value int
Right *Tree
}

你需要

  1. 实现Walk函数

  2. 测试Walk函数

    函数tree.New(k)用于构造一个随机结构的已排序二叉查找树,它保存了值‘k,2k,3k,…,10k’

    创建一个新的信道ch并且对其进行步进

    1
    go Walk(tree.New(1), ch)

    然后从信道中读取并打印10个值,它们应当是1,2,3……10

  3. 用Walk实现Same函数来检测t1和t2是否存储了相同的值

  4. 测试Same函数

    Same(tree.New(1),tree.New(1))应当返回true,反之Same(tree.New(1),tree.New(2))应当返回false

Tree的具体文档可以在这里找到

练习1:exercise-equivalent-binary-trees.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"golang.org/x/tour/tree"
"fmt"
)


// Walk 步进 tree t 将所有的值从 tree 发送到 channel ch。
func Walk(t *tree.Tree, ch chan int) {
if t == nil {
return
} else {
Walk(t.Left, ch)
ch <- t.Value
Walk(t.Right, ch)
}
return
}

// Same 检测树 t1 和 t2 是否含有相同的值。
func Same(t1, t2 *tree.Tree) bool {
ch1, ch2 := make(chan int), make(chan int)
go Walk(t1, ch1)
go Walk(t2, ch2)
for {
if <-ch1 == <-ch2 {
return true
}
return false
}
}

func main() {
t := tree.New(1)
ch := make(chan int)
go Walk(t, ch)
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
fmt.Println(Same(tree.New(1), tree.New(1)))
fmt.Println(Same(tree.New(1), tree.New(2)))
}

编译运行

1
2
3
4
5
6
7
8
9
10
11
12
1
2
3
4
5
6
7
8
9
10
true
false

练习2:Web爬虫

在这个练习中,我们将会使用Go的并发特性来并行化一个Web爬虫。

  • 修改Crawl函数来并行地抓取URL,并且保证不重复

提示:

你可以用一个map来缓存已经获取的URL,但是要注意map本身并不是并发安全的


  • 什么是并发安全

在多个线程访问同一个对象时,如果不需要考虑这些线程的运行时环境下的调度和交替执行,也不需要额外同步,或者在调用方进行任何其他操作,调用这个对象的行为都可以获得正确的结果,那么这个对象就是线程安全的

Go的标准库中提供了互斥锁来保证变量的并发安全,它提供给我们一个sync.Mutex互斥锁类型及其两个方法:Lock和Unlock

通过在代码前调用Lock方法,在代码后调用Unlock方法便可以保证一段代码的互斥执行,这样同一时刻就只有一个go线程可以访问对象


练习2:exercise-web-crawler.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main

import (
"fmt"
"sync"
)

type Fetcher interface {
// Fetch 返回 URL 的 body 内容,并且将在这个页面上找到的 URL 放到一个 slice 中。
Fetch(url string) (body string, urls []string, err error)
}

type Cache struct {
cache map[string]int
mux sync.Mutex
wg sync.WaitGroup
}

var c = Cache{cache: make(map[string]int)}

// Crawl 使用 fetcher 从某个 URL 开始递归的爬取页面,直到达到最大深度。
func Crawl(url string, depth int, fetcher Fetcher) {
// TODO: 并行的抓取 URL。
// TODO: 不重复抓取页面。
// 下面并没有实现上面两种情况:
defer c.wg.Done()
if depth <= 0 {
return
}
c.mux.Lock()
c.cache[url]++
c.mux.Unlock()
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
c.mux.Lock()
if _, ok := c.cache[u]; !ok {
c.wg.Add(1)
go Crawl(u, depth-1, fetcher)
}
c.mux.Unlock()
}
return
}

func main() {
c.wg.Add(1)
Crawl("https://golang.org/", 4, fetcher)
c.wg.Wait()
}

// fakeFetcher 是返回若干结果的 Fetcher。
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
body string
urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher 是填充后的 fakeFetcher。
var fetcher = fakeFetcher{
"https://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"https://golang.org/pkg/",
"https://golang.org/cmd/",
},
},
"https://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"https://golang.org/",
"https://golang.org/cmd/",
"https://golang.org/pkg/fmt/",
"https://golang.org/pkg/os/",
},
},
"https://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
"https://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
}

编译运行

1
2
3
4
5
found: https://golang.org/ "The Go Programming Language"
not found: https://golang.org/cmd/
found: https://golang.org/pkg/ "Packages"
found: https://golang.org/pkg/os/ "Package os"
found: https://golang.org/pkg/fmt/ "Package fmt"
作者

Luc_41

发布于

2020-03-29

更新于

2020-04-04

许可协议

评论