RWMutex:共享/专有的递归互斥锁
具有共享/独占访问权限,且具有升级/降级功能的互斥锁
介绍
我的目标是创建可以充当读/写锁定机制的对象。任何线程都可以锁定它以进行读取,但是只有一个线程可以锁定它以进行写入。在写入线程释放它之前,所有其他线程都将等待。在释放任何其他线程之前,写线程不会获取互斥体。
我可以使用slim reader / writer锁,但是:
- 它们不是递归的,例如,
acquiresrwlockexclusive()
如果同一线程较早调用同一函数,则对的调用将阻塞。 - 它们不可升级,例如,已将锁锁定为读取访问权限的线程无法将其锁定为写入操作。
- 它们不是可复制的句柄。
我可以尝试c ++ 14,shared_lock
但是我仍然需要c ++ 11支持。此外,我还不确定它是否可以真正满足我的要求。
因此,我不得不手动实现它。由于缺少,删除了普通的c ++ 11方法waitformultipleobjects (nyi)
。现在具有升级/降级功能。
rwmutex
这一节很简单。
1 class rwmutex 2 { 3 private: 4 handle hchangemap; 5 std::map<dword, handle> threads; 6 rwmutex(const rwmutex&) = delete; 7 rwmutex(rwmutex&&) = delete;
我需要std::map<dword,handle>
为所有尝试访问共享资源的线程存储一个句柄,并且还需要一个互斥锁以确保对此映射的所有更改都是线程安全的。
构造函数
1 rwmutex(const rwmutex&) = delete; 2 void operator =(const rwmutex&) = delete; 3 4 rwmutex() 5 { 6 hchangemapwrite = createmutex(0,0,0); 7 }
简单地创建一个映射互斥的句柄。对象不可复制。
创建
1 handle createif(bool keepreaderlocked = false) 2 { 3 waitforsingleobject(hchangemap, infinite); 4 dword id = getcurrentthreadid(); 5 if (threads[id] == 0) 6 { 7 handle e0 = createmutex(0, 0, 0); 8 threads[id] = e0; 9 } 10 handle e = threads[id]; 11 if (!keepreaderlocked) 12 releasemutex(hchangemap); 13 return e; 14 }
当调用lockread()或lockwrite()来锁定对象时,将调用这个私有函数。如果当前线程还没有将自己变为可能访问这个互斥锁的线程中,这个函数将为该线程创建一个互斥锁。如果其他线程已经锁定这个互斥对象进行写访问,那么这个函数就会阻塞,直到写线程释放这个对象为止。这个函数返回当前线程的互斥句柄。
锁定读取/释放读取
1 handle lockread() 2 { 3 auto f = createif(); 4 waitforsingleobject(f,infinite); 5 return f; 6 } 7 void releaseread(handle f) 8 { 9 releasemutex(f); 10 }
当您要锁定对象以进行读取访问并稍后释放它时,将调用这些函数。
锁/释放
1 void lockwrite() 2 { 3 createif(true); 4 5 // wait for all 6 vector<handle> allthreads; 7 allthreads.reserve(threads.size()); 8 for (auto& a : threads) 9 { 10 allthreads.push_back(a.second); 11 } 12 13 waitformultipleobjects((dword)allthreads.size(), allthreads.data(), true, infinite); 14 15 // reader is locked 16 } 17 18 void releasewrite() 19 { 20 21 // release all 22 for (auto& a : threads) 23 releasemutex(a.second); 24 releasemutex(hchangemap); 25 }
当您希望锁定对象以进行写访问并在稍后释放它时,将调用这些函数。函数的作用是:
1.在锁期间没有注册新线程
2.任何读取线程都释放了锁
析构函数
1 rwmutex() 2 { 3 closehandle(hchangemap); 4 hchangemap = 0; 5 for (auto& a : threads) 6 closehandle(a.second); 7 threads.clear(); 8 }
析构函数确保清除所有句柄。
可升级/可升级锁
有时,您希望将读锁升级为写锁,而不先解锁,以提高效率。因此,lockwrite
被修改为:
1 void lockwrite(dword updthread = 0) 2 { 3 createif(true); 4 5 // wait for all 6 allthreads.reserve(threads.size()); 7 allthreads.clear(); 8 for (auto& a : threads) 9 { 10 if (updthread == a.first) // except ourself if in upgrade operation 11 continue; 12 allthreads.push_back(a.second); 13 } 14 auto tim = waitformultipleobjects((dword)allthreads.size(), allthreads.data(), true, wi); 15 16 if (tim == wait_timeout && wi != infinite) 17 outputdebugstring(l"lockwrite debug timeout!"); 18 19 // we don't want to keep threads, the hchangemap is enough 20 // we also release the handle to the upgraded thread, if any 21 for (auto& a : threads) 22 releasemutex(a.second); 23 24 // reader is locked 25 } 26 27 void upgrade() 28 { 29 lockwrite(getcurrentthreadid()); 30 } 31 32 handle downgrade() 33 { 34 dword id = getcurrentthreadid(); 35 auto z = threads[id]; 36 auto tim = waitforsingleobject(z, wi); 37 if (tim == wait_timeout && wi != infinite) 38 outputdebugstring(l"downgrade debug timeout!"); 39 releasemutex(hchangemap); 40 return z; 41 }
调用upgrade()现在的结果是:
更改被锁定的映射
等待除我们自己的线程之外的所有读线程退出
然后我们释放我们自己的线程互斥锁,因为更改锁定的映射就足够了。
调用downgrade()
结果:
- 直接从映射上获取手柄,无需重新锁定
- 锁定此句柄,就像我们处于读取模式一样
- 发布变更映射
因此,整个代码是(带有一些调试帮助):
1 // rwmutex 2 class rwmutex 3 { 4 private: 5 handle hchangemap = 0; 6 std::map<dword, handle> threads; 7 dword wi = infinite; 8 rwmutex(const rwmutex&) = delete; 9 rwmutex(rwmutex&&) = delete; 10 operator=(const rwmutex&) = delete; 11 12 public: 13 14 rwmutex(bool d = false) 15 { 16 if (d) 17 wi = 10000; 18 else 19 wi = infinite; 20 hchangemap = createmutex(0, 0, 0); 21 } 22 23 ~rwmutex() 24 { 25 closehandle(hchangemap); 26 hchangemap = 0; 27 for (auto& a : threads) 28 closehandle(a.second); 29 threads.clear(); 30 } 31 32 handle createif(bool keepreaderlocked = false) 33 { 34 auto tim = waitforsingleobject(hchangemap, infinite); 35 if (tim == wait_timeout && wi != infinite) 36 outputdebugstring(l"lockread debug timeout!"); 37 dword id = getcurrentthreadid(); 38 if (threads[id] == 0) 39 { 40 handle e0 = createmutex(0, 0, 0); 41 threads[id] = e0; 42 } 43 handle e = threads[id]; 44 if (!keepreaderlocked) 45 releasemutex(hchangemap); 46 return e; 47 } 48 49 handle lockread() 50 { 51 auto z = createif(); 52 auto tim = waitforsingleobject(z, wi); 53 if (tim == wait_timeout && wi != infinite) 54 outputdebugstring(l"lockread debug timeout!"); 55 return z; 56 } 57 58 void lockwrite(dword updthread = 0) 59 { 60 createif(true); 61 62 // wait for all 63 allthreads.reserve(threads.size()); 64 allthreads.clear(); 65 for (auto& a : threads) 66 { 67 if (updthread == a.first) // except ourself if in upgrade operation 68 continue; 69 allthreads.push_back(a.second); 70 } 71 auto tim = waitformultipleobjects((dword)allthreads.size(), allthreads.data(), true, wi); 72 73 if (tim == wait_timeout && wi != infinite) 74 outputdebugstring(l"lockwrite debug timeout!"); 75 76 // we don't want to keep threads, the hchangemap is enough 77 // we also release the handle to the upgraded thread, if any 78 for (auto& a : threads) 79 releasemutex(a.second); 80 81 // reader is locked 82 } 83 84 void releasewrite() 85 { 86 releasemutex(hchangemap); 87 } 88 89 void releaseread(handle f) 90 { 91 releasemutex(f); 92 } 93 94 void upgrade() 95 { 96 lockwrite(getcurrentthreadid()); 97 } 98 99 handle downgrade() 100 { 101 dword id = getcurrentthreadid(); 102 auto z = threads[id]; 103 auto tim = waitforsingleobject(z, wi); 104 if (tim == wait_timeout && wi != infinite) 105 outputdebugstring(l"downgrade debug timeout!"); 106 releasemutex(hchangemap); 107 return z; 108 } 109 };
要使用rwmutex
,可以简单地创建锁定类:
1 class rwmutexlockread 2 { 3 private: 4 rwmutex* mm = 0; 5 public: 6 7 rwmutexlockread(const rwmutexlockread&) = delete; 8 void operator =(const rwmutexlockread&) = delete; 9 10 rwmutexlockread(rwmutex*m) 11 { 12 if (m) 13 { 14 mm = m; 15 mm->lockread(); 16 } 17 } 18 ~rwmutexlockread() 19 { 20 if (mm) 21 { 22 mm->releaseread(); 23 mm = 0; 24 } 25 } 26 }; 27 28 class rwmutexlockwrite 29 { 30 private: 31 rwmutex* mm = 0; 32 public: 33 rwmutexlockwrite(rwmutex*m) 34 { 35 if (m) 36 { 37 mm = m; 38 mm->lockwrite(); 39 } 40 } 41 ~rwmutexlockwrite() 42 { 43 if (mm) 44 { 45 mm->releasewrite(); 46 mm = 0; 47 } 48 } 49 };
还有一个用于升级机制的新类:
1 class rwmutexlockreadwrite 2 { 3 private: 4 rwmutex* mm = 0; 5 handle lm = 0; 6 bool u = false; 7 public: 8 9 rwmutexlockreadwrite(const rwmutexlockreadwrite&) = delete; 10 void operator =(const rwmutexlockreadwrite&) = delete; 11 12 rwmutexlockreadwrite(rwmutex*m) 13 { 14 if (m) 15 { 16 mm = m; 17 lm = mm->lockread(); 18 } 19 } 20 21 void upgrade() 22 { 23 if (mm && !u) 24 { 25 mm->upgrade(); 26 lm = 0; 27 u = 1; 28 } 29 } 30 31 void downgrade() 32 { 33 if (mm && u) 34 { 35 lm = mm->downgrade(); 36 u = 0; 37 } 38 } 39 40 ~rwmutexlockreadwrite() 41 { 42 if (mm) 43 { 44 if (u) 45 mm->releasewrite(); 46 else 47 mm->releaseread(lm); 48 lm = 0; 49 mm = 0; 50 } 51 } 52 };
用法示例:
1 rwmutex m; 2 3 // ... other code 4 void foo1() { 5 rwmutexlockread lock(&m); 6 } 7 8 void foo2() { 9 rwmutexlockwrite lock(&m); 10 }