图源:
数据竞态
在多线程编程中,遇到的最大麻烦就是当多个线程对同一个数据进行操作时,因为代码交错执行引发的一些问题:
package main
import (
"fmt"
"sync"
)
type bank struct {
amount int
}
func (b *bank) SaveMoney(amount int) {
b.amount += amount
}
func (b *bank) GetAmount() int {
return b.amount
}
func main() {
for i := 0; i < 10; i++ {
bankTest()
}
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 100
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
}
func bankTest() {
var b bank
var gwg sync.WaitGroup
gwg.Add(1)
go func() {
defer gwg.Done()
b.SaveMoney(100)
fmt.Println("bank amount: ", b.GetAmount())
}()
gwg.Add(1)
go func() {
defer gwg.Done()
b.SaveMoney(200)
}()
gwg.Wait()
}
这是根据《Go程序设计语言》中的示例代码改写的一个例子,这里构建了一个结构体bank
,代表一个共享的银行账户,可以存钱和查看余额。bankTest
是测试并发访问下可能出现问题的函数,主goroutine启动两个goroutine来分别存钱,并且其中一个goroutine会在存完钱后查看余额。
实际运行多次我们发现结果并不一致,这是因为两个goroutine是同时运行的,并非是顺序执行,所以是可能A线程存完100块后,B线程存了200,然后A线程再查看余额,此时就是300块。除了这种情况以外,也有可能是A全部执行完后,B再存钱,此时输出结果就是100块。当然也可能是B存完钱后A再执行,此时结果也是300块。
但可能的情况并不仅仅是这三种,因为计算机实际执行程序时是以底层的汇编指令为最小执行单元来执行的,并非是高级语言的单行代码,这在多线程编程中尤其致命。
比如b.amount += amount
这条代码,实际执行中可能会拆分成两条汇编语句:
c = b.amount + amount b.amount = c
也就是说+
运算和赋值运算可能并非是在一条汇编语句内完成的,这就会导致更诡异的结果,比如A线程执行到这里,刚完成+
运算,还没有来得及进行赋值操作,此时B线程完成了赋值操作,也就是说此时的b.amount
已经是200了。然后A再执行赋值的汇编语句,结果b.amount
又变成了100。也就是说B线程存的200块“不翼而飞”。
《Go程序设计语言》对此的一句评论相当有趣——“不要相信你在多线程编程时的直觉,因为那往往是错的”。
在学习Python时我也看到过类似的话,关于多线程编程最好的告诫就是——“不要多线程编程”。当然这并不是说不要用编写并发程序,而是说不要写传统的多线程编程,因为那样你会遇到很多麻烦,且很难排查和解决。事实上很多编程语言在语言层面尝试解决该问题,比如Python的全局线程锁,这可以看作是试图将多线程这头老虎关在笼子里的做法,在此基础上使用并发或异步都可以很好地解决并发问题。当然这也并非没有代价,但综合来看是相当值得的。与Python相比,Go语言的goroutine更像是传统的多线程编程,不过采用了其它方式来避免传统多线程编程的问题,在后面我们会详细说明。
上面示例中展现的这种,因为并发线程交错执行代码,并针对同一个变量进行访问产生的问题,称作是“数据竞态”,在这种情况下,并发访问普通变量是“并发不安全的”,而如果一个变量可以安全地并发访问(比如通道或者net.Connect
),则会被称为并发安全的。此外,如果一个类型的所有方法都可以安全地并发访问,则这种类型可以被视作是并发安全的。
解决数据竞态
针对数据竟态,有以下几种方式可以解决:
不写入数据
之所以会出现数据竟态,是因为多个线程并发访问共享数据时,有至少一个尝试写入,如果所有的并发线程都只尝试读取,而不写入,自然也就不存在数据竟态。
在编程中,我们经常会遇到一种“延迟初始化”的问题:
package main
type student struct {
name string
age int
}
type students struct {
stds map[string]*student
}
func (s *students) getStudent(name string) *student {
student, ok := s.stds[name]
if !ok {
student := initStudent(name)
s.stds[name] = student
}
return student
}
func initStudent(name string) *student {
return &student{}
}
func main() {
var ss students
go func() {
ss.getStudent("std1")
}()
go func() {
ss.getStudent("std2")
}()
}
通过结构体students
可以获取学生信息,但students
本身是延迟加载数据的,如果是单个goroutine程序,这么做没有任何问题,但如果是在多个goroutine中访问该结构体的实例就可能会遇到问题,即使他们访问的不是同一个map
的key
,因为map
底层的机制是哈希表,哈希表会进行空间扩展,并发地访问哈希表并写入数据,这谁也不知道会发生什么。
这种情况下最简单的一个解决方式是:在进行并发访问前先生成所有数据,再并发读取:
func main() {
var ss students
ss.getStudent("std1")
ss.getStudent("std2")
go func() {
ss.getStudent("std1")
}()
go func() {
ss.getStudent("std2")
}()
}
这里在并发访问前先调用getStudent
方法生成相应数据,这样稍后开启的多个goroutine只会从同一个map
中读取数据,并不涉及数据写入,这样就不会有数据竟态的问题。
通过通道共享数据
使用通道来传递共享数据是一个更常见的解决方式,Go语言有一句真理——“不要通过共享内存来通信,而应该通过通信来共享内存”。也就是说在多个线程共享访问同一个变量时,我们应当通过通道来访问这个变量的副本,而不是直接访问这个变量。
我们来看之前bank
的例子:
package main
import (
"fmt"
"sync"
"time"
)
type bank struct {
amount int
saveChan chan int
getChan chan int
closeChan chan struct{}
}
func (b *bank) Init() {
b.saveChan = make(chan int)
b.getChan = make(chan int)
b.closeChan = make(chan struct{})
}
func (b *bank) SaveMoney(amount int) {
b.saveChan <- amount
}
func (b *bank) GetAmount() int {
return <-b.getChan
}
func (b *bank) StartBank() {
for {
select {
case amount := <-b.saveChan:
b.amount += amount
case b.getChan <- b.amount:
case <-b.closeChan:
return
}
}
}
func (b *bank) Close() {
close(b.closeChan)
}
func main() {
for i := 0; i < 10; i++ {
bankTest()
}
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 100
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
}
func bankTest() {
var b bank
b.Init()
go b.StartBank()
var gwg sync.WaitGroup
gwg.Add(1)
go func() {
defer gwg.Done()
b.SaveMoney(100)
fmt.Println("bank amount: ", b.GetAmount())
}()
gwg.Add(1)
go func() {
defer gwg.Done()
b.SaveMoney(200)
}()
gwg.Wait()
b.Close()
time.Sleep(time.Second)
}
在这个例子中,仅有go b.StartBank
这个goroutine可以修改b
中的变量amount
,其它的并发goroutine仅可以通过GetAmount
或SaveMoney
方法通过通道来传递数据间接影响共享变量。实际运行这个例子会发现依然会出现多次运行结果不一致的情况,这也不难理解,因为理想情况是在A线程写入和读取结果的时候,B是不能操作银行的,而上面这个例子中显然不是这样的,写入和读取是两条不同的通道。
上面例子的遗留问题可以通过互斥锁来解决。
需要说明的是,通道并非仅仅能传递共享数据的拷贝,在某些情况下也可以传递共享数据指针,比如在中提到的那个蛋糕师和流水线的问题,对于蛋糕实例,每一个蛋糕当前仅有一个蛋糕师可以操作,所以我们完全可以在当前蛋糕师完成制作步骤后,将蛋糕指针通过通道传递给下一个蛋糕师,然后就不再操作该蛋糕,虽然在蛋糕的生命周期内,它会存在于各个蛋糕师那里,但在某一个时刻,仅有一个蛋糕师会操作该蛋糕,所以同样不会存在数据竟态的问题,这种情况可以被称作“串行受限”。
互斥锁
互斥锁其实是一个传统的并发解决方式,熟悉传统多线程编程的老司机应该不陌生,不过可能是叫做“资源锁”或者“线程锁”,但实质都是针对共享变量的一把锁,通过在操作前加锁,操作后解锁来实现并发情况下操作同一个变量不会出现数据竟态。
我们用互斥锁来改写上面的示例:
package main
import (
"fmt"
"sync"
)
type bank struct {
amountMutex sync.Mutex
amount int
}
func (b *bank) SaveMoney(amount int) {
b.amountMutex.Lock()
b.amount += amount
b.amountMutex.Unlock()
}
func (b *bank) GetAmount() int {
b.amountMutex.Lock()
defer b.amountMutex.Unlock()
return b.amount
}
func main() {
for i := 0; i < 10; i++ {
bankTest()
}
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 100
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
// bank amount: 300
}
func bankTest() {
var b bank
var gwg sync.WaitGroup
gwg.Add(1)
go func() {
defer gwg.Done()
b.SaveMoney(100)
fmt.Println("bank amount: ", b.GetAmount())
}()
gwg.Add(1)
go func() {
defer gwg.Done()
b.SaveMoney(200)
}()
gwg.Wait()
}
通过添加互斥锁,我们就可以让并发的多个goroutine安全地读写共享变量,而不用担心之前某些情况下钱会不翼而飞的问题。
假如我们要添加一个取钱的操作:
func (b *bank) WithDraw(amount int) bool {
b.SaveMoney(-amount)
if b.GetAmount() < 0 {
b.SaveMoney(amount)
return false
}
return true
}
显然,在执行这段操作时需要互斥地占有共享变量b.amount
,所以可能会很自然地添加上互斥锁:
func (b *bank) WithDraw(amount int) bool {
b.amountMutex.Lock()
defer b.amountMutex.Unlock()
b.saveMoney(-amount)
...
}
但这样是不正确的,因为同一个互斥锁是不能重复加锁的,也就是说在withDraw
执行后,给互斥锁加锁,之后又调用了b.saveMoney()
,这个方法同样会尝试给同一个互斥锁加锁,而这个锁已经被加锁了,所以程序就只能阻塞在这里,不能继续执行,也就是说出现了“死锁”。这种情况是我们在多线程编程是要极力避免的。
这种情况可以用以下的方式避免:
type bank struct {
amountMutex sync.Mutex
amount int
}
func (b *bank) SaveMoney(amount int) {
b.amountMutex.Lock()
b.saveMoney(amount)
b.amountMutex.Unlock()
}
func (b *bank) saveMoney(amount int) {
b.amount += amount
}
func (b *bank) GetAmount() int {
b.amountMutex.Lock()
defer b.amountMutex.Unlock()
return b.getAmount()
}
func (b *bank) getAmount() int {
return b.amount
}
func (b *bank) WithDraw(amount int) bool {
b.amountMutex.Lock()
defer b.amountMutex.Unlock()
b.saveMoney(-amount)
if b.getAmount() < 0 {
b.saveMoney(amount)
return false
}
return true
}
这里将具体逻辑从对应方法中拆分出来,编写成了首字母小写的结构体内部无锁版本的方法,首字母大写的方法供外部调用,进行了加锁,而内部无锁版本的可以用于内部使用,这样可以确保安全地访问共享变量。
虽然这样看起来有点蠢,但挺实用。
互斥锁虽然看起来不错,但是实际上有个致命问题,通过添加互斥锁,对共享变量的一切访问都变成了互斥的、串行的行为,这当然会大大影响并发的效率,理论上,出了写入操作时应当绝对互斥,在只存在并发读取时是可以并发进行的,Go语言也为此提供了额外的一种锁:读写锁。
上面的示例改写为读写锁很容易:
type bank struct {
amountMutex sync.RWMutex
amount int
}
...
func (b *bank) GetAmount() int {
b.amountMutex.RLock()
defer b.amountMutex.RUnlock()
return b.getAmount()
}
其它部分保持不变即可,因为Lock
和UnLock
就表示添加和解除写锁。现在GetAmount
就可以满足大量goroutine的并发读取操作了。
需要注意的是,这样并不是没有代价的,读写锁比普通的互斥锁有更复杂的薄帐操作,所以只有在大量并发读取共享变量的情况下比普通的互斥锁有优势。
延迟初始化
之前我们提到的那个延迟初始化的例子,也可以用互斥锁来改写:
type students struct {
stdsMutex sync.Mutex
stds map[string]*student
}
func (s *students) getStudent(name string) *student {
s.stdsMutex.Lock()
defer s.stdsMutex.Unlock()
student, ok := s.stds[name]
if !ok {
student := initStudent(name)
s.stds[name] = student
}
return student
}
也可以用读写锁来改善性能:
type students struct {
stdsMutex sync.RWMutex
stds map[string]*student
}
func (s *students) getStudent(name string) *student {
s.stdsMutex.RLock()
student, ok := s.stds[name]
s.stdsMutex.RUnlock()
if !ok {
s.stdsMutex.Lock()
student, ok = s.stds[name]
if !ok {
student := initStudent(name)
s.stds[name] = student
}
s.stdsMutex.Unlock()
}
return student
}
需要注意的是,在添加互斥锁后二次检查key
值是否存在是有意义的,因为读锁解除和添加互斥锁这之间是有可能被其它goroutine完成初始化的。
如果我们可以一次性完成初始化,可以用另一种方式来编写类似的代码:
type students struct {
stdsMutex sync.Once
stds map[string]*student
}
func (s *students) GetStudent(name string) *student {
s.stdsMutex.Do(s.initAll)
return s.stds[name]
}
func (ss *students) initAll() {
ss.stds["std1"] = initStudent("std1")
ss.stds["std2"] = initStudent("std2")
}
sync.Once
可以保证GetStudent
方法第一次被调用时,互斥地通过s.initAll
函数实现数据初始化,之后GetStudent
就可以像读锁那样并发被调用。
文章评论