C#线程学习笔记七:Task详细用法
一、task类简介:
task类是在.net framework 4.0中提供的新功能,主要用于异步操作的控制。它比thread和threadpool提供了更为强大的功能,并且更方便使用。
task和task<tresult>类:前者接收的是action委托类型;后者接收的是func<tresult>委托类型。
任务task和线程thread的区别:
1、任务是架构在线程之上。也就是说任务最终还是要抛给线程去执行,它们都是在同一命名空间system.threading下。
2、任务跟线程并不是一对一的关系。比如说开启10个任务并不一定会开启10个线程,因为使用task开启新任务时,是从线程池中调用线程,这点与
threadpool.queueuserworkitem类似。
二、task的创建
2.1创建方式1:调用构造函数
class program { static void main(string[] args) { #region 工作者线程:使用任务实现异步 threadpool.setmaxthreads(1000, 1000); printmessage("main thread start."); //调用构造函数创建task对象 task<int> task = new task<int>(n => asyncmethod((int)n), 10); //启动任务 task.start(); //等待任务完成 task.wait(); console.writeline("the method result is: " + task.result); console.readline(); #endregion } /// <summary> /// 打印线程池信息 /// </summary> /// <param name="data"></param> private static void printmessage(string data) { //获得线程池中可用的工作者线程数量及i/o线程数量 threadpool.getavailablethreads(out int workthreadnumber, out int iothreadnumber); console.writeline("{0}\n currentthreadid is:{1}\n currentthread is background:{2}\n workerthreadnumber is:{3}\n iothreadnumbers is:{4}\n", data, thread.currentthread.managedthreadid, thread.currentthread.isbackground.tostring(), workthreadnumber.tostring(), iothreadnumber.tostring()); } /// <summary> /// 异步方法 /// </summary> /// <param name="n"></param> /// <returns></returns> private static int asyncmethod(int n) { thread.sleep(1000); printmessage("asynchoronous method."); int sum = 0; for (int i = 1; i < n; i++) { //运算溢出检查 checked { sum += i; } } return sum; } }
2.2创建方式2:任务工厂
class program { static void main(string[] args) { #region 工作者线程:使用任务工厂实现异步 ////无参无返回值 //threadpool.setmaxthreads(1000, 1000); //task.factory.startnew(() => printmessage("main thread.")); //console.read(); //有参有返回值 threadpool.setmaxthreads(1000, 1000); printmessage("main thread start."); var task = task.factory.startnew(n => asyncmethod((int)n), 10); //等待任务完成 task.wait(); console.writeline("the method result is: " + task.result); console.readline(); #endregion } /// <summary> /// 打印线程池信息 /// </summary> /// <param name="data"></param> private static void printmessage(string data) { //获得线程池中可用的工作者线程数量及i/o线程数量 threadpool.getavailablethreads(out int workthreadnumber, out int iothreadnumber); console.writeline("{0}\n currentthreadid is:{1}\n currentthread is background:{2}\n workerthreadnumber is:{3}\n iothreadnumbers is:{4}\n", data, thread.currentthread.managedthreadid, thread.currentthread.isbackground.tostring(), workthreadnumber.tostring(), iothreadnumber.tostring()); } /// <summary> /// 异步方法 /// </summary> /// <param name="n"></param> /// <returns></returns> private static int asyncmethod(int n) { thread.sleep(1000); printmessage("asynchoronous method."); int sum = 0; for (int i = 1; i < n; i++) { //运算溢出检查 checked { sum += i; } } return sum; } }
2.3创建方式3:run方法
class program { static void main(string[] args) { #region 工作者线程:使用task.run实现异步 threadpool.setmaxthreads(1000, 1000); printmessage("main thread start."); var task = task.run(() => asyncmethod(10)); //等待任务完成 task.wait(); console.writeline("the method result is: " + task.result); console.readline(); #endregion } /// <summary> /// 打印线程池信息 /// </summary> /// <param name="data"></param> private static void printmessage(string data) { //获得线程池中可用的工作者线程数量及i/o线程数量 threadpool.getavailablethreads(out int workthreadnumber, out int iothreadnumber); console.writeline("{0}\n currentthreadid is:{1}\n currentthread is background:{2}\n workerthreadnumber is:{3}\n iothreadnumbers is:{4}\n", data, thread.currentthread.managedthreadid, thread.currentthread.isbackground.tostring(), workthreadnumber.tostring(), iothreadnumber.tostring()); } /// <summary> /// 异步方法 /// </summary> /// <param name="n"></param> /// <returns></returns> private static int asyncmethod(int n) { thread.sleep(1000); printmessage("asynchoronous method."); int sum = 0; for (int i = 1; i < n; i++) { //运算溢出检查 checked { sum += i; } } return sum; } }
三、task的简略生命周期
可通过status属性获取。
状态 | 说明 |
created | 表示默认初始化任务,但是工厂及run创建方式会直接跳过。 |
waitingtorun | 表示等待任务调度器分配线程给任务执行。 |
rantocompletion | 表示任务执行完毕。 |
四、task的控制
方法名 | 说明 |
task.wait | 如task1.wait();就是等待task1任务的执行,执行完成后状态变为completed。 |
task.waitall | 等待所有的任务执行完毕。 |
task.waitany | 等待任意一个任务完成后就继续向下执行。 |
task.continuewith | 上一个任务执行完成后自动启动下一个任务,实现任务的按序进行。 |
cancellationtokensource | 通过其token来取消一个task。 |
4.1、组合任务
class program { public static void main() { #region 工作者线程:task组合任务 //创建一个任务 task<int> task = new task<int>(() => { int sum = 0; console.writeline("使用任务实现异步。"); for (int i = 0; i < 10; i++) { sum += i; } return sum; }); //任务启动并安排到任务队列等待执行(system.threading.tasks.taskscheduler) task.start(); //任务完成时执行处理 task cwt = task.continuewith(t => { console.writeline("任务的执行结果:{0}", t.result.tostring()); }); task.wait(); cwt.wait(); console.readline(); #endregion } }
运行结果如下:
4.2、串行任务
class program { public static void main() { #region 工作者线程:task串行任务 //堆栈 concurrentstack<int> stack = new concurrentstack<int>(); //t1最早串行 var t1 = task.factory.startnew(() => { stack.push(1); stack.push(2); }); //t2、t3并行执行 var t2 = t1.continuewith(t => { stack.trypop(out int result); console.writeline("task t2 result={0},thread id is {1}.", result, thread.currentthread.managedthreadid); }); var t3 = t1.continuewith(t => { stack.trypop(out int result); console.writeline("task t3 result={0},thread id is {1}.", result, thread.currentthread.managedthreadid); }); //等待t2、t3执行完毕 task.waitall(t2, t3); //t4串行执行 var t4 = task.factory.startnew(() => { console.writeline("the stack count={0},thread id is {1}.", stack.count, thread.currentthread.managedthreadid); }); t4.wait(); console.readline(); #endregion } }
运行结果如下:
4.3、子任务
class program { public static void main() { #region 工作者线程:task子任务 task<string[]> parent = new task<string[]>(state => { console.writeline(state); string[] result = new string[2]; //创建并启动子任务 new task(() => { result[0] = "子任务1。"; }, taskcreationoptions.attachedtoparent).start(); new task(() => { result[1] = "子任务2。"; }, taskcreationoptions.attachedtoparent).start(); return result; }, "我是父任务,我创建了2个子任务,它们执行完后我才会结束执行。"); //任务完成时执行处理 parent.continuewith(t => { array.foreach(t.result, r => console.writeline(r)); }); //启动父任务 parent.start(); parent.wait(); console.readline(); #endregion } }
运行结果如下:
4.4、动态并行任务
/// <summary> /// 结点类 /// </summary> class node { public node left { get; set; } public node right { get; set; } public string text { get; set; } } class program { public static void main() { #region 工作者线程:task动态并行任务 node root = getnode(); displaytree(root); console.readline(); #endregion } /// <summary> /// getnode方法 /// </summary> /// <returns></returns> static node getnode() { node root = new node { left = new node { left = new node { text = "l-l" }, right = new node { text = "l-r" }, text = "l" }, right = new node { left = new node { text = "r-l" }, right = new node { text = "r-r" }, text = "r" }, text = "root" }; return root; } /// <summary> /// displaytree方法 /// </summary> /// <param name="root"></param> static void displaytree(node root) { var task = task.factory.startnew ( () => displaynode(root), cancellationtoken.none, taskcreationoptions.none, taskscheduler.default ); task.wait(); } /// <summary> /// displaynode方法 /// </summary> /// <param name="current"></param> static void displaynode(node current) { if (current.left != null) { task.factory.startnew ( () => displaynode(current.left), cancellationtoken.none, taskcreationoptions.attachedtoparent, taskscheduler.default ); } if (current.right != null) { task.factory.startnew ( () => displaynode(current.right), cancellationtoken.none, taskcreationoptions.attachedtoparent, taskscheduler.default ); console.writeline("the current node text={0},thread id is {1}.", current.text, thread.currentthread.managedthreadid); } } }
运行结果如下:
4.5、取消任务
class program { static void main(string[] args) { #region 取消任务 threadpool.setmaxthreads(1000, 1000); printmessage("main thread start."); cancellationtokensource cts = new cancellationtokensource(); //调用构造函数创建task对象,将一个cancellationtoken传给task构造器从而使task和cancellationtoken关联起来。 task<int> task = new task<int>(n => asyncmethod(cts.token, (int)n), 10); //启动任务 task.start(); //延迟取消任务 thread.sleep(3000); //取消任务 cts.cancel(); console.writeline("the method result is: " + task.result); console.readline(); #endregion } /// <summary> /// 打印线程池信息 /// </summary> /// <param name="data"></param> private static void printmessage(string data) { //获得线程池中可用的工作者线程数量及i/o线程数量 threadpool.getavailablethreads(out int workthreadnumber, out int iothreadnumber); console.writeline("{0}\n currentthreadid is:{1}\n currentthread is background:{2}\n workerthreadnumber is:{3}\n iothreadnumbers is:{4}\n", data, thread.currentthread.managedthreadid, thread.currentthread.isbackground.tostring(), workthreadnumber.tostring(), iothreadnumber.tostring()); } /// <summary> /// 异步方法 /// </summary> /// <param name="ct"></param> /// <param name="n"></param> /// <returns></returns> private static int asyncmethod(cancellationtoken ct, int n) { thread.sleep(1000); printmessage("asynchoronous method."); int sum = 0; try { for (int i = 1; i < n; i++) { //当cancellationtokensource对象调用cancel方法时,就会引起operationcanceledexception异常, //通过调用cancellationtoken的throwifcancellationrequested方法来定时检查操作是否已经取消, //这个方法和cancellationtoken的iscancellationrequested属性类似。 ct.throwifcancellationrequested(); thread.sleep(500); //运算溢出检查 checked { sum += i; } } } catch (exception e) { console.writeline("exception is:" + e.gettype().name); console.writeline("operation is canceled."); } return sum; } }
运行结果如下:
4.6、处理单个任务中的异常
class program { public static void main() { #region 工作者线程:处理单个任务中的异常 try { task<int> task = task.run(() => singletaskexceptionmethod("single task.", 2)); int result = task.getawaiter().getresult(); console.writeline("result:{0}", result); } catch (exception ex) { console.writeline("single task exception caught:{0}", ex.message); } console.readline(); #endregion } /// <summary> /// singletaskexception方法 /// </summary> /// <param name="name"></param> /// <param name="seconds"></param> /// <returns></returns> static int singletaskexceptionmethod(string name, int seconds) { console.writeline("task {0} is running on thread {1}.is it threadpool thread?:{2}", name, thread.currentthread.managedthreadid, thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(seconds)); throw new exception("boom."); } }
运行结果如下:
4.7、处理多个任务中的异常
class program { public static void main() { #region 工作者线程:处理多个任务中的异常 try { var t1 = new task<int>(() => multipletaskexceptionmethod("multiple task 1", 3)); var t2 = new task<int>(() => multipletaskexceptionmethod("multiple task 2", 2)); var complextask = task.whenall(t1, t2); var exceptionhandler = complextask.continuewith ( t => console.writeline("result:{0}", t.result), taskcontinuationoptions.onlyonfaulted ); t1.start(); t2.start(); task.waitall(t1, t2); console.readline(); } catch (aggregateexception ex) { ex.handle ( exception => { console.writeline(exception.message); return true; } ); } #endregion } /// <summary> /// multipletaskexception方法 /// </summary> /// <param name="name"></param> /// <param name="seconds"></param> /// <returns></returns> static int multipletaskexceptionmethod(string name, int seconds) { console.writeline("task {0} is running on thread id {1}. is it threadpool thread?:{2}", name, thread.currentthread.managedthreadid, thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(seconds)); throw new exception(string.format("task {0} boom.", name)); } }
运行结果如下:
4.8、task.fromresult的应用
class program { //字典 private static readonly idictionary<string, string> cache = new dictionary<string, string>() { {"0001","a"}, {"0002","b"}, {"0003","c"}, {"0004","d"}, {"0005","e"}, {"0006","f"}, }; public static void main() { #region 工作者线程:task.fromresult的应用 task<string> task = getvaluefromcachemethod("0006"); console.writeline("result={0}", task.result.tostring()); console.readline(); #endregion } /// <summary> /// getvaluefromcache方法 /// </summary> /// <param name="key"></param> /// <returns></returns> private static task<string> getvaluefromcachemethod(string key) { console.writeline("getvaluefromcache开始执行……"); string result = string.empty; thread.sleep(3000); console.writeline("getvaluefromcache继续执行……"); if (cache.trygetvalue(key, out result)) { return task.fromresult(result); } return task.fromresult(""); } }
运行结果如下:
4.9、使用iprogress实现异步编程的进程通知
iprogress<in t>只提供了一个方法void report(t value),通过report方法把一个t类型的值报告给iprogress,然后iprogress<in t>的实现类progress<in t>的构造函数
接收类型为action<t>的形参,通过这个委托让进度显示在ui界面中。
class program { public static void main() { #region 工作者线程:使用iprogress实现异步编程的进程通知 task task = display(); task.wait(); console.readline(); #endregion } /// <summary> /// doprocessing方法 /// </summary> /// <param name="progress"></param> static void doprocessing(iprogress<int> progress) { for (int i = 1; i <= 100; i++) { thread.sleep(100); if (progress != null) { progress.report(i); } } } /// <summary> /// display方法 /// </summary> /// <returns></returns> static async task display() { //当前线程 var progress = new progress<int> ( percent => { console.clear(); console.write("{0}%", percent); } ); //线程池线程 await task.run(() => doprocessing(progress)); console.writeline(""); console.writeline("结束"); } }
运行结果如下:
4.10、factory.fromasync的应用
(简apm模式(委托)转换为任务)(beginxxx和endxxx)
带回调方式:
class program { //使用委托实现异步,是使用了异步编程模型apm。 private delegate string asynchronoustask(string threadname); public static void main() { #region 工作者线程:带回调方式的factory.fromasync的应用 asynchronoustask d = testmethod; console.writeline("option 1"); task<string> task = task<string>.factory.fromasync(d.begininvoke("asynctaskthread", callback, "a delegate asynchronous called."), d.endinvoke); task.continuewith(t => console.writeline("callback is finished,now running a continuation. result: {0}",t.result)); while (!task.iscompleted) { console.writeline(task.status); thread.sleep(timespan.fromseconds(0.5)); } console.writeline(task.status); console.readline(); #endregion } /// <summary> /// fromasync方法 /// </summary> /// <param name="threadname"></param> /// <returns></returns> private static string fromasyncmethod(string threadname) { console.writeline("starting..."); console.writeline("is it threadpool thread?:{0}", thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(2)); thread.currentthread.name = threadname; return string.format("thread name:{0}", thread.currentthread.name); } /// <summary> /// callback方法 /// </summary> /// <param name="ar"></param> private static void callback(iasyncresult ar) { console.writeline("starting a callback..."); console.writeline("state passed to a callbak: {0}", ar.asyncstate); console.writeline("is it threadpool thread?:{0}", thread.currentthread.isthreadpoolthread); console.writeline("threadpool worker thread id: {0}",thread.currentthread.managedthreadid); } }
运行结果如下:
不带回调方式:
class program { //使用委托实现异步,是使用了异步编程模型apm。 private delegate string asynchronoustask(string threadname); public static void main() { #region 工作者线程:不带回调方式的factory.fromasync的应用 asynchronoustask d = fromasyncmethod; task<string> task = task<string>.factory.fromasync(d.begininvoke, d.endinvoke, "asynctaskthread", "a delegate asynchronous called."); task.continuewith(t => console.writeline("task is completed, now running a continuation! result: {0}",t.result)); while (!task.iscompleted) { console.writeline(task.status); thread.sleep(timespan.fromseconds(0.5)); } console.writeline(task.status); console.readline(); #endregion } /// <summary> /// fromasync方法 /// </summary> /// <param name="threadname"></param> /// <returns></returns> private static string fromasyncmethod(string threadname) { console.writeline("starting..."); console.writeline("is it threadpool thread?:{0}", thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(2)); thread.currentthread.name = threadname; return string.format("thread name:{0}", thread.currentthread.name); } }
运行结果如下:
参考自: