欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#线程安全类型

程序员文章站 2022-10-06 22:54:52
1、IProducerConsumerCollection (线程安全接口) 此接口的所有实现必须都启用此接口的所有成员,若要从多个线程同时使用。 using System; using System.Collections; using System.Collections.Concurrent; ......

1、iproducerconsumercollection (线程安全接口)
  此接口的所有实现必须都启用此接口的所有成员,若要从多个线程同时使用。

C#线程安全类型
using system;
using system.collections;
using system.collections.concurrent;
using system.collections.generic;

namespace consoleapp1
{
    public class safestack<t> : iproducerconsumercollection<t>
    {
        // used for enforcing thread-safety
        private object m_lockobject = new object();

        // we'll use a regular old stack for our core operations
        private stack<t> m_sequentialstack = null;

        //
        // constructors
        //
        public safestack()
        {
            m_sequentialstack = new stack<t>();
        }

        public safestack(ienumerable<t> collection)
        {
            m_sequentialstack = new stack<t>(collection);
        }

        //
        // safe push/pop support
        //
        public void push(t item)
        {
            lock (m_lockobject) m_sequentialstack.push(item);
        }

        public bool trypop(out t item)
        {
            bool rval = true;
            lock (m_lockobject)
            {
                if (m_sequentialstack.count == 0) { item = default(t); rval = false; }
                else item = m_sequentialstack.pop();
            }
            return rval;
        }

        //
        // iproducerconsumercollection(t) support
        //
        public bool trytake(out t item)
        {
            return trypop(out item);
        }

        public bool tryadd(t item)
        {
            push(item);
            return true; // push doesn't fail
        }

        public t[] toarray()
        {
            t[] rval = null;
            lock (m_lockobject) rval = m_sequentialstack.toarray();
            return rval;
        }

        public void copyto(t[] array, int index)
        {
            lock (m_lockobject) m_sequentialstack.copyto(array, index);
        }



        //
        // support for ienumerable(t)
        //
        public ienumerator<t> getenumerator()
        {
            // the performance here will be unfortunate for large stacks,
            // but thread-safety is effectively implemented.
            stack<t> stackcopy = null;
            lock (m_lockobject) stackcopy = new stack<t>(m_sequentialstack);
            return stackcopy.getenumerator();
        }


        //
        // support for ienumerable
        //
        ienumerator ienumerable.getenumerator()
        {
            return ((ienumerable<t>)this).getenumerator();
        }

        // 
        // support for icollection
        //
        public bool issynchronized
        {
            get { return true; }
        }

        public object syncroot
        {
            get { return m_lockobject; }
        }

        public int count
        {
            get { return m_sequentialstack.count; }
        }

        public void copyto(array array, int index)
        {
            lock (m_lockobject) ((icollection)m_sequentialstack).copyto(array, index);
        }
    }
}
safestack
C#线程安全类型
using system;
using system.collections.concurrent;

namespace consoleapp1
{
    class program
    {
        static void main()
        {
            testsafestack();

            // keep the console window open in debug mode.
            console.writeline("press any key to exit.");
            console.readkey();
        }

        // test our implementation of iproducerconsumercollection(t)
        // demonstrates:
        //      ipcc(t).tryadd()
        //      ipcc(t).trytake()
        //      ipcc(t).copyto()
        static void testsafestack()
        {
            safestack<int> stack = new safestack<int>();
            iproducerconsumercollection<int> ipcc = (iproducerconsumercollection<int>)stack;

            // test push()/tryadd()
            stack.push(10); console.writeline("pushed 10");
            ipcc.tryadd(20); console.writeline("ipcc.tryadded 20");
            stack.push(15); console.writeline("pushed 15");

            int[] testarray = new int[3];

            // try copyto() within boundaries
            try
            {
                ipcc.copyto(testarray, 0);
                console.writeline("copyto() within boundaries worked, as expected");
            }
            catch (exception e)
            {
                console.writeline("copyto() within boundaries unexpectedly threw an exception: {0}", e.message);
            }

            // try copyto() that overflows
            try
            {
                ipcc.copyto(testarray, 1);
                console.writeline("copyto() with index overflow worked, and it should not have");
            }
            catch (exception e)
            {
                console.writeline("copyto() with index overflow threw an exception, as expected: {0}", e.message);
            }

            // test enumeration
            console.write("enumeration (should be three items): ");
            foreach (int item in stack)
                console.write("{0} ", item);
            console.writeline("");

            // test trypop()
            int popped = 0;
            if (stack.trypop(out popped))
            {
                console.writeline("successfully popped {0}", popped);
            }
            else console.writeline("failed to pop!!");

            // test count
            console.writeline("stack count is {0}, should be 2", stack.count);

            // test trytake()
            if (ipcc.trytake(out popped))
            {
                console.writeline("successfully ipcc-trytaked {0}", popped);
            }
            else console.writeline("failed to ipcc.trytake!!");
        }
    }
}
program

