欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

并行编程和任务(一)

程序员文章站 2022-04-29 13:54:25
前言 并发、并行。同步、异步、互斥、多线程。我太难了。被这些词搞懵了。前面我们在写.Net基础系列的时候写过了关于.Net的异步编程。那么其他的都是些什么东西呀。今天我们首先就来解决这个问题。把这些词搞懂搞透。理清逻辑。然后最后我们进入并行编程的介绍。 概念初识 首先我们看并发和并行: 并发:并发指 ......

前言

  并发、并行。同步、异步、互斥、多线程。我太难了。被这些词搞懵了。前面我们在写.net基础系列的时候写过了关于.net的异步编程。那么其他的都是些什么东西呀。今天我们首先就来解决这个问题。把这些词搞懂搞透。理清逻辑。然后最后我们进入并行编程的介绍。

概念初识

首先我们看并发和并行:

并发:并发指的是在操作系统中,一个是时间段内有多个程序在运行,但是呢。这几个程序都运行在同一个处理机上,并且任意时间点都是一个程序运行在处理机上。

并行:并行指的是在操作系统中,一个时间段内有多个程序在运行,但是呢。这几个程序分别运行在不同的处理机上。也就是说这些程序是一起运行的。

简单理解也就是并发就像三个包子给一个人吃,一口吃一个包子。并行就是三个包子给三个人吃,三个人一口分别吃三个包子。

然后我们看看异步与多线程概念:

刚刚我们讲到并发的理解概念,其中并发包含两种关系-同步和互斥。同步和互斥我们都是相对于临界资源来谈的。

互斥:进程间相互排斥使用临界资源的现象就叫互斥。就好比进程a在访问list集合的时候,进程b也想访问,但是a在访问。b就阻塞等待a访问完成之后才去访问。

同步:进程间的关系不是临界资源的相互排斥,而是相互依赖。例如进程b需要读取一个集合结果,但是这个集合结果需要进程a返回,当进程a没有返回集合结果时,进程b就会因为没有获得信息而阻塞。当进程a返回信息。进程b就可以获得信息被唤起继续运行。

多线程:多线程可以说是程序设计的一个逻辑概念,多线程实现了线程的切换。使其看起来似乎是在同时运行多个线程一样。是进程中并发运行的一段代码。

异步:异步与同步相对应。同步是进程间相互依赖。异步是进程间相互独立。不需要等待上一个进程的结果。可以做自己的事情。

上面我们就介绍完了并发、并行、互斥、同步、多线程、异步。我们总结下其中关联吧:

异步与多线程并不相等。异步是需要达到的目的,多线程是一个是实现异步的一种手段。最后达到的目的是什么呢?就是并发中线程的切换。同步也可以实现线程切换,但是由于同步中io等待会浪费时间,所以同步切换进程与异步切换进行就有明显的时间差距。

parallel

今天我们介绍的是parallel类。该类位于system.threading.tasks命名空间中。依次来实现数据和任务的并行性。

其中定义了并行的for和foreach的静态方法、还包含着parallel.invoke()用于任务的并行性。我们下面就来看看吧。

parallel.for()

parallel.for()方法类似于#中的for循环语句,但是parallel.for()是可以并行运行的。不过并行运行并不保证迭代运行的顺序。我们来看看。

 public static void forex()
        {
            stopwatch stopwatch = new stopwatch();
            stopwatch.start();
            list<test> list = new list<test>();
            for (int i = 0; i < 100; i++)
            {
                test test = new test();
                test.name = "name:" + i;
                test.index = "index:" + i;
                test.time = datetime.now;
                list.add(test);
                task.delay(10).wait();
                console.writeline("c#中的for循环:" + i);
            }
            stopwatch.stop();
            console.writeline("for  0-100 执行完成 耗时:{0}", stopwatch.elapsedmilliseconds);

        }
        public static void parallelfor()
        {
            stopwatch stopwatch = new stopwatch();
            stopwatch.start();
            list<test> lists = new list<test>();
            parallel.for(1, 100, i =>
            {
                test tests = new test();
                tests.name = "name:" + i;
                tests.index = "index:" + i;
                tests.time = datetime.now;
                lists.add(tests);
                task.delay(10).wait();
                console.writeline("parallel中的parallelfor循环:" + i);
            });
            stopwatch.stop();
            console.writeline("parallelfor  0-100 执行完成 耗时:{0}", stopwatch.elapsedmilliseconds);
        }
        static void main(string[] args)
        {
            console.writeline("start");
            forex();
            console.writeline("for循环完成");
            parallelfor();
            console.writeline("end");
        }

 

