深入分析C# Task
task的msdn的描述如下:
【task类的表示单个操作不会返回一个值,通常以异步方式执行。
task对象是一种的中心思想基于任务的异步模式首次引入.netframework 4 中。
因为由执行工作task对象通常以异步方式执行线程池线程上而不是以同步方式在主应用程序线程中,可以使用status属性,并将iscanceled, iscompleted,和isfaulted属性,以确定任务的状态。
大多数情况下,lambda 表达式用于指定该任务所执行的工作量。
对于返回值的操作,您使用task类。】
1、task的优势
threadpool相比thread来说具备了很多优势,但是threadpool却又存在一些使用上的不方便。比如:
- threadpool不支持线程的取消、完成、失败通知等交互性操作;
- threadpool不支持线程执行的先后次序;
以往,如果开发者要实现上述功能,需要完成很多额外的工作,现在,fcl中提供了一个功能更强大的概念:task。task在线程池的基础上进行了优化,并提供了更多的api。在fcl4.0中,如果我们要编写多线程程序,task显然已经优于传统的方式。
以下是一个简单的任务示例:
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { task t = new task(() => { console.writeline("任务开始工作……"); //模拟工作过程 thread.sleep(5000); }); t.start(); t.continuewith((task) => { console.writeline("任务完成,完成时候的状态为:"); console.writeline("iscanceled={0}\tiscompleted={1}\tisfaulted={2}", task.iscanceled, task.iscompleted, task.isfaulted); }); console.readkey(); } } }
2、task的用法
2.1、创建任务
(一)无返回值的方式
方式1:
var t1 = new task(() => taskmethod("task 1")); t1.start(); task.waitall(t1);//等待所有任务结束 注:任务的状态: start之前为:created start之后为:waitingtorun
方式2:
task.run(() => taskmethod("task 2"));
方式3:
task.factory.startnew(() => taskmethod("task 3")); 直接异步的方法 //或者 var t3=task.factory.startnew(() => taskmethod("task 3")); task.waitall(t3);//等待所有任务结束 //任务的状态: start之前为:running start之后为:running
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { var t1 = new task(() => taskmethod("task 1")); var t2 = new task(() => taskmethod("task 2")); t2.start(); t1.start(); task.waitall(t1, t2); task.run(() => taskmethod("task 3")); task.factory.startnew(() => taskmethod("task 4")); //标记为长时间运行任务,则任务不会使用线程池,而在单独的线程中运行。 task.factory.startnew(() => taskmethod("task 5"), taskcreationoptions.longrunning); #region 常规的使用方式 console.writeline("主线程执行业务处理."); //创建任务 task task = new task(() => { console.writeline("使用system.threading.tasks.task执行异步操作."); for (int i = 0; i < 10; i++) { console.writeline(i); } }); //启动任务,并安排到当前任务队列线程中执行任务(system.threading.tasks.taskscheduler) task.start(); console.writeline("主线程执行其他处理"); task.wait(); #endregion thread.sleep(timespan.fromseconds(1)); console.readline(); } static void taskmethod(string name) { console.writeline("task {0} is running on a thread id {1}. is thread pool thread: {2}", name, thread.currentthread.managedthreadid, thread.currentthread.isthreadpoolthread); } } }
async/await的实现方式:
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { async static void asyncfunction() { await task.delay(1); console.writeline("使用system.threading.tasks.task执行异步操作."); for (int i = 0; i < 10; i++) { console.writeline(string.format("asyncfunction:i={0}", i)); } } public static void main() { console.writeline("主线程执行业务处理."); asyncfunction(); console.writeline("主线程执行其他处理"); for (int i = 0; i < 10; i++) { console.writeline(string.format("main:i={0}", i)); } console.readline(); } } }
(二)带返回值的方式
方式4:
task<int> task = createtask("task 1"); task.start(); int result = task.result;
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static task<int> createtask(string name) { return new task<int>(() => taskmethod(name)); } static void main(string[] args) { taskmethod("main thread task"); task<int> task = createtask("task 1"); task.start(); int result = task.result; console.writeline("task 1 result is: {0}", result); task = createtask("task 2"); //该任务会运行在主线程中 task.runsynchronously(); result = task.result; console.writeline("task 2 result is: {0}", result); task = createtask("task 3"); console.writeline(task.status); task.start(); while (!task.iscompleted) { console.writeline(task.status); thread.sleep(timespan.fromseconds(0.5)); } console.writeline(task.status); result = task.result; console.writeline("task 3 result is: {0}", result); #region 常规使用方式 //创建任务 task<int> getsumtask = new task<int>(() => getsum()); //启动任务,并安排到当前任务队列线程中执行任务(system.threading.tasks.taskscheduler) getsumtask.start(); console.writeline("主线程执行其他处理"); //等待任务的完成执行过程。 getsumtask.wait(); //获得任务的执行结果 console.writeline("任务执行结果:{0}", getsumtask.result.tostring()); #endregion } static int taskmethod(string name) { console.writeline("task {0} is running on a thread id {1}. is thread pool thread: {2}", name, thread.currentthread.managedthreadid, thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(2)); return 42; } static int getsum() { int sum = 0; console.writeline("使用task执行异步操作."); for (int i = 0; i < 100; i++) { sum += i; } return sum; } } }
async/await的实现:
using system; using system.threading.tasks; namespace consoleapp1 { class program { public static void main() { var ret1 = asyncgetsum(); console.writeline("主线程执行其他处理"); for (int i = 1; i <= 3; i++) console.writeline("call main()"); int result = ret1.result; //阻塞主线程 console.writeline("任务执行结果:{0}", result); } async static task<int> asyncgetsum() { await task.delay(1); int sum = 0; console.writeline("使用task执行异步操作."); for (int i = 0; i < 100; i++) { sum += i; } return sum; } } }
2.2、组合任务.continuewith
简单demo:
using system; using system.threading.tasks; namespace consoleapp1 { class program { public static void main() { //创建一个任务 task<int> task = new task<int>(() => { int sum = 0; console.writeline("使用task执行异步操作."); for (int i = 0; i < 100; i++) { sum += i; } return sum; }); //启动任务,并安排到当前任务队列线程中执行任务(system.threading.tasks.taskscheduler) task.start(); console.writeline("主线程执行其他处理"); //任务完成时执行处理。 task cwt = task.continuewith(t => { console.writeline("任务完成后的执行结果:{0}", t.result.tostring()); }); task.wait(); cwt.wait(); } } }
任务的串行:
using system; using system.collections.concurrent; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { concurrentstack<int> stack = new concurrentstack<int>(); //t1先串行 var t1 = task.factory.startnew(() => { stack.push(1); stack.push(2); }); //t2,t3并行执行 var t2 = t1.continuewith(t => { int result; stack.trypop(out result); console.writeline("task t2 result={0},thread id {1}", result, thread.currentthread.managedthreadid); }); //t2,t3并行执行 var t3 = t1.continuewith(t => { int result; stack.trypop(out result); console.writeline("task t3 result={0},thread id {1}", result, thread.currentthread.managedthreadid); }); //等待t2和t3执行完 task.waitall(t2, t3); //t7串行执行 var t4 = task.factory.startnew(() => { console.writeline("当前集合元素个数:{0},thread id {1}", stack.count, thread.currentthread.managedthreadid); }); t4.wait(); } } }
子任务:
using system; using system.threading.tasks; namespace consoleapp1 { class program { public static void main() { task<string[]> parent = new task<string[]>(state => { console.writeline(state); string[] result = new string[2]; //创建并启动子任务 new task(() => { result[0] = "我是子任务1。"; }, taskcreationoptions.attachedtoparent).start(); new task(() => { result[1] = "我是子任务2。"; }, taskcreationoptions.attachedtoparent).start(); return result; }, "我是父任务,并在我的处理过程中创建多个子任务,所有子任务完成以后我才会结束执行。"); //任务处理完成后执行的操作 parent.continuewith(t => { array.foreach(t.result, r => console.writeline(r)); }); //启动父任务 parent.start(); //等待任务结束 wait只能等待父线程结束,没办法等到父线程的continuewith结束 //parent.wait(); console.readline(); } } }
动态并行(taskcreationoptions.attachedtoparent) 父任务等待所有子任务完成后 整个任务才算完成
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class node { public node left { get; set; } public node right { get; set; } public string text { get; set; } } class program { static node getnode() { node root = new node { left = new node { left = new node { text = "l-l" }, right = new node { text = "l-r" }, text = "l" }, right = new node { left = new node { text = "r-l" }, right = new node { text = "r-r" }, text = "r" }, text = "root" }; return root; } static void main(string[] args) { node root = getnode(); displaytree(root); } static void displaytree(node root) { var task = task.factory.startnew(() => displaynode(root), cancellationtoken.none, taskcreationoptions.none, taskscheduler.default); task.wait(); } static void displaynode(node current) { if (current.left != null) task.factory.startnew(() => displaynode(current.left), cancellationtoken.none, taskcreationoptions.attachedtoparent, taskscheduler.default); if (current.right != null) task.factory.startnew(() => displaynode(current.right), cancellationtoken.none, taskcreationoptions.attachedtoparent, taskscheduler.default); console.writeline("当前节点的值为{0};处理的threadid={1}", current.text, thread.currentthread.managedthreadid); } } }
2.3、取消任务 cancellationtokensource
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { private static int taskmethod(string name, int seconds, cancellationtoken token) { console.writeline("task {0} is running on a thread id {1}. is thread pool thread: {2}", name, thread.currentthread.managedthreadid, thread.currentthread.isthreadpoolthread); for (int i = 0; i < seconds; i++) { thread.sleep(timespan.fromseconds(1)); if (token.iscancellationrequested) return -1; } return 42 * seconds; } private static void main(string[] args) { var cts = new cancellationtokensource(); var longtask = new task<int>(() => taskmethod("task 1", 10, cts.token), cts.token); console.writeline(longtask.status); cts.cancel(); console.writeline(longtask.status); console.writeline("first task has been cancelled before execution"); cts = new cancellationtokensource(); longtask = new task<int>(() => taskmethod("task 2", 10, cts.token), cts.token); longtask.start(); for (int i = 0; i < 5; i++) { thread.sleep(timespan.fromseconds(0.5)); console.writeline(longtask.status); } cts.cancel(); for (int i = 0; i < 5; i++) { thread.sleep(timespan.fromseconds(0.5)); console.writeline(longtask.status); } console.writeline("a task has been completed with result {0}.", longtask.result); } } }
2.4、处理任务中的异常
单个任务:
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static int taskmethod(string name, int seconds) { console.writeline("task {0} is running on a thread id {1}. is thread pool thread: {2}", name, thread.currentthread.managedthreadid, thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(seconds)); throw new exception("boom!"); return 42 * seconds; } static void main(string[] args) { try { task<int> task = task.run(() => taskmethod("task 2", 2)); int result = task.getawaiter().getresult(); console.writeline("result: {0}", result); } catch (exception ex) { console.writeline("task 2 exception caught: {0}", ex.message); } console.writeline("----------------------------------------------"); console.writeline(); } } }
多个任务:
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static int taskmethod(string name, int seconds) { console.writeline("task {0} is running on a thread id {1}. is thread pool thread: {2}", name, thread.currentthread.managedthreadid, thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(seconds)); throw new exception(string.format("task {0} boom!", name)); return 42 * seconds; } public static void main(string[] args) { try { var t1 = new task<int>(() => taskmethod("task 3", 3)); var t2 = new task<int>(() => taskmethod("task 4", 2)); var complextask = task.whenall(t1, t2); var exceptionhandler = complextask.continuewith(t => console.writeline("result: {0}", t.result), taskcontinuationoptions.onlyonfaulted ); t1.start(); t2.start(); task.waitall(t1, t2); } catch (aggregateexception ex) { ex.handle(exception => { console.writeline(exception.message); return true; }); } } } }
async/await的方式:
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static async task thrownotimplementedexceptionasync() { throw new notimplementedexception(); } static async task throwinvalidoperationexceptionasync() { throw new invalidoperationexception(); } static async task normal() { await fun(); } static task fun() { return task.run(() => { for (int i = 1; i <= 10; i++) { console.writeline("i={0}", i); thread.sleep(200); } }); } static async task observeoneexceptionasync() { var task1 = thrownotimplementedexceptionasync(); var task2 = throwinvalidoperationexceptionasync(); var task3 = normal(); try { //异步的方式 task alltasks = task.whenall(task1, task2, task3); await alltasks; //同步的方式 //task.waitall(task1, task2, task3); } catch (notimplementedexception ex) { console.writeline("task1 任务报错!"); } catch (invalidoperationexception ex) { console.writeline("task2 任务报错!"); } catch (exception ex) { console.writeline("任务报错!"); } } public static void main() { task task = observeoneexceptionasync(); console.writeline("主线程继续运行........"); task.wait(); } } }
2.5、task.fromresult的应用
using system; using system.collections.generic; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static idictionary<string, string> cache = new dictionary<string, string>() { {"0001","a"}, {"0002","b"}, {"0003","c"}, {"0004","d"}, {"0005","e"}, {"0006","f"}, }; public static void main() { task<string> task = getvaluefromcache("0006"); console.writeline("主程序继续执行。。。。"); string result = task.result; console.writeline("result={0}", result); } private static task<string> getvaluefromcache(string key) { console.writeline("getvaluefromcache开始执行。。。。"); string result = string.empty; //task.delay(5000); thread.sleep(5000); console.writeline("getvaluefromcache继续执行。。。。"); if (cache.trygetvalue(key, out result)) { return task.fromresult(result); } return task.fromresult(""); } } }
2.6、使用iprogress实现异步编程的进程通知
iprogress<in t>只提供了一个方法void report(t value),通过report方法把一个t类型的值报告给iprogress,然后iprogress<in t>的实现类progress<in t>的构造函数接收类型为action<t>的形参,通过这个委托让进度显示在ui界面中。
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static void doprocessing(iprogress<int> progress) { for (int i = 0; i <= 100; ++i) { thread.sleep(100); if (progress != null) { progress.report(i); } } } static async task display() { //当前线程 var progress = new progress<int>(percent => { console.clear(); console.write("{0}%", percent); }); //线程池线程 await task.run(() => doprocessing(progress)); console.writeline(""); console.writeline("结束"); } public static void main() { task task = display(); task.wait(); } } }
2.7、factory.fromasync的应用 (简apm模式(委托)转换为任务)(beginxxx和endxxx)
带回调方式的
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { private delegate string asynchronoustask(string threadname); private static string test(string threadname) { console.writeline("starting..."); console.writeline("is thread pool thread: {0}", thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(2)); thread.currentthread.name = threadname; return string.format("thread name: {0}", thread.currentthread.name); } private static void callback(iasyncresult ar) { console.writeline("starting a callback..."); console.writeline("state passed to a callbak: {0}", ar.asyncstate); console.writeline("is thread pool thread: {0}", thread.currentthread.isthreadpoolthread); console.writeline("thread pool worker thread id: {0}", thread.currentthread.managedthreadid); } //执行的流程是 先执行test--->callback--->task.continuewith static void main(string[] args) { asynchronoustask d = test; console.writeline("option 1"); task<string> task = task<string>.factory.fromasync( d.begininvoke("asynctaskthread", callback, "a delegate asynchronous call"), d.endinvoke); task.continuewith(t => console.writeline("callback is finished, now running a continuation! result: {0}", t.result)); while (!task.iscompleted) { console.writeline(task.status); thread.sleep(timespan.fromseconds(0.5)); } console.writeline(task.status); } } }
不带回调方式的
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { private delegate string asynchronoustask(string threadname); private static string test(string threadname) { console.writeline("starting..."); console.writeline("is thread pool thread: {0}", thread.currentthread.isthreadpoolthread); thread.sleep(timespan.fromseconds(2)); thread.currentthread.name = threadname; return string.format("thread name: {0}", thread.currentthread.name); } //执行的流程是 先执行test--->task.continuewith static void main(string[] args) { asynchronoustask d = test; task<string> task = task<string>.factory.fromasync( d.begininvoke, d.endinvoke, "asynctaskthread", "a delegate asynchronous call"); task.continuewith(t => console.writeline("task is completed, now running a continuation! result: {0}", t.result)); while (!task.iscompleted) { console.writeline(task.status); thread.sleep(timespan.fromseconds(0.5)); } console.writeline(task.status); } } }
//task启动带参数和返回值的函数任务 //下面的例子test2 是个带参数和返回值的函数。 private int test2(object i) { this.invoke(new action(() => { picturebox1.visible = true; })); system.threading.thread.sleep(3000); messagebox.show("hello:" + i); this.invoke(new action(() => { picturebox1.visible = false; })); return 0; } //测试调用 private void call() { //func<string, string> funcone = delegate(string s){ return "fff"; }; object i = 55; var t = task<int>.factory.startnew(new func<object, int>(test2), i); } //= 下载网站源文件例子 == == == == == == == == == == == == //httpclient 引用system.net.http private async task< int> test2(object i) { this.invoke(new action(() => { picturebox1.visible = true; })); httpclient client = new httpclient(); var a = await client.getasync("http://www.baidu.com"); task<string> s = a.content.readasstringasync(); messagebox.show (s.result); //system.threading.thread.sleep(3000); //messagebox.show("hello:"+ i); this.invoke(new action(() => { picturebox1.visible = false; })); return 0; } async private void call() { //func<string, string> funcone = delegate(string s){ return "fff"; }; object i = 55; var t = task<task<int>>.factory.startnew(new func<object, task<int>>(test2), i); } //----------或者---------- private async void test2() { this.invoke(new action(() => { picturebox1.visible = true; })); httpclient client = new httpclient(); var a = await client.getasync("http://www.baidu.com"); task<string> s = a.content.readasstringasync(); messagebox.show (s.result); this.invoke(new action(() => { picturebox1.visible = false; })); } private void call() { var t = task.run(new action(test2)); //相当于 //thread th= new thread(new threadstart(test2)); //th.start(); } task启动带参数和返回值的函数任务
以上就是深入分析c# task的详细内容,更多关于c# task的资料请关注其它相关文章!