1. 目录

  • 引言
  • 多线程中的锁技术
    • 自旋锁(spinlock)
    • 互斥锁
    • 递归锁
    • 条件锁
    • 读写锁
  • 参考资料

2. 多线程中的锁技术

在《多线程技术综述(一):通用知识介绍》中,我们介绍了多线程引发的数据安全问题(原子性、可见性、有序性),同时介绍了最主要的一种解决办法是「加锁」。

在了解加锁之前,需要引入两个关键概念:

  • 临界资源:一次仅允许一个线程访问的共享资源,如打印机、全局变量、缓冲区等。
  • 临界区:是指线程访问共享资源的那段代码。

加锁目的就是为了保证安全地访问临界资源,保证一次仅允许被一个线程访问。本文将iOS/Mac开发中常用的锁进行归纳和整理,总体而言可分为自旋锁、互斥锁、递归锁三种,下面是它们的详细介绍。

2.1 自旋锁(spinlock)

自旋锁是指,在加锁时,线程反复检查锁变量是否可用。优点是不需要阻塞线程,不会进入睡眠状态,避免了线程调度高昂的开销,适合等待锁时间较短的场景。缺点是一直循环检查锁会一直占用CPU,处于忙等状态(busy-wait),不适合如磁盘I/O等需要等待锁较长的应用场景。

下面是一个C++的自旋锁的实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class spin_lock final
{
public:
void lock() {
while (flag.test_and_set(memory_order_acquire));
}

void unlock() {
flag.clear(memory_order_release);
}

private:
atomic_flag flag;
};

可以看出,在加锁(lock)时有一个while循环让线程一直自旋,直到锁被释放才停止空转。

在iOS中,OSSpinLock为自旋锁,不过在iOS10后不再安全,因此不再详细介绍,官方建议用os_unfair_lock(一种互斥锁,等待锁时会进入睡眠状态)代替。下面是os_unfair_lock的使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class os_unfair_lock_demo {

private var money = 200
private var moneyLock: os_unfair_lock = os_unfair_lock_s()

func drawMoney() {
// 加锁
os_unfair_lock_lock(&moneyLock)

var oldMoney = money
sleep(1)
oldMoney -= 20
money = oldMoney

// 解锁
os_unfair_lock_unlock(&moneyLock)
}

func saveMoney() {
os_unfair_lock_lock(&moneyLock)

var oldMoney = money
sleep(1)
oldMoney += 50
money = oldMoney

os_unfair_lock_unlock(&moneyLock)
}
}

2.2 互斥锁

互斥锁在等待锁的时候会进入睡眠状态,只有在锁状态改变的时候会被唤醒,保证临界资源被单一线程独占。与自旋锁相比,优点是等待锁时不会空转,但是睡眠和唤醒等需要操作系统调度,需要一些额外的开销。因此,互斥锁在锁等待时间较长的场景下更具有性价比。下面是iOS/Mac中用的互斥锁。

pthread_mutex_t

pthreads(POSIX线程)标准下的互斥锁,较为底层,可在Linux、Windows等多种平台下运行。下面是它的使用案例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class pthread_mutex_demo {

private var money = 200
private var moneyMutex: pthread_mutex_t = pthread_mutex_t()

init() {
pthread_mutex_init(&moneyMutex, nil)
}

func saveMoney() {
pthread_mutex_lock(&moneyMutex)

var oldMoney = money
sleep(1)
oldMoney += 50
money = oldMoney

pthread_mutex_unlock(&moneyMutex)
}

func drawMoney() {
pthread_mutex_lock(&moneyMutex)

var oldMoney = money
sleep(1)
oldMoney -= 20
money = oldMoney

pthread_mutex_unlock(&moneyMutex)
}

deinit {
// 注意销毁
pthread_mutex_destroy(&moneyMutex)
}
}

NSLock

NSLock是对pthread_mutex_t进行上层封装,使其代码风格及使用方式更加简洁,且更符合iOS/Mac开发者的使用习惯。本文认为日常开发中应该尽可能使用这种高层接口,可以降低Bug的发生率。

