欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#线程同步--限量使用

程序员文章站 2022-04-04 15:13:37
问题抽象:当某一资源同一时刻允许一定数量的线程使用的时候,需要有个机制来阻塞多余的线程,直到资源再次变得可用。线程同步方案:Semaphore、SemaphoreSlim、CountdownEvent方案特性:限量供应;除所有者外,其他人无条件等待;先到先得,没有先后顺序 1、Semaphore类 ......

问题抽象:当某一资源同一时刻允许一定数量的线程使用的时候,需要有个机制来阻塞多余的线程,直到资源再次变得可用。
线程同步方案:semaphore、semaphoreslim、countdownevent
方案特性:限量供应;除所有者外,其他人无条件等待;先到先得,没有先后顺序

1、semaphore类
      用于控制线程的访问数量,默认的构造函数为initialcount和maximumcount,表示默认设置的信号量个数和最大信号量个数。当你waitone的时候,信号量自减,当release的时候,信号量自增,然而当信号量为0的时候,后续的线程就不能拿到waitone了,所以必须等待先前的线程通过release来释放。

C#线程同步--限量使用
using system;
using system.threading;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            thread t1 = new thread(run1);
            t1.start();
            thread t2 = new thread(run2);
            t2.start();
            thread t3 = new thread(run3);
            t3.start();
            console.readkey();
        }

        //初始可以授予2个线程信号,因为第3个要等待前面的release才能得到信号
        static semaphore sem = new semaphore(2, 10);

        static void run1()
        {
            sem.waitone();
            console.writeline("大家好,我是run1;" + datetime.now.tostring("mm:ss"));

            //两秒后
            thread.sleep(2000);
            sem.release();
        }

        static void run2()
        {
            sem.waitone();
            console.writeline("大家好,我是run2;" + datetime.now.tostring("mm:ss"));

            //两秒后
            thread.sleep(2000);
            sem.release();
        }

        static void run3()
        {
            sem.waitone();
            console.writeline("大家好,我是run3;" + datetime.now.tostring("mm:ss"));

            //两秒后
            thread.sleep(2000);
            sem.release();
        }
    }
}
program

在以上的方法中release()方法相当于自增一个信号量,release(5)自增5个信号量。但是,release()到构造函数的第二个参数maximumcount的值就不能再自增了。

semaphore可用于进程级交互。

C#线程同步--限量使用
using system;
using system.diagnostics;
using system.threading;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {

            thread t1 = new thread(run1);
            t1.start();

            thread t2 = new thread(run2);
            t2.start();

            console.read();
        }

        //初始可以授予2个线程信号,因为第3个要等待前面的release才能得到信号
        static semaphore sem = new semaphore(3, 10, "命名semaphore");

        static void run1()
        {
            sem.waitone();

            console.writeline("进程:" + process.getcurrentprocess().id + "  我是run1" + datetime.now.timeofday);
        }

        static void run2()
        {
            sem.waitone();

            console.writeline("进程:" + process.getcurrentprocess().id + "  我是run2" + datetime.now.timeofday);
        }
    }
}
program

C#线程同步--限量使用

直接运行两次bin目录的exe文件,就能发现最多只能输出3个。

semaphore可以限制可同时访问某一资源或资源池的线程数。
        semaphore类在内部维护一个计数器,当一个线程调用semaphore对象的wait系列方法时,此计数器减一,只要计数器还是一个正数,线程就不会阻塞。当计数器减到0时,再调用semaphore对象wait系列方法的线程将被阻塞,直到有线程调用semaphore对象的release()方法增加计数器值时,才有可能解除阻塞状态。
 
示例说明:
图书馆都配备有若干台公用计算机供读者查询信息,当某日读者比较多时,必须排队等候。uselibrarycomputer实例用多线程模拟了多人使用多台计算机的过程
C#线程同步--限量使用
using system;
using system.threading;

namespace consoleapp1
{
    class program
    {
        //图书馆拥有的公用计算机  
        private const int computernum = 3;
        private static computer[] librarycomputers;
        //同步信号量  
        public static semaphore sp = new semaphore(computernum, computernum);

        static void main(string[] args)
        {
            //图书馆拥有computernum台电脑  
            librarycomputers = new computer[computernum];
            for (int i = 0; i < computernum; i++)
                librarycomputers[i] = new computer("computer" + (i + 1).tostring());
            int peoplenum = 0;
            random ran = new random();
            thread user;
            system.console.writeline("敲任意键模拟一批批的人排队使用{0}台计算机,esc键结束模拟……", computernum);
            //每次创建若干个线程,模拟人排队使用计算机  
            while (system.console.readkey().key != consolekey.escape)
            {
                peoplenum = ran.next(0, 10);
                system.console.writeline("\n有{0}人在等待使用计算机。", peoplenum);

                for (int i = 1; i <= peoplenum; i++)
                {
                    user = new thread(usecomputer);
                    user.start("user" + i.tostring());
                }
            }
        }

