在.NET Core中使用Channel(二)

在咱们以前的文章中,看了一些很是简单的例子来讲明Channel是如何工做的,咱们看到了一些很是漂亮的特性,但大多数状况下它与其余某某Queue实现很是类似。让咱们进入一些更高级的话题。我说的是高级,但其中不少都很是简单。app

读/写分离async

若是你曾经在两个类之间共享队列,你就会知道任何一个类均可以读/写,即便它们本不该该这样作。例如:spa

class MyProducer
{
    private readonly Queue<int> _queue;

    public MyProducer(Queue<int> queue)
    {
        _queue = queue;
    }
}

class MyConsumer
{
    private readonly Queue<int> _queue;

    public MyConsumer(Queue<int> queue)
    {
        _queue = queue;
    }
}

所以,生产者应该只写队列,消费者应该只读队列,在这两种状况下,它们能够对队列执行全部操做。虽然你可能在本身的脑海中但愿消费者只读取,但另外一个开发人员可能会出现调用Enqueue,除了代码审查以外,没有什么能够阻止他们犯这个错误。.net

可是有了Channel,咱们能够作不一样的事情。线程

class Program
{
    static async Task Main(string[] args)
    {
        var myChannel = Channel.CreateUnbounded<int>();
        var producer = new MyProducer(myChannel.Writer);
        var consumer = new MyConsumer(myChannel.Reader);
    }
}

class MyProducer
{
    private readonly ChannelWriter<int> _channelWriter;

    public MyProducer(ChannelWriter<int> channelWriter)
    {
        _channelWriter = channelWriter;
    }
}

class MyConsumer
{
    private readonly ChannelReader<int> _channelReader;

    public MyConsumer(ChannelReader<int> channelReader)
    {
        _channelReader = channelReader;
    }
}

在这个例子中,我添加了一个main方法来向你展现如何建立writer/reader,但它很是简单。这里咱们能够看到,对于咱们的生产者,我只传递给它一个ChannelWriter,因此它只能作写操做。对于咱们的消费者,咱们传递给它一个ChannelReader,因此它只能读取。code

固然,这并不意味着其余开发人员不能修改代码并开始注入根Channel对象,或者同时传入ChannelWriter/ChannelReader,但这至少比以前的状况要好得多。对象

完成一个Channelblog

咱们在前面看到,当在通道上调用ReadAsync()时,它实际上会在那里等待消息,可是若是没有更多的消息到来呢?对于.net中的其余队列,咱们一般须要传递某种共享的布尔值或一个CancellationToken。但有了Channel,就更容易了。队列

考虑如下几点:ip

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    try
    {
        while (true)
        {
            var item = await myChannel.Reader.ReadAsync();
            Console.WriteLine(item);
            await Task.Delay(1000);
        }
    }catch(ChannelClosedException e)
    {
        Console.WriteLine("Channel was closed!");
    }
}

我让第二个线程尽量快地写入咱们的Channel,而后完成它。而后咱们的读取器缓慢读取,每次读取之间有1秒的延迟。注意,咱们捕获了ChannelClosedExecption,当你尝试从关闭通道读取最后消息以后时将调用它。

我只是想说清楚。在Channel上调用Complete()不会当即关闭通道并杀死读取该通道的全部人。而是通知全部服务,一旦最后一条消息被读取,咱们就完成了。这很重要,由于这意味着当咱们等待新条目时,当队列是空的时,当队列是满的时,是否调用Complete()都可有可无。咱们能够确定,咱们将完成全部可获得的工做。

在Channel中使用IAsyncEnumerable

以咱们试图关闭一个Channel为例,有两件事引发了个人注意。

咱们有一个while(true)循环。这并非真的那么糟糕,但它有点碍眼。

为了打破这个循环,并知道Channel已经完成,咱们必须捕获异常并将其吞下。

使用命令“ReadAllAsync()”来解决这些问题,它返回一个IAsyncEnumerable。代码看起来有点像这样:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    await foreach(var item in myChannel.Reader.ReadAllAsync())
    {
        Console.WriteLine(item);
        await Task.Delay(1000);
    }
}

如今代码读起来好多了,而且去掉了捕获异常的一些多余的东西。由于咱们使用的是IAsyncEnumerable,因此咱们仍然能够像之前那样等待每一项,可是咱们再也不须要捕获异常,由于当Channel完成时,它只是简单地说没有其余东西了,而后循环退出。

一样,这消除了在处理队列时必须编写的一些混乱代码。之前你必须编写某种无限循环,而如今它只是一个真正整洁的循环,能够处理底层的全部东西。

接下来是什么

到目前为止,咱们一直在使用“无限的”通道。你可能已经猜到了,固然也能够选择使用BoundedChannel。查看本系列的下一部分,更好地理解这些东西。

 欢迎关注个人公众号,若是你有喜欢的外文技术文章,能够经过公众号留言推荐给我。

 

原文连接:https://dotnetcoretutorials.com/2020/11/24/using-channels-in-net-core-part-2-advanced-channels/

相关文章
相关标签/搜索