用一个例子来简单介绍保证并发安全的几种方式

Introduction

大家好,我是说的道理,今天来点大家想看的东西啊。

问题介绍

那么这边呢是我们的示例代码,在我自己做code review的时候我就发现了这边有很大概率会出现race condition的情况。但是我当时没管,因为我认为在我的应用场景下大概率碰不到这种情况。但是实际还是出现了,那么就来修复一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// todo: make it thread safe, now it's not, photos might be overwritten
func (s *srvImpl) AddPhotos(userID string, mealCreateAt time.Time, photo [][]byte) error {
fileIds := make([]storage.FileID, 0, len(photo))
for _, p := range photo {
fileId, err := s.storageApi.StoreAt("", "photo", p, storage.FileMeta{
Prefix: "",
Filename: "",
Length: 0,
})
if err != nil {
s.log.ErrorW("store photo failed", "err", err, "userID", userID, "createAt", mealCreateAt)
return foodiary.ErrFailToGetMeal
}
fileIds = append(fileIds, fileId)
}
meal, err := s.repo.FindMeal(userID, mealCreateAt)
if err != nil {
s.log.ErrorW("find meal failed", "err", err, "userID", userID, "createAt", mealCreateAt)
return foodiary.ErrMealNotFound
}
meal.Photos = append(meal.Photos, fileIds...)
err = s.repo.Upsert(meal)
if err != nil {
s.log.ErrorW("upsert meal failed", "err", err, "userID", userID, "createAt", mealCreateAt)
return foodiary.ErrFailModifyMeal
}
return nil
}

问题分析

先不提repo层的设计问题,单纯来看如何解决并发问题。实际上,问题出现在FindMealUpsert之间,如果两个线程同时执行这个函数,一个线程往Meal中添加图片A,另一个线程往同一个Meal里添加图片B,那么就很有可能出现只有图片A被添加,或者只有图片B被添加的情况。

所以要解决这个问题,就是要保证一个线程在FindMealUpsert这段完成之间,没有其他线程能够修改这个Meal

那么要实现这些功能,我们可以从如下几个方向思考:

  1. 锁 (Locking)
  2. 事务 (Transaction)
  3. 队列 (Queue)

锁 Locking

锁大致可以分为乐观锁和悲观锁,在这里我们会分别介绍

悲观锁

这个就是最常见的锁的概念,假设一定会发生冲突,所以在操作一个资源的时候,停止一切其他线程对这个资源的操作。

至于这个锁要怎么实现,那就取决与你自己的选择了。比如,如果你的资源需要在好多个服务间或者分布式系统中使用,那么可以使用类似redis的方式来实现分布式锁,这种方式可以在分布式环境下有效保证资源的一致性。如果只是单个应用,那么可以直接使用应用内存锁,例如使用sync.Mutex,来控制同一时刻只有一个线程能访问资源。

优缺点:

  • 优点:适用于高冲突场景,确保数据的一致性和正确性。任何时候只有一个线程可以操作资源,避免了并发修改导致的数据不一致问题。
  • 缺点:会阻塞其他线程的访问,可能会导致性能下降,尤其是在资源锁定时间长的情况下,降低系统的并发性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
lockKey := fmt.Sprintf("meal_lock:%s:%d", userID, mealCreateAt.Unix())
lock, err := redisClient.Lock(lockKey, 5*time.Second)
if err != nil {
return err
}
defer lock.Unlock()

meal, err := s.repo.FindMeal(userID, mealCreateAt)
if err != nil {
return foodiary.ErrMealNotFound
}
meal.Photos = append(meal.Photos, fileIds...)
err = s.repo.Upsert(meal)
if err != nil {
return foodiary.ErrFailModifyMeal
}

乐观锁

简单来说就是假设每次拿数据的时候都认为没有其他线程会对同一个资源进行修改,所以不对该操作进行上锁。但是如果要修改资源,则先提交资源修改的请求,再确认这个资源是否被修改过了。如果没有被修改过,那就很好。如果被修改过了,那么就放弃本次操作,或者重新获取资源,并重试本次操作直到成功。

一般来说,实现乐观锁可以通过添加一个独立的可比较的字段实现,比如版本号、时间戳等。

比如使用版本号,可以在每次对数据进行更新的时候对版本号+1,在更新的时候判断这条数据是否还是原来的版本号,如果是的就更新,如果不是,就是有另外一个线程修改了这个资源。

优缺点

  • 优点:不需要持有锁,性能好,适用于低并发场景。
  • 缺点:如果冲突频繁,会导致不断重试,降低性能。

比如,我们可以通过如下代码实现乐观锁,当出现foodiary.ErrConflict的时候,对该操作进行重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
meal, err := s.repo.FindMeal(userID, mealCreateAt)
if err != nil {
return foodiary.ErrMealNotFound
}

originalVersion := meal.Version
meal.Photos = append(meal.Photos, newFileIds...)
meal.Version++

err = s.repo.UpsertWithVersion(meal, originalVersion)
if err != nil {
if err == foodiary.ErrVersionConflict {
s.log.ErrorW("conflict detected while updating meal", "userID", userID, "createAt", mealCreateAt)
return foodiary.ErrConflict
}
return foodiary.ErrFailModifyMeal
}

事务 (Transaction)

事务是数据库管理中的概念,通常用来保证一组操作要么全部成功,要么全部失败回滚。事务在保证数据一致性方面非常有用,尤其是在多个操作需要原子性的时候。

例如,在处理Meal时,如果我们需要同时更新Meal的多个字段(如添加照片和更新描述),使用事务可以确保这些操作要么全部成功,要么全部失败,以保持数据的一致性。

但是在我们的场景中,事务并不完全适用。因为我们的问题是多个线程在同时尝试修改同一个Meal,而事务的作用是在单次操作中保证原子性,并不能防止多个线程竞争修改同一条数据。因此,单纯使用事务并不能彻底解决并发冲突的问题。

队列 (Queue)

另一种常见的解决并发修改问题的方法是使用队列。我们可以将每个对Meal的修改请求都放入一个队列中,确保所有的修改操作是按顺序执行的。

在Go语言中,我们可以使用chan来实现一个简单的队列。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type mealUpdateTask struct {
userID string
mealCreateAt time.Time
photos [][]byte
errChan chan error
}

tasksChan := make(chan mealUpdateTask)

func (s *srvImpl) processTasks() {
for task := range tasksChan {
err := s.addPhotosInternal(task.userID, task.mealCreateAt, task.photos)
task.errChan <- err
}
}

func (s *srvImpl) AddPhotos(userID string, mealCreateAt time.Time, photos [][]byte) error {
errChan := make(chan error)
tasksChan <- mealUpdateTask{userID, mealCreateAt, photos, errChan}
return <-errChan
}

这种方式通过一个单独的处理线程来串行化所有的对Meal的修改操作,确保不会有并发问题。当然,对于复杂的生产环境,我们可以引入消息队列系统,例如RabbitMQ、Kafka等,用来处理更多的请求和提高可靠性。

优缺点

  • 优点:保证操作顺序,避免并发问题。
  • 缺点:可能会带来延迟,队列的处理速度可能成为瓶颈。