>>返回《C# 并发编程》html
异步封装编程
public static void MyDownloadStringTaskAsyncRun() { WebClient client = new WebClient(); string res = client.MyDownloadStringTaskAsync(new Uri("http://www.baidu.com")).Result; System.Console.WriteLine(res); } public static Task<string> MyDownloadStringTaskAsync(this WebClient client, Uri address) { var tcs = new TaskCompletionSource<string>(); // 这个事件处理程序会完成 Task 对象,并自行注销。 DownloadStringCompletedEventHandler handler = null; handler = (_, e) => { client.DownloadStringCompleted -= handler; if (e.Cancelled) tcs.TrySetCanceled(); else if (e.Error != null) tcs.TrySetException(e.Error); else tcs.TrySetResult(e.Result); }; // 登记事件,而后开始操做。 client.DownloadStringCompleted += handler; client.DownloadStringAsync(address); return tcs.Task; }
输出:并发
<!DOCTYPE html><!--STATUS OK--> <html> ... ... </html>
public static void GetResponseAsyncRun() { WebRequest request = WebRequest.Create("http://www.baidu.com"); var response = request.MyGetResponseAsync().Result; System.Console.WriteLine($"WebResponse.ContentLength:{response.ContentLength}"); } public static Task<WebResponse> MyGetResponseAsync(this WebRequest client) { return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse, client.EndGetResponse, null); }
输出:异步
WebResponse.ContentLength:14615
FromAsync
以前调用 BeginOperation
。FromAsync
,并让用 BeginOperation
方法返回的 IAsyncOperation
做为参数,这样也是能够的,可是 FromAsync
会采用效率较低的实现方式。await Task.Run(() => Parallel.ForEach(...));
async
经过使用 Task.Run
,全部的并行处理过程都推给了线程池。this
Task.Run
返回一个表明并行任务的 Task
对象线程
事件流中几种可能关注的状况:code
public delegate void HelloEventHandler(object sender, HelloEventArgs e); public class HelloEventArgs : EventArgs { public string Name { get; set; } public HelloEventArgs(string name) { Name = name; } public int SayHello() { System.Console.WriteLine(Name + " Hello."); return DateTime.Now.Millisecond; } } public static event HelloEventHandler HelloHandlerEvent; public static void FirstLastRun() { var task = Task.Run(() => { Thread.Sleep(500); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom")); HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry")); }); var observable = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>( handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler) .Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default) .Select(s => { // 复杂的计算过程。 Thread.Sleep(100); var result = s; Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId); return result; }) .Take(3)//这个标识3个就结束了 ; var res = Task.Run(async () => await observable // //4个hello,3个result,res为最后一个的结果 //.FirstAsync()//4个hello,1个result,res为第一个的结果 //.LastAsync()//4个hello,3个result,res为最后一个的结果 //.ToList()//4个hello,3个result,res为3个的结果 ).Result; System.Console.WriteLine($"Res:{string.Join(',', res)},ResType:{res.GetType().Name}"); task.Wait(); }
输出:server
lilei Hello. HanMeimei Hello. Tom Hello. Jerry Hello. Now Millisecond result 534 on thread 7 Now Millisecond result 544 on thread 7 Now Millisecond result 544 on thread 7 Res:544,ResType:Int32
在 await
调用 Observable 对象或 LastAsync
时,代码(异步地)等待事件流完成,而后返 回最后一个元素。htm
cs IObservable<int> observable = ...; int lastElement = await observable.LastAsync(); // 或者 int lastElement = await observable;
使用 FirstAsync
可捕获事件流中, FirstAsync
方法执行后的下一个事件。
await
订阅事件流,而后在第一个事件到达后当即结束(并退订):cs IObservable<int> observable = ...; int nextElement = await observable.FirstAsync();
使用 ToList
可捕获事件流中的全部事件:
IObservable<int> observable = ...; IList<int> allElements = await observable.ToList();
任何异步操做都可看做一个知足如下条件之一的可观察流:
ToObservable
和 StartAsync
都会当即启动异步操做,而不会等待订阅
若是要让 observable 对象在接受订阅后才启动操做,可以使用 FromAsync
StartAsync
同样,它也支持使用 CancellationToken
取消public static void AsyncObservableRun() { var client = new HttpClient(); IObservable<int> response1 = Task.Run(() => { System.Console.WriteLine("Run 1."); return 1; }).ToObservable();//直接执行 IObservable<int> response2 = Observable.StartAsync(token => Task.Run(() => { System.Console.WriteLine("Run 2."); return 2; }, token));//直接执行 IObservable<int> response3 = Observable.FromAsync(token => Task.Run(() => { System.Console.WriteLine("Run 3."); return 3; }, token));//订阅后执行 var res = Task.Run(async () => await response1 //await response2 //await response3 ).Result; System.Console.WriteLine($"Res:{res}"); }
输出(response1):
Run 1. Run 2. Res:1
输出(response2):
Run 1. Run 2. Res:2
输出(response1):
Run 1. Run 2. Run 3. Res:3
ToObservable
和 StartAsync
都返回一个 observable 对象,表示一个已经启动的异步操做FromAsync
在每次被订阅时都会启动一个全新独立的异步操做。下面的例子使用一个已有的 URL 事件流,在每一个 URL 到达时发出一个请求:
public static void SelectManyRun() { IObservable<int> nums = new int[] { 1, 2, 3 }.ToObservable(); IObservable<int> observable = nums.SelectMany((n, token) => Task.Run<int>(() => { System.Console.WriteLine($"Run {n}."); return n + 1; }, token)); var res = Task.Run(async () => await observable.LastAsync()).Result; System.Console.WriteLine($"Res:{res}"); }
输出:
Run 1. Run 2. Run 3. Res:3
同一个项目中
如今须要它们能互相沟通。
网格转可观察流
public static void BlockToObservableRun() { var buffer = new BufferBlock<int>(); IObservable<int> integers = buffer.AsObservable(); integers.Subscribe( data => Console.WriteLine(data), ex => Console.WriteLine(ex), () => Console.WriteLine("Done")); buffer.Post(1); buffer.Post(2); buffer.Complete(); buffer.Completion.Wait(); }
输出:
1 2
AsObservable
方法会把数据流块的完成信息(或出错信息)转化为可观察流的完成信息。
AggregateException
对象中。可观察流转网格
public static void ObservableToBlockRun() { IObservable<DateTimeOffset> ticks = Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Select(x => x.Timestamp) .Take(5); var display = new ActionBlock<DateTimeOffset>(x => Console.WriteLine(x)); ticks.Subscribe(display.AsObserver()); try { display.Completion.Wait(); Console.WriteLine("Done."); } catch (Exception ex) { Console.WriteLine(ex); } }
输出:
2020/2/1 上午1:42:24 +00:00 2020/2/1 上午1:42:25 +00:00 2020/2/1 上午1:42:26 +00:00 2020/2/1 上午1:42:27 +00:00 2020/2/1 上午1:42:28 +00:00 Done.