2、concurrentstack类:安全堆栈

C#线程安全类型
using system;
using system.collections.concurrent;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            task t = runprogram();
            t.wait();
        }

        static async task runprogram()
        {
            var taskstack = new concurrentstack<customtask>();
            var cts = new cancellationtokensource();

            var tasksource = task.run(() => taskproducer(taskstack));

            task[] processors = new task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processorid = i.tostring();
                processors[i - 1] = task.run(
                    () => taskprocessor(taskstack, "processor " + processorid, cts.token));
            }

            await tasksource;
            cts.cancelafter(timespan.fromseconds(2));

            await task.whenall(processors);
        }

        static async task taskproducer(concurrentstack<customtask> stack)
        {
            for (int i = 1; i <= 20; i++)
            {
                await task.delay(50);
                var workitem = new customtask { id = i };
                stack.push(workitem);
                console.writeline("task {0} has been posted", workitem.id);
            }
        }

        static async task taskprocessor(
            concurrentstack<customtask> stack, string name, cancellationtoken token)
        {
            await getrandomdelay();
            do
            {
                customtask workitem;
                bool popsuccesful = stack.trypop(out workitem);
                if (popsuccesful)
                {
                    console.writeline("task {0} has been processed by {1}", workitem.id, name);
                }

                await getrandomdelay();
            }
            while (!token.iscancellationrequested);
        }

        static task getrandomdelay()
        {
            int delay = new random(datetime.now.millisecond).next(1, 500);
            return task.delay(delay);
        }

        class customtask
        {
            public int id { get; set; }
        }
    }
}
program

3、concurrentqueue类:安全队列

C#线程安全类型
using system;
using system.collections.concurrent;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            task t = runprogram();
            t.wait();
        }

        static async task runprogram()
        {
            var taskqueue = new concurrentqueue<customtask>();
            var cts = new cancellationtokensource();

            var tasksource = task.run(() => taskproducer(taskqueue));

            task[] processors = new task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processorid = i.tostring();
                processors[i - 1] = task.run(
                    () => taskprocessor(taskqueue, "processor " + processorid, cts.token));
            }

            await tasksource;
            cts.cancelafter(timespan.fromseconds(2));

            await task.whenall(processors);
        }

        static async task taskproducer(concurrentqueue<customtask> queue)
        {
            for (int i = 1; i <= 20; i++)
            {
                await task.delay(50);
                var workitem = new customtask { id = i };
                queue.enqueue(workitem);
                console.writeline("插入task {0} has been posted threadid={1}", workitem.id, thread.currentthread.managedthreadid);
            }
        }

        static async task taskprocessor(
            concurrentqueue<customtask> queue, string name, cancellationtoken token)
        {
            customtask workitem;
            bool dequeuesuccesful = false;

            await getrandomdelay();
            do
            {
                dequeuesuccesful = queue.trydequeue(out workitem);
                if (dequeuesuccesful)
                {
                    console.writeline("读取task {0} has been processed by {1} threadid={2}",
                                        workitem.id, name, thread.currentthread.managedthreadid);
                }

                await getrandomdelay();
            }
            while (!token.iscancellationrequested);
        }

        static task getrandomdelay()
        {
            int delay = new random(datetime.now.millisecond).next(1, 500);
            return task.delay(delay);
        }

        class customtask
        {
            public int id { get; set; }
        }
    }
}
program

4、concurrentdictionary类
  concurrentdictionary类写操作比使用锁的通常字典(dictionary)要慢的多,而读操作则要快些。因此对字典要大量的线程安全的读操作,concurrentdictionary类是最好的选择
  concurrentdictionary类的实现使用了细粒度锁(fine-grained locking)技术,这在多线程写入方面比使用锁的通常的字典(也被称为粗粒度锁)

C#线程安全类型
using system;
using system.collections.concurrent;
using system.collections.generic;
using system.diagnostics;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            var concurrentdictionary = new concurrentdictionary<int, string>();
            var dictionary = new dictionary<int, string>();

            var sw = new stopwatch();

            sw.start();
            for (int i = 0; i < 1000000; i++)
            {
                lock (dictionary)
                {
                    dictionary[i] = item;
                }
            }
            sw.stop();
            console.writeline("writing to dictionary with a lock: {0}", sw.elapsed);

            sw.restart();
            for (int i = 0; i < 1000000; i++)
            {
                concurrentdictionary[i] = item;
            }
            sw.stop();
            console.writeline("writing to a concurrent dictionary: {0}", sw.elapsed);

            sw.restart();
            for (int i = 0; i < 1000000; i++)
            {
                lock (dictionary)
                {
                    currentitem = dictionary[i];
                }
            }
            sw.stop();
            console.writeline("reading from dictionary with a lock: {0}", sw.elapsed);

            sw.restart();
            for (int i = 0; i < 1000000; i++)
            {
                currentitem = concurrentdictionary[i];
            }
            sw.stop();
            console.writeline("reading from a concurrent dictionary: {0}", sw.elapsed);
        }

        const string item = "dictionary item";
        public static string currentitem;
    }
}
program

5、concurrentbag类

C#线程安全类型
namespace consoleapp1
{
    class crawlingtask
    {
        public string urltocrawl { get; set; }

        public string producername { get; set; }
    }
}
crawlingtask
C#线程安全类型
using system.collections.generic;

namespace consoleapp1
{
    static class module
    {
        public static dictionary<string, string[]> _contentemulation = new dictionary<string, string[]>();

        public static void createlinks()
        {
            _contentemulation["http://microsoft.com/"] = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" };
            _contentemulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" };
            _contentemulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" };

            _contentemulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" };
            _contentemulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" };
            _contentemulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" };
            _contentemulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" };

            _contentemulation["http://facebook.com/"] = new[] { "http://facebook.com/a.html", "http://facebook.com/b.html" };
            _contentemulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" };
            _contentemulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" };

            _contentemulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" };
            _contentemulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" };
            _contentemulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" };
            _contentemulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" };
            _contentemulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" };
            _contentemulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" };
        }
    }
}
module
C#线程安全类型
using system;
using system.collections.concurrent;
using system.collections.generic;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            module.createlinks();
            task t = runprogram();
            t.wait();
        }

        static async task runprogram()
        {
            var bag = new concurrentbag<crawlingtask>();

            string[] urls = new[] { "http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/" };

            var crawlers = new task[4];
            for (int i = 1; i <= 4; i++)
            {
                string crawlername = "crawler " + i.tostring();
                bag.add(new crawlingtask { urltocrawl = urls[i - 1], producername = "root" });
                crawlers[i - 1] = task.run(() => crawl(bag, crawlername));
            }

            await task.whenall(crawlers);
        }

        static async task crawl(concurrentbag<crawlingtask> bag, string crawlername)
        {
            crawlingtask task;
            //尝试从bag中取出对象
            while (bag.trytake(out task))
            {
                ienumerable<string> urls = await getlinksfromcontent(task);
                if (urls != null)
                {
                    foreach (var url in urls)
                    {
                        var t = new crawlingtask
                        {
                            urltocrawl = url,
                            producername = crawlername
                        };
                        //将子集插入到bag中 
                        bag.add(t);
                    }
                }
                console.writeline("indexing url {0} posted by {1} is completed by {2}!",
                    task.urltocrawl, task.producername, crawlername);
            }
        }

        static async task<ienumerable<string>> getlinksfromcontent(crawlingtask task)
        {
            await getrandomdelay();

            if (module._contentemulation.containskey(task.urltocrawl)) return module._contentemulation[task.urltocrawl];

            return null;
        }

        static task getrandomdelay()
        {
            int delay = new random(datetime.now.millisecond).next(150, 200);
            return task.delay(delay);
        }


    }
}
program

6、blockingcollection类
  blockingcollection类: 我们能够改变任务存储在阻塞集合中的方式。默认情况下它使用的是concurrentqueue容器,但是我们能够使用任何实现了iproducerconsumercollection泛型接口的集合。

C#线程安全类型
namespace consoleapp1
{
    class customtask
    {
        public int id { get; set; }
    }
}
customtask
C#线程安全类型
using system;
using system.threading.tasks;

namespace consoleapp1
{
    static class module
    {
        public static task getrandomdelay()
        {
            int delay = new random(datetime.now.millisecond).next(1, 500);
            return task.delay(delay);
        }
    }
}
module
C#线程安全类型
using system;
using system.collections.concurrent;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            console.writeline("using a queue inside of blockingcollection");
            console.writeline();
            task t = runprogram();
            t.wait();

            //console.writeline();
            //console.writeline("using a stack inside of blockingcollection");
            //console.writeline();
            //task t = runprogram(new concurrentstack<customtask>());
            //t.wait();
        }

        static async task runprogram(iproducerconsumercollection<customtask> collection = null)
        {
            var taskcollection = new blockingcollection<customtask>();
            if (collection != null)
                taskcollection = new blockingcollection<customtask>(collection);
            //初始化collection中的数据
            var tasksource = task.run(() => taskproducer(taskcollection));

            task[] processors = new task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processorid = "processor " + i;
                processors[i - 1] = task.run(
                    () => taskprocessor(taskcollection, processorid));
            }

            await tasksource;

            await task.whenall(processors);
        }
        /// <summary>
        /// 初始化collection中的数据
        /// </summary>
        /// <param name="collection"></param>
        /// <returns></returns>
        static async task taskproducer(blockingcollection<customtask> collection)
        {
            for (int i = 1; i <= 20; i++)
            {
                await task.delay(20);
                var workitem = new customtask { id = i };
                collection.add(workitem);
                console.writeline("task {0} has been posted", workitem.id);
            }
            collection.completeadding();
        }
        /// <summary>
        /// 打印collection中的数据
        /// </summary>
        /// <param name="collection"></param>
        /// <param name="name"></param>
        /// <returns></returns>
        static async task taskprocessor(
            blockingcollection<customtask> collection, string name)
        {
            await module.getrandomdelay();
            foreach (customtask item in collection.getconsumingenumerable())
            {
                console.writeline("task {0} has been processed by {1}", item.id, name);
                await module.getrandomdelay();
            }
        }
    }
}
program

