欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RWMutex:共享/专有的递归互斥锁

程序员文章站 2022-06-22 10:37:52
具有共享/独占访问权限,且具有升级/降级功能的互斥锁 介绍 我的目标是创建可以充当读/写锁定机制的对象。任何线程都可以锁定它以进行读取,但是只有一个线程可以锁定它以进行写入。在写入线程释放它之前,所有其他线程都将等待。在释放任何其他线程之前,写线程不会获取互斥体。 我可以使用Slim Reader ......

具有共享/独占访问权限,且具有升级/降级功能的互斥锁

 

介绍

我的目标是创建可以充当读/写锁定机制的对象。任何线程都可以锁定它以进行读取,但是只有一个线程可以锁定它以进行写入。在写入线程释放它之前,所有其他线程都将等待。在释放任何其他线程之前,写线程不会获取互斥体。

 

我可以使用slim reader / writer锁,但是:

  • 它们不是递归的,例如,acquiresrwlockexclusive()如果同一线程较早调用同一函数,则对的调用将阻塞。
  • 它们不可升级,例如,已将锁锁定为读取访问权限的线程无法将其锁定为写入操作。
  • 它们不是可复制的句柄。

我可以尝试c ++ 14,shared_lock但是我仍然需要c ++ 11支持。此外,我还不确定它是否可以真正满足我的要求。

因此,我不得不手动实现它。由于缺少,删除了普通的c ++ 11方法waitformultipleobjects (nyi)。现在具有升级/降级功能。

 

rwmutex

这一节很简单。

