今天来写写C#中的异步迭代器 - 机制、概念和一些好用的特性c#
迭代器的概念在C#中出现的比较早,不少人可能已经比较熟悉了。缓存
一般迭代器会用在一些特定的场景中。bash
举个例子:有一个foreach
循环:服务器
foreach (var item in Sources) { Console.WriteLine(item); }
这个循环实现了一个简单的功能:把Sources
中的每一项在控制台中打印出来。微信
有时候,Sources
可能会是一组彻底缓存的数据,例如:List<string>
:框架
IEnumerable<string> Sources(int x) { var list = new List<string>(); for (int i = 0; i < 5; i++) list.Add($"result from Sources, x={x}, result {i}"); return list; }
这里会有一个小问题:在咱们打印Sources
的第一个的数据以前,要先运行完整运行Sources()
方法来准备数据,在实际应用中,这可能会花费大量时间和内存。更有甚者,Sources
多是一个无边界的列表,或者不定长的开放式列表,比方一次只处理一个数据项目的队列,或者自己没有逻辑结束的队列。异步
这种状况,C#给出了一个很好的迭代器解决:async
IEnumerable<string> Sources(int x) { for (int i = 0; i < 5; i++) yield return $"result from Sources, x={x}, result {i}"; }
这个方式的工做原理与上一段代码很像,但有一些根本的区别 - 咱们没有用缓存,而只是每次让一个元素可用。this
为了帮助理解,来看看foreach
在编译器中的解释:线程
using (var iter = Sources.GetEnumerator()) { while (iter.MoveNext()) { var item = iter.Current; Console.WriteLine(item); } }
固然,这个是省略掉不少东西后的概念解释,咱们不纠结这个细节。但大致的意思是这样的:编译器对传递给foreach
的表达式调用GetEnumerator()
,而后用一个循环去检查是否有下一个数据(MoveNext()
),在获得确定答案后,前进并访问Current
属性。而这个属性表明了前进到的元素。
为防止非受权转发,这儿给出本文的原文连接:https://
上面这个例子,咱们经过MoveNext()
/Current
方式访问了一个没有大小限制的向前的列表。咱们还用到了yield
迭代器这个很复杂的东西 - 至少我是这么认为的。
咱们把上面的例子中的yield
去掉,改写一下看看:
IEnumerable<string> Sources(int x) => new GeneratedEnumerable(x); class GeneratedEnumerable : IEnumerable<string> { private int x; public GeneratedEnumerable(int x) => this.x = x; public IEnumerator<string> GetEnumerator() => new GeneratedEnumerator(x); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } class GeneratedEnumerator : IEnumerator<string> { private int x, i; public GeneratedEnumerator(int x) => this.x = x; public string Current { get; private set; } object IEnumerator.Current => Current; public void Dispose() { } public bool MoveNext() { if (i < 5) { Current = $"result from Sources, x={x}, result {i}"; i++; return true; } else { return false; } } void IEnumerator.Reset() => throw new NotSupportedException(); }
这样写完,对照上面的yield
迭代器,理解工做过程就比较容易了:
IEnumerable
。注意,IEnumerable
和IEnumerator
是不一样的。Sources
时,就建立了GeneratedEnumerable
。它存储状态参数x
,并公开了须要的IEnumerable
方法。foreach
迭代数据时,会调用GetEnumerator()
,而它又调用GeneratedEnumerator
以充当数据上的游标。MoveNext()
方法逻辑上实现了for循环,只不过,每次调用MoveNext()
只执行一步。更多的数据会经过Current
回传过来。另外补充一点:MoveNext()
方法中的return false
对应于yield break
关键字,用于终止迭代。是否是好理解了?
下面说说异步中的迭代器。
上面的迭代,是同步的过程。而如今Dotnet开发工做更倾向于异步,使用async/await
来作,特别是在提升服务器的可伸缩性方面应用特别多。
上面的代码最大的问题,在于MoveNext()
。很明显,这是个同步的方法。若是它运行须要一段时间,那线程就会被阻塞。这会让代码执行过程变得不可接受。
咱们能作得最接近的方法是异步获取数据:
async Task<List<string>> Sources(int x) {...}
可是,异步获取数据并不能解决数据缓存延迟的问题。
好在,C#为此特地增长了对异步迭代器的支持:
public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default); } public interface IAsyncEnumerator<out T> : IAsyncDisposable { T Current { get; } ValueTask<bool> MoveNextAsync(); } public interface IAsyncDisposable { ValueTask DisposeAsync(); }
注意,从.NET Standard 2.1
和.NET Core 3.0
开始,异步迭代器已经包含在框架中了。而在早期版本中,须要手动引入:
# dotnet add package Microsoft.Bcl.AsyncInterfaces
目前这个包的版本号是5.0.0。
仍是上面例子的逻辑:
IAsyncEnumerable<string> Source(int x) => throw new NotImplementedException();
看看foreach
能够await
后的样子:
await foreach (var item in Sources) { Console.WriteLine(item); }
编译器会将它解释为:
await using (var iter = Sources.GetAsyncEnumerator()) { while (await iter.MoveNextAsync()) { var item = iter.Current; Console.WriteLine(item); } }
这儿有个新东西:await using
。与using
用法相同,但释放时会调用DisposeAsync
,而不是Dispose
,包括回收清理也是异步的。
这段代码其实跟前边的同步版本很是类似,只是增长了await
。可是,编译器会分解并重写异步状态机,它就变成异步的了。原理不细说了,不是本文关注的内容。
那么,带有yield
的迭代器如何异步呢?看代码:
async IAsyncEnumerable<string> Sources(int x) { for (int i = 0; i < 5; i++) { await Task.Delay(100); // 这儿模拟异步延迟 yield return $"result from Sources, x={x}, result {i}"; } }
嗯,看着就舒服。
这就完了?图样图森破。异步有一个很重要的特性:取消。
那么,怎么取消异步迭代?
异步方法经过CancellationToken
来支持取消。异步迭代也不例外。看看上面IAsyncEnumerator<T>
的定义,取消标志也被传递到了GetAsyncEnumerator()
方法中。
那么,若是是手工循环呢?咱们能够这样写:
await foreach (var item in Sources.WithCancellation(cancellationToken).ConfigureAwait(false)) { Console.WriteLine(item); }
这个写法等同于:
var iter = Sources.GetAsyncEnumerator(cancellationToken); await using (iter.ConfigureAwait(false)) { while (await iter.MoveNextAsync().ConfigureAwait(false)) { var item = iter.Current; Console.WriteLine(item); } }
没错,ConfigureAwait
也适用于DisposeAsync()
。因此最后就变成了:
await iter.DisposeAsync().ConfigureAwait(false);
异步迭代的取消捕获作完了,接下来怎么用呢?
看代码:
IAsyncEnumerable<string> Sources(int x) => new SourcesEnumerable(x); class SourcesEnumerable : IAsyncEnumerable<string> { private int x; public SourcesEnumerable(int x) => this.x = x; public async IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken cancellationToken = default) { for (int i = 0; i < 5; i++) { await Task.Delay(100, cancellationToken); // 模拟异步延迟 yield return $"result from Sources, x={x}, result {i}"; } } }
若是有CancellationToken
经过WithCancellation
传过来,迭代器会在正确的时间被取消 - 包括异步获取数据期间(例子中的Task.Delay
期间)。固然咱们还能够在迭代器中任何一个位置检查IsCancellationRequested
或调用ThrowIfCancellationRequested()
。
此外,编译器也会经过[EnumeratorCancellation]
来完成这个任务,因此咱们还能够这样写:
async IAsyncEnumerable<string> Sources(int x, [EnumeratorCancellation] CancellationToken cancellationToken = default) { for (int i = 0; i < 5; i++) { await Task.Delay(100, cancellationToken); // 模拟异步延迟 yield return $"result from Sources, x={x}, result {i}"; } }
这个写法与上面的代码实际上是同样的,区别在于加了一个参数。
实际应用中,咱们有下面几种写法上的选择:
// 不取消 await foreach (var item in Sources) // 经过WithCancellation取消 await foreach (var item in Sources.WithCancellation(cancellationToken)) // 经过SourcesAsync取消 await foreach (var item in SourcesAsync(cancellationToken)) // 经过SourcesAsync和WithCancellation取消 await foreach (var item in SourcesAsync(cancellationToken).WithCancellation(cancellationToken)) // 经过不一样的Token取消 await foreach (var item in SourcesAsync(tokenA).WithCancellation(tokenB))
几种方式区别于应用场景,实质上没有区别。对两个Token
的方式,任何一个Token
被取消时,任务会被取消。
同步迭代其实在各个代码中用的都比较多,但异步迭代用得很好。一方面,这是个相对新的东西,另外一方面,是会有点绕,因此不少人都不敢碰。
今天这个,也是我的的一些经验总结,但愿对你们理解迭代能有所帮助。
![]() |
微信公众号:老王Plus 扫描二维码,关注我的公众号,能够第一时间获得最新的我的文章和内容推送 本文版权归做者全部,转载请保留此声明和原文连接 |