做者:bool周 原文连接:我所理解的 iOS 并发编程ios
不管在哪一个平台,并发编程都是一个让人头疼的问题。庆幸的是,相对于服务端,客户端的并发编程简单了许多。这篇文章主要讲述一些基于 iOS 平台的一些并发编程相关东西,我写博客习惯于先介绍原理,后介绍用法,毕竟对于 API 的使用,官网有更好的文档。git
为了便于理解,这里先解释一些相关概念。若是你对这些概念已经很熟悉,能够直接跳过。github
从操做系统定义上来讲,进程就是系统进行资源分配和调度的基本单位,系统建立一个线程后,会为其分配对应的资源。在 iOS 系统中,进程能够理解为就是一个 App。iOS 并无提供能够建立进程的 API,即便你调用 fork()
函数,也不能建立新的进程。因此,本文所说的并发编程,都是针对线程来讲的。算法
线程是程序执行流的最小单元。通常状况下,一个进程会有多个线程,或者至少有一个线程。一个线程有建立、就绪、运行、阻塞和死亡五种状态。线程能够共享进程的资源,全部的问题也是由于共享资源引发的。数据库
操做系统引入线程的概念,是为了使过个 CPU 更好的协调运行,充分发挥他们的并行处理能力。例如在 iOS 系统中,你能够在主线程中进行 UI 操做,而后另启一些线程来处理与 UI 操做无关的事情,两件事情并行处理,速度比较快。这就是并发的大体概念。编程
按照 wiki 上面解释:是分时操做系统分配给每一个正在运行的进程微观上的一段CPU时间(在抢占内核中是:从进程开始运行直到被抢占的时间)。线程能够被认为是 ”微进程“,所以这个概念也能够用到线程方面。swift
通常操做系统使用时间片轮转算法进行调度,即每次调度时,老是选择就绪队列的队首进程,让其在CPU上运行一个系统预先设置好的时间片。一个时间片内没有完成运行的进程,返回到绪队列末尾从新排队,等待下一次调度。不一样的操做系统,时间片的范围不一致,通常都是毫秒(ms)级别。api
死锁是因为多个线程(进程)在执行过程当中,由于争夺资源而形成的互相等待现象,你能够理解为卡主了。产生死锁的必要条件有四个:数组
为了便于理解,这里举一个例子:一座桥,同一时间只容许一辆车通过(互斥)。两辆车 A,B 从桥的两端开上桥,走到桥的中间。此时 A 车不愿退(不可剥夺),又想占用 B 车所占据的道路;B 车此时也不愿退,又想占用 A 车所占据的道路(请求和保持)。此时,A 等待 B 占用的资源,B 等待 A 占用的资源(环路等待),两车僵持下去,就造成了死锁现象。xcode
当多个线程同时访问一块共享资源(例如数据库),由于时序性问题,会致使数据错乱,这就是线程不安全。例如数据库中某个整形字段的 value 为 0,此时两个线程同时对其进行写入操做,线程 A 拿到原值为 0,加一后变为 1;线程 B 并非在 A 加完后拿的,而是和 A 同时拿的,加完后也是 1,加了两次,理想值应该为 2,可是数据库中最终值倒是 1。实际开发场景可能要比这个复杂的多。
所谓的线程安全,能够理解为在多个线程操做(例如读写操做)这部分数据时,不会出现问题。
由于线程共享进程资源,在并发状况下,就会出现线程安全问题。为了解决此问题,就出现了锁这个概念。在多线程环境下,当你访问一些共享数据时,拿到访问权限,给数据加锁,在这期间其余线程不可访问,直到你操做完以后进行解锁,其余线程才能够对其进行操做。
iOS 提供了多种锁,ibireme 大神的 这篇文章 对这些锁进行了性能分析,我这里直接把图 cp 过来了:
下面针对这些锁,逐一分析。
ibireme 大神的文章也说了,虽然这个锁性能最高,可是已经不安全了,建议再也不使用,这里简单说一下。
OSSpinLock 是一种自旋锁,主要提供了加锁(OSSpinLockLock
)、尝试枷锁(OSSpinLockTry
)和解锁(OSSpinLockUnlock
)三个方法。对一块资源进行加锁时,若是尝试加锁失败,不会进入睡眠状态,而是一直进行询问(自旋),占用 CPU资源,不适用于较长时间的任务。在自旋期间,由于占用 CPU 致使低优先级线程拿不到 CUP 资源,没法完成任务并释放锁,从而造成了优先级反转。
so,虽然性能很高,可是不要用了。并且 Apple 也已经将这个类比较为 deprecate 了。
自旋锁 & 互斥锁 二者大致相似,区别在于:自旋锁属于 busy-waiting 类型锁,尝试加锁失败,会一直处于询问状态,占用 CPU 资源,效率高;互斥锁属于 sleep-waiting 类型锁,在尝试失败以后,会被阻塞,而后进行上下文切换置于等待队列,由于有上下文切换,效率较低。 在 iOS 中 NSLock 属于互斥锁。
优先级反转 :当一个高优先级任务访问共享资源时,该资源已经被一个低优先级任务抢占,阻塞了高优先级任务;同时,该低优先级任务被一个次高优先级的任务所抢先,从而没法及时地释放该临界资源。最终使得任务优先级被倒置,发生阻塞。(引用自 wiki
关于自旋锁的原理,bestswifter 的文章 深刻理解 iOS 开发中的锁 这篇文章讲得很好,我这里大部分锁的知识引用于此,建议读一下原文。
自旋锁是加不上就一直尝试,也就是一个循环,直到尝试加上锁,伪代码以下:
bool lock = false; // 一开始没有锁上,任何线程均可以申请锁
do {
while(test_and_set(&lock); // test_and_set 是一个原子操做,尝试加锁
Critical section // 临界区
lock = false; // 至关于释放锁,这样别的线程能够进入临界区
Reminder section // 不须要锁保护的代码
}
复制代码
使用 :
OSSpinLock spinLock = OS_SPINLOCK_INIT;
OSSpinLockLock(&spinLock);
// 被锁住的资源
OSSpinLockUnlock(&spinLock);
复制代码
dispatch_semaphore 并不属于锁,而是信号量。二者的区别以下:
dispatch_semaphore 使用分为三步:create、wait 和 signal。以下:
// create
dispatch_semaphore_t semaphore = dispatch_semaphore_create(1);
// thread A
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
// execute task A
NSLog(@"task A");
sleep(10);
dispatch_semaphore_signal(semaphore);
});
// thread B
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
// execute task B
NSLog(@"task B");
dispatch_semaphore_signal(semaphore);
});
复制代码
执行结果:
2018-05-03 21:40:09.068586+0800 ConcurrencyTest[44084:1384262] task A
2018-05-03 21:40:19.072951+0800 ConcurrencyTest[44084:1384265] task B
复制代码
thread A,B 是两个异步线程,通常状况下,各自执行本身的事件,互不干涉。可是根据 console 输出,B 是在 A 执行完了 10s 执行以后才执行的,显然受到阻塞。使用 dispatch_semaphore 大体执行过程这样:建立 semaphore 时,信号量值为 1;执行到线程 A 的 dispatch_semaphore_wait
时,信号量值减 1,变为 0;而后执行任务 A,执行完毕后 sleep
方法阻塞当前线程 10s;与此同时,线程 B 执行到了 dispatch_semaphore_wait
,因为信号量此时为 0,且线程 A 中设置的为 DISPATCH_TIME_FOREVER
,所以须要等到线程 A sleep 10s 以后,执行 dispatch_semaphore_signal
将信号量置为 1,线程 B 的任务才开始执行。
根据上面的描述,dispatch_semaphore 的原理大体也就了解了。GCD 源码 对这些方法定义以下:
long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
long value = dispatch_atomic_dec2o(dsema, dsema_value);
dispatch_atomic_acquire_barrier();
if (fastpath(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
again:
// Mach semaphores appear to sometimes spuriously wake up. Therefore,
// we keep a parallel count of the number of times a Mach semaphore is
// signaled (6880961).
while ((orig = dsema->dsema_sent_ksignals)) {
if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,
orig - 1)) {
return 0;
}
}
struct timespec _timeout;
int ret;
switch (timeout) {
default:
do {
uint64_t nsec = _dispatch_timeout(timeout);
_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
ret = slowpath(sem_timedwait(&dsema->dsema_sem, &_timeout));
} while (ret == -1 && errno == EINTR);
if (ret == -1 && errno != ETIMEDOUT) {
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
while ((orig = dsema->dsema_value) < 0) {
if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) {
errno = ETIMEDOUT;
return -1;
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
do {
ret = sem_wait(&dsema->dsema_sem);
} while (ret != 0);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
break;
}
goto again;
}
复制代码
以上时对 wait 方法的定义,若是你不想看代码,能够直接听我说:
dispatch_semaphore_wait
方法时,若是信号量大于 0,直接返回;不然进入后续步骤。_dispatch_semaphore_wait_slow
方法根据传入 timeout
参数不一样,使用 switch-case 处理。semaphore_timedwait
方法进行等待,直至超时。semaphore_wait
进行等待,直到收到 singal
信号。至于 dispatch_semaphore_signal
就比较简单了,源码以下:
long dispatch_semaphore_signal(dispatch_semaphore_t dsema) {
dispatch_atomic_release_barrier();
long value = dispatch_atomic_inc2o(dsema, dsema_value);
if (fastpath(value > 0)) {
return 0;
}
if (slowpath(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
复制代码
_dispatch_semaphore_signal_slow
,这个方法的做用是调用内核的 semaphore_signal 函数唤醒信号量,而后返回 1。Pthreads 是 POSIX Threads 的缩写。pthread_mutex
属于互斥锁,即尝试加锁失败后悔阻塞线程并睡眠,会进行上下文切换。锁的类型主要有三种:PTHREAD_MUTEX_NORMAL
、PTHREAD_MUTEX_ERRORCHECK
、PTHREAD_MUTEX_RECURSIVE
。
使用以下:
pthread_mutex_t mutex; // 定义锁
pthread_mutexattr_t attr; // 定义 mutexattr_t 变量
pthread_mutexattr_init(&attr); // 初始化attr为默认属性
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); // 设置锁的属性
pthread_mutex_init(&mutex, &attr); // 建立锁
pthread_mutex_lock(&mutex); // 申请锁
// 临界区
pthread_mutex_unlock(&mutex); // 释放锁
复制代码
NSLock 属于互斥锁,是 Objective-C 封装的一个对象。虽然咱们不知道 Objective-C 是如何实现的,可是咱们能够在 swift 源码 中找到他的实现 :
...
internal var mutex = _PthreadMutexPointer.allocate(capacity: 1)
...
open func lock() {
pthread_mutex_lock(mutex)
}
open func unlock() {
pthread_mutex_unlock(mutex)
#if os(macOS) || os(iOS)
// Wakeup any threads waiting in lock(before:)
pthread_mutex_lock(timeoutMutex)
pthread_cond_broadcast(timeoutCond)
pthread_mutex_unlock(timeoutMutex)
#endif
}
复制代码
能够看出他只是将 pthread_mutex
封装了一下。只由于比 pthread_mutex
慢一些,难道是由于方法层级之间的调用,多了几回压栈操做???
常规使用:
NSLock *mutexLock = [NSLock new];
[mutexLock lock];
// 临界区
[muteLock unlock];
复制代码
NSCondition 能够同时起到 lock 和条件变量的做用。一样你能够在 swift 源码 中找到他的实现 :
open class NSCondition: NSObject, NSLocking {
internal var mutex = _PthreadMutexPointer.allocate(capacity: 1)
internal var cond = _PthreadCondPointer.allocate(capacity: 1)
public override init() {
pthread_mutex_init(mutex, nil)
pthread_cond_init(cond, nil)
}
deinit {
pthread_mutex_destroy(mutex)
pthread_cond_destroy(cond)
mutex.deinitialize(count: 1)
cond.deinitialize(count: 1)
mutex.deallocate()
cond.deallocate()
}
open func lock() {
pthread_mutex_lock(mutex)
}
open func unlock() {
pthread_mutex_unlock(mutex)
}
open func wait() {
pthread_cond_wait(cond, mutex)
}
open func wait(until limit: Date) -> Bool {
guard var timeout = timeSpecFrom(date: limit) else {
return false
}
return pthread_cond_timedwait(cond, mutex, &timeout) == 0
}
open func signal() {
pthread_cond_signal(cond)
}
open func broadcast() {
pthread_cond_broadcast(cond)
}
open var name: String?
}
复制代码
能够看出,它仍是遵循 NSLocking 协议,lock 方法一样仍是使用的 pthread_mutex
,wait 和 signal 使用的是 pthread_cond_wait
和 pthread_cond_signal
。
使用 NSCondition 是,先对要操做的临界区加锁,而后由于条件不知足,使用 wait 方法阻塞线程;待条件知足以后,使用 signal 方法进行通知。下面是一个 生产者-消费者的例子:
NSCondition *condition = [NSCondition new];
NSMutableArray *products = [NSMutableArray array];
// consume
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[condition lock];
while (products.count == 0) {
[condition wait];
}
[products removeObjectAtIndex:0];
[condition unlock];
});
// product
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[condition lock];
[products addObject:[NSObject new]];
[condition signal];
[condition unlock];
});
复制代码
NSConditionLock 是经过使用 NSCondition 来实现的,遵循 NSLocking 协议,而后这是 swift 源码 (源码比较占篇幅,我这里简化一下):
open class NSConditionLock : NSObject, NSLocking {
internal var _cond = NSCondition()
...
open func lock(whenCondition condition: Int) {
let _ = lock(whenCondition: condition, before: Date.distantFuture)
}
open func `try`() -> Bool {
return lock(before: Date.distantPast)
}
open func tryLock(whenCondition condition: Int) -> Bool {
return lock(whenCondition: condition, before: Date.distantPast)
}
open func unlock(withCondition condition: Int) {
_cond.lock()
_thread = nil
_value = condition
_cond.broadcast()
_cond.unlock()
}
open func lock(before limit: Date) -> Bool {
_cond.lock()
while _thread != nil {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}
open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
_cond.lock()
while _thread != nil || _value != condition {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}
...
}
复制代码
能够看出它使用了一个 NSCondition 全局变量来实现 lock 和 unlock 方法,都是一些简单的代码逻辑,就不详细说了。
使用 NSConditionLock 注意:
-[unlockWithCondition:]
并非知足条件时解锁,而是解锁后,修改 condition 值。typedef NS_ENUM(NSInteger, CTLockCondition) {
CTLockConditionNone = 0,
CTLockConditionPlay,
CTLockConditionShow
};
- (void)testConditionLock {
NSConditionLock *conditionLock = [[NSConditionLock alloc] initWithCondition:CTLockConditionPlay];
// thread one
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[conditionLock lockWhenCondition:CTLockConditionNone];
NSLog(@"thread one");
sleep(2);
[conditionLock unlock];
});
// thread two
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
sleep(1);
if ([conditionLock tryLockWhenCondition:CTLockConditionPlay]) {
NSLog(@"thread two");
[conditionLock unlockWithCondition:CTLockConditionShow];
NSLog(@"thread two unlocked");
} else {
NSLog(@"thread two try lock failed");
}
});
// thread three
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
sleep(2);
if ([conditionLock tryLockWhenCondition:CTLockConditionPlay]) {
NSLog(@"thread three");
[conditionLock unlock];
NSLog(@"thread three locked success");
} else {
NSLog(@"thread three try lock failed");
}
});
}
// thread four
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
sleep(4);
if ([conditionLock tryLockWhenCondition:CTLockConditionShow]) {
NSLog(@"thread four");
[conditionLock unlock];
NSLog(@"thread four unlocked success");
} else {
NSLog(@"thread four try lock failed");
}
});
}
复制代码
而后看输出结果 :
2018-05-05 16:34:33.801855+0800 ConcurrencyTest[97128:3100768] thread two
2018-05-05 16:34:33.802312+0800 ConcurrencyTest[97128:3100768] thread two unlocked
2018-05-05 16:34:34.804384+0800 ConcurrencyTest[97128:3100776] thread three try lock failed
2018-05-05 16:34:35.806634+0800 ConcurrencyTest[97128:3100778] thread four
2018-05-05 16:34:35.806883+0800 ConcurrencyTest[97128:3100778] thread four unlocked success
复制代码
能够看出,thread one 由于条件和初始化不符,加锁失败,未输出 log; thread two 条件相符,解锁成功,并修改加锁条件;thread three 使用原来的加锁条件,显然没法加锁,尝试加锁失败; thread four 使用修改后的条件,加锁成功。
NSRecursiveLock 属于递归锁。而后这是 swift 源码,只贴一下关键部分:
open class NSRecursiveLock: NSObject, NSLocking {
...
public override init() {
super.init()
#if CYGWIN
var attrib : pthread_mutexattr_t? = nil
#else
var attrib = pthread_mutexattr_t()
#endif
withUnsafeMutablePointer(to: &attrib) { attrs in
pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutex_init(mutex, attrs)
}
}
...
}
复制代码
它是使用 PTHREAD_MUTEX_RECURSIVE
类型的 pthread_mutex_t
初始化的。递归所能够在一个线程中重复调用,而后底层会记录加锁和解锁次数,当两者次数相同时,才能正确解锁,释放这块临界区。
使用例子:
- (void)testRecursiveLock {
NSRecursiveLock *recursiveLock = [NSRecursiveLock new];
int (^__block fibBlock)(int) = ^(int num) {
[recursiveLock lock];
if (num < 0) {
[recursiveLock unlock];
return 0;
}
if (num == 1 || num == 2) {
[recursiveLock unlock];
return num;
}
int newValue = fibBlock(num - 1) + fibBlock(num - 2);
[recursiveLock unlock];
return newValue;
};
int value = fibBlock(10);
NSLog(@"value is %d", value);
}
复制代码
@synchronized 是牺牲性能来换取语法上的简洁。若是你想深刻了解,建议你去读 这篇文章。这里说一下他的大概原理:
@synchronized 的加锁过程,大概是这个样子:
@try {
objc_sync_enter(obj); // lock
// 临界区
} @finally {
objc_sync_exit(obj); // unlock
}
复制代码
@synchronized 的存储结构,是使用哈希表来实现的。当你传入一个对象后,会为这个对象分配一个锁。锁和对象打包成一个对象,而后和一个锁在进行二次打包成一个对象,能够理解为 value;经过一个算法,根据对象的地址获得一个值,做为 key。而后以 key-value 的形式写入哈希表。结构大概是这个样子:
存储的时候,是以哈希表结构存储,不是我上面画的顺序存储,上面只是一个节点而已。
@synchronized 的使用就很简单了 :
NSMutableArray *elementArray = [NSMutableArray array];
@synchronized(elementArray) {
[elementArray addObject:[NSObject new]];
}
复制代码
前面也说了,pthreads 是 POSIX Threads 的缩写。这个东西通常咱们用不到,这里简单介绍一下。Pthreads 是POSIX的线程标准,定义了建立和操纵线程的一套API。实现POSIX 线程标准的库常被称做Pthreads,通常用于Unix-like POSIX 系统,如Linux、 Solaris。
NSThread
是对内核 mach kernel 中的 mach thread 的封装,一个 NSThread
对象就是一个线程。使用频率比较低,除了 API 的使用,没什么可讲的。若是你已经熟悉这些 API,能够跳过这一节了。
使用初始化方法初始化一个 NSTherad
对象,调用 -[cancel]
、-[start
、-[main]
方法对线程进行操做,通常线程执行完即销毁,或者由于某种异常退出。
/** 使用 target 对象的中的方法做为执行主体,能够经过 argument 传递一些参数。 - (instancetype)initWithTarget:(id)target selector:(SEL)selector object:(nullable id)argument; /** 使用 block 对象做为执行主体 */
- (instancetype)initWithBlock:(void (^)(void))block;
/** 类方法,上面对象方法须要调用 -[start] 方法启动线程,下面两个方法不须要手动启动 */
+ (void)detachNewThreadWithBlock:(void (^)(void))block;
+ (void)detachNewThreadSelector:(SEL)selector toTarget:(id)target withObject:(nullable id)argument;
复制代码
/** 说一下最后一个参数,这里你至少指定一个 mode 执行 selector,若是你传 nil 或者空数组,selector 不会执行,虽然方法定义写了 nullable */
- (void)performSelectorOnMainThread:(SEL)aSelector withObject:(nullable id)arg waitUntilDone:(BOOL)wait modes:(nullable NSArray<NSString *> *)array;
- (void)performSelectorOnMainThread:(SEL)aSelector withObject:(nullable id)arg waitUntilDone:(BOOL)wait;
复制代码
/** modes 参数同上一个 */
- (void)performSelector:(SEL)aSelector onThread:(NSThread *)thr withObject:(nullable id)arg waitUntilDone:(BOOL)wait modes:(nullable NSArray<NSString *> *)array;
- (void)performSelector:(SEL)aSelector onThread:(NSThread *)thr withObject:(nullable id)arg waitUntilDone:(BOOL)wait
复制代码
- (void)performSelectorInBackground:(SEL)aSelector withObject:(nullable id)arg;
复制代码
@property (class, readonly, strong) NSThread *currentThread;
复制代码
使用线程相关方法时,记得设置好 name,方便后面调试。同时也设置好优先级等其余参数。
performSelector: 系列方法已经不太安全,慎用。
GCD 是基于 C 实现的一套 API,并且是开源的,若是有兴趣,能够在 这里 down 一份源码研究一下。GCD 是由系统帮咱们处理多线程调度,非常方便,也是使用频率最高的。这一章节主要讲解一下 GCD 的原理和使用。
在讲解以前,咱们先有个概览,看一下 GCD 为咱们提供了那些东西:
系统所提供的 API,彻底能够知足咱们平常开发需求了。下面就根据这些模块分别讲解一下。
GCD 为咱们提供了两类队列,串行队列 和 并行队列。二者的区别是:
除此以外,还要解释一个容易混淆的概念,并发和并行:
最后,还有一个概念,同步和异步:
咱们使用时,通常使用这几个队列:
主队列 - dispatch_get_main_queue :一个特殊的串行队列。在 GCD 中,方法主队列中的任务都是在主线程执行。当咱们更新 UI 时想 dispatch 到主线程,可使用这个队列。
- (void)viewDidLoad {
[super viewDidLoad];
dispatch_async(dispatch_get_main_queue(), ^{
// UI 相关操做
});
复制代码
} ```
全局并行队列 - dispatch_get_global_queue : 系统提供的一个全局并行队列,咱们能够经过指定参数,来获取不一样优先级的队列。系统提供了四个优先级,因此也能够认为系统为咱们提供了四个并行队列,分别为 :
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
dispatch_async(queue, ^{
// 相关操做
});
复制代码
自定义队列 :你能够本身定义串行或者并行队列,来执行一些相关的任务,平时开发中也建议用自定义队列。建立自定义队列时,须要两个参数。一个是队列的名字,方便咱们再调试时查找队列使用,命名方式采用的是反向 DNS 命名规则;一个是队列类型,传 NULL 或者 DISPATCH_QUEUE_SERIAL 表明串行队列,传 DISPATCH_QUEUE_CONCURRENT 表明并行队列,一般状况下,不要传 NULL,会下降可读性。 DISPATCH_QUEUE_SERIAL_INACTIVE 表明串行不活跃队列,DISPATCH_QUEUE_CONCURRENT_INACTIVE 表明并行不活跃队列,在执行 block 任务时,须要被激活。
复制代码
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch",DISPATCH_QUEUE_SERIAL); ```
dispatch_queue_set_specific
、dispatch_queue_get_specific
和 dispatch_get_specific
方法,为 queue 设置关联的 key 或者根据 key 找到关联对象等操做。能够说,系统为咱们提供了 5 中不一样的队列,运行在主线程中的 main queue;3 个不一样优先级的 global queue; 一个优先级更低的 background queue。除此以外,开发者能够自定义一些串行和并行队列,这些自定义队列中被调度的全部 block 最终都会被放到系统全局队列和线程池中,后面会讲这部分原理。盗用一张经典图:
咱们大多数状况下,都是使用 dispatch_asyn()
作异步操做,由于程序原本就是顺序执行,不多用到同步操做。有时候咱们会把 dispatch_syn()
当作锁来用,以达到保护的做用。
系统维护的是一个队列,根据 FIFO 的规则,将 dispatch 到队列中的任务一一执行。有时候咱们想把一些任务延后执行如下,例如 App 启动时,我想让主线程中一个耗时的工做放在后,能够尝试用一下 dispatch_asyn()
,至关于把任务从新追加到了队尾。
dispatch_async(dispatch_get_main_queue(), ^{
// 想要延后的任务
});
复制代码
一般状况下,咱们使用 dispatch_asyn()
是不会形成死锁的。死锁通常出如今使用 dispatch_syn()
的时候。例如:
dispatch_sync(dispatch_get_main_queue(), ^{
NSLog(@"dead lock");
});
复制代码
想上面这样写,启动就会报错误。如下状况也如此:
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
NSLog(@"dispatch asyn");
dispatch_sync(queue, ^{
NSLog(@"dispatch asyn -> dispatch syn");
});
});
复制代码
在上面的代码中,dispatch_asyn()
整个 block(称做 blcok_asyn) 当作一个任务追加到串行队列队尾,而后开始执行。在 block_asyn 内部中,又进行了 dispatch_syn()
,想一想要执行 block_syn。由于是串行队列,须要前一个执行完(block_asyn),再执行后面一个(block_syn);可是要执行完 block_asyn,须要执行内部的 block_syn。互相等待,造成死锁。
现实开发中,还有更复杂的死锁场景。不过如今编译器很友好,咱们能在编译执行时就检测到了。
针对下面这几行代码,咱们分析一下它的底层过程:
- (void)viewDidLoad {
[super viewDidLoad];
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
NSLog(@"dispatch asyn test");
});
}
复制代码
建立队列
源码很长,但实际只有一个方法,逻辑比较清晰,以下:
/** 开发者调用的方法 */
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_queue_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
/** 内部实际调用方法 */
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
// 1.初步判断
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
// 2.配置队列参数
dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
qos = DISPATCH_QOS_USER_INITIATED;
}
if (qos == DISPATCH_QOS_MAINTENANCE) {
qos = DISPATCH_QOS_BACKGROUND;
}
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS
_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
if (tq->do_targetq) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
"a non-global target queue");
}
}
if (tq && !tq->do_targetq &&
tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
// Handle discrepancies between attr and target queue, attributes win
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
overcommit = _dispatch_queue_attr_overcommit_enabled;
} else {
overcommit = _dispatch_queue_attr_overcommit_disabled;
}
}
if (qos == DISPATCH_QOS_UNSPECIFIED) {
dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
tq = _dispatch_get_root_queue(tq_qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
} else {
tq = NULL;
}
} else if (tq && !tq->do_targetq) {
// target is a pthread or runloop root queue, setting QoS or overcommit
// is disallowed
if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
"and use this kind of target queue");
}
if (qos != DISPATCH_QOS_UNSPECIFIED) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
"and use this kind of target queue");
}
} else {
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
// Serial queues default to overcommit!
overcommit = dqa->dqa_concurrent ?
_dispatch_queue_attr_overcommit_disabled :
_dispatch_queue_attr_overcommit_enabled;
}
}
if (!tq) {
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
if (slowpath(!tq)) {
DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
}
}
// 3. 初始化队列
if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
legacy = false;
}
}
const void *vtable;
dispatch_queue_flags_t dqf = 0;
if (legacy) {
vtable = DISPATCH_VTABLE(queue);
} else if (dqa->dqa_concurrent) {
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
switch (dqa->dqa_autorelease_frequency) {
case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
dqf |= DQF_AUTORELEASE_NEVER;
break;
case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
dqf |= DQF_AUTORELEASE_ALWAYS;
break;
}
if (legacy) {
dqf |= DQF_LEGACY;
}
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}
dispatch_queue_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
dq->dq_priority = dqa->dqa_qos_and_relpri;
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
#endif
_dispatch_retain(tq);
if (qos == QOS_CLASS_UNSPECIFIED) {
// legacy way of inherithing the QoS from the target
_dispatch_queue_priority_inherit_from_target(dq, tq);
}
if (!dqa->dqa_inactive) {
_dispatch_queue_inherit_wlh_from_target(dq, tq);
}
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_introspection_queue_create(dq);
}
复制代码
根据代码生成的流程图,不想看代码直接看图,下同:
根据流程图,这个方法的步骤以下:
dispatch_queue_create()
方法以后,内部会调用 _dispatch_queue_create_with_target()
方法。_dispatch_object_alloc
方法申请一个 dispatch_queue_t 对象空间,dq。DISPATCH_QUEUE_WIDTH_MAX
即最大,不设限;串行的会设置为 1。异步执行
这个版本异步执行的代码,由于方法拆分不少,因此显得很乱。源码以下:
/** 开发者调用 */
void dispatch_async(dispatch_queue_t dq, dispatch_block_t work) {
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
_dispatch_continuation_async(dq, dc);
}
/** 内部调用,包一层,再深刻调用 */
DISPATCH_NOINLINE
void
_dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
_dispatch_continuation_async2(dq, dc,
dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}
/** 根据 barrier 关键字区别串行仍是并行,分两支 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
bool barrier)
{
if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
// 串行
return _dispatch_continuation_push(dq, dc);
}
// 并行
return _dispatch_async_f2(dq, dc);
}
/** 并行又多了一层调用,就是这个方法 */
DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
if (slowpath(dq->dq_items_tail)) {// 少路径
return _dispatch_continuation_push(dq, dc);
}
if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {// 少路径
return _dispatch_continuation_push(dq, dc);
}
// 多路径
return _dispatch_async_f_redirect(dq, dc,
_dispatch_continuation_override_qos(dq, dc));
}
/** 主要用来重定向 */
DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (!slowpath(_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dq, dou);
}
dq = dq->do_targetq;
// Find the queue to redirect to
while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
break;
}
if (!dou._dc->dc_ctxt) {
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
}
dq = dq->do_targetq;
}
// 同步异步最终都是调用的这个方法,将任务追加到队列中
dx_push(dq, dou, qos);
}
... 省略一些调用层级,
/** 核心方法,经过 dc_flags 参数区分了是 group,仍是串行,仍是并行 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
dispatch_invoke_flags_t flags)
{
dispatch_continuation_t dc = dou._dc, dc1;
dispatch_invoke_with_autoreleasepool(flags, {
uintptr_t dc_flags = dc->dc_flags;
_dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) { // 并行
dc1 = _dispatch_continuation_free_cacheonly(dc);
} else {
dc1 = NULL;
}
if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) { // group
_dispatch_continuation_with_group_invoke(dc);
} else { // 串行
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_introspection_queue_item_complete(dou);
}
if (unlikely(dc1)) {
_dispatch_continuation_free_to_cache_limit(dc1);
}
});
_dispatch_perfmon_workitem_inc();
}
复制代码
不想看代码,直接看图:
根据流程图描述一下过程:
dispatch_async()
方法,而后内部建立了一个 _dispatch_continuation_init
队列,将 queue、block 这些信息和这个 dc 绑定起来。这过程当中 copy 了 block。_dispatch_async_f_redirect
方法中,从新寻找依赖目标队列,而后追加过去。_dispatch_continuation_invoke_inline
方法里区分串行仍是并行。由于这个方法会被频繁调用,因此定义成了内联函数。对于串行队列,咱们使用信号量控制,执行前信号量置为 wait,执行完毕后发送 singal;对于调度组,咱们会在执行完以后调用 dispatch_group_leave
。同步执行
同步执行,相对来讲比较简单,源码以下 :
/** 开发者调用 */
void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work) {
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_private_data(dq, work, 0);
}
dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
/** 内部调用 */
DISPATCH_NOINLINE void dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
if (likely(dq->dq_width == 1)) {
return dispatch_barrier_sync_f(dq, ctxt, func);
}
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
return _dispatch_sync_f_slow(dq, ctxt, func, 0);
}
_dispatch_introspection_sync_begin(dq);
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dq, ctxt, func, 0);
}
_dispatch_sync_invoke_and_complete(dq, ctxt, func);
}
复制代码
同步执行,相对来讲简单些,大致逻辑差很少。偷懒一下,就不画图了,直接描述:
dispatch_sync()
方法,大多数路径,都会调用 dispatch_sync_f()
方法。dispatch_barrier_sync_f()
方法来保证原子操做。_dispatch_introspection_sync_begin
和 _dispatch_sync_invoke_and_complete
来保证同步。dispatch_after 通常用于延后执行一些任务,能够用来代替 NSTimer,由于有时候 NSTimer 问题太多了。在后面的一章里,我会整体讲一下多线程中的问题,这里就不详细说了。通常咱们这样来使用 dispatch_after :
- (void)viewDidLoad {
[super viewDidLoad];
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(NSEC_PER_SEC * 2.0f)),queue, ^{
// 2.0 second execute
});
}
复制代码
在作页面过渡时,刚进入到新的页面咱们并不会当即更新一些 view,为了引发用户注意,咱们会过会儿再进行更新,能够中此 API 来完成。
源码以下:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
void *ctxt, void *handler, bool block)
{
dispatch_timer_source_refs_t dt;
dispatch_source_t ds;
uint64_t leeway, delta;
if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
return;
}
delta = _dispatch_timeout(when);
if (delta == 0) {
if (block) {
return dispatch_async(queue, handler);
}
return dispatch_async_f(queue, ctxt, handler);
}
leeway = delta / 10; // <rdar://problem/13447496>
if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
// this function can and should be optimized to not use a dispatch source
ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
dt = ds->ds_timer_refs;
dispatch_continuation_t dc = _dispatch_continuation_alloc();
if (block) {
_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
} else {
_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
}
// reference `ds` so that it doesn't show up as a leak
dc->dc_data = ds;
_dispatch_trace_continuation_push(ds->_as_dq, dc);
os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);
if ((int64_t)when < 0) {
// wall clock
when = (dispatch_time_t)-((int64_t)when);
} else {
// absolute clock
dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
leeway = _dispatch_time_nano2mach(leeway);
}
dt->dt_timer.target = when;
dt->dt_timer.interval = UINT64_MAX;
dt->dt_timer.deadline = when + leeway;
dispatch_activate(ds);
}
复制代码
dispatch_after()
内部会调用 _dispatch_after()
方法,而后先判断延迟时间。若是为 DISPATCH_TIME_FOREVER
(永远不执行),则会出现异常;若是为 0 则当即执行;不然的话会建立一个 dispatch_timer_source_refs_t 结构体指针,将上下文相关信息与之关联。而后使用 dispatch_source 相关方法,将定时器和 block 任务关联起来。定时器时间到时,取出 block 任务开始执行。
若是咱们有一段代码,在 App 生命周期内最好只初始化一次,这时候使用 dispatch_once 最好不过了。例如咱们单例中常常这样用:
+ (instancetype)sharedManager {
static BLDispatchManager *sharedInstance = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
sharedInstance = [[BLDispatchManager alloc] initPrivate];
});
return sharedInstance;
}
复制代码
还有在定义 NSDateFormatter
时使用:
- (NSString *)todayDateString {
static NSDateFormatter *formatter = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
formatter = [NSDateFormatter new];
formatter.locale = [NSLocale localeWithLocaleIdentifier:@"en_US_POSIX"];
formatter.timeZone = [NSTimeZone timeZoneForSecondsFromGMT:8 * 3600];
formatter.dateFormat = @"yyyyMMdd";
});
return [formatter stringFromDate:[NSDate date]];
}
复制代码
由于这是很经常使用的一个代码片断,因此被加在了 Xcode 的 code snippet 中。
它的源代码以下:
/** 一个结构体,里面为当前的信号量、线程端口和指向下一个节点的指针 */
typedef struct _dispatch_once_waiter_s {
volatile struct _dispatch_once_waiter_s *volatile dow_next;
dispatch_thread_event_s dow_event;
mach_port_t dow_thread;
} *_dispatch_once_waiter_t;
/** 咱们调用的方法 */
void dispatch_once(dispatch_once_t *val, dispatch_block_t block) {
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
/** 实际执行的方法 */
DISPATCH_NOINLINE void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {
#if !DISPATCH_ONCE_INLINE_FASTPATH
if (likely(os_atomic_load(val, acquire) == DLOCK_ONCE_DONE)) {
return;
}
#endif // !DISPATCH_ONCE_INLINE_FASTPATH
return dispatch_once_f_slow(val, ctxt, func);
}
DISPATCH_ONCE_SLOW_INLINE static void dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {
#if DISPATCH_GATE_USE_FOR_DISPATCH_ONCE
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
if (_dispatch_once_gate_tryenter(l)) {
_dispatch_client_callout(ctxt, func);
_dispatch_once_gate_broadcast(l);
} else {
_dispatch_once_gate_wait(l);
}
#else
_dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
struct _dispatch_once_waiter_s dow = { };
_dispatch_once_waiter_t tail = &dow, next, tmp;
dispatch_thread_event_t event;
if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
dow.dow_thread = _dispatch_tid_self();
_dispatch_client_callout(ctxt, func);
next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
while (next != tail) {
tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
event = &next->dow_event;
next = tmp;
_dispatch_thread_event_signal(event);
}
} else {
_dispatch_thread_event_init(&dow.dow_event);
next = *vval;
for (;;) {
if (next == DISPATCH_ONCE_DONE) {
break;
}
if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
dow.dow_thread = next->dow_thread;
dow.dow_next = next;
if (dow.dow_thread) {
pthread_priority_t pp = _dispatch_get_priority();
_dispatch_thread_override_start(dow.dow_thread, pp, val);
}
_dispatch_thread_event_wait(&dow.dow_event);
if (dow.dow_thread) {
_dispatch_thread_override_end(dow.dow_thread, val);
}
break;
}
}
_dispatch_thread_event_destroy(&dow.dow_event);
}
#endif
}
复制代码
不想看代码直接看图 (emmm... 根据逻辑画完图才发现,其实这个图也挺乱的,因此我将两个主分支用不一样颜色标记处理):
根据这个图,我来表述一下主要过程:
咱们调用 dispatch_once()
方法以后,内部多数状况下会调用 dispatch_once_f_slow()
方法,这个方法才是真正的执行方法。
os_atomic_cmpxchg(vval, NULL, tail, acquire)
这个方法,执行过程实际是这个样子
if (*vval == NULL) {
*vval = tail = &dow;
return true;
} else {
return false
}
复制代码
咱们初始化的 once_token,也就是 *vval 实际是 0,因此第一次执行时是返回 true 的。if() 中的这个方法是原子操做,也就是说,若是多个线程同时调用这个方法,只有一个线程会进入 true 的分支,其余都进入 else 分支。
这里先说进入 true 分支。进入以后,会执行对应的 block,也就是对应的任务。而后 next 指向 *vval, *vval 标记为 DISPATCH_ONCE_DONE
,即执行的是这样一个过程:
next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
// 实际执行时这样的
next = *vval;
*vval = DISPATCH_ONCE_DONE;
复制代码
而后 tail = &dow
。此时咱们发现,原来的 *vval = &dow -> next = *vval
,实际则是 next = &dow
,若是没有其余线程(或者调用)进入 else 分支,&dow 实际没有改变,即 tail == tmp
。此时 while (tail != tmp)
是不会执行的,分支结束。
若是有其余线程(或者调用)进入了 else 分支,那么就已经生成了一个等待响应的链表。此时进入 &dow 已经改变,成为了链表尾部,*vval 是链表头部。进入 while 循环后,开始遍历链表,依次发送信号进行唤起。
而后说进入 else 分支的这些调用。进入分支后,随即进入一个死循环,直到发现 *vval 已经标记为了 DISPATCH_ONCE_DONE
才跳出循环。
发现 *vval 不是 DISPATCH_ONCE_DONE
以后,会将这个节点追加到链表尾部,并调用信号量的 wait 方法,等待被唤起。
以上为所有的执行过程。经过源码能够看出,使用的是 原子操做 + 信号量来保证 block 只会被执行屡次,哪怕是在多线程状况下。
这样一个关于 dispatch_once
递归调用会产生死锁的现象,也就很好解释了。看下面代码:
- (void)dispatchOnceTest {
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[self dispatchOnceTest];
});
}
复制代码
经过上面分析,在 block 执行完,并将 *vval 置为 DISPATCH_ONCE_DONE
以前,其余的调用都会进入 else 分支。第二次递归调用,信号量处于等待状态,须要等到第一个 block 执行完才能被唤起;可是第一个 block 所执行的内容就是进行第二次调用,这个任务被 wait 了,也便是说 block 永远执行不完。死锁就这样发生了。
有时候没有时序性依赖的时候,咱们会用 dispatch_apply
来代替 for loop
。例如咱们下载一组图片:
/** 使用 for loop */
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
for (NSURL *imageURL in imageURLs) {
[self downloadImageWithURL:imageURL];
}
}
/** dispatch_apply */
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
dispatch_queue_t downloadQueue = dispatch_queue_create("com.bool.download", DISPATCH_QUEUE_CONCURRENT);
dispatch_apply(imageURLs.count, downloadQueue, ^(size_t index) {
NSURL *imageURL = imageURLs[index];
[self downloadImageWithURL:imageURL];
});
}
复制代码
进行替换是须要注意几个问题:
至于原理,就不大篇幅讲了。大概是这个样子:这个方法是同步的,会阻塞当前线程,直到全部的 block 任务都完成。若是提交到并发队列,每一个任务执行顺序是不必定的。
更多时候,咱们执行下载任务,并不但愿阻塞当前线程,这时咱们可使用 dispatch_group
。
当处理批量异步任务时,dispatch_group
是一个很好的选择。针对上面说的下载图片的例子,咱们能够这样作:
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
dispatch_group_t taskGroup = dispatch_group_create();
dispatch_queue_t queue = dispatch_queue_create("com.bool.group", DISPATCH_QUEUE_CONCURRENT);
for (NSURL *imageURL in imageURLs) {
dispatch_group_enter(taskGroup);
// 下载方法是异步的
[self downloadImageWithURL:imageURL withQueue:queue completeHandler:^{
dispatch_group_leave(taskGroup);
}];
}
dispatch_group_notify(taskGroup, queue, ^{
// all task finish
});
/** 若是使用这个方法,内部执行异步任务,会当即到 dispatch_group_notify 方法中,由于是异步,系统认为已经执行完了。因此这个方法使用很少。 */
dispatch_group_async(taskGroup, queue, ^{
})
}
复制代码
关于原理方面,和 dispatch_async()
方法相似,前面也提到。这里只说一段代码:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc)
{
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc);
}
复制代码
这段代码中,调用了 dispatch_group_enter(dg)
方法进行标记,最终都会和 dispatch_async()
走到一样的方法里 _dispatch_continuation_invoke_inline()
。在里面判断类型为 group,执行 task,执行结束后调用 dispatch_group_leave((dispatch_group_t)dou)
,和以前的 enter 对应。
以上是 Dispatch Queues 内容的介绍,咱们平时使用 GCD 的过程当中,60% 都是使用的以上内容。
在 iOS 8 中,Apple 为咱们提供了新的 API,Dispatch Block
相关。虽然以前咱们能够向 dispatch 传递 block 参数,做为任务,可是这里和以前的不同。以前常常说,使用 NSOperation
建立的任务能够 cancel,使用 GCD 不能够。可是在 iOS 8 以后,能够 cancel 任务了。
建立一个 block 并执行。
- (void)dispatchBlockTest {
// 不指定优先级
dispatch_block_t dsBlock = dispatch_block_create(0, ^{
NSLog(@"test block");
});
// 指定优先级
dispatch_block_t dsQosBlock = dispatch_block_create_with_qos_class(0, QOS_CLASS_USER_INITIATED, -1, ^{
NSLog(@"test block");
});
dispatch_async(dispatch_get_main_queue(), dsBlock);
dispatch_async(dispatch_get_main_queue(), dsQosBlock);
// 直接建立并执行
dispatch_block_perform(0, ^{
NSLog(@"test block");
});
复制代码
} ```
阻塞当前任务,等 block 执行完在继续执行。
- (void)dispatchBlockTest {
dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
dispatch_block_t dsBlock = dispatch_block_create(0, ^{
NSLog(@"test block");
});
dispatch_async(queue, dsBlock);
// 等到 block 执行完
dispatch_block_wait(dsBlock, DISPATCH_TIME_FOREVER);
NSLog(@"block was finished");
}
复制代码
block 执行完后,收到通知,执行其余任务
- (void)dispatchBlockTest {
dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
dispatch_block_t dsBlock = dispatch_block_create(0, ^{
NSLog(@"test block");
});
dispatch_async(queue, dsBlock);
// block 执行完收到通知
dispatch_block_notify(dsBlock, queue, ^{
NSLog(@"block was finished,do other thing");
});
NSLog(@"execute first");
}
复制代码
对 block 进行 cancel 操做
- (void)dispatchBlockTest {
dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
dispatch_block_t dsBlock1 = dispatch_block_create(0, ^{
NSLog(@"test block1");
});
dispatch_block_t dsBlock2 = dispatch_block_create(0, ^{
NSLog(@"test block2");
});
dispatch_async(queue, dsBlock1);
dispatch_async(queue, dsBlock2);
// 第二个 block 将会被 cancel,不执行
dispatch_block_cancel(dsBlock2);
}
复制代码
Dispatch Barriers 能够理解为调度屏障,经常使用于多线程并发读写操做。例如:
@interface ViewController ()
@property (nonatomic, strong) dispatch_queue_t imageQueue;
@property (nonatomic, strong) NSMutableArray *imageArray;
@end
@implementation ViewController
- (void)viewDidLoad {
[super viewDidLoad];
self.imageQueue = dispatch_queue_create("com.bool.image", DISPATCH_QUEUE_CONCURRENT);
self.imageArray = [NSMutableArray array];
}
/** 保证写入时不会有其余操做,写完以后到主线程更新 UI */
- (void)addImage:(UIImage *)image {
dispatch_barrier_async(self.imageQueue, ^{
[self.imageArray addObject:image];
dispatch_async(dispatch_get_main_queue(), ^{
// update UI
});
});
}
/** 这里的 dispatch_sync 起到了 lock 的做用 */
- (NSArray <UIImage *> *)images {
__block NSArray *imagesArray = nil;
dispatch_sync(self.imageQueue, ^{
imagesArray = [self.imageArray mutableCopy];
});
return imagesArray;
}
@end
复制代码
转化成图可能好理解一些:
dispatch_barrier_async()
的原理和 dispatch_async()
差很少,只不过设置的 flags 不同:
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
// 在 dispatch_async() 中只设置了 DISPATCH_OBJ_CONSUME_BIT
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
_dispatch_continuation_push(dq, dc);
}
复制代码
后面都是 push 到队列中,而后,获取任务时一个死循环,在从队列中获取任务一个一个执行,若是判断 flag 为 barrier,终止循环,则单独执行这个任务。它后面的任务放入一个队列,等它执行完了再开始执行。
DISPATCH_ALWAYS_INLINE
static dispatch_queue_wakeup_target_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
...
for (;;) {
...
first_iteration:
dq_state = os_atomic_load(&dq->dq_state, relaxed);
if (unlikely(_dq_state_is_suspended(dq_state))) {
break;
}
if (unlikely(orig_tq != dq->do_targetq)) {
break;
}
if (serial_drain || _dispatch_object_is_barrier(dc)) {
if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
goto out_with_no_width;
}
owned = DISPATCH_QUEUE_IN_BARRIER;
}
next_dc = _dispatch_queue_next(dq, dc);
if (_dispatch_object_is_sync_waiter(dc)) {
owned = 0;
dic->dic_deferred = dc;
goto out_with_deferred;
}
} else {
if (owned == DISPATCH_QUEUE_IN_BARRIER) {
// we just ran barrier work items, we have to make their
// effect visible to other sync work items on other threads
// that may start coming in after this point, hence the
// release barrier
os_atomic_xor2o(dq, dq_state, owned, release);
owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
} else if (unlikely(owned == 0)) {
if (_dispatch_object_is_sync_waiter(dc)) {
// sync "readers" don't observe the limit
_dispatch_queue_reserve_sync_width(dq);
} else if (!_dispatch_queue_try_acquire_async(dq)) {
goto out_with_no_width;
}
owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
}
next_dc = _dispatch_queue_next(dq, dc);
if (_dispatch_object_is_sync_waiter(dc)) {
owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
_dispatch_sync_waiter_redirect_or_wake(dq,
DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
continue;
}
...
}
复制代码
关于 dispatch_source
咱们使用的少之又少,他是 BSD 系统内核功能的包装,常常用来监测某些事件发生。例如监测断点的使用和取消。[这里][https://developer.apple.com/documentation/dispatch/dispatch_source_type_constants?language=objc] 介绍了能够监测的事件:
例如咱们能够经过下面代码,来监测断点的使用和取消:
@interface ViewController ()
@property (nonatomic, strong) dispatch_source_t signalSource;
@property (nonatomic, assign) dispatch_once_t signalOnceToken;
@end
@implementation ViewController
- (void)viewDidLoad {
dispatch_once(&_signalOnceToken, ^{
dispatch_queue_t queue = dispatch_get_main_queue();
self.signalSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL, SIGSTOP, 0, queue);
if (self.signalSource) {
dispatch_source_set_event_handler(self.signalSource, ^{
// 点击一下断点,再取消断点,便会执行这里。
NSLog(@"debug test");
});
dispatch_resume(self.signalSource);
}
});
}
复制代码
还有 diapatch_after()
就是依赖 dispatch_source()
来实现的。咱们能够本身实现一个相似的定时器:
- (void)customTimer {
dispatch_source_t timerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, DISPATCH_TARGET_QUEUE_DEFAULT);
dispatch_source_set_timer(timerSource, dispatch_time(DISPATCH_TIME_NOW, 5.0 * NSEC_PER_SEC), 2.0 * NSEC_PER_SEC, 5);
dispatch_source_set_event_handler(timerSource, ^{
NSLog(@"dispatch source timer");
});
self.signalSource = timerSource;
dispatch_resume(self.signalSource);
}
复制代码
使用 dispatch_source
时,大体过程是这样的:咱们建立一个 source,而后加到队列中,并调用 dispatch_resume()
方法,便会从队列中唤起 source,执行对应的 block。下面是一个详细的流程图,咱们结合这张图来讲一下:
建立一个 source 对象,过程和建立 queue 相似,因此后面一些操做,和操做 queue 很相似。
dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
unsigned long mask, dispatch_queue_t dq)
{
dispatch_source_refs_t dr;
dispatch_source_t ds;
dr = dux_create(dst, handle, mask)._dr;
if (unlikely(!dr)) {
return DISPATCH_BAD_INPUT;
}
// 申请内存空间
ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
sizeof(struct dispatch_source_s));
// 初始化一个队列,而后配置参数,彻底被当作一个 queue 来处理
_dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
ds->dq_label = "source";
ds->do_ref_cnt++; // the reference the manager queue holds
ds->ds_refs = dr;
dr->du_owner_wref = _dispatch_ptr2wref(ds);
if (slowpath(!dq)) {
dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
} else {
_dispatch_retain((dispatch_queue_t _Nonnull)dq);
}
ds->do_targetq = dq;
if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
_dispatch_source_set_interval(ds, handle);
}
_dispatch_object_debug(ds, "%s", __func__);
return ds;
}
复制代码
设置 event_handler。从源码中看出,用的是 dispatch_continuation_t
进行绑定,和以前绑定 queue 同样,将 block copy 了一份。后面执行的时候,拿出来用。而后将这个任务 push 到队列里。
void
dispatch_source_set_event_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
dispatch_continuation_t dc;
// 这里实际就是在初始化 dispatch_continuation_t
dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
// 通过一顿操做,将任务 push 到队列中。
_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
}
复制代码
调用 resume 方法,执行 source。通常新建立的都是暂停状态,这里判断是暂停状态,就开始唤起。
void dispatch_resume(dispatch_object_t dou) {
DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
if (dx_vtable(dou._do)->do_suspend) {
dx_vtable(dou._do)->do_resume(dou._do, false);
}
}
复制代码
最后一步,是最核心的异步,唤起任务开始执行。以前的 queue 最终也是走到这样相似的一步,能够看返回类型都是 dispatch_queue_wakeup_target_t
,基本是沿着 queue 的逻辑一路 copy 过来。这个方法,通过一系列判断,保证全部的 source 都会在正确的队列上面执行;若是队列和任务不对应,那么就返回正确的队列,从新派发让任务在正确的队列上执行。
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_wakeup_target_t
_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags, uint64_t *owned)
{
dispatch_source_t ds = dou._ds;
dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
// 获取当前 queue
dispatch_queue_t dq = _dispatch_queue_get_current();
dispatch_source_refs_t dr = ds->ds_refs;
dispatch_queue_flags_t dqf;
...
// timer 事件处理
if (dr->du_is_timer &&
os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
// timer has to be configured on the kevent queue
if (dq != dkq) {
return dkq;
}
_dispatch_source_timer_configure(ds);
}
}
// 是否安装 source
if (!ds->ds_is_installed) {
// The source needs to be installed on the kevent queue.
if (dq != dkq) {
return dkq;
}
_dispatch_source_install(ds, _dispatch_get_wlh(),
_dispatch_get_basepri());
}
// 是否暂停,由于以前判断过,通常不可能走到这里
if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
// Source suspended by an item drained from the source queue.
return ds->do_targetq;
}
// 是否在
if (_dispatch_source_get_registration_handler(dr)) {
// The source has been registered and the registration handler needs
// to be delivered on the target queue.
if (dq != ds->do_targetq) {
return ds->do_targetq;
}
// clears ds_registration_handler
_dispatch_source_registration_callout(ds, dq, flags);
}
...
if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
os_atomic_load2o(ds, ds_pending_data, relaxed)) {
// 有些 source 还有未完成的数据,须要经过目标队列上的回调进行传送;有些 source 则须要切换到管理队列上去。
if (dq == ds->do_targetq) {
_dispatch_source_latch_and_call(ds, dq, flags);
dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
prevent_starvation = dq->do_targetq ||
!(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
if (prevent_starvation &&
os_atomic_load2o(ds, ds_pending_data, relaxed)) {
retq = ds->do_targetq;
}
} else {
return ds->do_targetq;
}
}
if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
// 已经被取消的 source 须要从管理队列中卸载。卸载完成后,取消的 handler 须要交付到目标队列。
if (!(dqf & DSF_DELETED)) {
if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
// timers can cheat if not armed because there's nothing left
// to do on the manager queue and unregistration can happen
// on the regular target queue
} else if (dq != dkq) {
return dkq;
}
_dispatch_source_refs_unregister(ds, 0);
dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
if (!(dqf & DSF_ARMED)) {
goto unregister_event;
}
// we need to wait for the EV_DELETE
return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
}
}
if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
_dispatch_source_get_cancel_handler(dr) ||
_dispatch_source_get_registration_handler(dr))) {
retq = ds->do_targetq;
} else {
_dispatch_source_cancel_callout(ds, dq, flags);
dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
}
prevent_starvation = false;
}
if (_dispatch_unote_needs_rearm(dr) &&
!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
// 须要在管理队列进行 rearm 的
if (dq != dkq) {
return dkq;
}
if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
// 若是咱们能够直接注销,不须要 resume
goto unregister_event;
}
if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
// 若是 source 已经暂停,不须要在管理队列 rearm
return ds->do_targetq;
}
if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
return ds->do_targetq;
}
if (unlikely(!_dispatch_source_refs_resume(ds))) {
goto unregister_event;
}
if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
_dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
}
}
return retq;
}
复制代码
还有一些其余的方法,这里就不介绍了。有兴趣的能够看源码,太多了。
咱们可使用 Dispatch I/O 快速读取一些文件,例如这样 :
- (void)readFile {
NSString *filePath = @"/.../青花瓷.m";
dispatch_queue_t queue = dispatch_queue_create("com.bool.readfile", DISPATCH_QUEUE_SERIAL);
dispatch_fd_t fd = open(filePath.UTF8String, O_RDONLY,0);
dispatch_io_t fileChannel = dispatch_io_create(DISPATCH_IO_STREAM, fd, queue, ^(int error) {
close(fd);
});
NSMutableData *fileData = [NSMutableData new];
dispatch_io_set_low_water(fileChannel, SIZE_MAX);
dispatch_io_read(fileChannel, 0, SIZE_MAX, queue, ^(bool done, dispatch_data_t _Nullable data, int error) {
if (error == 0 && dispatch_data_get_size(data) > 0) {
[fileData appendData:(NSData *)data];
}
if (done) {
NSString *str = [[NSString alloc] initWithData:fileData encoding:NSUTF8StringEncoding];
NSLog(@"read file completed, string is :\n %@",str);
}
});
}
复制代码
输出结果:
ConcurrencyTest[41479:5357296] read file completed, string is :
天青色等烟雨 而我在等你
月色被打捞起 晕开告终局
复制代码
若是读取大文件,咱们能够进行切片读取,将文件分割多个片,放在异步线程中并发执行,这样会比较快一些。
关于源码,简单看了一下,调度逻辑和以前的任务相似。而后读写操做,是调用的一些底层接口实现,这里就偷懒一下不详细说了。使用 Dispatch I/O,多数状况下是为了并发读取一个大文件,提升读取速度。
上面已经讲了概览图中的大部分东西,还有一些未讲述,这里简单描述一下:
dispatch_object。GCD 用 C 函数实现的对象,不能经过集成 dispatch 类实现,也不能用 alloc 方法初始化。GCD 针对 dispatch_object 提供了一些接口,咱们使用这些接口能够处理一些内存事件、取消和暂停操做、定义上下文和处理日志相关工做。dispatch_object 必需要手动管理内存,不遵循垃圾回收机制。
dispatch_time。在 GCD 中使用的时间对象,能够建立自定义时间,也可使用 DISPATCH_TIME_NOW
、DISPATCH_TIME_FOREVER
这两个系统给出的时间。
以上为 GCD 相关知识,此次使用的源码版本为最新版本 —— 912.30.4.tar.gz,和以前看的版本代码差距很大,由于代码量的增长,新版本代码比较乱,不过基本原理仍是差很少的。曾经我一度认为,最上面的是最新版本...
Operations 也是咱们在并发编程中经常使用的一套 API,根据 官方文档 划分的结构以下图:
其中 NSBlockOperation
和 NSInvocationOperation
是基于 NSOperation
的子类化实现。相对于 GCD,Operations 的原理要稍微好理解一些,下面就将用法和原理介绍一下。
每个 operation 能够认为是一个 task。NSOperation
本事是一个抽象类,使用前需子类化。幸运的是,Apple 为咱们实现了两个子类:NSInvocationOperation
、NSBlockOperation
。咱们也能够本身去定义一个 operation。下面介绍一下基本使用:
建立一个 NSInvocationOperation
对象并在当前线程执行.
NSInvocationOperation *invocationOperation = [[NSInvocationOperation alloc] initWithTarget:self selector:@selector(log) object:nil];
[invocationOperation start];
复制代码
建立一个 NSBlockOperation
对象并执行 (每一个 block 不必定会在当前线程,也不必定在同一线程执行).
NSBlockOperation *blockOpeartion = [NSBlockOperation blockOperationWithBlock:^{
NSLog(@"block operation");
}];
// 能够添加多个 block
[blockOpeartion addExecutionBlock:^{
NSLog(@"other block opeartion");
}];
[blockOpeartion start];
复制代码
自定义一个 Operation。当咱们不须要操做状态的时候,只须要实现 main()
方法便可。须要操做状态的后面再说.
@interface BLOpeartion : NSOperation
@end
@implementation BLOpeartion
- (void)main {
NSLog(@"BLOperation main method");
}
@end
- (void)viewDidLoad {
[super viewDidLoad];
BLOperation *blOperation = [BLOperation new];
[blOperation start];
}
复制代码
每一个 operation 之间设置依赖.
NSBlockOperation *blockOpeartion1 = [NSBlockOperation blockOperationWithBlock:^{
NSLog(@"block operation1");
}];
NSBlockOperation *blockOpeartion2 = [NSBlockOperation blockOperationWithBlock:^{
NSLog(@"block operation2");
}];
// 2 须要在 1 执行完以后再执行。
[blockOpeartion2 addDependency:blockOpeartion1];
复制代码
与队列相关的使用,后面再说.
NSOperation
内置了一个强大的状态机,一个 operation 从初始化到执行完毕这一辈子命周期,对应了各类状态。下面是在 WWDC 2015 Advanced NSOperations 出现的一张图:
operation 一开始是 Pending 状态,表明即将进入 Ready;进入 Ready 以后,表明任务能够执行;而后进入 Executing 状态;最后执行完成,进入 Finished 状态。过程当中,除了 Finished 状态,在其余几个状态中均可以进行 Cancelled。
NSOperation
并无开源。可是 swift 开源了,在 swift 中它叫 Opeartion
,咱们能够在 这里 找到他的源码。我这里 copy 了一份:
open class Operation : NSObject {
let lock = NSLock()
internal weak var _queue: OperationQueue?
// 默认几个状态都是 false
internal var _cancelled = false
internal var _executing = false
internal var _finished = false
internal var _ready = false
// 用一个集合来保存依赖它的对象
internal var _dependencies = Set<Operation>()
// 初始化一些 dispatch_group 对象,来管理 operation 以及其依赖对象的 执行。
#if DEPLOYMENT_ENABLE_LIBDISPATCH
internal var _group = DispatchGroup()
internal var _depGroup = DispatchGroup()
internal var _groups = [DispatchGroup]()
#endif
public override init() {
super.init()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_group.enter()
#endif
}
internal func _leaveGroups() {
// assumes lock is taken
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_groups.forEach() { $0.leave() }
_groups.removeAll()
_group.leave()
#endif
}
// 默认实现的 start 方法中,执行 main 方法,线程安全,下同。执行先后设置 _executing。
open func start() {
if !isCancelled {
lock.lock()
_executing = true
lock.unlock()
main()
lock.lock()
_executing = false
lock.unlock()
}
finish()
}
// 默认实现的 finish 方法中,标记 _finished 状态。
internal func finish() {
lock.lock()
_finished = true
_leaveGroups()
lock.unlock()
if let queue = _queue {
queue._operationFinished(self)
}
...
}
// main 方法默认空,须要子类去实现。
open func main() { }
// 调用 cancel 方法后,只是标记状态,具体操做在 main 中,调用 cancel 后也被认为是 finish。
open func cancel() {
lock.lock()
_cancelled = true
lock.unlock()
}
/** 几个状态的 get 方法,省略 */
...
// 是否为异步任务,默认为 false。这个方法在 OC 中永远不会去实现
open var isAsynchronous: Bool {
return false
}
// 设置依赖,即将 operation 放到集合中
open func addDependency(_ op: Operation) {
lock.lock()
_dependencies.insert(op)
op.lock.lock()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_depGroup.enter()
op._groups.append(_depGroup)
#endif
op.lock.unlock()
lock.unlock()
}
...
// 默认队列优先级为 normal
open var queuePriority: QueuePriority = .normal
public var completionBlock: (() -> Void)?
open func waitUntilFinished() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_group.wait()
#endif
}
// 线程优先级
open var threadPriority: Double = 0.5
/// - Note: Quality of service is not directly supported here since there are not qos class promotions available outside of darwin targets.
open var qualityOfService: QualityOfService = .default
open var name: String?
internal func _waitUntilReady() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_depGroup.wait()
#endif
_ready = true
}
}
复制代码
代码很简单,具体过程能够直接看注释,就不另说了。除此以外,咱们能够看出,Operation
总不少方法造做都加了锁,说明这个类是线程安全的,当咱们对 NSOperation
进行子类化时,重写方法要注意线程暗转问题。
NSOperation
的不少花式操做,都是结合着 NSOperationQueue
进行的。咱们在使用的时候,也是二者结合着使用。下面对其进行详细分析。
start
方法去执行,operation 会自动执行。suspended
属性来暂停或者启动还未执行的 operation。-[cancelAllOperations]
方法来取消队列中的任务。mainQueue
方法来回到主队列(主线程);能够经过 currentQueue
方法来获取当前队列。使用例子:
- (void)testOperationQueue {
NSOperationQueue *operationQueue = [NSOperationQueue new];
// 设置最大并发数量为 3
[operationQueue setMaxConcurrentOperationCount:3];
NSInvocationOperation *invocationOpeartion = [[NSInvocationOperation alloc] initWithTarget:self selector:@selector(log) object:nil];
[operationQueue addOperation:invocationOpeartion];
[operationQueue addOperationWithBlock:^{
NSLog(@"block operation");
// 回到主线程执行任务
[[NSOperationQueue mainQueue] addOperationWithBlock:^{
NSLog(@"execute in main thread");
}];
}];
// 暂停还未开始执行的任务
operationQueue.suspended = YES;
// 取消全部任务
[operationQueue cancelAllOperations];
}
复制代码
有一个问题要特别说明一下:
NSOperationQueue
和 GCD 中的队列不一样。GCD 中的队列是遵循 FIFO 原则,先加入队列的先执行;NSOperationQueue
中的任务,根据谁先进入到 Ready
状态,谁先执行。若是有多个任务同时达到 Ready
状态,那么根据优先级来执行。
例以下面的任务中,4 先到达了 Ready
状态,4 先执行。并非按照 1,2,3... 顺序执行。
咱们依然是在 swift 中找到相关源码,而后来进行分析:
// 默认最大并发数量为 int 最大值
public extension OperationQueue {
public static let defaultMaxConcurrentOperationCount: Int = Int.max
}
// 使用一个 list 来保存各个优先级的 operation。调用其中的方法对 operation 进行增删等操做。
internal struct _OperationList {
var veryLow = [Operation]()
var low = [Operation]()
var normal = [Operation]()
var high = [Operation]()
var veryHigh = [Operation]()
var all = [Operation]()
mutating func insert(_ operation: Operation) { ... }
mutating func remove(_ operation: Operation) { ... }
mutating func dequeue() -> Operation? { ... }
var count: Int {
return all.count
}
func map<T>(_ transform: (Operation) throws -> T) rethrows -> [T] {
return try all.map(transform)
}
}
open class OperationQueue: NSObject {
...
// 使用一个信号量的来控制并发数量
var __concurrencyGate: DispatchSemaphore?
var __underlyingQueue: DispatchQueue? {
didSet {
let key = OperationQueue.OperationQueueKey
oldValue?.setSpecific(key: key, value: nil)
__underlyingQueue?.setSpecific(key: key, value: Unmanaged.passUnretained(self))
}
}
...
internal var _underlyingQueue: DispatchQueue {
lock.lock()
if let queue = __underlyingQueue {
lock.unlock()
return queue
} else {
...
// 信号量的值根据最大并发数量来肯定。每当执行一个任务,wait 信号量减一,signal 信号量加一,当信号量为0时,一直等待,直接大于 0 才会正常执行。
if maxConcurrentOperationCount == 1 {
attr = []
__concurrencyGate = DispatchSemaphore(value: 1)
} else {
attr = .concurrent
if maxConcurrentOperationCount != OperationQueue.defaultMaxConcurrentOperationCount {
__concurrencyGate = DispatchSemaphore(value:maxConcurrentOperationCount)
}
}
let queue = DispatchQueue(label: effectiveName, attributes: attr)
if _suspended {
queue.suspend()
}
__underlyingQueue = queue
lock.unlock()
return queue
}
}
#endif
...
// 出队列,每一个任务执行时拿出队列执行
internal func _dequeueOperation() -> Operation? {
lock.lock()
let op = _operations.dequeue()
lock.unlock()
return op
}
open func addOperation(_ op: Operation) {
addOperations([op], waitUntilFinished: false)
}
// 主要执行方法。先判断 operation 是否 ready,处于 ready 后判断是否 cancel。没有 cancel 则执行。
internal func _runOperation() {
if let op = _dequeueOperation() {
if !op.isCancelled {
op._waitUntilReady()
if !op.isCancelled {
op.start()
}
}
}
}
// 将任务加到队列中。若是不指定任务优先级,执行的还快一些。不然须要对不一样优先级进行划分,而后执行
open func addOperations(_ ops: [Operation], waitUntilFinished wait: Bool) {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
var waitGroup: DispatchGroup?
if wait {
waitGroup = DispatchGroup()
}
#endif
lock.lock()
// 将 operation 依依加入 list,根据优先级保存到不一样数组中
ops.forEach { (operation: Operation) -> Void in
operation._queue = self
_operations.insert(operation)
}
lock.unlock()
// 遍历执行,使用了 diapatch group,控制 enter 和 leave
ops.forEach { (operation: Operation) -> Void in
#if DEPLOYMENT_ENABLE_LIBDISPATCH
if let group = waitGroup {
group.enter()
}
// 经过信号量来控制并发数量
let block = DispatchWorkItem(flags: .enforceQoS) { () -> Void in
if let sema = self._concurrencyGate {
sema.wait()
self._runOperation()
sema.signal()
} else {
self._runOperation()
}
if let group = waitGroup {
group.leave()
}
}
_underlyingQueue.async(group: queueGroup, execute: block)
#endif
}
#if DEPLOYMENT_ENABLE_LIBDISPATCH
if let group = waitGroup {
group.wait()
}
#endif
}
internal func _operationFinished(_ operation: Operation) { ... }
open func addOperation(_ block: @escaping () -> Swift.Void) { ... }
// 返回值不必定准确
open var operations: [Operation] { ... }
// 返回值不必定准确
open var operationCount: Int { ... }
open var maxConcurrentOperationCount: Int = OperationQueue.defaultMaxConcurrentOperationCount
// suppend 属性的 get & set 方法。默认不暂停
internal var _suspended = false
open var isSuspended: Bool { ... }
...
// operation 在获取系统资源时的优先级
open var qualityOfService: QualityOfService = .default
// 依次调用每一个 operation 的 cancel 方法
open func cancelAllOperations() { ... }
open func waitUntilAllOperationsAreFinished() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
queueGroup.wait()
#endif
}
static let OperationQueueKey = DispatchSpecificKey<Unmanaged<OperationQueue>>()
// 经过使用 GCD 中的 getSpecific 方法获取当前队列
open class var current: OperationQueue? {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
guard let specific = DispatchQueue.getSpecific(key: OperationQueue.OperationQueueKey) else {
if _CFIsMainThread() {
return OperationQueue.main
} else {
return nil
}
}
return specific.takeUnretainedValue()
#else
return nil
#endif
}
// 定义主队列,最大并发数量为 1,获取主队列时将这个值返回
private static let _main = OperationQueue(_queue: .main, maxConcurrentOperations: 1)
open class var main: OperationQueue { ... }
}
复制代码
代码很长,可是简单,能够直接经过注释来理解了。这里屡一下:
Ready
状态且没被 Cancel
的依次执行。concurrencyGate
这个信号量来控制并发数量。每当执行一个任务,wait 信号量减一,signal 信号量加一,当信号量为0时,一直等待,直接大于 0 才会正常执行。以前说了自定义普通的 NSOperation
,只须要重写 main
方法就能够了,可是由于咱们没有处理并发状况,线程执行结束操做,KVO 机制,因此这种普通的不建议用来作并发任务。下面讲一下如何自定义并行的 NSOperation
。
必需要实现的一些方法:
start
方法,在你想要执行的线程中调用此方法。不须要调用 super 方法。main
方法,在 start
方法中调用,任务主体。isExecuting
方法,是否正在执行,要实现 KVO 机制。isConcurrent
方法,已经弃用,由 isAsynchronous
来代替。isAsynchronous
方法,在并发任务中,须要返回 YES。@interface BLOperation ()
@property (nonatomic, assign) BOOL executing;
@property (nonatomic, assign) BOOL finished;
@end
@implementation BLOperation
@synthesize executing;
@synthesize finished;
- (instancetype)init {
self = [super init];
if (self) {
executing = NO;
finished = NO;
}
return self;
}
- (void)start {
if ([self isCancelled]) {
[self willChangeValueForKey:@"isFinished"];
finished = YES;
[self didChangeValueForKey:@"isFinished"];
return;
}
[self willChangeValueForKey:@"isExecuting"];
[NSThread detachNewThreadSelector:@selector(main) toTarget:self withObject:nil];
executing = YES;
[self didChangeValueForKey:@"isExecuting"];
}
- (void)main {
NSLog(@"main begin");
@try {
@autoreleasepool {
NSLog(@"custom operation");
NSLog(@"currentThread = %@", [NSThread currentThread]);
NSLog(@"mainThread = %@", [NSThread mainThread]);
[self willChangeValueForKey:@"isFinished"];
[self willChangeValueForKey:@"isExecuting"];
executing = NO;
finished = YES;
[self didChangeValueForKey:@"isExecuting"];
[self didChangeValueForKey:@"isFinished"];
}
} @catch (NSException *exception) {
NSLog(@"exception is %@", exception);
}
NSLog(@"main end");
}
- (BOOL)isExecuting {
return executing;
}
- (BOOL)isFinished {
return finished;
}
- (BOOL)isAsynchronous {
return YES;
}
@end
复制代码
关于 NSBlockOpeartion
,主要实现了 main
方法,而后用一个数组保存加进来的其余 block,源码以下:
open class BlockOperation: Operation {
typealias ExecutionBlock = () -> Void
internal var _block: () -> Void
internal var _executionBlocks = [ExecutionBlock]()
public init(block: @escaping () -> Void) {
_block = block
}
override open func main() {
lock.lock()
let block = _block
let executionBlocks = _executionBlocks
lock.unlock()
block()
executionBlocks.forEach { $0() }
}
open func addExecutionBlock(_ block: @escaping () -> Void) {
lock.lock()
_executionBlocks.append(block)
lock.unlock()
}
open var executionBlocks: [() -> Void] {
lock.lock()
let blocks = _executionBlocks
lock.unlock()
return blocks
}
}
复制代码
关于 NSOperation
的相关东西,到此结束。
相对于 API 的使用和基本原理的了解,我认为最重要的仍是这一部分。毕竟咱们仍是要拿这些东西来开发的。并发编程中有不少坑,这里简单介绍一些。
咱们都知道,NSNotification
在哪一个线程 post,最终就会在哪一个线程执行。若是咱们不是在主线程 post 的,可是却在主线程接收的,并且咱们指望 selector 在主线程执行。这时候咱们须要注意下,在 selector 须要 dispatch 到主线程才能够。固然你也可使用 addObserverForName:object:queue:usingBlock:
来指定执行 block 的 queue。
@implementation BLPostNotification
- (void)postNotification {
dispatch_queue_t queue = dispatch_queue_create("com.bool.post.notification", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
// 从非主线程发送通知 (通知名字最好定义成一个常量)
[[NSNotificationCenter defaultCenter] postNotificationName:@"downloadImage" object:nil];
});
}
@end
@implementation ImageViewController
- (void)viewDidLoad {
[super viewDidLoad];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(show) name:@"downloadImage" object:nil];
}
- (void)showImage {
// 须要 dispatch 到主线程更新 UI
dispatch_async(dispatch_get_main_queue(), ^{
// update UI
});
}
@end
复制代码
使用 NSTimer
时,在哪一个线程生成的 timer,就在哪一个线程销毁,不然会有意想不到的结果。官方这样描述的:
However, for a repeating timer, you must invalidate the timer object yourself by calling its invalidate method. Calling this method requests the removal of the timer from the current run loop; as a result, you should always call the invalidate method from the same thread on which the timer was installed.
@interface BLTimerTest ()
@property (nonatomic, strong) dispatch_queue_t queue;
@property (nonatomic, strong) NSTimer *timer;
@end
@implementation BLTimerTest
- (instancetype)init {
self = [super init];
if (self) {
_queue = dispatch_queue_create("com.bool.timer.test", DISPATCH_QUEUE_SERIAL);
}
return self;
}
- (void)installTimer {
dispatch_async(self.queue, ^{
self.timer = [NSTimer scheduledTimerWithTimeInterval:3.0f repeats:YES block:^(NSTimer * _Nonnull timer) {
NSLog(@"test timer");
}];
});
}
- (void)clearTimer {
dispatch_async(self.queue, ^{
if ([self.timer isValid]) {
[self.timer invalidate];
self.timer = nil;
}
});
}
@end
复制代码
在开发中,咱们常用 dispatch_once
,可是递归调用会形成死锁。例以下面这样:
- (void)dispatchOnceTest {
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[self dispatchOnceTest];
});
}
复制代码
至于为何会死锁,上文介绍 Dispatch Once 的时候已经说明了,这里就很少作介绍了。提醒一下使用的时候要注意,不要形成递归调用。
在使用 dispatch_group
的时候,dispatch_group_enter(taskGroup)
和 dispatch_group_leave(taskGroup)
必定要成对,不然也会出现崩溃。大多数状况下咱们都会注意,可是有时候可能会疏忽。例如多层 for loop 时 :
- (void)testDispatchGroup {
NSString *path = @"";
NSFileManager *fileManager = [NSFileManager defaultManager];
NSArray *folderList = [fileManager contentsOfDirectoryAtPath:path error:nil];
dispatch_group_t taskGroup = dispatch_group_create();
for (NSString *folderName in folderList) {
dispatch_group_enter(taskGroup);
NSString *folderPath = [@"path" stringByAppendingPathComponent:folderName];
NSArray *fileList = [fileManager contentsOfDirectoryAtPath:folderPath error:nil];
for (NSString *fileName in fileList) {
dispatch_async(_queue, ^{
// 异步任务
dispatch_group_leave(taskGroup);
});
}
}
}
复制代码
上面的 dispatch_group_enter(taskGroup)
在第一层 for loop 中,而 dispatch_group_leave(taskGroup)
在第二层 for loop 中,二者的关系是一对多,很容形成崩溃。有时候嵌套层级太多,很容易忽略这个问题。
关于 iOS 并发编程,就总结到这里。后面若是有一些 best practices 我会更新进来。另外,由于文章比较长,可能会出现一个错误,欢迎指正,我会对此加以修改。