1 class rwmutex
2     {
3     private:
4         handle hchangemap;
5         std::map<dword, handle> threads;
6         rwmutex(const rwmutex&) = delete;
7         rwmutex(rwmutex&&) = delete;

 

我需要std::map<dword,handle>为所有尝试访问共享资源的线程存储一个句柄,并且还需要一个互斥锁以确保对此映射的所有更改都是线程安全的。

构造函数

1 rwmutex(const rwmutex&) = delete;
2 void operator =(const rwmutex&) = delete;
3 
4 rwmutex()
5     {
6     hchangemapwrite = createmutex(0,0,0);
7     }

 

简单地创建一个映射互斥的句柄。对象不可复制。

创建

 1 handle createif(bool keepreaderlocked = false)
 2     {
 3                 waitforsingleobject(hchangemap, infinite);
 4                 dword id = getcurrentthreadid();
 5                 if (threads[id] == 0)
 6                     {
 7                     handle e0 = createmutex(0, 0, 0);
 8                     threads[id] = e0;
 9                     }
10                 handle e = threads[id];
11                 if (!keepreaderlocked)
12                       releasemutex(hchangemap);
13                 return e; 
14     }

 

当调用lockread()或lockwrite()来锁定对象时,将调用这个私有函数。如果当前线程还没有将自己变为可能访问这个互斥锁的线程中,这个函数将为该线程创建一个互斥锁。如果其他线程已经锁定这个互斥对象进行写访问,那么这个函数就会阻塞,直到写线程释放这个对象为止。这个函数返回当前线程的互斥句柄。

锁定读取/释放读取

 1 handle lockread()
 2     {
 3     auto f = createif();
 4     waitforsingleobject(f,infinite);
 5     return f;
 6     }
 7 void releaseread(handle f)
 8     {
 9     releasemutex(f);
10     }

 

当您要锁定对象以进行读取访问并稍后释放它时,将调用这些函数。

锁/释放

 1 void lockwrite()
 2     {
 3                 createif(true);
 4 
 5                 // wait for all 
 6                 vector<handle> allthreads;
 7                 allthreads.reserve(threads.size());
 8                 for (auto& a : threads)
 9                     {
10                     allthreads.push_back(a.second);
11                     }
12 
13                 waitformultipleobjects((dword)allthreads.size(), allthreads.data(), true, infinite);
14 
15                 // reader is locked
16     }
17 
18 void releasewrite()
19     {
20     
21     // release all
22     for (auto& a : threads)
23         releasemutex(a.second);
24     releasemutex(hchangemap);
25     }

 

当您希望锁定对象以进行写访问并在稍后释放它时,将调用这些函数。函数的作用是:

1.在锁期间没有注册新线程

2.任何读取线程都释放了锁

 

析构函数

1 rwmutex()
2     {
3     closehandle(hchangemap);
4     hchangemap = 0;
5     for (auto& a : threads)
6         closehandle(a.second);
7     threads.clear();
8     }

 

析构函数确保清除所有句柄。

可升级/可升级锁

有时,您希望将读锁升级为写锁,而不先解锁,以提高效率。因此,lockwrite被修改为:

 1 void lockwrite(dword updthread = 0)
 2 {
 3     createif(true);
 4 
 5     // wait for all
 6     allthreads.reserve(threads.size());
 7     allthreads.clear();
 8     for (auto& a : threads)
 9     {
10         if (updthread == a.first) // except ourself if in upgrade operation
11             continue;
12         allthreads.push_back(a.second);
13     }
14     auto tim = waitformultipleobjects((dword)allthreads.size(), allthreads.data(), true, wi);
15 
16     if (tim == wait_timeout && wi != infinite)
17         outputdebugstring(l"lockwrite debug timeout!");
18 
19     // we don't want to keep threads, the hchangemap is enough
20     // we also release the handle to the upgraded thread, if any
21     for (auto& a : threads)
22         releasemutex(a.second);
23 
24     // reader is locked
25 }
26 
27 void upgrade()
28 {
29     lockwrite(getcurrentthreadid());
30 }
31 
32 handle downgrade()
33 {
34     dword id = getcurrentthreadid();
35     auto z = threads[id];
36     auto tim = waitforsingleobject(z, wi);
37     if (tim == wait_timeout && wi != infinite)
38         outputdebugstring(l"downgrade debug timeout!");
39     releasemutex(hchangemap);
40     return z;
41 }

 

调用upgrade()现在的结果是:

更改被锁定的映射

等待除我们自己的线程之外的所有读线程退出

然后我们释放我们自己的线程互斥锁,因为更改锁定的映射就足够了。

调用downgrade()结果:

  • 直接从映射上获取手柄,无需重新锁定
  • 锁定此句柄,就像我们处于读取模式一样
  • 发布变更映射

因此,整个代码是(带有一些调试帮助):

  1 // rwmutex
  2     class rwmutex
  3         {
  4         private:
  5             handle hchangemap = 0;
  6             std::map<dword, handle> threads;
  7             dword wi = infinite;
  8             rwmutex(const rwmutex&) = delete;
  9             rwmutex(rwmutex&&) = delete;
 10             operator=(const rwmutex&) = delete;
 11 
 12         public:
 13 
 14             rwmutex(bool d = false)
 15                 {
 16                 if (d)
 17                     wi = 10000;
 18                 else
 19                     wi = infinite;
 20                 hchangemap = createmutex(0, 0, 0);
 21                 }
 22 
 23             ~rwmutex()
 24                 {
 25                 closehandle(hchangemap);
 26                 hchangemap = 0;
 27                 for (auto& a : threads)
 28                     closehandle(a.second);
 29                 threads.clear();
 30                 }
 31 
 32             handle createif(bool keepreaderlocked = false)
 33                 {
 34                 auto tim = waitforsingleobject(hchangemap, infinite);
 35                 if (tim == wait_timeout && wi != infinite)
 36                     outputdebugstring(l"lockread debug timeout!");
 37                 dword id = getcurrentthreadid();
 38                 if (threads[id] == 0)
 39                     {
 40                     handle e0 = createmutex(0, 0, 0);
 41                     threads[id] = e0;
 42                     }
 43                 handle e = threads[id];
 44                 if (!keepreaderlocked)    
 45                     releasemutex(hchangemap);
 46                 return e;
 47                 }
 48 
 49             handle lockread()
 50                 {
 51                 auto z = createif();
 52                 auto tim = waitforsingleobject(z, wi);
 53                 if (tim == wait_timeout && wi != infinite)
 54                     outputdebugstring(l"lockread debug timeout!");
 55                 return z;
 56                 }
 57 
 58     void lockwrite(dword updthread = 0)
 59     {
 60         createif(true);
 61 
 62         // wait for all 
 63         allthreads.reserve(threads.size());
 64         allthreads.clear();
 65         for (auto& a : threads)
 66         {
 67             if (updthread == a.first) // except ourself if in upgrade operation
 68                 continue;
 69             allthreads.push_back(a.second);
 70         }
 71         auto tim = waitformultipleobjects((dword)allthreads.size(), allthreads.data(), true, wi);
 72 
 73         if (tim == wait_timeout && wi != infinite)
 74             outputdebugstring(l"lockwrite debug timeout!");
 75 
 76         // we don't want to keep threads, the hchangemap is enough
 77         // we also release the handle to the upgraded thread, if any
 78         for (auto& a : threads)
 79             releasemutex(a.second);
 80 
 81         // reader is locked
 82     }
 83 
 84     void releasewrite()
 85     {
 86         releasemutex(hchangemap);
 87     }
 88 
 89     void releaseread(handle f)
 90     {
 91         releasemutex(f);
 92     }
 93 
 94     void upgrade()
 95     {
 96         lockwrite(getcurrentthreadid());
 97     }
 98 
 99     handle downgrade()
100     {
101         dword id = getcurrentthreadid();
102         auto z = threads[id];
103         auto tim = waitforsingleobject(z, wi);
104         if (tim == wait_timeout && wi != infinite)
105             outputdebugstring(l"downgrade debug timeout!");
106         releasemutex(hchangemap);
107         return z;
108     }              
109 };

 

要使用rwmutex,可以简单地创建锁定类:

 1 class rwmutexlockread
 2     {
 3     private:
 4         rwmutex* mm = 0;
 5     public:
 6 
 7         rwmutexlockread(const rwmutexlockread&) = delete;
 8         void operator =(const rwmutexlockread&) = delete;
 9 
10         rwmutexlockread(rwmutex*m)
11             {
12             if (m)
13                 {
14                 mm = m;
15                 mm->lockread();
16                 }
17             }
18         ~rwmutexlockread()
19             {
20             if (mm)
21                 {
22                 mm->releaseread();
23                 mm = 0;
24                 }
25             }
26     };
27 
28 class rwmutexlockwrite
29     {
30     private:
31         rwmutex* mm = 0;
32     public:
33         rwmutexlockwrite(rwmutex*m)
34             {
35             if (m)
36                 {
37                 mm = m;
38                 mm->lockwrite();
39                 }
40             }
41         ~rwmutexlockwrite()
42             {
43             if (mm)
44                 {
45                 mm->releasewrite();
46                 mm = 0;
47                 }
48             }
49     };

 

还有一个用于升级机制的新类:

 1 class rwmutexlockreadwrite
 2 {
 3 private:
 4     rwmutex* mm = 0;
 5     handle lm = 0;
 6     bool u = false;
 7 public:
 8 
 9     rwmutexlockreadwrite(const rwmutexlockreadwrite&) = delete;
10     void operator =(const rwmutexlockreadwrite&) = delete;
11 
12     rwmutexlockreadwrite(rwmutex*m)
13     {
14         if (m)
15         {
16             mm = m;
17             lm = mm->lockread();
18         }
19     }
20 
21     void upgrade()
22     {
23         if (mm && !u)
24         {
25             mm->upgrade();
26             lm = 0;
27             u = 1;
28         }
29     }
30 
31     void downgrade()
32     {
33         if (mm && u)
34         {
35             lm = mm->downgrade();
36             u = 0;
37         }
38     }
39 
40     ~rwmutexlockreadwrite()
41     {
42         if (mm)
43         {
44             if (u)
45                 mm->releasewrite();
46             else
47                 mm->releaseread(lm);
48             lm = 0;
49             mm = 0;
50         }
51     }
52 };

 

用法示例:

 1 rwmutex m;
 2 
 3 // ... other code
 4 void foo1() {
 5   rwmutexlockread lock(&m);
 6   }
 7 
 8 void foo2() {
 9  rwmutexlockwrite lock(&m);
10 }