什么是 GCD
?GCD(Grand Central Dispatch)
是异步执行任务的技术之一。咱们只须要将定义的任务追加到适当的 Dispatch Queue
中,GCD
就能帮咱们生成必要的线程并执行咱们的任务并且不须要编写任何线程管理代码。所以使用 GCD
是 very easy
的,首先建立队列,而后将任务添加到队列,最后指定执行任务函数,搞定收工。算法
// 建立同步队列
dispatch_queue_t queue = dispatch_queue_create("cc", DISPATCH_QUEUE_SERIAL);
// 添加任务
dispatch_block_t block = ^{
NSLog(@"hello word");
};
// 指定执行任务函数,依赖队列执行
dispatch_async(queue, block);
复制代码
可是 GCD
又是否真的如此简单呢?有兴趣的话就一块儿去 GCD
的源码看看呗,看看又不花钱。swift
首先,建立两个常见的队列,串行队列和并发队列,而后分别 po
一下里面都有什么东东,再顺便把主队列和全局队列分别一块儿给看了,先对队列里面的东西有一丢丢的印象就能够了。数组
width
和串行队列的
width
是同样的,其余的也有不少相同的,难怪说主队列是特殊的串行队列。可是并发队列和全局队列的
width
却相差 1,有点奇怪。接下来咱们就到源码中一探究竟吧。在此附上
libdispatch源码连接,但愿你们能一块儿开开心心看源码,本文使用的是
libdispatch-1008.220.2
版本。
来到源码以后,固然是全局搜索快速定位 dispatch_queue_create
位置,看它到底在里面作了什么坏事,而后发现队列是经过 _dispatch_lane_create_with_target
建立的,常规接口隔离操做。_dispatch_lane_create_with_target
第一句代码就是将咱们的外界传进来区分串行仍是并发的参数传进去建立了一个 dispatch_queue_attr_info_t
类型的结构体,有鬼。bash
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
复制代码
一查 dispatch_queue_attr_info_t
是一个结构体位域,结构体位域能够经过一些位运算取出咱们想要的内容,过滤掉咱们不想要的数据。并发
typedef struct dispatch_queue_attr_info_s {
dispatch_qos_t dqai_qos : 8;
int dqai_relpri : 8;
uint16_t dqai_overcommit:2;
uint16_t dqai_autorelease_frequency:2;
uint16_t dqai_concurrent:1;
uint16_t dqai_inactive:1;
} dispatch_queue_attr_info_t;
复制代码
接下来咱们来看一下是怎么建立这个结构体的吧。app
dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
dispatch_queue_attr_info_t dqai = { };
// 串行队列直接返回空的 dqai 结构体
if (!dqa) return dqai;
#if DISPATCH_VARIANT_STATIC
if (dqa == &_dispatch_queue_attr_concurrent) {
dqai.dqai_concurrent = true;
return dqai;
}
#endif
if (dqa < _dispatch_queue_attrs ||
dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
// 苹果的算法
size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
// 并发队列结构体位域的默认配置和赋值
dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;
dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;
dqai.dqai_relpri = -(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT);
idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT;
dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;
dqai.dqai_autorelease_frequency =
idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
return dqai;
}
复制代码
从上面能够发现串行队列就是一个空的结构体,并发队列会对结构体进行一系列的默认配置和赋值,重点关注 dqai.dqai_concurrent
等一些 属性都是有值的,这也会是咱们下面的源码的一个分水岭。接着继续往下分析,在忽略了一些代码以后,终于发现了重点所在 vtable
,这个应该一个表,而后咱们来看看它都作了些什么。异步
const void *vtable;
if (dqai.dqai_concurrent) {
// 经过 dqai.dqai_concurrent 来区分并发和串行
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
复制代码
建立了 vtable
以后再经过 vtable
开辟内存,生成响应的对象 queue
。async
dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s));
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
复制代码
这个 init
构造函数的第三个参数是个三目运算符,若是是并发队列是一个宏定义,串行队列是 1,而后再搜搜这个宏定义发现竟然是 #define DISPATCH_QUEUE_WIDTH_MAX (0x1000ull - 2)
,妈耶,这不就是上面咱们打印串行队列和并发队列的 width
的值嘛!原来就是在这初始化的。那为啥是减 2 而不是减 1 呢?减 1 就是咱们的全局并发队列!函数
// 验证
#define DISPATCH_QUEUE_WIDTH_POOL (0x1000ull - 1)
struct dispatch_queue_global_s _dispatch_mgr_root_queue = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
.do_ctxt = &_dispatch_mgr_root_queue_pthread_context,
.dq_label = "com.apple.root.libdispatch-manager",
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
DISPATCH_PRIORITY_SATURATED_OVERRIDE,
.dq_serialnum = 3,
.dgq_thread_pool_size = 1,
};
复制代码
好,接着往下看,又找到了 dq->do_targetq = tq;
,这个 tq
又是在哪赋值的呢?我在上面找到了赋值代码。oop
#define DISPATCH_QOS_UNSPECIFIED ((dispatch_qos_t)0)
#define DISPATCH_QOS_DEFAULT ((dispatch_qos_t)4)
// Serial queues default to overcommit!
// 若是是并发 overcommit = _dispatch_queue_attr_overcommit_disabled
// 若是是串行 overcommit = _dispatch_queue_attr_overcommit_enabled
overcommit = dqai.dqai_concurrent ?
_dispatch_queue_attr_overcommit_disabled :
_dispatch_queue_attr_overcommit_enabled;
if (!tq) {
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos, // 4
overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq; // 0 1
}
static inline dispatch_queue_global_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
}
// qos 为 4,4-1= 3
// 2*3 + 0或者1 = 6/7
// 而后再去数组 _dispatch_root_queues 里取数组的 6 或者 7 的下标指针地址
return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}
复制代码
因为 tq
的值是经过 _dispatch_root_queues
数组取出来的,直接到数组里面看就一目了然了。我就只列举了串行队列、并发队列(全局和并发是同样的)和主队列的。由此能够发现 tq
就是 dq_label
的值,也就是外面队列 target
的值。
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.utility-qos.overcommit",
.dq_serialnum = 9,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
.dq_label = "com.apple.root.default-qos",
.dq_serialnum = 10,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.default-qos.overcommit",
.dq_serialnum = 11,
),
复制代码
既然串行队列和并发队列的 target
信息是从 _dispatch_root_queues
结构体数组取出来的,那么 _dispatch_root_queues
又是在哪建立的呢?咱们来到最早初始化的 libdispatcdispatch_queue_createh_init
里的查找,最终在 _dispatch_introspection_init
找到了一些代码。
for
循环,调用
_dispatch_trace_queue_create
,再取出
_dispatch_root_queues
里的地址指针一个一个建立出来的。
alloc
和 init
里面的源码我都发现了 dispatch_object_t
这个类型,而后一搜竟然是一个联合体,这个就有意思了,经过联合体同一时间只能一个有值的特性给函数带来了多态性。typedef union {
struct _os_object_s *_os_obj;
struct dispatch_object_s *_do;
struct dispatch_queue_s *_dq;
struct dispatch_queue_attr_s *_dqa;
struct dispatch_group_s *_dg;
struct dispatch_source_s *_ds;
struct dispatch_mach_s *_dm;
struct dispatch_mach_msg_s *_dmsg;
struct dispatch_semaphore_s *_dsema;
struct dispatch_data_s *_ddata;
struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
复制代码
到此,GCD
的大门就已经打开,接下来咱们来点深刻一点的东西,也让你们不枉此行。
当咱们了解队列的建立以后,添加任务就是一个 block
,block
参数的类型是 dispatch_block_t
,它是一个没有参数,没有返回值的 block
:typedef void (^dispatch_block_t)(void);
没什么太多的好研究的,接下来就来看看执行任务的函数都在底层作了些什么吧。接下来的内容会比较的干巴巴,可是看的越深,对本身帮助会更大。
执行任务的函数分为两种,同步和异步函数(即同步线程和异步线程)。
dispatch_sync
,必须等待当前语句执行完毕,才会执行下一条语 句,不会开启线程,在当前执行任务。dispatch_async
。不用等待当前语句执行完毕,就能够执行下一条语句,会开启线程执行任务。在分析同步函数源码以前,先提出两个疑问,同步是怎么执行的呢?死锁是在同步的状况下形成的,那底层源码又是怎么样的呢?带着这两个疑问点咱们接着分析dispatch_sync
源码。 咱们来到 dispatch_sync
函数实现:
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
复制代码
dispatch_sync
里的 unlikely
的意思是基本上不会走,而后就来到 _dispatch_sync_f
函数,_dispatch_sync_f
的第三个参数是将 block
包装了一下,具体包装以下:
#define _dispatch_Block_invoke(bb) \
((dispatch_function_t)((struct Block_layout *)bb)->invoke)
复制代码
省略各类分支以后就会来到 _dispatch_sync_f_inline
函数,实现以下:
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// 串行就会走这下面
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
// ... 下面代码暂时省略
}
复制代码
常常层层难关,发现串行队列好像就是调用了 _dispatch_barrier_sync_f
函数,看到这个函数是否是想起了一些什么,这是否是就是栅栏函数,那分析完同步以后,栅栏也就迎刃而解了,爽歪歪。继续往里分析,最终来到 _dispatch_barrier_sync_f_inline
:
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// 获取线程ID
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// 死锁
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
复制代码
在这个函数里会先获取线程 id
,由于队列须要绑定到线程而后依赖执行,而死锁的缘由在于同步线程里的任务出现你等我,我等你的现象,因此只有 _dispatch_queue_try_acquire_barrier_sync
用到了线程 id
,在里面果不其然发现了秘密:
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
uint32_t tid, uint64_t suspend_count)
{
uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dispatch_lock_value_from_tid(tid) |
(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
uint64_t old_state, new_state;
// 从 os 底层获取信息,也就是经过线程和当前队列获取 new_state 返回出去
return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
if (old_state != (init | role)) {
os_atomic_rmw_loop_give_up(break);
}
new_state = value | role;
});
}
复制代码
从 os
底层获取到了一个 new_state
以后,就会执行 _dispatch_sync_f_slow
,若是你碰见过死锁,对这个函数确定不陌生吧,我们就来看看里面到底作了什么,是否是有点火烧眉毛!
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
// 初始化保存 block 以及其余信息的结构体
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
// 将 block push 到 queue 里面去
_dispatch_trace_item_push(top_dq, &dsc);
// 死锁的最终函数
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
// ... 忽略了一些暂时不须要分析的代码
}
复制代码
经过 _dispatch_trace_item_push
函数能够发现队列其实就是一个用来提交 block
的对象,当 block
push 到队列中后,将按照 先入先出(FIFO) 的顺序进行处理,系统在 GCD
的底层会维护一个线程池,用来执行这些 block
。那这些 block
又是在哪调用的呢? 因为步骤有点多,我就不一一分析了,把大体的流程图写在下面,你们能够本身去跟一下,体验一下源码的乐趣。
dispatch_sync
└──_dispatch_sync_f
└──_dispatch_sync_invoke_and_complete
└──_dispatch_sync_function_invoke_inline
└──_dispatch_client_callout
└──f(ctxt);
复制代码
第一个同步是怎么执行的问题解决了,接下来解决第二个死锁问题,要想解决这个问题,就须要来到 __DISPATCH_WAIT_FOR_QUEUE__
里一探究竟。
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
// 获取队列的状态,看是不是处于等待状态
uint64_t dq_state = _dispatch_wait_prepare(dq);
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
// ... 忽略了一些暂时不须要分析的代码
}
复制代码
获取到队列的状态以后,又调用 _dq_state_drain_locked_by
函数最后来到下面这个函数,这个函数就是死锁的关键所在。
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{ // lock_value 为队列状态,tid 为线程 id
// ^ (异或运算法) 两个相同就会出现 0 不然为1
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
复制代码
从上面能够看出,若是队列和线程都是处于等待状态,最终的返回结果为 YES
,就会来到 DISPATCH_CLIENT_CRASH
形成 crash
。
分析完同步函数以后,接下来就该来分析分析异步函数了,异步函数会须要开启线程去执行任务,因此这应该会是一个重点,找到重点以后咱们就能够继续研究了,同样先来到异步函数的实现:
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
复制代码
异步函数会经过 _dispatch_continuation_init
先对 block
进行包装即函数式保存,由于异步须要在合适的时机进行线程回调 block
,以后就调用 _dispatch_continuation_async
进行异步处理,因为异步的分支比较多,我下面列出我所分析的步骤:
dispatch_async
└──_dispatch_continuation_async
└──dx_push
└──dq_push
└──_dispatch_root_queue_push
└──_dispatch_root_queue_push_inline
└──_dispatch_root_queue_poke
└──_dispatch_root_queue_poke_slow
复制代码
在 _dispatch_root_queue_poke_slow
里就能找到开辟线程相关的代码了,那咱们就来分析一波吧:
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
int r = ENOSYS;
_dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
if (likely(pqc->dpq_thread_mediator.do_vtable)) {
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
_dispatch_root_queue_debug("signaled sleeping worker for "
"global queue: %p", dq);
if (!--remaining) {
return;
}
}
}
bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
if (overcommit) {
// 串行队列
os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
} else {
if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
_dispatch_root_queue_debug("worker thread request still pending for "
"global queue: %p", dq);
return;
}
}
// floor 为 0,remaining 是根据队列任务的状况处理的
int can_request, t_count;
// 获取线程池的大小
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
// 计算能够请求的数量
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
// 线程池满了,就会报出异常的状况
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
pthread_attr_t *attr = &pqc->dpq_thread_attr;
pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_root_queue)) {
pthr = _dispatch_mgr_root_queue_init();
}
#endif
do {
_dispatch_retain(dq);
// 开辟线程
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
#else
(void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}
复制代码
好了,同步和异步暂时分析到这,接下来分析的内容相对比较简单,semaphore
和 group
。
关于信号量的 API
很少,主要是三个,create
、wait
和 signal
。 信号量在初始化时要指定 value
,随后内部将这个 value
存储起来。实际操做时会存两个 value
,一个是当前的value
,一个是记录初始 value
。信号的 wait
和 signal
是互逆的两个操做,若是 value
大于 0,前者将 value
减一,此时若是 value
小于零就一直等待。初始 value
必须大于等于 0,若是为 0 并随后调用 wait
方法,线程将被阻塞直到别的线程调用了 signal
方法。简单的介绍了一下使用方法,接下来直接来看源码吧。
这个函数就是去初始化 dsema
,并设置 value
的值。
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
// 若是 value 小于 0 直接返回 0
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
复制代码
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
复制代码
dispatch_atomic_dec2o
是一个宏,会调用 GCC
内置的函数 __sync_sub_and_fetch
,实现减法的原子性操做。所以这一行的意思是将 dsema
的值减一,并把新的值赋给 value
。若是减一后的 value
大于等于 0 就马上返回,没有任何操做,不然进入等待状态。
_dispatch_semaphore_wait_slow
函数针对不一样的 timeout
参数,分了三种状况考虑:
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
复制代码
这种状况 while
判断必定会成立,由于若是 value
大于等于 0,在上一个函数 dispatch_semaphore_wait
中就已经返回了,判断成立,内部的 if
判断必定也成立,此时会将 value
加一(也就是变为 0) 并返回。加一的缘由是为了抵消 wait
函数一开始的减一操做。此时函数调用方会获得返回值 KERN_OPERATION_TIMED_OUT
,表示因为等待时间超时而返回。
第二种状况是 DISPATCH_TIME_FOREVER
这个 case
:
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
复制代码
这种状况会调用系统的 semaphore_wait
方法继续等待,直到收到 signal
调用。
第三种就是一个默认的状况:
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
复制代码
在default
分支下,咱们指定一个超时时间,在 _dispatch_sema4_timedwait
里面会去判断当前操做是否超时,这和 DISPATCH_TIME_FOREVER
的处理比较相似,不一样的是咱们调用了内核提供的 semaphore_timedwait
方法能够指定超时时间。
这个函数的实现相对来讲比较简单,由于它不须要阻塞,只用唤醒。
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
复制代码
当咱们了解了信号量以后,group
是一个很是容易理解的概念,咱们先看看如何建立 group
:
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
复制代码
这个方法就是开辟了内存空间,可是从 os_atomic_store2o
能够看出 group
底层也维护了一个 value
值。
从 enter
能够看出,当进组的时候会经过 os_atomic_sub_orig2o
对 value
减 4。
void
dispatch_group_enter(dispatch_group_t dg)
{
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
复制代码
出组的时候会对 value
进行加值,若是 new_state
和 old_state
相等,就会调用 _dispatch_group_wake
继续后面代码的执行,这个函数后面会说到。
void
dispatch_group_leave(dispatch_group_t dg)
{
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
return _dispatch_group_wake(dg, old_state, true);
}
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
复制代码
dispatch_group_async
就是对 enter
和 leave
的封装,当 block
调用完成以后进行 callout
以后就出组了。
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
// 保存任务
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{ // 进组
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
// 出组
dispatch_group_leave((dispatch_group_t)dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}
复制代码
这个方法用于等待 group
中全部任务执行完成,能够理解为信号量 wait
的封装:
long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
uint64_t old_state, new_state;
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
}
if (unlikely(timeout == 0)) {
os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
}
new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
os_atomic_rmw_loop_give_up(break);
}
});
return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
}
复制代码
若是当前 value
和原始 value
相同,代表任务已经所有完成,直接返回 0,若是 timeout
为 0 也会马上返回,不然调用 _dispatch_group_wait_slow
。 在 _dispatch_group_wait_slow
会一直等到任务完成返回 0 ,固然若是一直没有完成就会返回 timeout
。
这个函数主要作的就是循环调用 dispatch_async_f
异步执行在 notify
函数中注册的回调。
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
if (refs) _dispatch_release_n(dg, refs);
}
复制代码
dispatch_once
仅仅是一个包装,内部直接调用了 dispatch_once_f
:
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l);
}
复制代码
第一次调用时外部传进来的 onceToken
仍是空指针,因此 val
为 NULL
,_dispatch_once_gate_tryenter(l)
判断 l->dgo_once
是否标记为 DLOCK_ONCE_UNLOCKED
,可是 #define DLOCK_ONCE_UNLOCKED ((uintptr_t)0)
,因此 if 判断是成立的,就会进行 block
回调,而后在经过 _dispatch_once_gate_broadcast
将 l->dgo_once
标记为 DLOCK_ONCE_DONE
,下次再进来的时候就会直接返回保证代码只执行一次。
本文主要整理了 GCD
中常见的 API
以及底层的实现原理。
dispatch_sync
将任务 block
经过 push
到队列中,而后按照 FIFO
去执行。dispatch_async
会把任务包装并保存,以后就会开辟相应线程去执行已保存的任务。semaphore
主要使用 signal
和 wait
这两个接口,底层分别调用了内核提供的方法。在调用 signal
方法后,先将 value
减一,若是大于零马上返回,不然陷入等待。signal
方法将信号量加一,若是 value
大于零马上返回,不然说明唤醒了某一个等待线程,此时由系统决定哪一个线程的等待方法能够返回。dispatch_group
底层也是维护了一个 value
的值,等待 group
完成实际上就是等待 value
恢复初始值。而 notify
的做用是将全部注册的回调组装成一个链表,在 dispatch_async
完成时判断 value
是否是恢复初始值,若是是则调用 dispatch_async
异步执行全部注册的回调。dispatch_once
经过一个静态变量来标记 block
是否已被执行,同时使用加锁确保只有一个线程能执行,执行完 block
后会唤醒其余全部等待的线程。最后在此感谢 深刻了解GCD 对个人启发。