C# Task用法
1、task的优势
ThreadPool相比Thread来说具备了很多优势,但是ThreadPool却又存在一些使用上的不方便。比如:
◆ ThreadPool不支持线程的取消、完成、失败通知等交互性操作;
◆ ThreadPool不支持控制线程执行的先后次序;
以往,如果开发者要实现上述功能,需要完成很多额外的工作。现在,FCL中提供了一个功能更强大的概念:Task。Task在线程池的基础上进行了优化,并提供了更多的API。在FCL 4.0中,如果我们要编写多线程程序,Task显然已经优于传统的方式。
以下是一个简单的任务示例:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Minimal Task demo: runs a piece of work on a task and, once it ends,
    /// prints the final state flags of that task from a continuation.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            Task t = new Task(() =>
            {
                Console.WriteLine("任务开始工作……");
                // Simulate the work taking a while.
                Thread.Sleep(5000);
            });
            t.Start();

            // The continuation receives the finished antecedent task as its argument.
            t.ContinueWith((task) =>
            {
                Console.WriteLine("任务完成,完成时候的状态为:");
                Console.WriteLine("iscanceled={0}\tiscompleted={1}\tisfaulted={2}",
                    task.IsCanceled, task.IsCompleted, task.IsFaulted);
            });

            // Keep the process alive until a key is pressed so the output can be read.
            Console.ReadKey();
        }
    }
}
2、task的用法
2.1、创建任务
无返回值的方式
方式1:
var t1 = new Task(() => TaskMethod("task 1"));
t1.Start();
Task.WaitAll(t1); // block until every task passed in has finished
注:
任务的状态:
Start之前为:Created
Start之后为:WaitingToRun
方式2:
Task.Run(() => TaskMethod("task 2"));
方式3:
Task.Factory.StartNew(() => TaskMethod("task 3")); // creates and starts the task in one call
或者
var t3 = Task.Factory.StartNew(() => TaskMethod("task 3"));
Task.WaitAll(t3); // block until every task passed in has finished
注:
任务的状态:
StartNew会在创建任务的同时立即将其提交给调度器,因此不需要(也不能)再调用Start;
StartNew返回后任务的状态通常为:WaitingToRun或Running
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Shows the different ways of creating and starting a task that returns no value:
    /// <c>new Task</c> + <c>Start</c>, <c>Task.Run</c> and <c>Task.Factory.StartNew</c>.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var t1 = new Task(() => TaskMethod("task 1"));
            var t2 = new Task(() => TaskMethod("task 2"));
            t2.Start();
            t1.Start();
            Task.WaitAll(t1, t2);

            Task t3 = Task.Run(() => TaskMethod("task 3"));
            Task t4 = Task.Factory.StartNew(() => TaskMethod("task 4"));
            // LongRunning is a hint that the work is long-lived; the default scheduler
            // typically gives such a task its own thread instead of a pool thread.
            Task t5 = Task.Factory.StartNew(() => TaskMethod("task 5"), TaskCreationOptions.LongRunning);

            #region Typical usage
            Console.WriteLine("主线程执行业务处理.");
            // Create the task.
            Task task = new Task(() =>
            {
                Console.WriteLine("使用system.threading.tasks.task执行异步操作.");
                for (int i = 0; i < 10; i++)
                {
                    Console.WriteLine(i);
                }
            });
            // Start the task; it is queued to the current TaskScheduler.
            task.Start();
            Console.WriteLine("主线程执行其他处理");
            task.Wait();
            #endregion

            // Wait for the tasks started above explicitly rather than sleeping
            // for a fixed time and hoping they are done.
            Task.WaitAll(t3, t4, t5);
            Console.ReadLine();
        }

        /// <summary>Prints which thread the named task is running on.</summary>
        /// <param name="name">Label printed in the output line.</param>
        static void TaskMethod(string name)
        {
            Console.WriteLine("task {0} is running on a thread id {1}. is thread pool thread: {2}",
                name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
        }
    }
}
async/await的实现方式:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// async/await version of the "no return value" sample: the main thread keeps
    /// working while <see cref="AsyncFunction"/> continues after its first await.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Yields once, then prints ten numbered lines.
        /// Returns a <see cref="Task"/> (not <c>async void</c>) so the caller can
        /// wait for completion and observe any exception.
        /// </summary>
        static async Task AsyncFunction()
        {
            await Task.Delay(1);
            Console.WriteLine("使用system.threading.tasks.task执行异步操作.");
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine("asyncfunction:i={0}", i);
            }
        }

        public static void Main()
        {
            Console.WriteLine("主线程执行业务处理.");
            // Start the async work without awaiting it yet so both loops can interleave.
            Task pending = AsyncFunction();
            Console.WriteLine("主线程执行其他处理");
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine("main:i={0}", i);
            }

            // Observe the async operation so a failure inside it is not lost.
            pending.Wait();
            Console.ReadLine();
        }
    }
}
带返回值的方式
方式4:
Task<int> task = CreateTask("task 1");
task.Start();
int result = task.Result; // blocks until the task has produced its value
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Shows tasks that return a value: started normally, run synchronously,
    /// polled for status, and a typical create/start/wait/read-result sequence.
    /// </summary>
    class Program
    {
        /// <summary>Creates (but does not start) a task that runs <see cref="TaskMethod"/>.</summary>
        static Task<int> CreateTask(string name)
        {
            return new Task<int>(() => TaskMethod(name));
        }

        static void Main(string[] args)
        {
            TaskMethod("main thread task");

            Task<int> task = CreateTask("task 1");
            task.Start();
            int result = task.Result;
            Console.WriteLine("task 1 result is: {0}", result);

            task = CreateTask("task 2");
            // RunSynchronously executes the task on the calling (main) thread.
            task.RunSynchronously();
            result = task.Result;
            Console.WriteLine("task 2 result is: {0}", result);

            task = CreateTask("task 3");
            Console.WriteLine(task.Status);
            task.Start();
            // Poll and print the status while the task is still in flight.
            while (!task.IsCompleted)
            {
                Console.WriteLine(task.Status);
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
            }
            Console.WriteLine(task.Status);
            result = task.Result;
            Console.WriteLine("task 3 result is: {0}", result);

            #region Typical usage
            // Create the task.
            Task<int> getSumTask = new Task<int>(() => GetSum());
            // Start the task; it is queued to the current TaskScheduler.
            getSumTask.Start();
            Console.WriteLine("主线程执行其他处理");
            // Wait for the task to finish.
            getSumTask.Wait();
            // Read the result of the task.
            Console.WriteLine("任务执行结果:{0}", getSumTask.Result);
            #endregion
        }

        /// <summary>Prints the current thread info, sleeps two seconds and returns 42.</summary>
        static int TaskMethod(string name)
        {
            Console.WriteLine("task {0} is running on a thread id {1}. is thread pool thread: {2}",
                name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
            Thread.Sleep(TimeSpan.FromSeconds(2));
            return 42;
        }

        /// <summary>Returns the sum of the integers 0 through 99.</summary>
        static int GetSum()
        {
            int sum = 0;
            Console.WriteLine("使用task执行异步操作.");
            for (int i = 0; i < 100; i++)
            {
                sum += i;
            }
            return sum;
        }
    }
}
async/await的实现:
using System;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// async/await version of the "return value" sample: the sum is computed by an
    /// async method while the main thread does other work, then reads the result.
    /// </summary>
    class Program
    {
        public static void Main()
        {
            var ret1 = AsyncGetSum();
            Console.WriteLine("主线程执行其他处理");
            for (int i = 1; i <= 3; i++)
            {
                Console.WriteLine("call main()");
            }

            // Reading Result blocks the main thread until the async method has finished.
            int result = ret1.Result;
            Console.WriteLine("任务执行结果:{0}", result);
        }

        /// <summary>Yields once, then returns the sum of the integers 0 through 99.</summary>
        static async Task<int> AsyncGetSum()
        {
            await Task.Delay(1);
            int sum = 0;
            Console.WriteLine("使用task执行异步操作.");
            for (int i = 0; i < 100; i++)
            {
                sum += i;
            }
            return sum;
        }
    }
}
2.2、组合任务.continuewith
简单demo:
using System;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Simple ContinueWith demo: compute a sum on a task and print it from a continuation.
    /// </summary>
    class Program
    {
        public static void Main()
        {
            // Create the task.
            Task<int> task = new Task<int>(() =>
            {
                int sum = 0;
                Console.WriteLine("使用task执行异步操作.");
                for (int i = 0; i < 100; i++)
                {
                    sum += i;
                }
                return sum;
            });

            // Start the task; it is queued to the current TaskScheduler.
            task.Start();
            Console.WriteLine("主线程执行其他处理");

            // Runs once the antecedent task has completed.
            Task cwt = task.ContinueWith(t =>
            {
                Console.WriteLine("任务完成后的执行结果:{0}", t.Result);
            });

            // Waiting on the antecedent alone does not cover the continuation, so wait on both.
            task.Wait();
            cwt.Wait();
        }
    }
}
任务的串行:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Chains tasks: t1 runs first, t2 and t3 both continue from t1 (and may run in
    /// parallel), and t4 is started only after t2 and t3 have finished.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            ConcurrentStack<int> stack = new ConcurrentStack<int>();

            // t1 runs first and fills the stack.
            var t1 = Task.Factory.StartNew(() =>
            {
                stack.Push(1);
                stack.Push(2);
            });

            // t2 and t3 are both continuations of t1, so they may run concurrently.
            var t2 = t1.ContinueWith(t =>
            {
                int result;
                stack.TryPop(out result);
                Console.WriteLine("task t2 result={0},thread id {1}", result, Thread.CurrentThread.ManagedThreadId);
            });

            var t3 = t1.ContinueWith(t =>
            {
                int result;
                stack.TryPop(out result);
                Console.WriteLine("task t3 result={0},thread id {1}", result, Thread.CurrentThread.ManagedThreadId);
            });

            // Wait until both t2 and t3 have finished.
            Task.WaitAll(t2, t3);

            // t4 is started only after the wait above, i.e. serially after t2 and t3.
            var t4 = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("当前集合元素个数:{0},thread id {1}", stack.Count, Thread.CurrentThread.ManagedThreadId);
            });
            t4.Wait();
        }
    }
}
子任务:
using System;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Child-task demo: the parent task creates two children with
    /// <see cref="TaskCreationOptions.AttachedToParent"/>, and a continuation
    /// prints the results they wrote.
    /// </summary>
    class Program
    {
        public static void Main()
        {
            Task<string[]> parent = new Task<string[]>(state =>
            {
                Console.WriteLine(state);
                string[] result = new string[2];

                // Create and start the child tasks; each fills in one slot of the result.
                new Task(() => { result[0] = "我是子任务1。"; }, TaskCreationOptions.AttachedToParent).Start();
                new Task(() => { result[1] = "我是子任务2。"; }, TaskCreationOptions.AttachedToParent).Start();

                return result;
            }, "我是父任务,并在我的处理过程中创建多个子任务,所有子任务完成以后我才会结束执行。");

            // Runs after the parent task has completed.
            Task continuation = parent.ContinueWith(t =>
            {
                Array.ForEach(t.Result, r => Console.WriteLine(r));
            });

            // Start the parent task.
            parent.Start();

            // parent.Wait() would only wait for the parent, not for its continuation,
            // so wait on the continuation task itself.
            continuation.Wait();
            Console.ReadLine();
        }
    }
}
动态并行(TaskCreationOptions.AttachedToParent):父任务等待所有子任务完成后,整个任务才算完成
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>Binary tree node carrying a text label.</summary>
    class Node
    {
        public Node Left { get; set; }
        public Node Right { get; set; }
        public string Text { get; set; }
    }

    /// <summary>
    /// Dynamic parallelism demo: every tree node is printed by its own task, and the
    /// tasks for the children are attached to the task of their parent node.
    /// </summary>
    class Program
    {
        /// <summary>Builds a fixed three-level tree of seven nodes.</summary>
        static Node GetNode()
        {
            Node root = new Node
            {
                Left = new Node
                {
                    Left = new Node { Text = "l-l" },
                    Right = new Node { Text = "l-r" },
                    Text = "l"
                },
                Right = new Node
                {
                    Left = new Node { Text = "r-l" },
                    Right = new Node { Text = "r-r" },
                    Text = "r"
                },
                Text = "root"
            };
            return root;
        }

        static void Main(string[] args)
        {
            Node root = GetNode();
            DisplayTree(root);
        }

        /// <summary>Starts the traversal on a task and blocks until that task completes.</summary>
        static void DisplayTree(Node root)
        {
            var task = Task.Factory.StartNew(
                () => DisplayNode(root),
                CancellationToken.None,
                TaskCreationOptions.None,
                TaskScheduler.Default);
            // The child tasks are attached, so this wait also covers them.
            task.Wait();
        }

        /// <summary>Spawns attached child tasks for both subtrees, then prints the current node.</summary>
        static void DisplayNode(Node current)
        {
            if (current.Left != null)
            {
                Task.Factory.StartNew(
                    () => DisplayNode(current.Left),
                    CancellationToken.None,
                    TaskCreationOptions.AttachedToParent,
                    TaskScheduler.Default);
            }

            if (current.Right != null)
            {
                Task.Factory.StartNew(
                    () => DisplayNode(current.Right),
                    CancellationToken.None,
                    TaskCreationOptions.AttachedToParent,
                    TaskScheduler.Default);
            }

            Console.WriteLine("当前节点的值为{0};处理的threadid={1}", current.Text, Thread.CurrentThread.ManagedThreadId);
        }
    }
}
2.3、取消任务 cancellationtokensource
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Cancellation demo: one task is cancelled before it is started, a second one
    /// is cancelled while it is running and returns early.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Sleeps one second per iteration and checks the token after each sleep.
        /// </summary>
        /// <param name="name">Label printed in the output line.</param>
        /// <param name="seconds">Number of one-second iterations.</param>
        /// <param name="token">Token polled for a cancellation request.</param>
        /// <returns>-1 when cancellation was requested, otherwise <c>42 * seconds</c>.</returns>
        private static int TaskMethod(string name, int seconds, CancellationToken token)
        {
            Console.WriteLine("task {0} is running on a thread id {1}. is thread pool thread: {2}",
                name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
            for (int i = 0; i < seconds; i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                if (token.IsCancellationRequested)
                {
                    return -1;
                }
            }
            return 42 * seconds;
        }

        private static void Main(string[] args)
        {
            // Scenario 1: cancel a task that has been created but never started.
            using (var firstCts = new CancellationTokenSource())
            {
                // Copy the token to a local so the lambda does not capture a variable
                // that is later reused for a different source.
                CancellationToken firstToken = firstCts.Token;
                var notStarted = new Task<int>(() => TaskMethod("task 1", 10, firstToken), firstToken);
                Console.WriteLine(notStarted.Status);
                firstCts.Cancel();
                Console.WriteLine(notStarted.Status);
                Console.WriteLine("first task has been cancelled before execution");
            }

            // Scenario 2: cancel a task while it is running.
            using (var secondCts = new CancellationTokenSource())
            {
                CancellationToken secondToken = secondCts.Token;
                var longTask = new Task<int>(() => TaskMethod("task 2", 10, secondToken), secondToken);
                longTask.Start();
                for (int i = 0; i < 5; i++)
                {
                    Thread.Sleep(TimeSpan.FromSeconds(0.5));
                    Console.WriteLine(longTask.Status);
                }

                secondCts.Cancel();
                for (int i = 0; i < 5; i++)
                {
                    Thread.Sleep(TimeSpan.FromSeconds(0.5));
                    Console.WriteLine(longTask.Status);
                }

                // TaskMethod returns instead of throwing on cancellation, so Result is readable here.
                Console.WriteLine("a task has been completed with result {0}.", longTask.Result);
            }
        }
    }
}
2.4、处理任务中的异常
单个任务:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Shows how an exception thrown inside a single task surfaces in the caller.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Prints the current thread info, sleeps, then always throws.
        /// The <c>int</c> return type is kept so the method can back a <c>Task&lt;int&gt;</c>.
        /// </summary>
        /// <exception cref="InvalidOperationException">Always thrown after the sleep.</exception>
        static int TaskMethod(string name, int seconds)
        {
            Console.WriteLine("task {0} is running on a thread id {1}. is thread pool thread: {2}",
                name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            throw new InvalidOperationException("boom!");
        }

        static void Main(string[] args)
        {
            try
            {
                Task<int> task = Task.Run(() => TaskMethod("task 2", 2));
                // GetAwaiter().GetResult() rethrows the original exception rather than
                // an AggregateException wrapper, so ex.Message below is the task's own message.
                int result = task.GetAwaiter().GetResult();
                Console.WriteLine("result: {0}", result);
            }
            catch (Exception ex)
            {
                Console.WriteLine("task 2 exception caught: {0}", ex.Message);
            }

            Console.WriteLine("----------------------------------------------");
            Console.WriteLine();
        }
    }
}
多个任务:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Shows how exceptions from several tasks are collected into an
    /// <see cref="AggregateException"/> and handled.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Prints the current thread info, sleeps, then always throws.
        /// The <c>int</c> return type is kept so the method can back a <c>Task&lt;int&gt;</c>.
        /// </summary>
        /// <exception cref="InvalidOperationException">Always thrown after the sleep.</exception>
        static int TaskMethod(string name, int seconds)
        {
            Console.WriteLine("task {0} is running on a thread id {1}. is thread pool thread: {2}",
                name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            throw new InvalidOperationException(string.Format("task {0} boom!", name));
        }

        public static void Main(string[] args)
        {
            Task exceptionHandler = null;
            try
            {
                var t1 = new Task<int>(() => TaskMethod("task 3", 3));
                var t2 = new Task<int>(() => TaskMethod("task 4", 2));
                var complexTask = Task.WhenAll(t1, t2);

                // A faulted task has no Result (reading it would throw again inside the
                // continuation, unobserved); report its Exception instead.
                exceptionHandler = complexTask.ContinueWith(
                    t => Console.WriteLine("Exception caught: {0}", t.Exception),
                    TaskContinuationOptions.OnlyOnFaulted);

                t1.Start();
                t2.Start();
                Task.WaitAll(t1, t2);
            }
            catch (AggregateException ex)
            {
                // Handle every inner exception; returning true marks it as handled.
                ex.Handle(exception =>
                {
                    Console.WriteLine(exception.Message);
                    return true;
                });
            }

            // Both tasks always fault in this demo, so the OnlyOnFaulted continuation is
            // scheduled; wait for it so its output is not lost when Main returns.
            if (exceptionHandler != null)
            {
                exceptionHandler.Wait();
            }
        }
    }
}
async/await的方式:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Exception handling with async/await: awaiting the task returned by
    /// <c>Task.WhenAll</c> and catching by exception type.
    /// </summary>
    class Program
    {
        // These two methods are async on purpose: the exception is then stored in the
        // returned task instead of being thrown synchronously at the call site.
        static async Task ThrowNotImplementedExceptionAsync()
        {
            throw new NotImplementedException();
        }

        static async Task ThrowInvalidOperationExceptionAsync()
        {
            throw new InvalidOperationException();
        }

        /// <summary>A task that completes without error.</summary>
        static async Task Normal()
        {
            await Fun();
        }

        /// <summary>Prints i=1..10 on a pool thread, pausing 200 ms between lines.</summary>
        static Task Fun()
        {
            return Task.Run(() =>
            {
                for (int i = 1; i <= 10; i++)
                {
                    Console.WriteLine("i={0}", i);
                    Thread.Sleep(200);
                }
            });
        }

        /// <summary>
        /// Starts all three operations, awaits them together and reports which kind
        /// of exception the await surfaced.
        /// </summary>
        static async Task ObserveOneExceptionAsync()
        {
            var task1 = ThrowNotImplementedExceptionAsync();
            var task2 = ThrowInvalidOperationExceptionAsync();
            var task3 = Normal();

            try
            {
                // Asynchronous wait. Awaiting rethrows a single exception even when
                // several of the tasks have faulted.
                Task allTasks = Task.WhenAll(task1, task2, task3);
                await allTasks;
                // The synchronous alternative is Task.WaitAll(task1, task2, task3).
            }
            catch (NotImplementedException)
            {
                Console.WriteLine("task1 任务报错!");
            }
            catch (InvalidOperationException)
            {
                Console.WriteLine("task2 任务报错!");
            }
            catch (Exception)
            {
                Console.WriteLine("任务报错!");
            }
        }

        public static void Main()
        {
            Task task = ObserveOneExceptionAsync();
            Console.WriteLine("主线程继续运行........");
            task.Wait();
        }
    }
}
2.5、task.fromresult的应用
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Task.FromResult demo: wraps an already-known value from a cache in a completed task.
    /// </summary>
    class Program
    {
        // Read-only lookup table used as the "cache".
        private static readonly IDictionary<string, string> cache = new Dictionary<string, string>()
        {
            {"0001","a"},
            {"0002","b"},
            {"0003","c"},
            {"0004","d"},
            {"0005","e"},
            {"0006","f"},
        };

        public static void Main()
        {
            Task<string> task = GetValueFromCache("0006");
            Console.WriteLine("主程序继续执行。。。。");
            string result = task.Result;
            Console.WriteLine("result={0}", result);
        }

        /// <summary>
        /// Looks up <paramref name="key"/> and returns the value as a completed task.
        /// </summary>
        /// <returns>A completed task holding the cached value, or an empty string when the key is absent.</returns>
        private static Task<string> GetValueFromCache(string key)
        {
            Console.WriteLine("getvaluefromcache开始执行。。。。");

            // The method is not async, so this sleep blocks the caller: the lookup runs
            // entirely synchronously before the task is handed back.
            Thread.Sleep(5000);
            Console.WriteLine("getvaluefromcache继续执行。。。。");

            string result;
            if (cache.TryGetValue(key, out result))
            {
                return Task.FromResult(result);
            }
            return Task.FromResult("");
        }
    }
}
2.6、使用IProgress实现异步编程的进度通知
IProgress<in T>只提供了一个方法void Report(T value),通过Report方法把一个T类型的值报告给IProgress<T>;其实现类Progress<T>的构造函数接收类型为Action<T>的形参,通过这个委托让进度显示在UI界面中。
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// IProgress demo: a background loop reports 0..100 and the handler prints it as a percentage.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Counts from 0 to 100 with a 100 ms pause per step, reporting each value.
        /// </summary>
        /// <param name="progress">Receiver of the progress values; may be null, in which case nothing is reported.</param>
        static void DoProcessing(IProgress<int> progress)
        {
            for (int i = 0; i <= 100; ++i)
            {
                Thread.Sleep(100);
                if (progress != null)
                {
                    progress.Report(i);
                }
            }
        }

        /// <summary>Runs <see cref="DoProcessing"/> on a pool thread and prints its progress.</summary>
        static async Task Display()
        {
            // The Progress<int> object is created on the calling thread.
            var progress = new Progress<int>(percent =>
            {
                Console.Clear();
                Console.Write("{0}%", percent);
            });

            // The processing itself runs on a thread-pool thread.
            await Task.Run(() => DoProcessing(progress));
            Console.WriteLine("");
            Console.WriteLine("结束");
        }

        public static void Main()
        {
            Task task = Display();
            task.Wait();
        }
    }
}
2.7、Factory.FromAsync的应用 (将APM模式(委托)转换为任务)(BeginXxx和EndXxx)
带回调方式的
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Converts an APM-style delegate call (BeginInvoke/EndInvoke) that uses a callback
    /// into a task with <c>Task.Factory.FromAsync</c>.
    /// NOTE(review): delegate BeginInvoke is a .NET Framework feature; confirm the target runtime supports it.
    /// </summary>
    class Program
    {
        private delegate string AsynchronousTask(string threadName);

        /// <summary>Prints thread info, sleeps two seconds, names the current thread and returns that name.</summary>
        private static string Test(string threadName)
        {
            Console.WriteLine("starting...");
            Console.WriteLine("is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread);
            Thread.Sleep(TimeSpan.FromSeconds(2));
            Thread.CurrentThread.Name = threadName;
            return string.Format("thread name: {0}", Thread.CurrentThread.Name);
        }

        /// <summary>APM completion callback: prints the state object and thread info.</summary>
        private static void Callback(IAsyncResult ar)
        {
            Console.WriteLine("starting a callback...");
            Console.WriteLine("state passed to a callbak: {0}", ar.AsyncState);
            Console.WriteLine("is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread);
            Console.WriteLine("thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId);
        }

        // Intended flow: Test ---> Callback ---> the ContinueWith continuation.
        static void Main(string[] args)
        {
            AsynchronousTask d = Test;

            Console.WriteLine("option 1");
            Task<string> task = Task<string>.Factory.FromAsync(
                d.BeginInvoke("asynctaskthread", Callback, "a delegate asynchronous call"),
                d.EndInvoke);

            Task continuation = task.ContinueWith(
                t => Console.WriteLine("callback is finished, now running a continuation! result: {0}", t.Result));

            // Poll and print the status while the task is still in flight.
            while (!task.IsCompleted)
            {
                Console.WriteLine(task.Status);
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
            }
            Console.WriteLine(task.Status);

            // Without this wait Main could return before the continuation has printed.
            continuation.Wait();
        }
    }
}
不带回调方式的
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Converts an APM-style delegate call (BeginInvoke/EndInvoke) into a task with
    /// <c>Task.Factory.FromAsync</c>, without a separate APM callback.
    /// NOTE(review): delegate BeginInvoke is a .NET Framework feature; confirm the target runtime supports it.
    /// </summary>
    class Program
    {
        private delegate string AsynchronousTask(string threadName);

        /// <summary>Prints thread info, sleeps two seconds, names the current thread and returns that name.</summary>
        private static string Test(string threadName)
        {
            Console.WriteLine("starting...");
            Console.WriteLine("is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread);
            Thread.Sleep(TimeSpan.FromSeconds(2));
            Thread.CurrentThread.Name = threadName;
            return string.Format("thread name: {0}", Thread.CurrentThread.Name);
        }

        // Intended flow: Test ---> the ContinueWith continuation.
        static void Main(string[] args)
        {
            AsynchronousTask d = Test;

            // FromAsync is handed the Begin/End pair plus the argument and the state object.
            Task<string> task = Task<string>.Factory.FromAsync(
                d.BeginInvoke,
                d.EndInvoke,
                "asynctaskthread",
                "a delegate asynchronous call");

            Task continuation = task.ContinueWith(
                t => Console.WriteLine("task is completed, now running a continuation! result: {0}", t.Result));

            // Poll and print the status while the task is still in flight.
            while (!task.IsCompleted)
            {
                Console.WriteLine(task.Status);
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
            }
            Console.WriteLine(task.Status);

            // Without this wait Main could return before the continuation has printed.
            continuation.Wait();
        }
    }
}
上一篇: QQ上线小程序“轻应用”
下一篇: 互联网软件商业模式的变迁逻辑