笔记-源码解析之dispatch_once、信号量、调度组

单例dispatch_once

static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        
    });
复制代码

这串代码不用解释,相信你们都熟悉。如今前往源码解析bash

typedef long dispatch_once_t;app

这里的once就是一个long类型,拿到它的指针类型传入到函数里。async

跟着源码走,会进入到这里:ide

dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
	dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
	if (likely(v == DLOCK_ONCE_DONE)) {
		return;
	}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l, v);
	}
#endif
#endif
	if (_dispatch_once_gate_tryenter(l)) {
		return _dispatch_once_callout(l, ctxt, func);
	}
	return _dispatch_once_wait(l);
}
复制代码

第一句代码把咱们传进来的指针强转成dispatch_once_gate_t,进入这个会发现它是一个结构体,里面只有一个联合体。 再往下看的时候,能够先思考一个问题,单例是如何实现只建立一次的?带着这个问题,咱们往下走,能够直接跳转到最后函数

if (_dispatch_once_gate_tryenter(l)) {
		return _dispatch_once_callout(l, ctxt, func);
	}
复制代码

从这句代码能够看出,这里是建立它。一步一步来,先看条件oop

_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
	// os 对象是否存储过
	// unlock
	return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
			(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
复制代码

说实话,懵逼,代码具体干吗的不清楚,查资料后,能够理解成判断对象是否在os存储过。而后接着往下走源码分析

_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_client_callout(ctxt, func);
	_dispatch_once_gate_broadcast(l);
}
复制代码

看过我以前文章的小伙伴,看到_dispatch_client_callout()这个应该很熟悉了,这里就是执行block里面的内容。建立完以后,就进行下面这个广播的函数学习

_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
	dispatch_lock value_self = _dispatch_lock_value_for_self();
	uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	v = _dispatch_once_mark_quiescing(l);
#else
	v = _dispatch_once_mark_done(l);
#endif
	if (likely((dispatch_lock)v == value_self)) return;
	_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
复制代码

仔细看这段代码的意思,第一句拿到当前self对象,而后拿到一个v,仔细去看这个v究竟是什么!!!ui

第一个_dispatch_once_mark_quiescing表示正在建立,这里标记了一个_dispatch_once_generation()this

_dispatch_once_mark_quiescing(dispatch_once_gate_t dgo)
{
	return os_atomic_xchg(&dgo->dgo_once, _dispatch_once_generation(), release);
}
复制代码

而后看第二个_dispatch_once_mark_done(),这里表示标记一个DLOCK_ONCE_DONE

_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
	return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
复制代码

此时此刻,是否还记得咱们一开始跳过的方法里的代码,看下面代码:

uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
	if (likely(v == DLOCK_ONCE_DONE)) {
		return;
	}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l, v);
	}
复制代码

若是标记了获取到的v等于DLOCK_ONCE_DONE,就直接返回;若是是下面的,咱们在进入查看一下:

_dispatch_once_mark_done_if_quiesced(dispatch_once_gate_t dgo, uintptr_t gen)
{
	if (_dispatch_once_generation() - gen >= DISPATCH_ONCE_GEN_SAFE_DELTA) {
		/*
		 * See explanation above, when the quiescing counter approach is taken
		 * then this store needs only to be relaxed as it is used as a witness
		 * that the required barriers have happened.
		 */
		os_atomic_store(&dgo->dgo_once, DLOCK_ONCE_DONE, relaxed);
	}
}
复制代码

不知道走到这里你们是否明白,单例是如何实现只建立一次的。

信号量dispatch_semaphore_t

// 建立信号量对象 信号量 >= 0
    dispatch_semaphore_t sem = dispatch_semaphore_create(1);
    // -1操做
    dispatch_wait(sem, DISPATCH_TIME_FOREVER);
    // +1 操做
    dispatch_semaphore_signal(sem);
复制代码

上面三句代码,就是建立信号量的代码。 wait-1操做至关于阻塞操做,signal则是+1操做。

进入源码分析:

