using System;
dom
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace WfConcurrentTest
{
public static class TQueue
{
public static int pages;
public static async Task RunProgram()
{
#region 建立任务序列,产生工做量
//1 建立一个任务序列
var taskQueue = new ConcurrentQueue<CustomTask>();
//2 建立一个取消标志,用来将任务放到队列后中止工做的。
var cts = new CancellationTokenSource();
//3 启动一个单独的工做者线程来将任务放入队列中(为异步处理产生工做量)
var taskSource = Task.Run(() => TaskProducer(taskQueue, pages));
#endregion
//建立4个工做者,随机等待一段时间,从任务队列中获得一个任务,一直重复这个过程
Task[] Workers = new Task[4];
for (int i = 1; i <= 4; i++)
{
string workerid = i.ToString();
Workers[i - 1] = Task.Run(() => TaskWorker(taskQueue, $"旅行社 {workerid}", cts.Token));
}
await taskSource;//等待单个工做者线程结束
cts.CancelAfter(200);
await Task.WhenAll(Workers);//待四个工做者均完成任务
}
private static async Task TaskWorker(ConcurrentQueue<CustomTask> Queue, string name, CancellationToken token)
{
CustomTask workItem;
bool dequeueSuccessful = false;
await GetRandomDelay();
do
{
dequeueSuccessful = Queue.TryDequeue(out workItem);//偿试将任务移出队列
if (dequeueSuccessful)
{
workItem.Dothing();
await GetRandomDelay();
}
} while (!token.IsCancellationRequested);
}
private static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
/// <summary>
/// 产生工做量
/// 该方法为Static静态,static async Task
/// </summary>
/// <param name="Queue"></param>
/// <returns></returns>
static async Task TaskProducer(ConcurrentQueue<CustomTask> Queue, int pages)
{
for (int i = 1; i < pages; i++)
{
await Task.Delay(50);
var workItem = new CustomTask(i.ToString()); //生成任务
Queue.Enqueue(workItem);//向Queue中添加元素
Console.WriteLine($"{DateTime.Now} 计划: {workItem.Url} 已经下达");
}
}
}
class CustomTask
{
public string Url;
public CustomTask(string url)
{
this.Url = url;
}
public void Dothing()
{
Console.WriteLine($"{DateTime.Now} {Url} 欢迎您 !");
}
}
}