欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python3 Tornado 限制 IP在时间段内请求次数

程序员文章站 2022-07-08 15:49:05
原因:有个接口服务需要限制接口请求次数。处理逻辑:1、新建空列表;2、有IP 访问则记录 IP、时间、次数; eg:[{'192.168.1.124': {'2020-12-08 17:00:33': 10}}, {'192.168.1.125': {'2020-12-08 17:00:33': 9}}]3、判断当前时间和列表中的IP访问时间是否相同: ①、相同则增加访问次数; ②、不同则更改原有的访问时间和次数;4、同一秒访问10次则限制3分钟后再......

原因:有个接口服务需要限制接口请求次数。

 

处理逻辑:

1、新建空列表;

2、有IP 访问则记录 IP、时间、次数;  eg: [{'192.168.1.124': {'2020-12-08 17:00:33': 10}}, {'192.168.1.125': {'2020-12-08 17:00:33': 9}}]

3、判断当前时间和列表中的IP访问时间是否相同:

      ①、相同则增加访问次数;

      ②、不同则更改原有的访问时间和次数;

4、同一秒访问10次则限制3分钟后再访问;

5、防止列表一直存储着信息,需要在限制的时候清空,防止占满内存。


代码:

import datetime  # 时间包
#第三方 格式化时间包  pip install python-dateutil
from dateutil.relativedelta import relativedelta 

expire_time = ''  # 过期时间
iplist = []       # 存放访问的IP 格式 [{'192.168.1.124': {'2020-12-08 17:00:33': 10}}, {'192.168.1.125': {'2020-12-08 17:00:33': 9}}]


def counts(host):
    global expire_time, iplist
    key_list = []  # 临时存储列表中的IP键
    index_list = {}  # 临时存储列表中的下标值
    datetimes = datetime.datetime.now()
    nowtime = datetimes.strftime("%Y-%m-%d %H:%M:%S") # 转化成字符串便于存储
    print('nowtime', nowtime)
    if expire_time == '':  # 判断无过期时间
        if iplist:         # 判断有访问IP
            for index, item in enumerate(iplist):   # 将原有的访问IP遍历出来
                tmp_key = tuple(item.keys())[0]     # 取出第一层字典键
                if tmp_key not in key_list:         # 不在临时IP存储列表中
                    key_list.append(tmp_key)        # 不在临时IP列表则加入
                    tmp_key_dict = {tmp_key: index} # 拼接新临时存储IP下标,方便从IPlist中取值
                    index_list.update(tmp_key_dict) # 更新到临时存储IP字典中
            print('iplist', iplist)
            print("key_list", key_list)
            print("index_list", index_list)
            if host in key_list:  # 在列表中则判断时间和次数
                index = index_list[host]                # 取IP在IPList中的下标
                item = iplist[index]                    # 取出IPList对应的字典值
                times_key = tuple(item[host].keys())[0] # 取出IPList的第二层时间键
                # 转为时间格式进行时间间隔计算
                time_key = datetime.datetime.strptime(times_key, '%Y-%m-%d %H:%M:%S')
                # print("时间格式",type(datetimes - time_key),datetimes - time_key)
                #    >>> 时间格式 <class 'datetime.timedelta'> 0:00:00.213644
                # datetime.timedelta 格式数据通过 .seconds取相差的秒数,取分钟及其他类型会报错,有了解的大佬欢迎在评论区指教!
                if (datetimes - time_key).seconds == 0:  # 时间相同 则判断是否超过10次
                    if item[host][times_key] >= 10:  # 同一秒访问等于或大于10 就不允许请求接口了
                        # 设置过期时间
                        expire_time = (datetimes + relativedelta(minutes=+3)).strftime("%Y-%m-%d %H:%M:%S")
                        iplist = []  # 重置记录列表
                        # 返回限制结果
                        return {'code': -1, 'msg': "Error", 'data': "访问频率过快,请3分钟后再访问!"}
                    else:  # 不等于10次 则增加该IP请求次数
                        num = item[host].get(times_key, 1) + 1  # 取出该字典的次数+1
                        item[host][times_key] = num             # 重新赋值给该IP
                else:  # 时间不同则重新给IP计数
                    item[host] = {nowtime: 1}     # 重置该IP访问次数
            else:  # 表示第一次访问接口
                tmp_data = {host: {nowtime: 1}}   # 构建该IP的访问次数
                iplist.append(tmp_data)           # 添加到列表中
        else:  # 判断无访问IP即表示第一次访问接口
            tmp_data = {host: {nowtime: 1}}       # 构建该IP的访问次数
            iplist.append(tmp_data)               # 添加到列表中
    else:  # 限制时间已过就可以访问接口
        if nowtime == expire_time:
            tmp_data = {host: {nowtime: 1}}       # 构建该IP的访问次数
            iplist.append(tmp_data)               # 添加到列表中
        else: #限制时间未过则需要返回限制结果,否则返回None
            iplist = []  # 重置记录列表
            return {'code': -1, 'msg': "Error", 'data': "访问频率过快,请3分钟后再访问!"}


if __name__ == '__main__':

    for i in range(11):
        print('遍历的次数:【%s】' % i)
        print(counts("192.168.1.124"))
        print(counts("192.168.1.125"))

 

本文地址:https://blog.csdn.net/qq_42142258/article/details/110877394

相关标签: Tornado