并行编程和任务(一)

这里我们可以看到最后的运行结果图使用for循环的执行下来都是依次执行。按照相应的顺序。但是我们使用parallel.for()的时候运行下来。也输出了所有的结果,但是其顺序就没有保证了。

parallel.foreach()

我们再看parallel.foreach()提供了一个并行处理数据的机制。这里类似于foreach语句,但是是以一部方式遍历。这里没有确定遍历的顺序,其执行顺序也就是不保证的。

  #region foreach 语句比较 
        public static void parallelforeach()
        {
            list<test> result = new list<test>();
            for (int i = 1; i < 100; i++)
            {
                test model = new test();
                model.name = "name" + i;
                model.index = "index" + i;
                model.time = datetime.now;
                result.add(model);
            }
            parallel.foreach<test>(result, s => {
                console.writeline(s.name);
            });
        }
        #endregion     
static void main(string[] args)
        {
            parallelforeach();
        }

 

并行编程和任务(一)

 

我们看这里的运行结果,对数据集合进行了遍历处理,但是其运行的顺序不定,是乱序的结果。这也就是异步遍历的一个表现。

parallelloopstate

下面我们来看parallelloopstate。它提供了两个方法。一个是break、一个是stop。

break:表示并行循环执行了当前迭代后应尽快停止执行。筛选出符合条件的执行,可能输出完全。

stop:表示并行循环应尽快停止执行。遇到符合条件后停止并行循环,可能不完全输出。

下面我们看看代码:

  public static list<test> getlisttest()
        {
            list<test> result = new list<test>();
            for (int i = 1; i < 100; i++)
            {
                test model = new test();
                model.name = i.tostring();
                model.index = "index" + i;
                model.time = datetime.now;
                result.add(model);
            }
            return result;
        }
        public static void braekfor()
        {
            var result = getlisttest();
            parallel.for(0, result.count, (int i, parallelloopstate ls) =>
            {
                task.delay(10).wait();
                if (i > 50)
                {
                    console.writeline("parallel.for使用break停止当前迭代:" + i);
                    ls.break();
                    return;
                }
                console.writeline("测试parallel.for的break:" + i);
            });

        }

        public static void stopfor()
        {
            var result = getlisttest();
            parallel.for(0, result.count, (int i, parallelloopstate ls) =>
            {
                task.delay(10).wait();
                if (i > 50)
                {
                    console.writeline("parallel.for使用stop停止 迭代:" + i);
                    ls.stop();
                    return;
                }
                console.writeline("测试parallel.for的stop:" + i);
            });
        }
        static void main(string[] args)
        {
            braekfor();
            stopfor();
        }

 

并行编程和任务(一)并行编程和任务(一)