首先介绍一下NSLock的主要函数或方法

  • NSLock 实现了NSLocking协议,因此具备了基本加锁和解锁能力,方法分别为lock()unlock()
  • open func `try`() -> Bool
    • 尝试加锁,成功的话获取锁并返回True,不成功的话返回False,无论如何都会继续执行,不阻塞线程。
  • open func lock(before limit: Date) -> Bool
    • 若获取不到锁,在截至时间内阻塞线程,超出后返回True/False。

下面是它的使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class NSLock_Demo {

private var money = 200
private var moneyLock = NSLock()

func saveMoney() {

moneyLock.lock()

var oldMoney = money
sleep(1)
oldMoney += 50
money = oldMoney

moneyLock.unlock()
}

func drawMoney() {
moneyLock.lock()

var oldMoney = money
sleep(1)
oldMoney -= 20
money = oldMoney

moneyLock.unlock()
}
}

对于lock()unlock()函数,只要了解过多线程,通过直觉就能明白该如何使用。try()稍微会有一些不一样,为了说清楚它的使用方法,本文设计了一个案例,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class NSLock_try_Demo {

private var money = 200 {
didSet {
print("money: \(money)")
}
}
private var moneyLock = NSLock()

func saveMoney(index: Int) {
moneyLock.lock()
print("index\(index):saveMoney.moneyLock.lock()")

var oldMoney = money
oldMoney += 50
money = oldMoney

moneyLock.unlock()
print("index\(index):saveMoney.moneyLock.unlock()")
}

func drawMoney(index: Int) {

if moneyLock.try() {
print("index\(index):drawMoney.moneyLock.try.success()")
var oldMoney = money
oldMoney -= 20
money = oldMoney
moneyLock.unlock()
print("index\(index):drawMoney.moneyLock.try.unlock()")
} else {
print("index\(index):drawMoney.moneyLock.try.fail")
}
}

class func run() {
let demo = NSLock_try_Demo()
for i in 0...5 {
DispatchQueue.global().async {
demo.saveMoney(index: i)
}
}

for i in 0...5 {
DispatchQueue.global().async {
demo.drawMoney(index: i)
}
}
}
}

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
index0:saveMoney.moneyLock.lock()
money: 250
index0:drawMoney.moneyLock.try.fail
index1:drawMoney.moneyLock.try.success()
money: 230
index1:drawMoney.moneyLock.try.unlock()
index2:saveMoney.moneyLock.lock()
money: 280
index2:drawMoney.moneyLock.try.fail
index2:saveMoney.moneyLock.unlock()
index4:drawMoney.moneyLock.try.success()
money: 260
index3:drawMoney.moneyLock.try.fail
index4:drawMoney.moneyLock.try.unlock()
index0:saveMoney.moneyLock.unlock()
index5:drawMoney.moneyLock.try.fail
index4:saveMoney.moneyLock.lock()
money: 310
index4:saveMoney.moneyLock.unlock()
index5:saveMoney.moneyLock.lock()
money: 360
index5:saveMoney.moneyLock.unlock()
index1:saveMoney.moneyLock.lock()
money: 410
index1:saveMoney.moneyLock.unlock()
index3:saveMoney.moneyLock.lock()
money: 460
index3:saveMoney.moneyLock.unlock()

可以看出,取钱调用try()只有两次成功获取到锁,不成功时直接执行try()后续的的代码。

2.3 递归锁

递归锁是一种特殊的互斥锁,允许同一线程多次获得同一把锁。下面用一个例子说明递归场景下普通互斥锁存在的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class NSLock_DeadLock_Demo {

var numbers = [1,2,3,4,5,6]
let lock = NSLock()

// 统计数组的总和
func countNumbers(startIndex: Int) -> Int {
// 上锁🔒,第二层递归就会一直等待锁。
lock.lock()
print("locked:\(startIndex)")
if startIndex == numbers.count - 1 {
let res = numbers[startIndex]
lock.unlock()
return res
}
// 递归进入下一层,未解锁
let res = numbers[startIndex] + countNumbers(startIndex: startIndex + 1)

lock.unlock()
return res
}

// 开始测试
class func run() {
let demo = NSLock_DeadLock_Demo()
let data = demo.countNumbers(startIndex: 0)
print(data)
}
}

执行NSLock_DeadLock_Demo.run()的输出结果为locked:0,第二层递归函数一直等待锁,且永远无法继续往下走,造成了死锁。

