欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#请求唯一性校验支持高并发的实现方法

程序员文章站 2022-06-21 08:39:01
使用场景描述:   网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请...

使用场景描述:

  网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

其他需求描述:

  这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

技术实现:

  对请求的业务内容进行md5摘要,并且将md5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

代码实现:

  公共调用代码 uniquecheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库

/*
   * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
   * 它是被设计用来修饰被不同线程访问和修改的变量。
   * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
   */
  private static readonly object lockhelper = new object();
 
  private volatile static uniquecheck _instance;  
 
  /// <summary>
  /// 获取单一实例
  /// </summary>
  /// <returns></returns>
  public static uniquecheck getinstance()
  {
   if (_instance == null)
   {
    lock (lockhelper)
    {
     if (_instance == null)
      _instance = new uniquecheck();
    }
   }
   return _instance;
  }

  这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

  自定义一个可以进行并发处理队列,代码如下:concurrentlinkedqueue

using system;
using system.collections.generic;
using system.text;
using system.threading;

namespace packgeuniquecheck
{
 /// <summary>
 /// 非加锁并发队列,处理100个并发数以内
 /// </summary>
 /// <typeparam name="t"></typeparam>
 public class concurrentlinkedqueue<t>
 {
  private class node<k>
  {
   internal k item;
   internal node<k> next;

   public node(k item, node<k> next)
   {
    this.item = item;
    this.next = next;
   }
  }

  private node<t> _head;
  private node<t> _tail;

  public concurrentlinkedqueue()
  {
   _head = new node<t>(default(t), null);
   _tail = _head;
  }

  public bool isempty
  {
   get { return (_head.next == null); }
  }
  /// <summary>
  /// 进入队列
  /// </summary>
  /// <param name="item"></param>
  public void enqueue(t item)
  {
   node<t> newnode = new node<t>(item, null);
   while (true)
   {
    node<t> curtail = _tail;
    node<t> residue = curtail.next;

    //判断_tail是否被其他process改变
    if (curtail == _tail)
    {
     //a 有其他process执行c成功,_tail应该指向新的节点
     if (residue == null)
     {
      //c 其他process改变了tail节点,需要重新取tail节点
      if (interlocked.compareexchange<node<t>>(
       ref curtail.next, newnode, residue) == residue)
      {
       //d 尝试修改tail
       interlocked.compareexchange<node<t>>(ref _tail, newnode, curtail);
       return;
      }
     }
     else
     {
      //b 帮助其他线程完成d操作
      interlocked.compareexchange<node<t>>(ref _tail, residue, curtail);
     }
    }
   }
  }
  /// <summary>
  /// 队列取数据
  /// </summary>
  /// <param name="result"></param>
  /// <returns></returns>
  public bool trydequeue(out t result)
  {
   node<t> curhead;
   node<t> curtail;
   node<t> next;
   while (true)
   {
    curhead = _head;
    curtail = _tail;
    next = curhead.next;
    if (curhead == _head)
    {
     if (next == null) //queue为空
     {
      result = default(t);
      return false;
     }
     if (curhead == curtail) //queue处于enqueue第一个node的过程中
     {
      //尝试帮助其他process完成操作
      interlocked.compareexchange<node<t>>(ref _tail, next, curtail);
     }
     else
     {
      //取next.item必须放到cas之前
      result = next.item;
      //如果_head没有发生改变,则将_head指向next并退出
      if (interlocked.compareexchange<node<t>>(ref _head,
       next, curhead) == curhead)
       break;
     }
    }
   }
   return true;
  }
  /// <summary>
  /// 尝试获取最后一个对象
  /// </summary>
  /// <param name="result"></param>
  /// <returns></returns>
  public bool trygettail(out t result)
  {
   result = default(t);
   if (_tail == null)
   {
    return false;
   }
   result = _tail.item;
   return true;
  }
 }
}

虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。

using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.collections;