7、使用threadstatic特性
  threadstatic特性是最简单的tls使用,且只支持静态字段,只需要在字段上标记这个特性就可以了

C#线程安全类型
using system;
using system.threading;

namespace consoleapp1
{
    class program
    {
        //tls中的str变量
        //可以看到,str静态字段在两个线程中都是独立存储的,互相不会被修改。
        [threadstatic]
        static string str = "hehe";

        static void main(string[] args)
        {
            //另一个线程只会修改自己tls中的hehe
            thread th = new thread(() => { str = "mgen"; display(); });
            th.start();
            th.join();
            display();
        }
        static void display()
        {
            console.writeline("{0} {1}", thread.currentthread.managedthreadid, str);
        }

    }
}
program

8、使用命名的localdatastoreslot类型
  显然threadstatic特性只支持静态字段太受限制了。.net线程类型中的localdatastoreslot提供更好的tls支持。我们先来看看命名的localdatastoreslot类型,可以通过thread.allocatenameddataslot来分配一个命名的空间,通过thread.freenameddataslot来销毁一个命名的空间。空间数据的获取和设置则通过thread类型的getdata方法和setdata方法。

C#线程安全类型
using system;
using system.threading;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            //创建slot
            localdatastoreslot slot = thread.allocatenameddataslot("slot");

            //设置tls中的值
            thread.setdata(slot, "hehe");

            //修改tls的线程
            thread th = new thread(() =>
            {
                thread.setdata(slot, "mgen");
                display();
            });

            th.start();
            th.join();
            display();

            //清除slot
            thread.freenameddataslot("slot");
        }

        //显示tls中slot值
        static void display()
        {
            localdatastoreslot dataslot = thread.getnameddataslot("slot");
            console.writeline("{0} {1}", thread.currentthread.managedthreadid, thread.getdata(dataslot));
        }

    }
}
program

9、使用未命名的localdatastoreslot类型
  线程同样支持未命名的localdatastoreslot,未命名的localdatastoreslot不需要手动清除,分配则需要thread.allocatedataslot方法。注意由于未命名的localdatastoreslot没有名称,因此无法使用thread.getnameddataslot方法,只能在多个线程中引用同一个localdatastoreslot才可以对tls空间进行操作,将上面的命名的localdatastoreslot代码改成未命名的localdatastoreslot执行

C#线程安全类型
using system;
using system.threading;

namespace consoleapp1
{
    class program
    {
        //静态localdatastoreslot变量
        static localdatastoreslot slot;

        static void main(string[] args)
        {
            //创建slot
            slot = thread.allocatedataslot();

            //设置tls中的值
            thread.setdata(slot, "hehe");

            //修改tls的线程
            thread th = new thread(() =>
            {
                thread.setdata(slot, "mgen");
                display();

            });

            th.start();
            th.join();
            display();
        }

        //显示tls中slot值
        static void display()
        {
            console.writeline("{0} {1}", thread.currentthread.managedthreadid, thread.getdata(slot));
        }

    }
}
program

10、使用.net 4.0的threadlocal<t>类型
  .net 4.0在线程方面加入了很多东西,其中就包括threadlocal<t>类型,他的出现更大的简化了tls的操作。threadlocal<t>类型和lazy<t>惊人相似,构造函数参数是func<t>用来创建对象(当然也可以理解成对象的默认值),然后用value属性来得到或者设置这个对象。
  threadlocal的操作或多或少有点像上面的未命名的localdatastoreslot,但threadlocal感觉更简洁更好理解。

C#线程安全类型
using system;
using system.threading;

namespace consoleapp1
{
    class program
    {
        static threadlocal<string> local;

        static void main(string[] args)
        {
            //创建threadlocal并提供默认值
            local = new threadlocal<string>(() => "hehe");

            //修改tls的线程
            thread th = new thread(() =>
            {

                local.value = "mgen";
                display();
            });

            th.start();
            th.join();
            display();
        }

        //显示tls中数据值
        static void display()
        {
            console.writeline("{0} {1}", thread.currentthread.managedthreadid, local.value);
        }

    }
}
program