欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程

程序员文章站 2022-04-23 09:17:18
在并行编程中,经常会遇到多线程间操作共享集合的问题,很多时候大家都很难逃避这个问题做到一种无锁编程状态,你也知道一旦给共享集合套上lock之后,并发和伸缩能力往往会造成很大影响,这篇就来谈谈如何尽可能的减少lock锁次数甚至没有。 一:缘由 1. 业务背景 昨天在review代码的时候,看到以前自己 ......

在并行编程中,经常会遇到多线程间操作共享集合的问题,很多时候大家都很难逃避这个问题做到一种无锁编程状态,你也知道一旦给共享集合套上lock之后,并发和伸缩能力往往会造成很大影响,这篇就来谈谈如何尽可能的减少lock锁次数甚至没有。

一:缘由

1. 业务背景

昨天在review代码的时候,看到以前自己写的这么一段代码,精简后如下:

        private static list<long> executefilterlist(int shopid, list<memorycachetrade> trades, list<filterconditon> filteritemlist, matrixsearchcontext searchcontext)
        {
            var customeridlist = new list<long>();

            var index = 0;

            parallel.foreach(filteritemlist, new paralleloptions() { maxdegreeofparallelism = 4 },
                            (filteritem) =>
            {
                var context = new filteritemcontext()
                {
                    starttime = searchcontext.starttime,
                    endtime = searchcontext.endtime,
                    shopid = shopid,
                    field = filteritem.field,
                    filtertype = filteritem.filtertype,
                    itemlist = filteritem.filtervalue,
                    searchlist = trades.tolist()
                };

                var smallcustomeridlist = context.execute();

                lock (filteritemlist)
                {
                    if (index == 0)
                    {
                        customeridlist.addrange(smallcustomeridlist);
                        index++;
                    }
                    else
                    {
                        customeridlist = customeridlist.intersect(smallcustomeridlist).tolist();
                    }
                }
            });

            return customeridlist;
        }

这段代码实现的功能是这样的,filteritemlist承载着所有原子化的筛选条件,然后用多线程的形式并发执行里面的item,最后将每个item获取的客户人数集合在高层进行整体求交,画个简图就是下面这样。

我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程

2. 问题分析

其实这代码存在着一个很大的问题,在parallel中直接使用lock锁的话,filteritemlist有多少个,我的lock就会锁多少次,这对并发和伸缩性是有一定影响的,现在就来想想怎么优化吧!

3. 测试案例

为了方便演示,我模拟了一个小案例,方便大家看到实时结果,修改后的代码如下:

        public static void main(string[] args)
        {
            var filteritemlist = new list<string>() { "conditon1", "conditon2", "conditon3", "conditon4", "conditon5", "conditon6" };
            paralleltest1(filteritemlist);
        }

        public static void paralleltest1(list<string> filteritemlist)
        {
            var totalcustomeridlist = new list<int>();

            bool isfirst = true;

            parallel.foreach(filteritemlist, new paralleloptions() { maxdegreeofparallelism = 2 }, (query) =>
            {
                var smallcustomeridlist = getcustomeridlist(query);

                lock (filteritemlist)
                {
                    if (isfirst)
                    {
                        totalcustomeridlist.addrange(smallcustomeridlist);
                        isfirst = false;
                    }
                    else
                    {
                        totalcustomeridlist = totalcustomeridlist.intersect(smallcustomeridlist).tolist();
                    }

                    console.writeline($"{datetime.now} 被锁了");
                }
            });

            console.writeline($"最后交集客户id:{string.join(",", totalcustomeridlist)}");
        }

        public static list<int> getcustomeridlist(string query)
        {
            var dict = new dictionary<string, list<int>>()
            {
                ["conditon1"] = new list<int>() { 1, 2, 4, 7 },
                ["conditon2"] = new list<int>() { 1, 4, 6, 7 },
                ["conditon3"] = new list<int>() { 1, 4, 5, 7 },
                ["conditon4"] = new list<int>() { 1, 2, 3, 7 },
                ["conditon5"] = new list<int>() { 1, 2, 4, 5, 7 },
                ["conditon6"] = new list<int>() { 1, 3, 4, 7, 9 },
            };

            return dict[query];
        }

------ output ------
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
最后交集客户id:1,7

二:第一次优化

从结果中可以看到,filteritemlist有6个,锁次数也是6次,那如何降低呢? 其实实现parallel代码的fcl大神也考虑到了这个问题,从底层给了一个很好的重载,如下所示:

public static parallelloopresult foreach<tsource, tlocal>(orderablepartitioner<tsource> source, paralleloptions paralleloptions, func<tlocal> localinit, func<tsource, parallelloopstate, long, tlocal, tlocal> body, action<tlocal> localfinally);

这个重载很特别,多了两个参数localinit和localfinally,过会说一下什么意思,先看修改后的代码体会一下

        public static void paralleltest2(list<string> filteritemlist)
        {
            var totalcustomeridlist = new list<int>();
            var isfirst = true;

            parallel.foreach<string, list<int>>(filteritemlist,
              new paralleloptions() { maxdegreeofparallelism = 2 },
              () => { return null; },
             (query, loop, index, smalllist) =>
             {
                 var smallcustomeridlist = getcustomeridlist(query);

                 if (smalllist == null) return smallcustomeridlist;

                 return smalllist.intersect(smallcustomeridlist).tolist();
             },
            (finalllist) =>
            {
                lock (filteritemlist)
                {
                    if (isfirst)
                    {
                        totalcustomeridlist.addrange(finalllist);
                        isfirst = false;
                    }
                    else
                    {
                        totalcustomeridlist = totalcustomeridlist.intersect(finalllist).tolist();
                    }
                    console.writeline($"{datetime.now} 被锁了");
                }
            });
            console.writeline($"最后交集客户id:{string.join(",", totalcustomeridlist)}");
        }

------- output ------
2020/04/21 16:11:46 被锁了
2020/04/21 16:11:46 被锁了
最后交集客户id:1,7
press any key to continue . . .

很好,这次优化将lock次数从6次降到了2次,这里我用了 new paralleloptions() { maxdegreeofparallelism = 2 } 设置了并发度为最多2个cpu核,程序跑起来后会开两个线程,将一个大集合划分为2个小集合,相当于1个集合3个条件,第一个线程在执行3个条件的起始处会执行你的localinit函数,在3个条件迭代完之后再执行你的localfinally,第二个线程也是按照同样方式执行自己的3个条件,说的有点晦涩,画一张图说明吧。

我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程

三: 第二次优化

如果你了解task<t>这种带有返回值的task,这就好办了,多少个filteritemlist就可以开多少个task,反正task底层是使用线程池承载的,所以不用怕,这样就完美的实现无锁编程。

        public static void paralleltest3(list<string> filteritemlist)
        {
            var totalcustomeridlist = new list<int>();
            var tasks = new task<list<int>>[filteritemlist.count];

            for (int i = 0; i < filteritemlist.count; i++)
            {
                tasks[i] = task.factory.startnew((query) =>
                {
                    return getcustomeridlist(query.tostring());
                }, filteritemlist[i]);
            }

            task.waitall(tasks);

            for (int i = 0; i < tasks.length; i++)
            {
                var smallcustomeridlist = tasks[i].result;
                if (i == 0)
                {
                    totalcustomeridlist.addrange(smallcustomeridlist);
                }
                else
                {
                    totalcustomeridlist = totalcustomeridlist.intersect(smallcustomeridlist).tolist();
                }
            }

            console.writeline($"最后交集客户id:{string.join(",", totalcustomeridlist)}");
        }

------ output -------

最后交集客户id:1,7
press any key to continue . . .

四:总结

我们将原来的6个lock优化到了无锁编程,但并不说明无锁编程就一定比带有lock的效率高,大家要结合自己的使用场景合理的使用和混合搭配。

好了,本篇就说到这里,希望对您有帮助。


如您有更多问题与我互动,扫描下方进来吧~


我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程