namespace packgeuniquecheck
{
 public class uniquecheck
 {
  /*
   * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
   * 它是被设计用来修饰被不同线程访问和修改的变量。
   * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
   */
  private static readonly object lockhelper = new object();

  private volatile static uniquecheck _instance;  

  /// <summary>
  /// 获取单一实例
  /// </summary>
  /// <returns></returns>
  public static uniquecheck getinstance()
  {
   if (_instance == null)
   {
    lock (lockhelper)
    {
     if (_instance == null)
      _instance = new uniquecheck();
    }
   }
   return _instance;
  }

  private uniquecheck()
  {
   //创建一个线程安全的哈希表,作为字典缓存
   _datakey = hashtable.synchronized(new hashtable());
   queue myqueue = new queue();
   _dataqueue = queue.synchronized(myqueue);
   _myqueue = new concurrentlinkedqueue<string>();
   _timer = new thread(doticket);
   _timer.start();
  }

  #region 公共属性设置
  /// <summary>
  /// 设定定时线程的休眠时间长度:默认为1分钟
  /// 时间范围:1-7200000,值为1毫秒到2小时
  /// </summary>
  /// <param name="value"></param>
  public void settimespan(int value)
  {
   if (value > 0&& value <=7200000)
   {
    _timespan = value;
   }
  }
  /// <summary>
  /// 设定缓存cache中的最大记录条数
  /// 值范围:1-5000000,1到500万
  /// </summary>
  /// <param name="value"></param>
  public void setcachemaxnum(int value)
  {
   if (value > 0 && value <= 5000000)
   {
    _cachemaxnum = value;
   }
  }
  /// <summary>
  /// 设置是否在控制台中显示日志
  /// </summary>
  /// <param name="value"></param>
  public void setisshowmsg(bool value)
  {
   helper.isshowmsg = value;
  }
  /// <summary>
  /// 线程请求阻塞增量
  /// 值范围:1-cachemaxnum,建议设置为缓存最大值的10%-20%
  /// </summary>
  /// <param name="value"></param>
  public void setblocknumext(int value)
  {
   if (value > 0 && value <= _cachemaxnum)
   {
    _blocknumext = value;
   }
  }
  /// <summary>
  /// 请求阻塞时间
  /// 值范围:1-max,根据阻塞增量设置请求阻塞时间
  /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差
  /// </summary>
  /// <param name="value"></param>
  public void setblockspantime(int value)
  {
   if (value > 0)
   {
    _blockspantime = value;
   }
  }
  #endregion

  #region 私有变量
  /// <summary>
  /// 内部运行线程
  /// </summary>
  private thread _runner = null;
  /// <summary>
  /// 可处理高并发的队列
  /// </summary>
  private concurrentlinkedqueue<string> _myqueue = null;
  /// <summary>
  /// 唯一内容的时间健值对
  /// </summary>
  private hashtable _datakey = null;
  /// <summary>
  /// 内容时间队列
  /// </summary>
  private queue _dataqueue = null;
  /// <summary>
  /// 定时线程的休眠时间长度:默认为1分钟
  /// </summary>
  private int _timespan = 3000;
  /// <summary>
  /// 定时计时器线程
  /// </summary>
  private thread _timer = null;
  /// <summary>
  /// 缓存cache中的最大记录条数
  /// </summary>
  private int _cachemaxnum = 500000;
  /// <summary>
  /// 线程请求阻塞增量
  /// </summary>
  private int _blocknumext = 10000;
  /// <summary>
  /// 请求阻塞时间
  /// </summary>
  private int _blockspantime = 100;
  #endregion

  #region 私有方法
  private void startrun()
  {
   _runner = new thread(doaction);
   _runner.start();
   helper.showmsg("内部线程启动成功!");
  }