我们看对于parallel.for()来说这个案例。使用break()停止当前迭代会输出符合条件所有结果,但是我们使用stop的时候输出部分的时候就停止了。

        public static void braekforeach()
        {
            var result = getlisttest();
            parallel.foreach<test>(result, (test s, parallelloopstate ls) =>
            {
                task.delay(10).wait();
                if (convert.toint32(s.name) > 50)
                {
                    console.writeline("parallel.foreach使用break停止当前迭代:" + s.name);
                    ls.break();
                    return;
                }
                console.writeline("测试parallel.foreach的break:" + s.name);
            });

        }

        public static void stopforeach()
        {
            var result = getlisttest();
            parallel.foreach<test>(result, (test s, parallelloopstate ls) =>
            {
                task.delay(10).wait();
                if (convert.toint32(s.name) > 50)
                {
                    console.writeline("parallel.foreach使用stop停止 迭代:" + s.name);
                    ls.stop();
                    return;
                }
                console.writeline("测试parallel.foreach的stop:" + s.name);
            });
        }
        static void main(string[] args)
        {
            braekforeach();
            stopforeach();
        }

 

并行编程和任务(一)并行编程和任务(一)

我们再对parallel.foreach进行测试,发现对于stop和break的用法和意义是一样的。

parallel.invoke()

上面我们介绍了parallel.for和parallel.foreach以及提供的两个方法break和stop。上面介绍的这些都是对数据的并行处理执行。下面我们介绍parallel.invoke()。它是针对于任务的并行运行处理。

这里我们需要注意以下几点:

1、如果我们传入4个任务并行,那么我们至少需要四个逻辑处理内核(硬件线程)才可能使四个任务一起运行。但是当其中一个内核繁忙,那么底层的调度逻辑就可能会延迟某些方法的初始化执行。

2、parallel.invoke()所包含的并行任务不能相互依赖,因为运行执行的顺序不可保证。

3、使用parallel.invoke()我们需要测试运行结果,观察逻辑内核使用率以及实现加速。

4、使用parallel.invoke()会产生一些额外的开销,例如分配硬件线程。

我们看下面的案例:

