GCD: The Final Chapter

This is day 7 of my participation in the August Writing Challenge; for event details, see: August Writing Challenge

Barrier Functions

The most direct use of a barrier function is to control the order of task execution, producing a synchronization effect.

  • dispatch_barrier_async: runs only after the tasks submitted before it have finished
  • dispatch_barrier_sync: same effect, but it also blocks the current thread, delaying everything that follows it.

Example

- (void)demo1 {
    dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
    // This works!
    /* 1. async */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"123");
    });
    /* 2. async */
    dispatch_async(concurrentQueue, ^{
        sleep(1);
        NSLog(@"456");
    });
    /* 3. barrier */ // - or dispatch_barrier_sync
    dispatch_barrier_async(concurrentQueue, ^{
        NSLog(@"----%@-----",[NSThread currentThread]);
    });
    /* 4. async */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"After loading all that, take a breather!!!");
    });
    // 5
    NSLog(@"********** get to work!!");
}

Here we dispatch asynchronously onto a concurrent queue. When a barrier is used amid async work on a concurrent queue, the tasks submitted before it must finish before it runs, but it does not block execution of the code after the submission point. So step 3, the barrier, necessarily runs after steps 1 and 2; steps 1 and 2 are unordered relative to each other, as are steps 4 and 5. Note: what happens if we swap this custom concurrent queue for the global concurrent queue?

dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);

This time the barrier has no effect: a barrier function that works on a custom concurrent queue fails on the global concurrent queue. Why? As usual, we bring up the libdispatch source to find out.
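Before digging into the source, the ordering guarantee itself is easy to model. GCD is Apple-only, so here is a minimal Python sketch in which plain threads plus join stand in for the concurrent queue and the barrier (the names task and log are ours, not libdispatch's):

```python
import threading
import time

log = []
log_lock = threading.Lock()

def task(name, delay=0.0):
    if delay:
        time.sleep(delay)
    with log_lock:
        log.append(name)

# Steps 1 and 2: concurrent, mutually unordered
t1 = threading.Thread(target=task, args=("1",))
t2 = threading.Thread(target=task, args=("2", 0.1))
t1.start(); t2.start()

# Step 3, the "barrier": runs only once everything submitted before it is done
t1.join(); t2.join()
task("barrier")

# Step 4: work submitted after the barrier proceeds normally
t4 = threading.Thread(target=task, args=("4",))
t4.start(); t4.join()
```

Steps 1 and 2 may finish in either order, but the "barrier" entry always lands after both, matching the output of demo1.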

dispatch_barrier_sync

void dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work) {
	uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	}
	_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

This code looks much like the synchronous functions introduced earlier. Let's trace the call chain: dispatch_barrier_sync -> _dispatch_barrier_sync_f_inline. Via symbolic breakpoints we land in _dispatch_sync_f_slow -> _dispatch_sync_invoke_and_complete_recurse -> _dispatch_sync_complete_recurse

static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
		uintptr_t dc_flags)
{
	bool barrier = (dc_flags & DC_FLAG_BARRIER);
	do {
		if (dq == stop_dq) return;
        // Is there a barrier? If so, everything queued ahead of it must run first
		if (barrier) {
			dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
		} else {
            // No barrier: complete as an ordinary synchronous function
			_dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
		}
		dq = dq->do_targetq;
		barrier = (dq->dq_width == 1);
	} while (unlikely(dq->do_targetq));
}

When a barrier function is present, execution goes through dx_wakeup:

#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
	.do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
	.do_dispose     = _dispatch_lane_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_lane_invoke,

	.dq_activate    = _dispatch_lane_activate,
	.dq_wakeup      = _dispatch_lane_wakeup,
	.dq_push        = _dispatch_lane_concurrent_push,
);

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
	.do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
	.do_dispose     = _dispatch_object_no_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_object_no_invoke,

	.dq_activate    = _dispatch_queue_no_activate,
	.dq_wakeup      = _dispatch_root_queue_wakeup,
	.dq_push        = _dispatch_root_queue_push,
);

For a concurrent queue you create yourself, dq_wakeup = _dispatch_lane_wakeup; for the global concurrent queue, dq_wakeup = _dispatch_root_queue_wakeup.

queue_concurrent vs queue_global

void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

	if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
		return _dispatch_lane_barrier_complete(dqu, qos, flags);
	}
	if (_dispatch_queue_class_probe(dqu)) {
		target = DISPATCH_QUEUE_WAKEUP_TARGET;
	}
	return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
void
_dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
		DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
{
	if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
		DISPATCH_INTERNAL_CRASH(dq->dq_priority,
				"Don't try to wake up or override a root queue");
	}
	if (flags & DISPATCH_WAKEUP_CONSUME_2) {
		return _dispatch_release_2_tailcall(dq);
	}
}

The source shows the difference plainly. The global concurrent queue's wakeup contains no barrier handling at all, whereas a custom concurrent queue checks for the barrier flag and calls _dispatch_lane_barrier_complete

static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_lane_t dq = dqu._dl;

	if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
		struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
        // Serial queue, or the head item is itself a barrier
		if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
			if (_dispatch_object_is_waiter(dc)) {
				return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
			}
		} else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
			return _dispatch_lane_drain_non_barriers(dq, dc, flags);
		}
		// ...
	}
	//...
	return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}

If it is a serial queue (or the head item is a waiter) it waits; otherwise it proceeds to completion in _dispatch_lane_class_barrier_complete. In other words, all the preceding tasks are guaranteed to have finished first.

This confirms the conclusion above: the global concurrent queue does not handle barriers, so barrier functions are useless there. The reason for this design is that the system itself also dispatches work onto the global concurrent queue, and a barrier essentially stalls the current queue, which would hurt overall efficiency. A barrier must also be used on the same queue as the tasks it fences; for example, when using AFNetworking we cannot get at AFN's internal queue, so in practice barrier functions are not used all that often. What we use far more is the dispatch group.

Semaphores: dispatch_semaphore_t

  • dispatch_semaphore_create: creates the semaphore; the number passed in is the initial value (effectively the maximum concurrency)
  • dispatch_semaphore_wait: waits on the semaphore, decrementing its value by 1
  • dispatch_semaphore_signal: signals the semaphore, incrementing its value by 1

- (void)test {
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t sem = dispatch_semaphore_create(0);
    // task 1
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // wait
        NSLog(@"running task 1");
        NSLog(@"task 1 done");
    });

    // task 2
    dispatch_async(queue, ^{
        sleep(2);
        NSLog(@"running task 2");
        NSLog(@"task 2 done");
        dispatch_semaphore_signal(sem); // signal: +1
    });

    // task 3
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        sleep(2);
        NSLog(@"running task 3");
        NSLog(@"task 3 done");
        dispatch_semaphore_signal(sem);
    });
}


Task 3 never runs: task 2's single signal is consumed waking task 1, no further signal ever arrives, and task 3 waits forever.

dispatch_semaphore_create

* @param value
 * The starting value for the semaphore. Passing a value less than zero will
 * cause NULL to be returned.

The starting value for the semaphore; passing a value less than zero causes NULL to be returned.

dispatch_semaphore_signal

intptr_t dispatch_semaphore_signal(dispatch_semaphore_t dsema) {
	long value = os_atomic_inc2o(dsema, dsema_value, release); // +1: e.g. 0 -> 1
	if (likely(value > 0)) {
		return 0; // no waiters outstanding: return immediately
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}

dispatch_semaphore_wait

intptr_t dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
	long value = os_atomic_dec2o(dsema, dsema_value, acquire); // -1: 0 - 1 = -1
	if (likely(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout); // taken in our example
}

In the example above the semaphore was created with an initial value of 0, so inside wait, 0 - 1 = -1 and we drop straight into _dispatch_semaphore_wait_slow:

static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout) // called with timeout == DISPATCH_TIME_FOREVER
{

	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
	// ...
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
	int ret = 0;
	do {
		ret = sem_wait(sema);
	} while (ret == -1 && errno == EINTR);
	DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}

sem_wait is the kernel-level primitive that Pthreads wraps. The part to notice is the do-while loop: sem_wait blocks until the semaphore value becomes positive, and the loop simply retries whenever the call is interrupted by a signal (EINTR).
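The counting semantics above can be reproduced with Python's threading.Semaphore, which follows the same wait/decrement, signal/increment rules. This is a model of the earlier three-task example, not libdispatch itself; an acquire timeout stands in for task 3's forever wait so the sketch terminates:

```python
import threading

sem = threading.Semaphore(0)   # like dispatch_semaphore_create(0)
ran = []

def task1():
    sem.acquire()              # wait: value would go negative, so block
    ran.append("task 1")

def task2():
    ran.append("task 2")
    sem.release()              # signal: +1 wakes task 1

t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start(); t2.start()
t1.join(); t2.join()

# Task 3 waits again, but nobody signals; the timeout stands in for the
# forever-blocked dispatch_semaphore_wait from the example above
task3_got_signal = sem.acquire(timeout=0.2)
```

task 1 can only append after task 2's release, so the order is deterministic, and the final acquire times out just as task 3 blocks forever in the GCD version.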

Dispatch Groups: dispatch_group

Its most direct use: controlling the order of task execution.

  • dispatch_group_create: create a group
  • dispatch_group_async: submit a task into the group
  • dispatch_group_notify: notify once the group's tasks have all finished
  • dispatch_group_wait: wait (up to a timeout) for the group's tasks to finish
  • dispatch_group_enter: enter the group
  • dispatch_group_leave: leave the group

Option 1: using dispatch_group_async:

- (void)groupDemo{
    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    
    dispatch_group_async(group, queue, ^{
    });
    
    dispatch_group_async(group, queue, ^{
        
    });
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{

    });
    
}

Option 2: pairing enter and leave:

- (void)groupDemo{
    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    
    dispatch_group_async(group, queue, ^{
    });
    
    dispatch_group_enter(group);
    dispatch_async(queue, ^{
       dispatch_group_leave(group);
    });
    
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{

    });
}

The two are equivalent. Why does dispatch_group_async = dispatch_group_enter + dispatch_group_leave?

dispatch_group_create

dispatch_group_t dispatch_group_create(void) {
	return _dispatch_group_create_with_count(0);
}

_dispatch_group_create_with_count is quite similar to its semaphore counterpart.

dispatch_group_enter

Stepping into the source, enter turns out to be a decrement: conceptually the state goes from 0 to -1.

dispatch_group_leave

void dispatch_group_leave(dispatch_group_t dg) {
	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
			DISPATCH_GROUP_VALUE_INTERVAL, release); // ++ : -1 -> 0
	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK); // -1 & mask
	// old_value == DISPATCH_GROUP_VALUE_MASK,
	// i.e. this branch is taken when old_value == -1
	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
		old_state += DISPATCH_GROUP_VALUE_INTERVAL;
		do {
			new_state = old_state;
			if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
				new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			} else {
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			}
			if (old_state == new_state) break;
		} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
				old_state, new_state, &old_state, relaxed)));
		return _dispatch_group_wake(dg, old_state, true);
	}

	if (unlikely(old_value == 0)) {
		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
				"Unbalanced call to dispatch_group_leave()");
	}
}

From the annotated comments: when the counter is at -1, leave increments it back and enters a do-while loop that ends in _dispatch_group_wake, and what gets woken is dispatch_group_notify. Tying this back to the analysis above: entering the group first decrements the state to -1; leaving the group, once the block has finished, brings it back to 0 and wakes the notify.
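The enter/leave/notify interplay can be sketched as a toy counter. All names here are ours, and the sign convention is flipped: libdispatch counts down from 0, while this model counts outstanding tasks up from 0 and fires notify callbacks when the count returns to zero:

```python
import threading

class Group:
    """Toy model of a dispatch group: enter/leave counting, notify at zero."""
    def __init__(self):
        self._count = 0
        self._lock = threading.Lock()
        self._notify = []

    def enter(self):
        with self._lock:
            self._count += 1

    def leave(self):
        with self._lock:
            if self._count == 0:
                raise RuntimeError("Unbalanced call to leave()")
            self._count -= 1
            if self._count != 0:
                return
            pending, self._notify = self._notify, []
        for fn in pending:          # count hit zero: wake the notify blocks
            fn()

    def notify(self, fn):
        with self._lock:
            if self._count != 0:
                self._notify.append(fn)   # bind the block to the group for later
                return
        fn()                              # nothing outstanding: fire immediately

done = []
g = Group()
g.enter()                                  # count: 0 -> 1
g.notify(lambda: done.append("all done"))  # stored, not fired yet
g.leave()                                  # count: 1 -> 0, notify fires
```

The two firing paths in notify and leave mirror the two call sites of _dispatch_group_wake in the real source.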

dispatch_group_notify

static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dsn)
{
    //...
	if ((uint32_t)old_state == 0) {
		os_atomic_rmw_loop_give_up({
			return _dispatch_group_wake(dg, new_state, false);
		});
	}
}

Here we can see that when old_state == 0, _dispatch_group_wake kicks off the normal synchronous or asynchronous call-out of the block. In the asynchronous case, if notify is reached first, the block is bound to the group and stored; later, when leave brings the state back to zero, _dispatch_group_wake(dg, old_state, true) fires it. That is why the wake is called from two places: it resolves the timing problem of asynchronous completion. A rather elegant design!

dispatch_group_async = dispatch_group_enter + dispatch_group_leave

The example above showed the two forms are equivalent, so how does dispatch_group_async wrap the enter and the leave?

void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) {
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
	_dispatch_continuation_group_async(dg, dq, dc, qos);
}
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{
	dispatch_group_enter(dg);
	dc->dc_data = dg;
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

We can clearly see the enter in _dispatch_continuation_group_async: dispatch_group_enter(dg). From there, _dispatch_continuation_async -> dx_push -> _dispatch_root_queue_push -> _dispatch_root_queue_push_inline -> _dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow -> _dispatch_root_queues_init -> _dispatch_root_queues_init_once -> _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> _dispatch_continuation_invoke_inline; we have walked this chain before, so no need to repeat it. Inside _dispatch_continuation_invoke_inline:

if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) {
	_dispatch_continuation_with_group_invoke(dc);
} else {
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	_dispatch_trace_item_complete(dc);
}

_dispatch_continuation_with_group_invoke

static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
	struct dispatch_object_s *dou = dc->dc_data;
	unsigned long type = dx_type(dou);
	if (type == DISPATCH_GROUP_TYPE) {
		_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
		_dispatch_trace_item_complete(dc);
		dispatch_group_leave((dispatch_group_t)dou);
	} else {
		DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
	}
}

After _dispatch_client_callout we finally see the leave: dispatch_group_leave. Which makes sense, when you think about it: only after the task on the current queue has finished executing can the group be left and the notification posted.
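Putting the two halves together, the wrapping that dispatch_group_async performs can be sketched in Python: enter before scheduling, run the body on another thread, and leave only once it returns. The counter, event, and function names are ours, not dispatch API:

```python
import threading

outstanding = 0
lock = threading.Lock()
all_done = threading.Event()
log = []

def enter():
    global outstanding
    with lock:
        outstanding += 1
        all_done.clear()

def leave():
    global outstanding
    with lock:
        outstanding -= 1
        if outstanding == 0:
            all_done.set()

def group_async(fn):
    enter()                  # enter BEFORE the work is scheduled
    def wrapper():
        fn()                 # run the task body on another thread...
        leave()              # ...and leave only after it has finished
    threading.Thread(target=wrapper).start()

group_async(lambda: log.append("a"))
group_async(lambda: log.append("b"))
all_done.wait(timeout=2)     # stands in for dispatch_group_wait/notify
```

The essential point matches the source: the enter happens synchronously at submission time, while the leave is appended after the user block inside the invoke path.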

dispatch_source

GCD and RunLoop are in fact peers; there is no ownership relationship between them. dispatch_source essentially uses conditions to control the execution of a block. Its CPU load is very small, and it tries to occupy as few resources as possible. Calling dispatch_source_merge_data from any thread executes the handler that was registered on the source beforehand (you can think of a handler simply as a block); this process is called a Custom Event, a user event.

  • dispatch_source_create: create the source
  • dispatch_source_set_event_handler: set the source's event handler
  • dispatch_source_merge_data: merge data into the source (triggering its event)
  • dispatch_source_get_data: read the source's pending data
  • dispatch_resume: resume the source
  • dispatch_suspend: suspend the source

Usage is fairly straightforward:

- (void)demo {
    // 1. create a queue
    self.queue = dispatch_queue_create("hb.com", NULL);
    // 2. create the source
    self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
    // 3. set the source's event handler
    dispatch_source_set_event_handler(self.source, ^{

        NSLog(@"%@",[NSThread currentThread]);

        NSUInteger value = dispatch_source_get_data(self.source);
        self.totalComplete += value;
        NSLog(@"progress: %.2f",self.totalComplete/100.0);
        self.progressView.progress = self.totalComplete/100.0;
    });

    self.isRunning = YES;
    dispatch_resume(self.source);
}
// 4. call dispatch_source_merge_data wherever the source data changes
// 5. dispatch_resume to continue

Note that this is not affected by the RunLoop; it is a workloop, in essence a lower-level wrapper over pthread.
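The DATA_ADD behavior, where merged values coalesce while the source is suspended and get_data reads-and-resets, can be modeled with a small class. This is a deliberately simplified sketch (a real source delivers the handler on its target queue and fires on resume; here the handler fires synchronously on merge once resumed, and every name is ours):

```python
import threading

class DataAddSource:
    """Toy model of DISPATCH_SOURCE_TYPE_DATA_ADD: merged values accumulate
    (+=) while pending; get_data returns the total and resets it to zero."""
    def __init__(self, handler):
        self._pending = 0
        self._lock = threading.Lock()
        self._handler = handler
        self._suspended = True     # sources start suspended

    def merge_data(self, value):
        with self._lock:
            self._pending += value # values coalesce while pending
            fire = not self._suspended
        if fire:
            self._handler(self)

    def get_data(self):
        with self._lock:
            value, self._pending = self._pending, 0   # read and reset
            return value

    def resume(self):
        with self._lock:
            self._suspended = False

progress = []
src = DataAddSource(lambda s: progress.append(s.get_data()))
src.merge_data(10)   # suspended: coalesces, handler not called
src.merge_data(15)   # pending is now 25
src.resume()
src.merge_data(5)    # fires: handler reads the coalesced 30
```

The coalescing is why the progress-bar example above can receive a single handler invocation covering several merged updates.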

Addendum: is a mutable array thread-safe?

Mutating the same array from multiple threads is not safe, because simultaneous writes can occur: operating on the same region of memory at the same time is unsafe. The atomic attribute only protects the property's own accessors; it cannot make external access patterns safe. One solution is to wrap mutations of the array in a barrier function on a custom concurrent queue, which effectively acts as a lock.
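The fix amounts to serializing writers. As a language-neutral sketch, here is the same idea in Python, with a plain lock standing in for the barrier block on a custom concurrent queue:

```python
import threading

shared = []
lock = threading.Lock()   # stands in for a barrier on a custom concurrent queue

def safe_append(value):
    with lock:            # serialize the mutation, as a barrier block would
        shared.append(value)

threads = [threading.Thread(target=safe_append, args=(i,)) for i in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With the lock, all 100 appends survive; without it, concurrent mutation of the shared structure is exactly the unsafe pattern described above.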
