Golang
主页 > 脚本 > Golang >

Go并发编程sync.Cond的使用

2022-05-03 | 酷站 | 点击:

简介

Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行。

Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。

这个条件可以是我们自定义的 true/false 逻辑表达式。

但是 Cond 使用的比较少,因为在大部分场景下是可以被 Channel 和 WaitGroup 来替换的。

详细介绍

下面就是 Cond 的数据结构和对外提供的方法,Cond 内部维护了一个等待队列和锁实例。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

type Cond struct {

   noCopy noCopy

 

   // 锁

   L Locker

 

   // 等待队列

   notify  notifyList

   checker copyChecker

}

 

func NeWCond(l Locker) *Cond

func (c *Cond) Broadcast()

func (c *Cond) Signal()

func (c *Cond) Wait()

案例:Redis连接池

可以看一下下面的代码,使用了 Cond 实现一个 Redis 的连接池,最关键的代码就是在链表为空的时候需要调用 Cond 的 Wait 方法,将 gorutine 进行阻塞。然后 goruntine 在使用完连接后,将连接返回池子后,需要通知其他阻塞的 goruntine 来获取连接。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

package main

 

import (

   "container/list"

   "fmt"

   "math/rand"

   "sync"

   "time"

)

 

// 连接池

type Pool struct {

   lock    sync.Mutex // 锁

   clients list.List  // 连接

   cond    *sync.Cond // cond实例

   close   bool       // 是否关闭

}

 

// Redis Client

type Client struct {

   id int32

}

 

// 创建Redis Client

func NewClient() *Client {

   return &Client{

      id: rand.Int31n(100000),

   }

}

 

// 关闭Redis Client

func (this *Client) Close() {

   fmt.Printf("Client:%d 正在关闭", this.id)

}

 

// 创建连接池

func NewPool(maxConnNum int) *Pool {

   pool := new(Pool)

   pool.cond = sync.NewCond(&pool.lock)

 

   // 创建连接

   for i := 0; i < maxConnNum; i++ {

      client := NewClient()

      pool.clients.PushBack(client)

   }

 

   return pool

}

 

// 从池子中获取连接

func (this *Pool) Pull() *Client {

   this.lock.Lock()

   defer this.lock.Unlock()

 

   // 已关闭

   if this.close {

      fmt.Println("Pool is closed")

      return nil

   }

 

   // 如果连接池没有连接 需要阻塞

   for this.clients.Len() <= 0 {

      this.cond.Wait()

   }

 

   // 从链表中取出头节点,删除并返回

   ele := this.clients.Remove(this.clients.Front())

   return ele.(*Client)

}

 

// 将连接放回池子

func (this *Pool) Push(client *Client) {

   this.lock.Lock()

   defer this.lock.Unlock()

 

   if this.close {

      fmt.Println("Pool is closed")

      return

   }

 

   // 向链表尾部插入一个连接

   this.clients.PushBack(client)

 

   // 唤醒一个正在等待的goruntine

   this.cond.Signal()

}

 

// 关闭池子

func (this *Pool) Close() {

   this.lock.Lock()

   defer this.lock.Unlock()

 

   // 关闭连接

   for e := this.clients.Front(); e != nil; e = e.Next() {

      client := e.Value.(*Client)

      client.Close()

   }

 

   // 重置数据

   this.close = true

   this.clients.Init()

}

 

func main() {

 

   var wg sync.WaitGroup

 

   pool := NewPool(3)

   for i := 1; i <= 10; i++ {

      wg.Add(1)

      go func(index int) {

 

         defer wg.Done()

 

         // 获取一个连接

         client := pool.Pull()

 

         fmt.Printf("Time:%s | 【goruntine#%d】获取到client[%d]\n", time.Now().Format("15:04:05"), index, client.id)

         time.Sleep(time.Second * 5)

         fmt.Printf("Time:%s | 【goruntine#%d】使用完毕,将client[%d]放回池子\n", time.Now().Format("15:04:05"), index, client.id)

 

         // 将连接放回池子

         pool.Push(client)

      }(i)

   }

 

   wg.Wait()

}

运行结果:

Time:15:10:25 | 【goruntine#7】获取到client[31847]
Time:15:10:25 | 【goruntine#5】获取到client[27887]
Time:15:10:25 | 【goruntine#10】获取到client[98081]
Time:15:10:30 | 【goruntine#5】使用完毕,将client[27887]放回池子
Time:15:10:30 | 【goruntine#6】获取到client[27887]               
Time:15:10:30 | 【goruntine#10】使用完毕,将client[98081]放回池子
Time:15:10:30 | 【goruntine#7】使用完毕,将client[31847]放回池子 
Time:15:10:30 | 【goruntine#1】获取到client[31847]               
Time:15:10:30 | 【goruntine#9】获取到client[98081]               
Time:15:10:35 | 【goruntine#6】使用完毕,将client[27887]放回池子
Time:15:10:35 | 【goruntine#3】获取到client[27887]              
Time:15:10:35 | 【goruntine#1】使用完毕,将client[31847]放回池子
Time:15:10:35 | 【goruntine#4】获取到client[31847]              
Time:15:10:35 | 【goruntine#9】使用完毕,将client[98081]放回池子
Time:15:10:35 | 【goruntine#2】获取到client[98081]              
Time:15:10:40 | 【goruntine#3】使用完毕,将client[27887]放回池子
Time:15:10:40 | 【goruntine#8】获取到client[27887]              
Time:15:10:40 | 【goruntine#2】使用完毕,将client[98081]放回池子
Time:15:10:40 | 【goruntine#4】使用完毕,将client[31847]放回池子
Time:15:10:45 | 【goruntine#8】使用完毕,将client[27887]放回池子

注意点

看一下源码就知道了,因为 Wait 方法的执行逻辑是先将 goruntine 添加到等待队列中,然后释放锁,然后阻塞,等唤醒后,会继续加锁。如果在调用 Wait 前不加锁,但是里面会解锁,执行的时候就会报错。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

//

//    c.L.Lock()

//    for !condition() {

//        c.Wait()

//    }

//    ... make use of condition ...

//    c.L.Unlock()

//

func (c *Cond) Wait() {

   c.checker.check()

    

   // 添加到等待队列

   t := runtime_notifyListAdd(&c.notify)

   c.L.Unlock()

    

   // 阻塞

   runtime_notifyListWait(&c.notify, t)

   c.L.Lock()

}

就拿上面的 redis 连接案例来进行说明吧,我这里是使用了 for 循环来进行检测。如果将 for 循环改成使用 if,也就是只判断一次,会有什么问题?可以停下来先想想

上面说了调用者也可以使用 Broadcast 方法来唤醒 goruntine ,如果使用的是 Broadcast 方法,所有的 goruntine 都会被唤醒,然后大家都去链表中去获取 redis 连接了,就会出现部分 goruntine拿不到连接,

实际上没有那么多连接可以获取,因为每次只会放回一个连接到池子中。

1

2

3

4

5

6

7

8

// 如果连接池没有连接 需要阻塞

for this.clients.Len() <= 0 {

  this.cond.Wait()

}

 

// 获取连接

ele := this.clients.Remove(this.clients.Front())

return ele.(*Client)

原文链接:https://juejin.cn/post/7093041338836320292
相关文章
最新更新