下面我们对一个集合的数据进行添加然后输出。下面我们分为四组测试。500条数据和1000条数据各两个,分别是一般的同步任务和parallel.invoke()的并行任务执行。再观察其运行的时间比较。

 

       #region parallel.invoke()使用共同资源

        public static list<test> _tests = null;
        public static void taskfive_one()
        {
            for (int i = 0; i < 500; i++)
            {
                test test = new test();
                test.name = i.tostring();
                test.index = i.tostring();
                test.time = datetime.now;
                _tests.add(test);
            }
            console.writeline("taskfive_one 500条数据第一个方法 执行完成");
        }
        public static void taskfive_two()
        {
            for (int i = 500; i < 1000; i++)
            {
                test test = new test();
                test.name = i.tostring();
                test.index = i.tostring();
                test.time = datetime.now;
                _tests.add(test);
            }
            console.writeline("taskfive_two 500条数据第二个方法 执行完成");
        }
        public static void taskfive_three()
        {
            for (int i = 1000; i < 1500; i++)
            {
                test test = new test();
                test.name = i.tostring();
                test.index = i.tostring();
                test.time = datetime.now;
                _tests.add(test);
            }
            console.writeline("taskfive_three 500条数据第三个方法 执行完成");
        }
        public static void taskfive_four()
        {
            for (int i = 1500; i < 2000; i++)
            {
                test test = new test();
                test.name = i.tostring();
                test.index = i.tostring();
                test.time = datetime.now;
                _tests.add(test);
            }
            console.writeline("taskfive_four 500条数据第四个方法 执行完成");
        }



        public static void taskonethousand_one()
        {
            for (int i = 0; i < 1000; i++)
            {
                test test = new test();
                test.name = i.tostring();
                test.index = i.tostring();
                test.time = datetime.now;
                _tests.add(test);
            }
            console.writeline("taskonethousand_one 1000条数据第一个方法 执行完成");
        }
        public static void taskonethousand_two()
        {
            for (int i = 1000; i < 2000; i++)
            {
                test test = new test();
                test.name = i.tostring();
                test.index = i.tostring();
                test.time = datetime.now;
                _tests.add(test);
            }
            console.writeline("taskonethousand_two 1000条数据第二个方法 执行完成");
        }
        public static void taskonethousand_three()
        {
            for (int i = 2000; i < 3000; i++)
            {
                test test = new test();
                test.name = i.tostring();
                test.index = i.tostring();
                test.time = datetime.now;
                _tests.add(test);
            }
            console.writeline("taskonethousand_three 1000条数据第三个方法 执行完成");
        }
        public static void taskonethousand_four()
        {
            for (int i = 3000; i < 4000; i++)
            {
                test test = new test();
                test.name = i.tostring();
                test.index = i.tostring();
                test.time = datetime.now;
                _tests.add(test);
            }
            console.writeline("taskonethousand_three 1000条数据第四个方法 执行完成");
        }
        #endregion
        static void main(string[] args)
        {
            //五百条数据顺序完成
            stopwatch swfive = new stopwatch();
            swfive.start();
            thread.sleep(3000);
            _tests = new list<test>();
            taskfive_one();
            taskfive_two();
            taskfive_three();
            taskfive_four();
            swfive.stop();
            console.writeline("500条数据  顺序编程所耗时间:" + swfive.elapsedmilliseconds);


            //五百条数据并行完成
            stopwatch swfivetask = new stopwatch();
            swfivetask.start();
            thread.sleep(3000);
            _tests = new list<test>();
            parallel.invoke(() => taskfive_one(), () => taskfive_two(), () => taskfive_three(), () => taskfive_four());
            swfivetask.stop();
            console.writeline("500条数据  并行编程所耗时间:" + swfivetask.elapsedmilliseconds);


            //一千条数据顺序完成
            stopwatch swonethousand = new stopwatch();
            swonethousand.start();
            thread.sleep(3000);
            _tests = new list<test>();
            taskonethousand_one();
            taskonethousand_two();
            taskonethousand_three();
            taskonethousand_four();
            swonethousand.stop();
            console.writeline("1000条数据  顺序编程所耗时间:" + swonethousand.elapsedmilliseconds);


            //一千条数据并行完成
            stopwatch swonethousandtask = new stopwatch();
            swonethousandtask.start();
            thread.sleep(3000);
            _tests = new list<test>();
            parallel.invoke(() => taskonethousand_one(), () => taskonethousand_two(), () => taskonethousand_three(), () => taskonethousand_four());
            swonethousandtask.stop();
            console.writeline("1000条数据  并行编程所耗时间:" + swonethousandtask.elapsedmilliseconds);
        }

 

并行编程和任务(一)

 

