Go Sync 包使用

sync 包中的主要同步原语

在 Go 语言中,sync 包提供了用于并发编程的同步原语,帮助我们管理 goroutine 之间的协作和数据共享。理解并正确使用这些原语对于编写高效且并发安全的 Go 程序至关重要。

sync 包中包含了多种用于不同同步场景的类型:

1. sync.Mutex:互斥锁

Mutex(Mutual Exclusion Lock,互斥锁)是 Go 中最基本的同步原语,用于保护共享资源,确保在任何给定时间只有一个 goroutine 可以访问该资源。这有效地防止了竞争条件(race condition)。

工作原理:

  • Lock():获取锁。如果锁已被其他 goroutine 持有,当前 goroutine 将阻塞,直到获取到锁。
  • Unlock():释放锁。释放后,其他等待的 goroutine 可以尝试获取锁。

使用场景:

  • 保护共享内存区域,如全局变量、结构体字段、mapslice
  • 防止并发修改导致的数据损坏或不一致。

示例:保护共享计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
"time"
)

var (
counter int
mu sync.Mutex
)

func increment() {
mu.Lock() // 获取锁
defer mu.Unlock() // 确保在函数退出时释放锁
counter++
fmt.Printf("Incremented: %d\n", counter)
}

func main() {
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
increment()
}
}()
}

wg.Wait()
fmt.Printf("Final Counter: %d\n", counter)
}

注意事项:

  • 始终使用 defer mu.Unlock() 来确保锁在操作完成后被释放,即使发生 panic。
  • 避免死锁:不要在持有锁的情况下尝试获取同一个锁。
  • 粒度要适中:锁定的代码块应该尽可能小,以最大化并发性,但同时要确保所有对共享资源的访问都被保护。

2. sync.RWMutex:读写互斥锁

RWMutex(Reader-Writer Mutual Exclusion Lock,读写互斥锁)是 Mutex 的一个更高级版本,它区分了读操作和写操作。

工作原理:

  • 写锁 (Lock()/Unlock()): 当一个 goroutine 持有写锁时,所有其他 goroutine(无论是读还是写)都会被阻塞,直到写锁被释放。
  • 读锁 (RLock()/RUnlock()): 多个 goroutine 可以同时持有读锁。只要没有写锁被持有,任意数量的读锁都可以被获取。如果有一个写锁正在等待,新的读锁请求也会被阻塞,以防止写饥饿。

使用场景:

  • 当读操作远多于写操作时。例如,缓存系统、配置读取等。
  • 它可以显著提高并发读取的性能,因为读操作不会互相阻塞。

示例:保护共享数据结构(读多写少)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"fmt"
"sync"
"time"
)

var (
data map[string]string
rwMu sync.RWMutex
)

func init() {
data = make(map[string]string)
}

func readData(key string) {
rwMu.RLock() // 获取读锁
defer rwMu.RUnlock() // 释放读锁
fmt.Printf("Read: %s = %s\n", key, data[key])
}

func writeData(key, value string) {
rwMu.Lock() // 获取写锁
defer rwMu.Unlock() // 释放写锁
data[key] = value
fmt.Printf("Write: %s = %s\n", key, value)
}

func main() {
var wg sync.WaitGroup

// 多个 goroutine 并发读取
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Duration(id) * 10 * time.Millisecond) // 错开时间
readData("name")
}(i)
}

// 一个 goroutine 进行写操作
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(50 * time.Millisecond) // 等待一部分读操作完成
writeData("name", "GoLang")
}()

// 更多读操作
for i := 5; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Duration(id) * 10 * time.Millisecond)
readData("name")
}(i)
}

wg.Wait()
fmt.Println("Done.")
}

注意事项:

  • 如果写操作频繁,RWMutex 的优势就不明显,甚至可能因为额外的开销而比 Mutex 慢。
  • 写锁具有排他性,读锁之间可以共享。

3. sync.WaitGroup:等待组

WaitGroup 用于等待一组 goroutine 完成它们的执行。它提供了一种简单而有效的方式来同步主 goroutine 和一组子 goroutine。

工作原理:

  • Add(delta int):将内部计数器增加 delta。通常在启动 goroutine 之前调用 Add(1)
  • Done():将内部计数器减 1。通常在 goroutine 完成任务后通过 defer 调用。
  • Wait():阻塞当前 goroutine,直到内部计数器归零。

使用场景:

  • 主 goroutine 需要等待所有子 goroutine 完成任务才能继续执行。
  • 管理并发任务的生命周期。

示例:等待所有并发任务完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"sync"
"time"
)

func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // goroutine 完成时,通知 WaitGroup 计数器减一
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second) // 模拟工作
fmt.Printf("Worker %d finished\n", id)
}