        //线程函数  
        static void usecomputer(object username)
        {
            sp.waitone();//等待计算机可用  

            //查找可用的计算机  
            computer cp = null;
            for (int i = 0; i < computernum; i++)
                if (librarycomputers[i].isoccupied == false)
                {
                    cp = librarycomputers[i];
                    break;
                }
            //使用计算机工作  
            cp.use(username.tostring());

            //不再使用计算机,让出来给其他人使用  
            sp.release();
        }
    }

    class computer
    {
        public readonly string computername = "";
        public computer(string name)
        {
            computername = name;
        }
        //是否被占用  
        public bool isoccupied = false;
        //人在使用计算机  
        public void use(string username)
        {
            system.console.writeline("{0}开始使用计算机{1}", username, computername);
            isoccupied = true;
            thread.sleep(new random().next(1, 2000)); //随机休眠,以模拟人使用计算机  
            system.console.writeline("{0}结束使用计算机{1}", username, computername);
            isoccupied = false;
        }
    }
}
program

 

2、semaphoreslim类
     在.net 4.0之前,framework中有一个重量级的semaphore,可以跨进程同步,semaphoreslim轻量级不行,msdn对它的解释为:限制可同时访问某一资源或资源池的线程数。
C#线程同步--限量使用
using system;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static semaphoreslim slim = new semaphoreslim(environment.processorcount, 12);

        static void main(string[] args)
        {
            for (int i = 0; i < 12; i++)
            {
                task.factory.startnew((obj) =>
                {
                    run(obj);
                }, i);
            }
            console.read();
        }

        static void run(object obj)
        {
            slim.wait();
            console.writeline("当前时间:{0}任务 {1}已经进入。", datetime.now, obj);
            //这里busy3s中
            thread.sleep(3000);
            slim.release();
        }
    }
}
program

同样,防止死锁的情况,我们需要知道”超时和取消标记“的解决方案,像semaphoreslim这种定死的”线程请求范围“,其实是降低了扩展性,使用需谨慎,在觉得有必要的时候使用它

注:semaphore类是semaphoreslim类的老版本,该版本使用纯粹的内核时间(kernel-time)方式。
    semaphoreslim类不使用windows内核信号量,而且也不支持进程间同步。所以在跨程序同步的场景下可以使用semaphore
 
3、countdownevent类
     这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位。好,这就是简单的信号计数机制,从技术角度上来说它是定义了最多能够进入关键代码的线程数。
     但是countdownevent更牛x之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,这样做有什么好处呢?比如一个任务需要加载1w条数据,那么可能出现这种情况。
例如:
加载user表:         根据user表的数据量,我们需要开5个task。
加载product表:    产品表数据相对比较多,计算之后需要开8个task。
加载order表:       由于我的网站订单丰富,计算之后需要开12个task。
C#线程同步--限量使用
using system;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        //默认的容纳大小为“硬件线程“数
        static countdownevent cde = new countdownevent(environment.processorcount);

        static void loaduser(object obj)
        {
            try
            {
                console.writeline("threadid={0};当前任务:{1}正在加载user部分数据!", thread.currentthread.managedthreadid, obj);
            }
            finally
            {
                cde.signal();
            }
        }

        static void loadproduct(object obj)
        {
            try
            {
                console.writeline("threadid={0};当前任务:{1}正在加载product部分数据!", thread.currentthread.managedthreadid, obj);
            }
            finally
            {
                cde.signal();
            }
        }

        static void loadorder(object obj)
        {
            try
            {
                console.writeline("threadid={0};当前任务:{1}正在加载order部分数据!", thread.currentthread.managedthreadid, obj);
            }
            finally
            {
                cde.signal();
            }
        }

        static void main(string[] args)
        {
            //加载user表需要5个任务
            var usertaskcount = 5;
            //重置信号
            cde.reset(usertaskcount);
            for (int i = 0; i < usertaskcount; i++)
            {
                task.factory.startnew((obj) =>
                {
                    loaduser(obj);
                }, i);
            }
            //等待所有任务执行完毕
            cde.wait();
            console.writeline("\nuser表数据全部加载完毕!\n");

            //加载product需要8个任务
            var producttaskcount = 8;
            //重置信号
            cde.reset(producttaskcount);
            for (int i = 0; i < producttaskcount; i++)
            {
                task.factory.startnew((obj) =>
                {
                    loadproduct(obj);
                }, i);
            }
            cde.wait();
            console.writeline("\nproduct表数据全部加载完毕!\n");

            //加载order需要12个任务
            var ordertaskcount = 12;
            //重置信号
            cde.reset(ordertaskcount);
            for (int i = 0; i < ordertaskcount; i++)
            {
                task.factory.startnew((obj) =>
                {
                    loadorder(obj);
                }, i);
            }
            cde.wait();
            console.writeline("\norder表数据全部加载完毕!\n");

            console.writeline("\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\n");
            console.read();
        }
    }
}
program

我们看到有两个主要方法:wait和signal。每调用一次signal相当于麻将桌上走了一个人,直到所有人都搓过麻将wait才给放行,这里同样要注意也就是“超时“问题的存在性,尤其是在并行计算中,轻量级别给我们提供了”取消标记“的机制,这是在重量级别中不存在的

注:如果调用signal()没有到达指定的次数,那么wait()将一直等待,请确保使用每个线程完成后都要调用signal方法。