我们看这次的运行结果,发现我们使用顺序编程和并行编程所需要的时间相差无几的。那么怎么回事呢?我们仔细检查下,发现我们似乎对资源进行了共享。我们下面处理下,对list集合不进行共享看看。

 

        #region parallel.invoke()不使用共同资源

        public static void taskfive_one()
        {
            list<test> tests = new list<test>();
            for (int i = 0; i < 500; i++)
            {
                test test = new test();
                test.name = "name" + i.tostring();
                test.index = "index" + i.tostring();
                test.time = datetime.now;
                tests.add(test);
            }
            console.writeline("taskfive_one 500条数据第一个方法 执行完成");
        }
        public static void taskfive_two()
        {
            list<test> tests = new list<test>();
            for (int i = 500; i < 1000; i++)
            {
                test test = new test();
                test.name = "name" + i.tostring();
                test.index = "index" + i.tostring();
                test.time = datetime.now;
                tests.add(test);
            }
            console.writeline("taskfive_two 500条数据第二个方法 执行完成");
        }
        public static void taskfive_three()
        {
            list<test> tests = new list<test>();
            for (int i = 1000; i < 1500; i++)
            {
                test test = new test();
                test.name = "name" + i.tostring();
                test.index = "index" + i.tostring();
                test.time = datetime.now;
                tests.add(test);
            }
            console.writeline("taskfive_three 500条数据第三个方法 执行完成");
        }
        public static void taskfive_four()
        {
            list<test> tests = new list<test>();
            for (int i = 1500; i < 2000; i++)
            {
                test test = new test();
                test.name = "name" + i.tostring();
                test.index = "index" + i.tostring();
                test.time = datetime.now;
                tests.add(test);
            }
            console.writeline("taskfive_four 500条数据第四个方法 执行完成");
        }



        public static void taskonethousand_one()
        {
            list<test> tests = new list<test>();
            for (int i = 0; i < 1000; i++)
            {
                test test = new test();
                test.name = "name" + i.tostring();
                test.index = "index" + i.tostring();
                test.time = datetime.now;
                tests.add(test);
            }
            console.writeline("taskonethousand_one 1000条数据第一个方法 执行完成");
        }
        public static void taskonethousand_two()
        {
            list<test> tests = new list<test>();
            for (int i = 1000; i < 2000; i++)
            {
                test test = new test();
                test.name = "name" + i.tostring();
                test.index = "index" + i.tostring();
                test.time = datetime.now;
                tests.add(test);
            }
            console.writeline("taskonethousand_two 1000条数据第二个方法 执行完成");
        }
        public static void taskonethousand_three()
        {
            list<test> tests = new list<test>();
            for (int i = 2000; i < 3000; i++)
            {
                test test = new test();
                test.name = "name" + i.tostring();
                test.index = "index" + i.tostring();
                test.time = datetime.now;
                tests.add(test);
            }
            console.writeline("taskonethousand_three 1000条数据第三个方法 执行完成");
        }
        public static void taskonethousand_four()
        {
            list<test> tests = new list<test>();
            for (int i = 3000; i < 4000; i++)
            {
                test test = new test();
                test.name = "name" + i.tostring();
                test.index = "index" + i.tostring();
                test.time = datetime.now;
                tests.add(test);
            }
            console.writeline("taskonethousand_four 1000条数据第四个方法 执行完成");
        }
        #endregion

 

        static void main(string[] args)
        {
            stopwatch swtest = new stopwatch();
            swtest.start();
            thread.sleep(3000);
            taskfive_one();
            taskfive_two();
            taskfive_three();
            taskfive_four();
            swtest.stop();
            console.writeline("500条数据  顺序编程所耗时间:" + swtest.elapsedmilliseconds);


            //五百条数据并行完成 
            swtest.restart();
            thread.sleep(3000);
            parallel.invoke(() => taskfive_one(), () => taskfive_two(), () => taskfive_three(), () => taskfive_four());
            swtest.stop();
            console.writeline("500条数据  并行编程所耗时间:" + swtest.elapsedmilliseconds);


            //一千条数据顺序完成 
            swtest.restart();
            thread.sleep(3000);
            taskonethousand_one();
            taskonethousand_two();
            taskonethousand_three();
            taskonethousand_four();
            swtest.stop();
            console.writeline("1000条数据  顺序编程所耗时间:" + swtest.elapsedmilliseconds);


            //一千条数据并行完成 
            swtest.restart();
            thread.sleep(3000);
            parallel.invoke(() => taskonethousand_one(), () => taskonethousand_two(), () => taskonethousand_three(), () => taskonethousand_four());
            swtest.stop();
            console.writeline("1000条数据  并行编程所耗时间:" + swtest.elapsedmilliseconds);
        }

 

并行编程和任务(一)

 

  我们看下我们修改共享资源后,对于500条数据的运行结果,顺序编程比并行编程还是要快点,但是在1000条数据的时候并行编程就明显比顺序编程要快了。而且在测试中并行编程的运行顺序也是不固定的。我们在日常编程中我们需要衡量我们的应用是否需要并行编程,不然可能造成更多的性能损耗。

项目源码地址


世界上那些最容易的事情中,拖延时间最不费力。坚韧是成功的一大要素,只要在门上敲得够久够大声,终会把人唤醒的。

    欢迎大家扫描下方二维码,和我一起学习更多的知识