  private string getitem()
  {
   string tp = string.empty;
   bool result = _myqueue.trydequeue(out tp);
   return tp;
  }
  /// <summary>
  /// 执行循环操作
  /// </summary>
  private void doaction()
  {
   while (true)
   {
    while (!_myqueue.isempty)
    {
     string item = getitem();
     _dataqueue.enqueue(item);
     if (!_datakey.containskey(item))
     {
      _datakey.add(item, datetime.now);
     }
    }
    //helper.showmsg("当前数组已经为空,处理线程进入休眠状态...");
    thread.sleep(2);
   }
  }
  /// <summary>
  /// 执行定时器的动作
  /// </summary>
  private void doticket()
  {
   while (true)
   {
    helper.showmsg("当前数据队列个数:" + _dataqueue.count.tostring());
    if (_dataqueue.count > _cachemaxnum)
    {
     while (true)
     {
      helper.showmsg(string.format("当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _dataqueue.count, _cachemaxnum.tostring()));
      string item = _dataqueue.dequeue().tostring();
      if (!string.isnullorempty(item))
      {
       if (_datakey.containskey(item))
       {
        _datakey.remove(item);
       }
       if (_dataqueue.count <= _cachemaxnum)
       {
        helper.showmsg("清理完成,开始休眠清理线程...");
        break;
       }
      }
     }
    }
    thread.sleep(_timespan);
   }
  }

  /// <summary>
  /// 线程进行睡眠等待
  /// 如果当前负载压力大大超出了线程的处理能力
  /// 那么需要进行延时调用
  /// </summary>
  private void blockthread()
  {
   if (_dataqueue.count > _cachemaxnum + _blocknumext)
   {
    thread.sleep(_blockspantime);
   }
  }
  #endregion

  #region 公共方法
  /// <summary>
  /// 开启服务线程
  /// </summary>
  public void start()
  {
   if (_runner == null)
   {
    startrun();
   }
   else
   {
    if (_runner.isalive == false)
    {
     startrun();
    }
   }

  }
  /// <summary>
  /// 关闭服务线程
  /// </summary>
  public void stop()
  {
   if (_runner != null)
   {
    _runner.abort();
    _runner = null;
   }
  }

  /// <summary>
  /// 添加内容信息
  /// </summary>
  /// <param name="item">内容信息</param>
  /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>
  public bool additem(string item)
  {
   blockthread();
   item = helper.makemd5(item);
   if (_datakey.containskey(item))
   {
    return false;
   }
   else
   {
    _myqueue.enqueue(item);
    return true;
   }
  }
  /// <summary>
  /// 判断内容信息是否已经存在
  /// </summary>
  /// <param name="item">内容信息</param>
  /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>
  public bool checkitem(string item)
  {
   item = helper.makemd5(item);
   return _datakey.containskey(item);
  }
  #endregion 

 }
}

模拟测试代码:

private static string _example = guid.newguid().tostring();

  private static uniquecheck _uck = null;

  static void main(string[] args)
  {
   _uck = uniquecheck.getinstance();
   _uck.start();
   _uck.setisshowmsg(false);
   _uck.setcachemaxnum(20000000);
   _uck.setblocknumext(1000000);
   _uck.settimespan(6000);

   _uck.additem(_example);
   thread[] threads = new thread[20];

   for (int i = 0; i < 20; i++)
   {
    threads[i] = new thread(addinfo);
    threads[i].start();
   }

   thread checkthread = new thread(checkinfo);
   checkthread.start();

   string value = console.readline();

   checkthread.abort();
   for (int i = 0; i < 50; i++)
   {
    threads[i].abort();
   }
   _uck.stop();
  }

  static void addinfo()
  {
   while (true)
   {
    _uck.additem(guid.newguid().tostring());
   }
  }

  static void checkinfo()
  {
   while (true)
   {
    console.writeline("开始时间:{0}...", datetime.now.tostring("yyyy-mm-dd hh:mm:ss.ffff"));
    console.writeline("插入结果:{0}", _uck.additem(_example));
    console.writeline("结束时间:{0}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss.ffff"));
          //调整进程休眠时间,可以测试高并发的情况
    //thread.sleep(1000);
   }
   
  }

测试截图:

C#请求唯一性校验支持高并发的实现方法

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。