因此,在这种递归函数场景下,必须要让同一线程能够重复加锁,其他线程需要等待锁。只有在同一线程的解锁数与加锁数匹配时,其他线程才能获取锁。下面是iOS/Mac中常用的递归锁方法。

pthread_mutex_t

使用pthread_mutex_t作为递归锁时,需要在初始化时设置相应参数,较为繁琐,下面是一段示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

class pthread_mutex_t_recursive_demo {

private var recursiveMutex = pthread_mutex_t()
var numbers = [1,2,3,4,5,6]

// 初始化锁的属性
init() {
var attr = pthread_mutexattr_t()
pthread_mutexattr_init(&attr)
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)

// 初始化递归锁
pthread_mutex_init(&recursiveMutex, &attr)

// 销毁属性
pthread_mutexattr_destroy(&attr)
}

// 统计数组的总和
func countNumbers(startIndex: Int) -> Int {
// 上锁🔒,同一线程前提下,第二层递归可以直接进入锁
pthread_mutex_lock(&recursiveMutex)
print("locked:\(startIndex), thread: \(Thread.current)")
if startIndex == numbers.count - 1 {
let res = numbers[startIndex]
pthread_mutex_unlock(&recursiveMutex)
return res
}
// 递归进入下一层,未解锁
let res = numbers[startIndex] + countNumbers(startIndex: startIndex + 1)

// 解锁
pthread_mutex_unlock(&recursiveMutex)
return res
}

// 开始测试
class func run() {

let demo = pthread_mutex_t_recursive_demo()

DispatchQueue.global().async {
let data = demo.countNumbers(startIndex: 0)
print("data:\(data), thread: \(Thread.current)")
print("\n")
}

DispatchQueue.global().async {
let data = demo.countNumbers(startIndex: 0)
print("data:\(data), thread: \(Thread.current)")
print("\n")
}
}
}

执行pthread_mutex_t_recursive_demo.run()后,输出结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
locked0, thread: <NSThread: 0x600000e9f040>{number = 2, name = (null)}
locked1, thread: <NSThread: 0x600000e9f040>{number = 2, name = (null)}
locked2, thread: <NSThread: 0x600000e9f040>{number = 2, name = (null)}
locked3, thread: <NSThread: 0x600000e9f040>{number = 2, name = (null)}
locked4, thread: <NSThread: 0x600000e9f040>{number = 2, name = (null)}
locked5, thread: <NSThread: 0x600000e9f040>{number = 2, name = (null)}
data:21, thread: <NSThread: 0x600000e9f040>{number = 2, name = (null)}


locked0, thread: <NSThread: 0x600000ed23c0>{number = 3, name = (null)}
locked1, thread: <NSThread: 0x600000ed23c0>{number = 3, name = (null)}
locked2, thread: <NSThread: 0x600000ed23c0>{number = 3, name = (null)}
locked3, thread: <NSThread: 0x600000ed23c0>{number = 3, name = (null)}
locked4, thread: <NSThread: 0x600000ed23c0>{number = 3, name = (null)}
locked5, thread: <NSThread: 0x600000ed23c0>{number = 3, name = (null)}
data:21, thread: <NSThread: 0x600000ed23c0>{number = 3, name = (null)}
可以看出,结果正常了,与我们的预期相符。当第一个线程0x600000e9f040加锁后,第二个线程0x600000ed23c0一直在等待锁,直到第一个线程的递归函数完全结束并释放完它所有的锁。

NSRecursiveLock