dispatch_semaphore_create(long value)
{
	dispatch_semaphore_t dsema;
	if (value < 0) {
		return DISPATCH_BAD_INPUT;
	}

	dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
			sizeof(struct dispatch_semaphore_s));
	dsema->do_next = DISPATCH_OBJECT_LISTLESS;
	dsema->do_targetq = _dispatch_get_default_queue(false);
	dsema->dsema_value = value;
	_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	dsema->dsema_orig = value;
	return dsema;
}
复制代码

第一步就声明了一个信号量对象,而后判断value,小于0就是不正确操做。后面就是建立对象,开辟空间。最主要的两步其实就是value的赋值。

接下来看一下dispatch_wait():

#define dispatch_wait(object, timeout) \
		_Generic((object), \
			dispatch_block_t:dispatch_block_wait, \
			dispatch_group_t:dispatch_group_wait, \
			dispatch_semaphore_t:dispatch_semaphore_wait \
		)((object),(timeout))
复制代码

这里咱们看的是信号量,因此走到dispatch_semaphore_wait方法里:

dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	long value = os_atomic_dec2o(dsema, dsema_value, acquire);
	if (likely(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}
复制代码

中间一个if条件判断,若是value >= 0,则直接返回一个0,返回0表示堵塞,容许进来的线程为0; 因此咱们看下os_atomic_dec2o()方法具体作了什么:

#define os_atomic_dec2o(p, f, m) \ os_atomic_sub2o(p, f, 1, m) =======> 宏定义

#define os_atomic_sub2o(p, f, v, m) \ os_atomic_sub(&(p)->f, (v), m) ========> 宏定义

#define os_atomic_sub(p, v, m) \ _os_atomic_c11_op((p), (v), m, sub, -) ========> 宏定义
复制代码

连续的几个宏定义,其实后面还有,不过看到这里已经够了,sub1-,不知道你们有没有看到这些敏感的字样,实际上这一步就是进行-1操做。

再看_dispatch_semaphore_wait_slow()方法,走到这步,说明value值是小于0的

_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	long orig;

	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
		// Fall through and try to undo what the fast path did to
		// dsema->dsema_value
	case DISPATCH_TIME_NOW:
		orig = dsema->dsema_value;
		while (orig < 0) {
			if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
					&orig, relaxed)) {
				return _DSEMA4_TIMEOUT();
			}
		}
		// Another thread called semaphore_signal().
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}
复制代码

咔咔咔。。。其实这么多代码,就是告诉咱们会一直的进行等待,它是对咱们第二个参数设置的遍历。 咱们这里设置的是forever,永久等待。注释也帮咱们解答了,若是想要唤醒,那么就调用semaphore_signal()方法。

dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	long value = os_atomic_inc2o(dsema, dsema_value, release);
	if (likely(value > 0)) {
		return 0;
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}
复制代码

这里代码和前面的wait很类似,先看看os_atomic_inc2o()方法:

#define os_atomic_inc2o(p, f, m) \ os_atomic_add2o(p, f, 1, m)

#define os_atomic_add2o(p, f, v, m) \ os_atomic_add(&(p)->f, (v), m)

#define os_atomic_add(p, v, m) \ _os_atomic_c11_op((p), (v), m, add, +)
复制代码

经过上面的代码,我相信聪明的你已经猜到了os_atomic_inc2o()的意义了,没错,就是+1操做。

后面的方法_dispatch_semaphore_signal_slow表示持续加一的状态,最后返回1,表示为非阻塞状态。

调度组dispatch_group_t

dispatch_group_create()

    dispatch_group_enter()
    dispatch_group_leave() 

    dispatch_group_async(<#dispatch_group_t _Nonnull group#>, <#dispatch_queue_t _Nonnull queue#>, ^{
    
    });

    dispatch_group_notify(, , );
复制代码

调度组咱们围绕这几个方法来讲,先看建立dispatch_group_create()

dispatch_group_create(void)
{
	return _dispatch_group_create_with_count(0);
}

_dispatch_group_create_with_count(uint32_t n)
{
	dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
			sizeof(struct dispatch_group_s));
	dg->do_next = DISPATCH_OBJECT_LISTLESS;
	dg->do_targetq = _dispatch_get_default_queue(false);
	if (n) {
		os_atomic_store2o(dg, dg_bits,
				-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
		os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
	}
	return dg;
}
复制代码

