C#请求唯一性校验支持高并发的实现方法
使用场景描述:
网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。
其他需求描述:
这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。
技术实现:
对请求的业务内容进行md5摘要,并且将md5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。
代码实现:
公共调用代码 uniquecheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库
/* * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。 * 它是被设计用来修饰被不同线程访问和修改的变量。 * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。 */ private static readonly object lockhelper = new object(); private volatile static uniquecheck _instance; /// <summary> /// 获取单一实例 /// </summary> /// <returns></returns> public static uniquecheck getinstance() { if (_instance == null) { lock (lockhelper) { if (_instance == null) _instance = new uniquecheck(); } } return _instance; }
这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。
自定义一个可以进行并发处理队列,代码如下:concurrentlinkedqueue
using system; using system.collections.generic; using system.text; using system.threading; namespace packgeuniquecheck { /// <summary> /// 非加锁并发队列,处理100个并发数以内 /// </summary> /// <typeparam name="t"></typeparam> public class concurrentlinkedqueue<t> { private class node<k> { internal k item; internal node<k> next; public node(k item, node<k> next) { this.item = item; this.next = next; } } private node<t> _head; private node<t> _tail; public concurrentlinkedqueue() { _head = new node<t>(default(t), null); _tail = _head; } public bool isempty { get { return (_head.next == null); } } /// <summary> /// 进入队列 /// </summary> /// <param name="item"></param> public void enqueue(t item) { node<t> newnode = new node<t>(item, null); while (true) { node<t> curtail = _tail; node<t> residue = curtail.next; //判断_tail是否被其他process改变 if (curtail == _tail) { //a 有其他process执行c成功,_tail应该指向新的节点 if (residue == null) { //c 其他process改变了tail节点,需要重新取tail节点 if (interlocked.compareexchange<node<t>>( ref curtail.next, newnode, residue) == residue) { //d 尝试修改tail interlocked.compareexchange<node<t>>(ref _tail, newnode, curtail); return; } } else { //b 帮助其他线程完成d操作 interlocked.compareexchange<node<t>>(ref _tail, residue, curtail); } } } } /// <summary> /// 队列取数据 /// </summary> /// <param name="result"></param> /// <returns></returns> public bool trydequeue(out t result) { node<t> curhead; node<t> curtail; node<t> next; while (true) { curhead = _head; curtail = _tail; next = curhead.next; if (curhead == _head) { if (next == null) //queue为空 { result = default(t); return false; } if (curhead == curtail) //queue处于enqueue第一个node的过程中 { //尝试帮助其他process完成操作 interlocked.compareexchange<node<t>>(ref _tail, next, curtail); } else { //取next.item必须放到cas之前 result = next.item; //如果_head没有发生改变,则将_head指向next并退出 if (interlocked.compareexchange<node<t>>(ref _head, next, curhead) == curhead) break; } } } return true; } /// <summary> /// 尝试获取最后一个对象 /// </summary> /// <param name="result"></param> /// <returns></returns> public bool trygettail(out t result) { result = default(t); if (_tail == null) { return false; } result = _tail.item; return true; } } }
虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。
using system; using system.collections.generic; using system.text; using system.threading; using system.collections; namespace packgeuniquecheck { public class uniquecheck { /* * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。 * 它是被设计用来修饰被不同线程访问和修改的变量。 * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。 */ private static readonly object lockhelper = new object(); private volatile static uniquecheck _instance; /// <summary> /// 获取单一实例 /// </summary> /// <returns></returns> public static uniquecheck getinstance() { if (_instance == null) { lock (lockhelper) { if (_instance == null) _instance = new uniquecheck(); } } return _instance; } private uniquecheck() { //创建一个线程安全的哈希表,作为字典缓存 _datakey = hashtable.synchronized(new hashtable()); queue myqueue = new queue(); _dataqueue = queue.synchronized(myqueue); _myqueue = new concurrentlinkedqueue<string>(); _timer = new thread(doticket); _timer.start(); } #region 公共属性设置 /// <summary> /// 设定定时线程的休眠时间长度:默认为1分钟 /// 时间范围:1-7200000,值为1毫秒到2小时 /// </summary> /// <param name="value"></param> public void settimespan(int value) { if (value > 0&& value <=7200000) { _timespan = value; } } /// <summary> /// 设定缓存cache中的最大记录条数 /// 值范围:1-5000000,1到500万 /// </summary> /// <param name="value"></param> public void setcachemaxnum(int value) { if (value > 0 && value <= 5000000) { _cachemaxnum = value; } } /// <summary> /// 设置是否在控制台中显示日志 /// </summary> /// <param name="value"></param> public void setisshowmsg(bool value) { helper.isshowmsg = value; } /// <summary> /// 线程请求阻塞增量 /// 值范围:1-cachemaxnum,建议设置为缓存最大值的10%-20% /// </summary> /// <param name="value"></param> public void setblocknumext(int value) { if (value > 0 && value <= _cachemaxnum) { _blocknumext = value; } } /// <summary> /// 请求阻塞时间 /// 值范围:1-max,根据阻塞增量设置请求阻塞时间 /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差 /// </summary> /// <param name="value"></param> public void setblockspantime(int value) { if (value > 0) { _blockspantime = value; } } #endregion #region 私有变量 /// <summary> /// 内部运行线程 /// </summary> private thread _runner = null; /// <summary> /// 可处理高并发的队列 /// </summary> private concurrentlinkedqueue<string> _myqueue = null; /// <summary> /// 唯一内容的时间健值对 /// </summary> private hashtable _datakey = null; /// <summary> /// 内容时间队列 /// </summary> private queue _dataqueue = null; /// <summary> /// 定时线程的休眠时间长度:默认为1分钟 /// </summary> private int _timespan = 3000; /// <summary> /// 定时计时器线程 /// </summary> private thread _timer = null; /// <summary> /// 缓存cache中的最大记录条数 /// </summary> private int _cachemaxnum = 500000; /// <summary> /// 线程请求阻塞增量 /// </summary> private int _blocknumext = 10000; /// <summary> /// 请求阻塞时间 /// </summary> private int _blockspantime = 100; #endregion #region 私有方法 private void startrun() { _runner = new thread(doaction); _runner.start(); helper.showmsg("内部线程启动成功!"); } private string getitem() { string tp = string.empty; bool result = _myqueue.trydequeue(out tp); return tp; } /// <summary> /// 执行循环操作 /// </summary> private void doaction() { while (true) { while (!_myqueue.isempty) { string item = getitem(); _dataqueue.enqueue(item); if (!_datakey.containskey(item)) { _datakey.add(item, datetime.now); } } //helper.showmsg("当前数组已经为空,处理线程进入休眠状态..."); thread.sleep(2); } } /// <summary> /// 执行定时器的动作 /// </summary> private void doticket() { while (true) { helper.showmsg("当前数据队列个数:" + _dataqueue.count.tostring()); if (_dataqueue.count > _cachemaxnum) { while (true) { helper.showmsg(string.format("当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _dataqueue.count, _cachemaxnum.tostring())); string item = _dataqueue.dequeue().tostring(); if (!string.isnullorempty(item)) { if (_datakey.containskey(item)) { _datakey.remove(item); } if (_dataqueue.count <= _cachemaxnum) { helper.showmsg("清理完成,开始休眠清理线程..."); break; } } } } thread.sleep(_timespan); } } /// <summary> /// 线程进行睡眠等待 /// 如果当前负载压力大大超出了线程的处理能力 /// 那么需要进行延时调用 /// </summary> private void blockthread() { if (_dataqueue.count > _cachemaxnum + _blocknumext) { thread.sleep(_blockspantime); } } #endregion #region 公共方法 /// <summary> /// 开启服务线程 /// </summary> public void start() { if (_runner == null) { startrun(); } else { if (_runner.isalive == false) { startrun(); } } } /// <summary> /// 关闭服务线程 /// </summary> public void stop() { if (_runner != null) { _runner.abort(); _runner = null; } } /// <summary> /// 添加内容信息 /// </summary> /// <param name="item">内容信息</param> /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns> public bool additem(string item) { blockthread(); item = helper.makemd5(item); if (_datakey.containskey(item)) { return false; } else { _myqueue.enqueue(item); return true; } } /// <summary> /// 判断内容信息是否已经存在 /// </summary> /// <param name="item">内容信息</param> /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns> public bool checkitem(string item) { item = helper.makemd5(item); return _datakey.containskey(item); } #endregion } }
模拟测试代码:
private static string _example = guid.newguid().tostring(); private static uniquecheck _uck = null; static void main(string[] args) { _uck = uniquecheck.getinstance(); _uck.start(); _uck.setisshowmsg(false); _uck.setcachemaxnum(20000000); _uck.setblocknumext(1000000); _uck.settimespan(6000); _uck.additem(_example); thread[] threads = new thread[20]; for (int i = 0; i < 20; i++) { threads[i] = new thread(addinfo); threads[i].start(); } thread checkthread = new thread(checkinfo); checkthread.start(); string value = console.readline(); checkthread.abort(); for (int i = 0; i < 50; i++) { threads[i].abort(); } _uck.stop(); } static void addinfo() { while (true) { _uck.additem(guid.newguid().tostring()); } } static void checkinfo() { while (true) { console.writeline("开始时间:{0}...", datetime.now.tostring("yyyy-mm-dd hh:mm:ss.ffff")); console.writeline("插入结果:{0}", _uck.additem(_example)); console.writeline("结束时间:{0}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss.ffff")); //调整进程休眠时间,可以测试高并发的情况 //thread.sleep(1000); } }
测试截图:
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。
上一篇: 十年精华收藏关于路由和路由协议的漏洞
下一篇: 如何创建ajax对象并兼容多个浏览器
推荐阅读