NSRecursiveLock是更加简单直观,且更符合iOS开发者直觉的递归锁。因此,在日常开发中,本文更推荐中使用它而不是pthread_mutex_t。下面是前面例子的NSRecursiveLock版本示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class NSRecursiveLock_Demo {

private let recursiveLock = NSRecursiveLock()
var numbers = [1,2,3,4,5,6]

// 统计数组的总和
func countNumbers(startIndex: Int) -> Int {
// 上锁🔒,第二层递归就会一直等待锁。
recursiveLock.lock()
print("locked:\(startIndex), thread: \(Thread.current)")
if startIndex == numbers.count - 1 {
let res = numbers[startIndex]
recursiveLock.unlock()
return res
}
// 递归进入下一层,未解锁
let res = numbers[startIndex] + countNumbers(startIndex: startIndex + 1)
recursiveLock.unlock()
return res
}

// 开始测试
class func run() {

let demo = NSRecursiveLock_Demo()

DispatchQueue.global().async {
let data = demo.countNumbers(startIndex: 0)
print("data:\(data), thread: \(Thread.current)")
print("\n")
}

DispatchQueue.global().async {
let data = demo.countNumbers(startIndex: 0)
print("data:\(data), thread: \(Thread.current)")
print("\n")
}
}
}
执行NSRecursiveLock_Demo.run()后,输出结果如下,与前文中pthread_mutex_t递归锁的运行结果一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
locked0, thread: <NSThread: 0x600000f97300>{number = 2, name = (null)}
locked1, thread: <NSThread: 0x600000f97300>{number = 2, name = (null)}
locked2, thread: <NSThread: 0x600000f97300>{number = 2, name = (null)}
locked3, thread: <NSThread: 0x600000f97300>{number = 2, name = (null)}
locked4, thread: <NSThread: 0x600000f97300>{number = 2, name = (null)}
locked5, thread: <NSThread: 0x600000f97300>{number = 2, name = (null)}
data:21, thread: <NSThread: 0x600000f97300>{number = 2, name = (null)}


locked0, thread: <NSThread: 0x600000ff0200>{number = 3, name = (null)}
locked1, thread: <NSThread: 0x600000ff0200>{number = 3, name = (null)}
locked2, thread: <NSThread: 0x600000ff0200>{number = 3, name = (null)}
locked3, thread: <NSThread: 0x600000ff0200>{number = 3, name = (null)}
locked4, thread: <NSThread: 0x600000ff0200>{number = 3, name = (null)}
locked5, thread: <NSThread: 0x600000ff0200>{number = 3, name = (null)}
data:21, thread: <NSThread: 0x600000ff0200>{number = 3, name = (null)}
2.4 条件锁

在了解条件锁之前,先来了解一下经典的「生产者-消费者模型」。下图是该模型的示意图。多个生产者线程负责生产数据并放入到缓冲队列中,多个消费者线程负责从缓冲队列中取数据消费。

要完成这个需求,需要满足以下几点:

  • 保证生产者只在缓冲队列不满的前提下,生产并放入数据到缓冲数据中。
  • 保证消费者只在缓冲队列不为空的前提下,从队列取出并消费数据。
  • 当缓冲队列满时,生产者线程进入休眠状态,等待消费者消费数据后被唤醒。
  • 当缓冲队列空时,消费者线程进入休眠状态,等待生产者生产数据后被唤醒。

下面是这个案例的Swift代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

class NSCondition_Demo {

private let condition = NSCondition()

var buffer: [Int] = []
let capacity = 5

// 生产者
func producer(index: Int) {
condition.lock()

while buffer.count == capacity {
print("(ID: \(index)) producer.condition.wait()")
// 等待时,会暂时释放锁,让其他等待锁的线程进入。
condition.wait()
}
let newData = index
buffer.append(Int(newData))
print("(ID: \(index)) 生产:")
print(buffer)
print("(ID: \(index)) producer.condition.broadcast()")
// 唤醒其他在等待中的线程
condition.broadcast()

condition.unlock()
}

// 消费者
func consumer(index: Int) {
condition.lock()

while buffer.count == 0 {
print("(ID: \(index)) consumer.condition.wait()")
// 等待时,会暂时释放锁,让其他等待锁的线程进入。
condition.wait()
}

buffer.removeLast()
print("(ID: \(index)) 消费:")
print(buffer)
print("(ID: \(index)) consumer.condition.broadcast()")
// 唤醒其他在等待中的线程
condition.broadcast()

condition.unlock()
}

class func run() {
let demo = NSCondition_Demo()
for i in 0...5 {
DispatchQueue.global().async {
sleep(2)
demo.producer(index: i)
}
}

for i in 0...5 {
DispatchQueue.global().async {
sleep(1)
demo.consumer(index: i)
}
}
}
}