能够看出就是建立了一个dispatch_group_t类型的对象,比较简单。

下面看dispatch_group_enter():

dispatch_group_enter(dispatch_group_t dg)
{
	// The value is decremented on a 32bits wide atomic so that the carry
	// for the 0 -> -1 transition is not propagated to the upper 32bits.
	uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
			DISPATCH_GROUP_VALUE_INTERVAL, acquire);
	uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
	if (unlikely(old_value == 0)) {
		_dispatch_retain(dg); // <rdar://problem/22318411>
	}
	if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
		DISPATCH_CLIENT_CRASH(old_bits,
				"Too many nested calls to dispatch_group_enter()");
	}
}
复制代码

其实能够经过注释看到下面的操做是0 -> 1的操做,很明显是这个方法os_atomic_sub_orig2o

#define os_atomic_sub_orig2o(p, f, v, m) \ os_atomic_sub_orig(&(p)->f, (v), m)
#define os_atomic_sub_orig(p, v, m) \ _os_atomic_c11_op_orig((p), (v), m, sub, -)
复制代码

而后判断,等于0直接返回,形成堵塞。

接着看dispatch_group_leave()

dispatch_group_leave(dispatch_group_t dg)
{
	// The value is incremented on a 64bits wide atomic so that the carry for
	// the -1 -> 0 transition increments the generation atomically.
	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
			DISPATCH_GROUP_VALUE_INTERVAL, release);
	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
		// 省略一些无关代码
		return _dispatch_group_wake(dg, old_state, true);
	}

	if (unlikely(old_value == 0)) {
		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
				"Unbalanced call to dispatch_group_leave()");
	}
}
复制代码

一样的道理,第一步就是执行+1的操做,这里就不重复了。最后面有个判断,表示+1操做以后,还等于0,就会报错,这就要求dispatch_group_enter()dispatch_group_leave()要成对出现,否则会有问题。

主要看后面的方法_dispatch_group_wake

_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
	uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

	if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
		dispatch_continuation_t dc, next_dc, tail;

		// Snapshot before anything is notified/woken <rdar://problem/8554546>
		dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
		do {
			dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
			next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
			_dispatch_continuation_async(dsn_queue, dc,
					_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
			_dispatch_release(dsn_queue);
		} while ((dc = next_dc));

		refs++;
	}

	if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
		_dispatch_wake_by_address(&dg->dg_gen);
	}

	if (refs) _dispatch_release_n(dg, refs);
}
复制代码

走到这步就代表出里调度组,执行队列里的全部任务,里面有个do...while循环。

下面看另外一个方法dispatch_group_async()

dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
	_dispatch_continuation_group_async(dg, dq, dc, qos);
}
复制代码

第一步_dispatch_continuation_init()操做是保存任务块
接着看_dispatch_continuation_group_async()

_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{
	dispatch_group_enter(dg);
	dc->dc_data = dg;
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
复制代码

走到这里咱们看到一个熟悉的方法dispatch_group_enter()上面刚刚说过的,而方法_dispatch_continuation_async()在上一篇文章里描述函数的部分也介绍过。不了解的小伙伴能够去看看

接着看方法_dispatch_group_notify()

_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dsn)
{
	prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
	if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
	os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
	if (os_mpsc_push_was_empty(prev)) {
		os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
			new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
			if ((uint32_t)old_state == 0) {
				os_atomic_rmw_loop_give_up({
					return _dispatch_group_wake(dg, new_state, false);
				});
			}
		});
	}
}
复制代码

其实前面的一系列判断说什么都不重要,最重要的是咱们看到了方法_dispatch_group_wake(),是否是又回到了dispatch_group_leave()方法
总结: 其实dispatch_group_async()_dispatch_group_notify()就是对dispatch_group_enter()dispatch_group_leave()的封装,不过我的感受后面二者更加灵活。

上面描述若是有什么错误,还但愿小伙伴们指出,一块儿学习一块儿交流!!!

相关文章
相关标签/搜索