func main() {
var wg sync.WaitGroup // 声明一个 WaitGroup

for i := 1; i <= 5; i++ {
wg.Add(1) // 每次启动一个 goroutine,计数器加一
go worker(i, &wg)
}

wg.Wait() // 阻塞主 goroutine,直到所有 worker goroutine 都调用 Done()
fmt.Println("All workers finished")
}

注意事项:

  • Add() 必须在 Wait() 之前调用,否则可能会导致 Wait() 提前返回。
  • 通常将 WaitGroup 作为指针传递给 goroutine 函数,以便它们可以修改同一个计数器。

4. sync.Once:单次执行

Once 类型用于确保某个操作只执行一次,即使在多个 goroutine 并发调用时也是如此。

工作原理:

  • Do(f func()):如果 f 还没有被执行过,那么 Once 会调用 f 并阻塞所有其他 goroutine,直到 f 执行完成。如果 f 已经执行过,后续对 Do 的调用会直接返回,不再执行 f

使用场景:

  • 初始化单例模式。
  • 只执行一次的配置加载。
  • 惰性初始化昂贵资源。

示例:单例初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"sync"
"time"
)

type Singleton struct {
Name string
}

var (
instance *Singleton
once sync.Once
)

func GetSingletonInstance() *Singleton {
once.Do(func() {
fmt.Println("Initializing Singleton...")
time.Sleep(time.Second) // 模拟耗时初始化
instance = &Singleton{Name: "MyUniqueInstance"}
fmt.Println("Singleton initialized.")
})
return instance
}

func main() {
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s := GetSingletonInstance()
fmt.Printf("Goroutine got instance: %p, Name: %s\n", s, s.Name)
}()
}

wg.Wait()
}

注意事项:

  • Do 方法内部的函数 f 应该是一个无参无返回值的函数。
  • Once 是线程安全的,可以放心在并发环境中使用。

5. sync.Cond:条件变量

Cond(Condition Variable,条件变量)允许 goroutine 在某个条件不满足时等待,并在条件满足时被通知唤醒。它通常与 sync.Mutex 结合使用。

工作原理:

  • NewCond(l Locker):创建一个新的条件变量,需要传入一个 sync.Locker 接口(通常是 *sync.Mutex)。
  • Wait():释放关联的锁,阻塞当前 goroutine,直到被 Signal()Broadcast() 唤醒。被唤醒后,会自动重新获取锁。
  • Signal():唤醒一个正在等待的 goroutine(如果有)。
  • Broadcast():唤醒所有正在等待的 goroutine。

使用场景:

  • 生产者-消费者模型中,当缓冲区为空时消费者等待,当缓冲区有数据时生产者通知消费者。
  • 资源池管理,当资源不足时等待,当资源释放时通知等待者。

示例:生产者-消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
"fmt"
"sync"
"time"
)

var (
queue []int
mu sync.Mutex
cond *sync.Cond
)

func producer(id int) {
for i := 0; i < 5; i++ {
mu.Lock()
item := id*100 + i
queue = append(queue, item)
fmt.Printf("Producer %d produced: %d, current queue: %v\n", id, item, queue)
cond.Signal() // 通知一个等待的消费者
mu.Unlock()
time.Sleep(time.Millisecond * 200)
}
}

func consumer(id int) {
for {
mu.Lock()
for len(queue) == 0 {
fmt.Printf("Consumer %d waiting...\n", id)
cond.Wait() // 队列为空,等待条件变量通知
}
item := queue[0]
queue = queue[1:]
fmt.Printf("Consumer %d consumed: %d, remaining queue: %v\n", id, item, queue)
mu.Unlock()
time.Sleep(time.Millisecond * 500)
}
}

func main() {
cond = sync.NewCond(&mu) // 初始化条件变量,关联互斥锁

var wg sync.WaitGroup

// 启动两个生产者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
producer(id)
}(i + 1)
}

// 启动两个消费者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
consumer(id) // 消费者会一直运行,这里为了演示不设定退出条件
}(i + 1)
}

// 由于消费者会一直运行,这里通过时间来控制主程序退出,实际应用中会有更好的退出机制
time.Sleep(5 * time.Second)
fmt.Println("Main finished.")
}

注意事项:

  • Cond 必须始终与 Mutex 配合使用。在调用 Wait() 之前,必须持有锁;Wait() 会自动释放锁并阻塞,被唤醒后会自动重新获取锁。
  • 通常使用 for 循环来检查条件,而不是 if,因为 goroutine 可能因假唤醒(spurious wakeup)而醒来,或者在它获得锁之前条件又变回不满足状态。

总结

sync 包为 Go 语言的并发编程提供了基础而强大的工具。正确地使用 sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond,能够帮助开发者有效地管理共享资源,避免竞争条件,并构建健壮、高效的并发应用程序。在选择同步原语时,应根据具体的并发场景和对性能的需求来决定。在 Go 中,通道(chan)通常是更推荐的并发通信方式,但对于共享内存和复杂的同步逻辑,sync 包仍然是不可或缺的。