在上面例子中,用了条件锁NSCondition,它的几个关键方法及解释如下:

  • 实现了NSLocking中lockunlock方法,因此具备基本的锁功能
  • func wait()
    • 线程进入阻塞状态,休眠,暂时放开持有的锁
  • func signal()
    • 唤醒一个因调用wait()进入等待的线程
  • func broadcast()
    • 唤醒所有因调用wait()进入等待的线程

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
(ID: 1) consumer.condition.wait()
(ID: 0) consumer.condition.wait()
(ID: 2) consumer.condition.wait()
(ID: 4) consumer.condition.wait()
(ID: 3) consumer.condition.wait()
(ID: 5) consumer.condition.wait()
(ID: 5) 生产:
[5]
(ID: 5) producer.condition.broadcast()
(ID: 1) 消费:
[]
(ID: 1) consumer.condition.broadcast()
(ID: 0) consumer.condition.wait()
(ID: 2) consumer.condition.wait()
(ID: 4) consumer.condition.wait()
(ID: 3) consumer.condition.wait()
(ID: 5) consumer.condition.wait()
(ID: 0) 生产:
[0]
(ID: 0) producer.condition.broadcast()
(ID: 1) 生产:
[0, 1]
(ID: 1) producer.condition.broadcast()
(ID: 2) 生产:
[0, 1, 2]
(ID: 2) producer.condition.broadcast()
(ID: 4) 生产:
[0, 1, 2, 4]
(ID: 4) producer.condition.broadcast()
(ID: 0) 消费:
[0, 1, 2]
(ID: 0) consumer.condition.broadcast()
(ID: 3) 生产:
[0, 1, 2, 3]
(ID: 3) producer.condition.broadcast()
(ID: 2) 消费:
[0, 1, 2]
(ID: 2) consumer.condition.broadcast()
(ID: 4) 消费:
[0, 1]
(ID: 4) consumer.condition.broadcast()
(ID: 3) 消费:
[0]
(ID: 3) consumer.condition.broadcast()
(ID: 5) 消费:
[]
(ID: 5) consumer.condition.broadcast()

从结果可以得到以下几个结论:

  • 在代码中,消费者线程睡眠时间较短(1秒),因此先执行。但是缓冲队列中为空,因此消费者线程进入等待睡眠状态。
  • 睡眠2秒后,生产者线程开始执行,生产数据后唤醒睡眠中的消费者线程去消费,后面的逻辑以此类推。
  • 最后通过输出可以看出,生产和消费各6次,与预期一致。
2.5 读写锁

为了满足「多读单写」这个需求,前面我们已经用栅栏(barrier)实现过了。下面介绍一种pthread中的读写锁,首先介绍,一下基本的方法:

  • pthread_rwlock_init
    • 初始化读写锁
  • pthread_rwlock_rdlock
    • 获取读锁。若写锁已加锁,则读锁需要阻塞等待,否则可以多次获得读锁。
  • pthread_rwlock_wrlock
    • 获取写锁,若当前读写锁被持有,则阻塞等待,否则获取成功。
  • pthread_rwlock_unlock
    • 解除读写锁。

下面是一个具体的示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class pthread_rw_Demo {

var text = "我喜欢"
var rwLock = pthread_rwlock_t()

init() {
pthread_rwlock_init(&rwLock, nil)
}

func readText() {
pthread_rwlock_rdlock(&rwLock)
sleep(5)
print(text)
pthread_rwlock_unlock(&rwLock)
}

func write(txt: String) {
pthread_rwlock_wrlock(&rwLock)
sleep(2)
text.append(txt)
pthread_rwlock_unlock(&rwLock)
}

func run() {
let queue = DispatchQueue(label: "com.test.rw",attributes: .concurrent)
queue.async {
// 第一次读
self.readText()
}
queue.async {
// 第二次读
self.readText()
}

// 写(栅栏)
queue.async() {
self.write(txt: "游泳")
}

queue.async {
// 第三次读
self.readText()
}
queue.async {
// 第四次读
self.readText()
}
queue.async {
// 第五次读
self.readText()
}
}
}

运行结果与之前栅栏(barrier)的输出一样,具体为:

1
2
3
4
5
我喜欢
我喜欢
我喜欢游泳
我喜欢游泳
我喜欢游泳

3. 参考资料

[1] NSLock | Apple Developer Documentation